Skip to content

createDurably

Creates a new Durably instance.

Signature

ts
// Without jobs (use .register() later)
function createDurably<TLabels>(
  options: DurablyOptions<TLabels>,
): Durably<{}, TLabels>

// With jobs (1-step initialization)
function createDurably<TLabels, TJobs>(
  options: DurablyOptions<TLabels, TJobs> & { jobs: TJobs },
): Durably<TransformToHandles<TJobs, TLabels>, TLabels>

Options

ts
interface DurablyOptions<
  TLabels extends Record<string, string> = Record<string, string>,
  TJobs extends Record<string, JobDefinition> = Record<string, never>,
> {
  dialect: Dialect
  pollingIntervalMs?: number
  leaseRenewIntervalMs?: number
  leaseMs?: number
  preserveSteps?: boolean
  retainRuns?: string
  labels?: z.ZodType<TLabels>
  jobs?: TJobs
}
OptionTypeDefaultDescription
dialectDialectrequiredKysely dialect (SQLite, libSQL, or PostgreSQL)
pollingIntervalMsnumber1000How often to check for pending jobs (ms)
leaseRenewIntervalMsnumber5000How often to renew the lease (ms)
leaseMsnumber30000Lease duration — time until a job is considered stale (ms)
labelsz.ZodTypeZod schema for labels. Enables type-safe labels and runtime validation on trigger()
preserveStepsbooleanfalseKeep step output data when runs reach terminal state (completed/failed/cancelled)
retainRunsstringAuto-delete terminal runs older than this duration (e.g. '30d', '12h', '90m'). Throws if format is invalid.
jobsTJobsJob definitions to register. Shorthand for calling .register() after creation

Returns

Returns a Durably instance with the following methods:

init()

ts
await durably.init(): Promise<void>

Initialize Durably: runs database migrations and starts the worker. This is the recommended way to start Durably. Equivalent to calling migrate() then start().

migrate()

ts
await durably.migrate(): Promise<void>

Runs database migrations to create the required tables. Use init() instead for most cases.

start()

ts
durably.start(): void

Starts the worker that processes pending jobs. Typically called after migrate(), or use init() for both.

stop()

ts
await durably.stop(): Promise<void>

Stops the worker gracefully, waiting for the current job to complete.

register()

ts
durably.register<TJobs extends Record<string, JobDefinition>>(
  jobs: TJobs
): { [K in keyof TJobs]: JobHandle }

Registers one or more job definitions and returns an object of job handles. Also populates durably.jobs with the same handles for type-safe access.

TIP

You can also pass jobs directly to createDurably() as a shorthand:

ts
const durably = createDurably({
  dialect,
  jobs: { syncUsers: syncUsersJob, processImage: processImageJob },
})
ts
const { syncUsers, processImage } = durably.register({
  syncUsers: syncUsersJob,
  processImage: processImageJob,
})

// Or access via durably.jobs
await durably.jobs.syncUsers.trigger({ orgId: '123' })

See defineJob for details.

on()

ts
durably.on<E extends EventType>(
  event: E,
  handler: EventHandler<E>
): () => void

Subscribes to an event. Returns an unsubscribe function. See Events.

retrigger()

ts
await durably.retrigger(runId: string): Promise<Run>

Retriggers a completed, failed, or cancelled run by creating a fresh run with the same input and labels (idempotency key is not carried forward). Returns the new Run object. Throws if the original input doesn't match the current job's input schema.

cancel()

ts
await durably.cancel(runId: string): Promise<void>

Cancels a pending or leased run.

deleteRun()

ts
await durably.deleteRun(runId: string): Promise<void>

Deletes a run and its associated steps and logs.

purgeRuns()

ts
await durably.purgeRuns(options: {
  olderThan: Date      // cutoff — terminal runs with completedAt before this are deleted
  limit?: number       // max rows to delete per call (default: 500)
}): Promise<number>

Deletes terminal runs (completed, failed, cancelled) with completedAt older than the cutoff. Returns the number of deleted runs. Associated steps, logs, and labels are cascade-deleted.

For automatic cleanup, use the retainRuns option instead (auto-purge uses a batch size of 100).

getRun()

ts
await durably.getRun<T extends Run<TLabels> = Run<TLabels>>(runId: string): Promise<T | null>

Gets a single run by ID. Supports generic type parameter for type-safe access.

ts
// Untyped (returns Run)
const run = await durably.getRun(runId)

// Typed (returns custom type)
type MyRun = Run & {
  input: { userId: string }
  output: { count: number } | null
}
const typedRun = await durably.getRun<MyRun>(runId)

getRuns()

ts
await durably.getRuns<T extends Run<TLabels> = Run<TLabels>>(filter?: RunFilter<TLabels>): Promise<T[]>

interface RunFilter<TLabels extends Record<string, string> = Record<string, string>> {
  jobName?: string | string[]  // single or multiple job names
  status?: 'pending' | 'leased' | 'completed' | 'failed' | 'cancelled'
  labels?: Partial<TLabels>    // filter by labels (all specified must match)
  limit?: number
  offset?: number
}

Gets runs with optional filtering and pagination. Supports generic type parameter.

ts
// Filter by multiple job names
const runs = await durably.getRuns({
  jobName: ['sync-users', 'import-data'],
  status: 'completed',
})

// Typed getRuns
type MyRun = Run & {
  input: { userId: string }
  output: { count: number } | null
}
const runs = await durably.getRuns<MyRun>({ jobName: 'my-job' })

Run Type

The Run object returned by getRun() and getRuns():

ts
interface Run<TLabels extends Record<string, string> = Record<string, string>> {
  id: string
  jobName: string
  input: unknown
  status: 'pending' | 'leased' | 'completed' | 'failed' | 'cancelled'
  idempotencyKey: string | null
  concurrencyKey: string | null
  currentStepIndex: number
  completedStepCount: number
  progress: { current: number; total?: number; message?: string } | null
  output: unknown | null
  error: string | null
  labels: TLabels
  leaseOwner: string | null
  leaseExpiresAt: string | null
  startedAt: string | null
  completedAt: string | null
  createdAt: string
  updatedAt: string
}
FieldTypeDescription
idstringUnique run ID
jobNamestringName of the job
inputunknownInput payload passed to the job
status'pending' | 'leased' | 'completed' | 'failed' | 'cancelled'Current run status
idempotencyKeystring | nullDeduplication key
concurrencyKeystring | nullConcurrency group key
currentStepIndexnumberIndex of the current step being executed
completedStepCountnumberTotal number of completed steps
progress{ current: number; total?: number; message?: string } | nullLatest progress report
outputunknown | nullReturn value of the job (when completed)
errorstring | nullError message (when failed)
labelsTLabels (defaults to Record<string, string>)Key/value labels for filtering (type-safe when schema provided)
leaseOwnerstring | nullWorker ID that holds the lease (null when not leased)
leaseExpiresAtstring | nullISO timestamp when the lease expires (null when not leased)
startedAtstring | nullISO timestamp when the run started
completedAtstring | nullISO timestamp when the run completed or failed
createdAtstringISO timestamp when the run was created
updatedAtstringISO timestamp of the last update

getJob()

ts
durably.getJob(name: string): JobHandle | undefined

Gets a registered job by name.

subscribe()

ts
durably.subscribe(runId: string): ReadableStream<DurablyEvent>

Subscribes to events for a specific run as a ReadableStream. The stream automatically closes when the run completes or fails.

Example

ts
import { createDurably } from '@coji/durably'
import { createClient } from '@libsql/client'
import { LibsqlDialect } from '@libsql/kysely-libsql'

const client = createClient({ url: 'file:local.db' })
const dialect = new LibsqlDialect({ client })

const durably = createDurably({
  dialect,
  pollingIntervalMs: 1000,
  leaseRenewIntervalMs: 5000,
  leaseMs: 30000,
})

// Initialize (migrate + start)
await durably.init()

// Define and register jobs
import { defineJob } from '@coji/durably'
import { z } from 'zod'

const myJobDef = defineJob({
  name: 'my-job',
  input: z.object({ id: z.string() }),
  run: async (step, input) => {
    // ...
  },
})

const { myJob } = durably.register({ myJob: myJobDef })

// Or trigger via durably.jobs
await durably.jobs.myJob.trigger({ id: '123' })

// Clean shutdown
process.on('SIGTERM', async () => {
  await durably.stop()
})

See Also

  • HTTP Handler — Expose Durably via HTTP/SSE for React clients
  • defineJob — Define jobs with typed schemas
  • Events — Subscribe to run and step events

Released under the MIT License.