Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
examples/interleaved-thinking/reasoning_trace_optimizer/capture.py
"""
TraceCapture: Wraps M2.1 API to capture interleaved thinking traces.

This module provides the core functionality for executing agent tasks
through MiniMax M2.1 while capturing all reasoning traces for analysis.
"""

import json
import os
import uuid
from datetime import datetime
from typing import Any, Callable

import anthropic

from reasoning_trace_optimizer.models import (
    ReasoningTrace,
    ThinkingBlock,
    ToolCall,
)


class TraceCapture:
    """
    Captures reasoning traces from MiniMax M2.1's interleaved thinking.

    This class wraps the Anthropic SDK configured for M2.1 and captures
    all thinking blocks, tool calls, and responses during agent execution.

    Example:
    ```python
    capture = TraceCapture()
    trace = capture.run(
        task="What's the weather in San Francisco?",
        tools=[weather_tool],
        tool_executor=execute_tool
    )
    print(f"Captured {len(trace.thinking_blocks)} thinking blocks")
    ```
    """

    def __init__(
        self,
        api_key: str | None = None,
        base_url: str = "https://api.minimax.io/anthropic",
        model: str = "MiniMax-M2.1",
    ):
        """
        Initialize TraceCapture with M2.1 configuration.

        Args:
            api_key: MiniMax API key (defaults to ANTHROPIC_API_KEY env var)
            base_url: API base URL (international or China endpoint)
            model: Model to use (MiniMax-M2.1, MiniMax-M2.1-lightning, MiniMax-M2)
        """
        self.model = model
        self.client = anthropic.Anthropic(
            api_key=api_key or os.environ.get("ANTHROPIC_API_KEY"),
            base_url=base_url,
        )

    def run(
        self,
        task: str,
        system_prompt: str = "You are a helpful assistant.",
        tools: list[dict[str, Any]] | None = None,
        tool_executor: Callable[[str, dict], str] | None = None,
        max_turns: int = 10,
        max_tokens: int = 4096,
    ) -> ReasoningTrace:
        """
        Execute a task and capture the full reasoning trace.

        Args:
            task: The user task/query to execute
            system_prompt: System prompt for the agent
            tools: List of tool definitions in Anthropic format
            tool_executor: Function to execute tool calls (name, input) -> result
            max_turns: Maximum conversation turns before stopping
            max_tokens: Maximum tokens per response

        Returns:
            ReasoningTrace containing all thinking blocks, tool calls, and responses
        """
        trace = ReasoningTrace(
            session_id=str(uuid.uuid4()),
            task=task,
            system_prompt=system_prompt,
            model=self.model,
            started_at=datetime.now(),
        )

        messages = [{"role": "user", "content": task}]
        turn = 0

        try:
            while turn < max_turns:
                # Build request parameters
                params = {
                    "model": self.model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": messages,
                }
                if tools:
                    params["tools"] = tools

                # Make API call
                response = self.client.messages.create(**params)

                # Process response content blocks
                thinking_blocks, text_blocks, tool_use_blocks = self._process_response(
                    response, turn, trace
                )

                # If no tool calls, we're done
                if not tool_use_blocks:
                    # FIX: join ALL text blocks rather than keeping only the
                    # first one — a response may contain multiple text blocks,
                    # and the streaming path accumulates them all the same way.
                    trace.final_response = (
                        "".join(block.text for block in text_blocks)
                        if text_blocks
                        else None
                    )
                    trace.success = True
                    break

                # Append assistant response to history (CRITICAL for M2.1)
                messages.append({"role": "assistant", "content": response.content})

                # Execute tools and collect results
                tool_results = []
                for tool_block in tool_use_blocks:
                    result = self._execute_tool(
                        tool_block, tool_executor, turn, trace
                    )
                    tool_results.append(
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_block.id,
                            "content": result,
                        }
                    )

                # Add tool results to messages
                messages.append({"role": "user", "content": tool_results})

                turn += 1
                trace.total_turns = turn

            # Check if we hit max turns without completion
            if turn >= max_turns and not trace.success:
                trace.success = False
                trace.error = f"Reached maximum turns ({max_turns}) without completion"

        except Exception as e:
            # Top-level boundary: record the failure on the trace rather than
            # propagating, so callers always get a (possibly failed) trace back.
            trace.success = False
            trace.error = str(e)

        trace.completed_at = datetime.now()
        return trace

    def _process_response(
        self,
        response: anthropic.types.Message,
        turn: int,
        trace: ReasoningTrace,
    ) -> tuple[list, list, list]:
        """Process response content blocks and update trace."""
        thinking_blocks = []
        text_blocks = []
        tool_use_blocks = []

        for block in response.content:
            if block.type == "thinking":
                thinking = ThinkingBlock(
                    content=block.thinking,
                    turn_index=turn,
                    # signature may be absent depending on SDK/model version
                    signature=getattr(block, "signature", None),
                )
                trace.thinking_blocks.append(thinking)
                thinking_blocks.append(block)

            elif block.type == "text":
                text_blocks.append(block)

            elif block.type == "tool_use":
                tool_use_blocks.append(block)

        # Update token count
        trace.total_tokens += response.usage.input_tokens + response.usage.output_tokens

        return thinking_blocks, text_blocks, tool_use_blocks

    def _execute_tool(
        self,
        tool_block: Any,
        executor: Callable[[str, dict], str] | None,
        turn: int,
        trace: ReasoningTrace,
    ) -> str:
        """Execute a tool call and record it in the trace."""
        tool_call = ToolCall(
            id=tool_block.id,
            name=tool_block.name,
            input=tool_block.input,
            turn_index=turn,
        )

        try:
            if executor:
                result = executor(tool_block.name, tool_block.input)
            else:
                # No executor supplied: return a mock result so traces can be
                # captured without real tool backends.
                result = f"[Mock result for {tool_block.name}]"

            tool_call.result = result
            tool_call.success = True

        except Exception as e:
            # Tool failures are recorded (not raised) so the agent loop can
            # feed the error string back to the model as a tool result.
            result = f"Error: {str(e)}"
            tool_call.result = result
            tool_call.success = False
            tool_call.error = str(e)

        trace.tool_calls.append(tool_call)

        # Link thinking to tool call
        if trace.thinking_blocks:
            last_thinking = trace.thinking_blocks[-1]
            if last_thinking.turn_index == turn:
                last_thinking.following_action = f"tool_use:{tool_block.name}"

        return result

    def run_streaming(
        self,
        task: str,
        system_prompt: str = "You are a helpful assistant.",
        tools: list[dict[str, Any]] | None = None,
        tool_executor: Callable[[str, dict], str] | None = None,
        max_turns: int = 10,
        max_tokens: int = 4096,
        on_thinking: Callable[[str], None] | None = None,
        on_text: Callable[[str], None] | None = None,
        on_tool_call: Callable[[str, dict], None] | None = None,
        on_error: Callable[[str], None] | None = None,
    ) -> ReasoningTrace:
        """
        Execute a task with streaming output and capture reasoning trace.

        Similar to run() but streams thinking and text content in real-time
        via callback functions.

        Note: For multi-turn tool interactions, the non-streaming run() method
        is recommended as it provides more reliable trace capture. Use this
        method when you need real-time display of thinking/text content.

        Args:
            task: The user task/query to execute
            system_prompt: System prompt for the agent
            tools: List of tool definitions
            tool_executor: Function to execute tool calls
            max_turns: Maximum conversation turns
            max_tokens: Maximum tokens per response
            on_thinking: Callback for thinking content chunks
            on_text: Callback for text content chunks
            on_tool_call: Callback when tool is called (name, input)
            on_error: Callback when an error occurs (error message)

        Returns:
            ReasoningTrace containing the full captured trace
        """
        trace = ReasoningTrace(
            session_id=str(uuid.uuid4()),
            task=task,
            system_prompt=system_prompt,
            model=self.model,
            started_at=datetime.now(),
        )

        messages = [{"role": "user", "content": task}]
        turn = 0

        try:
            while turn < max_turns:
                params = {
                    "model": self.model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": messages,
                    "stream": True,
                }
                if tools:
                    params["tools"] = tools

                # Collect streamed content (buffers reset each turn)
                thinking_buffer = ""
                text_buffer = ""
                tool_use_blocks = []
                current_content = []

                with self.client.messages.stream(**params) as stream:
                    for event in stream:
                        if event.type == "content_block_start":
                            if hasattr(event, "content_block"):
                                current_content.append(event.content_block)

                        elif event.type == "content_block_delta":
                            if hasattr(event, "delta"):
                                if event.delta.type == "thinking_delta":
                                    chunk = event.delta.thinking
                                    thinking_buffer += chunk
                                    if on_thinking:
                                        on_thinking(chunk)

                                elif event.delta.type == "text_delta":
                                    chunk = event.delta.text
                                    text_buffer += chunk
                                    if on_text:
                                        on_text(chunk)

                # Get final message for tool_use blocks
                final_message = stream.get_final_message()
                for block in final_message.content:
                    if block.type == "tool_use":
                        tool_use_blocks.append(block)
                        if on_tool_call:
                            on_tool_call(block.name, block.input)

                # Record thinking block
                if thinking_buffer:
                    trace.thinking_blocks.append(
                        ThinkingBlock(
                            content=thinking_buffer,
                            turn_index=turn,
                        )
                    )

                # Update tokens
                trace.total_tokens += (
                    final_message.usage.input_tokens + final_message.usage.output_tokens
                )

                # If no tool calls, we're done
                if not tool_use_blocks:
                    trace.final_response = text_buffer or None
                    trace.success = True
                    break

                # Append to history
                messages.append({"role": "assistant", "content": final_message.content})

                # Execute tools
                tool_results = []
                for tool_block in tool_use_blocks:
                    result = self._execute_tool(tool_block, tool_executor, turn, trace)
                    tool_results.append(
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_block.id,
                            "content": result,
                        }
                    )

                messages.append({"role": "user", "content": tool_results})
                turn += 1
                trace.total_turns = turn

            if turn >= max_turns and not trace.success:
                trace.success = False
                trace.error = f"Reached maximum turns ({max_turns})"

        except Exception as e:
            trace.success = False
            trace.error = str(e)
            if on_error:
                on_error(str(e))

        trace.completed_at = datetime.now()
        return trace


def format_trace_for_display(trace: ReasoningTrace) -> str:
    """Format a reasoning trace for human-readable display."""
    lines = [
        f"Session: {trace.session_id}",
        f"Task: {trace.task}",
        f"Model: {trace.model}",
        f"Status: {'Success' if trace.success else 'Failed'}",
        f"Turns: {trace.total_turns}",
        f"Tokens: {trace.total_tokens}",
        "",
        "=" * 60,
        "REASONING TRACE",
        "=" * 60,
    ]

    for i, thinking in enumerate(trace.thinking_blocks):
        lines.append(f"\n[Turn {thinking.turn_index}] Thinking:")
        lines.append("-" * 40)
        # Truncate long thinking content to keep the display readable
        lines.append(
            thinking.content[:500] + "..."
            if len(thinking.content) > 500
            else thinking.content
        )

        # Show tool calls at this turn
        turn_tools = trace.get_tool_calls_at_turn(thinking.turn_index)
        for tool in turn_tools:
            lines.append(f"\n  Tool: {tool.name}({json.dumps(tool.input)})")
            lines.append(
                f"  Result: {tool.result[:100]}..."
                if tool.result and len(tool.result) > 100
                else f"  Result: {tool.result}"
            )

    if trace.final_response:
        lines.append("\n" + "=" * 60)
        lines.append("FINAL RESPONSE")
        lines.append("=" * 60)
        lines.append(trace.final_response)

    if trace.error:
        lines.append("\n" + "=" * 60)
        lines.append("ERROR")
        lines.append("=" * 60)
        lines.append(trace.error)

    return "\n".join(lines)