Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
references/pipelines/patterns.md
1# Pipelines Patterns23## Fire-and-Forget45```typescript6export default {7async fetch(request, env, ctx) {8const event = { user_id: '...', event_type: 'page_view', timestamp: new Date().toISOString() };9ctx.waitUntil(env.STREAM.send([event])); // Don't block response10return new Response('OK');11}12};13```1415## Schema Validation with Zod1617```typescript18import { z } from 'zod';1920const EventSchema = z.object({21user_id: z.string(),22event_type: z.enum(['purchase', 'view']),23amount: z.number().positive().optional()24});2526const validated = EventSchema.parse(rawEvent); // Throws on invalid27await env.STREAM.send([validated]);28```2930**Why:** Structured streams drop invalid events silently. Client validation gives immediate feedback.3132## SQL Transform Patterns3334```sql35-- Filter early (reduce storage)36INSERT INTO my_sink37SELECT user_id, event_type, amount38FROM my_stream39WHERE event_type = 'purchase' AND amount > 104041-- Select only needed fields42INSERT INTO my_sink43SELECT user_id, event_type, timestamp FROM my_stream4445-- Enrich with CASE46INSERT INTO my_sink47SELECT user_id, amount,48CASE WHEN amount > 1000 THEN 'vip' ELSE 'standard' END as tier49FROM my_stream50```5152## Pipelines + Queues Fan-out5354```typescript55await Promise.all([56env.ANALYTICS_STREAM.send([event]), // Long-term storage57env.PROCESS_QUEUE.send(event) // Immediate processing58]);59```6061| Need | Use |62|------|-----|63| Long-term storage, SQL queries | Pipelines |64| Immediate processing, retries | Queues |65| Both | Fan-out pattern |6667## Performance Tuning6869| Goal | Config |70|------|--------|71| Low latency | `--roll-interval 10` |72| Query performance | `--roll-interval 300 --roll-size 100` |73| Cost optimal | `--compression zstd --roll-interval 300` |7475## Schema Evolution7677Pipelines are immutable. Use versioning:7879```bash80# Create v2 stream/sink/pipeline81npx wrangler pipelines streams create events-v2 --schema-file v2.json8283# Dual-write during transition84await Promise.all([env.EVENTS_V1.send([event]), env.EVENTS_V2.send([event])]);8586# Query across versions with UNION ALL87```88