Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
references/pipelines/patterns.md
# Pipelines Patterns

Code-first patterns. For observability dataset/field schemas and Logpush dataset lists, pull `https://developers.cloudflare.com/pipelines/observability/metrics/` and `https://developers.cloudflare.com/pipelines/streams/logpush/`.

## Fire-and-Forget Producer

```typescript
export default {
  async fetch(req, env, ctx) {
    const event = { event_id: crypto.randomUUID(), event_type: "page_view", timestamp: new Date().toISOString() };
    ctx.waitUntil(env.MY_STREAM.send([event])); // don't block the response
    return new Response("OK");
  }
};
```

## Client-Side Validation with Zod

Structured streams drop invalid events silently during processing. Validate before sending for immediate feedback.

```typescript
import { z } from "zod";

const EventSchema = z.object({
  event_id: z.string(),
  category: z.enum(["purchase", "view"]),
  amount: z.number().positive().optional(),
});

const validated = EventSchema.parse(rawEvent); // throws synchronously
await env.MY_STREAM.send([validated]);
```

## Scheduled Collector Worker

```jsonc
// wrangler.jsonc
{
  "name": "collector",
  "pipelines": [{ "stream": "<STREAM_ID>", "binding": "EVENT_STREAM" }],
  "triggers": { "crons": ["*/5 * * * *"] }
}
```

```typescript
export default {
  async scheduled(event, env, ctx) {
    const items = await (await fetch("https://api.example.com/data")).json();
    const events = items.map(i => ({
      event_id: crypto.randomUUID(),
      timestamp: new Date().toISOString(),
      category: i.type, amount: i.value,
    }));
    await env.EVENT_STREAM.send(events);
  },
};
```

## Logpush → Pipelines

Pipelines is a native Logpush destination: ingest Cloudflare logs, transform with SQL, store as Iceberg/Parquet. For the current supported dataset list and field names, pull the Logpush doc above.

```sql
INSERT INTO http_logs_sink
SELECT
  ClientIP,
  EdgeResponseStatus,
  to_timestamp_micros(EdgeStartTimestamp) AS event_time,
  upper(ClientRequestMethod) AS method,
  sha256(ClientIP) AS hashed_ip -- redact PII at ingest
FROM http_logs_stream
WHERE EdgeResponseStatus >= 400;
```

Configure via Dashboard (**Logpush → Create a job → Pipelines** destination) or API.

## Pipelines + Queues Fan-out

```typescript
await Promise.all([
  env.ANALYTICS_STREAM.send([event]), // long-term storage + SQL
  env.PROCESS_QUEUE.send(event), // immediate processing + retries
]);
```

Use Pipelines for long-term storage + SQL; Queues for immediate processing/retries/DLQ; both for fan-out.

## Observability (GraphQL Analytics)

Same R2 API token works. Endpoint: `https://api.cloudflare.com/client/v4/graphql`. Datasets cover ingestion, processing (incl. `decodeErrors`), delivery, sink writes (`filesWritten`), and user/validation errors; see the metrics doc for the full dataset/field catalog.

```bash
curl -X POST "https://api.cloudflare.com/client/v4/graphql" \
  -H "Authorization: Bearer $API_TOKEN" -H "Content-Type: application/json" \
  -d '{"query": "query { viewer { accounts(filter: {accountTag: \"'$ACCOUNT_ID'\"}) { pipelinesIngestionAdaptiveGroups(filter: {pipelineId: \"PIPELINE-UUID-WITH-DASHES\", datetime_geq: \"2026-03-01T00:00:00Z\"}, limit: 10) { sum { ingestedRecords ingestedBytes } dimensions { datetimeHour } } } } }"}'
```

> **Sink/pipeline IDs need dashes for GraphQL** but wrangler may show them without: `b909fe6e544844abbd63f6dcbc81d602` → `b909fe6e-5448-44ab-bd63-f6dcbc81d602`. Metrics take 5–10 min to populate.

### Detecting Silent Data Loss

If a sink's bucket is deleted or its token expires, events are accepted but lost. Tell-tale: `recordsWritten > 0` but `filesWritten = 0`. Always verify data lands in R2 within the roll interval and R2 SQL returns expected counts.

## Schema Evolution (Immutable Pipelines)

Pipelines can't change. Version + dual-write:

```bash
npx wrangler pipelines streams create events_v2 --schema-file v2.json
```
```typescript
await Promise.all([env.EVENTS_V1.send([event]), env.EVENTS_V2.send([event])]);
// query across versions with UNION ALL in R2 SQL
```

## End-to-End: Streaming Analytics Dashboard

```
External APIs → Collector Worker (cron) → Pipeline → R2 (Iceberg) → Dashboard Worker → R2 SQL
```

1. Create bucket + enable catalog ([r2-data-catalog](../r2-data-catalog/configuration.md))
2. Create stream + sink + pipeline (here)
3. Collector Worker with cron + stream binding (above)
4. Dashboard Worker querying R2 SQL ([r2-sql/patterns.md](../r2-sql/patterns.md))
5. Enable automatic compaction

## See Also

- [configuration.md](configuration.md) · [api.md](api.md) · [gotchas.md](gotchas.md) · [r2-sql](../r2-sql/)
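
## Appendix: Observability Helpers (sketch)

A minimal sketch of the two observability notes above: dashed IDs for GraphQL, and the `recordsWritten > 0` but `filesWritten = 0` tell-tale. `toDashedId`, `looksLikeSilentLoss`, and the `SinkTotals` shape are hypothetical names for illustration, not part of any Cloudflare API. Only the 8-4-4-4-12 ID format and the two metric field names come from this document, and fetching the totals from GraphQL is left out.

```typescript
// Hypothetical helpers; names and the SinkTotals shape are assumptions.

const DASHED = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
const UNDASHED = /^[0-9a-f]{32}$/i;

/** Convert a 32-character hex ID (as wrangler may print it) to the dashed form GraphQL expects. */
function toDashedId(id: string): string {
  if (DASHED.test(id)) return id.toLowerCase(); // already dashed
  if (!UNDASHED.test(id)) throw new Error(`Not a 32-character hex ID: ${id}`);
  const h = id.toLowerCase();
  return `${h.slice(0, 8)}-${h.slice(8, 12)}-${h.slice(12, 16)}-${h.slice(16, 20)}-${h.slice(20)}`;
}

/** Summed sink metrics for some time window (assumed shape). */
interface SinkTotals {
  recordsWritten: number;
  filesWritten: number;
}

/** True when records were accepted but no files landed: the silent-loss tell-tale. */
function looksLikeSilentLoss(totals: SinkTotals): boolean {
  return totals.recordsWritten > 0 && totals.filesWritten === 0;
}
```

Because metrics take 5–10 min to populate, a check like `looksLikeSilentLoss` is best run over a window that ends at least that far in the past and spans at least one roll interval.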