Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
references/r2-data-catalog/patterns.md
# R2 Data Catalog Patterns

Code templates with PyIceberg (lightweight, no JVM) and PySpark (full Iceberg ecosystem). For per-engine config (DuckDB, Trino, Snowflake, StarRocks) and partitioning/maintenance best practices, pull `https://developers.cloudflare.com/r2/data-catalog/config-examples/` and `.../table-maintenance/`.

| Need | Tool |
|------|------|
| Catalog ops, append/scan, small-medium loads | PyIceberg |
| Batch ETL, INSERT INTO SELECT, DELETE/MERGE, write-back, >1 TB maintenance | PySpark |
| Pure SQL analytics (no writes) | [R2 SQL](../r2-sql/) |

## PyIceberg: Connect, Create, Load

```python
import os, pyarrow as pa
from pyiceberg.catalog.rest import RestCatalog

catalog = RestCatalog(
    name="r2",
    warehouse=os.environ["R2_WAREHOUSE"],  # {ACCOUNT_ID}_{BUCKET}
    uri=os.environ["R2_CATALOG_URI"],  # https://catalog.cloudflarestorage.com/{ACCOUNT_ID}/{BUCKET}
    token=os.environ["R2_TOKEN"],
)
catalog.create_namespace_if_not_exists("analytics")

schema = pa.schema([("id", pa.int64()), ("name", pa.string()), ("amount", pa.float64())])
table = catalog.create_table(("analytics", "events"), schema=schema)
table.append(pa.table({"id": [1, 2], "name": ["a", "b"], "amount": [80.0, 92.5]}))
print(table.scan().to_arrow().to_pandas())
```

## PyIceberg: Partitioned Time-Series Table

```python
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, TimestampType, StringType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

schema = Schema(
    NestedField(1, "timestamp", TimestampType(), required=True),
    NestedField(2, "level", StringType(), required=True),
    NestedField(3, "message", StringType(), required=False),
)
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="day"))
table = catalog.create_table(("logs", "app_logs"), schema=schema, partition_spec=spec)
errors = table.scan(row_filter="level = 'ERROR'").to_pandas()  # partition pruning
```

## PySpark Session

Verified template: requires Iceberg **1.6.1** and vended credentials. S3 keys are only needed for orphan-file removal. (If this drifts, cross-check `config-examples/spark-python/`.)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("R2DataCatalog") \
    .config('spark.jars.packages',
            'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,'
            'org.apache.iceberg:iceberg-aws-bundle:1.6.1,'
            'org.apache.hadoop:hadoop-aws:3.3.4,'
            'com.amazonaws:aws-java-sdk-bundle:1.12.262') \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.r2dc", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.r2dc.type", "rest") \
    .config("spark.sql.catalog.r2dc.uri", CATALOG_URI) \
    .config("spark.sql.catalog.r2dc.warehouse", WAREHOUSE) \
    .config("spark.sql.catalog.r2dc.token", TOKEN) \
    .config("spark.sql.catalog.r2dc.header.X-Iceberg-Access-Delegation", "vended-credentials") \
    .config("spark.sql.catalog.r2dc.s3.remote-signing-enabled", "false") \
    .config("spark.sql.defaultCatalog", "r2dc") \
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()
spark.sql("USE r2dc")
```

> `X-Iceberg-Access-Delegation: vended-credentials` is required; `s3.remote-signing-enabled` must be `false`. First startup ~30–60s for JAR downloads (cached after).

## PySpark: Batch ETL

```python
spark.sql("""
CREATE TABLE IF NOT EXISTS my_ns.events (
    __ingest_ts TIMESTAMP, event_id STRING, category STRING, amount DOUBLE
) PARTITIONED BY (days(__ingest_ts))
""")

spark.read.option("header","true").csv("data.csv").writeTo("my_ns.events").append()
spark.read.parquet("data.parquet").writeTo("my_ns.events").append()
spark.sql("INSERT INTO my_ns.target SELECT col1, col2 FROM my_ns.source WHERE col1 > 0")
spark.sql("DELETE FROM my_ns.events WHERE amount < 0")
```

> Partition large tables (`PARTITIONED BY (days(__ingest_ts))`). Unpartitioned works for small datasets (<1000 files) but degrades at scale.

## Concurrent Writes with Retry (PyIceberg)

```python
from pyiceberg.exceptions import CommitFailedException
import time

def append_with_retry(table, data, max_retries=3):
    for attempt in range(max_retries):
        try:
            table.append(data)
            return
        except CommitFailedException:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)
```

Optimistic locking: concurrent commits to the same table may conflict; different-partition writes are safe.

## Connecting Any Iceberg Engine

Engines connect with the Iceberg REST catalog config: catalog URI `https://catalog.cloudflarestorage.com/{ACCOUNT_ID}/{BUCKET}`, warehouse `{ACCOUNT_ID}_{BUCKET}`, your token, and header `X-Iceberg-Access-Delegation: vended-credentials`. Copy-paste configs per engine: `config-examples/`.

## See Also

- [api.md](api.md) · [gotchas.md](gotchas.md) · [pipelines/patterns.md](../pipelines/patterns.md) · [r2-sql/patterns.md](../r2-sql/patterns.md)
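
## Sketch: Deriving Catalog Settings from Account ID and Bucket

The "Connecting Any Iceberg Engine" section gives the catalog URI and warehouse as formats built from `{ACCOUNT_ID}` and `{BUCKET}`. The following is a minimal sketch, not part of the skill package, that derives those values in one place and expands them into the `spark.sql.catalog.r2dc.*` keys used in the PySpark session template. The helper names `r2_catalog_settings` and `spark_catalog_conf`, and the placeholder account ID, bucket, and token, are hypothetical; only the URI format, warehouse format, header, and config keys come from this document.

```python
def r2_catalog_settings(account_id: str, bucket: str, token: str) -> dict:
    """Build Iceberg REST catalog settings (hypothetical helper).

    URI and warehouse follow the formats stated in this document.
    """
    if not account_id or not bucket:
        raise ValueError("account_id and bucket are required")
    return {
        "uri": f"https://catalog.cloudflarestorage.com/{account_id}/{bucket}",
        "warehouse": f"{account_id}_{bucket}",
        "token": token,
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
    }


def spark_catalog_conf(settings: dict, catalog_name: str = "r2dc") -> dict:
    """Expand the settings into Spark config keys for one named catalog."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
    }
    # Each setting becomes spark.sql.catalog.<name>.<key>
    conf.update({f"{prefix}.{key}": value for key, value in settings.items()})
    # The session template requires remote signing to be disabled
    conf[f"{prefix}.s3.remote-signing-enabled"] = "false"
    return conf


# Placeholder values for illustration only
settings = r2_catalog_settings("ACCOUNT_ID", "BUCKET", "TOKEN")
for key, value in sorted(spark_catalog_conf(settings).items()):
    print(f"{key} = {value}")
```

Each resulting pair can be passed to `SparkSession.builder.config(key, value)` in a loop. Keeping the formats in one function avoids the URI and warehouse strings drifting apart when the bucket changes.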