Examples

Broadcast

DAG workflow orchestration. Chain signals into a dependency graph with fan-out and conditional execution.

signals/validate-order.ts

import { signal, z } from "station-signal"; export const validateOrder = signal("validate-order")  .input(z.object({ orderId: z.string(), amount: z.number() }))  .output(z.object({ orderId: z.string(), amount: z.number(), valid: z.boolean() }))  .run(async (input) => {    console.log(`Validating order ${input.orderId} ($${input.amount})`);    return { orderId: input.orderId, amount: input.amount, valid: input.amount > 0 };  });

signals/charge-payment.ts

import { signal, z } from "station-signal"; export const chargePayment = signal("charge-payment")  .input(z.object({ orderId: z.string(), amount: z.number(), valid: z.boolean() }))  .output(z.object({ orderId: z.string(), chargeId: z.string() }))  .run(async (input) => {    const chargeId = `ch_${Math.random().toString(36).slice(2, 8)}`;    console.log(`Charging $${input.amount} for order ${input.orderId}`);    return { orderId: input.orderId, chargeId };  });

signals/send-receipt.ts

import { signal, z } from "station-signal"; export const sendReceipt = signal("send-receipt")  .input(z.object({ orderId: z.string(), chargeId: z.string() }))  .run(async (input) => {    console.log(`Sending receipt for order ${input.orderId} (charge: ${input.chargeId})`);  });

signals/notify-warehouse.ts

import { signal, z } from "station-signal"; export const notifyWarehouse = signal("notify-warehouse")  .input(z.object({ orderId: z.string(), chargeId: z.string() }))  .run(async (input) => {    console.log(`Notifying warehouse for order ${input.orderId}`);  });

broadcasts/order-pipeline.ts

import { broadcast } from "station-broadcast";import { validateOrder } from "../signals/validate-order.js";import { chargePayment } from "../signals/charge-payment.js";import { sendReceipt } from "../signals/send-receipt.js";import { notifyWarehouse } from "../signals/notify-warehouse.js"; export const orderPipeline = broadcast("order-pipeline")  .input(validateOrder)  .then(chargePayment, {    when: (prev) => (prev["validate-order"] as { valid: boolean }).valid === true,  })  .then(sendReceipt, notifyWarehouse) // fan-out: both run in parallel  .build();

runner.ts

import path from "node:path";import { SignalRunner, ConsoleSubscriber } from "station-signal";import { BroadcastRunner, ConsoleBroadcastSubscriber } from "station-broadcast";import { orderPipeline } from "./broadcasts/order-pipeline.js"; const signalRunner = new SignalRunner({  signalsDir: path.join(import.meta.dirname, "signals"),  subscribers: [new ConsoleSubscriber()],}); const broadcastRunner = new BroadcastRunner({  signalRunner,  subscribers: [new ConsoleBroadcastSubscriber()],}); broadcastRunner.register(orderPipeline); setTimeout(async () => {  const broadcastRunId = await orderPipeline.trigger({    orderId: "ORD-42",    amount: 99.99,  });  console.log(`\nTriggered broadcast: ${broadcastRunId}\n`);   const result = await broadcastRunner.waitForBroadcastRun(broadcastRunId, {    timeoutMs: 30_000,  });  console.log(`\nBroadcast finished: ${result?.status}\n`);   await broadcastRunner.stop();  await signalRunner.stop();}, 500); signalRunner.start();broadcastRunner.start();

broadcast() creates a DAG. .input() sets the entry signal. .then() adds downstream nodes. Multiple signals in one .then() = fan-out (parallel). when is a guard that returns false to skip a node. BroadcastRunner orchestrates the DAG. waitForBroadcastRun blocks until completion.

Run it: pnpm --filter example-broadcast start


← All examples