Add async embedding pipeline worker and backfill (#129) #130

Merged
forgejo_admin merged 1 commit from 129-async-embedding-pipeline into main 2026-03-09 13:29:23 +00:00

Summary

  • Implements Phase 6c: a PostgreSQL LISTEN/NOTIFY-driven worker that consumes the embedding_queue channel fired by the 6b trigger, calls Ollama for per-block embeddings, and stores 768-dim vectors via pgvector
  • Includes poll fallback, retry with exponential backoff, graceful SIGTERM, health endpoint, Prometheus metrics, and --backfill mode for all ~5K existing blocks

Changes

  • src/pal_e_docs/embedding_worker.py (new): Main worker process -- LISTEN loop, poll fallback every 60s, batch processor with block-type-aware text extraction, Ollama HTTP client, state machine (pending -> processing -> completed | error), health/metrics HTTP server on port 8001, --backfill CLI flag
  • k8s/embedding-worker.yaml (new): k8s Deployment -- same Docker image, entrypoint python -m pal_e_docs.embedding_worker, no GPU request, 10m/64Mi requests, 256Mi limit, liveness/readiness probes
  • tests/test_embedding.py (new): 20 unit tests covering text extraction for all block types, mermaid skip, empty/null edge cases
  • src/pal_e_docs/config.py: Add ollama_url setting (PALDOCS_OLLAMA_URL, defaults to in-cluster Ollama)
  • k8s/kustomization.yaml: Add embedding-worker.yaml to resources
  • pyproject.toml: Move httpx from dev to main deps, add prometheus-client

Test Plan

  • pytest tests/ -k test_embedding -- 20/20 passing
  • ruff check -- clean
  • Deploy to cluster, verify worker starts and issues LISTEN embedding_queue
  • Create/update a note, confirm worker picks up NOTIFY and embeds the block
  • Run --backfill mode, verify all ~5K embeddable blocks reach completed status
  • Verify /healthz returns 200, /metrics exposes Prometheus counters

Review Checklist

  • Passed automated review-fix loop
  • No secrets committed
  • No unnecessary file changes
  • Commit messages are descriptive
  • Closes forgejo_admin/pal-e-docs #129
  • plan-2026-02-26-tf-modularize-postgres -- Phase 6 Vector Search, Phase 6c

Closes #129

## Summary - Implements Phase 6c: a PostgreSQL LISTEN/NOTIFY-driven worker that consumes the `embedding_queue` channel fired by the 6b trigger, calls Ollama for per-block embeddings, and stores 768-dim vectors via pgvector - Includes poll fallback, retry with exponential backoff, graceful SIGTERM, health endpoint, Prometheus metrics, and `--backfill` mode for all ~5K existing blocks ## Changes - `src/pal_e_docs/embedding_worker.py` (new): Main worker process -- LISTEN loop, poll fallback every 60s, batch processor with block-type-aware text extraction, Ollama HTTP client, state machine (pending -> processing -> completed | error), health/metrics HTTP server on port 8001, --backfill CLI flag - `k8s/embedding-worker.yaml` (new): k8s Deployment -- same Docker image, entrypoint `python -m pal_e_docs.embedding_worker`, no GPU request, 10m/64Mi requests, 256Mi limit, liveness/readiness probes - `tests/test_embedding.py` (new): 20 unit tests covering text extraction for all block types, mermaid skip, empty/null edge cases - `src/pal_e_docs/config.py`: Add `ollama_url` setting (PALDOCS_OLLAMA_URL, defaults to in-cluster Ollama) - `k8s/kustomization.yaml`: Add embedding-worker.yaml to resources - `pyproject.toml`: Move httpx from dev to main deps, add prometheus-client ## Test Plan - [x] `pytest tests/ -k test_embedding` -- 20/20 passing - [x] `ruff check` -- clean - [ ] Deploy to cluster, verify worker starts and issues LISTEN embedding_queue - [ ] Create/update a note, confirm worker picks up NOTIFY and embeds the block - [ ] Run --backfill mode, verify all ~5K embeddable blocks reach completed status - [ ] Verify /healthz returns 200, /metrics exposes Prometheus counters ## Review Checklist - [x] Passed automated review-fix loop - [x] No secrets committed - [x] No unnecessary file changes - [x] Commit messages are descriptive ## Related - Closes forgejo_admin/pal-e-docs #129 - `plan-2026-02-26-tf-modularize-postgres` -- Phase 6 Vector Search, Phase 6c Closes #129
Add async embedding pipeline worker and backfill (#129)
Some checks failed
ci/woodpecker/pr/woodpecker Pipeline failed
cc189da0f0
Implements Phase 6c: a LISTEN/NOTIFY-driven worker that consumes
the embedding_queue channel, calls Ollama for per-block embeddings,
and stores vectors in pgvector. Includes poll fallback, exponential
backoff retry, graceful SIGTERM handling, health endpoint, Prometheus
metrics, and --backfill mode for existing blocks.

New files:
- src/pal_e_docs/embedding_worker.py -- main worker process
- k8s/embedding-worker.yaml -- k8s Deployment (same image, different entrypoint)
- tests/test_embedding.py -- 20 unit tests for block text extraction

Modified files:
- src/pal_e_docs/config.py -- add ollama_url setting (PALDOCS_OLLAMA_URL)
- k8s/kustomization.yaml -- add embedding-worker.yaml resource
- pyproject.toml -- move httpx to main deps, add prometheus-client

Closes #129

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Author
Owner

PR #130 Review

BLOCKERS

1. Trigger re-fire risk on _store_embedding UPDATE is safe -- but process_batch empty-content path may re-trigger.

The DB trigger fires on BEFORE INSERT OR UPDATE OF content, block_type. The worker's UPDATE statements only touch embedding and embedding_status columns, so the trigger will NOT re-fire during normal embedding storage (_store_embedding, _mark_error, _reset_processing). This is correct and safe -- no infinite loop.

No blockers found. The architecture is sound.

NITS

1. EMBEDDABLE_TYPES constant is defined but never used (line 52).

The set {"paragraph", "list", "heading", "table", "code"} is defined at module scope but never referenced anywhere in the code. The mermaid exclusion is handled separately by the SQL WHERE block_type != 'mermaid' clause in _fetch_pending_blocks and by the DB trigger. Either use this constant in the query (replacing the hardcoded 'mermaid' check) or remove it to avoid confusion about what governs embeddability.

2. Redundant set_isolation_level call in reconnection path (line 523).

_connect() already calls conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) at line 225. The reconnection block at line 523 calls it again immediately after _connect(). Harmless but unnecessary duplication.

3. N+1 query in _fetch_pending_blocks (lines 259-264).

After the batch UPDATE...RETURNING, the code loops through each row and issues a separate SELECT n.title FROM notes n JOIN blocks b ... query per block. This could be folded into the main UPDATE query using a CTE or a single JOIN query after the batch, reducing round-trips from N+1 to 2. With BATCH_SIZE=10 this is tolerable, but with BACKFILL_BATCH_SIZE=50 it adds up.

4. httpx is now listed in both dependencies and [project.optional-dependencies] dev in pyproject.toml (lines 25 and 32).

The move from dev to main deps is correct, but the dev entry was not removed. Having the same package in both sections is harmless (pip deduplicates) but is confusing for future maintainers. Consider removing it from dev deps.

5. k8s Deployment uses a hardcoded image tag (line 24 of embedding-worker.yaml).

The image tag bec9a4ecb11dad50dab3fe55cbb327afc711d6ee is the same SHA as the main deployment.yaml. This is consistent, but both will need updating on the next deploy. This is the existing pattern, so not a change introduced by this PR -- just noting it as inherited tech debt.

6. No terminationGracePeriodSeconds in the k8s Deployment.

The worker implements SIGTERM-based graceful shutdown (resetting processing blocks back to pending), but the default k8s terminationGracePeriodSeconds is 30s. For a worker that might be mid-batch with retry backoff (up to ~7s per block worst case), 30s is likely sufficient for BATCH_SIZE=10 but could be tight in edge cases. Consider explicitly setting it to document the intent.

7. Health server binds to 0.0.0.0 (line 453).

This is standard for k8s containers but worth noting. The health endpoint has no authentication, which is fine for internal cluster traffic behind k8s network policy. No action needed.

SOP COMPLIANCE

  • Branch named after issue: 129-async-embedding-pipeline references issue #129
  • PR body has ## Summary, ## Changes, ## Test Plan, ## Related sections
  • Related section references the plan slug: plan-2026-02-26-tf-modularize-postgres Phase 6c
  • Closes directive present: Closes forgejo_admin/pal-e-docs #129
  • Tests exist: 20 unit tests in tests/test_embedding.py covering all block types and edge cases
  • No secrets, .env files, or credentials committed
  • No unnecessary file changes -- all 6 files are directly related to the embedding worker
  • Commit messages are descriptive: PR title matches convention

VERDICT: APPROVED

The implementation is solid. The LISTEN/NOTIFY architecture with poll fallback is well-designed. The state machine (pending -> processing -> completed/error) with crash recovery (reset processing on startup/shutdown) is robust. The text extraction logic is thorough and well-tested. The retry with exponential backoff for transient Ollama errors is appropriate. All nits are non-blocking quality suggestions.

## PR #130 Review ### BLOCKERS **1. Trigger re-fire risk on `_store_embedding` UPDATE is safe -- but `process_batch` empty-content path may re-trigger.** The DB trigger fires on `BEFORE INSERT OR UPDATE OF content, block_type`. The worker's UPDATE statements only touch `embedding` and `embedding_status` columns, so the trigger will NOT re-fire during normal embedding storage (`_store_embedding`, `_mark_error`, `_reset_processing`). This is correct and safe -- no infinite loop. **No blockers found.** The architecture is sound. ### NITS **1. `EMBEDDABLE_TYPES` constant is defined but never used (line 52).** The set `{"paragraph", "list", "heading", "table", "code"}` is defined at module scope but never referenced anywhere in the code. The mermaid exclusion is handled separately by the SQL `WHERE block_type != 'mermaid'` clause in `_fetch_pending_blocks` and by the DB trigger. Either use this constant in the query (replacing the hardcoded `'mermaid'` check) or remove it to avoid confusion about what governs embeddability. **2. Redundant `set_isolation_level` call in reconnection path (line 523).** `_connect()` already calls `conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)` at line 225. The reconnection block at line 523 calls it again immediately after `_connect()`. Harmless but unnecessary duplication. **3. N+1 query in `_fetch_pending_blocks` (lines 259-264).** After the batch UPDATE...RETURNING, the code loops through each row and issues a separate `SELECT n.title FROM notes n JOIN blocks b ...` query per block. This could be folded into the main UPDATE query using a CTE or a single JOIN query after the batch, reducing round-trips from N+1 to 2. With BATCH_SIZE=10 this is tolerable, but with BACKFILL_BATCH_SIZE=50 it adds up. **4. `httpx` is now listed in both `dependencies` and `[project.optional-dependencies] dev` in `pyproject.toml` (lines 25 and 32).** The move from dev to main deps is correct, but the dev entry was not removed. Having the same package in both sections is harmless (pip deduplicates) but is confusing for future maintainers. Consider removing it from dev deps. **5. k8s Deployment uses a hardcoded image tag (line 24 of `embedding-worker.yaml`).** The image tag `bec9a4ecb11dad50dab3fe55cbb327afc711d6ee` is the same SHA as the main `deployment.yaml`. This is consistent, but both will need updating on the next deploy. This is the existing pattern, so not a change introduced by this PR -- just noting it as inherited tech debt. **6. No `terminationGracePeriodSeconds` in the k8s Deployment.** The worker implements SIGTERM-based graceful shutdown (resetting processing blocks back to pending), but the default k8s `terminationGracePeriodSeconds` is 30s. For a worker that might be mid-batch with retry backoff (up to ~7s per block worst case), 30s is likely sufficient for BATCH_SIZE=10 but could be tight in edge cases. Consider explicitly setting it to document the intent. **7. Health server binds to `0.0.0.0` (line 453).** This is standard for k8s containers but worth noting. The health endpoint has no authentication, which is fine for internal cluster traffic behind k8s network policy. No action needed. ### SOP COMPLIANCE - [x] Branch named after issue: `129-async-embedding-pipeline` references issue #129 - [x] PR body has ## Summary, ## Changes, ## Test Plan, ## Related sections - [x] Related section references the plan slug: `plan-2026-02-26-tf-modularize-postgres` Phase 6c - [x] Closes directive present: `Closes forgejo_admin/pal-e-docs #129` - [x] Tests exist: 20 unit tests in `tests/test_embedding.py` covering all block types and edge cases - [x] No secrets, .env files, or credentials committed - [x] No unnecessary file changes -- all 6 files are directly related to the embedding worker - [x] Commit messages are descriptive: PR title matches convention ### VERDICT: APPROVED The implementation is solid. The LISTEN/NOTIFY architecture with poll fallback is well-designed. The state machine (pending -> processing -> completed/error) with crash recovery (reset processing on startup/shutdown) is robust. The text extraction logic is thorough and well-tested. The retry with exponential backoff for transient Ollama errors is appropriate. All nits are non-blocking quality suggestions.
forgejo_admin deleted branch 129-async-embedding-pipeline 2026-03-09 13:29:23 +00:00
Sign in to join this conversation.
No description provided.