createDurably
Creates a new Durably instance.
Signature
function createDurably(options: DurablyOptions): Durably
interface DurablyOptions {
  dialect: Dialect
  pollingInterval?: number
  heartbeatInterval?: number
  staleThreshold?: number
}

| Option | Type | Default | Description |
|---|---|---|---|
| dialect | Dialect | required | Kysely SQLite dialect |
| pollingInterval | number | 1000 | How often to check for pending jobs (ms) |
| heartbeatInterval | number | 5000 | How often to update heartbeat (ms) |
| staleThreshold | number | 30000 | Time until a job is considered stale (ms) |
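The three timing options are easier to reason about together. The sketch below is not part of the Durably API: `resolveTiming`, `isStale`, and `missedHeartbeatsBeforeStale` are hypothetical helpers, and the staleness rule (time since the last heartbeat exceeds `staleThreshold`) is an assumption based on the option descriptions above.

```typescript
// Hypothetical helpers for reasoning about the timing options; not Durably code.
interface TimingOptions {
  pollingInterval?: number
  heartbeatInterval?: number
  staleThreshold?: number
}

// Fill in the defaults listed in the options table.
function resolveTiming(options: TimingOptions = {}): Required<TimingOptions> {
  return {
    pollingInterval: options.pollingInterval ?? 1000,
    heartbeatInterval: options.heartbeatInterval ?? 5000,
    staleThreshold: options.staleThreshold ?? 30000,
  }
}

// Assumption: a run counts as stale once the time since its last
// heartbeat exceeds staleThreshold (all values in milliseconds).
function isStale(lastHeartbeatAt: number, now: number, staleThreshold: number): boolean {
  return now - lastHeartbeatAt > staleThreshold
}

// How many heartbeat intervals fit inside the stale threshold.
function missedHeartbeatsBeforeStale(timing: Required<TimingOptions>): number {
  return Math.floor(timing.staleThreshold / timing.heartbeatInterval)
}
```

Under that assumption, the defaults leave room for six heartbeat intervals before a run would be treated as stale. If you lower `staleThreshold`, it is probably wise to keep it at several multiples of `heartbeatInterval`.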
Returns
Returns a Durably instance with the following methods:
init()
await durably.init(): Promise<void>
Initializes Durably: runs database migrations and starts the worker. This is the recommended way to start Durably and is equivalent to calling migrate() followed by start().
migrate()
await durably.migrate(): Promise<void>
Runs database migrations to create the required tables. Use init() instead for most cases.
start()
durably.start(): void
Starts the worker that processes pending jobs. Typically called after migrate(); use init() to do both.
stop()
await durably.stop(): Promise<void>
Stops the worker gracefully, waiting for the current job to complete.
register()
durably.register<TJobs extends Record<string, JobDefinition>>(
  jobs: TJobs
): { [K in keyof TJobs]: JobHandle }
Registers one or more job definitions and returns an object of job handles. Also populates durably.jobs with the same handles for type-safe access.
const { syncUsers, processImage } = durably.register({
  syncUsers: syncUsersJob,
  processImage: processImageJob,
})
// Or access via durably.jobs
await durably.jobs.syncUsers.trigger({ orgId: '123' })
See defineJob for details.
on()
durably.on<E extends EventType>(
  event: E,
  handler: EventHandler<E>
): () => void
Subscribes to an event. Returns an unsubscribe function. See Events.
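The subscribe/unsubscribe contract can be illustrated with a small stand-in. This is a sketch, not Durably's implementation: `TinyEmitter` is a hypothetical class that copies only the shape of on(), and the event name `run:complete` and its payload are assumptions made for the demo (see Events for the real list).

```typescript
// Stand-in that mimics the shape of durably.on(); not Durably code.
type Handler<T> = (event: T) => void

class TinyEmitter<Events extends Record<string, unknown>> {
  private handlers = new Map<keyof Events, Set<Handler<any>>>()

  // Registers a handler and returns a function that removes it again.
  on<E extends keyof Events>(event: E, handler: Handler<Events[E]>): () => void {
    const set = this.handlers.get(event) ?? new Set<Handler<any>>()
    set.add(handler)
    this.handlers.set(event, set)
    return () => {
      set.delete(handler)
    }
  }

  // Delivers a payload to every handler currently registered for the event.
  emit<E extends keyof Events>(event: E, payload: Events[E]): void {
    this.handlers.get(event)?.forEach((handler) => handler(payload))
  }
}

// Assumed event name and payload shape, for illustration only.
type DemoEvents = { 'run:complete': { runId: string } }

const emitter = new TinyEmitter<DemoEvents>()
const seen: string[] = []

const unsubscribe = emitter.on('run:complete', (event) => {
  seen.push(event.runId)
})

emitter.emit('run:complete', { runId: 'a' }) // delivered to the handler
unsubscribe()
emitter.emit('run:complete', { runId: 'b' }) // no handler left, so ignored
```

With a real instance the same pattern applies: keep the function returned by durably.on() and call it when the listener is no longer needed, for example when a component unmounts or a request ends.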
retry()
await durably.retry(runId: string): Promise<void>
Retries a failed or cancelled run by resetting its status to pending.
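A common use of retry() is re-queuing every failed run of a job. The following sketch assumes that runs expose an `id` field (not confirmed by this page) and describes the instance structurally through a hypothetical `RetryClient` interface, so it does not depend on the real types. `retryFailedRuns` is an illustrative helper, not part of the library.

```typescript
// Structural stand-in for the two methods used here; not the real Durably type.
// Assumption: each run object carries its ID in an `id` field.
interface RetryClient {
  getRuns(filter?: { jobName?: string; status?: 'failed' }): Promise<{ id: string }[]>
  retry(runId: string): Promise<void>
}

// Hypothetical helper: retries every failed run of one job, one at a time,
// and returns the IDs that were reset to pending.
async function retryFailedRuns(client: RetryClient, jobName: string): Promise<string[]> {
  const failed = await client.getRuns({ jobName, status: 'failed' })
  const retried: string[] = []
  for (const run of failed) {
    await client.retry(run.id)
    retried.push(run.id)
  }
  return retried
}
```

Retrying sequentially keeps the sketch simple. If a single retry() call throws, the loop stops and the remaining runs are left untouched.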
cancel()
await durably.cancel(runId: string): Promise<void>
Cancels a pending or running run.
deleteRun()
await durably.deleteRun(runId: string): Promise<void>
Deletes a run and its associated steps and logs.
getRun()
await durably.getRun<T extends Run = Run>(runId: string): Promise<T | null>
Gets a single run by ID. Supports a generic type parameter for type-safe access.
// Untyped (returns Run)
const run = await durably.getRun(runId)
// Typed (returns custom type)
type MyRun = Run & { payload: { userId: string }; output: { count: number } | null }
const typedRun = await durably.getRun<MyRun>(runId)
getRuns()
await durably.getRuns<T extends Run = Run>(filter?: RunFilter): Promise<T[]>
interface RunFilter {
  jobName?: string
  status?: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
  limit?: number
  offset?: number
}
Gets runs with optional filtering and pagination. Supports a generic type parameter.
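The `limit` and `offset` fields allow paging through large result sets. Here is a minimal sketch of that loop. `fetchAllRuns` and `PageFilter` are hypothetical names, and `fetchPage` stands in for a call such as `(page) => durably.getRuns({ jobName: 'my-job', ...page })`. It assumes results come back in a stable order between calls.

```typescript
// Mirrors the paging fields of RunFilter.
interface PageFilter {
  limit?: number
  offset?: number
}

// Hypothetical helper: requests pages until a short page signals the end.
async function fetchAllRuns<T>(
  fetchPage: (filter: PageFilter) => Promise<T[]>,
  pageSize = 100,
): Promise<T[]> {
  if (pageSize < 1) throw new Error('pageSize must be at least 1')
  const all: T[] = []
  for (let offset = 0; ; offset += pageSize) {
    const page = await fetchPage({ limit: pageSize, offset })
    all.push(...page)
    // Fewer rows than requested means there is nothing left to fetch.
    if (page.length < pageSize) break
  }
  return all
}
```

Offset paging can skip or repeat rows if runs are inserted or deleted while the loop is in progress, so treat the result as a snapshot rather than an exact listing.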
// Typed getRuns
type MyRun = Run & { payload: { userId: string }; output: { count: number } | null }
const runs = await durably.getRuns<MyRun>({ jobName: 'my-job' })
getJob()
durably.getJob(name: string): JobHandle | undefined
Gets a registered job by name.
subscribe()
durably.subscribe(runId: string): ReadableStream<DurablyEvent>
Subscribes to events for a specific run as a ReadableStream. The stream automatically closes when the run completes or fails.
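Because subscribe() returns a standard ReadableStream, it can be consumed with a reader loop. The helper below is a sketch: `collectEvents` is a hypothetical name, and it relies only on the Web Streams API available in modern browsers and Node.js 18+, not on any Durably internals.

```typescript
// Hypothetical helper: reads a stream until it closes and returns every chunk.
async function collectEvents<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const events: T[] = []
  try {
    while (true) {
      const result = await reader.read()
      // `done` becomes true once the producer closes the stream.
      if (result.done) break
      events.push(result.value)
    }
  } finally {
    // Release the lock so the stream can be cancelled or read elsewhere.
    reader.releaseLock()
  }
  return events
}
```

Typical use would be `const events = await collectEvents(durably.subscribe(runId))`, which resolves after the stream closes. For long-running runs, handling each event inside the loop is likely preferable to buffering them all.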
Example
import { createDurably, defineJob } from '@coji/durably'
import { createClient } from '@libsql/client'
import { LibsqlDialect } from '@libsql/kysely-libsql'
import { z } from 'zod'

const client = createClient({ url: 'file:local.db' })
const dialect = new LibsqlDialect({ client })
const durably = createDurably({
  dialect,
  pollingInterval: 1000,
  heartbeatInterval: 5000,
  staleThreshold: 30000,
})

// Initialize (migrate + start)
await durably.init()

// Define and register jobs
const myJobDef = defineJob({
  name: 'my-job',
  input: z.object({ id: z.string() }),
  run: async (step, payload) => {
    // ...
  },
})
const { myJob } = durably.register({ myJob: myJobDef })

// Trigger via the returned handle
await myJob.trigger({ id: '123' })
// Or trigger via durably.jobs
await durably.jobs.myJob.trigger({ id: '123' })

// Clean shutdown
process.on('SIGTERM', async () => {
  await durably.stop()
})

See Also
- HTTP Handler — Expose Durably via HTTP/SSE for React clients
- defineJob — Define jobs with typed schemas
- Events — Subscribe to run and step events