API Reference

Broadcasts

A broadcast chains multiple signals into a directed acyclic graph (DAG). Each node in the graph is a signal. Nodes execute when all their upstream dependencies complete. Outputs from upstream nodes are passed to downstream nodes automatically. The broadcast runner orchestrates the entire graph, handling fan-out, fan-in, conditional execution, and failure propagation.

broadcast(name)

Creates a named broadcast definition. The name must be unique, start with a letter, and contain only letters, digits, hyphens, and underscores. Returns a builder that connects signals into a dependency graph.

import { broadcast } from "station-broadcast";
import { signal, z } from "station-signal";

// db, paymentGateway, and emailService are application services assumed to be in scope.

const validate = signal("validate")
  .input(z.object({ orderId: z.string() }))
  .output(z.object({ orderId: z.string(), total: z.number() }))
  .run(async (input) => {
    const order = await db.orders.findById(input.orderId);
    return { orderId: order.id, total: order.total };
  });

const charge = signal("charge")
  .input(z.object({ orderId: z.string(), total: z.number() }))
  .output(z.object({ chargeId: z.string() }))
  .run(async (input) => {
    const result = await paymentGateway.charge(input.total);
    return { chargeId: result.id };
  });

const notify = signal("notify")
  .input(z.object({ chargeId: z.string() }))
  .run(async (input) => {
    await emailService.sendReceipt(input.chargeId);
  });

// validate -> charge -> notify, each node receiving the previous node's output
export const orderFlow = broadcast("orderFlow")
  .input(validate)
  .then(charge)
  .then(notify)
  .build();
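The naming rule for broadcast(name) can be checked before a definition is created. The following is a minimal sketch that assumes the rule is exactly as stated and that "letters" means ASCII letters; the helper isValidBroadcastName and its regular expression are hypothetical and are not part of station-broadcast. Uniqueness is not checked here.

```typescript
// Hypothetical helper, not part of station-broadcast.
// Encodes the stated rule: start with a letter, then letters, digits, hyphens, underscores.
const BROADCAST_NAME = /^[A-Za-z][A-Za-z0-9_-]*$/;

function isValidBroadcastName(name: string): boolean {
  return BROADCAST_NAME.test(name);
}

isValidBroadcastName("orderFlow");    // true
isValidBroadcastName("order-flow_2"); // true
isValidBroadcastName("2fast");        // false: starts with a digit
isValidBroadcastName("order flow");   // false: contains a space
```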

Builder methods

.input(signal)

Sets the entry signal -- the root node of the DAG. This is required and must be called first. The entry signal's input type becomes the broadcast's input type: when you call broadcast.trigger(data), that data is passed to this signal.

const pipeline = broadcast("pipeline")
  .input(validate) // validate's input schema defines what .trigger() accepts
  // ...

.then(signal, opts?)

Adds one or more downstream nodes. By default, each node depends on the most recently added tier (the "last tier"). Multiple signals passed to a single .then() call create parallel fan-out -- they all depend on the same upstream tier and run concurrently.

// Single signal with options:
.then(charge, {
  as: "payment",
  after: ["validate"],
  // charge expects both orderId and total, so map supplies both
  map: (upstream) => ({
    orderId: upstream.validate.orderId,
    total: upstream.validate.total,
  }),
  when: (upstream) => upstream.validate.total > 0,
})

// Fan-out (multiple signals, no options):
.then(emailReceipt, smsNotification, slackAlert)

| Option | Type | Description |
| --- | --- | --- |
| as | string | Custom name for this node. Defaults to the signal's name. Used in after arrays and as the key in downstream map functions. Required when the same signal appears multiple times in a broadcast. |
| after | string[] | Explicit dependency list. The node waits for all named nodes to complete before executing. Overrides the default "depends on previous tier" behavior. Use this for fan-in patterns. |
| map | (upstream: Record<string, unknown>) => unknown | Transform function. Receives an object keyed by upstream node names, each containing that node's deserialized output. Returns the input for this signal. Without map, the behavior depends on the number of dependencies: a single dependency passes its output directly; multiple dependencies pass the entire upstream object. |
| when | (upstream: Record<string, unknown>) => boolean | Guard function. Receives the same upstream object as map. Return false to skip this node. Skipped nodes (with skip reason "guard") do not propagate failure downstream -- their dependents still execute, receiving undefined for the skipped node's output. |

Options (as, after, map, when) cannot be used with fan-out (multiple signals in a single .then() call). Use separate .then() calls with options for each signal instead.
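The rules for map and when can be read as a small decision procedure. The sketch below illustrates that reading only; resolveNode and its types are hypothetical, it is not the library's source, and it assumes the guard is evaluated before map.

```typescript
// Illustrative only: mirrors the documented rules, not the library's implementation.
type Upstream = Record<string, unknown>;

interface NodeOptions {
  map?: (upstream: Upstream) => unknown;
  when?: (upstream: Upstream) => boolean;
}

type Resolution =
  | { run: true; input: unknown }
  | { run: false; skipReason: "guard" };

function resolveNode(deps: string[], outputs: Upstream, opts: NodeOptions = {}): Resolution {
  // Build the upstream object keyed by node name; guard-skipped nodes stay undefined.
  const upstream: Upstream = {};
  for (const dep of deps) upstream[dep] = outputs[dep];

  // Assumption: the guard runs first. Returning false skips the node without failing it.
  if (opts.when && !opts.when(upstream)) return { run: false, skipReason: "guard" };

  // map wins when present. Otherwise a single dependency passes its output directly,
  // and several dependencies pass the whole upstream object.
  if (opts.map) return { run: true, input: opts.map(upstream) };
  if (deps.length === 1) return { run: true, input: upstream[deps[0] as string] };
  return { run: true, input: upstream };
}
```

With a single dependency and no options, resolveNode(["validate"], outputs) yields validate's output unchanged; adding a second dependency yields the keyed object instead, which is why fan-in nodes usually need a map.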

.every(interval)

Makes the broadcast recurring. Accepts the same duration strings as signals: "30s", "5m", "1h", "1d". The broadcast runner enqueues a new run after each interval elapses. If a pending or running instance already exists, that tick is skipped to prevent overlap.

const hourlySync = broadcast("hourlySync")
  .input(fetchData)
  .then(transformData)
  .then(loadData)
  .every("1h")
  .withInput({ source: "production" })
  .build();
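To make the scheduling rule concrete, the sketch below converts a duration string to milliseconds and decides whether a tick should enqueue a run. It is an illustration under assumptions: parseInterval and shouldEnqueue are hypothetical names, and only the four units shown in this reference (s, m, h, d) are handled.

```typescript
// Hypothetical helpers, not part of station-broadcast.
type RunStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

const UNIT_MS: Record<string, number> = { s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000 };

// Parses strings such as "30s", "5m", "1h", "1d" into milliseconds.
function parseInterval(interval: string): number {
  const match = /^(\d+)([smhd])$/.exec(interval);
  if (!match) throw new Error(`Unsupported interval: ${interval}`);
  return Number(match[1]) * UNIT_MS[match[2] as string];
}

// A tick enqueues a run only when the interval has elapsed and no
// pending or running instance of the same broadcast exists.
function shouldEnqueue(lastRunAt: number, now: number, interval: string, existing: RunStatus[]): boolean {
  const due = now - lastRunAt >= parseInterval(interval);
  const overlapping = existing.some((s) => s === "pending" || s === "running");
  return due && !overlapping;
}
```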

.withInput(data)

Sets the default input for recurring broadcasts. The data is passed to the entry signal on each scheduled run. Without it, recurring broadcasts run with {} as input.

.onFailure(policy)

Sets the failure policy that determines what happens when a node fails. Default: "fail-fast".

| Policy | Behavior |
| --- | --- |
| "fail-fast" | Stop the entire broadcast immediately. All running nodes are cancelled, all pending nodes are skipped. The broadcast is marked as failed. |
| "skip-downstream" | Mark the failed node, skip its direct and transitive dependents (with skip reason "upstream-failed"), but continue executing independent branches. The broadcast is marked as failed when all branches finish. |
| "continue" | Same as "skip-downstream" for node skipping, but the broadcast is marked as "completed" (with an error message noting the partial failure) rather than "failed". Use this when some nodes are non-critical. |

const resilientPipeline = broadcast("resilientPipeline")
  .input(fetchData)
  .then(sendEmail) // non-critical: ok if this fails
  // Depend on fetchData directly so that a sendEmail failure does not skip this node
  .then(updateDatabase, { after: ["fetchData"] }) // critical
  .onFailure("continue")
  .build();
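The difference between the policies comes down to which nodes are skipped and how the run is finally marked. The sketch below illustrates the "skip-downstream" and "continue" behavior described in the table; transitiveDependents and finalStatus are hypothetical helpers, not library functions.

```typescript
// Illustrative only. deps maps each node name to the node names it depends on.
type FailurePolicy = "fail-fast" | "skip-downstream" | "continue";

// Returns every node that depends, directly or transitively, on the failed node.
function transitiveDependents(failed: string, deps: Record<string, string[]>): string[] {
  const skipped = new Set<string>();
  let grew = true;
  while (grew) {
    grew = false;
    for (const node of Object.keys(deps)) {
      if (node === failed || skipped.has(node)) continue;
      const upstream = deps[node] ?? [];
      if (upstream.some((u) => u === failed || skipped.has(u))) {
        skipped.add(node); // would be recorded with skip reason "upstream-failed"
        grew = true;
      }
    }
  }
  return Array.from(skipped);
}

// "continue" reports a partial failure as completed; the other policies report failed.
function finalStatus(policy: FailurePolicy, anyNodeFailed: boolean): "completed" | "failed" {
  if (!anyNodeFailed) return "completed";
  return policy === "continue" ? "completed" : "failed";
}
```

In a graph where sendEmail and updateDatabase both depend only on fetchData, a sendEmail failure skips nothing else, so updateDatabase still runs under either non-fail-fast policy.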

.timeout(ms)

Maximum total time for the entire broadcast execution in milliseconds. If the broadcast runs longer than this, all active nodes are cancelled and the broadcast is marked as failed with a timeout error. This is separate from per-signal timeouts, which still apply individually.

const timeBoundPipeline = broadcast("timeBoundPipeline")
  .input(validate)
  .then(processA)
  .then(processB)
  .timeout(60_000) // entire broadcast must finish within 60 seconds
  .build();
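A broadcast-level timeout amounts to a deadline check against the run's clock. The sketch below is one plausible reading, not the runner's code: hasTimedOut is a hypothetical helper, and it assumes the clock starts at startedAt rather than createdAt, which this reference does not specify.

```typescript
// Hypothetical helper; field names follow the BroadcastRun table in this reference.
interface RunTiming {
  startedAt?: Date;
  timeout?: number; // milliseconds
}

function hasTimedOut(run: RunTiming, now: Date): boolean {
  // No deadline applies until the run has started and a timeout is configured.
  if (run.startedAt === undefined || run.timeout === undefined) return false;
  return now.getTime() - run.startedAt.getTime() > run.timeout;
}
```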

.build()

Finalizes the broadcast definition. Validates the DAG: checks for duplicate node names, missing dependencies, and cycles. Throws BroadcastValidationError or BroadcastCycleError if the graph is invalid. Returns a BroadcastDefinition object that can be registered with the runner and triggered.
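The three checks performed by .build() can be sketched with a standard topological sort. This is an illustration, not the library's validator: validateDag, NodeSpec, and the two error classes are local stand-ins (the real errors are BroadcastValidationError and BroadcastCycleError), and Kahn's algorithm is simply one common way to detect cycles.

```typescript
// Local stand-ins for the library's error types.
class ValidationError extends Error {}
class CycleError extends Error {}

interface NodeSpec {
  name: string;
  after: string[]; // names of the nodes this node depends on
}

// Returns one valid execution order, or throws if the graph is invalid.
function validateDag(nodes: NodeSpec[]): string[] {
  const names = new Set<string>();
  for (const n of nodes) {
    if (names.has(n.name)) throw new ValidationError(`Duplicate node name: ${n.name}`);
    names.add(n.name);
  }
  for (const n of nodes) {
    for (const dep of n.after) {
      if (!names.has(dep)) throw new ValidationError(`${n.name} depends on unknown node: ${dep}`);
    }
  }

  // Kahn's algorithm: repeatedly remove nodes whose dependencies are all resolved.
  const remaining = new Map<string, Set<string>>();
  for (const n of nodes) remaining.set(n.name, new Set(n.after));
  const order: string[] = [];
  while (remaining.size > 0) {
    const ready = Array.from(remaining.keys()).filter((name) => remaining.get(name)!.size === 0);
    // If nothing is ready but nodes remain, they must depend on each other.
    if (ready.length === 0) throw new CycleError("Cycle detected");
    for (const name of ready) {
      remaining.delete(name);
      order.push(name);
    }
    remaining.forEach((deps) => ready.forEach((name) => deps.delete(name)));
  }
  return order;
}
```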


Patterns

Linear chain

A → B → C. Each node depends on the previous. The output of each node is passed directly as input to the next.

const linear = broadcast("linear")
  .input(validate) // A
  .then(charge)    // B: receives validate's output
  .then(notify)    // C: receives charge's output
  .build();

Fan-out

A → [B, C, D]. Multiple nodes run in parallel after A completes. Each receives A's output.

const fanOut = broadcast("fanOut")
  .input(validate)
  .then(emailReceipt, smsNotification, slackAlert)
  .build();

// Equivalent to:
const fanOutExplicit = broadcast("fanOutExplicit")
  .input(validate)
  .then(emailReceipt, { after: ["validate"] })
  .then(smsNotification, { after: ["validate"] })
  .then(slackAlert, { after: ["validate"] })
  .build();

Fan-in

[B, C] → D. Node D waits for both B and C to complete. Use after to declare dependencies on multiple upstream nodes, and map to combine their outputs.

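const fanIn = broadcast("fanIn")
  .input(validate)
  // Both branches depend on validate explicitly so they run in parallel;
  // without after, "warehouse" would depend on "email" (the previous tier).
  .then(emailReceipt, { as: "email", after: ["validate"] })
  .then(notifyWarehouse, { as: "warehouse", after: ["validate"] })
  .then(generateReport, {
    after: ["email", "warehouse"],
    map: (upstream) => ({
      emailSent: upstream.email.sent,
      warehouseAck: upstream.warehouse.ackId,
    }),
  })
  .build();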

Conditional execution

Use when to skip nodes based on upstream results. Skipped nodes (reason: "guard") do not count as failures and do not block their dependents.

const conditional = broadcast("conditional")
  .input(validate)
  .then(chargeCard, {
    when: (upstream) => upstream.validate.paymentMethod === "card",
    map: (upstream) => ({ amount: upstream.validate.total }),
  })
  .then(chargeBankTransfer, {
    after: ["validate"],
    when: (upstream) => upstream.validate.paymentMethod === "bank",
    map: (upstream) => ({ amount: upstream.validate.total }),
  })
  .then(sendConfirmation, {
    after: ["chargeCard", "chargeBankTransfer"],
  })
  .build();

Data mapping

Use map to transform upstream outputs into the downstream signal's expected input format. The upstream argument is keyed by node name (or the as alias).

const mapped = broadcast("mapped")
  .input(fetchUser)
  .then(enrichProfile, {
    map: (upstream) => ({
      userId: upstream.fetchUser.id,
      email: upstream.fetchUser.email,
    }),
  })
  .build();

Default data when upstream is skipped

When a node is guard-skipped, its output is undefined in downstream map functions. Use nullish coalescing to provide fallback values.

const withFallback = broadcast("withFallback")
  .input(validate)
  .then(enrichFromCache, {
    when: (upstream) => upstream.validate.cacheEnabled,
  })
  .then(process, {
    after: ["validate", "enrichFromCache"],
    map: (upstream) => ({
      data: upstream.validate.data,
      enrichment: upstream.enrichFromCache ?? { source: "none" },
    }),
  })
  .build();

Triggering

Trigger a broadcast to enqueue it for execution. The input is passed to the entry signal. Returns a broadcast run ID.

// Trigger via the definition (uses the global broadcast adapter)
const runId = await orderFlow.trigger({ orderId: "ORD-9281" });

// Alternatively, trigger via the runner (preferred: uses the runner's own adapter)
const runnerRunId = await broadcastRunner.trigger("orderFlow", { orderId: "ORD-9281" });

// Optionally wait for the broadcast to complete
const result = await broadcastRunner.waitForBroadcastRun(runnerRunId, {
  timeoutMs: 30_000,
  pollMs: 200,
});

if (result?.status === "completed") {
  console.log("Broadcast finished successfully");
} else if (result?.status === "failed") {
  console.error("Broadcast failed:", result.error);
}

BroadcastRunner

The broadcast runner orchestrates DAG execution. It polls for triggered broadcasts, initializes node run records, triggers root signals, and advances the graph as nodes complete. It coordinates with a SignalRunner instance to execute individual signals and monitor their completion.

import { BroadcastRunner, ConsoleBroadcastSubscriber } from "station-broadcast";
import { BroadcastSqliteAdapter } from "station-adapter-sqlite/broadcast";

const broadcastRunner = new BroadcastRunner({
  signalRunner, // an existing SignalRunner instance
  adapter: new BroadcastSqliteAdapter({ dbPath: "./data/broadcasts.db" }),
  subscribers: [new ConsoleBroadcastSubscriber()],
  pollIntervalMs: 1000,
});

broadcastRunner.register(orderFlow);
broadcastRunner.register(hourlySync);

// Start the broadcast runner (blocks until stop() is called)
await broadcastRunner.start();

Shutdown order matters. The broadcast runner must stop before the signal runner because it queries the signal adapter during shutdown to check node completion status. Stop them in this order:

await broadcastRunner.stop({ graceful: true });
await signalRunner.stop({ graceful: true });

Constructor options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| signalRunner | SignalRunner | | Required. The signal runner instance that executes individual signals. The broadcast runner reads from the signal runner's adapter to track node completion. |
| adapter | BroadcastQueueAdapter | BroadcastMemoryAdapter | Storage backend for broadcast runs and node states. Use BroadcastSqliteAdapter for production persistence. |
| broadcastsDir | string | | Directory path for auto-discovery. Recursively imports all .ts and .js files and registers any exported broadcast definitions. |
| subscribers | BroadcastSubscriber[] | [] | Array of subscriber objects notified on broadcast lifecycle events. |
| pollIntervalMs | number | 1000 | Milliseconds between poll ticks. Each tick checks for pending broadcasts, advances running broadcasts, and handles recurring schedules. |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| register(definition) | this | Register a broadcast definition. Alternative to auto-discovery via broadcastsDir. Chainable. Warns on duplicate names. |
| start() | Promise<void> | Start the poll loop. Discovers broadcasts from broadcastsDir (if set), installs shutdown handlers, and begins polling. Blocks until stop() is called. |
| stop(opts?) | Promise<void> | Stop the runner. Accepts optional { graceful: boolean, timeoutMs: number }. When graceful, waits for running broadcasts to finish (up to the timeout). Closes the adapter. |
| trigger(name, input) | Promise<string> | Trigger a registered broadcast by name. Writes directly to this runner's adapter rather than the global singleton. Returns the broadcast run ID. |
| waitForBroadcastRun(id, opts?) | Promise<BroadcastRun \| null> | Poll until a broadcast run reaches a terminal status (completed, failed, or cancelled). Options: { pollMs?, timeoutMs? }. Default timeout: 60 seconds. |
| cancel(broadcastRunId) | Promise<boolean> | Cancel a broadcast run. Cancels all running signal runs, skips all pending nodes, and marks the broadcast as cancelled. Returns false if the run does not exist or is already terminal. |
| getBroadcastRun(id) | Promise<BroadcastRun \| null> | Look up a broadcast run by its ID. |
| getNodeRuns(broadcastRunId) | Promise<BroadcastNodeRun[]> | Get all node run records for a broadcast run. |
| listRegistered() | Array<{ name, nodeCount, failurePolicy, timeout?, interval? }> | List metadata for all registered broadcast definitions. |
| hasBroadcast(name) | boolean | Check whether a broadcast is registered by name. |
| subscribe(subscriber) | this | Add a subscriber after construction. Chainable. |
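The polling behavior described for waitForBroadcastRun can be approximated in a few lines. The sketch below is a stand-in, not the runner's implementation: waitForTerminal and RunSnapshot are hypothetical, the lookup parameter plays the role of getBroadcastRun, the 200 ms default poll interval is an assumption, and so is resolving to null on timeout instead of throwing.

```typescript
type BroadcastRunStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

interface RunSnapshot {
  id: string;
  status: BroadcastRunStatus;
  error?: string;
}

const TERMINAL: BroadcastRunStatus[] = ["completed", "failed", "cancelled"];

// Hypothetical stand-in for the runner's polling loop.
async function waitForTerminal(
  lookup: (id: string) => Promise<RunSnapshot | null>,
  id: string,
  opts: { pollMs?: number; timeoutMs?: number } = {},
): Promise<RunSnapshot | null> {
  const pollMs = opts.pollMs ?? 200; // assumed default
  const deadline = Date.now() + (opts.timeoutMs ?? 60_000); // documented default: 60 seconds
  while (Date.now() <= deadline) {
    const run = await lookup(id);
    if (run && TERMINAL.indexOf(run.status) !== -1) return run;
    // Not terminal yet: sleep for one poll interval before checking again.
    await new Promise<void>((resolve) => setTimeout(resolve, pollMs));
  }
  return null; // assumption: a timeout resolves to null
}
```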

BroadcastQueueAdapter

The adapter interface for broadcast storage. Manages broadcast runs and their node runs. Two adapters ship with the framework: BroadcastMemoryAdapter (built into station-broadcast) and BroadcastSqliteAdapter (from station-adapter-sqlite).

| Method | Signature | Description |
| --- | --- | --- |
| addBroadcastRun(run) | (BroadcastRun) => Promise<void> | Insert a new broadcast run record. |
| getBroadcastRun(id) | (string) => Promise<BroadcastRun \| null> | Look up a broadcast run by ID. |
| updateBroadcastRun(id, patch) | (string, BroadcastRunPatch) => Promise<void> | Partially update a broadcast run. Identity fields (id, broadcastName, createdAt) are immutable. |
| getBroadcastRunsDue() | () => Promise<BroadcastRun[]> | Get all broadcast runs with "pending" status, ready for initialization. |
| getBroadcastRunsRunning() | () => Promise<BroadcastRun[]> | Get all broadcast runs with "running" status, needing advancement. |
| listBroadcastRuns(broadcastName) | (string) => Promise<BroadcastRun[]> | List all runs for a specific broadcast name. |
| hasBroadcastRunWithStatus(name, statuses) | (string, BroadcastRunStatus[]) => Promise<boolean> | Check if any run exists for the given broadcast in one of the specified statuses. Used for recurring overlap prevention. |
| purgeBroadcastRuns(olderThan, statuses) | (Date, BroadcastRunStatus[]) => Promise<number> | Delete broadcast runs in terminal statuses older than the cutoff. Returns count deleted. |
| addNodeRun(nodeRun) | (BroadcastNodeRun) => Promise<void> | Insert a node run record. |
| getNodeRun(id) | (string) => Promise<BroadcastNodeRun \| null> | Look up a node run by ID. |
| updateNodeRun(id, patch) | (string, BroadcastNodeRunPatch) => Promise<void> | Partially update a node run. Identity fields (id, broadcastRunId, nodeName, signalName) are immutable. |
| getNodeRuns(broadcastRunId) | (string) => Promise<BroadcastNodeRun[]> | Get all node runs for a given broadcast run. |
| generateId() | () => string | Generate a unique ID for runs and node runs. |
| ping() | () => Promise<boolean> | Health check. Returns true if the adapter is operational. |
| close() | () => Promise<void> | Optional. Release resources. Called automatically on stop. |
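To show what implementing this interface involves, here is a sketch of the run-related methods backed by a Map. It is not the shipped BroadcastMemoryAdapter: SketchMemoryAdapter and the trimmed-down types are local stand-ins, the node-run methods are omitted for brevity, and comparing createdAt against the purge cutoff is an assumption.

```typescript
// Local stand-ins; field names follow the tables in this reference.
type BroadcastRunStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

interface BroadcastRun {
  id: string;
  broadcastName: string;
  input: string; // JSON-serialized
  status: BroadcastRunStatus;
  createdAt: Date;
  error?: string;
}

// Identity fields are excluded from the patch type so they cannot be changed.
type BroadcastRunPatch = Partial<Omit<BroadcastRun, "id" | "broadcastName" | "createdAt">>;

class SketchMemoryAdapter {
  private runs = new Map<string, BroadcastRun>();
  private counter = 0;

  generateId(): string {
    this.counter += 1;
    return `run-${this.counter}`;
  }

  async addBroadcastRun(run: BroadcastRun): Promise<void> {
    this.runs.set(run.id, { ...run });
  }

  async getBroadcastRun(id: string): Promise<BroadcastRun | null> {
    return this.runs.get(id) ?? null;
  }

  async updateBroadcastRun(id: string, patch: BroadcastRunPatch): Promise<void> {
    const existing = this.runs.get(id);
    if (existing) this.runs.set(id, { ...existing, ...patch });
  }

  async getBroadcastRunsDue(): Promise<BroadcastRun[]> {
    return Array.from(this.runs.values()).filter((r) => r.status === "pending");
  }

  async hasBroadcastRunWithStatus(name: string, statuses: BroadcastRunStatus[]): Promise<boolean> {
    return Array.from(this.runs.values()).some(
      (r) => r.broadcastName === name && statuses.indexOf(r.status) !== -1,
    );
  }

  // Assumption: "older than" is measured against createdAt.
  async purgeBroadcastRuns(olderThan: Date, statuses: BroadcastRunStatus[]): Promise<number> {
    let deleted = 0;
    for (const run of Array.from(this.runs.values())) {
      if (run.createdAt.getTime() < olderThan.getTime() && statuses.indexOf(run.status) !== -1) {
        this.runs.delete(run.id);
        deleted += 1;
      }
    }
    return deleted;
  }

  async ping(): Promise<boolean> {
    return true;
  }
}
```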

BroadcastRun

Represents a single execution of a broadcast.

| Field | Type | Description |
| --- | --- | --- |
| id | string | Unique broadcast run identifier. |
| broadcastName | string | Name of the broadcast definition. |
| input | string | JSON-serialized input provided when triggered. |
| status | "pending" \| "running" \| "completed" \| "failed" \| "cancelled" | Current lifecycle state. |
| failurePolicy | "fail-fast" \| "skip-downstream" \| "continue" | The failure policy in effect for this run. |
| timeout | number \| undefined | Broadcast-level timeout in milliseconds, if set. |
| interval | string \| undefined | Recurring interval string, if this is a recurring broadcast. |
| nextRunAt | Date \| undefined | Scheduled time for the next recurring execution. |
| createdAt | Date | When the broadcast run was created. |
| startedAt | Date \| undefined | When the broadcast began executing (first nodes triggered). |
| completedAt | Date \| undefined | When the broadcast reached a terminal state. |
| error | string \| undefined | Error message on failure. Also set on completed broadcasts with "continue" policy if any nodes failed (partial failure). |

BroadcastNodeRun

Represents a single node's execution within a broadcast run.

| Field | Type | Description |
| --- | --- | --- |
| id | string | Unique node run identifier. |
| broadcastRunId | string | ID of the parent broadcast run. |
| nodeName | string | Name of this node in the DAG (signal name or the as alias). |
| signalName | string | Name of the underlying signal. |
| signalRunId | string \| undefined | ID of the signal run created for this node. Links to the Run record in the signal adapter. |
| status | "pending" \| "running" \| "completed" \| "failed" \| "skipped" | Current node state. |
| skipReason | "guard" \| "upstream-failed" \| "cancelled" \| undefined | Why this node was skipped. Only set when status is "skipped". "guard": the when function returned false. "upstream-failed": an upstream dependency failed. "cancelled": the broadcast was cancelled. |
| input | string \| undefined | JSON-serialized input passed to the signal. |
| output | string \| undefined | JSON-serialized output from the completed signal. |
| error | string \| undefined | Error message if the node failed. |
| startedAt | Date \| undefined | When the node started executing. |
| completedAt | Date \| undefined | When the node reached a terminal state. |
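Because input and output are stored as JSON strings, node run records usually need a small amount of post-processing before they are useful in application code. The sketch below shows one way to do that for records such as those returned by getNodeRuns; summarize and NodeRunView are hypothetical, and NodeRunView declares only the fields it reads.

```typescript
type NodeStatus = "pending" | "running" | "completed" | "failed" | "skipped";
type SkipReason = "guard" | "upstream-failed" | "cancelled";

// Subset of BroadcastNodeRun used by this sketch.
interface NodeRunView {
  nodeName: string;
  status: NodeStatus;
  skipReason?: SkipReason;
  output?: string; // JSON-serialized
  error?: string;
}

// Groups node runs into parsed outputs, failure messages, and skip reasons.
function summarize(nodeRuns: NodeRunView[]) {
  const outputs: Record<string, unknown> = {};
  const failures: string[] = [];
  const skipped: Record<string, SkipReason | undefined> = {};
  for (const n of nodeRuns) {
    if (n.status === "completed" && n.output !== undefined) outputs[n.nodeName] = JSON.parse(n.output);
    if (n.status === "failed") failures.push(`${n.nodeName}: ${n.error ?? "unknown error"}`);
    if (n.status === "skipped") skipped[n.nodeName] = n.skipReason;
  }
  return { outputs, failures, skipped };
}
```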

BroadcastSubscriber

All methods are optional. Implement only the events you need. Subscriber errors are caught and logged without affecting broadcast execution.

| Method | Event data | When it fires |
| --- | --- | --- |
| onBroadcastDiscovered | { broadcastName, filePath } | A broadcast file was found during auto-discovery from broadcastsDir. |
| onBroadcastQueued | { broadcastRun } | A broadcast run was created and added to the queue. |
| onBroadcastStarted | { broadcastRun } | A broadcast run transitioned from "pending" to "running". Node run records have been created and root nodes are about to be triggered. |
| onBroadcastCompleted | { broadcastRun } | All nodes reached terminal states and the broadcast is marked as completed. Under the "continue" policy, the broadcastRun.error field may contain a partial failure message. |
| onBroadcastFailed | { broadcastRun, error } | The broadcast failed. Under "fail-fast", this fires immediately when any node fails. Under "skip-downstream", this fires after all branches finish. |
| onBroadcastCancelled | { broadcastRun } | The broadcast was cancelled via broadcastRunner.cancel(). |
| onNodeTriggered | { broadcastRun, nodeRun } | A node's signal was triggered via .trigger(). The nodeRun.signalRunId is now set. |
| onNodeCompleted | { broadcastRun, nodeRun } | A node's signal completed. The nodeRun.output contains the JSON-serialized result. |
| onNodeFailed | { broadcastRun, nodeRun, error } | A node's signal failed, its map function threw, its when function threw, or input validation failed. |
| onNodeSkipped | { broadcastRun, nodeRun, reason } | A node was skipped. Reasons include: guard returned false, upstream dependency failed, or broadcast was cancelled. |

The built-in ConsoleBroadcastSubscriber logs all events to stdout with a [station-broadcast] prefix.
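Since every subscriber method is optional, a custom subscriber can be a small class that implements only the events of interest. The sketch below collects node failures and partial failures in memory. FailureCollector and the trimmed event types are hypothetical, and typing the error payload as a string is an assumption, since this reference does not state its type.

```typescript
// Local stand-ins for the event payloads; only the fields used here are declared.
interface RunInfo {
  id: string;
  broadcastName: string;
  error?: string;
}

interface NodeInfo {
  nodeName: string;
}

// Hypothetical subscriber implementing two of the optional methods.
class FailureCollector {
  readonly failures: string[] = [];
  readonly partiallyFailedRuns: string[] = [];

  onNodeFailed(event: { broadcastRun: RunInfo; nodeRun: NodeInfo; error: string }): void {
    this.failures.push(`${event.broadcastRun.broadcastName}/${event.nodeRun.nodeName}: ${event.error}`);
  }

  onBroadcastCompleted(event: { broadcastRun: RunInfo }): void {
    // Under the "continue" policy a completed run may still carry an error message.
    if (event.broadcastRun.error) this.partiallyFailedRuns.push(event.broadcastRun.id);
  }
}
```

An instance could then be supplied in the subscribers constructor option or passed to subscribe(), assuming its shape satisfies the library's BroadcastSubscriber type.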