Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Enterprise-grade research with multi-source synthesis, citation tracking, and verification. 8-phase pipeline with auto-continuation.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/extract_claims.py
#!/usr/bin/env python3
"""
Atomic Claim Extractor — decomposes report sections into typed claims.

CLI subcommands:
    extract   Parse a markdown report into atomic claims (claims.jsonl)
    add       Manually add a single claim
    list      List claims, optionally filtered by section or type
    stats     Show claim statistics (counts by type/status)

Claim identity:
    claim_id = sha256(section_id + normalized_text)[:16]

Claim types (per GPT Pro's refinement of Codex's proposal):
- factual: hard-fails on lack of support
- synthesis: needs traceability, softer threshold
- recommendation: needs traceability, softer threshold
- speculation: labeled, no support gate
"""

import argparse
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timezone


# ---------------------------------------------------------------------------
# Claim ID computation
# ---------------------------------------------------------------------------

# Collapses any run of whitespace (spaces, tabs, newlines) to a single space.
_WHITESPACE_RE = re.compile(r'\s+')


def normalize_text(text: str) -> str:
    """Normalize *text* for stable hashing: trim, collapse whitespace, lowercase."""
    return _WHITESPACE_RE.sub(' ', text.strip()).lower()


def compute_claim_id(section_id: str, text: str) -> str:
    """Return sha256(section_id + normalized_text)[:16] as a hex string.

    The id is stable under whitespace/case variations of *text*, so
    re-extracting the same report does not create duplicate claims.
    """
    payload = section_id + normalize_text(text)
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()[:16]


# ---------------------------------------------------------------------------
# JSONL helpers
# ---------------------------------------------------------------------------

def append_jsonl(path: str, obj: dict) -> None:
    """Append *obj* as a single JSON line to *path* (created if missing)."""
    # Encoding pinned to UTF-8: ensure_ascii=False may emit non-ASCII text,
    # which would crash on platforms whose default encoding is not UTF-8.
    with open(path, 'a', encoding='utf-8') as f:
        f.write(json.dumps(obj, ensure_ascii=False) + '\n')


def read_jsonl(path: str) -> list[dict]:
    """Read all rows from a JSONL file; a missing file yields an empty list."""
    rows: list[dict] = []
    if not os.path.exists(path):
        return rows
    with open(path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines
                rows.append(json.loads(line))
    return rows
# ---------------------------------------------------------------------------
# Report parsing helpers
# ---------------------------------------------------------------------------

# Section header patterns: each entry maps a '## ...' heading to a section id,
# either a fixed string or a callable deriving the id from the regex match.
# Order matters — the generic catch-all pattern must remain last.
SECTION_PATTERNS = [
    (re.compile(r'^##\s+Executive\s+Summary', re.I), 'executive_summary'),
    (re.compile(r'^##\s+Introduction', re.I), 'introduction'),
    (re.compile(r'^##\s+Finding\s+(\d+)', re.I), lambda m: f'finding_{m.group(1)}'),
    (re.compile(r'^##\s+Synthesis', re.I), 'synthesis'),
    (re.compile(r'^##\s+Limitations', re.I), 'limitations'),
    (re.compile(r'^##\s+Recommendations', re.I), 'recommendations'),
    (re.compile(r'^##\s+Conclusion', re.I), 'conclusion'),
    # Catch-all: slugify any other '## Heading', capped at 30 chars.
    (re.compile(r'^##\s+(.+)', re.I),
     lambda m: re.sub(r'\W+', '_', m.group(1).strip().lower())[:30]),
]

# Citation pattern [N] or [N, M]
CITATION_RE = re.compile(r'\[(\d+(?:,\s*\d+)*)\]')

# Sentence splitting: break after .!? only when the next char is a capital,
# which keeps most abbreviations ("e.g. foo") intact.
SENTENCE_RE = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')


def classify_claim(text: str, section_id: str) -> str:
    """Heuristic claim-type classification.

    Precedence: recommendation > speculation > synthesis > factual.
    NOTE: keyword checks are plain substring tests ('shoulder' contains
    'should') — accepted noise for a heuristic first pass.
    """
    lower = text.lower()

    # Recommendation indicators — apply regardless of section.
    # (The original had a dead `if section_id == 'recommendations'` branch
    # here whose both paths returned 'recommendation'; removed.)
    if any(w in lower for w in ('should', 'recommend', 'suggest', 'advise', 'consider')):
        return 'recommendation'

    # Speculation indicators
    if any(w in lower for w in ('might', 'could potentially', 'it is possible',
                                'may eventually', 'hypothetically', 'speculatively')):
        return 'speculation'

    # Synthesis indicators, only inside synthesis-flavored sections.
    if section_id in ('synthesis', 'conclusion', 'limitations'):
        if any(w in lower for w in ('overall', 'taken together', 'collectively',
                                    'the evidence suggests', 'this implies')):
            return 'synthesis'

    # Default: factual (strictest support gate downstream).
    return 'factual'


def parse_sections(markdown: str) -> list[tuple[str, str]]:
    """Parse markdown into (section_id, content) pairs.

    Content before the first recognized heading is grouped under 'preamble'.
    """
    sections: list[tuple[str, str]] = []
    current_id = 'preamble'
    current_lines: list[str] = []

    for line in markdown.split('\n'):
        matched = False
        for pattern, id_or_fn in SECTION_PATTERNS:
            m = pattern.match(line)
            if m:
                # Flush the accumulated section before starting the new one.
                if current_lines:
                    sections.append((current_id, '\n'.join(current_lines)))
                current_id = id_or_fn(m) if callable(id_or_fn) else id_or_fn
                current_lines = []
                matched = True
                break
        if not matched:
            current_lines.append(line)

    if current_lines:
        sections.append((current_id, '\n'.join(current_lines)))

    return sections


def extract_sentences(text: str) -> list[str]:
    """Split text into sentences, filtering markdown noise and short fragments."""
    # Strip markdown formatting noise before splitting.
    text = re.sub(r'^[-*]\s+', '', text, flags=re.M)  # bullet points
    text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)    # bold
    text = re.sub(r'\*([^*]+)\*', r'\1', text)        # italic

    result = []
    for s in SENTENCE_RE.split(text):
        s = s.strip()
        # Drop very short fragments, headings, and table rows.
        if len(s) > 30 and not s.startswith('#') and not s.startswith('|'):
            result.append(s)
    return result


# ---------------------------------------------------------------------------
# Subcommands
# ---------------------------------------------------------------------------

def _dedupe_claims(rows: list[dict]) -> list[dict]:
    """Drop rows with a duplicate claim_id, keeping the first occurrence."""
    seen: set = set()
    unique: list[dict] = []
    for r in rows:
        cid = r.get('claim_id')
        if cid not in seen:
            seen.add(cid)
            unique.append(r)
    return unique


def cmd_extract(args: argparse.Namespace) -> None:
    """Extract atomic claims from a markdown report into claims.jsonl."""
    report_path = args.report
    if not os.path.exists(report_path):
        print(json.dumps({'error': f'Report not found: {report_path}'}), file=sys.stderr)
        sys.exit(1)

    with open(report_path, encoding='utf-8') as f:
        markdown = f.read()

    os.makedirs(args.dir, exist_ok=True)  # tolerate a fresh run directory
    claims_path = os.path.join(args.dir, 'claims.jsonl')
    existing_ids = {r['claim_id'] for r in read_jsonl(claims_path)}

    added = 0
    skipped = 0
    for section_id, content in parse_sections(markdown):
        if section_id == 'preamble':
            continue  # text before the first heading is not claim-worthy
        for sentence in extract_sentences(content):
            claim_id = compute_claim_id(section_id, sentence)
            if claim_id in existing_ids:
                skipped += 1
                continue

            # Collect citation numbers like [3] or [2, 5] for the linking step.
            citation_nums: list[int] = []
            for m in CITATION_RE.finditer(sentence):
                citation_nums.extend(int(n.strip()) for n in m.group(1).split(','))

            claim = {
                'claim_id': claim_id,
                'section_id': section_id,
                'text': sentence,
                'claim_type': classify_claim(sentence, section_id),
                'cited_source_ids': [],  # Populated by linking step
                'evidence_ids': [],      # Populated by verify_claim_support
                'support_status': 'unverified',
                'extracted_at': datetime.now(timezone.utc).isoformat(),
                '_citation_numbers': citation_nums,  # Temporary, for linking
            }
            append_jsonl(claims_path, claim)
            existing_ids.add(claim_id)
            added += 1

    print(json.dumps({
        'status': 'ok',
        'claims_added': added,
        'claims_skipped': skipped,
        'total_claims': len(existing_ids),
    }))


def cmd_add(args: argparse.Namespace) -> None:
    """Manually add a single claim from a JSON payload."""
    data = json.loads(args.json)
    section_id = data.get('section_id', 'unknown')
    text = data.get('text', '')
    if not text:
        print(json.dumps({'error': 'text is required'}), file=sys.stderr)
        sys.exit(1)

    claim_id = compute_claim_id(section_id, text)
    os.makedirs(args.dir, exist_ok=True)  # tolerate a fresh run directory
    claims_path = os.path.join(args.dir, 'claims.jsonl')

    for row in read_jsonl(claims_path):
        if row.get('claim_id') == claim_id:
            print(json.dumps({'status': 'duplicate', 'claim_id': claim_id}))
            return

    # Unknown claim types fall back to 'factual' (the strictest gate).
    valid_types = {'factual', 'synthesis', 'recommendation', 'speculation'}
    claim_type = data.get('claim_type', 'factual')
    if claim_type not in valid_types:
        claim_type = 'factual'

    claim = {
        'claim_id': claim_id,
        'section_id': section_id,
        'text': text,
        'claim_type': claim_type,
        'cited_source_ids': data.get('cited_source_ids', []),
        'evidence_ids': data.get('evidence_ids', []),
        'support_status': 'unverified',
        'extracted_at': datetime.now(timezone.utc).isoformat(),
    }
    append_jsonl(claims_path, claim)
    print(json.dumps({'status': 'added', 'claim_id': claim_id}))


def cmd_list(args: argparse.Namespace) -> None:
    """List claims with optional section/type/status filters."""
    claims_path = os.path.join(args.dir, 'claims.jsonl')
    rows = read_jsonl(claims_path)

    if args.section:
        rows = [r for r in rows if r.get('section_id') == args.section]
    if args.type:
        rows = [r for r in rows if r.get('claim_type') == args.type]
    if args.status:
        rows = [r for r in rows if r.get('support_status') == args.status]

    unique = _dedupe_claims(rows)
    print(json.dumps({'count': len(unique), 'claims': unique}, indent=2, ensure_ascii=False))


def cmd_stats(args: argparse.Namespace) -> None:
    """Show claim statistics grouped by type, status, and section."""
    claims_path = os.path.join(args.dir, 'claims.jsonl')
    unique = _dedupe_claims(read_jsonl(claims_path))

    by_type: dict[str, int] = {}
    by_status: dict[str, int] = {}
    by_section: dict[str, int] = {}
    for r in unique:
        t = r.get('claim_type', 'unknown')
        s = r.get('support_status', 'unknown')
        sec = r.get('section_id', 'unknown')
        by_type[t] = by_type.get(t, 0) + 1
        by_status[s] = by_status.get(s, 0) + 1
        by_section[sec] = by_section.get(sec, 0) + 1

    print(json.dumps({
        'total': len(unique),
        'by_type': by_type,
        'by_status': by_status,
        'by_section': by_section,
    }, indent=2))


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse CLI arguments and dispatch to the selected subcommand."""
    parser = argparse.ArgumentParser(
        prog='extract_claims',
        description='Atomic claim extraction and ledger for deep-research v3.0',
    )
    sub = parser.add_subparsers(dest='command', required=True)

    # extract
    p_ext = sub.add_parser('extract', help='Extract claims from markdown report')
    p_ext.add_argument('--report', required=True, help='Path to report.md')
    p_ext.add_argument('--dir', required=True, help='Run directory containing claims.jsonl')

    # add
    p_add = sub.add_parser('add', help='Manually add a single claim')
    p_add.add_argument('--json', required=True, help='JSON with section_id, text, claim_type')
    p_add.add_argument('--dir', required=True, help='Run directory')

    # list
    p_list = sub.add_parser('list', help='List claims')
    p_list.add_argument('--dir', required=True, help='Run directory')
    p_list.add_argument('--section', default=None, help='Filter by section_id')
    p_list.add_argument('--type', default=None, help='Filter by claim_type')
    p_list.add_argument('--status', default=None, help='Filter by support_status')

    # stats
    p_stats = sub.add_parser('stats', help='Claim statistics')
    p_stats.add_argument('--dir', required=True, help='Run directory')

    args = parser.parse_args()
    dispatch = {
        'extract': cmd_extract,
        'add': cmd_add,
        'list': cmd_list,
        'stats': cmd_stats,
    }
    dispatch[args.command](args)


if __name__ == '__main__':
    main()