Adapters
Adapters are pluggable storage backends. They determine how signal runs are persisted and retrieved. The runner polls the adapter for due entries on every tick, and signal handlers write their results back through it.
Two built-in adapters ship with Station. You can write custom adapters by implementing the SignalQueueAdapter interface (for signals) or BroadcastQueueAdapter interface (for broadcasts).
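The runner/adapter contract described above can be sketched with a minimal in-memory store. The Run shape and tick logic below are simplified assumptions for illustration, not Station's actual internals:

```ts
// Illustrative sketch of the runner/adapter polling contract.
// The Run shape and tick() logic are simplified assumptions,
// not Station's actual implementation.
interface Run {
  id: string;
  status: "pending" | "running" | "completed";
  nextRunAt: Date | null;
  output?: string;
}

class TinyAdapter {
  private runs = new Map<string, Run>();

  async addRun(run: Run): Promise<void> {
    this.runs.set(run.id, run);
  }

  async getRun(id: string): Promise<Run | null> {
    return this.runs.get(id) ?? null;
  }

  // Runs are due when pending and nextRunAt has passed (or is null).
  async getRunsDue(): Promise<Run[]> {
    const now = Date.now();
    return [...this.runs.values()].filter(
      (r) =>
        r.status === "pending" &&
        (r.nextRunAt === null || r.nextRunAt.getTime() <= now),
    );
  }

  async updateRun(id: string, patch: Partial<Run>): Promise<void> {
    const run = this.runs.get(id);
    if (run) Object.assign(run, patch);
  }
}

// One runner tick: poll for due runs, execute, write results back.
async function tick(adapter: TinyAdapter, handler: (r: Run) => string) {
  for (const run of await adapter.getRunsDue()) {
    await adapter.updateRun(run.id, { status: "running" });
    const output = handler(run);
    await adapter.updateRun(run.id, { status: "completed", output });
  }
}
```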
MemoryAdapter
The default adapter when none is specified. Stores all runs in a JavaScript Map inside the process. No data survives a restart. Good for development, testing, and single-run scripts where persistence is irrelevant.
- No external dependencies
- No configuration required
- Cannot share state across processes — each process gets its own isolated store
- Does not implement SerializableAdapter, so child processes spawned by the runner cannot access the parent’s in-memory data
```ts
import { SignalRunner } from "station-signal";

// MemoryAdapter is the default — no configuration needed
const runner = new SignalRunner({
  signalsDir: "./signals",
});
```

To explicitly construct one (for example, to pass to Station):
```ts
import { MemoryAdapter } from "station-signal";

const adapter = new MemoryAdapter();
```

SqliteAdapter
Production-ready persistent storage backed by better-sqlite3 — synchronous C++ bindings that are significantly faster than async alternatives for single-node workloads.
WAL (Write-Ahead Logging) mode is enabled on connection, allowing concurrent reads and writes without blocking. Tables, indexes, and columns are created automatically on first use. Date fields are stored as ISO-8601 text strings.
The adapter interface is async (all methods return Promises) even though better-sqlite3 is synchronous. This preserves compatibility with the adapter contract so that truly async backends (Postgres, DynamoDB, etc.) can implement the same interface without friction.
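The shape of that contract is synchronous work returned through already-resolved promises. A self-contained sketch of the pattern, with a Map standing in for better-sqlite3's prepared statements:

```ts
// Sketch of the async-interface-over-sync-storage pattern.
// A real SqliteAdapter would call better-sqlite3 prepared
// statements here; a Map stands in so the sketch is runnable.
class SyncBackedAdapter {
  private rows = new Map<string, { id: string; status: string }>();

  // Async to satisfy the adapter contract, but the body completes
  // synchronously — the promise is resolved before the caller awaits it.
  async getRun(id: string): Promise<{ id: string; status: string } | null> {
    return this.rows.get(id) ?? null; // synchronous lookup
  }

  async addRun(run: { id: string; status: string }): Promise<void> {
    this.rows.set(run.id, run); // synchronous write
  }
}
```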
Install
```sh
pnpm add station-adapter-sqlite
```

better-sqlite3 compiles a native module on install. If pnpm blocks its build script, allow it in package.json and re-run pnpm install:

```json
{
  "pnpm": {
    "onlyBuiltDependencies": ["better-sqlite3"]
  }
}
```

Usage
```ts
import { SqliteAdapter } from "station-adapter-sqlite";

const adapter = new SqliteAdapter({
  dbPath: "./jobs.db",
});
```

Options
| Option | Type | Default | Description |
|---|---|---|---|
| dbPath | string | "station.db" | Path to the SQLite database file. Created automatically if it does not exist. |
| tableName | string | "runs" | Table name for signal run entries. Must be alphanumeric and underscores only. A companion {tableName}_steps table is created for step data. |
Methods
Implements every method from SignalQueueAdapter (see below), plus:
| Method | Description |
|---|---|
| close() | Close the database connection. Call during graceful shutdown to flush the WAL and release the file lock. |
| toManifest() | Returns a serializable descriptor so child processes can reconstruct this adapter automatically. Part of the SerializableAdapter interface. |
Database schema
The adapter creates a runs table (or whatever you set in tableName) with these columns:
| Column | Type | Description |
|---|---|---|
| id | TEXT PK | UUID generated by the adapter |
| signal_name | TEXT | Name of the signal that owns this run |
| kind | TEXT | trigger or recurring |
| input | TEXT | JSON-serialized input payload |
| output | TEXT | JSON-serialized output on completion |
| error | TEXT | Error message on failure |
| status | TEXT | pending \| running \| completed \| failed \| cancelled |
| attempts | INTEGER | How many times this run has been attempted |
| max_attempts | INTEGER | Maximum retry count before marking failed |
| timeout | INTEGER | Timeout in milliseconds |
| interval | TEXT | Recurring interval string (e.g. "every 5m"). Null for triggered runs. |
| next_run_at | TEXT | ISO-8601 timestamp of when this run becomes due |
| last_run_at | TEXT | ISO-8601 timestamp of last execution start |
| started_at | TEXT | ISO-8601 timestamp when execution began |
| completed_at | TEXT | ISO-8601 timestamp when execution finished |
| created_at | TEXT | ISO-8601 timestamp when the run was queued |
Three indexes are created automatically: a composite index on (status, next_run_at) for the getRunsDue() query, a partial index on status where status = 'running' for the getRunsRunning() query, and an index on signal_name for listRuns() queries.
A companion {tableName}_steps table stores step execution records, linked by a foreign key with ON DELETE CASCADE.
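The schema and indexes above correspond to DDL along these lines. This is a sketch consistent with the documented behavior, not the adapter's verbatim statements; the index names and trailing columns are assumptions:

```sql
-- Sketch of DDL matching the documented schema (default table name "runs").
CREATE TABLE IF NOT EXISTS runs (
  id TEXT PRIMARY KEY,
  signal_name TEXT NOT NULL,
  status TEXT NOT NULL,
  next_run_at TEXT,
  created_at TEXT NOT NULL
  -- ...remaining columns from the table above
);

-- Composite index serving getRunsDue()
CREATE INDEX IF NOT EXISTS runs_due_idx ON runs (status, next_run_at);
-- Partial index serving getRunsRunning()
CREATE INDEX IF NOT EXISTS runs_running_idx ON runs (status) WHERE status = 'running';
-- Index serving listRuns()
CREATE INDEX IF NOT EXISTS runs_signal_idx ON runs (signal_name);

-- Companion step table, removed with its parent run via cascade
CREATE TABLE IF NOT EXISTS runs_steps (
  id TEXT PRIMARY KEY,
  run_id TEXT NOT NULL REFERENCES runs(id) ON DELETE CASCADE
  -- ...step columns
);
```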
BroadcastSqliteAdapter
Persistent storage for broadcast runs and their individual node runs. Ships in the same package as SqliteAdapter — import it from the /broadcast subpath.
Install
```sh
pnpm add station-adapter-sqlite
```

Usage
```ts
import { BroadcastSqliteAdapter } from "station-adapter-sqlite/broadcast";

const broadcastAdapter = new BroadcastSqliteAdapter({
  dbPath: "./jobs.db",
});
```

It can point at the same dbPath as SqliteAdapter; the table names do not collide: runs for signals and broadcast_runs / broadcast_runs_nodes for broadcasts. Sharing a file avoids managing multiple SQLite databases.

Options
| Option | Type | Default | Description |
|---|---|---|---|
| dbPath | string | "station.db" | Path to the SQLite database file. |
| tableName | string | "broadcast_runs" | Table name for broadcast run entries. A companion {tableName}_nodes table is created for node runs. |
Methods
Implements every method from BroadcastQueueAdapter (see below), plus close() for graceful shutdown.
Sharing adapters across processes
When the runner spawns a child process to execute a signal handler, that child process needs to know which adapter to use. If the handler calls .trigger() on another signal, the trigger must write to the same database. There are two ways to solve this.
Automatic: SerializableAdapter
SqliteAdapter implements the SerializableAdapter interface. When the runner detects a serializable adapter, it passes a compact manifest (adapter name + constructor options) to the child process, which reconstructs an identical adapter instance automatically. No extra configuration is needed.
```ts
import path from "node:path";
import { SignalRunner } from "station-signal";
import { SqliteAdapter } from "station-adapter-sqlite";

// SqliteAdapter is serializable — child processes
// reconstruct it from the manifest automatically.
const runner = new SignalRunner({
  signalsDir: path.join(import.meta.dirname, "signals"),
  adapter: new SqliteAdapter({ dbPath: "./jobs.db" }),
});
```

Manual: configModule
For adapters that are not serializable, or when you need to run additional setup code in the child process, use the configModule option. Create a module that calls configure(), and point the runner at it:
```ts
// config.ts
import { configure } from "station-signal";
import { SqliteAdapter } from "station-adapter-sqlite";

configure({
  adapter: new SqliteAdapter({ dbPath: "./jobs.db" }),
});
```

```ts
// runner.ts
import path from "node:path";
import { SignalRunner } from "station-signal";
import { SqliteAdapter } from "station-adapter-sqlite";

const runner = new SignalRunner({
  signalsDir: path.join(import.meta.dirname, "signals"),
  adapter: new SqliteAdapter({ dbPath: "./jobs.db" }),
  configModule: path.join(import.meta.dirname, "config.ts"),
});
```

The runner imports the configModule before executing each signal handler in the child process. This sets the global adapter so that .trigger() calls inside handlers write to the correct database.
Serializable adapters such as SqliteAdapter do not need a configModule. The runner’s adapter is already available.

Writing a custom adapter
SignalQueueAdapter
Implement this interface to create a custom storage backend for signals. Every method is async to support both synchronous and network-based backends.
```ts
interface SignalQueueAdapter {
  addRun(run: Run): Promise<void>;
  removeRun(id: string): Promise<void>;
  getRun(id: string): Promise<Run | null>;
  getRunsDue(): Promise<Run[]>;
  getRunsRunning(): Promise<Run[]>;
  updateRun(id: string, patch: RunPatch): Promise<void>;
  listRuns(signalName: string): Promise<Run[]>;
  hasRunWithStatus(signalName: string, statuses: RunStatus[]): Promise<boolean>;
  purgeRuns(olderThan: Date, statuses: RunStatus[]): Promise<number>;
  addStep(step: Step): Promise<void>;
  updateStep(id: string, patch: StepPatch): Promise<void>;
  getSteps(runId: string): Promise<Step[]>;
  removeSteps(runId: string): Promise<void>;
  generateId(): string;
  ping(): Promise<boolean>;
  close?(): Promise<void>;
}
```

| Method | Contract |
|---|---|
| addRun(run) | Store a new run. The run arrives with status: "pending" and a nextRunAt timestamp indicating when it becomes due. |
| removeRun(id) | Delete a run and its associated steps. Called for completed non-recurring runs during cleanup. |
| getRun(id) | Retrieve a single run by ID. Return null if it does not exist. |
| getRunsDue() | Return all runs where status === "pending" and nextRunAt <= now (or nextRunAt is null). The runner calls this on every poll tick. Order by createdAt ascending. |
| getRunsRunning() | Return all runs with status === "running". Used by the runner for timeout detection. |
| updateRun(id, patch) | Partially update a run’s fields. Used to change status, increment attempts, set timestamps, and store output or error messages. |
| listRuns(signalName) | Return all runs for a given signal name. Used by Station and for concurrency checks. |
| hasRunWithStatus(signalName, statuses) | Return true if any run for the given signal has one of the specified statuses. Used for concurrency gating (e.g. preventing duplicate recurring runs). |
| purgeRuns(olderThan, statuses) | Delete runs in terminal statuses whose completedAt is older than the given date. Return the count deleted. |
| addStep(step) | Store a new step record. Steps belong to a run and track individual step execution within multi-step signals. |
| updateStep(id, patch) | Partially update a step’s fields — status, output, error, timestamps. |
| getSteps(runId) | Return all steps for a given run. |
| removeSteps(runId) | Delete all steps for a given run. |
| generateId() | Return a unique ID string for new runs and steps. UUID, nanoid, ULID, or any scheme that produces unique strings. |
| ping() | Health check. Return true if the adapter is operational. Called during runner startup and by Station’s health endpoint. |
| close() | Optional. Clean up resources (close database connections, flush buffers). Called during graceful shutdown. |
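To make the contract concrete, here is a partial in-memory implementation covering the run-lifecycle methods. It is a sketch, not a complete SignalQueueAdapter: the step methods, purge, and several run fields are omitted for brevity.

```ts
import { randomUUID } from "node:crypto";

// Minimal Run shape for the sketch; Station's actual Run type
// carries more fields (attempts, timeout, interval, ...).
interface Run {
  id: string;
  signalName: string;
  status: "pending" | "running" | "completed" | "failed" | "cancelled";
  nextRunAt: Date | null;
  createdAt: Date;
}

class MapAdapter {
  private runs = new Map<string, Run>();

  async addRun(run: Run): Promise<void> {
    this.runs.set(run.id, run);
  }

  async getRun(id: string): Promise<Run | null> {
    return this.runs.get(id) ?? null;
  }

  // Pending runs whose due time has passed, ordered by createdAt ascending.
  async getRunsDue(): Promise<Run[]> {
    const now = Date.now();
    return [...this.runs.values()]
      .filter(
        (r) =>
          r.status === "pending" &&
          (r.nextRunAt === null || r.nextRunAt.getTime() <= now),
      )
      .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
  }

  async updateRun(id: string, patch: Partial<Run>): Promise<void> {
    const run = this.runs.get(id);
    if (run) Object.assign(run, patch);
  }

  // Concurrency gating: does any run for this signal match a status?
  async hasRunWithStatus(
    signalName: string,
    statuses: Run["status"][],
  ): Promise<boolean> {
    return [...this.runs.values()].some(
      (r) => r.signalName === signalName && statuses.includes(r.status),
    );
  }

  generateId(): string {
    return randomUUID();
  }

  async ping(): Promise<boolean> {
    return true; // a Map is always "up"
  }
}
```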
BroadcastQueueAdapter
Implement this interface for custom broadcast storage. Broadcast adapters track two entity types: broadcast runs (the overall execution) and node runs (one per DAG node per execution).
```ts
interface BroadcastQueueAdapter {
  addBroadcastRun(run: BroadcastRun): Promise<void>;
  getBroadcastRun(id: string): Promise<BroadcastRun | null>;
  updateBroadcastRun(id: string, patch: BroadcastRunPatch): Promise<void>;
  getBroadcastRunsDue(): Promise<BroadcastRun[]>;
  getBroadcastRunsRunning(): Promise<BroadcastRun[]>;
  listBroadcastRuns(broadcastName: string): Promise<BroadcastRun[]>;
  hasBroadcastRunWithStatus(
    broadcastName: string,
    statuses: BroadcastRunStatus[],
  ): Promise<boolean>;
  purgeBroadcastRuns(
    olderThan: Date,
    statuses: BroadcastRunStatus[],
  ): Promise<number>;
  addNodeRun(nodeRun: BroadcastNodeRun): Promise<void>;
  getNodeRun(id: string): Promise<BroadcastNodeRun | null>;
  updateNodeRun(id: string, patch: BroadcastNodeRunPatch): Promise<void>;
  getNodeRuns(broadcastRunId: string): Promise<BroadcastNodeRun[]>;
  generateId(): string;
  ping(): Promise<boolean>;
  close?(): Promise<void>;
}
```

| Method | Contract |
|---|---|
| addBroadcastRun(run) | Store a new broadcast run with status: "pending". Includes the broadcast name, serialized input, failure policy, and scheduling fields. |
| getBroadcastRun(id) | Retrieve a broadcast run by ID. Return null if not found. |
| updateBroadcastRun(id, patch) | Partially update broadcast run fields — status, timestamps, error. |
| getBroadcastRunsDue() | Return pending broadcast runs where nextRunAt <= now. Polled on each broadcast runner tick. |
| getBroadcastRunsRunning() | Return all broadcast runs with status === "running". Used for timeout detection. |
| listBroadcastRuns(broadcastName) | Return all runs for a given broadcast name. |
| hasBroadcastRunWithStatus(name, statuses) | Return true if any run for the broadcast has one of the specified statuses. |
| purgeBroadcastRuns(olderThan, statuses) | Delete broadcast runs (and their node runs via cascade) older than the given date. Return count deleted. |
| addNodeRun(nodeRun) | Store a node run. Each node in the DAG gets one record per broadcast execution. Includes the node name, linked signal name, and initial status. |
| getNodeRun(id) | Retrieve a single node run by ID. Return null if not found. |
| updateNodeRun(id, patch) | Partially update a node run — status, signal run ID, output, error, skip reason, timestamps. |
| getNodeRuns(broadcastRunId) | Return all node runs for a given broadcast run. |
| generateId() | Return a unique ID string for new broadcast and node runs. |
| ping() | Health check. Return true if operational. |
| close() | Optional. Clean up resources during graceful shutdown. |
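The two-entity model can be sketched with an in-memory store. This is a partial illustration of the broadcast-run/node-run relationship, not a full BroadcastQueueAdapter; the entity shapes are simplified assumptions.

```ts
// Partial in-memory sketch of the broadcast-run / node-run relationship.
interface BroadcastRun {
  id: string;
  broadcastName: string;
  status: "pending" | "running" | "completed" | "failed";
}

interface NodeRun {
  id: string;
  broadcastRunId: string; // links the node back to its broadcast execution
  nodeName: string;
  status: "pending" | "running" | "completed" | "failed" | "skipped";
}

class BroadcastMapAdapter {
  private broadcasts = new Map<string, BroadcastRun>();
  private nodes = new Map<string, NodeRun>();

  async addBroadcastRun(run: BroadcastRun): Promise<void> {
    this.broadcasts.set(run.id, run);
  }

  // One record per DAG node per broadcast execution.
  async addNodeRun(nodeRun: NodeRun): Promise<void> {
    this.nodes.set(nodeRun.id, nodeRun);
  }

  async updateNodeRun(id: string, patch: Partial<NodeRun>): Promise<void> {
    const n = this.nodes.get(id);
    if (n) Object.assign(n, patch);
  }

  async getNodeRuns(broadcastRunId: string): Promise<NodeRun[]> {
    return [...this.nodes.values()].filter(
      (n) => n.broadcastRunId === broadcastRunId,
    );
  }
}
```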
SerializableAdapter
If you write a custom adapter that needs to work across processes (child process execution of signal handlers), implement the SerializableAdapter interface in addition to SignalQueueAdapter:
```ts
interface SerializableAdapter extends SignalQueueAdapter {
  toManifest(): AdapterManifest;
}

interface AdapterManifest {
  name: string; // Registry key (e.g. "sqlite")
  options: Record<string, unknown>; // Constructor options (must be JSON-serializable)
  moduleUrl?: string; // Absolute URL to the module that registers this adapter
}
```

Register your adapter with a factory function so the child process can reconstruct it from the manifest:
```ts
import { registerAdapter } from "station-signal";

registerAdapter("my-adapter", (options) => {
  return new MyAdapter(options as MyAdapterOptions);
});
```

When the runner detects a SerializableAdapter, it skips the configModule path entirely — the manifest is passed to the child process as a lightweight JSON payload, and the adapter is reconstructed from the registered factory.
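End to end, the manifest round-trip looks like this. The local registry here is a stand-in for station-signal's registerAdapter, and MyAdapter is a hypothetical custom adapter, so the sketch is self-contained:

```ts
// Self-contained sketch of the manifest round-trip. The registry
// stands in for station-signal's registerAdapter; MyAdapter is a
// hypothetical custom adapter.
interface AdapterManifest {
  name: string;
  options: Record<string, unknown>;
}

class MyAdapter {
  constructor(public options: { dbPath: string }) {}

  toManifest(): AdapterManifest {
    // Only JSON-serializable constructor options belong in the manifest.
    return { name: "my-adapter", options: this.options };
  }
}

const registry = new Map<
  string,
  (options: Record<string, unknown>) => MyAdapter
>();
registry.set("my-adapter", (options) => new MyAdapter(options as { dbPath: string }));

// Parent process: serialize the manifest for the child.
const parent = new MyAdapter({ dbPath: "./jobs.db" });
const payload = JSON.stringify(parent.toManifest());

// Child process: reconstruct an identical adapter from the payload.
const manifest: AdapterManifest = JSON.parse(payload);
const child = registry.get(manifest.name)!(manifest.options);
```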