
River Job Queue

River is the Postgres-backed background job queue embedded inside the orchestrator process. It replaced the two Kubernetes CronJobs (memory-process, memory-maintain) with one in-process worker pool — zero new components, one image, one deployment.


Why River?

Problem with CronJobs | River solution
Two extra pods to schedule, monitor, and roll out | Runs inside the orchestrator binary: one deployment
Two extra Docker images to build and tag | Single crawbl-platform image
CronJob overlap if a previous run exceeded 1 minute | UniqueOpts with ByArgs + ByPeriod dedupes overlapping runs automatically
Minimum 1-minute latency for new drawer classification | Expected behavior: cold classification is not on the user-visible path. Phase 1/2 drawers skip the cold pipeline entirely.
K8s concurrency policy was fragile | River enforces concurrency at the Postgres-row level

Migrated in backend commit 4051134 and argocd-apps commit b842deb. Issue #148 (CronJob overlap concern) is resolved.


Architecture

┌─────────────────────────────────────────────────────┐
│  Orchestrator Pod (Deployment)                      │
│                                                     │
│  ┌───────────────────────────────────────────────┐  │
│  │ HTTP API                                      │  │
│  │ (REST + WebSocket + MCP)                      │  │
│  │ ChatService: finalize() submits to            │  │
│  │ autoingest.Pool (alitto/pond, NOT River)      │  │
│  └───────────────────────────────────────────────┘  │
│                                                     │
│  ┌───────────────────────────────────────────────┐  │
│  │ autoingest.Pool (in-process, NOT a River job) │  │
│  │ 16 workers, 1024-slot queue (pond v2)         │  │
│  │ Submit is non-blocking; drops on full queue   │  │
│  └───────────────────────────────────────────────┘  │
│                                                     │
│  ┌───────────────────────────────────────────────┐  │
│  │ River Client (in-process queue)               │  │
│  │ - ProcessWorker (memory_process, 1 min)       │  │
│  │ - EnrichWorker (memory_enrich, 10 min)        │  │
│  │ - MaintainWorker (memory_maintain, daily)     │  │
│  │ - CentroidRecompute                           │  │
│  │   (memory_centroid_recompute, Sun 03:00 UTC)  │  │
│  │ - PricingRefresh (pricing_refresh, daily)     │  │
│  │ - UsageWriter (usage_write)                   │  │
│  │ - MessageCleanup (message_cleanup, 1 min)     │  │
│  │ - riverui dashboard (host-gated)              │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘
                          │
                          ▼
                  ┌────────────────┐
                  │  PostgreSQL    │
                  │  river_job     │
                  │  river_queue   │
                  │  river_leader  │
                  └────────────────┘
                          │ (usage_write jobs)
                          ▼
                  ┌────────────────┐
                  │  ClickHouse    │
                  │  llm_usage     │
                  └────────────────┘

Connection Details

Property | Value
Location | Embedded in orchestrator binary
Queue state | PostgreSQL tables (river_job, river_queue, river_leader, ...)
Schema | Same orchestrator database; river_* tables created by pkgriver.Migrate()
Worker count | Default River pool size
Migration driver | riverdatabasesql (native database/sql)
Dashboard | riverui, mounted at host CRAWBL_RIVERUI_HOST

Registered Workers

memory_process

Reclassifies raw drawers with LLM structured output and links entities into the knowledge graph. Only processes drawers with state=raw (those that did not qualify for the heuristic or centroid fast paths).

Setting | Value
Args type | queue.MemoryProcessArgs{}
Queue | memory_process
Trigger | Periodic (every 1 minute, RunOnStart: true)
Uniqueness | ByArgs + ByPeriod (dedupes overlapping jobs)
Handler | jobs.RunProcess in internal/orchestrator/memory/jobs/process.go

Note: the ad-hoc enqueue from the auto-ingest path has been removed (commit 47433b8). The periodic sweep is the sole trigger. Raw drawers wait at most ~60 seconds before being claimed, which is acceptable because the classification result is not on any user-visible critical path.
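
One way to picture the ByArgs + ByPeriod dedupe: inserts with the same kind and args that fall into the same time bucket share a key, so the second insert is skipped. The sketch below is a simplified illustration under that assumption, not River's implementation; uniqueKey, wouldDedupe, and the key layout are hypothetical names, and periodSeconds is assumed to be positive.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// uniqueKey is a hypothetical illustration of ByArgs + ByPeriod uniqueness:
// the key combines the job kind, the encoded args, and the start of the
// period bucket that the insert time falls into.
func uniqueKey(kind, argsJSON string, insertUnix, periodSeconds int64) string {
	bucketStart := insertUnix - insertUnix%periodSeconds
	raw := fmt.Sprintf("%s|%s|%d", kind, argsJSON, bucketStart)
	sum := sha256.Sum256([]byte(raw))
	return hex.EncodeToString(sum[:])
}

// wouldDedupe reports whether two inserts map to the same key, in which
// case the second insert would be treated as a duplicate.
func wouldDedupe(kind, argsJSON string, firstUnix, secondUnix, periodSeconds int64) bool {
	return uniqueKey(kind, argsJSON, firstUnix, periodSeconds) ==
		uniqueKey(kind, argsJSON, secondUnix, periodSeconds)
}

func main() {
	// Two sweeps 20 seconds apart inside the same 60-second bucket.
	fmt.Println(wouldDedupe("memory_process", "{}", 1200, 1220, 60))
	// A sweep in the next bucket gets a fresh key.
	fmt.Println(wouldDedupe("memory_process", "{}", 1200, 1260, 60))
}
```

Because the args are always the empty struct for memory_process, the period bucket is effectively what separates one sweep from the next in this model.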

memory_enrich

Backfills entity linking and knowledge graph triples for heuristic- and centroid-classified drawers that bypassed the cold pipeline. Only runs on high-importance drawers (importance ≥ 3).

Setting | Value
Args type | queue.MemoryEnrichArgs{}
Queue | memory_enrich
Trigger | Periodic (every 10 minutes)
Handler | jobs.RunEnrich in internal/orchestrator/memory/jobs/enrich.go
Query | state=processed AND pipeline_tier <> 'llm' AND entity_count=0 AND importance >= 3.0 LIMIT 100
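
The Query row can be restated in plain Go, which makes the four conditions and the batch cap easy to exercise in a unit test. This is a sketch only: the drawer struct and the tier names heuristic and centroid are assumptions for illustration, and the real selection happens in SQL.

```go
package main

import "fmt"

// drawer is a hypothetical, trimmed-down view of a memory drawer row.
type drawer struct {
	ID           int
	State        string
	PipelineTier string
	EntityCount  int
	Importance   float64
}

// selectForEnrich mirrors the documented predicate and stops after limit
// matches, like LIMIT 100 in the SQL form.
func selectForEnrich(all []drawer, limit int) []drawer {
	var picked []drawer
	for _, d := range all {
		if len(picked) >= limit {
			break
		}
		if d.State == "processed" && d.PipelineTier != "llm" &&
			d.EntityCount == 0 && d.Importance >= 3.0 {
			picked = append(picked, d)
		}
	}
	return picked
}

func main() {
	rows := []drawer{
		{ID: 1, State: "processed", PipelineTier: "heuristic", Importance: 4.0},
		{ID: 2, State: "processed", PipelineTier: "llm", Importance: 5.0},      // already LLM-classified
		{ID: 3, State: "raw", PipelineTier: "centroid", Importance: 3.5},       // not processed yet
		{ID: 4, State: "processed", PipelineTier: "centroid", Importance: 2.9}, // below importance floor
		{ID: 5, State: "processed", PipelineTier: "centroid", Importance: 3.0},
	}
	for _, d := range selectForEnrich(rows, 100) {
		fmt.Println(d.ID)
	}
}
```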

memory_maintain

Daily decay and pruning pass over active workspaces.

Setting | Value
Args type | queue.MemoryMaintainArgs{}
Queue | memory_maintain
Trigger | Periodic (daily at midnight UTC)
Handler | jobs.RunMaintain in internal/orchestrator/memory/jobs/maintain.go

memory_centroid_recompute

Weekly rebuild of the prototype centroid vectors used by Phase 2 k-NN classification. Aggregates the top 500 LLM-labelled drawers per memory type from the last 90 days and upserts into memory_type_centroids. The job is a no-op when source_hash is unchanged (no new LLM-labelled drawers since the last run). Training uses only pipeline_tier='llm' drawers to prevent a feedback loop.

Setting | Value
Args type | queue.MemoryCentroidRecomputeArgs{}
Queue | memory_centroid_recompute
Trigger | Periodic cron (0 3 * * 0, i.e. Sunday 03:00 UTC)
Handler | jobs.RunCentroidRecompute in internal/orchestrator/memory/jobs/centroids.go

Also runs once at startup (best-effort, 30-second timeout) to seed the table after a fresh deploy.
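
As a rough illustration of the recompute step, the sketch below averages embedding vectors into a centroid and fingerprints the training set so that an unchanged set can be skipped. It assumes the aggregation is an element-wise mean and that source_hash is derived from the sample IDs; neither detail is confirmed here, and all function names are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// centroid returns the element-wise mean of equally sized embedding
// vectors. It returns nil when there are no samples.
func centroid(vectors [][]float64) []float64 {
	if len(vectors) == 0 {
		return nil
	}
	mean := make([]float64, len(vectors[0]))
	for _, v := range vectors {
		for i, x := range v {
			mean[i] += x
		}
	}
	for i := range mean {
		mean[i] /= float64(len(vectors))
	}
	return mean
}

// sourceHash fingerprints the set of training sample IDs. Sorting a copy
// makes the hash independent of input order.
func sourceHash(sampleIDs []string) string {
	ids := append([]string(nil), sampleIDs...)
	sort.Strings(ids)
	sum := sha256.Sum256([]byte(strings.Join(ids, ",")))
	return hex.EncodeToString(sum[:])
}

// needsRecompute reports whether the stored hash differs from the hash of
// the current training set.
func needsRecompute(storedHash string, sampleIDs []string) bool {
	return storedHash != sourceHash(sampleIDs)
}

func main() {
	fmt.Println(centroid([][]float64{{1, 2}, {3, 4}}))
	stored := sourceHash([]string{"b", "a"})
	fmt.Println(needsRecompute(stored, []string{"a", "b"}))      // same set, skip
	fmt.Println(needsRecompute(stored, []string{"a", "b", "c"})) // new sample, recompute
}
```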

message_cleanup

Marks stale pending messages as failed so the mobile UI never hangs on an orphaned placeholder.

Setting | Value
Args type | queue.MessageCleanupArgs{}
Queue | message_cleanup (single worker)
Trigger | Periodic (every 1 minute, deduped by a 30-second ByPeriod window)
Handler | queue.MessageCleanup.Work: transitions every row in pending older than 5 minutes to failed via messageRepo.FailStalePending
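
The cleanup rule is small enough to sketch in memory. The example below is an assumption-laden stand-in for messageRepo.FailStalePending: the message struct, the delivered status, and the use of Unix seconds are hypothetical, and only the pending-to-failed transition after 5 minutes comes from the table.

```go
package main

import "fmt"

// staleAfterSeconds is the documented 5-minute threshold.
const staleAfterSeconds = 5 * 60

// message is a hypothetical stand-in for a chat message row.
type message struct {
	ID            int
	Status        string
	CreatedAtUnix int64
}

// failStalePending flips every pending message older than the threshold to
// failed, in place, and returns how many rows changed.
func failStalePending(msgs []message, nowUnix int64) int {
	changed := 0
	for i := range msgs {
		age := nowUnix - msgs[i].CreatedAtUnix
		if msgs[i].Status == "pending" && age > staleAfterSeconds {
			msgs[i].Status = "failed"
			changed++
		}
	}
	return changed
}

func main() {
	now := int64(10000)
	msgs := []message{
		{ID: 1, Status: "pending", CreatedAtUnix: now - 301},   // stale
		{ID: 2, Status: "pending", CreatedAtUnix: now - 60},    // still fresh
		{ID: 3, Status: "delivered", CreatedAtUnix: now - 900}, // not pending
	}
	fmt.Println(failStalePending(msgs, now), msgs[0].Status, msgs[1].Status)
	// A second pass finds nothing left to change.
	fmt.Println(failStalePending(msgs, now))
}
```

The second call returning zero is the property that makes a once-per-minute sweep safe to repeat.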

pricing_refresh

Refreshes the Postgres model_pricing table with current per-token prices fetched from LiteLLM.

Setting | Value
Args type | queue.PricingRefreshArgs{}
Queue | pricing_refresh
Trigger | Periodic (daily)
Handler | Fetches prices from LiteLLM, inserts new rows via modelpricingrepo

usage_write

Writes a single LLM usage event row to ClickHouse. Enqueued by orchestrator.usagepublisher.Publish() on every usage event received from the agent runtime.

Setting | Value
Args type | queue.UsageEvent{}
Queue | usage_write
Trigger | Ad-hoc: enqueued by usagepublisher.Publish() on each gRPC UsageEvent
Handler | queue.UsageWriter.Work: inserts one row into ClickHouse llm_usage
Batching | ClickHouse async_insert handles server-side coalescing; no client-side buffering needed

Note: Because the job is persisted in Postgres before the worker executes it, usage events survive orchestrator restarts. Delivery is at-least-once, so a retried job can write the same event twice; the event_id (UUID) column in ClickHouse can be used to deduplicate if needed.
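
To make the at-least-once caveat concrete, the sketch below shows how a consumer of llm_usage rows could collapse duplicates by event_id before summing. It is an in-memory illustration only; the usageEvent fields and helper names are hypothetical, and it does not describe how ClickHouse itself deduplicates.

```go
package main

import "fmt"

// usageEvent is a hypothetical subset of the usage payload.
type usageEvent struct {
	EventID string // UUID assigned when the event is published
	Tokens  int
}

// dedupeByEventID keeps the first occurrence of each EventID, which is one
// way to collapse rows written twice after a retried job.
func dedupeByEventID(rows []usageEvent) []usageEvent {
	seen := make(map[string]bool, len(rows))
	out := make([]usageEvent, 0, len(rows))
	for _, r := range rows {
		if seen[r.EventID] {
			continue
		}
		seen[r.EventID] = true
		out = append(out, r)
	}
	return out
}

// totalTokens sums the token counts across rows.
func totalTokens(rows []usageEvent) int {
	total := 0
	for _, r := range rows {
		total += r.Tokens
	}
	return total
}

func main() {
	rows := []usageEvent{
		{EventID: "evt-a", Tokens: 100},
		{EventID: "evt-b", Tokens: 50},
		{EventID: "evt-a", Tokens: 100}, // duplicate from a retried job
	}
	fmt.Println(totalTokens(rows))                  // raw total counts the retry twice
	fmt.Println(totalTokens(dedupeByEventID(rows))) // deduplicated total
}
```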

All seven workers live in internal/orchestrator/queue/ and are registered in a single call to queue.NewConfig(queue.Deps{...}). The four memory workers (memory_process, memory_maintain, memory_enrich, memory_centroid_recompute) are in memory_workers.go; the three cross-cutting workers (usage_write, pricing_refresh, message_cleanup) are in orchestrator_workers.go. There is no separate background.NewConfig or Register* step.


Wiring: queue.Deps

queue.Deps is the single dependency struct passed to queue.NewConfig. Individual workers read only the fields they need; missing fields degrade cleanly rather than blocking boot.

Field | Type | Used by
DB | *dbr.Connection | All workers (shared session factory)
Logger | *slog.Logger | All workers (structured logging)
DrawerRepo | memrepo.DrawerRepo | memory_process, memory_maintain, memory_enrich, memory_centroid_recompute
KGRepo | memrepo.KGRepo | memory_process, memory_enrich
CentroidRepo | memrepo.CentroidRepo | memory_centroid_recompute
LLMClassifier | extract.LLMClassifier | memory_process, memory_enrich
Embedder | embed.Embedder | memory_process
MessageRepo | orchestratorrepo.MessageRepo | message_cleanup
ModelPricingRepo | modelpricingrepo.Repo | pricing_refresh
LLMUsageRepo | llmusagerepo.Repo | usage_write

Any field may be nil when the matching feature is intentionally disabled (e.g. ClickHouse unavailable → LLMUsageRepo nil → usage_write worker no-ops silently). NewConfig still returns a usable config when a repo is missing.
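
The nil-tolerant pattern can be sketched as a guard at the top of the worker. Everything below is hypothetical scaffolding (usageRepo, deps, usageWriter, memoryRepo); it only illustrates the idea that a missing repo turns the job into a no-op instead of an error.

```go
package main

import (
	"errors"
	"fmt"
)

// usageRepo is a hypothetical interface standing in for llmusagerepo.Repo.
type usageRepo interface {
	Insert(eventID string) error
}

// deps mirrors the idea of queue.Deps with a single optional field.
type deps struct {
	LLMUsageRepo usageRepo // nil when ClickHouse is unavailable
}

// usageWriter is a hypothetical worker that depends on the optional repo.
type usageWriter struct {
	repo usageRepo
}

func newUsageWriter(d deps) usageWriter {
	return usageWriter{repo: d.LLMUsageRepo}
}

// work inserts the event, or silently does nothing when the repo is nil.
func (w usageWriter) work(eventID string) error {
	if w.repo == nil {
		return nil
	}
	return w.repo.Insert(eventID)
}

// memoryRepo records inserted IDs in memory so the sketch can run anywhere.
type memoryRepo struct {
	ids []string
}

func (m *memoryRepo) Insert(eventID string) error {
	if eventID == "" {
		return errors.New("empty event id")
	}
	m.ids = append(m.ids, eventID)
	return nil
}

func main() {
	disabled := newUsageWriter(deps{})
	fmt.Println(disabled.work("evt-1")) // no repo: job completes as a no-op

	repo := &memoryRepo{}
	enabled := newUsageWriter(deps{LLMUsageRepo: repo})
	fmt.Println(enabled.work("evt-1"), len(repo.ids))
}
```

One caveat with this pattern in Go: the guard only works when the field holds a true nil interface. A nil pointer of a concrete type stored in the interface would pass the check and fail later.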


Lifecycle

River is wired up in orchestrator.runServer():

  1. pkgriver.Migrate(ctx, *sql.DB) — applies River schema migrations against the orchestrator database
  2. queue.NewConfig(queue.Deps{...}) — builds the single *river.Config covering all seven workers, periodic jobs, and queues
  3. pkgriver.New(...) — constructs the River client
  4. client.Start(ctx) — starts the worker pool; jobs.RunCentroidRecompute runs once (best-effort, 30s timeout) to seed the centroid table after a fresh deploy
  5. defer pkgriver.Shutdown(client) — three-phase graceful shutdown (runs after ingestPool.Shutdown)

Graceful Shutdown

The full shutdown order on SIGTERM is:

  1. Socket.IO teardown
  2. ingestPool.Shutdown(shutdownCtx) — drain the pond pool
  3. pkgriver.Shutdown(riverClient) — three-phase River shutdown

Phase | Duration | Action
1. Stop | 20s | Let running jobs finish, reject new ones
2. StopAndCancel | 10s | Cancel in-flight jobs that haven't finished
3. Exit | - | Force close remaining connections
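
The three phases can be sketched against a small interface. River's client exposes Stop(ctx) and StopAndCancel(ctx); the rest of this example (the stopper interface, fakeClient, the returned phase names, and the way timeouts are wired) is an assumption for illustration and may differ from what pkgriver.Shutdown does.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stopper is the subset of the River client this sketch relies on.
type stopper interface {
	Stop(ctx context.Context) error
	StopAndCancel(ctx context.Context) error
}

// shutdown walks the three phases and returns the name of the phase that
// ended the shutdown.
func shutdown(c stopper, stopTimeout, cancelTimeout time.Duration) string {
	// Phase 1: let running jobs finish within the soft budget.
	stopCtx, cancelStop := context.WithTimeout(context.Background(), stopTimeout)
	defer cancelStop()
	if err := c.Stop(stopCtx); err == nil {
		return "stop"
	}

	// Phase 2: cancel in-flight jobs and wait for them to return.
	hardCtx, cancelHard := context.WithTimeout(context.Background(), cancelTimeout)
	defer cancelHard()
	if err := c.StopAndCancel(hardCtx); err == nil {
		return "stop-and-cancel"
	}

	// Phase 3: give up waiting; the caller exits and connections close.
	return "exit"
}

// shutdownDefault applies the documented 20s and 10s budgets.
func shutdownDefault(c stopper) string {
	return shutdown(c, 20*time.Second, 10*time.Second)
}

// fakeClient lets the sketch run without a database.
type fakeClient struct {
	failStop   bool
	failCancel bool
}

func (f fakeClient) Stop(ctx context.Context) error {
	if f.failStop {
		return errors.New("jobs still running")
	}
	return nil
}

func (f fakeClient) StopAndCancel(ctx context.Context) error {
	if f.failCancel {
		return errors.New("jobs ignored cancellation")
	}
	return nil
}

func main() {
	fmt.Println(shutdownDefault(fakeClient{}))
	fmt.Println(shutdownDefault(fakeClient{failStop: true}))
	fmt.Println(shutdownDefault(fakeClient{failStop: true, failCancel: true}))
}
```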

riverui Dashboard

The River UI dashboard is mounted on the orchestrator at a host controlled by the CRAWBL_RIVERUI_HOST env var. Traffic is gated at the edge by an Envoy SecurityPolicy that enforces basic auth.
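
Host-based mounting generally comes down to comparing the request's Host header with the configured value. The sketch below shows one plausible shape for that decision; routeFor is a hypothetical helper, and treating an empty CRAWBL_RIVERUI_HOST as "dashboard disabled" is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// routeFor decides which handler should serve a request, based on the
// request's Host header and the configured dashboard host.
func routeFor(requestHost, riveruiHost string) string {
	if riveruiHost == "" {
		return "api" // assumption: dashboard is not mounted when the env var is unset
	}
	host := requestHost
	// Strip an optional port; SplitHostPort fails when no port is present,
	// in which case the host is used as-is.
	if h, _, err := net.SplitHostPort(requestHost); err == nil {
		host = h
	}
	if strings.EqualFold(host, riveruiHost) {
		return "riverui"
	}
	return "api"
}

func main() {
	const uiHost = "dev.river.crawbl.com" // example value from the Access table
	fmt.Println(routeFor("dev.river.crawbl.com", uiHost))
	fmt.Println(routeFor("dev.river.crawbl.com:443", uiHost))
	fmt.Println(routeFor("api.example.test", uiHost))
}
```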

Access

Setting | Value
Host env var | CRAWBL_RIVERUI_HOST (e.g., dev.river.crawbl.com)
Basic auth secret | crawbl/dev/riverui/auth (AWS Secrets Manager, .htpasswd key)
Kubernetes resources | httproute-river.yaml, security-policy-river.yaml, es-riverui-secrets.yaml (under components/orchestrator/resources/)
DNS record | Auto-created by external-dns on HTTPRoute admission

Use the dashboard to inspect job states, retry counts, error messages, and periodic schedules.


Debugging

Check River schema exists

kubectl port-forward svc/backend-postgresql 5432:5432 -n backend
psql -h localhost -U crawbl -d crawbl -c "\dt orchestrator.river_*"

View failed jobs

SELECT id, kind, state, attempt, errors, finalized_at
FROM orchestrator.river_job
WHERE state IN ('retryable', 'discarded')
ORDER BY finalized_at DESC
LIMIT 20;

Manually enqueue a memory_process job

-- Use only for debugging; the 1-minute periodic sweep handles this automatically.
-- The queue must match the worker's queue (memory_process), otherwise no worker picks the job up.
INSERT INTO orchestrator.river_job (kind, args, queue, state, priority, attempt, max_attempts, metadata, tags)
VALUES ('memory_process', '{}', 'memory_process', 'available', 1, 0, 25, '{}', '{}');

Check riverui logs

kubectl logs -n backend -l app.kubernetes.io/name=orchestrator --tail=50 | grep -i river

Common issues

Symptom | Likely Cause | Fix
riverui returns 401 | Basic auth secret not synced | Check that the backend-riverui-auth secret exists
Jobs stuck in available | Worker pool down or Postgres unreachable | Check orchestrator pod logs and Postgres connectivity
Duplicate jobs running | UniqueOpts not configured on periodic job | Confirm the relevant InsertOpts() method in internal/orchestrator/queue/types.go sets ByArgs + ByPeriod
Memory classification delayed > 60s | memory_process periodic sweep not running | Check orchestrator logs for River startup errors
Centroid table empty after deploy | Insufficient LLM-labelled samples (< 50/type) | Expected on first deploy; Phase 2 activates after weekly recompute
Enrich backlog growing | High heuristic/centroid drawer rate | Check memory.enrich.sweep log for remaining_backlog; scale if > 10,000 sustained

Source Files

File | Purpose
internal/pkg/river/ | River client/config type aliases, Migrate(), Shutdown() helper
internal/orchestrator/memory/autoingest/ | In-process pond pool for hot-path auto-ingest (NOT a River worker)
internal/orchestrator/queue/types.go | All static symbols: queue name constants, Args types, InsertOpts, Worker struct declarations, tag + metadata vars, event payloads, Deps struct
internal/orchestrator/queue/config.go | NewConfig(Deps): single entry point that registers all seven workers and periodic jobs into one river.Config
internal/orchestrator/queue/memory_workers.go | MemoryProcessWorker, MemoryMaintainWorker, MemoryEnrichWorker, MemoryCentroidRecomputeWorker River adapters
internal/orchestrator/queue/orchestrator_workers.go | UsageWriter, PricingRefresh, MessageCleanup River adapters + LiteLLM fetch helpers
internal/orchestrator/queue/publishers.go | MemoryPublisher (NATS) + UsagePublisher (River insert) + shared event stamper
internal/orchestrator/memory/jobs/process.go | RunProcess(ctx, deps): cold LLM reclassification logic
internal/orchestrator/memory/jobs/enrich.go | RunEnrich(ctx, deps): KG backfill sweep logic
internal/orchestrator/memory/jobs/maintain.go | RunMaintain(ctx, deps): decay + prune logic
internal/orchestrator/memory/jobs/centroids.go | RunCentroidRecompute(ctx, deps): centroid aggregation logic
internal/orchestrator/memory/repo/centroidrepo/postgres.go | GetAll, Upsert, NearestType, ListCentroidTrainingSamples implementation
cmd/crawbl/platform/orchestrator/orchestrator.go | Wires River and autoingest.Pool into runServer()
.omc/plans/memory-river-migration.md | Original River migration plan (backend repo)
.omc/plans/mempalace-heuristic-knn-phase1-2.md | Phase 0/1/2 cost-reduction plan (backend repo)
components/orchestrator/resources/httproute-river.yaml | riverui HTTPRoute (argocd-apps)
components/orchestrator/resources/security-policy-river.yaml | Edge auth policy (argocd-apps)
components/orchestrator/resources/es-riverui-secrets.yaml | ExternalSecret for basic auth (argocd-apps)

What's next: See the MemPalace Memory Pipeline to understand how River jobs drive the memory classification pipeline, or the Database Table Definitions for the memory_drawers.state machine that River workers advance.