Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Syntax-highlighted preview of this file as included in the skill package.
skills/context-compression/scripts/compression_evaluator.py
1"""2Context Compression Evaluation34Public API for evaluating context compression quality using probe-based5assessment. This module provides three composable components:67- **ProbeGenerator**: Extracts factual claims, file operations, and decisions8from conversation history, then generates typed probes for evaluation.9Use when: building a compression evaluation pipeline and needing to10automatically derive test questions from raw conversation history.1112- **CompressionEvaluator**: Scores probe responses against a multi-dimensional13rubric (accuracy, context awareness, artifact trail, completeness,14continuity, instruction following). Use when: comparing compression methods15or validating that a compression strategy preserves critical information.1617- **StructuredSummarizer**: Implements anchored iterative summarization with18explicit sections for session intent, file tracking, decisions, and next19steps. Use when: compressing long-running coding sessions where file20tracking and decision rationale must survive compression.2122Top-level convenience function:23- **evaluate_compression_quality**: End-to-end pipeline that generates probes,24collects model responses, evaluates them, and returns a scored summary with25recommendations. Use when: running a one-shot compression quality check26without wiring up individual components.2728PRODUCTION NOTES:29- The LLM judge calls are stubbed for demonstration. Production systems30should implement actual API calls to a frontier model.31- Token estimation uses simplified heuristics. Production systems should32use model-specific tokenizers.33- Ground truth extraction uses pattern matching. Production systems may34benefit from more sophisticated fact extraction.35"""3637from dataclasses import dataclass, field38from typing import List, Dict, Optional, Callable39from enum import Enum40import json41import re4243__all__ = [44"ProbeType",45"Probe",46"CriterionResult",47"EvaluationResult",48"RUBRIC_CRITERIA",49"ProbeGenerator",50"CompressionEvaluator",51"StructuredSummarizer",52"evaluate_compression_quality",53]545556class ProbeType(Enum):57"""Types of evaluation probes for compression quality assessment."""58RECALL = "recall"59ARTIFACT = "artifact"60CONTINUATION = "continuation"61DECISION = "decision"626364@dataclass65class Probe:66"""A probe question for evaluating compression quality.6768Use when: constructing evaluation inputs for CompressionEvaluator.69Each probe targets a specific information category that compression70may have lost.71"""72probe_type: ProbeType73question: str74ground_truth: Optional[str] = None75context_reference: Optional[str] = None767778@dataclass79class CriterionResult:80"""Result for a single evaluation criterion."""81criterion_id: str82score: float83reasoning: str848586@dataclass87class EvaluationResult:88"""Complete evaluation result for a probe response.8990Contains per-criterion scores, per-dimension aggregates, and an91overall aggregate score.92"""93probe: Probe94response: str95criterion_results: List[CriterionResult]96aggregate_score: float97dimension_scores: Dict[str, float] = field(default_factory=dict)9899100# Evaluation Rubrics101102RUBRIC_CRITERIA: Dict[str, List[Dict]] = {103"accuracy": [104{105"id": "accuracy_factual",106"question": "Are facts, file paths, and technical details correct?",107"weight": 0.6108},109{110"id": "accuracy_technical",111"question": "Are code references and technical concepts correct?",112"weight": 0.4113}114],115"context_awareness": [116{117"id": "context_conversation_state",118"question": 
"Does the response reflect current conversation state?",119"weight": 0.5120},121{122"id": "context_artifact_state",123"question": "Does the response reflect which files/artifacts were accessed?",124"weight": 0.5125}126],127"artifact_trail": [128{129"id": "artifact_files_created",130"question": "Does the agent know which files were created?",131"weight": 0.3132},133{134"id": "artifact_files_modified",135"question": "Does the agent know which files were modified?",136"weight": 0.4137},138{139"id": "artifact_key_details",140"question": "Does the agent remember function names, variable names, error messages?",141"weight": 0.3142}143],144"completeness": [145{146"id": "completeness_coverage",147"question": "Does the response address all parts of the question?",148"weight": 0.6149},150{151"id": "completeness_depth",152"question": "Is sufficient detail provided?",153"weight": 0.4154}155],156"continuity": [157{158"id": "continuity_work_state",159"question": "Can the agent continue without re-fetching information?",160"weight": 0.4161},162{163"id": "continuity_todo_state",164"question": "Does the agent maintain awareness of pending tasks?",165"weight": 0.3166},167{168"id": "continuity_reasoning",169"question": "Does the agent retain rationale behind previous decisions?",170"weight": 0.3171}172],173"instruction_following": [174{175"id": "instruction_format",176"question": "Does the response follow the requested format?",177"weight": 0.5178},179{180"id": "instruction_constraints",181"question": "Does the response respect stated constraints?",182"weight": 0.5183}184]185}186187188class ProbeGenerator:189"""Generate typed probes from conversation history.190191Use when: automatically deriving evaluation questions from raw192conversation history at compression points. Extracts facts, file193operations, and decisions via pattern matching, then produces194one probe per category.195196For production systems, replace the regex-based extraction with197an LLM-based extractor for higher recall.198"""199200def __init__(self, conversation_history: str) -> None:201self.history = conversation_history202self.extracted_facts = self._extract_facts()203self.extracted_files = self._extract_files()204self.extracted_decisions = self._extract_decisions()205206def generate_probes(self) -> List[Probe]:207"""Generate all probe types for evaluation.208209Use when: preparing evaluation inputs at a compression point.210Returns one probe per category (recall, artifact, continuation,211decision) based on extractable content from the history.212"""213probes: List[Probe] = []214215# Recall probes216if self.extracted_facts:217probes.append(Probe(218probe_type=ProbeType.RECALL,219question="What was the original error or issue that started this session?",220ground_truth=self.extracted_facts.get("original_error"),221context_reference="session_start"222))223224# Artifact probes225if self.extracted_files:226probes.append(Probe(227probe_type=ProbeType.ARTIFACT,228question="Which files have we modified? 
Describe what changed in each.",229ground_truth=json.dumps(self.extracted_files),230context_reference="file_operations"231))232233# Continuation probes234probes.append(Probe(235probe_type=ProbeType.CONTINUATION,236question="What should we do next?",237ground_truth=self.extracted_facts.get("next_steps"),238context_reference="task_state"239))240241# Decision probes242if self.extracted_decisions:243probes.append(Probe(244probe_type=ProbeType.DECISION,245question="What key decisions did we make and why?",246ground_truth=json.dumps(self.extracted_decisions),247context_reference="decision_points"248))249250return probes251252def _extract_facts(self) -> Dict[str, str]:253"""Extract factual claims from history."""254facts: Dict[str, str] = {}255256# Extract error patterns257error_patterns = [258r"error[:\s]+(.+?)(?:\n|$)",259r"(\d{3})\s+(Unauthorized|Not Found|Internal Server Error)",260r"exception[:\s]+(.+?)(?:\n|$)"261]262263for pattern in error_patterns:264match = re.search(pattern, self.history, re.IGNORECASE)265if match:266facts["original_error"] = match.group(0).strip()267break268269# Extract next steps270next_step_patterns = [271r"next[:\s]+(.+?)(?:\n|$)",272r"TODO[:\s]+(.+?)(?:\n|$)",273r"remaining[:\s]+(.+?)(?:\n|$)"274]275276for pattern in next_step_patterns:277match = re.search(pattern, self.history, re.IGNORECASE)278if match:279facts["next_steps"] = match.group(0).strip()280break281282return facts283284def _extract_files(self) -> List[Dict[str, str]]:285"""Extract file operations from history."""286files: List[Dict[str, str]] = []287288# Common file patterns289file_patterns = [290r"(?:modified|changed|updated|edited)\s+([^\s]+\.[a-z]+)",291r"(?:created|added)\s+([^\s]+\.[a-z]+)",292r"(?:read|examined|opened)\s+([^\s]+\.[a-z]+)"293]294295for pattern in file_patterns:296matches = re.findall(pattern, self.history, re.IGNORECASE)297for match in matches:298if match not in [f["path"] for f in files]:299files.append({300"path": match,301"operation": "modified" if "modif" in pattern else "created" if "creat" in pattern else "read"302})303304return files305306def _extract_decisions(self) -> List[Dict[str, str]]:307"""Extract decision points from history."""308decisions: List[Dict[str, str]] = []309310decision_patterns = [311r"decided to\s+(.+?)(?:\n|$)",312r"chose\s+(.+?)(?:\n|$)",313r"going with\s+(.+?)(?:\n|$)",314r"will use\s+(.+?)(?:\n|$)"315]316317for pattern in decision_patterns:318matches = re.findall(pattern, self.history, re.IGNORECASE)319for match in matches:320decisions.append({321"decision": match.strip(),322"context": pattern.split("\\s+")[0]323})324325return decisions[:5] # Limit to 5 decisions326327328class CompressionEvaluator:329"""Evaluate compression quality using probes and LLM judge.330331Use when: comparing compression methods or validating that a specific332compression pass preserved critical information. Scores responses333across six dimensions (accuracy, context awareness, artifact trail,334completeness, continuity, instruction following) and produces an335aggregate quality score.336337The evaluate() method is the primary entry point. 
Call it once per338probe, then call get_summary() to retrieve aggregated results.339"""340341def __init__(self, model: str = "gpt-5.2") -> None:342self.model = model343self.results: List[EvaluationResult] = []344345def evaluate(self,346probe: Probe,347response: str,348compressed_context: str) -> EvaluationResult:349"""Evaluate a single probe response against the rubric.350351Use when: scoring how well a model's response (given compressed352context) answers a probe question. Returns per-criterion scores,353per-dimension aggregates, and an overall score.354355Args:356probe: The probe question with expected ground truth.357response: The model's response to evaluate.358compressed_context: The compressed context that was provided359to the model when generating the response.360361Returns:362EvaluationResult with scores and reasoning across all363applicable dimensions.364"""365# Get relevant criteria based on probe type366criteria = self._get_criteria_for_probe(probe.probe_type)367368# Evaluate each criterion369criterion_results: List[CriterionResult] = []370for criterion in criteria:371result = self._evaluate_criterion(372criterion,373probe,374response,375compressed_context376)377criterion_results.append(result)378379# Calculate dimension scores380dimension_scores = self._calculate_dimension_scores(criterion_results)381382# Calculate aggregate score383aggregate_score = sum(dimension_scores.values()) / len(dimension_scores) if dimension_scores else 0.0384385result = EvaluationResult(386probe=probe,387response=response,388criterion_results=criterion_results,389aggregate_score=aggregate_score,390dimension_scores=dimension_scores391)392393self.results.append(result)394return result395396def get_summary(self) -> Dict:397"""Get summary of all evaluation results.398399Use when: all probes have been evaluated and an aggregate400report is needed to compare methods or make a go/no-go401decision on a compression strategy.402403Returns:404Dictionary with total evaluations, average score,405per-dimension averages, and weakest/strongest dimensions.406"""407if not self.results:408return {"error": "No evaluations performed"}409410avg_score = sum(r.aggregate_score for r in self.results) / len(self.results)411412# Average dimension scores413dimension_totals: Dict[str, float] = {}414dimension_counts: Dict[str, int] = {}415416for result in self.results:417for dim, score in result.dimension_scores.items():418dimension_totals[dim] = dimension_totals.get(dim, 0) + score419dimension_counts[dim] = dimension_counts.get(dim, 0) + 1420421avg_dimensions = {422dim: dimension_totals[dim] / dimension_counts[dim]423for dim in dimension_totals424}425426return {427"total_evaluations": len(self.results),428"average_score": avg_score,429"dimension_averages": avg_dimensions,430"weakest_dimension": min(avg_dimensions, key=avg_dimensions.get) if avg_dimensions else None,431"strongest_dimension": max(avg_dimensions, key=avg_dimensions.get) if avg_dimensions else None,432}433434def _get_criteria_for_probe(self, probe_type: ProbeType) -> List[Dict]:435"""Get relevant criteria for probe type."""436criteria: List[Dict] = []437438# All probes get accuracy and completeness439criteria.extend(RUBRIC_CRITERIA["accuracy"])440criteria.extend(RUBRIC_CRITERIA["completeness"])441442# Add type-specific criteria443if probe_type == ProbeType.ARTIFACT:444criteria.extend(RUBRIC_CRITERIA["artifact_trail"])445elif probe_type == ProbeType.CONTINUATION:446criteria.extend(RUBRIC_CRITERIA["continuity"])447elif probe_type == 
ProbeType.RECALL:448criteria.extend(RUBRIC_CRITERIA["context_awareness"])449elif probe_type == ProbeType.DECISION:450criteria.extend(RUBRIC_CRITERIA["context_awareness"])451criteria.extend(RUBRIC_CRITERIA["continuity"])452453criteria.extend(RUBRIC_CRITERIA["instruction_following"])454455return criteria456457def _evaluate_criterion(self,458criterion: Dict,459probe: Probe,460response: str,461context: str) -> CriterionResult:462"""463Evaluate a single criterion using LLM judge.464465PRODUCTION NOTE: This is a stub implementation.466Production systems should call the actual LLM API:467468```python469result = openai.chat.completions.create(470model="gpt-5.2",471messages=[472{"role": "system", "content": JUDGE_SYSTEM_PROMPT},473{"role": "user", "content": self._format_judge_input(criterion, probe, response, context)}474]475)476return self._parse_judge_output(result)477```478"""479# Stub implementation - in production, call LLM judge480score = self._heuristic_score(criterion, response, probe.ground_truth)481reasoning = f"Evaluated {criterion['id']} based on response content."482483return CriterionResult(484criterion_id=criterion["id"],485score=score,486reasoning=reasoning487)488489def _heuristic_score(self,490criterion: Dict,491response: str,492ground_truth: Optional[str]) -> float:493"""494Heuristic scoring for demonstration.495496Production systems should use LLM judge instead.497"""498score = 3.0 # Base score499500# Adjust based on response length and content501if len(response) < 50:502score -= 1.0 # Too short503elif len(response) > 500:504score += 0.5 # Detailed505506# Check for technical content507if any(ext in response for ext in [".ts", ".py", ".js", ".md"]):508score += 0.5 # Contains file references509510overlap_ratio = self._ground_truth_overlap_ratio(response, ground_truth)511if overlap_ratio >= 0.75:512score += 1.0513elif overlap_ratio >= 0.4:514score += 0.5515elif ground_truth:516score -= 0.5517518return min(5.0, max(0.0, score))519520def _ground_truth_overlap_ratio(self,521response: str,522ground_truth: Optional[str]) -> float:523if not ground_truth:524return 0.0525526terms = self._extract_ground_truth_terms(ground_truth)527if not terms:528return 1.0 if ground_truth.lower() in response.lower() else 0.0529530response_lower = response.lower()531matches = sum(1 for term in terms if term in response_lower)532return matches / len(terms)533534def _extract_ground_truth_terms(self, ground_truth: str) -> List[str]:535try:536parsed = json.loads(ground_truth)537except json.JSONDecodeError:538return [ground_truth.lower()] if ground_truth.strip() else []539540terms: List[str] = []541542def collect(value) -> None:543if isinstance(value, str):544normalized = value.strip().lower()545if normalized:546terms.append(normalized)547elif isinstance(value, dict):548for nested in value.values():549collect(nested)550elif isinstance(value, list):551for nested in value:552collect(nested)553554collect(parsed)555return list(dict.fromkeys(terms))556557def _calculate_dimension_scores(self,558criterion_results: List[CriterionResult]) -> Dict[str, float]:559"""Calculate dimension scores from criterion results."""560dimension_scores: Dict[str, float] = {}561562for dimension, criteria in RUBRIC_CRITERIA.items():563criterion_ids = [c["id"] for c in criteria]564relevant_results = [565r for r in criterion_results566if r.criterion_id in criterion_ids567]568569if relevant_results:570# Weighted average571total_weight = sum(572c["weight"] for c in criteria573if c["id"] in [r.criterion_id for r in 
relevant_results]574)575weighted_sum = sum(576r.score * next(c["weight"] for c in criteria if c["id"] == r.criterion_id)577for r in relevant_results578)579dimension_scores[dimension] = weighted_sum / total_weight if total_weight > 0 else 0.0580581return dimension_scores582583584class StructuredSummarizer:585"""Generate structured summaries with explicit sections.586587Use when: implementing anchored iterative summarization for588long-running coding sessions. Maintains a persistent summary589with dedicated sections for session intent, file modifications,590decisions, current state, and next steps.591592Call update_from_span() each time a new content span is truncated.593The summarizer merges new information into existing sections rather594than regenerating, preventing cumulative detail loss.595"""596597TEMPLATE = """## Session Intent598{intent}599600## Files Modified601{files_modified}602603## Files Read (Not Modified)604{files_read}605606## Decisions Made607{decisions}608609## Current State610{current_state}611612## Next Steps613{next_steps}614"""615616def __init__(self) -> None:617self.sections: Dict = {618"intent": "",619"files_modified": [],620"files_read": [],621"decisions": [],622"current_state": "",623"next_steps": []624}625626def update_from_span(self, new_content: str) -> str:627"""Update summary from newly truncated content span.628629Use when: a compression trigger fires and a portion of630conversation history is about to be discarded. Pass the631content that will be truncated; the summarizer extracts632structured information and merges it with prior state.633634Args:635new_content: The conversation span being truncated.636637Returns:638Formatted summary string with all sections populated.639"""640# Extract information from new content641new_info = self._extract_from_content(new_content)642643# Merge with existing sections644self._merge_sections(new_info)645646# Generate formatted summary647return self._format_summary()648649def _extract_from_content(self, content: str) -> Dict:650"""Extract structured information from content."""651extracted: Dict = {652"intent": "",653"files_modified": [],654"files_read": [],655"decisions": [],656"current_state": "",657"next_steps": []658}659660# Extract file modifications661mod_pattern = r"(?:modified|changed|updated|fixed)\s+([^\s]+\.[a-z]+)[:\s]*(.+?)(?:\n|$)"662for match in re.finditer(mod_pattern, content, re.IGNORECASE):663extracted["files_modified"].append({664"path": match.group(1),665"change": match.group(2).strip()[:100]666})667668# Extract file reads669read_pattern = r"(?:read|examined|opened|checked)\s+([^\s]+\.[a-z]+)"670for match in re.finditer(read_pattern, content, re.IGNORECASE):671file_path = match.group(1)672if file_path not in [f["path"] for f in extracted["files_modified"]]:673extracted["files_read"].append(file_path)674675# Extract decisions676decision_pattern = r"(?:decided|chose|going with|will use)\s+(.+?)(?:\n|$)"677for match in re.finditer(decision_pattern, content, re.IGNORECASE):678extracted["decisions"].append(match.group(1).strip()[:150])679680return extracted681682def _merge_sections(self, new_info: Dict) -> None:683"""Merge new information with existing sections."""684# Update intent if empty685if new_info["intent"] and not self.sections["intent"]:686self.sections["intent"] = new_info["intent"]687688# Merge file lists (deduplicate by path)689existing_mod_paths = [f["path"] for f in self.sections["files_modified"]]690for file_info in new_info["files_modified"]:691if file_info["path"] not in 
existing_mod_paths:692self.sections["files_modified"].append(file_info)693694# Merge read files695for file_path in new_info["files_read"]:696if file_path not in self.sections["files_read"]:697self.sections["files_read"].append(file_path)698699# Append decisions700self.sections["decisions"].extend(new_info["decisions"])701702# Update current state (latest wins)703if new_info["current_state"]:704self.sections["current_state"] = new_info["current_state"]705706# Merge next steps707self.sections["next_steps"].extend(new_info["next_steps"])708709def _format_summary(self) -> str:710"""Format sections into summary string."""711files_modified_str = "\n".join(712f"- {f['path']}: {f['change']}"713for f in self.sections["files_modified"]714) or "None"715716files_read_str = "\n".join(717f"- {f}" for f in self.sections["files_read"]718) or "None"719720decisions_str = "\n".join(721f"- {d}" for d in self.sections["decisions"][-5:] # Keep last 5722) or "None"723724next_steps_str = "\n".join(725f"{i+1}. {s}" for i, s in enumerate(self.sections["next_steps"][-5:])726) or "None"727728return self.TEMPLATE.format(729intent=self.sections["intent"] or "Not specified",730files_modified=files_modified_str,731files_read=files_read_str,732decisions=decisions_str,733current_state=self.sections["current_state"] or "In progress",734next_steps=next_steps_str735)736737738def evaluate_compression_quality(739original_history: str,740compressed_context: str,741model_response_fn: Callable[[str, str], str],742) -> Dict:743"""Evaluate compression quality for a conversation end-to-end.744745Use when: running a one-shot quality check on a compression pass.746Generates probes from original history, collects model responses747using the compressed context, evaluates each response, and returns748a scored summary with actionable recommendations.749750Args:751original_history: The full conversation before compression.752compressed_context: The compressed version to evaluate.753model_response_fn: Callable that takes (compressed_context, question)754and returns the model's response string.755756Returns:757Dictionary with total evaluations, average score, per-dimension758averages, weakest/strongest dimensions, and recommendations list.759"""760# Generate probes761generator = ProbeGenerator(original_history)762probes = generator.generate_probes()763764# Evaluate each probe765evaluator = CompressionEvaluator()766767for probe in probes:768# Get model response using compressed context769response = model_response_fn(compressed_context, probe.question)770771# Evaluate response772evaluator.evaluate(probe, response, compressed_context)773774# Get summary775summary = evaluator.get_summary()776777# Add recommendations778summary["recommendations"] = []779780if summary.get("weakest_dimension") == "artifact_trail":781summary["recommendations"].append(782"Consider implementing separate artifact tracking outside compression"783)784785if summary.get("average_score", 0) < 3.5:786summary["recommendations"].append(787"Compression quality is below threshold - consider less aggressive compression"788)789790return summary791792793if __name__ == "__main__":794# Demo: generate probes and evaluate a sample compression795796sample_history = """797User reported error: 401 Unauthorized on /api/auth/login endpoint.798Examined auth.controller.ts - JWT generation looks correct.799Examined middleware/cors.ts - no issues found.800Modified config/redis.ts: Fixed connection pooling configuration.801Modified services/session.service.ts: Added retry logic for transient 
failures.802Decided to use Redis connection pool instead of per-request connections.803Modified tests/auth.test.ts: Updated mock setup for new config.80414 tests passing, 2 failing (mock setup issues).805Next: Fix remaining test failures in session service mocks.806"""807808sample_compressed = """809## Session Intent810Debug 401 Unauthorized on /api/auth/login.811812## Root Cause813Stale Redis connection in session store.814815## Files Modified816- config/redis.ts: Fixed connection pooling817- services/session.service.ts: Added retry logic818- tests/auth.test.ts: Updated mock setup819820## Test Status82114 passing, 2 failing822823## Next Steps8241. Fix remaining test failures825"""826827# Stub model response function828def mock_model_response(context: str, question: str) -> str:829if "error" in question.lower():830return "The original error was a 401 Unauthorized on /api/auth/login."831if "files" in question.lower():832return "Modified config/redis.ts, services/session.service.ts, tests/auth.test.ts."833if "next" in question.lower():834return "Fix remaining test failures in session service mocks."835if "decision" in question.lower():836return "Decided to use Redis connection pool instead of per-request connections."837return "No specific information available."838839# Run evaluation840result = evaluate_compression_quality(841original_history=sample_history,842compressed_context=sample_compressed,843model_response_fn=mock_model_response,844)845846print("=== Compression Quality Evaluation ===")847print(f"Total evaluations: {result['total_evaluations']}")848print(f"Average score: {result['average_score']:.2f}")849print()850print("Dimension averages:")851for dim, score in result.get("dimension_averages", {}).items():852print(f" {dim}: {score:.2f}")853print()854print(f"Weakest dimension: {result.get('weakest_dimension')}")855print(f"Strongest dimension: {result.get('strongest_dimension')}")856print()857if result.get("recommendations"):858print("Recommendations:")859for rec in result["recommendations"]:860print(f" - {rec}")861else:862print("No recommendations - compression quality looks acceptable.")863
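The PRODUCTION NOTES in the module docstring ask for the stubbed model calls to be replaced with real API calls. Below is a minimal sketch of one way to supply a live model_response_fn to evaluate_compression_quality, assuming the OpenAI Python SDK; the client setup, system prompt, and model name are placeholders for illustration, not part of the skill package.

# Sketch: answer each probe with a real model, using only the compressed context.
# Assumes `pip install openai` and OPENAI_API_KEY in the environment; the model
# name below is a placeholder, not something this skill prescribes.
from openai import OpenAI

client = OpenAI()

def live_model_response(compressed_context: str, question: str) -> str:
    """Answer a probe question given only the compressed context."""
    completion = client.chat.completions.create(
        model="gpt-4o",  # placeholder; use whichever model you are evaluating
        messages=[
            {"role": "system", "content": "Answer using only the provided context."},
            {"role": "user", "content": f"Context:\n{compressed_context}\n\nQuestion: {question}"},
        ],
    )
    return completion.choices[0].message.content or ""

# summary = evaluate_compression_quality(original_history, compressed_context, live_model_response)

Keeping the probe-answering model behind a plain (context, question) -> str callable is what lets the evaluator stay provider-agnostic; the same wiring pattern applies to the stubbed LLM judge in _evaluate_criterion.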