API Reference

Adapters

Adapters are pluggable storage backends. They determine how signal runs are persisted and retrieved. The runner polls the adapter for due entries on every tick, and signal handlers write their results back through it.

Two built-in adapters ship with Station. You can write custom adapters by implementing the SignalQueueAdapter interface (for signals) or BroadcastQueueAdapter interface (for broadcasts).


MemoryAdapter

The default adapter when none is specified. Stores all runs in a JavaScript Map inside the process. No data survives a restart. Good for development, testing, and single-run scripts where persistence is irrelevant.

import { SignalRunner } from "station-signal"; // MemoryAdapter is the default — no configuration neededconst runner = new SignalRunner({  signalsDir: "./signals",});

To explicitly construct one (for example, to pass to Station):

import { MemoryAdapter } from "station-signal"; const adapter = new MemoryAdapter();

SqliteAdapter

Production-ready persistent storage backed by better-sqlite3 — synchronous C++ bindings that are significantly faster than async alternatives for single-node workloads.

WAL (Write-Ahead Logging) mode is enabled on connection, allowing concurrent reads and writes without blocking. Tables, indexes, and columns are created automatically on first use. Date fields are stored as ISO-8601 text strings.

The adapter interface is async (all methods return Promises) even though better-sqlite3 is synchronous. This preserves compatibility with the adapter contract so that truly async backends (Postgres, DynamoDB, etc.) can implement the same interface without friction.

Install

pnpm add station-adapter-sqlite
pnpm 10+: better-sqlite3 is a native addon that compiles C++ during installation. pnpm 10 blocks dependency build scripts by default. Add this to your project’s package.json and re-run pnpm install:
{  "pnpm": {    "onlyBuiltDependencies": ["better-sqlite3"]  }}

Usage

import { SqliteAdapter } from "station-adapter-sqlite"; const adapter = new SqliteAdapter({  dbPath: "./jobs.db",});

Options

OptionTypeDefaultDescription
dbPathstring"station.db"Path to the SQLite database file. Created automatically if it does not exist.
tableNamestring"runs"Table name for signal run entries. Must be alphanumeric and underscores only. A companion {tableName}_steps table is created for step data.

Methods

Implements every method from SignalQueueAdapter (see below), plus:

MethodDescription
close()Close the database connection. Call during graceful shutdown to flush the WAL and release the file lock.
toManifest()Returns a serializable descriptor so child processes can reconstruct this adapter automatically. Part of the SerializableAdapter interface.

Database schema

The adapter creates a runs table (or whatever you set in tableName) with these columns:

ColumnTypeDescription
idTEXT PKUUID generated by the adapter
signal_nameTEXTName of the signal that owns this run
kindTEXTtrigger or recurring
inputTEXTJSON-serialized input payload
outputTEXTJSON-serialized output on completion
errorTEXTError message on failure
statusTEXTpending | running | completed | failed | cancelled
attemptsINTEGERHow many times this run has been attempted
max_attemptsINTEGERMaximum retry count before marking failed
timeoutINTEGERTimeout in milliseconds
intervalTEXTRecurring interval string (e.g. "every 5m"). Null for triggered runs.
next_run_atTEXTISO-8601 timestamp of when this run becomes due
last_run_atTEXTISO-8601 timestamp of last execution start
started_atTEXTISO-8601 timestamp when execution began
completed_atTEXTISO-8601 timestamp when execution finished
created_atTEXTISO-8601 timestamp when the run was queued

Three indexes are created automatically: a composite index on (status, next_run_at) for the getRunsDue() query, a partial index on status where status = 'running' for the getRunsRunning() query, and an index on signal_name for listRuns() queries.

A companion {tableName}_steps table stores step execution records, linked by a foreign key with ON DELETE CASCADE.


BroadcastSqliteAdapter

Persistent storage for broadcast runs and their individual node runs. Ships in the same package as SqliteAdapter — import it from the /broadcast subpath.

Install

pnpm add station-adapter-sqlite

Usage

import { BroadcastSqliteAdapter } from "station-adapter-sqlite/broadcast"; const broadcastAdapter = new BroadcastSqliteAdapter({  dbPath: "./jobs.db",});
You can (and should) point both adapters at the same database file. They use separate tables: runs for signals and broadcast_runs / broadcast_runs_nodes for broadcasts. Sharing a file avoids managing multiple SQLite databases.

Options

OptionTypeDefaultDescription
dbPathstring"station.db"Path to the SQLite database file.
tableNamestring"broadcast_runs"Table name for broadcast run entries. A companion {tableName}_nodes table is created for node runs.

Methods

Implements every method from BroadcastQueueAdapter (see below), plus close() for graceful shutdown.


Sharing adapters across processes

When the runner spawns a child process to execute a signal handler, that child process needs to know which adapter to use. If the handler calls .trigger() on another signal, the trigger must write to the same database. There are two ways to solve this.

Automatic: SerializableAdapter

SqliteAdapter implements the SerializableAdapter interface. When the runner detects a serializable adapter, it passes a compact manifest (adapter name + constructor options) to the child process, which reconstructs an identical adapter instance automatically. No extra configuration is needed.

import path from "node:path";import { SignalRunner } from "station-signal";import { SqliteAdapter } from "station-adapter-sqlite"; // SqliteAdapter is serializable — child processes// reconstruct it from the manifest automatically.const runner = new SignalRunner({  signalsDir: path.join(import.meta.dirname, "signals"),  adapter: new SqliteAdapter({ dbPath: "./jobs.db" }),});

Manual: configModule

For adapters that are not serializable, or when you need to run additional setup code in the child process, use the configModule option. Create a module that calls configure(), and point the runner at it:

// config.tsimport { configure } from "station-signal";import { SqliteAdapter } from "station-adapter-sqlite"; configure({  adapter: new SqliteAdapter({ dbPath: "./jobs.db" }),});
// runner.tsimport path from "node:path";import { SignalRunner } from "station-signal";import { SqliteAdapter } from "station-adapter-sqlite"; const runner = new SignalRunner({  signalsDir: path.join(import.meta.dirname, "signals"),  adapter: new SqliteAdapter({ dbPath: "./jobs.db" }),  configModule: path.join(import.meta.dirname, "config.ts"),});

The runner imports the configModule before executing each signal handler in the child process. This sets the global adapter so that .trigger() calls inside handlers write to the correct database.

If your triggers always happen in the runner process (e.g. recurring signals, or signals triggered from an API route in the same process), you do not need configModule. The runner’s adapter is already available.

Writing a custom adapter

SignalQueueAdapter

Implement this interface to create a custom storage backend for signals. Every method is async to support both synchronous and network-based backends.

interface SignalQueueAdapter {  addRun(run: Run): Promise<void>;  removeRun(id: string): Promise<void>;  getRun(id: string): Promise<Run | null>;  getRunsDue(): Promise<Run[]>;  getRunsRunning(): Promise<Run[]>;  updateRun(id: string, patch: RunPatch): Promise<void>;  listRuns(signalName: string): Promise<Run[]>;  hasRunWithStatus(signalName: string, statuses: RunStatus[]): Promise<boolean>;  purgeRuns(olderThan: Date, statuses: RunStatus[]): Promise<number>;   addStep(step: Step): Promise<void>;  updateStep(id: string, patch: StepPatch): Promise<void>;  getSteps(runId: string): Promise<Step[]>;  removeSteps(runId: string): Promise<void>;   generateId(): string;  ping(): Promise<boolean>;  close?(): Promise<void>;}
MethodContract
addRun(run)Store a new run. The run arrives with status: "pending" and a nextRunAt timestamp indicating when it becomes due.
removeRun(id)Delete a run and its associated steps. Called for completed non-recurring runs during cleanup.
getRun(id)Retrieve a single run by ID. Return null if it does not exist.
getRunsDue()Return all runs where status === "pending" and nextRunAt <= now (or nextRunAt is null). The runner calls this on every poll tick. Order by createdAt ascending.
getRunsRunning()Return all runs with status === "running". Used by the runner for timeout detection.
updateRun(id, patch)Partially update a run’s fields. Used to change status, increment attempts, set timestamps, and store output or error messages.
listRuns(signalName)Return all runs for a given signal name. Used by Station and for concurrency checks.
hasRunWithStatus(signalName, statuses)Return true if any run for the given signal has one of the specified statuses. Used for concurrency gating (e.g. preventing duplicate recurring runs).
purgeRuns(olderThan, statuses)Delete runs in terminal statuses whose completedAt is older than the given date. Return the count deleted.
addStep(step)Store a new step record. Steps belong to a run and track individual step execution within multi-step signals.
updateStep(id, patch)Partially update a step’s fields — status, output, error, timestamps.
getSteps(runId)Return all steps for a given run.
removeSteps(runId)Delete all steps for a given run.
generateId()Return a unique ID string for new runs and steps. UUID, nanoid, ULID, or any scheme that produces unique strings.
ping()Health check. Return true if the adapter is operational. Called during runner startup and by Station’s health endpoint.
close()Optional. Clean up resources (close database connections, flush buffers). Called during graceful shutdown.

BroadcastQueueAdapter

Implement this interface for custom broadcast storage. Broadcast adapters track two entity types: broadcast runs (the overall execution) and node runs (one per DAG node per execution).

interface BroadcastQueueAdapter {  addBroadcastRun(run: BroadcastRun): Promise<void>;  getBroadcastRun(id: string): Promise<BroadcastRun | null>;  updateBroadcastRun(id: string, patch: BroadcastRunPatch): Promise<void>;  getBroadcastRunsDue(): Promise<BroadcastRun[]>;  getBroadcastRunsRunning(): Promise<BroadcastRun[]>;  listBroadcastRuns(broadcastName: string): Promise<BroadcastRun[]>;  hasBroadcastRunWithStatus(    broadcastName: string,    statuses: BroadcastRunStatus[],  ): Promise<boolean>;  purgeBroadcastRuns(    olderThan: Date,    statuses: BroadcastRunStatus[],  ): Promise<number>;   addNodeRun(nodeRun: BroadcastNodeRun): Promise<void>;  getNodeRun(id: string): Promise<BroadcastNodeRun | null>;  updateNodeRun(id: string, patch: BroadcastNodeRunPatch): Promise<void>;  getNodeRuns(broadcastRunId: string): Promise<BroadcastNodeRun[]>;   generateId(): string;  ping(): Promise<boolean>;  close?(): Promise<void>;}
MethodContract
addBroadcastRun(run)Store a new broadcast run with status: "pending". Includes the broadcast name, serialized input, failure policy, and scheduling fields.
getBroadcastRun(id)Retrieve a broadcast run by ID. Return null if not found.
updateBroadcastRun(id, patch)Partially update broadcast run fields — status, timestamps, error.
getBroadcastRunsDue()Return pending broadcast runs where nextRunAt <= now. Polled on each broadcast runner tick.
getBroadcastRunsRunning()Return all broadcast runs with status === "running". Used for timeout detection.
listBroadcastRuns(broadcastName)Return all runs for a given broadcast name.
hasBroadcastRunWithStatus(name, statuses)Return true if any run for the broadcast has one of the specified statuses.
purgeBroadcastRuns(olderThan, statuses)Delete broadcast runs (and their node runs via cascade) older than the given date. Return count deleted.
addNodeRun(nodeRun)Store a node run. Each node in the DAG gets one record per broadcast execution. Includes the node name, linked signal name, and initial status.
getNodeRun(id)Retrieve a single node run by ID. Return null if not found.
updateNodeRun(id, patch)Partially update a node run — status, signal run ID, output, error, skip reason, timestamps.
getNodeRuns(broadcastRunId)Return all node runs for a given broadcast run.
generateId()Return a unique ID string for new broadcast and node runs.
ping()Health check. Return true if operational.
close()Optional. Clean up resources during graceful shutdown.

SerializableAdapter

If you write a custom adapter that needs to work across processes (child process execution of signal handlers), implement the SerializableAdapter interface in addition to SignalQueueAdapter:

interface SerializableAdapter extends SignalQueueAdapter {  toManifest(): AdapterManifest;} interface AdapterManifest {  name: string;                       // Registry key (e.g. "sqlite")  options: Record<string, unknown>;   // Constructor options (must be JSON-serializable)  moduleUrl?: string;                 // Absolute URL to the module that registers this adapter}

Register your adapter with a factory function so the child process can reconstruct it from the manifest:

import { registerAdapter } from "station-signal"; registerAdapter("my-adapter", (options) => {  return new MyAdapter(options as MyAdapterOptions);});

When the runner detects a SerializableAdapter, it skips the configModule path entirely — the manifest is passed to the child process as a lightweight JSON payload, and the adapter is reconstructed from the registered factory.