API Reference

Signals

A signal is a named, type-safe background job. You define its input schema, handler function, and execution constraints. The runner picks it up, spawns an isolated child process, and manages retries, timeouts, and concurrency on your behalf.

signal(name)

Creates a named signal definition. The name must be unique across your application, start with a letter, and contain only letters, digits, hyphens, and underscores. Returns a builder with chainable methods.

    import { signal, z } from "station-signal";

    const sendEmail = signal("sendEmail")
      .input(z.object({
        to: z.string().email(),
        subject: z.string(),
        body: z.string(),
      }))
      .output(z.object({
        messageId: z.string(),
        sentAt: z.string().datetime(),
      }))
      .timeout(10_000)        // 10 seconds
      .retries(3)             // up to 4 total attempts
      .concurrency(5)         // max 5 concurrent sends
      .onComplete(async (output, input) => {
        console.log(`Email ${output.messageId} sent to ${input.to}`);
      })
      .run(async (input) => {
        const result = await emailService.send(input);
        return {
          messageId: result.id,
          sentAt: new Date().toISOString(),
        };
      });
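The naming rules for signal(name) can be checked ahead of time with a small helper. The sketch below is not part of station-signal: isValidSignalName and SIGNAL_NAME_PATTERN are hypothetical names, and it assumes "letters" means ASCII letters, which the library may or may not enforce in exactly this way. It does not check uniqueness across the application.

```typescript
// Hypothetical helper (not a station-signal export). Mirrors the documented rules:
// start with a letter, then only letters, digits, hyphens, and underscores.
// Assumption: "letters" means ASCII A-Z and a-z.
const SIGNAL_NAME_PATTERN = /^[A-Za-z][A-Za-z0-9_-]*$/;

function isValidSignalName(name: string): boolean {
  return SIGNAL_NAME_PATTERN.test(name);
}

console.log(isValidSignalName("sendEmail"));     // true
console.log(isValidSignalName("send-email_v2")); // true
console.log(isValidSignalName("2fast"));         // false: starts with a digit
console.log(isValidSignalName("send email"));    // false: contains a space
```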

Builder methods

.input(schema)

Sets the Zod schema for input validation. Every call to .trigger() validates the provided data against this schema before enqueuing. If validation fails, the run is rejected immediately and never enters the queue. The schema also drives TypeScript type inference: your handler receives the exact inferred type, and the compiler enforces it at build time.

    const processOrder = signal("processOrder")
      .input(z.object({
        orderId: z.string().uuid(),
        items: z.array(z.object({
          sku: z.string(),
          quantity: z.number().int().positive(),
        })),
      }))
      .run(async (input) => {
        // input is typed as { orderId: string; items: { sku: string; quantity: number }[] }
        for (const item of input.items) {
          await reserveInventory(item.sku, item.quantity);
        }
      });

If no input schema is provided, the signal accepts an empty object {} by default.

.output(schema)

Optional output schema. When provided, the handler's return value is validated against it. This is particularly useful when chaining signals inside broadcasts: the downstream signal's input type must match the upstream signal's output type. The validated output is JSON-serialized and stored on the run record, making it available to subscribers and the broadcast orchestrator.

    const geocode = signal("geocode")
      .input(z.object({ address: z.string() }))
      .output(z.object({
        lat: z.number(),
        lng: z.number(),
      }))
      .run(async (input) => {
        const coords = await geocodingApi.lookup(input.address);
        return { lat: coords.latitude, lng: coords.longitude };
      });

.timeout(ms)

Maximum execution time in milliseconds. If the handler exceeds this duration, the child process is killed with SIGTERM. The run is then re-enqueued if retries remain; otherwise it is marked "failed" with a timeout error. Default: 300_000 (5 minutes). Set this lower for operations that should fail fast, such as HTTP requests or payment processing.

    const healthCheck = signal("healthCheck")
      .input(z.object({ url: z.string().url() }))
      .timeout(5_000) // 5 seconds: fail fast if the service is down
      .run(async (input) => {
        const res = await fetch(input.url);
        if (!res.ok) throw new Error(`Health check failed: ${res.status}`);
      });
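To make the timeout rule concrete, here is a minimal sketch of the decision a runner could make on each poll tick for a run that is currently executing. It is an illustration only: RunningRun, TimeoutOutcome, and checkTimeout are hypothetical names, and the retry-or-fail branch follows the onRunTimeout description in the SignalSubscriber section rather than the runner's actual source.

```typescript
// Hypothetical shapes for illustration; the real runner's internals may differ.
interface RunningRun {
  startedAt: Date;     // when the current attempt started
  timeout: number;     // milliseconds
  attempts: number;    // attempts executed so far (incremented at dispatch)
  maxAttempts: number; // 1 + retries
}

type TimeoutOutcome = "still-running" | "retry" | "failed";

function checkTimeout(run: RunningRun, now: Date): TimeoutOutcome {
  const elapsedMs = now.getTime() - run.startedAt.getTime();
  if (elapsedMs <= run.timeout) return "still-running";
  // Timed out: the child process would be killed here,
  // then the run is retried only if attempts remain.
  return run.attempts < run.maxAttempts ? "retry" : "failed";
}

const startedAt = new Date("2024-01-01T00:00:00Z");
const sixSecondsLater = new Date("2024-01-01T00:00:06Z");
console.log(
  checkTimeout({ startedAt, timeout: 5_000, attempts: 1, maxAttempts: 1 }, sixSecondsLater),
); // "failed"
```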

.retries(n)

Maximum retry attempts after the initial failure. Total execution attempts = 1 + n. Default: 0 (no retry). When a run fails and has remaining attempts, it is re-enqueued with "pending" status and an incremented attempt counter. The runner applies exponential backoff between retries (base delay configurable via retryBackoffMs on the runner). The delay doubles with each retry: base * 2^(attempt - 1), so the first retry waits the base delay, the second waits twice that, and so on.

    const syncInventory = signal("syncInventory")
      .input(z.object({ warehouseId: z.string() }))
      .retries(3)  // up to 4 total attempts (1 initial + 3 retries)
      .run(async (input) => {
        await externalApi.sync(input.warehouseId);
      });
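The backoff formula can be worked through in a few lines. This sketch only reproduces the documented arithmetic, base * 2^(attempt - 1); retryDelayMs is a hypothetical helper name, and it assumes attempt counts retries starting at 1, which matches the "first retry waits 1s, second 2s, third 4s" description of retryBackoffMs.

```typescript
// Hypothetical helper: computes the documented delay before a given retry.
// `attempt` is the retry number, starting at 1 for the first retry.
function retryDelayMs(baseMs: number, attempt: number): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError("attempt must be a positive integer");
  }
  return baseMs * 2 ** (attempt - 1);
}

// With the default retryBackoffMs of 1000 and .retries(3):
const schedule = [1, 2, 3].map((attempt) => retryDelayMs(1000, attempt));
console.log(schedule); // [ 1000, 2000, 4000 ]
```

With retryBackoffMs set to 2000, the same three retries would wait 2000, 4000, and 8000 milliseconds.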

.concurrency(n)

Maximum concurrent executions of this specific signal. When the limit is reached, additional runs remain in the queue and are picked up on subsequent poll ticks as slots open. This is separate from the runner-level maxConcurrent setting, which caps total concurrent executions across all signal types. Use per-signal concurrency to rate-limit API calls or protect database-heavy operations.

    const callStripeApi = signal("callStripeApi")
      .input(z.object({ customerId: z.string(), amount: z.number() }))
      .concurrency(3) // max 3 concurrent Stripe API calls
      .retries(2)
      .run(async (input) => {
        await stripe.charges.create({
          customer: input.customerId,
          amount: input.amount,
          currency: "usd",
        });
      });
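The interaction between the runner-level maxConcurrent cap and the per-signal limit can be pictured as two independent checks that must both pass before a run is dispatched. The function below is a sketch under that reading, not the runner's actual scheduling code; canDispatch and its parameters are hypothetical.

```typescript
// Hypothetical dispatch gate: both the global cap and the per-signal cap must allow the run.
function canDispatch(
  signalName: string,
  runningSignalNames: string[], // one entry per active child process
  maxConcurrent: number,        // runner-level cap across all signals
  perSignalLimit?: number,      // value passed to .concurrency(n), if any
): boolean {
  if (runningSignalNames.length >= maxConcurrent) return false;
  if (perSignalLimit === undefined) return true;
  const sameSignal = runningSignalNames.filter((name) => name === signalName).length;
  return sameSignal < perSignalLimit;
}

const active = ["callStripeApi", "callStripeApi", "callStripeApi", "sendEmail"];
console.log(canDispatch("callStripeApi", active, 10, 3)); // false: per-signal limit reached
console.log(canDispatch("sendEmail", active, 10, 5));     // true: both limits have room
```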

.every(interval)

Makes the signal recurring. Accepts human-readable duration strings: "30s", "5m", "1h", "1d". After each completion, the runner automatically enqueues the next run. If a pending or running instance already exists for this signal, the scheduler skips that tick and advances the schedule to prevent overlapping executions.

    const cleanupExpiredSessions = signal("cleanupExpiredSessions")
      .every("15m")
      .run(async () => {
        const deleted = await db.sessions.deleteExpired();
        console.log(`Cleaned up ${deleted} expired sessions`);
      });
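For readers who want to see how interval strings translate into a schedule, the sketch below parses the four documented forms ("30s", "5m", "1h", "1d") into milliseconds and computes the next run time. parseInterval and nextRunAt are hypothetical helpers; the library's own parser may accept other formats or behave differently on invalid input.

```typescript
// Milliseconds per unit for the documented suffixes.
const UNIT_MS = { s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000 } as const;
type Unit = keyof typeof UNIT_MS;

// Hypothetical parser: accepts "<integer><s|m|h|d>" only.
function parseInterval(interval: string): number {
  const match = /^(\d+)([smhd])$/.exec(interval);
  if (!match) throw new Error(`Unsupported interval: ${interval}`);
  const amount = Number(match[1]);
  const unit = match[2] as Unit;
  return amount * UNIT_MS[unit];
}

// Next scheduled time, counted from the previous run.
function nextRunAt(previous: Date, interval: string): Date {
  return new Date(previous.getTime() + parseInterval(interval));
}

console.log(parseInterval("15m")); // 900000
console.log(nextRunAt(new Date("2024-01-01T00:00:00Z"), "15m").toISOString());
// 2024-01-01T00:15:00.000Z
```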

.withInput(data)

Default input data for recurring signals. Without this, recurring signals run with {} as input. The data must conform to the input schema if one is defined.

    const dailyReport = signal("dailyReport")
      .input(z.object({
        reportType: z.enum(["summary", "detailed"]),
        recipients: z.array(z.string().email()),
      }))
      .every("1d")
      .withInput({
        reportType: "summary",
        recipients: ["ops@example.com", "team@example.com"],
      })
      .run(async (input) => {
        const report = await generateReport(input.reportType);
        await sendReport(report, input.recipients);
      });

.step(name, fn)

Adds a named execution step. Steps run sequentially within the same child process. The first step receives the signal's input. Each subsequent step receives the return value of the previous step. Step completion events are emitted to subscribers as each step finishes, giving you granular progress tracking. When using steps, finalize with .build() instead of .run().

    const processPayment = signal("processPayment")
      .input(z.object({
        orderId: z.string(),
        amount: z.number(),
        currency: z.string(),
      }))
      .step("validate", async (input) => {
        const order = await db.orders.findById(input.orderId);
        if (!order) throw new Error(`Order ${input.orderId} not found`);
        return { order, amount: input.amount, currency: input.currency };
      })
      .step("charge", async (prev) => {
        const charge = await paymentGateway.charge({
          amount: prev.amount,
          currency: prev.currency,
        });
        return { orderId: prev.order.id, chargeId: charge.id };
      })
      .step("confirm", async (prev) => {
        await db.orders.update(prev.orderId, { chargeId: prev.chargeId, status: "paid" });
        return { orderId: prev.orderId, chargeId: prev.chargeId };
      })
      .build();
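The data flow between steps (first step gets the signal input, each later step gets the previous return value) can be sketched as a simple sequential loop. This is an illustration of the documented behavior, not the library's executor: StepDef, runSteps, and the onStepCompleted callback are hypothetical stand-ins, and the real implementation also persists step records and handles failures.

```typescript
// Hypothetical step shape: a name plus a function from previous value to next value.
type StepFn = (input: unknown) => Promise<unknown> | unknown;
interface StepDef {
  name: string;
  fn: StepFn;
}

// Runs steps in order, threading each return value into the next step.
async function runSteps(
  steps: StepDef[],
  input: unknown,
  onStepCompleted?: (name: string, output: unknown) => void,
): Promise<unknown> {
  let current = input;
  for (const step of steps) {
    current = await step.fn(current);     // a throw here stops the remaining steps
    onStepCompleted?.(step.name, current); // stands in for the step completion event
  }
  return current;
}
```

Because a thrown error propagates out of the loop, later steps never run after a failure, which is the behavior you would expect from sequential steps.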

.onComplete(fn)

Post-completion callback. Called with (output, input) after the handler finishes successfully. Runs in the same child process as the handler. If the callback throws, the run is still marked as completed but an onCompleteError event is emitted to subscribers. Use this for side effects like sending notifications, updating caches, or triggering downstream signals.

    const generateInvoice = signal("generateInvoice")
      .input(z.object({ orderId: z.string() }))
      .output(z.object({ invoiceUrl: z.string().url() }))
      .onComplete(async (output, input) => {
        await notificationService.send({
          channel: "email",
          template: "invoice-ready",
          data: { orderId: input.orderId, invoiceUrl: output.invoiceUrl },
        });
      })
      .run(async (input) => {
        const pdf = await renderInvoice(input.orderId);
        const url = await storage.upload(pdf);
        return { invoiceUrl: url };
      });

.onComplete() can be called before or after .run(). When chaining after .run(), it returns the final Signal object. On a step-based builder, call it before .build().

.run(handler)

Sets the handler function and finalizes the signal. The handler receives the validated input and should return the output (or void). The handler runs in an isolated child process spawned by the runner, so it has its own memory space and cannot crash the runner.

    const resizeImage = signal("resizeImage")
      .input(z.object({
        sourceUrl: z.string().url(),
        width: z.number().int().positive(),
        height: z.number().int().positive(),
      }))
      .run(async (input) => {
        const image = await downloadImage(input.sourceUrl);
        const resized = await sharp(image).resize(input.width, input.height).toBuffer();
        await storage.upload(resized);
      });

.build()

Finalizes a step-based signal. Use this instead of .run() when the signal is defined with .step() calls. Returns the same Signal object.


Signal instance

After calling .run() or .build(), you get a Signal object. This is the handle you use to trigger runs and the object you pass to the runner for registration.

| Property | Type | Description |
| --- | --- | --- |
| .name | string | The signal name passed to signal(). |
| .inputSchema | z.ZodType | The Zod input schema. Defaults to z.object({}) if none was provided. |
| .outputSchema | z.ZodType \| undefined | The Zod output schema, if one was set via .output(). |
| .timeout | number | Timeout in milliseconds. Default: 300_000. |
| .maxAttempts | number | Total attempts (1 + retries). Default: 1. |
| .maxConcurrency | number \| undefined | Per-signal concurrency limit, if set. |
| .interval | string \| undefined | Recurring interval string (e.g. "5m"), if set. |
| .recurringInput | TInput \| undefined | Default input for recurring runs, if set via .withInput(). |
| .handler | Function \| undefined | The handler function, if defined via .run(). |
| .steps | StepDefinition[] \| undefined | Array of step definitions, if defined via .step(). |

.trigger(input)

Enqueues a run for execution. Validates the input against the schema first. Returns a Promise<string> resolving to the unique run ID. Throws SignalValidationError if the input fails validation. The run is written to the adapter with "pending" status and picked up by the runner on the next poll tick.

    const runId = await sendEmail.trigger({
      to: "user@example.com",
      subject: "Order Confirmation",
      body: "Your order has been placed.",
    });

    console.log(`Enqueued run: ${runId}`);

SignalRunner

The runner is the long-running process that polls the adapter for due entries, spawns isolated child processes to execute signal handlers, and manages the full signal lifecycle: enqueue, dispatch, execute, retry, timeout, and completion. It also handles recurring signal scheduling, per-signal concurrency enforcement, and graceful shutdown on SIGINT/SIGTERM.

    import { SignalRunner, ConsoleSubscriber } from "station-signal";
    import { SqliteAdapter } from "station-adapter-sqlite";

    const runner = new SignalRunner({
      signalsDir: "./src/signals",
      adapter: new SqliteAdapter({ dbPath: "./data/signals.db" }),
      subscribers: [new ConsoleSubscriber()],
      pollIntervalMs: 1000,
      maxConcurrent: 10,
      retryBackoffMs: 2000,
    });

    await runner.start();

Constructor options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| signalsDir | string | | Directory path for auto-discovery. The runner recursively imports all .ts and .js files and registers any exported signal objects. Paths are resolved relative to the working directory. |
| adapter | SignalQueueAdapter | MemoryAdapter | Storage backend for run persistence. The default MemoryAdapter is in-process only and loses data on restart. Use SqliteAdapter for production. The runner automatically calls configure({ adapter }) so child processes can access the same adapter. |
| subscribers | SignalSubscriber[] | [] | Array of subscriber objects notified on lifecycle events. Subscribers have all-optional methods; implement only the events you need. |
| pollIntervalMs | number | 1000 | Milliseconds between poll ticks. Each tick checks for due runs, running timeouts, and recurring schedules. Lower values give faster pickup but higher CPU usage. |
| maxConcurrent | number | 5 | Global maximum concurrent child processes across all signal types. When this limit is reached, no new runs are dispatched until a slot opens. This is independent of per-signal .concurrency() limits. |
| maxAttempts | number | 1 | Default max attempts for signals that do not specify their own via .retries(). Per-signal settings override this. |
| retryBackoffMs | number | 1000 | Base delay in milliseconds for exponential retry backoff. The actual delay is retryBackoffMs * 2^(attempt - 1). First retry waits 1s, second 2s, third 4s, and so on. |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| start() | Promise<void> | Start the poll loop. Discovers signals from signalsDir (if set), installs SIGINT/SIGTERM shutdown handlers, and begins polling. This method blocks until stop() is called. |
| stop(opts?) | Promise<void> | Gracefully stop the runner. Accepts optional { graceful?: boolean, timeoutMs?: number }. When graceful, waits for active child processes to finish (up to the timeout), then kills any remaining. Closes the adapter to release resources. |
| register(name, filePath, opts?) | this | Manually register a signal by name and file path. Alternative to auto-discovery via signalsDir. Accepts optional { maxConcurrency }. |
| listRegistered() | Array<{ name, filePath, maxConcurrency? }> | Returns metadata for all registered signals. |
| hasSignal(name) | boolean | Check whether a signal is registered by name. |
| getRun(id) | Promise<Run \| null> | Look up a run by its ID. |
| listRuns(signalName) | Promise<Run[]> | List all runs for a specific signal. |
| getSteps(runId) | Promise<Step[]> | Get all step records for a multi-step run. |
| waitForRun(runId, opts?) | Promise<Run \| null> | Poll until a run reaches a terminal status (completed, failed, or cancelled). Options: { pollMs?, timeoutMs?, waitForExistence? }. Returns null if the run does not exist and waitForExistence is false. |
| cancel(runId) | Promise<boolean> | Cancel a specific run. Marks it as "cancelled" and kills the child process if running. Returns false if the run does not exist or is already in a terminal state. |
| purgeCompleted(olderThanMs) | Promise<number> | Delete completed, failed, and cancelled runs older than the specified age in milliseconds. Returns the count of purged runs. |
| getAdapter() | SignalQueueAdapter | Access the underlying queue adapter. Used by the broadcast runner to coordinate with the signal layer. |
| subscribe(subscriber) | this | Add a subscriber after construction. Chainable. |
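The polling behavior described for waitForRun can be approximated with a short loop over any getRun-style lookup. The sketch below is a stand-alone illustration, not the runner's implementation: waitForTerminal and RunLike are hypothetical names, the default pollMs and timeoutMs values are invented for the example, and throwing on timeout is an assumption, since the reference does not say what waitForRun does when timeoutMs elapses. It models only the case where waitForExistence is false.

```typescript
type RunStatus = "pending" | "running" | "completed" | "failed" | "cancelled";
interface RunLike {
  id: string;
  status: RunStatus;
}

const TERMINAL: ReadonlySet<RunStatus> = new Set<RunStatus>(["completed", "failed", "cancelled"]);

// Hypothetical polling helper built on a getRun-style lookup function.
async function waitForTerminal(
  getRun: (id: string) => Promise<RunLike | null>,
  runId: string,
  opts: { pollMs?: number; timeoutMs?: number } = {},
): Promise<RunLike | null> {
  const pollMs = opts.pollMs ?? 100;                        // assumed default
  const deadline = Date.now() + (opts.timeoutMs ?? 5_000); // assumed default
  for (;;) {
    const run = await getRun(runId);
    if (run === null) return null;                // run does not exist
    if (TERMINAL.has(run.status)) return run;     // completed, failed, or cancelled
    if (Date.now() >= deadline) {
      throw new Error(`Timed out waiting for run ${runId}`); // assumption, see lead-in
    }
    await new Promise<void>((resolve) => setTimeout(resolve, pollMs));
  }
}
```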

SignalRunner.create(signalsDir, opts?)

Static factory method. Creates a runner with the given signals directory and a default ConsoleSubscriber if no subscribers are provided.

    const runner = SignalRunner.create("./src/signals", {
      adapter: new SqliteAdapter({ dbPath: "./data/signals.db" }),
      maxConcurrent: 10,
    });

SignalQueueAdapter

The adapter interface defines storage operations for runs and steps. Implement this to use a custom storage backend. Two adapters ship with the framework: MemoryAdapter (built into station-signal) and SqliteAdapter (from station-adapter-sqlite).

| Method | Signature | Description |
| --- | --- | --- |
| addRun(run) | (Run) => Promise<void> | Insert a new run record into the store. |
| removeRun(id) | (string) => Promise<void> | Delete a run and its associated steps by run ID. |
| getRun(id) | (string) => Promise<Run \| null> | Look up a single run by ID. Returns null if not found. |
| getRunsDue() | () => Promise<Run[]> | Get all runs with "pending" status whose nextRunAt is in the past (or unset). Results are sorted by creation time, oldest first. |
| getRunsRunning() | () => Promise<Run[]> | Get all runs with "running" status. Used by the runner for timeout detection. |
| updateRun(id, patch) | (string, RunPatch) => Promise<void> | Partially update a run. Identity fields (id, signalName, kind, createdAt) are immutable and excluded from the patch type. |
| listRuns(signalName) | (string) => Promise<Run[]> | List all runs for a specific signal name. |
| hasRunWithStatus(name, statuses) | (string, RunStatus[]) => Promise<boolean> | Check if any run exists for the given signal in one of the specified statuses. Used for recurring overlap prevention. |
| purgeRuns(olderThan, statuses) | (Date, RunStatus[]) => Promise<number> | Delete runs in the given statuses that completed before the cutoff date. Returns the count deleted. |
| addStep(step) | (Step) => Promise<void> | Insert a step record. |
| updateStep(id, patch) | (string, StepPatch) => Promise<void> | Partially update a step record. |
| getSteps(runId) | (string) => Promise<Step[]> | Get all steps for a given run ID. |
| removeSteps(runId) | (string) => Promise<void> | Delete all steps for a given run ID. |
| generateId() | () => string | Generate a unique run ID. The built-in adapters use crypto.randomUUID(). |
| ping() | () => Promise<boolean> | Health check. Returns true if the adapter is operational. |
| close() | () => Promise<void> | Optional. Release resources (database connections, file handles). Called automatically by the runner on stop. |
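When writing a custom adapter, the two methods with the most specific semantics are getRunsDue() and purgeRuns(). The sketch below shows the filtering and ordering rules over a plain array. It is not the MemoryAdapter source: StoredRun is a trimmed, hypothetical version of Run, and selectDueRuns and purge are synchronous stand-ins for the async adapter methods.

```typescript
type RunStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

// Trimmed, hypothetical subset of the Run fields needed for this sketch.
interface StoredRun {
  id: string;
  status: RunStatus;
  createdAt: Date;
  nextRunAt?: Date;
  completedAt?: Date;
}

// getRunsDue(): pending runs whose nextRunAt is unset or not in the future, oldest first.
function selectDueRuns(runs: StoredRun[], now: Date): StoredRun[] {
  return runs
    .filter(
      (run) =>
        run.status === "pending" &&
        (run.nextRunAt === undefined || run.nextRunAt.getTime() <= now.getTime()),
    )
    .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
}

// purgeRuns(): drop runs in the given statuses that completed before the cutoff.
function purge(
  runs: StoredRun[],
  olderThan: Date,
  statuses: RunStatus[],
): { kept: StoredRun[]; purged: number } {
  const kept = runs.filter(
    (run) =>
      !(
        statuses.includes(run.status) &&
        run.completedAt !== undefined &&
        run.completedAt.getTime() < olderThan.getTime()
      ),
  );
  return { kept, purged: runs.length - kept.length };
}
```

A runner-level purgeCompleted(olderThanMs) call would presumably map onto purgeRuns with a cutoff of the current time minus olderThanMs, though the reference does not spell out that conversion.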

Run

The Run interface represents a single execution of a signal. It is the primary record stored by the adapter.

| Field | Type | Description |
| --- | --- | --- |
| id | string | Unique run identifier (UUID). |
| signalName | string | Name of the signal this run belongs to. |
| kind | "trigger" \| "recurring" | Whether this run was created by an explicit .trigger() call or by the recurring scheduler. |
| input | string | JSON-serialized input data. |
| output | string \| undefined | JSON-serialized output from the handler, set on completion. |
| error | string \| undefined | Error message, set on failure or timeout. |
| status | "pending" \| "running" \| "completed" \| "failed" \| "cancelled" | Current lifecycle state. Transitions: pending → running → completed/failed. Can also be cancelled from any non-terminal state. |
| attempts | number | Number of attempts executed so far. Starts at 0, incremented when dispatched. |
| maxAttempts | number | Maximum allowed attempts (1 + retries). |
| timeout | number | Timeout in milliseconds for this run. |
| interval | string \| undefined | Recurring interval string, present only for recurring runs. |
| nextRunAt | Date \| undefined | Earliest time this run should be picked up. |
| lastRunAt | Date \| undefined | Timestamp of the most recent execution attempt. |
| startedAt | Date \| undefined | When the current attempt started (reset on retry). |
| completedAt | Date \| undefined | When the run reached a terminal state. |
| createdAt | Date | When the run was created. |
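The status transitions in the table can be written down as a small lookup, which is handy when validating updates in a custom adapter or in tests. This is a sketch derived from the prose in this reference, not an exported type or function: ALLOWED_TRANSITIONS and canTransition are hypothetical, and the running to pending edge reflects the retry behavior described under .retries(n).

```typescript
type RunStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

// Hypothetical transition map assembled from the documented lifecycle.
const ALLOWED_TRANSITIONS: Record<RunStatus, readonly RunStatus[]> = {
  pending: ["running", "cancelled"],
  // "pending" appears here because a failed attempt with retries left is re-enqueued.
  running: ["completed", "failed", "cancelled", "pending"],
  completed: [],
  failed: [],
  cancelled: [],
};

function canTransition(from: RunStatus, to: RunStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

console.log(canTransition("pending", "running"));    // true
console.log(canTransition("completed", "running"));  // false: terminal states are final
```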

Step

For multi-step signals, each step has its own record:

| Field | Type | Description |
| --- | --- | --- |
| id | string | Unique step identifier. |
| runId | string | ID of the parent run. |
| name | string | Step name as defined in .step(name, fn). |
| status | "pending" \| "running" \| "completed" \| "failed" | Current step state. |
| input | string \| undefined | JSON-serialized input passed to this step. |
| output | string \| undefined | JSON-serialized return value of this step. |
| error | string \| undefined | Error message if the step failed. |
| startedAt | Date \| undefined | When step execution started. |
| completedAt | Date \| undefined | When step execution finished. |

SignalSubscriber

All methods are optional. Implement only the events you need. Subscriber methods are called synchronously and should not throw. If a subscriber throws, the error is caught, logged, and does not affect signal execution.

| Method | Event data | When it fires |
| --- | --- | --- |
| onSignalDiscovered | { signalName, filePath } | A signal file was found during auto-discovery from signalsDir. |
| onRunDispatched | { run } | A run was marked as "running" and the child process is about to spawn. |
| onRunStarted | { run } | The child process confirmed it found the signal and is about to execute the handler. |
| onRunCompleted | { run, output? } | The handler finished successfully. output is the JSON-serialized return value. |
| onRunTimeout | { run } | A running run exceeded its timeout. The child process is killed. If retries remain, the run is re-enqueued; otherwise it fails. |
| onRunRetry | { run, attempt, maxAttempts } | A failed run was reset to "pending" for another attempt. attempt is the current attempt number. |
| onRunFailed | { run, error? } | A run failed terminally. All retries are exhausted, or the error was marked as non-retryable. |
| onRunCancelled | { run } | A run was cancelled via runner.cancel(). |
| onRunSkipped | { run, reason } | A due run was skipped because the per-signal concurrency limit was reached or backoff has not elapsed. |
| onRunRescheduled | { run, nextRunAt } | A recurring run was enqueued and the next execution time was computed. |
| onStepStarted | { run, step } | A step within a multi-step run started execution. |
| onStepCompleted | { run, step } | A step completed successfully. |
| onStepFailed | { run, step } | A step threw an error. |
| onCompleteError | { run, error } | The onComplete callback threw. The run is still marked as completed because the handler itself succeeded. |
| onLogOutput | { run, level, message } | Console output (stdout or stderr) captured from the child process. |

The built-in ConsoleSubscriber logs all events to stdout with a [station-signal] prefix.
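The "all methods optional, errors are caught" contract can be illustrated with a small fan-out function. The types and the function here are hypothetical and cover a single event; the sketch shows the pattern (optional call plus try/catch per subscriber), not how station-signal dispatches events internally or what it logs.

```typescript
// Hypothetical, trimmed event and subscriber shapes for one event type.
interface RunCompletedEvent {
  run: { id: string; signalName: string };
  output?: string; // JSON-serialized return value
}

interface PartialSubscriber {
  onRunCompleted?(event: RunCompletedEvent): void;
}

// Calls every subscriber that implements the method; one bad subscriber cannot stop the rest.
// Returns the number of subscribers that threw.
function notifyRunCompleted(subscribers: PartialSubscriber[], event: RunCompletedEvent): number {
  let errors = 0;
  for (const subscriber of subscribers) {
    try {
      subscriber.onRunCompleted?.(event); // skipped when the method is not implemented
    } catch (err) {
      errors += 1;
      console.error("subscriber threw in onRunCompleted:", err);
    }
  }
  return errors;
}
```

A subscriber that only cares about completions would implement just onRunCompleted and leave every other method undefined.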


configure()

Sets a global default adapter. When the runner spawns a child process, that child may need to enqueue new runs (for example, if a signal's handler calls .trigger() on another signal). The child process reconstructs the adapter from serialized metadata and calls configure() automatically. You rarely need to call this directly; the runner does it for you in its constructor.

    import { configure } from "station-signal";
    import { SqliteAdapter } from "station-adapter-sqlite";

    configure({
      adapter: new SqliteAdapter({ dbPath: "./data/signals.db" }),
    });

If configure() is called more than once, a warning is logged and the previous adapter is replaced. Each runner should use its own adapter instance.

Re-exported Zod

import { signal, z } from "station-signal";

station-signal re-exports z from Zod v4, so a single import covers both schema definitions and signal definitions. You do not need to install Zod as a separate dependency.