Sweeper
The background sweeper keeps the system healthy by cleaning up stale workers, soft-deleting orphan jobs, and failing stuck internal tasks. Without it, workers that crash or lose network connectivity would leave behind claimed tasks and dangling job registrations indefinitely.
Overview
The sweeper is a background asyncio coroutine (run_sweeper()) that the host app starts explicitly. It runs on a fixed interval and performs two cleanup passes in sequence, stale worker cleanup (which also soft-deletes orphan jobs) followed by stuck internal task cleanup, each with its own fresh database session.
Sweeper Loop
The host app creates the sweeper as a background task during startup:
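A minimal sketch of that wiring, assuming the host app exposes an async lifespan-style context manager. The run_sweeper below is a stand-in stub, and its parameters (get_session, interval) are assumptions made for illustration; the real signature may differ.

```python
import asyncio
import contextlib


async def run_sweeper(get_session, interval: float) -> None:
    """Stand-in for the real sweeper coroutine (signature is an assumption)."""
    while True:
        await asyncio.sleep(interval)


async def get_session():
    """Hypothetical session factory; the real one yields AsyncSession objects."""
    yield object()


@contextlib.asynccontextmanager
async def lifespan():
    # Start the sweeper explicitly as a background task during startup.
    task = asyncio.create_task(run_sweeper(get_session, interval=30))
    try:
        yield task
    finally:
        # Cancel on shutdown and wait for the task to finish unwinding.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


async def main() -> bool:
    async with lifespan() as task:
        await asyncio.sleep(0)  # give the sweeper task a chance to start
        assert not task.done()
    return task.cancelled()
```

Keeping a reference to the task matters: asyncio only holds weak references to tasks, so a task that is created and then dropped can be garbage-collected before it finishes.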
get_session is an async generator that yields AsyncSession instances. The sweeper calls it twice per cycle -- once for stale worker cleanup and once for stuck internal task cleanup -- so each operation gets its own session and transaction boundary.
Each cycle:
1. Sleep for sweeper_interval_seconds (default 30s)
2. Clean up stale workers (external)
3. Emit accumulated Socket.IO events from step 2
4. Clean up stuck internal tasks (@internal)
5. Emit accumulated Socket.IO events from step 4
6. Log any errors but continue running
flowchart TD
A[Sleep sweeper_interval_seconds] --> B[Open session 1]
B --> C[cleanup_stale_workers]
C --> D[Emit events]
D --> E[Close session 1]
E --> F[Open session 2]
F --> G[cleanup_stuck_internal_tasks]
G --> H[Emit events]
H --> I[Close session 2]
I --> A
C -- "exception" --> ERR[Log error]
G -- "exception" --> ERR
ERR --> A
The try/except wraps the entire body of the loop, so if any step raises, the error is logged, the remaining steps of that cycle are skipped, and the sweeper moves on to the next cycle. The sweeper never exits unless the event loop is shut down.
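The cycle can be sketched roughly as follows. This is an illustration under assumptions, not the library's code: the cleanup and emit callables are passed in as parameters, and max_cycles exists only so the sketch can terminate in a test.

```python
import asyncio
import contextlib
import logging
from typing import Optional

log = logging.getLogger("sweeper")


async def sweep_once(get_session, cleanup_stale, cleanup_stuck, emit) -> None:
    """One cycle: two cleanup passes, each with its own session."""
    # Wrap the async generator so the session is closed even on error.
    open_session = contextlib.asynccontextmanager(get_session)
    async with open_session() as session:   # session 1
        await emit(await cleanup_stale(session))
    async with open_session() as session:   # session 2
        await emit(await cleanup_stuck(session))


async def run_sweeper(get_session, cleanup_stale, cleanup_stuck, emit,
                      interval: float,
                      max_cycles: Optional[int] = None) -> None:
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        await asyncio.sleep(interval)
        try:
            await sweep_once(get_session, cleanup_stale, cleanup_stuck, emit)
        except Exception:
            # Log and keep going; the next cycle starts from a clean slate.
            log.exception("Sweeper cycle failed")
        cycles += 1
```

Catching Exception rather than BaseException lets asyncio.CancelledError propagate, so cancelling the task on shutdown still stops the loop.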
Stale Worker Cleanup
cleanup_stale_workers(session, timeout) queries for workers where last_heartbeat < now - worker_timeout_seconds (default 60s).
For each stale worker found, it calls cleanup_worker(session, worker) -- the same function used by the graceful disconnect path and Socket.IO disconnect handler. After processing all stale workers, it commits once (batching all changes in a single transaction).
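The staleness test itself is a simple cutoff comparison. The sketch below uses an in-memory list in place of the database query; the Worker dataclass and find_stale_workers helper are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


@dataclass
class Worker:
    id: str
    last_heartbeat: datetime


def find_stale_workers(workers: Iterable[Worker], timeout_seconds: float,
                       now: Optional[datetime] = None) -> List[Worker]:
    """Return workers whose last heartbeat is older than now - timeout."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(seconds=timeout_seconds)
    return [w for w in workers if w.last_heartbeat < cutoff]
```

With the default 60s timeout, a worker last seen 61 seconds ago is stale, while one seen 59 seconds ago is not.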
cleanup_worker Cascade
cleanup_worker() performs a multi-step cascade for a single worker. It does NOT commit -- the caller is responsible for committing the transaction. This allows batching multiple worker cleanups into one atomic operation.
flowchart TD
A[Start cleanup_worker] --> B[Find CLAIMED/RUNNING tasks\nowned by this worker]
B --> C[Mark each task FAILED\nerror = 'Worker disconnected']
C --> D[Find all WorkerJobLinks\nfor this worker]
D --> E[Collect job_ids and room_ids\nfrom linked jobs]
E --> F[Find all ProviderRecords\nowned by this worker]
F --> G[Delete each ProviderRecord\nemit ProvidersInvalidate per room]
G --> H[Delete all WorkerJobLinks]
H --> I[Delete the Worker record]
I --> J[Flush to DB]
J --> K{For each affected job_id}
K --> L[_soft_delete_orphan_job]
L --> K
K -- "all done" --> M[Return accumulated emissions]
The accumulated emissions include:
- TaskStatusEvent for each task that was moved to FAILED
- JobsInvalidate for each room that had affected jobs (worker count changed)
- ProvidersInvalidate for each room that had providers deleted
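To make the cascade concrete, here is an in-memory imitation that uses plain dicts and lists instead of database rows. The store layout and the tuple-shaped emissions are assumptions for illustration only. The orphan-job check is left out: the sketch returns the affected job ids so a caller could run that check afterwards.

```python
def cleanup_worker(store: dict, worker_id: str):
    """In-memory imitation of the cascade; returns (emissions, job_ids)."""
    emissions = []

    # Fail every task this worker had claimed or was running.
    for task in store["tasks"]:
        if task["worker_id"] == worker_id and task["status"] in ("CLAIMED", "RUNNING"):
            task["status"] = "FAILED"
            task["error"] = "Worker disconnected"
            emissions.append(("TaskStatusEvent", task["id"]))

    # Collect the jobs (and their rooms) this worker was linked to.
    job_ids = sorted(j for (w, j) in store["links"] if w == worker_id)
    job_rooms = sorted({store["job_rooms"][j] for j in job_ids})

    # Delete the worker's providers, one invalidation per affected room.
    provider_rooms = sorted(
        {p["room_id"] for p in store["providers"] if p["worker_id"] == worker_id}
    )
    store["providers"] = [p for p in store["providers"] if p["worker_id"] != worker_id]
    emissions += [("ProvidersInvalidate", room) for room in provider_rooms]

    # Remove the links and the worker record itself.
    store["links"] = [(w, j) for (w, j) in store["links"] if w != worker_id]
    store["workers"].discard(worker_id)

    # The worker count changed for every linked job's room.
    emissions += [("JobsInvalidate", room) for room in job_rooms]
    return emissions, job_ids
```

Like the real function, the sketch has no commit step; it only mutates state and hands back the events for the caller to emit.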
Orphan Job Soft-Deletion
After a worker is removed, _soft_delete_orphan_job(session, job_id) checks each affected job to determine whether it should be soft-deleted.
A job is soft-deleted (deleted=True) only if ALL of these conditions are true:
- The job is not @internal -- server-managed jobs are never auto-deleted because they are re-registered at startup.
- The job has no remaining workers -- checked via WorkerJobLink. If another worker still serves this job, it stays active.
- The job has no pending tasks -- pending tasks could be picked up by a new worker that registers later, so the job must remain visible.
If any condition is false, the job stays active and no changes are made. Soft-deleted jobs retain their task history and can be reactivated when a new worker registers the same (room_id, category, name) tuple.
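The three conditions reduce to a single predicate. In this sketch the JobState dataclass and its fields (is_internal, worker_count, pending_task_count) are hypothetical stand-ins for values the real function would read from the database.

```python
from dataclasses import dataclass


@dataclass
class JobState:
    is_internal: bool          # True for @internal, server-managed jobs
    worker_count: int          # remaining WorkerJobLink rows for this job
    pending_task_count: int    # tasks still waiting to be claimed
    deleted: bool = False


def should_soft_delete(job: JobState) -> bool:
    """All three conditions must hold for the job to be soft-deleted."""
    return (
        not job.is_internal
        and job.worker_count == 0
        and job.pending_task_count == 0
    )


def soft_delete_if_orphan(job: JobState) -> bool:
    """Mark the job deleted when it is an orphan; report whether it changed."""
    if should_soft_delete(job):
        job.deleted = True
        return True
    return False
```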
Stuck Internal Task Cleanup
cleanup_stuck_internal_tasks(session, timeout) targets @internal tasks (those dispatched to taskiq workers) that are stuck in RUNNING or CLAIMED beyond internal_task_timeout_seconds (default 3600s = 1 hour).
The reference time for staleness is COALESCE(started_at, created_at):
| Task status | Reference column | Rationale |
|---|---|---|
| RUNNING | started_at | The task began execution; timeout measures actual run duration |
| CLAIMED | created_at | The task was dispatched but never started; started_at is NULL |
Stuck tasks are marked FAILED with error = "Internal worker timeout" and a completed_at timestamp. If any tasks were failed, the session is committed and a count is logged.
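The COALESCE-based reference time and the resulting state change can be sketched in plain Python. The Task dataclass and fail_stuck_tasks helper are hypothetical, and the input list is assumed to contain only tasks that belong to @internal jobs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


@dataclass
class Task:
    status: str
    created_at: datetime
    started_at: Optional[datetime] = None
    error: Optional[str] = None
    completed_at: Optional[datetime] = None


def fail_stuck_tasks(tasks: Iterable[Task], timeout_seconds: float,
                     now: datetime) -> int:
    """Fail RUNNING/CLAIMED tasks older than the timeout; return the count."""
    cutoff = now - timedelta(seconds=timeout_seconds)
    failed = 0
    for task in tasks:
        if task.status not in ("RUNNING", "CLAIMED"):
            continue
        # Equivalent of COALESCE(started_at, created_at).
        reference = task.started_at if task.started_at is not None else task.created_at
        if reference < cutoff:
            task.status = "FAILED"
            task.error = "Internal worker timeout"
            task.completed_at = now
            failed += 1
    return failed
```

A RUNNING task that was created long ago but started recently is therefore not failed, because its started_at is the value that gets compared against the cutoff.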
This cleanup only applies to @internal jobs. External worker tasks that are stuck are handled by the stale worker cleanup (if the worker's heartbeat lapses) or remain in their current state if the worker is still alive.
Three Disconnect Paths
The sweeper is the fallback for workers that do not disconnect gracefully. The system provides three paths for cleaning up a worker, all converging on the same cleanup_worker() function:
| Path | Trigger | Latency | Mechanism |
|---|---|---|---|
| Graceful | Client calls disconnect() or context manager exit | Immediate | DELETE /workers/{id} --> cleanup_worker() |
| SIO disconnect | Network drop detected by Socket.IO | Seconds | Host app's SIO disconnect handler --> cleanup_worker() |
| Sweeper | No heartbeat received within timeout | Up to worker_timeout_seconds | Sweeper periodic scan --> cleanup_stale_workers() --> cleanup_worker() |
Because all three paths use cleanup_worker(), the behavior is identical regardless of how the worker disappears: tasks are failed, links are removed, providers are deleted, and orphan jobs are soft-deleted.
Configuration
All settings are controlled via JobLibSettings (environment prefix ZNDRAW_JOBLIB_):
| Setting | Default | Purpose |
|---|---|---|
| sweeper_interval_seconds | 30 | How often the sweeper wakes up and runs its cleanup cycle |
| worker_timeout_seconds | 60 | How long since the last heartbeat before a worker is considered stale |
| internal_task_timeout_seconds | 3600 | Maximum time an @internal task can remain in RUNNING or CLAIMED before being force-failed |
The worker timeout should be at least 2x the client's heartbeat interval to avoid false positives from transient network delays. The sweeper interval should be shorter than the worker timeout so that a stale worker is picked up at most one sweeper interval after its heartbeat deadline passes, rather than lingering across several timeout periods.
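These two rules of thumb can be checked mechanically. The sketch below is a standard-library stand-in, not the real JobLibSettings class: SweeperSettings, check_timing, and the heartbeat_interval_seconds parameter are hypothetical, and the environment variable names assume the prefix is followed by the upper-cased setting name.

```python
import os
from dataclasses import dataclass, fields
from typing import List, Mapping, Optional

PREFIX = "ZNDRAW_JOBLIB_"


@dataclass(frozen=True)
class SweeperSettings:
    sweeper_interval_seconds: float = 30
    worker_timeout_seconds: float = 60
    internal_task_timeout_seconds: float = 3600

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "SweeperSettings":
        """Read overrides such as ZNDRAW_JOBLIB_WORKER_TIMEOUT_SECONDS=120."""
        env = os.environ if env is None else env
        overrides = {}
        for f in fields(cls):
            raw = env.get(PREFIX + f.name.upper())
            if raw is not None:
                overrides[f.name] = float(raw)
        return cls(**overrides)


def check_timing(settings: SweeperSettings,
                 heartbeat_interval_seconds: float) -> List[str]:
    """Return human-readable warnings for timing combinations to avoid."""
    problems = []
    if settings.worker_timeout_seconds < 2 * heartbeat_interval_seconds:
        problems.append("worker_timeout_seconds should be at least 2x the heartbeat interval")
    if settings.sweeper_interval_seconds >= settings.worker_timeout_seconds:
        problems.append("sweeper_interval_seconds should be shorter than worker_timeout_seconds")
    return problems
```

With the defaults and a 10-second client heartbeat, check_timing returns an empty list; lowering the worker timeout to 20 seconds while leaving the sweeper interval at 30 would trigger the second warning.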