Providers
Providers are a separate abstraction from jobs. While jobs handle user-initiated computation (workers pull tasks via FIFO claiming), providers handle server-dispatched read requests with result caching. A provider answers the question "give me this data" rather than "run this computation."
Jobs vs. Providers
| | Jobs | Providers |
|---|---|---|
| Purpose | User-initiated computation | Remote resource access |
| Dispatch | Workers pull/claim (FIFO) | Server pushes to specific provider via SIO |
| Results | Side effects (modify room state) | Data returned to caller (cached) |
| Formats | JSON payloads | JSON or binary (msgpack, arrow, etc.) |
| HTTP pattern | POST creates task, PATCH updates | GET reads resource -- 200 (cached) or 202 (dispatched) |
| Communication | REST only (claim loop) | SIO for dispatch, REST for result upload |
Jobs are fire-and-forget from the caller's perspective: a frontend submits a task and the worker modifies room state as a side effect. Providers are request/response: the caller blocks (long-polls) until the provider returns data, and that data is cached so subsequent identical requests return instantly.
Provider Read Flow
The read flow is the most complex interaction pattern in the system. It
combines REST endpoints, a Redis-backed result cache, inflight coalescing
via SET NX, Socket.IO dispatch, and pub/sub-based long-polling.
sequenceDiagram
participant FE as Frontend
participant S as Server (Router)
participant RB as ResultBackend (Redis)
participant SIO as Socket.IO
participant PC as Provider Client (JobManager)
FE->>S: GET /rooms/{room_id}/providers/{name}?params
S->>S: request_hash = SHA256(canonical_json(params))
S->>S: cache_key = "provider-result:{full_name}:{hash}"
S->>RB: get(cache_key)
alt Cache HIT
RB-->>S: cached bytes
S-->>FE: 200 OK + cached bytes (provider's content_type)
else Cache MISS
RB-->>S: None
S->>RB: acquire_inflight(inflight_key, ttl)
alt Inflight lock acquired
RB-->>S: True
S->>SIO: emit ProviderRequest to room "providers:{full_name}"
SIO->>PC: ProviderRequest(request_id, provider_name, params)
else Inflight already held
RB-->>S: False
Note over S: Skip dispatch (another request already triggered it)
end
S->>RB: wait_for_key(cache_key, timeout)
Note over S: Blocks on Redis pub/sub
PC->>PC: Instantiate Provider class with params
PC->>PC: result = provider.read(handler)
PC->>S: POST /providers/{id}/results<br/>Body: raw bytes, Header: X-Request-Hash
S->>RB: store(cache_key, data, ttl)
S->>RB: release_inflight(inflight_key)
S->>RB: notify_key(cache_key) [pub/sub PUBLISH]
RB-->>S: wait_for_key wakes, returns cached bytes
S->>SIO: emit ProviderResultReady to room "room:{room_id}"
S-->>FE: 200 OK + result bytes (provider's content_type)
Note over S,FE: If timeout expires before result: 504 + Retry-After: 2
end
Step-by-step breakdown
- Frontend sends GET /rooms/{room_id}/providers/{name}?params to the server.
- Server computes request_hash = SHA256(canonical_json(params)) using dependencies.request_hash(). The canonical form is json.dumps(params, sort_keys=True, separators=(",", ":")).
- Server builds cache_key = "provider-result:{full_name}:{hash}" and checks result_backend.get(cache_key).
- Cache HIT: return 200 with the cached bytes and the provider's declared content_type.
- Cache MISS: attempt result_backend.acquire_inflight(inflight_key, ttl) using Redis SET NX semantics.
- If inflight acquired: emit a ProviderRequest event to the SIO room providers:{full_name}.
- If inflight NOT acquired (another request already dispatched for this hash): skip dispatch.
- Server calls result_backend.wait_for_key(cache_key, timeout), which blocks on Redis pub/sub.
- Provider client receives the ProviderRequest via its SIO connection (handled by JobManager._on_provider_request).
- Provider client instantiates the Provider subclass with the decoded params and calls provider.read(handler).
- Provider client sends POST /providers/{id}/results with the raw result bytes and an X-Request-Hash header.
- Server stores the result in the cache, releases the inflight lock, and calls notify_key(cache_key) to publish a pub/sub notification.
- Server emits ProviderResultReady to room:{room_id} so the frontend can refresh its UI.
- The wait_for_key call wakes up and returns the cached data. The server responds with 200 and the result bytes.
- If timeout: return 504 Gateway Timeout with RFC 9457 Problem Details and a Retry-After: 2 header.
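The hashing and cache-key steps can be sketched as follows. This is a minimal illustration, not the body of dependencies.request_hash(): the hex-digest output, the UTF-8 encoding, and the helper name cache_key_for are assumptions made for the example.

```python
import hashlib
import json
from typing import Any, Mapping


def request_hash(params: Mapping[str, Any]) -> str:
    # Canonical form quoted in the steps above: sorted keys, no whitespace.
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    # Assumption: the digest is hex-encoded; the real helper may differ.
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def cache_key_for(full_name: str, params: Mapping[str, Any]) -> str:
    # Hypothetical helper that follows the documented key layout.
    return f"provider-result:{full_name}:{request_hash(params)}"


if __name__ == "__main__":
    print(cache_key_for("@global:filesystem:local", {"path": "/tmp/a.txt"}))
```

Because the keys are sorted before hashing, two requests that differ only in parameter order map to the same cache key and therefore share one cached result.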
Inflight Coalescing
Multiple concurrent requests with identical params must not dispatch multiple
ProviderRequest events. The inflight lock (acquire_inflight with Redis
SET NX + TTL) ensures only the first request triggers a dispatch. All
concurrent requests block on the same cache_key via wait_for_key().
flowchart TD
A["Request arrives"] --> B{"Check cache<br/>get(cache_key)"}
B -- "HIT" --> C["Return 200<br/>with cached bytes"]
B -- "MISS" --> D{"acquire_inflight<br/>(SET NX + TTL)"}
D -- "Acquired" --> E["Emit ProviderRequest<br/>via Socket.IO"]
D -- "Not acquired" --> F["Skip dispatch<br/>(another request already did)"]
E --> G["wait_for_key(cache_key, timeout)"]
F --> G
G --> H{"Result arrives<br/>before timeout?"}
H -- "Yes" --> I["notify_key wakes waiters<br/>Return 200"]
H -- "No" --> J["Return 504<br/>Retry-After: 2"]
The inflight TTL (provider_inflight_ttl_seconds, default 30s) acts as a
safety net: if the provider client crashes or the Socket.IO message is lost,
the lock expires and a subsequent request can re-dispatch. The result cache
TTL (provider_result_ttl_seconds, default 300s) controls how long cached
results remain valid.
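The lock semantics described above (set-if-not-exists plus a TTL safety net) can be illustrated without Redis. The sketch below is an in-memory stand-in; the class names InflightLocks and FakeClock are hypothetical, and a real backend would rely on Redis SET with NX and an expiry rather than a Python dict.

```python
import time
from typing import Callable, Dict


class FakeClock:
    """Manually advanced clock so the TTL behaviour is deterministic."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class InflightLocks:
    """In-memory imitation of set-if-not-exists with expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._expires: Dict[str, float] = {}
        self._clock = clock

    def acquire(self, key: str, ttl: float) -> bool:
        now = self._clock()
        expires = self._expires.get(key)
        if expires is not None and expires > now:
            return False  # lock still held: the caller must skip dispatch
        self._expires[key] = now + ttl
        return True  # the caller is the one that dispatches

    def release(self, key: str) -> None:
        self._expires.pop(key, None)


if __name__ == "__main__":
    clock = FakeClock()
    locks = InflightLocks(clock)
    print(locks.acquire("inflight:abc", ttl=30))  # first request dispatches
    print(locks.acquire("inflight:abc", ttl=30))  # concurrent request skips
    clock.now = 31.0                              # provider never answered
    print(locks.acquire("inflight:abc", ttl=30))  # lock expired, re-dispatch
```

The third call succeeds only because the TTL has elapsed, which is the recovery path for a crashed provider client or a lost Socket.IO message.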
ResultBackend Protocol
The host app must provide a ResultBackend implementation (typically
Redis-backed) by overriding the get_result_backend dependency. The
protocol is defined in dependencies.py:
| Method | Signature | Purpose |
|---|---|---|
| store | async (key: str, data: bytes, ttl: int) -> None | Store bytes in cache with expiration (seconds). |
| get | async (key: str) -> bytes \| None | Retrieve cached bytes, or None if not present. |
| delete | async (key: str) -> None | Remove a cached entry. |
| acquire_inflight | async (key: str, ttl: int) -> bool | Atomic set-if-not-exists with TTL (Redis SET NX). Returns True if the lock was acquired. |
| release_inflight | async (key: str) -> None | Release the inflight lock. |
| wait_for_key | async (key: str, timeout: float) -> bytes \| None | Subscribe to pub/sub, check cache (race-safe), wait for notification or timeout. Returns cached data or None. |
| notify_key | async (key: str) -> None | Publish notification that the key is populated, waking all wait_for_key waiters. |
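Written out as a typing.Protocol, the table above would look roughly like this. The method names and signatures are taken from the table; the runtime_checkable decorator is an assumption added so the example can be checked with isinstance, and the actual definition in dependencies.py may differ.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable  # assumption: added here only to allow isinstance checks
class ResultBackend(Protocol):
    async def store(self, key: str, data: bytes, ttl: int) -> None: ...

    async def get(self, key: str) -> Optional[bytes]: ...

    async def delete(self, key: str) -> None: ...

    async def acquire_inflight(self, key: str, ttl: int) -> bool: ...

    async def release_inflight(self, key: str) -> None: ...

    async def wait_for_key(self, key: str, timeout: float) -> Optional[bytes]: ...

    async def notify_key(self, key: str) -> None: ...
```

A host app's Redis-backed class does not need to inherit from the protocol; it only needs to provide these seven coroutine methods.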
Race safety in wait_for_key
The wait_for_key implementation must be race-safe: subscribe to the
notification channel FIRST, then check the cache, then wait. This prevents
the race where a result arrives between the cache check and the subscribe:
WRONG (race-prone):
1. check cache -> miss
2. result arrives + notify
3. subscribe to channel
4. wait forever (missed it)

CORRECT (race-safe):
1. subscribe to channel
2. check cache -> miss
3. wait for notification
4. result arrives + notify -> wakes up
If step 2 in the correct sequence finds a cache hit, wait_for_key returns
immediately without waiting.
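The subscribe-then-check ordering can be shown with an in-memory stand-in built on asyncio.Event. This is only a sketch of the ordering: the class InMemoryBackend is hypothetical, TTLs are ignored, and a Redis implementation would subscribe to a pub/sub channel where this code registers an event.

```python
import asyncio
from typing import Dict, Optional, Set


class InMemoryBackend:
    """Toy backend: a dict as cache, asyncio.Event objects as subscriptions."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}
        self._waiters: Dict[str, Set[asyncio.Event]] = {}

    async def store(self, key: str, data: bytes, ttl: int) -> None:
        self._data[key] = data  # ttl is ignored in this sketch

    async def notify_key(self, key: str) -> None:
        for event in list(self._waiters.get(key, ())):
            event.set()

    async def wait_for_key(self, key: str, timeout: float) -> Optional[bytes]:
        event = asyncio.Event()
        self._waiters.setdefault(key, set()).add(event)  # 1. subscribe first
        try:
            cached = self._data.get(key)                 # 2. then check cache
            if cached is not None:
                return cached
            try:                                         # 3. then wait
                await asyncio.wait_for(event.wait(), timeout)
            except asyncio.TimeoutError:
                return None
            return self._data.get(key)
        finally:
            self._waiters[key].discard(event)


async def demo() -> Optional[bytes]:
    backend = InMemoryBackend()
    waiter = asyncio.create_task(backend.wait_for_key("k", timeout=1.0))
    await asyncio.sleep(0)  # let the waiter subscribe and start waiting
    await backend.store("k", b"result", ttl=300)
    await backend.notify_key("k")
    return await waiter


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real Redis connection the subscribe and the cache check are separate network round trips, which is why the order matters there; in this single-process sketch the ordering is kept only to mirror the protocol.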
Content Types
Each Provider subclass declares its response format via the
content_type ClassVar (default "application/json"). This value is stored
in the ProviderRecord database row at registration time.
- JSON providers: read() returns a Python object. The client JSON-serializes it (json.dumps(result).encode()), uploads the bytes, and the server returns them with media_type="application/json".
- Binary providers: read() returns bytes directly. The client uploads the raw bytes as-is, and the server returns them with the provider's declared content_type (e.g., application/x-msgpack, image/png).
The result upload endpoint (POST /providers/{id}/results) stores the raw
request body with no parsing or re-serialization. The server is
content-type-agnostic -- it passes bytes through from the provider client
to the frontend caller.
from typing import Any, ClassVar

# Provider is the library's base class; its import path is not shown in this section.


class FilesystemRead(Provider):
    category: ClassVar[str] = "filesystem"
    content_type: ClassVar[str] = "application/json"
    path: str

    def read(self, handler: Any) -> Any:
        # Return value is JSON-serialized by the client
        return handler.cat(self.path)


class ThumbnailProvider(Provider):
    category: ClassVar[str] = "thumbnails"
    content_type: ClassVar[str] = "image/png"
    frame: int

    def read(self, handler: Any) -> bytes:
        # Return raw bytes for binary content types
        return handler.render_thumbnail(self.frame)
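The client-side split between JSON and binary providers comes down to one branch on content_type. The helper below is a sketch under that reading; the name encode_result and the TypeError for non-bytes binary results are assumptions, not documented behaviour.

```python
import json
from typing import Any


def encode_result(result: Any, content_type: str) -> bytes:
    """Turn a read() return value into the bytes that get uploaded."""
    if content_type == "application/json":
        # JSON providers: serialize the Python object on the client.
        return json.dumps(result).encode()
    # Binary providers: the bytes are uploaded unchanged.
    if not isinstance(result, (bytes, bytearray)):
        # Assumption: this strictness is added for the example only.
        raise TypeError(f"{content_type} provider must return bytes")
    return bytes(result)


if __name__ == "__main__":
    print(encode_result({"frames": 3}, "application/json"))
    print(encode_result(b"\x89PNG", "image/png"))
```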
Provider Registration
Providers are registered via PUT /rooms/{room_id}/providers with a schema,
content_type, and worker_id. The endpoint is idempotent -- it updates the
existing record if one already exists for the same (room_id, category, name)
tuple.
On the client side, manager.register_provider(cls, name=..., handler=..., room=...)
performs the following steps:
- Generate JSON schema from the Provider subclass via cls.model_json_schema().
- PUT to server with the schema, content_type, and optional worker_id. If no worker_id is provided, the server auto-creates a Worker record.
- Emit JoinProviderRoom via Socket.IO, which causes the host app to add the client's SIO session to the room providers:{full_name}. Future ProviderRequest events emitted to this room will reach this client.
- Store the handler locally in JobManager._providers keyed by full_name. When a ProviderRequest arrives, the handler is passed to provider.read(handler).
import fsspec

# manager is an existing JobManager instance
manager.register_provider(
    FilesystemRead,
    name="local",
    handler=fsspec.filesystem("file"),
    room="@global",
)
# Registers as @global:filesystem:local
# handler is passed to FilesystemRead(...).read(handler) on dispatch
The handlers property
The handlers property on JobManager returns a dict of all registered
handlers, keyed by full_name.
This dict is typically passed into Extension.run() during job execution, so
extensions can look up provider handlers by name to read data during their
computation.
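As a rough illustration of that lookup, the sketch below uses a plain dict in place of the real handlers property. The room:category:name layout is inferred from the @global:filesystem:local example above; FakeFilesystem, make_full_name and run_extension are hypothetical names, and the actual Extension.run() signature is not shown in this section.

```python
from typing import Any, Dict


def make_full_name(room: str, category: str, name: str) -> str:
    # Layout inferred from the "@global:filesystem:local" example.
    return f"{room}:{category}:{name}"


class FakeFilesystem:
    """Stand-in handler exposing the cat() call used by FilesystemRead."""

    def __init__(self, files: Dict[str, str]) -> None:
        self._files = files

    def cat(self, path: str) -> str:
        return self._files[path]


# Stand-in for the dict returned by the handlers property.
handlers: Dict[str, Any] = {
    make_full_name("@global", "filesystem", "local"): FakeFilesystem(
        {"/tmp/a.txt": "hello"}
    ),
}


def run_extension(available: Dict[str, Any]) -> str:
    # An extension picks the handler it needs by full_name.
    fs = available["@global:filesystem:local"]
    return fs.cat("/tmp/a.txt")


if __name__ == "__main__":
    print(run_extension(handlers))
```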
Provider Lifecycle on the Client
When a ProviderRequest arrives via Socket.IO, JobManager._on_provider_request
handles it synchronously:
- Look up the _RegisteredProvider by event.provider_name.
- Deserialize event.params (a canonical JSON string) into a dict.
- Instantiate the Provider subclass with the params as keyword arguments.
- Call instance.read(handler) to produce the result.
- Serialize: JSON providers get json.dumps(result).encode(), binary providers pass result through as-is.
- Upload via POST /providers/{id}/results with the X-Request-Hash header.
If read() raises an exception, the error is logged and a chat message is
posted to the provider's room (best-effort). The inflight lock eventually
expires, allowing a retry.
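The request-handling steps and the failure path can be sketched end to end with stand-in types. Everything below is hypothetical scaffolding: Registered imitates _RegisteredProvider, Echo imitates a Provider subclass, and upload is a callback standing in for the POST request. The chat-message notification is omitted.

```python
import json
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict

log = logging.getLogger("provider-sketch")


@dataclass
class Registered:  # stand-in for _RegisteredProvider
    cls: type
    handler: Any
    content_type: str = "application/json"


@dataclass
class Echo:  # stand-in for a Provider subclass
    text: str

    def read(self, handler: Any) -> Any:
        return {"text": handler(self.text)}


def handle_request(
    registry: Dict[str, Registered],
    provider_name: str,
    params_json: str,
    request_hash: str,
    upload: Callable[[bytes, Dict[str, str]], None],
) -> bool:
    reg = registry[provider_name]          # look up the registration
    params = json.loads(params_json)       # canonical JSON string -> dict
    instance = reg.cls(**params)           # params become keyword arguments
    try:
        result = instance.read(reg.handler)
    except Exception:
        # Nothing is uploaded; the inflight lock is left to expire.
        log.exception("provider %s failed", provider_name)
        return False
    if reg.content_type == "application/json":
        body = json.dumps(result).encode()
    else:
        body = result                      # binary result passes through
    upload(body, {"X-Request-Hash": request_hash})
    return True
```

Calling handle_request with a registry entry such as Registered(Echo, str.upper) and a list-appending upload callback is enough to exercise both the success and the failure branch.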
Configuration
Provider behavior is controlled by JobLibSettings fields with the
ZNDRAW_JOBLIB_ env prefix:
| Setting | Default | Description |
|---|---|---|
| allowed_provider_categories | None (unrestricted) | When set, only listed categories can be registered. |
| provider_result_ttl_seconds | 300 | How long cached results remain valid (seconds). |
| provider_inflight_ttl_seconds | 30 | TTL for the inflight deduplication lock (seconds). |
| provider_long_poll_default_seconds | 5 | Default long-poll timeout when no Prefer: wait=N header is sent. |
| provider_long_poll_max_seconds | 30 | Maximum allowed long-poll timeout regardless of Prefer header value. |
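The interaction between the two long-poll settings and the Prefer header can be sketched as a small parsing function. The wait preference is the one defined in RFC 7240; the function name long_poll_timeout and the regular expression are assumptions for illustration, and the router's own parsing may be stricter or looser.

```python
import re
from typing import Optional

DEFAULT_WAIT_SECONDS = 5.0   # provider_long_poll_default_seconds
MAX_WAIT_SECONDS = 30.0      # provider_long_poll_max_seconds

# Matches "wait=N" as a standalone preference, e.g. "respond-async, wait=10".
_WAIT_RE = re.compile(r"(?:^|[,;\s])wait=(\d+)", re.IGNORECASE)


def long_poll_timeout(
    prefer: Optional[str],
    default: float = DEFAULT_WAIT_SECONDS,
    maximum: float = MAX_WAIT_SECONDS,
) -> float:
    """Resolve the long-poll timeout from an optional Prefer header value."""
    if prefer:
        match = _WAIT_RE.search(prefer)
        if match:
            # Requested value is honoured but never exceeds the maximum.
            return min(float(match.group(1)), maximum)
    return default


if __name__ == "__main__":
    for header in (None, "wait=10", "wait=120", "respond-async, wait=3"):
        print(header, "->", long_poll_timeout(header))
```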
Error Handling
Provider endpoints use the same RFC 9457 Problem Details system as the rest of the router:
| Error | Status | When |
|---|---|---|
| ProviderNotFound | 404 | Provider does not exist or is not visible from the requested room. |
| ProviderTimeout | 504 | Provider did not respond within the long-poll timeout. Includes Retry-After: 2 header. |
| Forbidden | 403 | Provider belongs to a different user (registration/upload/delete). |
| InvalidCategory | 400 | Provider category not in allowed_provider_categories. |
| WorkerNotFound | 404 | Specified worker_id does not exist or is not owned by the user. |
On the client side, ProviderTimeoutError (defined in exceptions.py) can
be raised from ProviderTimeout.raise_for_client() when the client detects a
504 response.
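One way a caller might react to the 504 case is to honour Retry-After and try again a bounded number of times. The sketch below is not the library's client: read_with_retry, the fetch callback returning (status, headers, body), and the local ProviderTimeoutError class are all stand-ins so the example runs without a server.

```python
import time
from typing import Callable, Mapping, Tuple

Response = Tuple[int, Mapping[str, str], bytes]


class ProviderTimeoutError(Exception):
    """Local stand-in for the client-side exception named above."""


def read_with_retry(
    fetch: Callable[[], Response],
    max_attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Call fetch() until it returns 200, waiting Retry-After between 504s."""
    for attempt in range(max_attempts):
        status, headers, body = fetch()
        if status == 200:
            return body
        if status != 504:
            raise RuntimeError(f"unexpected status {status}")
        if attempt < max_attempts - 1:
            # Fall back to 2 seconds, the value the server is documented to send.
            sleep(float(headers.get("Retry-After", 2)))
    raise ProviderTimeoutError(
        f"provider did not respond after {max_attempts} attempts"
    )
```

Retrying is cheap on the server side because a late provider result still lands in the cache, so a follow-up GET can be answered from it.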