Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
`references/r2-data-catalog/api.md`
# API Reference

R2 Data Catalog exposes the standard [Apache Iceberg REST Catalog API](https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml).

## Quick Reference

**Most common operations:**

| Task | PyIceberg Code |
|------|----------------|
| Connect | `RestCatalog(name="r2", warehouse=bucket, uri=uri, token=token)` |
| List namespaces | `catalog.list_namespaces()` |
| Create namespace | `catalog.create_namespace("logs")` |
| Create table | `catalog.create_table(("ns", "table"), schema=schema)` |
| Load table | `catalog.load_table(("ns", "table"))` |
| Append data | `table.append(pyarrow_table)` |
| Query data | `table.scan().to_pandas()` |
| Compact files | `table.rewrite_data_files(target_file_size_bytes=128*1024*1024)` |
| Expire snapshots | `table.expire_snapshots(older_than=timestamp_ms, retain_last=10)` |

## REST Endpoints

Base: `https://<account-id>.r2.cloudflarestorage.com/iceberg/<bucket-name>`

| Operation | Method | Path |
|-----------|--------|------|
| Catalog config | GET | `/v1/config` |
| List namespaces | GET | `/v1/namespaces` |
| Create namespace | POST | `/v1/namespaces` |
| Delete namespace | DELETE | `/v1/namespaces/{ns}` |
| List tables | GET | `/v1/namespaces/{ns}/tables` |
| Create table | POST | `/v1/namespaces/{ns}/tables` |
| Load table | GET | `/v1/namespaces/{ns}/tables/{table}` |
| Update table | POST | `/v1/namespaces/{ns}/tables/{table}` |
| Delete table | DELETE | `/v1/namespaces/{ns}/tables/{table}` |
| Rename table | POST | `/v1/tables/rename` |

**Authentication:** Bearer token in header: `Authorization: Bearer <token>`

## PyIceberg Client API

Most users work through PyIceberg rather than the raw REST endpoints.

### Connection

```python
from pyiceberg.catalog.rest import RestCatalog

catalog = RestCatalog(
    name="my_catalog",
    warehouse="<bucket-name>",
    uri="<catalog-uri>",
    token="<api-token>",
)
```

### Namespace Operations

```python
from pyiceberg.exceptions import NamespaceAlreadyExistsError

namespaces = catalog.list_namespaces()  # [('default',), ('logs',)]
try:
    catalog.create_namespace("logs", properties={"owner": "team"})
except NamespaceAlreadyExistsError:
    pass  # 409: namespace already exists
catalog.drop_namespace("logs")  # Must be empty
```

### Table Operations

```python
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, IntegerType

schema = Schema(
    NestedField(1, "id", IntegerType(), required=True),
    NestedField(2, "name", StringType(), required=False),
)
table = catalog.create_table(("logs", "app_logs"), schema=schema)
tables = catalog.list_tables("logs")
table = catalog.load_table(("logs", "app_logs"))
catalog.rename_table(("logs", "old"), ("logs", "new"))
```

### Data Operations

```python
import pyarrow as pa

data = pa.table({"id": [1, 2], "name": ["Alice", "Bob"]})
table.append(data)     # Add rows
table.overwrite(data)  # Replace table contents

# Read with filters
scan = table.scan(row_filter="id > 100", selected_fields=["id", "name"])
df = scan.to_pandas()
```

### Schema Evolution

```python
from pyiceberg.types import IntegerType, LongType

with table.update_schema() as update:
    update.add_column("user_id", IntegerType(), doc="User ID")
    update.rename_column("msg", "message")
    update.delete_column("old_field")
    update.update_column("id", field_type=LongType())  # int→long only
```

### Time-Travel

```python
from datetime import datetime, timedelta

# Query a specific snapshot or timestamp
scan = table.scan(snapshot_id=table.snapshots()[-2].snapshot_id)
yesterday_ms = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)
scan = table.scan(as_of_timestamp=yesterday_ms)
```

### Partitioning

```python
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
from pyiceberg.types import TimestampType

# The day transform's source field (source_id=1) must be a timestamp
schema = Schema(
    NestedField(1, "ts", TimestampType(), required=True),
    NestedField(2, "action", StringType(), required=False),
)
partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="day")
)
table = catalog.create_table(("events", "actions"), schema=schema, partition_spec=partition_spec)
scan = table.scan(row_filter="day = '2026-01-27'")  # Prunes partitions
```

## Table Maintenance

### Compaction

```python
files = table.scan().plan_files()
avg_mb = sum(f.file.file_size_in_bytes for f in files) / len(files) / (1024**2)
print(f"Files: {len(files)}, Avg: {avg_mb:.1f} MB")

table.rewrite_data_files(target_file_size_bytes=128 * 1024 * 1024)
```

**When:** average file size under 10 MB, or more than 1000 files. **Frequency:** daily for high-write tables, weekly for medium-write tables.

### Snapshot Expiration

```python
from datetime import datetime, timedelta

seven_days_ms = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)
table.expire_snapshots(older_than=seven_days_ms, retain_last=10)
```

**Retention:** production 7-30 days, dev 1-7 days, audit 90+ days.

### Orphan Cleanup

```python
three_days_ms = int((datetime.now() - timedelta(days=3)).timestamp() * 1000)
table.delete_orphan_files(older_than=three_days_ms)
```

⚠️ Always expire snapshots first, use a threshold of 3+ days, and run during low-traffic windows.

### Full Maintenance

```python
# Compact → Expire → Cleanup (in order)
if len(table.scan().plan_files()) > 1000:
    table.rewrite_data_files(target_file_size_bytes=128 * 1024 * 1024)
seven_days_ms = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)
table.expire_snapshots(older_than=seven_days_ms, retain_last=10)
three_days_ms = int((datetime.now() - timedelta(days=3)).timestamp() * 1000)
table.delete_orphan_files(older_than=three_days_ms)
```

## Metadata Inspection

```python
table = catalog.load_table(("logs", "app_logs"))
print(table.schema())
print(table.current_snapshot())
print(table.properties)
print(f"Files: {len(table.scan().plan_files())}")
```

## Error Codes

| Code | Meaning | Common Causes |
|------|---------|---------------|
| 401 | Unauthorized | Invalid/missing token |
| 404 | Not Found | Catalog not enabled, namespace/table missing |
| 409 | Conflict | Already exists, concurrent update |
| 422 | Validation | Invalid schema, incompatible type |

See [gotchas.md](gotchas.md) for detailed troubleshooting.