Examples
CI Pipeline
Simulated CI/CD workflow with fan-out, branch guards, and result fallback. The most complex DAG in the examples.
checkout |---> lint (parallel) |---> test-unit (parallel, 2 retries) |---> test-integration (parallel, 1 retry) | build-app (waits for all above) | deploy-staging | deploy-prod (guard: only on "main" branch) | notify (fallback: uses staging output if prod skipped)broadcasts/ci-pipeline.ts
import { broadcast } from "station-broadcast";import { checkout } from "../signals/checkout.js";import { lint } from "../signals/lint.js";import { testUnit } from "../signals/test-unit.js";import { testIntegration } from "../signals/test-integration.js";import { buildApp } from "../signals/build-app.js";import { deployStaging } from "../signals/deploy-staging.js";import { deployProd } from "../signals/deploy-prod.js";import { notify } from "../signals/notify.js"; export const ciPipeline = broadcast("ci-pipeline") .input(checkout) .then(lint, testUnit, testIntegration) // fan-out: all run in parallel .then(buildApp, { // Wait for all tests AND checkout; pass checkout output as build input after: ["lint", "test-unit", "test-integration", "checkout"], map: (upstream) => upstream["checkout"], }) .then(deployStaging) .then(deployProd, { // Need checkout data for the branch guard after: ["deploy-staging", "checkout"], map: (upstream) => upstream["deploy-staging"], when: (upstream) => { const co = upstream["checkout"] as { branch: string } | undefined; return co?.branch === "main"; }, }) .then(notify, { // If deploy-prod was skipped (non-main branch), fall back to staging output after: ["deploy-prod", "deploy-staging"], map: (upstream) => upstream["deploy-prod"] ?? upstream["deploy-staging"], }) .onFailure("fail-fast") .timeout(120_000) .build();signals/checkout.ts
import { signal, z } from "station-signal"; export const checkout = signal("checkout") .input(z.object({ repo: z.string(), branch: z.string(), commitSha: z.string() })) .output(z.object({ repo: z.string(), branch: z.string(), commitSha: z.string(), workdir: z.string(), })) .timeout(10_000) .run(async (input) => { console.log(`[checkout] Cloning ${input.repo}@${input.branch} (${input.commitSha.slice(0, 7)})...`); await new Promise((r) => setTimeout(r, 600)); const workdir = `/tmp/ci/${input.commitSha.slice(0, 7)}`; console.log(`[checkout] Workspace ready at ${workdir}`); return { ...input, workdir }; });runner.ts
import path from "node:path";import { SignalRunner, ConsoleSubscriber } from "station-signal";import { BroadcastRunner, ConsoleBroadcastSubscriber } from "station-broadcast";import { SqliteAdapter } from "station-adapter-sqlite";import { BroadcastSqliteAdapter } from "station-adapter-sqlite/broadcast";import { ciPipeline } from "./broadcasts/ci-pipeline.js"; const DB_PATH = path.join(import.meta.dirname, "jobs.db"); const signalRunner = new SignalRunner({ signalsDir: path.join(import.meta.dirname, "signals"), adapter: new SqliteAdapter({ dbPath: DB_PATH }), subscribers: [new ConsoleSubscriber()], maxConcurrent: 4, retryBackoffMs: 500,}); const broadcastRunner = new BroadcastRunner({ signalRunner, adapter: new BroadcastSqliteAdapter({ dbPath: DB_PATH }), subscribers: [new ConsoleBroadcastSubscriber()],}); broadcastRunner.register(ciPipeline); const branch = process.argv[2] || "main";const sha = Math.random().toString(36).slice(2, 10) + Math.random().toString(36).slice(2, 6); setTimeout(async () => { const id = await ciPipeline.trigger({ repo: "acme/web-app", branch, commitSha: sha, }); console.log(`\nTriggered CI pipeline: ${id}`); console.log(` repo: acme/web-app`); console.log(` branch: ${branch}`); console.log(` commit: ${sha.slice(0, 7)}`); console.log(`\nProd deploy ${branch === "main" ? "enabled" : "skipped"} (branch guard).`);}, 500); signalRunner.start();broadcastRunner.start();after overrides implicit dependencies so build-app waits for all three parallel steps. map transforms upstream outputs into the shape the next signal expects. when conditionally skips nodes — here it gates prod deployment on the "main" branch. The ?? in notify's map provides a fallback when deploy-prod was skipped and returned undefined. onFailure("fail-fast") stops the entire pipeline on the first failure.
Pass a branch name as a CLI argument to test the guard: pnpm --filter example-ci-pipeline start -- feature/xyz skips the prod deploy step.
Run it: pnpm --filter example-ci-pipeline start