River Job Queue
River is the Postgres-backed background job queue embedded inside the orchestrator process. It replaced the two Kubernetes CronJobs (memory-process, memory-maintain) with one in-process worker pool — zero new components, one image, one deployment.
Why River?
| Problem with CronJobs | River solution |
|---|---|
| Two extra pods to schedule, monitor, and roll out | Runs inside the orchestrator binary — one deployment |
| Two extra Docker images to build and tag | Single crawbl-platform image |
| CronJob overlap if a previous run exceeded 1 minute | UniqueOpts with ByArgs + ByPeriod deduplicates overlapping runs automatically |
| Minimum 1-minute latency for new drawer classification | Expected behavior — cold classification is not on the user-visible path. Phase 1/2 drawers skip the cold pipeline entirely. |
| K8s concurrency policy was fragile | River enforces concurrency at the Postgres-row level |
Migrated in backend commit 4051134 and argocd-apps commit b842deb. Issue #148 (CronJob overlap concern) is resolved.
Architecture
┌─────────────────────────────────────────────────────┐
│ Orchestrator Pod (Deployment) │
│ │
│ ┌───────────────────────────────────────────────┐ │
│ │ HTTP API │ │
│ │ (REST + WebSocket + MCP) │ │
│ │ ChatService — finalize() submits to │ │
│ │ autoingest.Pool (alitto/pond, NOT River) │ │
│ └───────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────┐ │
│ │ autoingest.Pool (in-process, NOT a River job) │ │
│ │ 16 workers, 1024-slot queue (pond v2) │ │
│ │ Submit is non-blocking; drops on full queue │ │
│ └───────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────┐ │
│ │ River Client (in-process queue) │ │
│ │ - ProcessWorker (memory_process, 1 min) │ │
│ │ - EnrichWorker (memory_enrich, 10 min) │ │
│ │ - MaintainWorker (memory_maintain daily) │ │
│ │ - CentroidRecompute (memory_centroid_recompute, Sun 03:00 UTC) │ │
│ │ - PricingRefresh (pricing_refresh daily) │ │
│ │ - UsageWriter (usage_write) │ │
│ │ - MessageCleanup (message_cleanup, 1m) │ │
│ │ - riverui dashboard (host-gated) │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
│
▼
┌────────────────┐
│ PostgreSQL │
│ river_job │
│ river_queue │
│ river_leader │
└────────────────┘
│ (usage_write jobs)
▼
┌────────────────┐
│ ClickHouse │
│ llm_usage │
└────────────────┘
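The diagram notes that autoingest.Pool accepts submissions without blocking and drops work when its queue is full. The sketch below shows one common way to get that behaviour in Go with a buffered channel and a `select` with a `default` branch. It is not the pond implementation; the `pool` type, `newPool`, and `submitMany` are hypothetical names used only for illustration.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for autoingest.Pool: a bounded queue
// whose Submit never blocks the caller.
type pool struct {
	queue chan func()
}

func newPool(capacity int) *pool {
	return &pool{queue: make(chan func(), capacity)}
}

// Submit enqueues the task if a slot is free. It reports false when the
// queue is full, in which case the task is dropped.
func (p *pool) Submit(task func()) bool {
	select {
	case p.queue <- task:
		return true
	default:
		return false
	}
}

// submitMany tries to enqueue n no-op tasks and counts accepted vs dropped.
func submitMany(p *pool, n int) (accepted, dropped int) {
	for i := 0; i < n; i++ {
		if p.Submit(func() {}) {
			accepted++
		} else {
			dropped++
		}
	}
	return accepted, dropped
}

func main() {
	p := newPool(2) // tiny queue and no workers, so the drop path is easy to see
	accepted, dropped := submitMany(p, 5)
	fmt.Println(accepted, dropped) // prints "2 3"
}
```

Dropping on a full queue keeps the HTTP request path fast under load, at the cost of losing some hot-path ingests when the pool is saturated.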
Connection Details
| Property | Value |
|---|---|
| Location | Embedded in orchestrator binary |
| Queue state | PostgreSQL tables (river_job, river_queue, river_leader, ...) |
| Schema | Same orchestrator database, river_* tables created by pkgriver.Migrate() |
| Worker count | Default River pool size |
| Migration driver | riverdatabasesql (native database/sql) |
| Dashboard | riverui, mounted at host CRAWBL_RIVERUI_HOST |
Registered Workers
memory_process
Reclassifies raw drawers with LLM structured output and links entities into the knowledge graph. Only processes drawers with state=raw (those that did not qualify for the heuristic or centroid fast paths).
| Setting | Value |
|---|---|
| Args type | queue.MemoryProcessArgs{} |
| Queue | memory_process |
| Trigger | Periodic (every 1 minute, RunOnStart: true) |
| Uniqueness | ByArgs + ByPeriod (dedupes overlapping jobs) |
| Handler | jobs.RunProcess in internal/orchestrator/memory/jobs/process.go |
Note: the ad-hoc enqueue from the auto-ingest path has been removed (commit 47433b8), so the periodic sweep is the sole trigger. Raw drawers wait at most ~60 seconds before being claimed, which is acceptable because the classification result is not on any user-visible critical path.
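To make the ByArgs + ByPeriod deduplication more concrete, here is a minimal model of the idea: two inserts collide when the job kind, the encoded args, and the period bucket (insert time rounded down to the period) all match. This is a simplified illustration, not River's actual implementation; `uniqueKey`, `insertOnce`, and `simulateInserts` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// uniqueKey is a hypothetical helper that models ByArgs + ByPeriod:
// the key combines kind, encoded args, and the start of the period bucket.
func uniqueKey(kind, argsJSON string, at time.Time, period time.Duration) string {
	bucket := at.UTC().Truncate(period) // round down to the period boundary
	return fmt.Sprintf("%s|%s|%d", kind, argsJSON, bucket.Unix())
}

// insertOnce records the key and reports whether the job was newly inserted.
func insertOnce(seen map[string]bool, key string) bool {
	if seen[key] {
		return false
	}
	seen[key] = true
	return true
}

// simulateInserts attempts three inserts at 12:00:05, 12:00:25, and 12:01:05
// with a one-minute period and returns whether each one was accepted.
func simulateInserts() []bool {
	seen := map[string]bool{}
	t0 := time.Date(2024, 1, 1, 12, 0, 5, 0, time.UTC)
	offsets := []time.Duration{0, 20 * time.Second, 60 * time.Second}
	results := make([]bool, 0, len(offsets))
	for _, off := range offsets {
		key := uniqueKey("memory_process", "{}", t0.Add(off), time.Minute)
		results = append(results, insertOnce(seen, key))
	}
	return results
}

func main() {
	fmt.Println(simulateInserts()) // prints "[true false true]"
}
```

The second insert falls in the same one-minute bucket as the first and is rejected; the third lands in the next bucket and is accepted.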
memory_enrich
Backfills entity linking and knowledge graph triples for heuristic- and centroid-classified drawers that bypassed the cold pipeline. Only runs on high-importance drawers (importance ≥ 3).
| Setting | Value |
|---|---|
| Args type | queue.MemoryEnrichArgs{} |
| Queue | memory_enrich |
| Trigger | Periodic (every 10 minutes) |
| Handler | jobs.RunEnrich in internal/orchestrator/memory/jobs/enrich.go |
| Query | state=processed AND pipeline_tier <> 'llm' AND entity_count=0 AND importance >= 3.0 LIMIT 100 |
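The sweep query's filter can be read as a simple predicate. The sketch below mirrors the WHERE clause and LIMIT in plain Go against an in-memory slice. The `drawer` struct and the tier names "heuristic" and "centroid" are assumptions for illustration; only the 'llm' tier value and the column names come from the query above.

```go
package main

import "fmt"

// drawer is a hypothetical in-memory view of the columns the sweep filters on.
type drawer struct {
	ID           int
	State        string
	PipelineTier string
	EntityCount  int
	Importance   float64
}

// needsEnrich mirrors the WHERE clause of the sweep query.
func needsEnrich(d drawer) bool {
	return d.State == "processed" &&
		d.PipelineTier != "llm" &&
		d.EntityCount == 0 &&
		d.Importance >= 3.0
}

// selectBatch returns up to limit matching drawer IDs, like LIMIT in SQL.
func selectBatch(drawers []drawer, limit int) []int {
	var ids []int
	for _, d := range drawers {
		if len(ids) == limit {
			break
		}
		if needsEnrich(d) {
			ids = append(ids, d.ID)
		}
	}
	return ids
}

// sampleDrawers returns a small fixture covering each filter condition.
func sampleDrawers() []drawer {
	return []drawer{
		{1, "processed", "heuristic", 0, 4.0}, // eligible
		{2, "processed", "llm", 0, 5.0},       // already went through the cold pipeline
		{3, "raw", "", 0, 4.0},                // still waiting for memory_process
		{4, "processed", "centroid", 0, 2.5},  // below the importance threshold
		{5, "processed", "centroid", 0, 3.0},  // eligible
	}
}

func main() {
	fmt.Println(selectBatch(sampleDrawers(), 100)) // prints "[1 5]"
}
```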
memory_maintain
Daily decay and pruning pass over active workspaces.
| Setting | Value |
|---|---|
| Args type | queue.MemoryMaintainArgs{} |
| Queue | memory_maintain |
| Trigger | Periodic (daily at midnight UTC) |
| Handler | jobs.RunMaintain in internal/orchestrator/memory/jobs/maintain.go |
memory_centroid_recompute
Weekly rebuild of the prototype centroid vectors used by Phase 2 k-NN classification. Aggregates the top 500 LLM-labelled drawers per memory type from the last 90 days and upserts into memory_type_centroids. The job is a no-op when source_hash is unchanged (no new LLM-labelled drawers since the last run). Training uses only pipeline_tier='llm' drawers to prevent a feedback loop.
| Setting | Value |
|---|---|
| Args type | queue.MemoryCentroidRecomputeArgs{} |
| Queue | memory_centroid_recompute |
| Trigger | Periodic cron (0 3 * * 0 — Sunday 03:00 UTC) |
| Handler | jobs.RunCentroidRecompute in internal/orchestrator/memory/jobs/centroids.go |
Also runs once at startup (best-effort, 30-second timeout) to seed the table after a fresh deploy.
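As a rough illustration of the recompute logic, the sketch below averages embeddings into a centroid and skips the work when a fingerprint of the training set is unchanged. Hashing the sorted sample IDs is an assumption about how source_hash might be derived; the real job may hash different inputs. All function names here are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// sourceHash fingerprints the training set. Hashing the sorted sample IDs
// is an assumption made for this sketch.
func sourceHash(ids []string) string {
	sorted := append([]string(nil), ids...) // copy so the caller's slice is untouched
	sort.Strings(sorted)
	sum := sha256.Sum256([]byte(strings.Join(sorted, ",")))
	return hex.EncodeToString(sum[:])
}

// centroid returns the element-wise mean of equally sized embeddings.
func centroid(vectors [][]float64) []float64 {
	if len(vectors) == 0 {
		return nil
	}
	mean := make([]float64, len(vectors[0]))
	for _, v := range vectors {
		for i, x := range v {
			mean[i] += x
		}
	}
	for i := range mean {
		mean[i] /= float64(len(vectors))
	}
	return mean
}

// recompute returns a new centroid only when the training set changed.
func recompute(prevHash string, ids []string, vectors [][]float64) (vec []float64, hash string, changed bool) {
	hash = sourceHash(ids)
	if hash == prevHash {
		return nil, hash, false // no-op: same samples as the last run
	}
	return centroid(vectors), hash, true
}

func main() {
	vecs := [][]float64{{1, 0}, {3, 4}}
	vec, hash, changed := recompute("", []string{"d2", "d1"}, vecs)
	fmt.Println(vec, changed) // prints "[2 2] true"

	// Same samples in a different order produce the same hash, so the run is skipped.
	_, _, changed = recompute(hash, []string{"d1", "d2"}, vecs)
	fmt.Println(changed) // prints "false"
}
```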
message_cleanup
Marks stale pending messages as failed so the mobile UI never hangs on an orphaned placeholder.
| Setting | Value |
|---|---|
| Args type | queue.MessageCleanupArgs{} |
| Queue | message_cleanup (single worker) |
| Trigger | Periodic (every 1 minute, deduped by a 30-second ByPeriod window) |
| Handler | queue.MessageCleanup.Work — transitions every row in pending older than 5 minutes to failed via messageRepo.FailStalePending |
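The cleanup rule is a cutoff comparison: anything still pending and created before now minus five minutes is marked failed. The sketch below is an in-memory analogue of that rule, not the actual messageRepo.FailStalePending implementation. The `message` struct and the "delivered" status are assumptions for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical in-memory row with only the fields the rule needs.
type message struct {
	ID        int
	Status    string
	CreatedAt time.Time
}

// failStalePending marks pending messages older than maxAge as failed and
// returns how many rows changed.
func failStalePending(msgs []message, now time.Time, maxAge time.Duration) int {
	cutoff := now.Add(-maxAge)
	changed := 0
	for i := range msgs {
		if msgs[i].Status == "pending" && msgs[i].CreatedAt.Before(cutoff) {
			msgs[i].Status = "failed"
			changed++
		}
	}
	return changed
}

// runExample applies a 5-minute cutoff to three sample messages and returns
// the number of rows changed plus the resulting statuses.
func runExample() (int, []string) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	msgs := []message{
		{1, "pending", now.Add(-10 * time.Minute)}, // stale
		{2, "pending", now.Add(-1 * time.Minute)},  // still in flight
		{3, "delivered", now.Add(-time.Hour)},      // not pending, untouched
	}
	n := failStalePending(msgs, now, 5*time.Minute)
	statuses := make([]string, len(msgs))
	for i, m := range msgs {
		statuses[i] = m.Status
	}
	return n, statuses
}

func main() {
	fmt.Println(runExample()) // prints "1 [failed pending delivered]"
}
```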
pricing_refresh
Refreshes the Postgres model_pricing table with current per-token prices fetched from LiteLLM.
| Setting | Value |
|---|---|
| Args type | queue.PricingRefreshArgs{} |
| Queue | pricing_refresh |
| Trigger | Periodic (daily) |
| Handler | Fetches prices from LiteLLM, inserts new rows via modelpricingrepo |
usage_write
Writes a single LLM usage event row to ClickHouse. Enqueued by orchestrator.usagepublisher.Publish() on every usage event received from the agent runtime.
| Setting | Value |
|---|---|
| Args type | queue.UsageEvent{} |
| Queue | usage_write |
| Trigger | Ad-hoc — enqueued by usagepublisher.Publish() on each gRPC UsageEvent |
| Handler | queue.UsageWriter.Work — inserts one row into ClickHouse llm_usage |
| Batching | ClickHouse async_insert handles server-side coalescing; no client-side buffering needed |
Because the job is persisted in Postgres before the worker executes it, usage events survive orchestrator restarts. Delivery is at-least-once, so a retried job can insert the same event twice; the event_id (UUID) column in ClickHouse can be used to deduplicate if needed.
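To show why a stable event ID matters under at-least-once delivery, here is a small sketch that collapses duplicate rows by ID before aggregating. It runs in memory and only illustrates the idea; how deduplication is actually done against ClickHouse (if at all) is not specified here. The `usageRow` struct and its fields are hypothetical.

```go
package main

import "fmt"

// usageRow is a hypothetical subset of an llm_usage row.
type usageRow struct {
	EventID string
	Tokens  int
}

// dedupeByEventID keeps the first row seen for each event ID, preserving order.
func dedupeByEventID(rows []usageRow) []usageRow {
	seen := make(map[string]struct{}, len(rows))
	out := make([]usageRow, 0, len(rows))
	for _, r := range rows {
		if _, dup := seen[r.EventID]; dup {
			continue
		}
		seen[r.EventID] = struct{}{}
		out = append(out, r)
	}
	return out
}

// totalTokens sums the token counts across rows.
func totalTokens(rows []usageRow) int {
	total := 0
	for _, r := range rows {
		total += r.Tokens
	}
	return total
}

func main() {
	// Event "a" was written twice because its job was retried.
	rows := []usageRow{{"a", 100}, {"b", 50}, {"a", 100}}
	fmt.Println(totalTokens(rows), totalTokens(dedupeByEventID(rows))) // prints "250 150"
}
```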
All seven workers live in internal/orchestrator/queue/ and are registered in a single call to queue.NewConfig(queue.Deps{...}). The four memory workers (memory_process, memory_maintain, memory_enrich, memory_centroid_recompute) are in memory_workers.go; the three cross-cutting workers (usage_write, pricing_refresh, message_cleanup) are in orchestrator_workers.go. There is no separate background.NewConfig or Register* step.
Wiring: queue.Deps
queue.Deps is the single dependency struct passed to queue.NewConfig. Individual workers read only the fields they need; missing fields degrade cleanly rather than blocking boot.
| Field | Type | Used by |
|---|---|---|
| DB | *dbr.Connection | All workers (shared session factory) |
| Logger | *slog.Logger | All workers (structured logging) |
| DrawerRepo | memrepo.DrawerRepo | memory_process, memory_maintain, memory_enrich, memory_centroid_recompute |
| KGRepo | memrepo.KGRepo | memory_process, memory_enrich |
| CentroidRepo | memrepo.CentroidRepo | memory_centroid_recompute |
| LLMClassifier | extract.LLMClassifier | memory_process, memory_enrich |
| Embedder | embed.Embedder | memory_process |
| MessageRepo | orchestratorrepo.MessageRepo | message_cleanup |
| ModelPricingRepo | modelpricingrepo.Repo | pricing_refresh |
| LLMUsageRepo | llmusagerepo.Repo | usage_write |
Any field may be nil when the matching feature is intentionally disabled (e.g. ClickHouse unavailable → LLMUsageRepo nil → usage_write worker no-ops silently). NewConfig still returns a usable config when a repo is missing.
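The nil-dependency behaviour can be pictured as a guard at the top of the worker. The following is a minimal sketch of that pattern, not the actual queue.UsageWriter code; `usageRepo`, `usageWriter`, `memoryRepo`, and `runBoth` are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
)

// usageRepo is a hypothetical stand-in for llmusagerepo.Repo.
type usageRepo interface {
	Insert(ctx context.Context, eventID string) error
}

// usageWriter is a hypothetical worker; repo may be nil when ClickHouse is disabled.
type usageWriter struct {
	repo usageRepo
}

// Work writes the event, or returns nil without doing anything when repo is nil.
func (w *usageWriter) Work(ctx context.Context, eventID string) error {
	if w.repo == nil {
		return nil // feature disabled: complete the job instead of retrying it
	}
	return w.repo.Insert(ctx, eventID)
}

// memoryRepo is an in-memory repo used only to observe writes in this sketch.
type memoryRepo struct{ ids []string }

func (m *memoryRepo) Insert(_ context.Context, id string) error {
	m.ids = append(m.ids, id)
	return nil
}

// runBoth runs the worker with and without a repo and reports rows written.
func runBoth() (rows int, errEnabled, errDisabled error) {
	ctx := context.Background()
	repo := &memoryRepo{}
	errEnabled = (&usageWriter{repo: repo}).Work(ctx, "evt-1")
	errDisabled = (&usageWriter{}).Work(ctx, "evt-2")
	return len(repo.ids), errEnabled, errDisabled
}

func main() {
	fmt.Println(runBoth()) // prints "1 <nil> <nil>"
}
```

One Go detail to watch with this pattern: an interface field holding a typed nil pointer does not compare equal to nil, so the disabled case should leave the field unset rather than assign a nil concrete pointer.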
Lifecycle
River is wired up in orchestrator.runServer():
- pkgriver.Migrate(ctx, *sql.DB): applies River schema migrations against the orchestrator database
- queue.NewConfig(queue.Deps{...}): builds the single *river.Config covering all seven workers, periodic jobs, and queues
- pkgriver.New(...): constructs the River client
- client.Start(ctx): starts the worker pool; jobs.RunCentroidRecompute runs once (best-effort, 30s timeout) to seed the centroid table after a fresh deploy
- defer pkgriver.Shutdown(client): three-phase graceful shutdown (runs after ingestPool.Shutdown)
Graceful Shutdown
The full shutdown order on SIGTERM is:
- Socket.IO teardown
- ingestPool.Shutdown(shutdownCtx): drain the pond pool
- pkgriver.Shutdown(riverClient): three-phase River shutdown
| Phase | Duration | Action |
|---|---|---|
| 1. Stop | 20s | Let running jobs finish, reject new ones |
| 2. StopAndCancel | 10s | Cancel in-flight jobs that haven't finished |
| 3. Exit | - | Force close remaining connections |
riverui Dashboard
The River UI dashboard is mounted on the orchestrator at a host controlled by the CRAWBL_RIVERUI_HOST env var. Traffic is gated at the edge by an Envoy SecurityPolicy that enforces basic auth.
Access
| Setting | Value |
|---|---|
| Host env var | CRAWBL_RIVERUI_HOST (e.g., dev.river.crawbl.com) |
| Basic auth secret | crawbl/dev/riverui/auth (AWS Secrets Manager, .htpasswd key) |
| Kubernetes resources | httproute-river.yaml, security-policy-river.yaml, es-riverui-secrets.yaml (under components/orchestrator/resources/) |
| DNS record | Auto-created by external-dns on HTTPRoute admission |
Use the dashboard to inspect job states, retry counts, error messages, and periodic schedules.
Debugging
Check River schema exists
kubectl port-forward svc/backend-postgresql 5432:5432 -n backend
psql -h localhost -U crawbl -d crawbl -c "\dt orchestrator.river_*"
View failed jobs
SELECT id, kind, state, attempt, errors, finalized_at
FROM orchestrator.river_job
WHERE state IN ('retryable', 'discarded')
ORDER BY finalized_at DESC
LIMIT 20;
Manually enqueue a memory_process job
-- Use only for debugging; the 1-minute periodic sweep handles this automatically.
INSERT INTO orchestrator.river_job (kind, args, queue, state, priority, attempt, max_attempts, metadata, tags)
VALUES ('memory_process', '{}', 'memory_process', 'available', 1, 0, 25, '{}', '{}');
Check riverui logs
kubectl logs -n backend -l app.kubernetes.io/name=orchestrator --tail=50 | grep -i river
Common issues
| Symptom | Likely Cause | Fix |
|---|---|---|
| riverui returns 401 | Basic auth secret not synced | Check backend-riverui-auth secret exists |
| Jobs stuck in available | Worker pool down or Postgres unreachable | Check orchestrator pod logs and Postgres connectivity |
| Duplicate jobs running | UniqueOpts not configured on periodic job | Confirm that the InsertOpts() method of the relevant Args type in internal/orchestrator/queue/types.go sets UniqueOpts with ByArgs + ByPeriod |
| Memory classification delayed > 60s | memory_process periodic sweep not running | Check orchestrator logs for River startup errors |
| Centroid table empty after deploy | Insufficient LLM-labelled samples (< 50/type) | Expected on first deploy; Phase 2 activates after weekly recompute |
| Enrich backlog growing | High heuristic/centroid drawer rate | Check memory.enrich.sweep log for remaining_backlog; scale if > 10,000 sustained |
Source Files
| File | Purpose |
|---|---|
| internal/pkg/river/ | River client/config type aliases, Migrate(), Shutdown() helper |
| internal/orchestrator/memory/autoingest/ | In-process pond pool for hot-path auto-ingest (NOT a River worker) |
| internal/orchestrator/queue/types.go | All static symbols: queue name constants, Args types, InsertOpts, Worker struct declarations, tag + metadata vars, event payloads, Deps struct |
| internal/orchestrator/queue/config.go | NewConfig(Deps): single entry point that registers all seven workers and periodic jobs into one river.Config |
| internal/orchestrator/queue/memory_workers.go | MemoryProcessWorker, MemoryMaintainWorker, MemoryEnrichWorker, MemoryCentroidRecomputeWorker River adapters |
| internal/orchestrator/queue/orchestrator_workers.go | UsageWriter, PricingRefresh, MessageCleanup River adapters + LiteLLM fetch helpers |
| internal/orchestrator/queue/publishers.go | MemoryPublisher (NATS) + UsagePublisher (River insert) + shared event stamper |
| internal/orchestrator/memory/jobs/process.go | RunProcess(ctx, deps): cold LLM reclassification logic |
| internal/orchestrator/memory/jobs/enrich.go | RunEnrich(ctx, deps): KG backfill sweep logic |
| internal/orchestrator/memory/jobs/maintain.go | RunMaintain(ctx, deps): decay + prune logic |
| internal/orchestrator/memory/jobs/centroids.go | RunCentroidRecompute(ctx, deps): centroid aggregation logic |
| internal/orchestrator/memory/repo/centroidrepo/ | postgres.go: GetAll, Upsert, NearestType, ListCentroidTrainingSamples implementation |
| cmd/crawbl/platform/orchestrator/orchestrator.go | Wires River and autoingest.Pool into runServer() |
| .omc/plans/memory-river-migration.md | Original River migration plan (backend repo) |
| .omc/plans/mempalace-heuristic-knn-phase1-2.md | Phase 0/1/2 cost-reduction plan (backend repo) |
| components/orchestrator/resources/httproute-river.yaml | riverui HTTPRoute (argocd-apps) |
| components/orchestrator/resources/security-policy-river.yaml | Edge auth policy (argocd-apps) |
| components/orchestrator/resources/es-riverui-secrets.yaml | ExternalSecret for basic auth (argocd-apps) |
What's next: See the MemPalace Memory Pipeline to understand how River jobs drive the memory classification pipeline, or the Database Table Definitions for the memory_drawers.state machine that River workers advance.