Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
references/r2-sql/patterns.md
# R2 SQL Patterns

Common patterns, use cases, and integration examples for R2 SQL.

## Wrangler CLI Query

```bash
# Basic query
npx wrangler r2 sql query "my-bucket" "SELECT * FROM default.logs LIMIT 10"

# Multi-line query
npx wrangler r2 sql query "my-bucket" "
SELECT status, COUNT(*), AVG(response_time)
FROM logs.http_requests
WHERE timestamp >= '2025-01-01T00:00:00Z'
GROUP BY status
ORDER BY COUNT(*) DESC
LIMIT 100
"

# Use an environment variable
export R2_SQL_WAREHOUSE="my-bucket"
npx wrangler r2 sql query "$R2_SQL_WAREHOUSE" "SELECT * FROM default.logs"
```

## HTTP API Query

For programmatic access from external systems (not Workers - see gotchas.md).

```bash
curl -X POST https://api.cloudflare.com/client/v4/accounts/{account_id}/r2/sql/query \
  -H "Authorization: Bearer <your-token>" \
  -H "Content-Type: application/json" \
  -d '{
    "warehouse": "my-bucket",
    "query": "SELECT * FROM default.my_table WHERE status = 200 LIMIT 100"
  }'
```

Response:
```json
{
  "success": true,
  "result": [{"user_id": "user_123", "timestamp": "2025-01-15T10:30:00Z", "status": 200}],
  "errors": []
}
```

## Pipelines Integration

Stream data to Iceberg tables via Pipelines, then query with R2 SQL.

```bash
# Set up a pipeline (select the Data Catalog Table destination)
npx wrangler pipelines setup

# Key settings:
# - Destination: Data Catalog Table
# - Compression: zstd (recommended)
# - Roll file time: 300+ sec (production), 10 sec (dev)

# Send data to the pipeline
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[{"user_id": "user_123", "event_type": "purchase", "timestamp": "2025-01-15T10:30:00Z", "amount": 29.99}]'

# Query the ingested data (wait for the roll interval to elapse)
npx wrangler r2 sql query "my-bucket" "
SELECT event_type, COUNT(*), SUM(amount)
FROM default.events
WHERE timestamp >= '2025-01-15T00:00:00Z'
GROUP BY event_type
"
```
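The ingestion step above can also be scripted rather than run through curl. A minimal stdlib-only Python sketch that validates a batch of events and posts it to the ingest endpoint; the `build_batch`/`send_batch` helpers, the required-field set, and the stream URL are illustrative assumptions, not part of the Pipelines API:

```python
import json
import urllib.request

# Assumed event schema for this sketch; adjust to your pipeline's fields.
REQUIRED_FIELDS = {"user_id", "event_type", "timestamp"}

def build_batch(events):
    """Validate events and serialize them as the JSON array the ingest endpoint expects."""
    for event in events:
        missing = REQUIRED_FIELDS - event.keys()
        if missing:
            raise ValueError(f"event missing fields: {sorted(missing)}")
    return json.dumps(events).encode("utf-8")

def send_batch(stream_url, events):
    """POST a batch of events to the Pipelines ingest endpoint."""
    request = urllib.request.Request(
        stream_url,
        data=build_batch(events),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status

# Usage (hypothetical stream ID):
# send_batch(
#     "https://0123abcd.ingest.cloudflare.com",
#     [{"user_id": "user_123", "event_type": "purchase",
#       "timestamp": "2025-01-15T10:30:00Z", "amount": 29.99}],
# )
```

Batching several events per request keeps you under per-request overhead; the validation step catches malformed events before they reach the pipeline.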
See [pipelines/patterns.md](../pipelines/patterns.md) for detailed setup.

## PyIceberg Integration

Create and populate Iceberg tables with PyIceberg, then query with R2 SQL.

```python
from pyiceberg.catalog.rest import RestCatalog
import pyarrow as pa
import pandas as pd

# Set up the catalog
catalog = RestCatalog(
    name="my_catalog",
    warehouse="my-bucket",
    uri="https://<account-id>.r2.cloudflarestorage.com/iceberg/my-bucket",
    token="<your-token>",
)
catalog.create_namespace_if_not_exists("analytics")

# Create a table
schema = pa.schema([
    pa.field("user_id", pa.string(), nullable=False),
    pa.field("event_time", pa.timestamp("us", tz="UTC"), nullable=False),
    pa.field("page_views", pa.int64(), nullable=False),
])
table = catalog.create_table(("analytics", "user_metrics"), schema=schema)

# Append data
df = pd.DataFrame({
    "user_id": ["user_1", "user_2"],
    "event_time": pd.to_datetime(["2025-01-15 10:00:00", "2025-01-15 11:00:00"], utc=True),
    "page_views": [10, 25],
})
table.append(pa.Table.from_pandas(df, schema=schema))
```

Query with R2 SQL:
```bash
npx wrangler r2 sql query "my-bucket" "
SELECT user_id, SUM(page_views)
FROM analytics.user_metrics
WHERE event_time >= '2025-01-15T00:00:00Z'
GROUP BY user_id
"
```

See [r2-data-catalog/patterns.md](../r2-data-catalog/patterns.md) for advanced PyIceberg patterns.

## Use Cases

### Log Analytics
```sql
-- Error rate by endpoint
SELECT path, COUNT(*), SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) AS errors
FROM logs.http_requests
WHERE timestamp BETWEEN '2025-01-01T00:00:00Z' AND '2025-01-31T23:59:59Z'
GROUP BY path ORDER BY errors DESC LIMIT 20;

-- Response time stats
SELECT method, MIN(response_time_ms), AVG(response_time_ms), MAX(response_time_ms)
FROM logs.http_requests WHERE timestamp >= '2025-01-15T00:00:00Z' GROUP BY method;

-- Traffic by status
SELECT status, COUNT(*) FROM logs.http_requests
WHERE timestamp >= 
'2025-01-15T00:00:00Z' AND method = 'GET'
GROUP BY status ORDER BY COUNT(*) DESC;
```

### Fraud Detection
```sql
-- High-value transactions
SELECT location, COUNT(*), SUM(amount), AVG(amount)
FROM fraud.transactions
WHERE transaction_timestamp >= '2025-01-01T00:00:00Z' AND amount > 1000.0
GROUP BY location ORDER BY SUM(amount) DESC LIMIT 20;

-- Flagged transactions
SELECT merchant_category, COUNT(*), AVG(amount) FROM fraud.transactions
WHERE is_fraud_flag = true AND transaction_timestamp >= '2025-01-01T00:00:00Z'
GROUP BY merchant_category HAVING COUNT(*) > 10 ORDER BY COUNT(*) DESC;
```

### Business Intelligence
```sql
-- Sales by department
SELECT department, SUM(revenue), AVG(revenue), COUNT(*) FROM sales.transactions
WHERE sale_date >= '2024-01-01' GROUP BY department ORDER BY SUM(revenue) DESC LIMIT 10;

-- Product performance
SELECT category, COUNT(DISTINCT product_id), SUM(units_sold), SUM(revenue)
FROM sales.product_sales WHERE sale_date BETWEEN '2024-10-01' AND '2024-12-31'
GROUP BY category ORDER BY SUM(revenue) DESC;
```
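Any of these use-case queries can be run from external systems through the HTTP API shown earlier. A minimal stdlib-only Python sketch; the `extract_rows` helper and the placeholder credentials are illustrative, while the endpoint URL, request body, and response envelope (`success`/`result`/`errors`) follow the HTTP API Query section above:

```python
import json
import urllib.request

API_TEMPLATE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/r2/sql/query"

def extract_rows(payload):
    """Unwrap the API response envelope, raising if the query failed."""
    if not payload.get("success"):
        raise RuntimeError(f"query failed: {payload.get('errors')}")
    return payload["result"]

def r2_sql_query(account_id, token, warehouse, query):
    """Run an R2 SQL query through the HTTP API and return the result rows."""
    body = json.dumps({"warehouse": warehouse, "query": query}).encode("utf-8")
    request = urllib.request.Request(
        API_TEMPLATE.format(account_id=account_id),
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return extract_rows(json.load(response))

# Usage (hypothetical account ID and token):
# rows = r2_sql_query(
#     "abc123", "<your-token>", "my-bucket",
#     "SELECT status, COUNT(*) FROM logs.http_requests "
#     "WHERE timestamp >= '2025-01-15T00:00:00Z' GROUP BY status",
# )
```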
## Connecting External Engines

R2 Data Catalog exposes an Iceberg REST API. Connect Spark, Snowflake, Trino, DuckDB, etc.

```scala
// Apache Spark example
val spark = SparkSession.builder()
  .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
  .config("spark.sql.catalog.my_catalog.uri", "https://<account-id>.r2.cloudflarestorage.com/iceberg/my-bucket")
  .config("spark.sql.catalog.my_catalog.token", "<token>")
  .getOrCreate()

spark.sql("SELECT * FROM my_catalog.default.my_table LIMIT 10").show()
```

See [r2-data-catalog/patterns.md](../r2-data-catalog/patterns.md) for more engines.

## Performance Optimization

### Partitioning
- **Time-series:** day/hour on timestamp
- **Geographic:** region/country
- **Avoid:** high-cardinality keys (e.g. user_id)

```python
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="day"))
```

### Query Optimization
- **Always use LIMIT** for early termination
- **Filter on partition keys first**
- **Combine multiple filters** for better pruning

```sql
-- Better: multiple filters on the partition key
SELECT * FROM logs.requests
WHERE timestamp >= '2025-01-15T00:00:00Z' AND status = 404 AND method = 'GET' LIMIT 100;
```

### File Organization
- **Pipelines roll interval:** dev 10-30 sec, production 300+ sec
- **Target Parquet file size:** 100-500 MB compressed

## See Also

- [api.md](api.md) - SQL syntax reference
- [gotchas.md](gotchas.md) - Limitations and troubleshooting
- [r2-data-catalog/patterns.md](../r2-data-catalog/patterns.md) - PyIceberg advanced patterns
- [pipelines/patterns.md](../pipelines/patterns.md) - Streaming ingestion patterns
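The two file-organization targets above interact: the roll interval needed to produce files of a given size depends on your ingest rate. A back-of-the-envelope helper; the function and its steady-compressed-ingest-rate assumption are mine, not part of R2 SQL or Pipelines:

```python
def roll_interval_seconds(target_file_mb, ingest_mb_per_sec,
                          min_seconds=10, max_seconds=3600):
    """Estimate the Pipelines roll interval that yields roughly target_file_mb
    per rolled file, assuming a steady compressed ingest rate (MB/s)."""
    if ingest_mb_per_sec <= 0:
        raise ValueError("ingest rate must be positive")
    seconds = target_file_mb / ingest_mb_per_sec
    # Clamp to a sane range: very short rolls create many tiny files,
    # very long rolls delay query visibility.
    return max(min_seconds, min(max_seconds, round(seconds)))

# e.g. targeting 250 MB files at ~0.5 MB/s compressed ingest:
# roll_interval_seconds(250, 0.5) -> 500 seconds, in line with the
# 300+ sec production guidance above.
```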