Build LLM-powered apps with the Anthropic Claude API or SDK across Python, TypeScript, Java, Go, Ruby, C#, and PHP.
python/claude-api/streaming.md
# Streaming: Python

## Quick Start

```python
with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

### Async

```python
async with async_client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    async for text in stream.text_stream:
        print(text, end="", flush=True)
```

### Low-level: `stream=True`

`messages.stream()` (above) is the recommended helper: it accumulates state and exposes `text_stream` / `get_final_message()`. If you only need the raw event iterator and want lower memory use, pass `stream=True` to `messages.create()` instead:

```python
for event in client.messages.create(
    model="claude-opus-4-8",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Write a story"}],
    stream=True,
):
    print(event.type)
```

No final-message accumulation is done for you in this form.

---

## Handling Different Content Types

Claude may return text, thinking blocks, or tool use. Handle each appropriately:

> **Fable 5 / Opus 4.8 / Opus 4.7 / Opus 4.6:** Use `thinking: {type: "adaptive"}`. On older models, use `thinking: {type: "enabled", budget_tokens: N}` instead.

```python
with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=64000,
    thinking={"type": "adaptive"},
    messages=[{"role": "user", "content": "Analyze this problem"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "thinking":
                print("\n[Thinking...]")
            elif event.content_block.type == "text":
                print("\n[Response:]")

        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
```

---

## Streaming with Tool Use

The Python tool runner currently returns complete messages. Use streaming for individual API calls within a manual loop if you need per-token streaming with tools:

```python
with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=64000,
    tools=tools,
    messages=messages
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    response = stream.get_final_message()
    # Continue with tool execution if response.stop_reason == "tool_use"
```

---

## Getting the Final Message

```python
with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Get full message after streaming
    final_message = stream.get_final_message()
    print(f"\n\nTokens used: {final_message.usage.output_tokens}")
```

---

## Streaming with Progress Updates

```python
def stream_with_progress(client, **kwargs):
    """Stream a response with progress updates."""
    total_tokens = 0
    content_parts = []

    with client.messages.stream(**kwargs) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    text = event.delta.text
                    content_parts.append(text)
                    print(text, end="", flush=True)

            elif event.type == "message_delta":
                if event.usage and event.usage.output_tokens is not None:
                    total_tokens = event.usage.output_tokens

        final_message = stream.get_final_message()

    print(f"\n\n[Tokens used: {total_tokens}]")
    return "".join(content_parts)
```

---

## Error Handling in Streams

```python
import anthropic

try:
    with client.messages.stream(
        model="claude-opus-4-8",
        max_tokens=64000,
        messages=[{"role": "user", "content": "Write a story"}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost. Please retry.")
except anthropic.RateLimitError:
    print("\nRate limited. Please wait and retry.")
except anthropic.APIStatusError as e:
    print(f"\nAPI error: {e.status_code}")
```

---

## Stream Event Types

| Event Type            | Description                 | When it fires                                    |
| --------------------- | --------------------------- | ------------------------------------------------ |
| `message_start`       | Contains message metadata   | Once at the beginning                            |
| `content_block_start` | New content block beginning | When a text, thinking, or tool_use block starts  |
| `content_block_delta` | Incremental content update  | For each token/chunk                             |
| `content_block_stop`  | Content block complete      | When a block finishes                            |
| `message_delta`       | Message-level updates       | Contains `stop_reason`, usage                    |
| `message_stop`        | Message complete            | Once at the end                                  |

## Best Practices

1. **Always flush output**: Use `flush=True` to show tokens immediately.
2. **Handle partial responses**: If the stream is interrupted, you may have incomplete content.
3. **Track token usage**: The `message_delta` event contains usage information.
4. **Use timeouts**: Set appropriate timeouts for your application.
5. **Default to streaming**: Use `.get_final_message()` to get the complete response even when streaming, giving you timeout protection without needing to handle individual events.
6. **Large `max_tokens` without streaming raises `ValueError`**: The SDK refuses non-streaming requests it estimates will exceed ~10 minutes (idle connections drop). To avoid the guard, pass `stream=True`, use `messages.stream()`, or explicitly override `timeout`.
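
As a rough illustration of the "handle partial responses" practice, the sketch below keeps whatever text arrived before a stream was interrupted and reports whether `message_stop` was ever seen. It runs offline against stand-in events shaped like the rows in the event table. `collect_text` and `fake_events` are hypothetical names, and the built-in `ConnectionError` stands in for `anthropic.APIConnectionError`; treat it as one possible approach, not the SDK's own mechanism.

```python
from types import SimpleNamespace


def collect_text(events):
    """Accumulate text deltas and return (text, completed).

    `completed` is True only if a `message_stop` event was observed.
    """
    parts = []
    completed = False
    try:
        for event in events:
            if event.type == "content_block_delta" and event.delta.type == "text_delta":
                parts.append(event.delta.text)
            elif event.type == "message_stop":
                completed = True
    except ConnectionError:
        # Assumption: with the real SDK you would catch
        # anthropic.APIConnectionError here instead.
        pass
    return "".join(parts), completed


def fake_events(chunks, fail_after=None):
    """Hypothetical stand-in for a stream; optionally drops mid-way."""
    yield SimpleNamespace(type="message_start")
    for i, chunk in enumerate(chunks):
        if fail_after is not None and i == fail_after:
            raise ConnectionError("simulated connection drop")
        yield SimpleNamespace(
            type="content_block_delta",
            delta=SimpleNamespace(type="text_delta", text=chunk),
        )
    yield SimpleNamespace(type="message_stop")


chunks = ["Once ", "upon ", "a time"]
full = collect_text(fake_events(chunks))
partial = collect_text(fake_events(chunks, fail_after=2))
print(full)     # ('Once upon a time', True)
print(partial)  # ('Once upon ', False)
```

A caller can use the `completed` flag to decide whether to show the partial text as-is, retry the request, or mark the output as truncated.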