Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
**File:** `references/pipelines/api.md`
# Pipelines API Reference

## Pipeline Binding Interface

```typescript
// From @cloudflare/workers-types
interface Pipeline {
  send(data: object | object[]): Promise<void>;
}

interface Env {
  STREAM: Pipeline;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const event = { user_id: "123", event_type: "view" };
    // send() returns Promise<void> - no result data
    await env.STREAM.send([event]);
    return new Response('OK');
  }
} satisfies ExportedHandler<Env>;
```

**Key points:**
- `send()` accepts a single object or an array
- Always returns `Promise<void>` (no confirmation data)
- Throws on network/validation errors (wrap in try/catch)
- Use `ctx.waitUntil()` for the fire-and-forget pattern

## Writing Events

### Single Event

```typescript
await env.STREAM.send([{
  user_id: "12345",
  event_type: "purchase",
  product_id: "widget-001",
  amount: 29.99
}]);
```

### Batch Events

```typescript
const events = [
  { user_id: "user1", event_type: "view" },
  { user_id: "user2", event_type: "purchase", amount: 50 }
];
await env.STREAM.send(events);
```

**Limits:**
- Max 1 MB per request
- 5 MB/s per stream

### Fire-and-Forget Pattern

```typescript
export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const event = { /* ... */ };

    // Don't block the response on send
    ctx.waitUntil(env.STREAM.send([event]));

    return new Response('OK');
  }
};
```

### Error Handling

```typescript
// Inside a fetch handler:
try {
  await env.STREAM.send([event]);
} catch (error) {
  console.error('Pipeline send failed:', error);
  // Log to another system, retry, or return an error response
  return new Response('Failed to track event', { status: 500 });
}
```

## HTTP Ingest API

### Endpoint Format

```
https://{stream-id}.ingest.cloudflare.com
```

Get `{stream-id}` from: `npx wrangler pipelines streams list`

### Request Format

**CRITICAL:** Must send an array, not a single object

```bash
# ✅ Correct
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[{"user_id": "123", "event_type": "purchase"}]'

# ❌ Wrong - will fail
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '{"user_id": "123", "event_type": "purchase"}'
```

### Authentication

```bash
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -d '[{"event": "data"}]'
```

**Required permission:** Workers Pipeline Send

Create token: Dashboard → Workers → API tokens → Create with Pipeline Send permission

### Response Codes

| Code | Meaning | Action |
|------|---------|--------|
| 200 | Accepted | Success |
| 400 | Invalid format | Check JSON array, schema match |
| 401 | Auth failed | Verify token valid |
| 413 | Payload too large | Split into smaller batches (<1 MB) |
| 429 | Rate limited | Back off, retry with delay |
| 5xx | Server error | Retry with exponential backoff |

## SQL Functions Quick Reference

Available in `INSERT INTO sink SELECT ... FROM stream` transformations:

| Function | Example | Use Case |
|----------|---------|----------|
| `UPPER(s)` | `UPPER(event_type)` | Normalize strings |
| `LOWER(s)` | `LOWER(email)` | Case-insensitive matching |
| `CONCAT(...)` | `CONCAT(user_id, '_', product_id)` | Generate composite keys |
| `CASE WHEN ... THEN ... END` | `CASE WHEN amount > 100 THEN 'high' ELSE 'low' END` | Conditional enrichment |
| `CAST(x AS type)` | `CAST(timestamp AS string)` | Type conversion |
| `COALESCE(x, y)` | `COALESCE(amount, 0.0)` | Default values |
| Math operators | `amount * 1.1`, `price / quantity` | Calculations |
| Comparison | `amount > 100`, `status IN ('active', 'pending')` | Filtering |

**Type names for CAST:** `string`, `int32`, `int64`, `float32`, `float64`, `bool`, `timestamp`

Full reference: [Pipelines SQL Reference](https://developers.cloudflare.com/pipelines/sql-reference/)

## SQL Transform Examples

### Filter Events

```sql
INSERT INTO my_sink
SELECT * FROM my_stream
WHERE event_type = 'purchase' AND amount > 100
```

### Select Specific Fields

```sql
INSERT INTO my_sink
SELECT user_id, event_type, timestamp, amount
FROM my_stream
```

### Transform and Enrich

```sql
INSERT INTO my_sink
SELECT
  user_id,
  UPPER(event_type) as event_type,
  timestamp,
  amount * 1.1 as amount_with_tax,
  CONCAT(user_id, '_', product_id) as unique_key,
  CASE
    WHEN amount > 1000 THEN 'high_value'
    WHEN amount > 100 THEN 'medium_value'
    ELSE 'low_value'
  END as customer_tier
FROM my_stream
WHERE event_type IN ('purchase', 'refund')
```

## Querying Results (R2 Data Catalog)

```bash
export WRANGLER_R2_SQL_AUTH_TOKEN=YOUR_CATALOG_TOKEN

npx wrangler r2 sql query "warehouse_name" "
SELECT
  event_type,
  COUNT(*) as event_count,
  SUM(amount) as total_revenue
FROM default.my_table
WHERE event_type = 'purchase'
  AND timestamp >= '2025-01-01'
GROUP BY event_type
ORDER BY total_revenue DESC
LIMIT 100"
```

**Note:** Iceberg tables support standard SQL queries with GROUP BY, JOINs, WHERE, ORDER BY, etc.
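The 1 MB per-request limit means large batches need to be split before sending. A minimal sketch of a chunking helper; the function name, byte budget handling, and size accounting are our own illustration, not part of `@cloudflare/workers-types`:

```typescript
// Split an event batch into chunks whose JSON-serialized size stays under a
// byte budget (default 1 MB, matching the documented per-request limit).
// An event bigger than the budget still gets its own chunk; the caller must
// decide how to handle that case. Illustrative helper, not a Pipelines API.
function chunkEvents(events: object[], maxBytes = 1_000_000): object[][] {
  const encoder = new TextEncoder();
  const chunks: object[][] = [];
  let current: object[] = [];
  let currentBytes = 2; // account for the surrounding "[]"

  for (const event of events) {
    const size = encoder.encode(JSON.stringify(event)).length + 1; // +1 for ","
    if (current.length > 0 && currentBytes + size > maxBytes) {
      chunks.push(current);
      current = [];
      currentBytes = 2;
    }
    current.push(event);
    currentBytes += size;
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}
```

Inside a Worker this composes with the binding directly: `for (const chunk of chunkEvents(events)) await env.STREAM.send(chunk);`.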
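The retry guidance in the response-code table (back off on 429, exponential backoff on 5xx) can be sketched as a small wrapper. `sendOnce`, the attempt count, and the delay schedule are all assumptions for illustration; `sendOnce` is any injected function that performs one POST and resolves to the HTTP status code:

```typescript
// Retry a send on 429 / 5xx with exponential backoff, following the
// response-code table. Other 4xx errors are returned immediately, since
// resending the same payload will not fix them. Illustrative sketch.
async function sendWithRetry(
  sendOnce: () => Promise<number>,
  maxAttempts = 4,
  baseDelayMs = 250
): Promise<number> {
  let status = 0;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    status = await sendOnce();
    const retryable = status === 429 || status >= 500;
    if (!retryable) return status;
    if (attempt < maxAttempts - 1) {
      // 250 ms, 500 ms, 1000 ms, ... between attempts
      await new Promise(resolve => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
  return status;
}

// Usage with fetch against the HTTP ingest endpoint (placeholders as above):
// const status = await sendWithRetry(async () => {
//   const res = await fetch("https://{stream-id}.ingest.cloudflare.com", {
//     method: "POST",
//     headers: {
//       "Content-Type": "application/json",
//       "Authorization": "Bearer YOUR_API_TOKEN",
//     },
//     body: JSON.stringify([{ user_id: "123", event_type: "purchase" }]),
//   });
//   return res.status;
// });
```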
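Because the HTTP ingest endpoint rejects a bare object, it can help to normalize the payload in one place before serializing. A tiny sketch (the helper name is ours, not part of any Cloudflare API):

```typescript
// The HTTP ingest endpoint requires a JSON array. Wrap a single event so
// callers can pass either shape; illustrative helper only.
function toIngestBody(data: object | object[]): string {
  const events = Array.isArray(data) ? data : [data];
  return JSON.stringify(events);
}
```

This mirrors the `object | object[]` signature of the Worker binding's `send()`, which does the equivalent normalization for you.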