Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Enterprise-grade research with multi-source synthesis, citation tracking, and verification. 8-phase pipeline with auto-continuation.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/evidence_store.py
1#!/usr/bin/env python32"""3Evidence Store — append-only evidence persistence for deep-research v3.0.45CLI subcommands:6init Create empty evidence.jsonl in a run directory7add Append an evidence row, return evidence_id8list List evidence rows, optionally filtered by source_id9export Export evidence as JSON array1011Evidence identity:12evidence_id = sha256(source_id + normalized_quote + locator)[:16]1314All state is append-only JSONL. Evidence is never modified after capture.15"""1617import argparse18import hashlib19import json20import os21import re22import sys23from datetime import datetime, timezone242526# ---------------------------------------------------------------------------27# Evidence ID computation28# ---------------------------------------------------------------------------2930_WHITESPACE_RE = re.compile(r'\s+')313233def normalize_quote(quote: str) -> str:34"""Normalize whitespace for stable hashing."""35return _WHITESPACE_RE.sub(' ', quote.strip()).lower()363738def compute_evidence_id(source_id: str, quote: str, locator: str | None) -> str:39"""sha256(source_id + normalized_quote + locator)[:16] hex."""40payload = source_id + normalize_quote(quote) + (locator or '')41return hashlib.sha256(payload.encode('utf-8')).hexdigest()[:16]424344# ---------------------------------------------------------------------------45# JSONL helpers (shared pattern with citation_manager)46# ---------------------------------------------------------------------------4748def append_jsonl(path: str, obj: dict) -> None:49with open(path, 'a') as f:50f.write(json.dumps(obj, ensure_ascii=False) + '\n')515253def read_jsonl(path: str) -> list[dict]:54rows = []55if not os.path.exists(path):56return rows57with open(path) as f:58for line in f:59line = line.strip()60if line:61rows.append(json.loads(line))62return rows636465# ---------------------------------------------------------------------------66# Subcommands67# ---------------------------------------------------------------------------6869def cmd_init(args: argparse.Namespace) -> None:70"""Create empty evidence.jsonl if it doesn't exist."""71out_dir = os.path.abspath(args.dir)72path = os.path.join(out_dir, 'evidence.jsonl')73if not os.path.exists(path):74os.makedirs(out_dir, exist_ok=True)75open(path, 'w').close()76print(json.dumps({'status': 'ok', 'path': path}))777879def cmd_add(args: argparse.Namespace) -> None:80"""Append evidence row, print evidence_id."""81data = json.loads(args.json)82source_id = data.get('source_id', '')83quote = data.get('quote', '')84if not source_id or not quote:85print(json.dumps({'error': 'source_id and quote are required'}), file=sys.stderr)86sys.exit(1)8788locator = data.get('locator')89evidence_id = compute_evidence_id(source_id, quote, locator)90evidence_path = os.path.join(args.dir, 'evidence.jsonl')9192# Check for duplicate93existing = read_jsonl(evidence_path)94for row in existing:95if row.get('evidence_id') == evidence_id:96print(json.dumps({97'status': 'duplicate',98'evidence_id': evidence_id,99}))100return101102valid_types = {'direct_quote', 'paraphrase', 'data_point', 'figure_reference', 'methodology'}103evidence_type = data.get('evidence_type', 'direct_quote')104if evidence_type not in valid_types:105evidence_type = 'direct_quote'106107row = {108'evidence_id': evidence_id,109'source_id': source_id,110'retrieval_query': data.get('retrieval_query'),111'locator': locator,112'quote': quote,113'evidence_type': evidence_type,114'captured_at': datetime.now(timezone.utc).isoformat(),115}116append_jsonl(evidence_path, row)117print(json.dumps({118'status': 'added',119'evidence_id': evidence_id,120'source_id': source_id,121}))122123124def cmd_list(args: argparse.Namespace) -> None:125"""List evidence rows, optionally filtered."""126evidence_path = os.path.join(args.dir, 'evidence.jsonl')127rows = read_jsonl(evidence_path)128129if args.source_id:130rows = [r for r in rows if r.get('source_id') == args.source_id]131132# Deduplicate by evidence_id133seen = set()134unique = []135for r in rows:136eid = r.get('evidence_id')137if eid not in seen:138seen.add(eid)139unique.append(r)140141print(json.dumps({142'count': len(unique),143'evidence': unique,144}, indent=2, ensure_ascii=False))145146147def cmd_export(args: argparse.Namespace) -> None:148"""Export all evidence as JSON array."""149evidence_path = os.path.join(args.dir, 'evidence.jsonl')150rows = read_jsonl(evidence_path)151152# Deduplicate153seen = set()154unique = []155for r in rows:156eid = r.get('evidence_id')157if eid not in seen:158seen.add(eid)159unique.append(r)160161print(json.dumps(unique, indent=2, ensure_ascii=False))162163164# ---------------------------------------------------------------------------165# CLI entry point166# ---------------------------------------------------------------------------167168def main() -> None:169parser = argparse.ArgumentParser(170prog='evidence_store',171description='Append-only evidence persistence for deep-research v3.0',172)173sub = parser.add_subparsers(dest='command', required=True)174175# init176p_init = sub.add_parser('init', help='Create empty evidence.jsonl')177p_init.add_argument('--dir', required=True, help='Run directory')178179# add180p_add = sub.add_parser('add', help='Append evidence row')181p_add.add_argument('--json', required=True, help='JSON with source_id, quote, locator, evidence_type, retrieval_query')182p_add.add_argument('--dir', required=True, help='Run directory containing evidence.jsonl')183184# list185p_list = sub.add_parser('list', help='List evidence rows')186p_list.add_argument('--dir', required=True, help='Run directory')187p_list.add_argument('--source-id', default=None, help='Filter by source_id')188189# export190p_export = sub.add_parser('export', help='Export all evidence as JSON array')191p_export.add_argument('--dir', required=True, help='Run directory')192193args = parser.parse_args()194195dispatch = {196'init': cmd_init,197'add': cmd_add,198'list': cmd_list,199'export': cmd_export,200}201dispatch[args.command](args)202203204if __name__ == '__main__':205main()206