HTTP Handler
Expose Durably via HTTP/SSE endpoints for React clients and external integrations.
createDurablyHandler
Create a handler that routes HTTP requests to the appropriate Durably operations.
import { createDurablyHandler } from '@coji/durably'
const handler = createDurablyHandler(durably, {
onRequest: async () => {
// Called before each request - useful for lazy initialization
await durably.init()
},
})Options
interface CreateDurablyHandlerOptions<TContext, TLabels> {
/** Called before handling each request (after authentication) */
onRequest?: () => Promise<void> | void
/**
* Throttle interval (ms) for SSE progress events.
* First and last progress events are always delivered immediately.
* Set to 0 to disable. Default: 100
*/
sseThrottleMs?: number
/** Auth middleware. When set, authenticate is required and applies to ALL endpoints. */
auth?: AuthConfig<TContext, TLabels>
}Framework Integration
React Router / Remix
Use a splat route to handle all Durably endpoints under a single path.
// app/routes/api.durably.$.ts
import { durablyHandler } from '~/lib/durably.server'
import type { Route } from './+types/api.durably.$'
export async function loader({ request }: Route.LoaderArgs) {
return durablyHandler.handle(request, '/api/durably')
}
export async function action({ request }: Route.ActionArgs) {
return durablyHandler.handle(request, '/api/durably')
}Next.js
// app/api/durably/[...path]/route.ts
import { durablyHandler } from '@/lib/durably'
export async function GET(request: Request) {
return durablyHandler.handle(request, '/api/durably')
}
export async function POST(request: Request) {
return durablyHandler.handle(request, '/api/durably')
}
export async function DELETE(request: Request) {
return durablyHandler.handle(request, '/api/durably')
}Express / Hono
// Express
app.use('/api/durably', async (req, res, next) => {
const request = new Request(`http://localhost${req.url}`, {
method: req.method,
headers: req.headers,
body: req.method !== 'GET' ? JSON.stringify(req.body) : undefined,
})
const response = await handler.handle(request, '/api/durably')
res.status(response.status)
response.headers.forEach((v, k) => res.setHeader(k, v))
res.send(await response.text())
})
// Hono
app.all('/api/durably/*', (c) => handler.handle(c.req.raw, '/api/durably'))Response Shape
The /runs and /run endpoints return ClientRun objects — a subset of the full Run type with internal fields (leaseOwner, leaseExpiresAt, idempotencyKey, concurrencyKey, leaseGeneration, updatedAt) stripped, plus isTerminal and isActive derived from status. Use toClientRun() to apply the same projection in custom code:
import { toClientRun } from '@coji/durably'
const run = await durably.getRun(runId)
const clientRun = toClientRun(run) // strips internal fields; adds isTerminal / isActiveEndpoints
The handler provides these endpoints:
| Method | Path | Description |
|---|---|---|
POST | /trigger | Trigger a job |
GET | /subscribe?runId=xxx | SSE stream for run events |
GET | /runs | List runs with filtering |
GET | /run?runId=xxx | Get single run |
GET | /steps?runId=xxx | Get steps for a run |
GET | /runs/subscribe | SSE stream for run list updates |
POST | /retrigger?runId=xxx | Retrigger a failed run (creates new run) |
POST | /cancel?runId=xxx | Cancel a pending or leased run |
DELETE | /run?runId=xxx | Delete a run |
Trigger Request
// POST /api/durably/trigger
{
"jobName": "import-csv",
"input": { "filename": "data.csv" },
"idempotencyKey": "unique-key", // optional
"concurrencyKey": "user-123", // optional
"coalesce": "skip", // optional — requires concurrencyKey
"labels": { "organizationId": "org_123" } // optional
}
// Response
{ "runId": "run_abc123", "disposition": "created" }
// disposition: "created" | "idempotent" | "coalesced"
// When disposition is not "created", runId refers to the existing run.
// idempotencyKey match returns "idempotent" (takes priority over coalesce).SSE behavior
run:trigger is not emitted for idempotent or coalesced triggers. A run:coalesced event is emitted instead when coalesce: 'skip' returns an existing pending run.
SSE Event Stream
The /subscribe endpoint returns Server-Sent Events for real-time updates.
// GET /api/durably/subscribe?runId=run_abc123
// Events:
data: {"type":"run:leased","runId":"run_abc123","jobName":"import-csv",...}
data: {"type":"run:progress","runId":"run_abc123","progress":{"current":1,"total":10},...}
data: {"type":"step:complete","runId":"run_abc123","stepName":"parse",...}
data: {"type":"run:complete","runId":"run_abc123","output":{"count":10},...}The stream closes automatically when the run completes or fails.
List Runs
// GET /api/durably/runs?jobName=import-csv&status=completed&label.organizationId=org_123&limit=10&offset=0
// Multiple jobName params filter by any of them:
// GET /api/durably/runs?jobName=import-csv&jobName=sync-users
// Multiple label params use AND logic:
// GET /api/durably/runs?label.env=prod&label.region=us-east
// Response
{
"runs": [
{
"id": "run_abc123",
"jobName": "import-csv",
"status": "completed",
"input": { "filename": "data.csv" },
"output": { "count": 10 },
"createdAt": "2024-01-01T00:00:00.000Z",
"completedAt": "2024-01-01T00:01:00.000Z"
}
],
"total": 100,
"hasMore": true
}Auth Middleware
Built-in auth middleware for multi-tenant apps. When auth is configured, authenticate is called on every request before any processing.
const handler = createDurablyHandler(durably, {
auth: {
// Required: authenticate every request. Return context or throw Response to reject.
authenticate: async (request) => {
const session = await requireUser(request)
const orgId = await resolveCurrentOrgId(request, session.user.id)
return { orgId }
},
// Guard before trigger (called AFTER body validation and job resolution)
onTrigger: async (ctx, { jobName, input, labels }) => {
if (labels?.organizationId !== ctx.orgId) {
throw new Response('Forbidden', { status: 403 })
}
},
// Guard before run-level operations
onRunAccess: async (ctx, run, { operation }) => {
if (run.labels.organizationId !== ctx.orgId) {
throw new Response('Forbidden', { status: 403 })
}
},
// Scope runs list queries (GET /runs)
scopeRuns: async (ctx, filter) => ({
...filter,
labels: { ...filter.labels, organizationId: ctx.orgId },
}),
// Scope runs subscribe stream (GET /runs/subscribe)
// Falls back to scopeRuns if not set
scopeRunsSubscribe: async (ctx, filter) => ({
...filter,
labels: { ...filter.labels, organizationId: ctx.orgId },
}),
},
})AuthConfig
interface AuthConfig<TContext, TLabels> {
/** Authenticate every request. Return context or throw Response to reject. */
authenticate: (request: Request) => Promise<TContext> | TContext
/** Guard before trigger. Called after body validation and job resolution. */
onTrigger?: (
ctx: TContext,
trigger: TriggerRequest<TLabels>,
) => Promise<void> | void
/** Guard before run-level operations. Run is pre-fetched. */
onRunAccess?: (
ctx: TContext,
run: Run<TLabels>,
info: { operation: RunOperation },
) => Promise<void> | void
/** Scope runs list queries (GET /runs). */
scopeRuns?: (
ctx: TContext,
filter: RunFilter<TLabels>,
) => RunFilter<TLabels> | Promise<RunFilter<TLabels>>
/** Scope runs subscribe stream. Falls back to scopeRuns if not set. */
scopeRunsSubscribe?: (
ctx: TContext,
filter: RunsSubscribeFilter<TLabels>,
) => RunsSubscribeFilter<TLabels> | Promise<RunsSubscribeFilter<TLabels>>
}
type RunOperation =
| 'read'
| 'subscribe'
| 'steps'
| 'retrigger'
| 'cancel'
| 'delete'Execution Order
authenticate(request)— fail fast before anything elseonRequest()— lazy init (migrations, worker start)- Validate request (parse body/params)
- Auth hook (
onTrigger,onRunAccess,scopeRuns, orscopeRunsSubscribe) - Execute operation
Rejecting Requests
Auth hooks reject requests by throwing a Response:
throw new Response('Forbidden', { status: 403 })This pattern is framework-agnostic and works with React Router, Next.js, Hono, etc.
TContext Generic
TContext is automatically inferred from the return type of authenticate. All other hooks receive the same typed context:
// TContext is inferred as { orgIds: Set<string> }
auth: {
authenticate: async (request) => {
return { orgIds: new Set(['org_1', 'org_2']) }
},
onTrigger: async (ctx, trigger) => {
ctx.orgIds // Set<string> — fully typed
},
}TLabels Generic
TLabels is inferred from the Durably instance when a labels schema is provided via createDurably({ labels: z.object({...}) }). This provides type-safe labels throughout auth hooks.