Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
references/pipelines/configuration.md
# Pipelines Configuration

Templates for creating streams, sinks, and pipelines via CLI, REST, or Terraform. For the full flag/field list and allowed values, pull `https://developers.cloudflare.com/pipelines/reference/wrangler-commands/` and the streams/sinks/pipelines docs.

## Naming Rules

- **Streams, sinks, pipelines** use underscores: `my_stream`, `my_sink`, `my_pipeline`.
- **Buckets** use hyphens: `my-bucket`.

## Schema (Structured Streams)

Schema is a JSON object with a `fields` array; each field has `name`, `type`, `required`.

```json
{
  "fields": [
    { "name": "event_id", "type": "string", "required": true },
    { "name": "amount", "type": "float64", "required": false }
  ]
}
```

Field types include `string`, `bool`, `int32/64`, `float32/64`, `timestamp`, `json`, `binary`, `list`, `struct` (with nested `items`/`fields`). For the authoritative type list, see `https://developers.cloudflare.com/pipelines/sql-reference/sql-data-types/`.

Unstructured streams (no schema) store everything in a single `value` column.

> Pipelines auto-adds `__ingest_ts` (TIMESTAMP, day-partitioned). Do **not** include it in your schema.

## Option A: Interactive (Simplest)

```bash
npx wrangler pipelines setup # creates stream + sink + pipeline, optionally bucket + catalog
```

## Option B: Wrangler CLI (Explicit)

```bash
# 1. Stream
npx wrangler pipelines streams create my_stream --schema-file schema.json

# 2. Sink: R2 Data Catalog (Iceberg). Creates the namespace + table.
npx wrangler pipelines sinks create my_sink \
  --type r2-data-catalog \
  --bucket my-bucket --namespace my_namespace --table my_table \
  --catalog-token $API_TOKEN \
  --compression zstd --roll-interval 300

# 2b. Sink: R2 raw Parquet (alternative)
npx wrangler pipelines sinks create my_sink \
  --type r2 --bucket my-bucket --format parquet \
  --path analytics/events --partitioning "year=%Y/month=%m/day=%d" \
  --access-key-id $KEY --secret-access-key $SECRET

# 3. Pipeline (SQL connects stream → sink)
npx wrangler pipelines create my_pipeline \
  --sql "INSERT INTO my_sink SELECT * FROM my_stream"
```

Tuning knobs (`--compression`, `--roll-interval`, `--roll-size`, etc.) and their allowed values/defaults change, so pull the wrangler-commands and sinks docs rather than hardcoding. Rule of thumb: prod `--roll-interval 300+`, dev `10` (creates many small files).

> **⚠️ Pipelines are immutable.** SQL, schema, and sink config can't be changed; delete and recreate.

## Option C: REST API (Programmatic)

Base: `https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/pipelines/v1`

```bash
# Stream
curl -X POST "$BASE_URL/streams" -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" -d '{
    "name": "my_stream",
    "http": {"enabled": true, "authentication": false},
    "schema": {"fields": [{"name": "event_id", "type": "string", "required": true}]}
  }'

# Sink: NOTE REST field names differ from CLI flags (see table)
curl -X POST "$BASE_URL/sinks" -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" -d '{
    "name": "my_sink", "type": "r2_data_catalog",
    "config": {"bucket": "my-bucket", "namespace": "my_namespace",
      "table_name": "my_table", "token": "'$API_TOKEN'",
      "rolling_policy": {"interval_seconds": 300}},
    "format": {"type": "parquet"}
  }'

# Pipeline
curl -X POST "$BASE_URL/pipelines" -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name": "my_pipeline", "sql": "INSERT INTO my_sink SELECT * FROM my_stream;"}'
```

**REST field names ≠ CLI flags** (common failure, not obvious from docs):

| REST (config body) | CLI flag | Gotcha |
|--------------------|----------|--------|
| `"type": "r2_data_catalog"` | `--type r2-data-catalog` | underscores vs hyphens |
| `"table_name"` | `--table` | different key |
| `"token"` | `--catalog-token` | different key |
| `"format": {"type": "parquet"}` | (implied) | required in REST, omitted in CLI |

## Worker Binding

```jsonc
// wrangler.jsonc
{ "pipelines": [ { "stream": "<STREAM_ID>", "binding": "MY_STREAM" } ] }
```

> Binding field is `"stream"` as of June 2026 (was `"pipeline"`, still accepted). Use the **stream ID** (`wrangler pipelines streams list`), not the pipeline ID. Redeploy after adding. Generate typed bindings with `npx wrangler types` → `Pipeline<Cloudflare.MyStreamRecord>` from `cloudflare:pipelines`.

## Terraform

Resources: `cloudflare_pipeline_stream`, `cloudflare_pipeline_sink`, `cloudflare_pipeline`. For current attribute schemas pull `https://developers.cloudflare.com/pipelines/reference/terraform/`.

```hcl
resource "cloudflare_pipeline_stream" "my_stream" {
  account_id     = var.cloudflare_account_id
  name           = "my_stream"
  format         = { type = "json" }
  schema         = { fields = [{ name = "value", type = "json", required = true }] }
  http           = { enabled = true, authentication = false, cors = {} }
  worker_binding = { enabled = false }
}

resource "cloudflare_pipeline_sink" "my_sink" {
  account_id = var.cloudflare_account_id
  name       = "my_sink"
  type       = "r2_data_catalog"
  format     = { type = "parquet" }
  schema     = { fields = [] }
  config = {
    account_id = var.cloudflare_account_id
    bucket     = cloudflare_r2_bucket.pipeline_bucket.name
    table_name = "my_table"
    token      = var.catalog_token
  }
}

resource "cloudflare_pipeline" "my_pipeline" {
  account_id = var.cloudflare_account_id
  name       = "my_pipeline"
  sql        = "INSERT INTO ${cloudflare_pipeline_sink.my_sink.name} SELECT * FROM ${cloudflare_pipeline_stream.my_stream.name}"
}
```

## Credentials

| Type | Permission |
|------|------------|
| Catalog token (Iceberg sink) | R2 Storage Admin R&W + R2 Data Catalog R&W |
| R2 credentials (raw sink) | Object Read & Write |
| HTTP ingest token | Workers Pipelines Send (only if stream auth enabled) |

## See Also

- [api.md](api.md): sending events, REST API, lifecycle
- [gotchas.md](gotchas.md): immutability, REST≠CLI
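
Because pipelines are immutable and the REST sink type is spelled differently from the CLI flag, it can help to check names locally before creating anything. The following is a minimal sketch, not part of wrangler: the function names are hypothetical, and the exact character sets (lowercase letters, digits, and underscores for streams/sinks/pipelines; lowercase letters, digits, and hyphens for buckets) are assumptions layered on the naming rules above, so confirm them against the Cloudflare docs.

```bash
#!/usr/bin/env bash
# Hypothetical pre-flight helpers (not wrangler commands).

# Streams, sinks, and pipelines use underscores, never hyphens.
# Assumption: only lowercase letters, digits, and underscores are allowed.
is_valid_pipeline_name() {
  local re='^[a-z0-9_]+$'
  [[ "$1" =~ $re ]]
}

# Buckets use hyphens, never underscores.
# Assumption: lowercase letters, digits, and inner hyphens only.
is_valid_bucket_name() {
  local re='^[a-z0-9]([a-z0-9-]*[a-z0-9])?$'
  [[ "$1" =~ $re ]]
}

# Convert a CLI sink type (hyphens) to the REST spelling (underscores),
# e.g. r2-data-catalog -> r2_data_catalog.
rest_sink_type() {
  printf '%s\n' "${1//-/_}"
}
```

For example, `is_valid_pipeline_name "my-stream" || echo "use underscores"` flags a hyphenated stream name before a create call is attempted, and `rest_sink_type r2-data-catalog` yields the value expected in the REST `"type"` field.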