Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
references/r2-data-catalog/patterns.md
# Common Patterns

Practical patterns for R2 Data Catalog with PyIceberg.

## PyIceberg Connection

```python
import os

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError

catalog = RestCatalog(
    name="r2_catalog",
    warehouse=os.getenv("R2_WAREHOUSE"),  # bucket name
    uri=os.getenv("R2_CATALOG_URI"),      # catalog endpoint
    token=os.getenv("R2_TOKEN"),          # API token
)

# Create namespace (idempotent)
try:
    catalog.create_namespace("default")
except NamespaceAlreadyExistsError:
    pass
```

## Pattern 1: Log Analytics Pipeline

Ingest logs incrementally, then query them by time and level.

```python
import pyarrow as pa
from datetime import datetime

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, TimestampType, StringType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

# Create partitioned table (once)
schema = Schema(
    NestedField(1, "timestamp", TimestampType(), required=True),
    NestedField(2, "level", StringType(), required=True),
    NestedField(3, "service", StringType(), required=True),
    NestedField(4, "message", StringType(), required=False),
)

partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="day")
)

catalog.create_namespace("logs")
table = catalog.create_table(("logs", "app_logs"), schema=schema, partition_spec=partition_spec)

# Append logs (incremental)
data = pa.table({
    "timestamp": [datetime(2026, 1, 27, 10, 30, 0)],
    "level": ["ERROR"],
    "service": ["auth-service"],
    "message": ["Failed login"],
})
table.append(data)

# Query by time + level. With hidden partitioning, filter on the source
# column (timestamp); the day partition is pruned automatically.
scan = table.scan(
    row_filter="level = 'ERROR' AND timestamp >= '2026-01-27T00:00:00' AND timestamp < '2026-01-28T00:00:00'"
)
errors = scan.to_pandas()
```

## Pattern 2: Time-Travel Queries

```python
from datetime import datetime, timedelta

table = catalog.load_table(("logs", "app_logs"))

# Query a specific snapshot
snapshot_id = table.current_snapshot().snapshot_id
data = table.scan(snapshot_id=snapshot_id).to_pandas()

# Query as of a timestamp (yesterday)
yesterday_ms = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)
data = table.scan(as_of_timestamp=yesterday_ms).to_pandas()
```

## Pattern 3: Schema Evolution

```python
from pyiceberg.types import StringType

table = catalog.load_table(("users", "profiles"))

with table.update_schema() as update:
    update.add_column("email", StringType(), required=False)
    update.rename_column("name", "full_name")
    # Old readers ignore new columns; new readers see nulls for rows written before the change
```

## Pattern 4: Partitioned Tables

```python
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, IdentityTransform

# Partition by day + country (assumes a schema where field 1 is a timestamp
# and field 2 is `country`)
partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="day"),
    PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="country"),
)
table = catalog.create_table(("events", "user_events"), schema=schema, partition_spec=partition_spec)

# Queries prune partitions automatically when filtering on the source columns
scan = table.scan(row_filter="country = 'US' AND timestamp >= '2026-01-27T00:00:00'")
```

## Pattern 5: Table Maintenance

```python
from datetime import datetime, timedelta

table = catalog.load_table(("logs", "app_logs"))

# Compact → expire → clean up (in that order)
table.rewrite_data_files(target_file_size_bytes=128 * 1024 * 1024)

seven_days_ms = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)
table.expire_snapshots(older_than=seven_days_ms, retain_last=10)

three_days_ms = int((datetime.now() - timedelta(days=3)).timestamp() * 1000)
table.delete_orphan_files(older_than=three_days_ms)
```

See [api.md](api.md#table-maintenance) for detailed parameters.

## Pattern 6: Concurrent Writes with Retry

```python
import time

from pyiceberg.exceptions import CommitFailedException

def append_with_retry(table, data, max_retries=3):
    for attempt in range(max_retries):
        try:
            table.append(data)
            return
        except CommitFailedException:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff
```

## Pattern 7: Upsert Simulation

```python
import pandas as pd
import pyarrow as pa

# Read → merge → overwrite (not atomic; use Spark MERGE INTO for production)
existing = table.scan().to_pandas()
new_data = pd.DataFrame({"id": [1, 3], "value": [100, 300]})
merged = pd.concat([existing, new_data]).drop_duplicates(subset=["id"], keep="last")
table.overwrite(pa.Table.from_pandas(merged))
```

## Pattern 8: DuckDB Integration

```python
import duckdb

arrow_table = table.scan().to_arrow()
con = duckdb.connect()
con.register("logs", arrow_table)
result = con.execute("SELECT level, COUNT(*) FROM logs GROUP BY level").fetchdf()
```

## Pattern 9: Monitor Table Health

```python
files = list(table.scan().plan_files())
avg_mb = sum(f.file.file_size_in_bytes for f in files) / len(files) / (1024**2)
print(f"Files: {len(files)}, Avg: {avg_mb:.1f}MB, Snapshots: {len(table.snapshots())}")

if avg_mb < 10 or len(files) > 1000:
    print("⚠️ Needs compaction")
```

## Best Practices

| Area | Guideline |
|------|-----------|
| **Partitioning** | Use day/hour for time-series data; aim for 100-1,000 partitions; avoid high-cardinality keys |
| **File sizes** | Target 128-512 MB per file; compact when the average falls below 10 MB or the table exceeds ~10k files |
| **Schema** | Add new columns as nullable (`required=False`); batch schema changes together |
| **Maintenance** | Compact high-write tables daily or weekly; expire snapshots after 7-30 days; then clean up orphan files |
| **Concurrency** | Reads need no coordination; writes to different partitions are safe; retry commits that conflict on the same partition |
| **Performance** | Filter on partition source columns; select only needed columns; batch appends to 100 MB+ (see the sketch below) |
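
The performance guidelines above can be combined in a single read/write path. Below is a minimal sketch reusing the `catalog` and `logs.app_logs` table from Pattern 1; it assumes PyIceberg's `selected_fields` projection on `Table.scan`, and the literal dates and batch size are illustrative, not prescriptive.

```python
import pyarrow as pa
from datetime import datetime

table = catalog.load_table(("logs", "app_logs"))

# Read only the columns you need; filtering on the partition source column
# (timestamp) lets the scan prune day partitions before reading data files.
scan = table.scan(
    row_filter="level = 'ERROR' AND timestamp >= '2026-01-27T00:00:00'",
    selected_fields=("timestamp", "service"),
)
errors = scan.to_arrow()

# Batch appends: accumulate rows and commit one larger Arrow table
# instead of many small appends, which keeps file counts down.
batch = pa.table({
    "timestamp": [datetime(2026, 1, 27, 11, 0, 0)] * 1000,
    "level": ["INFO"] * 1000,
    "service": ["auth-service"] * 1000,
    "message": ["ok"] * 1000,
})
table.append(batch)
```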