# Durably - LLM Documentation > Step-oriented resumable batch execution for Node.js and browsers using SQLite. ## Overview Durably is a minimal workflow engine that persists step results to SQLite. If a job is interrupted (server restart, browser tab close, crash), it automatically resumes from the last successful step. ## Installation ```bash # Node.js with libsql (recommended) pnpm add @coji/durably kysely zod @libsql/client @libsql/kysely-libsql # Browser with SQLocal pnpm add @coji/durably kysely zod sqlocal ``` ## Core Concepts ### 1. Durably Instance ```ts import { createDurably } from '@coji/durably' import { LibsqlDialect } from '@libsql/kysely-libsql' import { createClient } from '@libsql/client' const client = createClient({ url: 'file:local.db' }) const dialect = new LibsqlDialect({ client }) const durably = createDurably({ dialect, pollingInterval: 1000, // Job polling interval (ms) heartbeatInterval: 5000, // Heartbeat update interval (ms) staleThreshold: 30000, // When to consider a job abandoned (ms) }) ``` ### 2. Job Definition ```ts import { defineJob } from '@coji/durably' import { z } from 'zod' const syncUsersJob = defineJob({ name: 'sync-users', input: z.object({ orgId: z.string() }), output: z.object({ syncedCount: z.number() }), run: async (step, payload) => { // Step 1: Fetch users (result is persisted) const users = await step.run('fetch-users', async () => { return await api.fetchUsers(payload.orgId) }) // Step 2: Save to database await step.run('save-to-db', async () => { await db.upsertUsers(users) }) return { syncedCount: users.length } }, }) // Register jobs with durably instance const { syncUsers } = durably.register({ syncUsers: syncUsersJob, }) ``` ### 3. Starting the Worker ```ts // Initialize: runs migrations and starts the worker await durably.init() // Or separately if needed: // await durably.migrate() // Run migrations only // durably.start() // Start worker only ``` ### 4. Triggering Jobs ```ts // Basic trigger (fire and forget) const run = await syncUsers.trigger({ orgId: 'org_123' }) console.log(run.id, run.status) // "pending" // Wait for completion const result = await syncUsers.triggerAndWait( { orgId: 'org_123' }, { timeout: 5000 }, ) console.log(result.output.syncedCount) // With idempotency key (prevents duplicate jobs) await syncUsers.trigger( { orgId: 'org_123' }, { idempotencyKey: 'webhook-event-456' }, ) // With concurrency key (serializes execution) await syncUsers.trigger({ orgId: 'org_123' }, { concurrencyKey: 'org_123' }) ``` ## Step Context API The `step` object provides these methods: ### step.run(name, fn) Executes a step and persists its result. On resume, returns cached result without re-executing. ```ts const result = await step.run('step-name', async () => { return await someAsyncOperation() }) ``` ### step.progress(current, total?, message?) Updates progress information for the run. ```ts step.progress(50, 100, 'Processing items...') ``` ### step.log Structured logging within jobs. ```ts step.log.info('Starting process', { userId: '123' }) step.log.warn('Rate limit approaching') step.log.error('Failed to connect', { error: err.message }) ``` ## Run Management ### Get Run Status ```ts // Via job handle (type-safe output) const run = await syncUsers.getRun(runId) if (run?.status === 'completed') { console.log(run.output.syncedCount) } // Via durably instance (untyped) const run = await durably.getRun(runId) // Via durably instance (typed with generic parameter) type MyRun = Run & { payload: { userId: string } output: { count: number } | null } const typedRun = await durably.getRun(runId) ``` ### Query Runs ```ts // Get failed runs const failedRuns = await durably.getRuns({ status: 'failed' }) // Filter by job name with pagination const runs = await durably.getRuns({ jobName: 'sync-users', status: 'completed', limit: 10, offset: 0, }) // Typed getRuns with generic parameter type MyRun = Run & { payload: { userId: string } output: { count: number } | null } const typedRuns = await durably.getRuns({ jobName: 'my-job' }) ``` ### Retry Failed Runs ```ts await durably.retry(runId) ``` ### Cancel Runs ```ts await durably.cancel(runId) ``` ### Delete Runs ```ts await durably.deleteRun(runId) ``` ## Events Subscribe to job execution events: ```ts // Run lifecycle events durably.on('run:trigger', (e) => console.log('Triggered:', e.runId)) durably.on('run:start', (e) => console.log('Started:', e.runId)) durably.on('run:complete', (e) => console.log('Done:', e.output)) durably.on('run:fail', (e) => console.error('Failed:', e.error)) durably.on('run:cancel', (e) => console.log('Cancelled:', e.runId)) durably.on('run:retry', (e) => console.log('Retried:', e.runId)) durably.on('run:progress', (e) => console.log('Progress:', e.progress.current, '/', e.progress.total), ) // Step events durably.on('step:start', (e) => console.log('Step:', e.stepName)) durably.on('step:complete', (e) => console.log('Step done:', e.stepName)) durably.on('step:fail', (e) => console.error('Step failed:', e.stepName)) // Log events durably.on('log:write', (e) => console.log(`[${e.level}]`, e.message)) ``` ## Advanced APIs ### getJob Get a registered job by name: ```ts const job = durably.getJob('sync-users') if (job) { const run = await job.trigger({ orgId: 'org_123' }) } ``` ### subscribe Subscribe to events for a specific run as a ReadableStream: ```ts const stream = durably.subscribe(runId) const reader = stream.getReader() while (true) { const { done, value } = await reader.read() if (done) break switch (value.type) { case 'run:start': console.log('Started') break case 'run:complete': console.log('Completed:', value.output) break case 'run:fail': console.error('Failed:', value.error) break case 'run:progress': console.log('Progress:', value.progress) break case 'log:write': console.log(`[${value.level}]`, value.message) break } } ``` ### createDurablyHandler Create HTTP handlers for client/server architecture using Web Standard Request/Response: ```ts import { createDurablyHandler } from '@coji/durably' const handler = createDurablyHandler(durably) // Use the unified handle() method with automatic routing app.all('/api/durably/*', async (req) => { return await handler.handle(req, '/api/durably') }) // Or use individual endpoints app.post('/api/durably/trigger', (req) => handler.trigger(req)) app.get('/api/durably/subscribe', (req) => handler.subscribe(req)) app.get('/api/durably/runs', (req) => handler.runs(req)) app.get('/api/durably/run', (req) => handler.run(req)) app.get('/api/durably/steps', (req) => handler.steps(req)) app.get('/api/durably/runs/subscribe', (req) => handler.runsSubscribe(req)) app.post('/api/durably/retry', (req) => handler.retry(req)) app.post('/api/durably/cancel', (req) => handler.cancel(req)) app.delete('/api/durably/run', (req) => handler.delete(req)) ``` **Handler Interface:** ```ts interface DurablyHandler { // Unified routing handler handle(request: Request, basePath: string): Promise // Individual endpoints trigger(request: Request): Promise // POST /trigger subscribe(request: Request): Response // GET /subscribe?runId=xxx (SSE) runs(request: Request): Promise // GET /runs run(request: Request): Promise // GET /run?runId=xxx steps(request: Request): Promise // GET /steps?runId=xxx runsSubscribe(request: Request): Response // GET /runs/subscribe (SSE) retry(request: Request): Promise // POST /retry?runId=xxx cancel(request: Request): Promise // POST /cancel?runId=xxx delete(request: Request): Promise // DELETE /run?runId=xxx } interface TriggerRequest { jobName: string input: Record idempotencyKey?: string concurrencyKey?: string } interface TriggerResponse { runId: string } ``` ## Plugins ### Log Persistence ```ts import { withLogPersistence } from '@coji/durably' durably.use(withLogPersistence()) ``` ## Browser Usage ```ts import { createDurably, defineJob } from '@coji/durably' import { SQLocalKysely } from 'sqlocal/kysely' import { z } from 'zod' const { dialect } = new SQLocalKysely('app.sqlite3') const durably = createDurably({ dialect, pollingInterval: 100, heartbeatInterval: 500, staleThreshold: 3000, }) // Same API as Node.js const { myJob } = durably.register({ myJob: defineJob({ name: 'my-job', input: z.object({}), run: async (step) => { /* ... */ }, }), }) // Initialize (same as Node.js) await durably.init() ``` ## Run Lifecycle ```text trigger() → pending → running → completed ↘ ↗ → failed ``` - **pending**: Waiting for worker to pick up - **running**: Worker is executing steps - **completed**: All steps finished successfully - **failed**: A step threw an error - **cancelled**: Manually cancelled via `cancel()` ## Resumability When a job resumes after interruption: 1. Worker polls for pending/stale runs 2. Job function is re-executed from the beginning 3. `step.run()` checks SQLite for cached results 4. Completed steps return cached values immediately (no re-execution) 5. Execution continues from the first incomplete step ## Type Definitions ```ts interface JobDefinition { name: TName input: ZodType output?: ZodType run: (step: StepContext, payload: TInput) => Promise } interface StepContext { runId: string run(name: string, fn: () => T | Promise): Promise progress(current: number, total?: number, message?: string): void log: { info(message: string, data?: unknown): void warn(message: string, data?: unknown): void error(message: string, data?: unknown): void } } interface Run { id: string jobName: string status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' payload: unknown output?: TOutput error?: string progress?: { current: number; total?: number; message?: string } createdAt: string updatedAt: string } interface JobHandle { name: TName trigger(input: TInput, options?: TriggerOptions): Promise> triggerAndWait( input: TInput, options?: TriggerOptions, ): Promise<{ id: string; output: TOutput }> batchTrigger(inputs: BatchTriggerInput[]): Promise[]> getRun(id: string): Promise | null> getRuns(filter?: RunFilter): Promise[]> } interface TriggerOptions { idempotencyKey?: string concurrencyKey?: string timeout?: number } ``` ## License MIT --- # Durably React - LLM Documentation > React bindings for Durably - step-oriented resumable batch execution. ## Requirements - **React 19+** (uses `React.use()` for Promise resolution) ## Overview `@coji/durably-react` provides React hooks for triggering and monitoring Durably jobs. It supports two modes: 1. **Browser Hooks**: Run Durably entirely in the browser with SQLite WASM (OPFS) 2. **Server Hooks**: Connect to a remote Durably server via HTTP/SSE ## Installation ```bash # Browser mode - runs Durably in the browser pnpm add @coji/durably-react @coji/durably kysely zod sqlocal # Server mode - connects to Durably server pnpm add @coji/durably-react ``` ## Browser Hooks Import from `@coji/durably-react` for browser-complete mode. ### DurablyProvider Wraps your app and provides the Durably instance to all hooks: ```tsx import { DurablyProvider } from '@coji/durably-react' import { createDurably } from '@coji/durably' import { SQLocalKysely } from 'sqlocal/kysely' // Create and initialize Durably async function initDurably() { const sqlocal = new SQLocalKysely('app.sqlite3') const durably = createDurably({ dialect: sqlocal.dialect }) await durably.init() return durably } const durablyPromise = initDurably() // With fallback prop (recommended) function App() { return ( Loading...}> ) } // Or with external Suspense function AppAlt() { return ( Loading...}> ) } ``` **Props:** - `durably: Durably | Promise` - Durably instance or Promise (should be initialized via `await durably.init()`) - `fallback?: ReactNode` - Fallback to show while Promise resolves (wraps in Suspense automatically) ### useDurably Access the Durably instance directly: ```tsx import { useDurably } from '@coji/durably-react' function Component() { const { durably } = useDurably() // Use durably instance directly const handleGetRuns = async () => { const runs = await durably.getRuns() } } ``` **Return type:** ```ts interface UseDurablyResult { durably: Durably } ``` ### useJob Trigger and monitor a job: ```tsx import { defineJob } from '@coji/durably' import { useJob } from '@coji/durably-react' import { z } from 'zod' const myJob = defineJob({ name: 'my-job', input: z.object({ value: z.string() }), output: z.object({ result: z.number() }), run: async (step, payload) => { const data = await step.run('process', () => process(payload.value)) return { result: data.length } }, }) function Component() { const { trigger, triggerAndWait, status, output, error, logs, progress, isRunning, isPending, isCompleted, isFailed, isCancelled, currentRunId, reset, } = useJob(myJob, { initialRunId: undefined, autoResume: true, // Auto-resume pending/running jobs (default: true) followLatest: true, // Switch to tracking new runs (default: true) }) // Trigger job const handleClick = async () => { const { runId } = await trigger({ value: 'test' }) console.log('Started:', runId) } // Or trigger and wait for result const handleSync = async () => { const { runId, output } = await triggerAndWait({ value: 'test' }) console.log('Result:', output.result) } return (

Status: {status}

{progress && (

Progress: {progress.current}/{progress.total}

)} {isCompleted &&

Result: {output?.result}

} {isFailed &&

Error: {error}

}
) } ``` **Options:** ```ts interface UseJobOptions { initialRunId?: string // Initial Run ID to subscribe to autoResume?: boolean // Auto-resume pending/running jobs (default: true) followLatest?: boolean // Switch to tracking new runs (default: true) } ``` **Return type:** ```ts interface UseJobResult { trigger: (input: TInput) => Promise<{ runId: string }> triggerAndWait: (input: TInput) => Promise<{ runId: string; output: TOutput }> status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' | null output: TOutput | null error: string | null logs: LogEntry[] progress: Progress | null isRunning: boolean isPending: boolean isCompleted: boolean isFailed: boolean isCancelled: boolean currentRunId: string | null reset: () => void } ``` ### useJobRun Subscribe to an existing run by ID: ```tsx import { useJobRun } from '@coji/durably-react' function RunMonitor({ runId }: { runId: string | null }) { const { status, output, error, progress, logs, isRunning, isCompleted, isFailed, } = useJobRun<{ result: number }>({ runId }) if (!runId) return
No run selected
return (

Status: {status}

{isCompleted &&

Output: {JSON.stringify(output)}

}
) } ``` ### useJobLogs Subscribe to logs from a run: ```tsx import { useJobLogs } from '@coji/durably-react' function LogViewer({ runId }: { runId: string | null }) { const { logs, clearLogs } = useJobLogs({ runId, maxLogs: 100, // Optional: limit stored logs }) return (
    {logs.map((log) => (
  • [{log.level}] {log.message} {log.data &&
    {JSON.stringify(log.data)}
    }
  • ))}
) } ``` ### useRuns List runs with filtering, pagination, and real-time updates: ```tsx import { useRuns, TypedRun } from '@coji/durably-react' import { defineJob } from '@coji/durably' // Option 1: Generic type parameter (dashboard with multiple job types) type ImportRun = TypedRun<{ file: string }, { count: number }> type SyncRun = TypedRun<{ userId: string }, { synced: boolean }> type DashboardRun = ImportRun | SyncRun function Dashboard() { const { runs } = useRuns({ pageSize: 10 }) // runs are typed as DashboardRun[] // Use run.jobName to narrow the type } // Option 2: JobDefinition (single job, auto-filters by jobName) const myJob = defineJob({ name: 'my-job', input: z.object({ value: z.string() }), output: z.object({ result: z.number() }), run: async (step, payload) => { /* ... */ }, }) function SingleJobDashboard() { const { runs } = useRuns(myJob, { status: 'completed', pageSize: 10 }) // runs[0].output is typed as { result: number } | null } // Option 3: Untyped (simple cases) function UntypedDashboard() { const { runs } = useRuns({ jobName: 'my-job', pageSize: 10 }) // runs[0].output is unknown } ``` ## Server Hooks Import from `@coji/durably-react/client` for server-connected mode. ### createDurablyClient Create a type-safe client for all registered jobs (recommended): ```tsx // Server: register jobs (app/lib/durably.server.ts) import { createDurably, createDurablyHandler } from '@coji/durably' export const durably = createDurably({ dialect }).register({ importCsv: importCsvJob, syncUsers: syncUsersJob, }) export const durablyHandler = createDurablyHandler(durably) await durably.init() // Client: create typed client (app/lib/durably.client.ts) import type { durably } from '~/lib/durably.server' import { createDurablyClient } from '@coji/durably-react/client' export const durablyClient = createDurablyClient({ api: '/api/durably', }) // In your component - fully type-safe with autocomplete function CsvImporter() { const { trigger, output, isRunning } = durablyClient.importCsv.useJob() return ( ) } // Subscribe to an existing run function RunViewer({ runId }: { runId: string }) { const { status, output, progress } = durablyClient.importCsv.useRun(runId) return
Status: {status}
} // Subscribe to logs function LogViewer({ runId }: { runId: string }) { const { logs } = durablyClient.importCsv.useLogs(runId) return
{logs.map(l => l.message).join('\n')}
} ``` ### Client useJob Direct hook when not using `createDurablyClient`: ```tsx import { useJob } from '@coji/durably-react/client' function Component() { const { trigger, triggerAndWait, status, output, error, logs, progress, isRunning, isPending, isCompleted, isFailed, isCancelled, currentRunId, reset, } = useJob< { userId: string }, // Input type { count: number } // Output type >({ api: '/api/durably', jobName: 'sync-data', initialRunId: undefined, // Optional: resume existing run autoResume: true, // Auto-resume pending/running jobs on mount (default: true) followLatest: true, // Switch to tracking new runs (default: true) }) const handleClick = async () => { const { runId } = await trigger({ userId: 'user_123' }) console.log('Started:', runId) } return } ``` **Options:** ```ts interface UseJobClientOptions { api: string // API endpoint URL (e.g., '/api/durably') jobName: string // Job name to trigger initialRunId?: string // Initial Run ID to subscribe to autoResume?: boolean // Auto-resume pending/running jobs on mount (default: true) followLatest?: boolean // Switch to tracking new runs via SSE (default: true) } ``` The `autoResume` option automatically fetches running/pending jobs on mount and subscribes to them. This is useful for SSR applications where users may refresh the page while a job is running. The `followLatest` option subscribes to job-level SSE events and automatically switches to tracking the latest triggered job. This enables real-time updates when jobs are triggered from other tabs or clients. ### Client useJobRun ```tsx import { useJobRun } from '@coji/durably-react/client' function Component({ runId }: { runId: string }) { const { status, output, error, progress, logs } = useJobRun<{ count: number }>({ api: '/api/durably', runId, }) return
Status: {status}
} ``` ### Client useJobLogs ```tsx import { useJobLogs } from '@coji/durably-react/client' function Component({ runId }: { runId: string }) { const { logs, clearLogs } = useJobLogs({ api: '/api/durably', runId, maxLogs: 50, }) return (
    {logs.map((log) => (
  • {log.message}
  • ))}
) } ``` ### Client useRuns List runs with pagination and real-time updates: ```tsx import { useRuns, TypedClientRun } from '@coji/durably-react/client' import { defineJob } from '@coji/durably' // Option 1: Generic type parameter (dashboard with multiple job types) type ImportRun = TypedClientRun<{ file: string }, { count: number }> type SyncRun = TypedClientRun<{ userId: string }, { synced: boolean }> type DashboardRun = ImportRun | SyncRun function Dashboard() { const { runs } = useRuns({ api: '/api/durably', pageSize: 10 }) // runs are typed as DashboardRun[] } // Option 2: JobDefinition (single job, auto-filters by jobName) const syncDataJob = defineJob({ name: 'sync-data', input: z.object({ userId: z.string() }), output: z.object({ count: z.number() }), run: async (step, payload) => { /* ... */ }, }) function SingleJobDashboard() { const { runs } = useRuns(syncDataJob, { api: '/api/durably', status: 'completed', pageSize: 20, }) // runs[0].output is typed as { count: number } | null } // Option 3: Untyped (simple cases) function UntypedDashboard() { const { runs } = useRuns({ api: '/api/durably', jobName: 'sync-data' }) // runs[0].output is unknown } ``` ### Client useRunActions Actions for runs (retry, cancel, delete): ```tsx import { useRunActions } from '@coji/durably-react/client' function RunActions({ runId, status }: { runId: string; status: string }) { const { retry, cancel, deleteRun, getRun, getSteps, isLoading, error } = useRunActions({ api: '/api/durably', }) return (
{(status === 'failed' || status === 'cancelled') && ( )} {(status === 'pending' || status === 'running') && ( )} {error && {error}}
) } ``` ## Server Handler Setup On your server, use `createDurablyHandler`: ```ts // app/lib/durably.server.ts import { createDurably } from '@coji/durably' import { createDurablyHandler } from '@coji/durably' import { LibsqlDialect } from '@libsql/kysely-libsql' import { createClient } from '@libsql/client' const client = createClient({ url: 'file:local.db' }) const dialect = new LibsqlDialect({ client }) export const durably = createDurably({ dialect }).register({ syncData: syncDataJob, }) export const durablyHandler = createDurablyHandler(durably) await durably.init() // app/routes/api.durably.$.ts (React Router / Remix) import { durablyHandler } from '~/lib/durably.server' import type { Route } from './+types/api.durably.$' export async function loader({ request }: Route.LoaderArgs) { return durablyHandler.handle(request, '/api/durably') } export async function action({ request }: Route.ActionArgs) { return durablyHandler.handle(request, '/api/durably') } ``` ## Type Definitions ```ts type RunStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' interface Progress { current: number total?: number message?: string } interface LogEntry { id: string runId: string stepName: string | null level: 'info' | 'warn' | 'error' message: string data: unknown timestamp: string } // Browser hooks: TypedRun with generic input/output type TypedRun< TInput extends Record = Record, TOutput extends Record | undefined = Record, > = Omit & { payload: TInput output: TOutput | null } // Client hooks: TypedClientRun with generic input/output type TypedClientRun< TInput extends Record = Record, TOutput extends Record | undefined = Record, > = Omit & { input: TInput output: TOutput | null } ``` ## Common Patterns ### Loading States ```tsx function Component() { const { isRunning, trigger } = useJob(myJob) return ( ) } ``` ### Error Handling ```tsx function Component() { const { trigger, error, isFailed, reset } = useJob(myJob) const handleClick = async () => { try { await trigger({ value: 'test' }) } catch (e) { console.error('Trigger failed:', e) } } if (isFailed) { return (

Error: {error}

) } return } ``` ### Progress Tracking ```tsx function Component() { const { trigger, progress, isRunning } = useJob(progressJob) return (
{isRunning && progress && (

{progress.message}

)}
) } ``` ### Reconnecting to Existing Run ```tsx function Component({ existingRunId }: { existingRunId?: string }) { const { status, output } = useJob(myJob, { initialRunId: existingRunId, }) // Will automatically subscribe to the existing run return
Status: {status}
} ``` ## License MIT