Examples
Broadcast
DAG workflow orchestration. Chain signals into a dependency graph with fan-out and conditional execution.
signals/validate-order.ts
import { signal, z } from "station-signal";

/** Input schema: the order identifier and the amount to be charged. */
const orderSchema = z.object({
  orderId: z.string(),
  amount: z.number(),
});

/** Output schema: the order fields plus the validation verdict. */
const validatedOrderSchema = z.object({
  orderId: z.string(),
  amount: z.number(),
  valid: z.boolean(),
});

/**
 * Signal "validate-order".
 *
 * Logs the incoming order and echoes its fields back together with a
 * `valid` flag, which is true only when the amount is strictly positive.
 */
export const validateOrder = signal("validate-order")
  .input(orderSchema)
  .output(validatedOrderSchema)
  .run(async (input) => {
    console.log(`Validating order ${input.orderId} ($${input.amount})`);

    const { orderId, amount } = input;
    const valid = amount > 0;
    return { orderId, amount, valid };
  });
signals/charge-payment.ts
import { signal, z } from "station-signal";

/** Number of base-36 characters that follow the `ch_` prefix of a charge id. */
const CHARGE_ID_SUFFIX_LENGTH = 6;

/**
 * Signal "charge-payment".
 *
 * Logs the charge and returns the order id together with a randomly
 * generated charge id of the form `ch_xxxxxx`. No payment provider is
 * called from this block. The `valid` flag is part of the input schema
 * but is not inspected here.
 */
export const chargePayment = signal("charge-payment")
  .input(z.object({ orderId: z.string(), amount: z.number(), valid: z.boolean() }))
  .output(z.object({ orderId: z.string(), chargeId: z.string() }))
  .run(async (input) => {
    // Math.random().toString(36) may produce fewer fractional digits than
    // requested (0.5 becomes "0.i"), so pad to keep every id the same length.
    const suffix = Math.random()
      .toString(36)
      .slice(2, 2 + CHARGE_ID_SUFFIX_LENGTH)
      .padEnd(CHARGE_ID_SUFFIX_LENGTH, "0");
    const chargeId = `ch_${suffix}`;

    console.log(`Charging $${input.amount} for order ${input.orderId}`);
    return { orderId: input.orderId, chargeId };
  });
signals/send-receipt.ts
import { signal, z } from "station-signal";

/** Input schema: the order and the id of the charge made for it. */
const receiptRequestSchema = z.object({
  orderId: z.string(),
  chargeId: z.string(),
});

/**
 * Signal "send-receipt".
 *
 * Logs that a receipt is being sent for the given order and charge.
 * It declares no output schema and returns nothing.
 */
export const sendReceipt = signal("send-receipt")
  .input(receiptRequestSchema)
  .run(async (input) => {
    console.log(`Sending receipt for order ${input.orderId} (charge: ${input.chargeId})`);
  });
signals/notify-warehouse.ts
import { signal, z } from "station-signal";

/** Input schema: the order and the id of the charge made for it. */
const warehouseNoticeSchema = z.object({
  orderId: z.string(),
  chargeId: z.string(),
});

/**
 * Signal "notify-warehouse".
 *
 * Logs that the warehouse is being notified about the given order.
 * `chargeId` is accepted by the schema but not used in the log line.
 * It declares no output schema and returns nothing.
 */
export const notifyWarehouse = signal("notify-warehouse")
  .input(warehouseNoticeSchema)
  .run(async (input) => {
    console.log(`Notifying warehouse for order ${input.orderId}`);
  });
broadcasts/order-pipeline.ts
import { broadcast } from "station-broadcast";
import { validateOrder } from "../signals/validate-order.js";
import { chargePayment } from "../signals/charge-payment.js";
import { sendReceipt } from "../signals/send-receipt.js";
import { notifyWarehouse } from "../signals/notify-warehouse.js";

/**
 * Broadcast "order-pipeline".
 *
 * Starts at `validateOrder`, continues to `chargePayment` only when the
 * guard below passes, and then hands off to `sendReceipt` and
 * `notifyWarehouse` in a single step.
 */
export const orderPipeline = broadcast("order-pipeline")
  .input(validateOrder)
  .then(chargePayment, {
    // Guard: proceed only when the "validate-order" entry of `prev`
    // carries `valid === true`.
    when: (prev) => {
      const validation = prev["validate-order"] as { valid: boolean };
      return validation.valid === true;
    },
  })
  .then(sendReceipt, notifyWarehouse) // fan-out: both run in parallel
  .build();
runner.ts
import path from "node:path";
import { SignalRunner, ConsoleSubscriber } from "station-signal";
import { BroadcastRunner, ConsoleBroadcastSubscriber } from "station-broadcast";
import { orderPipeline } from "./broadcasts/order-pipeline.js";

// Runner for individual signals; it is pointed at the sibling "signals" directory.
const signalRunner = new SignalRunner({
  signalsDir: path.join(import.meta.dirname, "signals"),
  subscribers: [new ConsoleSubscriber()],
});

// Runner for broadcasts, built on top of the signal runner above.
const broadcastRunner = new BroadcastRunner({
  signalRunner,
  subscribers: [new ConsoleBroadcastSubscriber()],
});

broadcastRunner.register(orderPipeline);

/**
 * Triggers one run of the order pipeline, waits up to 30 seconds for it
 * to finish, and logs the resulting status.
 *
 * Both runners are stopped in a `finally` block so that a failed trigger
 * or wait does not leave them running. Any error propagates to the caller.
 */
async function runDemo(): Promise<void> {
  try {
    const broadcastRunId = await orderPipeline.trigger({
      orderId: "ORD-42",
      amount: 99.99,
    });
    console.log(`\nTriggered broadcast: ${broadcastRunId}\n`);

    const result = await broadcastRunner.waitForBroadcastRun(broadcastRunId, {
      timeoutMs: 30_000,
    });
    console.log(`\nBroadcast finished: ${result?.status}\n`);
  } finally {
    await broadcastRunner.stop();
    await signalRunner.stop();
  }
}

// NOTE(review): the 500 ms delay presumably gives the runners time to start
// before the first trigger — confirm against the runner documentation.
setTimeout(() => {
  // Handle rejection explicitly: an async timer callback would otherwise
  // surface failures only as an unhandled promise rejection.
  runDemo().catch((error: unknown) => {
    console.error("Broadcast demo failed:", error);
    process.exitCode = 1;
  });
}, 500);

signalRunner.start();
broadcastRunner.start();

broadcast() creates a DAG. .input() sets the entry signal. .then() adds downstream nodes. Multiple signals in one .then() = fan-out (parallel). when is a guard that returns false to skip a node. BroadcastRunner orchestrates the DAG. waitForBroadcastRun blocks until completion.
Run it: pnpm --filter example-broadcast start