Examples
ETL Pipeline
Extract-transform-load workflow with multi-step signals. A linear broadcast chain: extract, transform, load, report. Each signal's output becomes the next signal's input.
broadcasts/etl-pipeline.ts
import { broadcast } from "station-broadcast";
import { extractUsers } from "../signals/extract-users.js";
import { transformUsers } from "../signals/transform-users.js";
import { loadUsers } from "../signals/load-users.js";
import { generateReport } from "../signals/generate-report.js";

/** Value handed to the broadcast's `.timeout()` (presumably milliseconds — confirm against the library docs). */
const PIPELINE_TIMEOUT = 60_000;

/**
 * Linear ETL broadcast: extract -> transform -> load -> report.
 *
 * `extractUsers` is registered as the entry signal and the remaining signals
 * are chained in order, so each signal's output is what the next one receives.
 */
export const etlPipeline = broadcast("etl-pipeline")
  .input(extractUsers)
  .then(transformUsers)
  .then(loadUsers)
  .then(generateReport)
  .timeout(PIPELINE_TIMEOUT)
  .build();

signals/extract-users.ts
import { signal, z } from "station-signal";

/** A user record as emitted by this signal (raw source columns removed). */
const extractedUser = z.object({ id: z.number(), name: z.string(), email: z.string() });

/** Resolves after `ms` milliseconds; used to simulate I/O latency. */
const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/** Maps a zero-based row index onto one of the three synthetic status codes. */
const statusFor = (index: number): string => {
  switch (index % 3) {
    case 0:
      return "A";
    case 1:
      return "I";
    default:
      return "P";
  }
};

/** Builds one synthetic raw row for the given zero-based index and source name. */
const buildRawRecord = (index: number, source: string) => {
  const ordinal = index + 1;
  return {
    id: ordinal,
    name: `User ${ordinal}`,
    email: `user${ordinal}@${source}`,
    raw_signup: `2024-0${(index % 9) + 1}-15`,
    status_code: statusFor(index),
  };
};

/** True when the record's email contains an "@" character. */
const hasAtSign = (record: { email: string }) => record.email.includes("@");

/**
 * Three-step extraction signal: connect, query, validate.
 *
 * - `connect` waits briefly and forwards the input with `connected: true` added.
 * - `query` generates `batchSize` synthetic rows for the requested source.
 * - `validate` drops rows whose email lacks "@" and keeps only `id`, `name`
 *   and `email`, so the result matches the declared output schema.
 */
export const extractUsers = signal("extract-users")
  .input(z.object({ source: z.string(), batchSize: z.number() }))
  .output(
    z.object({
      records: z.array(extractedUser),
      source: z.string(),
    }),
  )
  .timeout(15_000)
  .step("connect", async (request) => {
    console.log(`[extract] Connecting to ${request.source}...`);
    await pause(400);
    return { ...request, connected: true };
  })
  .step("query", async (session) => {
    const { batchSize, source } = session;
    console.log(`[extract] Querying ${batchSize} records from ${source}...`);
    await pause(800);

    // Array.from with a length-only object yields exactly `batchSize` slots.
    const records = Array.from({ length: batchSize }, (_unused, index) =>
      buildRawRecord(index, source),
    );

    console.log(`[extract] Fetched ${records.length} records.`);
    return { records, source };
  })
  .step("validate", async (fetched) => {
    const total = fetched.records.length;
    console.log(`[extract] Validating ${total} records...`);

    const accepted = fetched.records.filter(hasAtSign);
    const dropped = total - accepted.length;
    if (dropped > 0) {
      console.log(`[extract] Dropped ${dropped} invalid records.`);
    }

    // Project each row down to the three fields the output schema declares.
    const records = accepted.map(
      ({ id, name, email }: { id: number; name: string; email: string }) => ({
        id,
        name,
        email,
      }),
    );

    return { records, source: fetched.source };
  })
  .build();

signals/load-users.ts
import { signal, z } from "station-signal";

const userRecord = z.object({ id: z.number(), name: z.string(), email: z.string() });

/** Payload accepted by the load signal. */
const loadInput = z.object({
  records: z.array(userRecord),
  source: z.string(),
  transformedAt: z.string(),
});

/** Summary produced once the load has finished. */
const loadResult = z.object({ inserted: z.number(), updated: z.number(), source: z.string() });

/** Probability that a simulated upsert throws. */
const FAILURE_RATE = 0.1;

/** Share of incoming records reported as inserts; the remainder count as updates. */
const INSERT_RATIO = 0.7;

/** Resolves after `ms` milliseconds; used to simulate I/O latency. */
const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Two-step load signal: upsert, verify.
 *
 * - `upsert` waits, throws on roughly one call in ten to simulate a lost
 *   connection, and otherwise reports insert/update counts.
 * - `verify` waits, logs the combined count, and passes the summary through
 *   unchanged.
 *
 * The signal is configured with `.retries(2)`.
 */
export const loadUsers = signal("load-users")
  .input(loadInput)
  .output(loadResult)
  .timeout(20_000)
  .retries(2)
  .step("upsert", async (batch) => {
    const count = batch.records.length;
    console.log(`[load] Upserting ${count} records into target database...`);
    await pause(600);

    // Simulated transient failure.
    const connectionLost = Math.random() < FAILURE_RATE;
    if (connectionLost) {
      throw new Error("Connection to target database lost");
    }

    const inserted = Math.floor(count * INSERT_RATIO);
    const updated = count - inserted;
    console.log(`[load] Inserted ${inserted}, updated ${updated}.`);

    return { inserted, updated, source: batch.source };
  })
  .step("verify", async (summary) => {
    console.log("[load] Verifying load integrity...");
    await pause(300);

    const total = summary.inserted + summary.updated;
    console.log(`[load] Verified ${total} records in target.`);

    return summary;
  })
  .build();

The extract signal uses three steps (connect, query, validate) to demonstrate multi-step signals within a broadcast. The load signal has .retries(2) to handle transient database failures. Each signal's final output shape must match the next signal's input schema.
Run it: pnpm --filter example-etl-pipeline start