Skip to content

createDurably

Creates a new Durably instance.

Signature

ts
function createDurably(options: DurablyOptions): Durably

Options

ts
interface DurablyOptions {
  dialect: Dialect
  pollingInterval?: number
  heartbeatInterval?: number
  staleThreshold?: number
}
OptionTypeDefaultDescription
dialectDialectrequiredKysely SQLite dialect
pollingIntervalnumber1000How often to check for pending jobs (ms)
heartbeatIntervalnumber5000How often to update heartbeat (ms)
staleThresholdnumber30000Time until a job is considered stale (ms)

Returns

Returns a Durably instance with the following methods:

init()

ts
await durably.init(): Promise<void>

Initialize Durably: runs database migrations and starts the worker. This is the recommended way to start Durably. Equivalent to calling migrate() then start().

migrate()

ts
await durably.migrate(): Promise<void>

Runs database migrations to create the required tables. Use init() instead for most cases.

start()

ts
durably.start(): void

Starts the worker that processes pending jobs. Typically called after migrate(), or use init() for both.

stop()

ts
await durably.stop(): Promise<void>

Stops the worker gracefully, waiting for the current job to complete.

register()

ts
durably.register<TJobs extends Record<string, JobDefinition>>(
  jobs: TJobs
): { [K in keyof TJobs]: JobHandle }

Registers one or more job definitions and returns an object of job handles. Also populates durably.jobs with the same handles for type-safe access.

ts
const { syncUsers, processImage } = durably.register({
  syncUsers: syncUsersJob,
  processImage: processImageJob,
})

// Or access via durably.jobs
await durably.jobs.syncUsers.trigger({ orgId: '123' })

See defineJob for details.

on()

ts
durably.on<E extends EventType>(
  event: E,
  handler: EventHandler<E>
): () => void

Subscribes to an event. Returns an unsubscribe function. See Events.

retry()

ts
await durably.retry(runId: string): Promise<void>

Retries a failed or cancelled run by resetting its status to pending.

cancel()

ts
await durably.cancel(runId: string): Promise<void>

Cancels a pending or running run.

deleteRun()

ts
await durably.deleteRun(runId: string): Promise<void>

Deletes a run and its associated steps and logs.

getRun()

ts
await durably.getRun<T extends Run = Run>(runId: string): Promise<T | null>

Gets a single run by ID. Supports generic type parameter for type-safe access.

ts
// Untyped (returns Run)
const run = await durably.getRun(runId)

// Typed (returns custom type)
type MyRun = Run & { payload: { userId: string }; output: { count: number } | null }
const typedRun = await durably.getRun<MyRun>(runId)

getRuns()

ts
await durably.getRuns<T extends Run = Run>(filter?: RunFilter): Promise<T[]>

interface RunFilter {
  jobName?: string
  status?: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
  limit?: number
  offset?: number
}

Gets runs with optional filtering and pagination. Supports generic type parameter.

ts
// Typed getRuns
type MyRun = Run & { payload: { userId: string }; output: { count: number } | null }
const runs = await durably.getRuns<MyRun>({ jobName: 'my-job' })

getJob()

ts
durably.getJob(name: string): JobHandle | undefined

Gets a registered job by name.

subscribe()

ts
durably.subscribe(runId: string): ReadableStream<DurablyEvent>

Subscribes to events for a specific run as a ReadableStream. The stream automatically closes when the run completes or fails.

Example

ts
import { createDurably } from '@coji/durably'
import { createClient } from '@libsql/client'
import { LibsqlDialect } from '@libsql/kysely-libsql'

const client = createClient({ url: 'file:local.db' })
const dialect = new LibsqlDialect({ client })

const durably = createDurably({
  dialect,
  pollingInterval: 1000,
  heartbeatInterval: 5000,
  staleThreshold: 30000,
})

// Initialize (migrate + start)
await durably.init()

// Define and register jobs
import { defineJob } from '@coji/durably'
import { z } from 'zod'

const myJobDef = defineJob({
  name: 'my-job',
  input: z.object({ id: z.string() }),
  run: async (step, payload) => {
    // ...
  },
})

const { myJob } = durably.register({ myJob: myJobDef })

// Or trigger via durably.jobs
await durably.jobs.myJob.trigger({ id: '123' })

// Clean shutdown
process.on('SIGTERM', async () => {
  await durably.stop()
})

See Also

  • HTTP Handler — Expose Durably via HTTP/SSE for React clients
  • defineJob — Define jobs with typed schemas
  • Events — Subscribe to run and step events

Released under the MIT License.