Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
references/pipelines/api.md
# Pipelines API Reference

Code templates and verified behavior. For the full SQL function set and HTTP status semantics, pull `https://developers.cloudflare.com/pipelines/sql-reference/` and the streams docs.

## Worker Binding Interface

```typescript
// from cloudflare:pipelines / @cloudflare/workers-types
interface Pipeline<T = any> { send(records: T[]): Promise<void>; }

interface Env { MY_STREAM: Pipeline; }

export default {
  async fetch(req: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    await env.MY_STREAM.send([{ event_id: crypto.randomUUID(), amount: 29.99 }]);
    return new Response("OK");
  }
} satisfies ExportedHandler<Env>;
```

- `send()` takes an **array**, returns `Promise<void>` (no confirmation payload).
- Throws on network errors: wrap in try/catch or use `ctx.waitUntil()` for fire-and-forget.
- Validation errors are **not** thrown here (deferred during processing; see [gotchas.md](gotchas.md)).
- Payload/rate limits apply: check `https://developers.cloudflare.com/pipelines/platform/limits/` before sizing batches.

## HTTP Ingest

```
https://{stream-id}.ingest.cloudflare.com
```

Get `{stream-id}` from `npx wrangler pipelines streams list`.

```bash
# Batch (preferred)
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[{"event_id":"evt-1","amount":29.99},{"event_id":"evt-2","amount":14.99}]'

# Single event (auto-wrapped in an array)
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" -d '{"event_id":"evt-3","amount":9.99}'
```

If stream auth is enabled, add `-H "Authorization: Bearer $TOKEN"` (token needs **Workers Pipelines Send**).
Standard HTTP status codes apply (400 invalid, 401 auth, 413 too large, 429 rate-limited, 5xx retry).

> **JSON only**: no Avro, Protobuf, or CSV input.

## REST Management API

Base: `https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/pipelines/v1`

```bash
# List
curl -s "$BASE_URL/streams" -H "Authorization: Bearer $API_TOKEN"
curl -s "$BASE_URL/sinks" -H "Authorization: Bearer $API_TOKEN"
curl -s "$BASE_URL/pipelines" -H "Authorization: Bearer $API_TOKEN"

# Get one (pipeline GET includes status + failure_reason, useful for debugging)
curl -s "$BASE_URL/pipelines/{pipeline-id}" -H "Authorization: Bearer $API_TOKEN"

# Delete in reverse order: pipeline → sink → stream
curl -X DELETE "$BASE_URL/pipelines/{id}" -H "Authorization: Bearer $API_TOKEN"
curl -X DELETE "$BASE_URL/sinks/{id}" -H "Authorization: Bearer $API_TOKEN"
curl -X DELETE "$BASE_URL/streams/{id}" -H "Authorization: Bearer $API_TOKEN"
```

> `wrangler pipelines delete` defaults to "no" non-interactively; use the REST API for automated cleanup. Deleting a stream removes buffered events and dependent pipelines.

### Pipeline Lifecycle States

| Status | Meaning |
|--------|---------|
| `running` | Active, processing events |
| `initializing` | Starting up (minutes after creation or recovery) |
| `failed` | Stopped on error; check `failure_reason` (expired token, deleted bucket, disabled catalog) |

> A `GET` on a sink shows `schema.fields: []`; this is expected. The sink inherits schema from the stream via the pipeline SQL.

## Pipeline SQL (Transforms)

Row-level only: no GROUP BY/aggregation. CTEs (`WITH`) and `UNNEST` are supported.
Full function list: `https://developers.cloudflare.com/pipelines/sql-reference/`.

```sql
-- Passthrough / filter / enrich
INSERT INTO my_sink SELECT * FROM my_stream;
INSERT INTO my_sink SELECT * FROM my_stream WHERE amount > 10;
INSERT INTO my_sink
SELECT event_id, UPPER(category) AS category, amount * 1.1 AS amount_with_tax
FROM my_stream;

-- CTE
WITH filtered AS (SELECT event_id, amount FROM my_stream WHERE amount > 50)
INSERT INTO my_sink SELECT * FROM filtered;

-- UNNEST arrays (one per SELECT)
SELECT UNNEST(tags) AS tag FROM my_stream;
```

Supported categories: string, regex, hashing (`sha256`), JSON extraction, timestamp conversion, conditional (`CASE`), `CAST`, `COALESCE`, math/comparison operators.

## Verifying End-to-End Data Flow

```bash
# 1. Pipeline running (not initializing/failed)?
curl -s "$BASE_URL/pipelines/{id}" -H "Authorization: Bearer $API_TOKEN"

# 2. Table created yet? (3–7 min on first flush)
curl -s "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/r2-catalog/$BUCKET/namespaces/my_ns/tables" \
  -H "Authorization: Bearer $API_TOKEN"

# 3. Data present? (R2 SQL)
curl -s -X POST \
  "https://api.sql.cloudflarestorage.com/api/v1/accounts/$ACCOUNT_ID/r2-sql/query/$BUCKET" \
  -H "Authorization: Bearer $API_TOKEN" -H "Content-Type: application/json" \
  -d '{"query": "SELECT COUNT(*) AS total FROM my_ns.my_table"}'
```

> Expect **3–7 minutes** from first send to first queryable data. Subsequent flushes are much faster.

## See Also

- [configuration.md](configuration.md): creating resources · [patterns.md](patterns.md): producers, Logpush, observability
- [r2-sql/api.md](../r2-sql/api.md): querying results
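
## Appendix: Batching and Status Handling Sketch

As a rough companion to the Worker binding notes and the HTTP ingest status codes in this reference, the TypeScript sketch below shows one way to split records into batches before calling `send()` and to decide whether an HTTP ingest response is worth retrying. The helper names (`chunk`, `classifyStatus`, `sendInBatches`), the `PipelineLike` interface, and any batch size you pass in are hypothetical and not part of the Pipelines API; real limits should come from the limits page linked under Worker Binding Interface.

```typescript
// Hypothetical helpers for illustration only; none of these names come from
// cloudflare:pipelines or @cloudflare/workers-types.

/** Minimal shape matching the send() signature shown in this reference. */
interface PipelineLike<T> {
  send(records: T[]): Promise<void>;
}

/** Split records into consecutive batches of at most `size` items. */
function chunk<T>(records: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < records.length; i += size) {
    batches.push(records.slice(i, i + size));
  }
  return batches;
}

type IngestAction = "ok" | "retry" | "fix-request";

/**
 * Map an HTTP ingest status code to a coarse action, following the list in
 * this reference: 429 and 5xx are retryable; other non-2xx codes
 * (400, 401, 413, ...) need a change to the request before resending.
 */
function classifyStatus(status: number): IngestAction {
  if (status >= 200 && status < 300) return "ok";
  if (status === 429 || status >= 500) return "retry";
  return "fix-request";
}

/**
 * Send records batch by batch and return how many were handed to send().
 * A rejected send() (for example a network error) propagates to the caller.
 */
async function sendInBatches<T>(
  stream: PipelineLike<T>,
  records: T[],
  size: number,
): Promise<number> {
  let sent = 0;
  for (const batch of chunk(records, size)) {
    await stream.send(batch);
    sent += batch.length;
  }
  return sent;
}
```

In a Worker, `sendInBatches(env.MY_STREAM, events, n)` could be wrapped in try/catch or passed to `ctx.waitUntil()`, as the binding notes suggest. Because validation errors are deferred, a resolved promise here only means the records were accepted for processing.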