Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from bundle
Locate repo-scoped Codex sessions and extract plain prompts and replies from JSONL logs without tool calls.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/codex_sessions.py
1#!/usr/bin/env python32from __future__ import annotations34import argparse5import json6import sys7from dataclasses import dataclass8from pathlib import Path9from typing import Any1011DEFAULT_SESSIONS_ROOT = Path.home() / ".codex" / "sessions"121314@dataclass15class SessionMeta:16path: Path17session_id: str | None18timestamp: str | None19cwd: str | None20source: str | None21cli_version: str | None22model_provider: str | None232425@dataclass26class TextMessage:27role: str28text: str29timestamp: str | None30phase: str | None = None313233@dataclass34class SessionRecord:35meta: SessionMeta36messages: list[TextMessage]373839def parse_args() -> argparse.Namespace:40parser = argparse.ArgumentParser(41description=(42"Find and parse Codex JSONL sessions under ~/.codex/sessions, filter them by repo path, "43"and extract only plain user/assistant text without tool calls."44)45)46parser.add_argument(47"--sessions-root",48default=str(DEFAULT_SESSIONS_ROOT),49help="Root directory that contains Codex session JSONL files (default: ~/.codex/sessions)",50)5152subparsers = parser.add_subparsers(dest="command", required=True)5354list_parser = subparsers.add_parser("list", help="List matching sessions with short previews")55add_session_filters(list_parser)56list_parser.add_argument("--limit", type=int, default=10, help="Maximum number of sessions to print")57list_parser.add_argument(58"--preview-chars",59type=int,60default=140,61help="Maximum preview length for last user/assistant messages",62)63list_parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON")6465messages_parser = subparsers.add_parser(66"messages",67help="Print extracted user/assistant messages for one or more sessions",68)69add_session_filters(messages_parser)70messages_parser.add_argument(71"--session",72help="Exact session file path or unique filename fragment to inspect directly",73)74messages_parser.add_argument(75"--limit-sessions",76type=int,77default=1,78help="Maximum number of sessions to print when using repo/path filters",79)80messages_parser.add_argument(81"--tail",82type=int,83default=8,84help="Number of trailing text messages to print per session (0 = all)",85)86messages_parser.add_argument(87"--role",88choices=["all", "user", "assistant"],89default="all",90help="Restrict output to one side of the conversation",91)92messages_parser.add_argument(93"--assistant-phase",94choices=["all", "commentary", "final_answer"],95default="all",96help="Restrict assistant output to commentary or final answers when event metadata is present",97)98messages_parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON")99100return parser.parse_args()101102103def add_session_filters(parser: argparse.ArgumentParser) -> None:104parser.add_argument(105"--repo",106help="Exact repo/worktree path to match against session_meta.payload.cwd",107)108parser.add_argument(109"--cwd-contains",110help="Substring match against session_meta.payload.cwd",111)112113114def iter_session_paths(root: Path):115if not root.exists():116return117for path in sorted(root.rglob("*.jsonl"), reverse=True):118if path.is_file():119yield path120121122def load_session_meta(path: Path) -> SessionMeta:123payload: dict[str, Any] = {}124with path.open("r", encoding="utf-8") as handle:125for line in handle:126if not line.strip():127continue128record = json.loads(line)129if record.get("type") == "session_meta":130payload = record.get("payload") or {}131break132133return SessionMeta(134path=path,135session_id=payload.get("id"),136timestamp=payload.get("timestamp"),137cwd=payload.get("cwd"),138source=payload.get("source") or payload.get("originator"),139cli_version=payload.get("cli_version"),140model_provider=payload.get("model_provider"),141)142143144def matches_filters(meta: SessionMeta, repo: str | None, cwd_contains: str | None) -> bool:145cwd = meta.cwd or ""146if repo and cwd != repo:147return False148if cwd_contains and cwd_contains not in cwd:149return False150return True151152153def content_to_text(content: Any) -> str:154if isinstance(content, str):155return content.strip()156if not isinstance(content, list):157return ""158159parts: list[str] = []160for item in content:161if not isinstance(item, dict):162continue163item_type = item.get("type")164if item_type in {"input_text", "output_text", "text"}:165text = item.get("text")166if isinstance(text, str) and text.strip():167parts.append(text.strip())168return "\n\n".join(parts).strip()169170171def extract_messages(path: Path) -> list[TextMessage]:172event_messages: list[TextMessage] = []173fallback_messages: list[TextMessage] = []174175with path.open("r", encoding="utf-8") as handle:176for line in handle:177if not line.strip():178continue179record = json.loads(line)180record_type = record.get("type")181payload = record.get("payload") or {}182timestamp = record.get("timestamp")183184if record_type == "event_msg":185event_type = payload.get("type")186if event_type == "user_message":187text = (payload.get("message") or "").strip()188if text:189event_messages.append(TextMessage(role="user", text=text, timestamp=timestamp))190elif event_type == "agent_message":191text = (payload.get("message") or "").strip()192if text:193event_messages.append(194TextMessage(195role="assistant",196text=text,197timestamp=timestamp,198phase=payload.get("phase"),199)200)201continue202203if record_type != "response_item":204continue205if payload.get("type") != "message":206continue207role = payload.get("role")208if role not in {"user", "assistant"}:209continue210text = content_to_text(payload.get("content"))211if text:212fallback_messages.append(TextMessage(role=role, text=text, timestamp=timestamp))213214return event_messages or fallback_messages215216217def load_session_record(path: Path) -> SessionRecord:218return SessionRecord(meta=load_session_meta(path), messages=extract_messages(path))219220221def find_matching_sessions(root: Path, repo: str | None, cwd_contains: str | None) -> list[SessionMeta]:222matches: list[SessionMeta] = []223for path in iter_session_paths(root):224meta = load_session_meta(path)225if matches_filters(meta, repo=repo, cwd_contains=cwd_contains):226matches.append(meta)227matches.sort(key=lambda item: (item.timestamp or "", str(item.path)), reverse=True)228return matches229230231def resolve_session_target(root: Path, target: str) -> Path:232candidate = Path(target).expanduser()233if candidate.exists() and candidate.is_file():234return candidate235236matches = [path for path in iter_session_paths(root) if target in path.name or target in str(path)]237if not matches:238raise SystemExit(f"No session file matched: {target}")239if len(matches) > 1:240joined = "\n".join(str(path) for path in matches[:10])241extra = "" if len(matches) <= 10 else f"\n... and {len(matches) - 10} more"242raise SystemExit(f"Session target is ambiguous: {target}\n{joined}{extra}")243return matches[0]244245246def preview(text: str, limit: int) -> str:247compact = " ".join(text.split())248if len(compact) <= limit:249return compact250return compact[: max(limit - 1, 0)].rstrip() + "…"251252253def filter_messages(254messages: list[TextMessage],255role: str,256assistant_phase: str,257tail: int,258) -> list[TextMessage]:259filtered = messages260if role != "all":261filtered = [message for message in filtered if message.role == role]262if assistant_phase != "all":263filtered = [264message265for message in filtered266if message.role != "assistant" or (message.phase or "commentary") == assistant_phase267]268if tail > 0:269filtered = filtered[-tail:]270return filtered271272273def session_to_dict(record: SessionRecord, preview_chars: int | None = None) -> dict[str, Any]:274last_user = next((message for message in reversed(record.messages) if message.role == "user"), None)275last_assistant = next((message for message in reversed(record.messages) if message.role == "assistant"), None)276277payload: dict[str, Any] = {278"path": str(record.meta.path),279"session_id": record.meta.session_id,280"timestamp": record.meta.timestamp,281"cwd": record.meta.cwd,282"source": record.meta.source,283"cli_version": record.meta.cli_version,284"model_provider": record.meta.model_provider,285"message_count": len(record.messages),286"messages": [287{288"role": message.role,289"timestamp": message.timestamp,290"phase": message.phase,291"text": message.text,292}293for message in record.messages294],295}296if preview_chars is not None:297payload["last_user_preview"] = preview(last_user.text, preview_chars) if last_user else None298payload["last_assistant_preview"] = preview(last_assistant.text, preview_chars) if last_assistant else None299return payload300301302def command_list(args: argparse.Namespace) -> int:303root = Path(args.sessions_root).expanduser()304matches = find_matching_sessions(root, repo=args.repo, cwd_contains=args.cwd_contains)[: args.limit]305records = [load_session_record(meta.path) for meta in matches]306307if args.json:308json.dump([session_to_dict(record, preview_chars=args.preview_chars) for record in records], sys.stdout, ensure_ascii=False, indent=2)309sys.stdout.write("\n")310return 0311312if not records:313print("No matching Codex sessions found.")314return 0315316for index, record in enumerate(records, 1):317data = session_to_dict(record, preview_chars=args.preview_chars)318print(f"[{index}] {data['timestamp'] or '-'} {data['session_id'] or record.meta.path.name}")319print(f" path: {data['path']}")320print(f" cwd: {data['cwd'] or '-'}")321print(f" msgs: {data['message_count']}")322print(f" user: {data['last_user_preview'] or '-'}")323print(f" asst: {data['last_assistant_preview'] or '-'}")324if index != len(records):325print()326return 0327328329def command_messages(args: argparse.Namespace) -> int:330root = Path(args.sessions_root).expanduser()331332if args.session:333session_paths = [resolve_session_target(root, args.session)]334else:335matches = find_matching_sessions(root, repo=args.repo, cwd_contains=args.cwd_contains)336session_paths = [meta.path for meta in matches[: args.limit_sessions]]337338records = [load_session_record(path) for path in session_paths]339filtered_records: list[dict[str, Any]] = []340341for record in records:342filtered_messages = filter_messages(343record.messages,344role=args.role,345assistant_phase=args.assistant_phase,346tail=args.tail,347)348filtered_records.append(349{350**session_to_dict(record),351"messages": [352{353"role": message.role,354"timestamp": message.timestamp,355"phase": message.phase,356"text": message.text,357}358for message in filtered_messages359],360}361)362363if args.json:364json.dump(filtered_records, sys.stdout, ensure_ascii=False, indent=2)365sys.stdout.write("\n")366return 0367368if not filtered_records:369print("No matching Codex sessions found.")370return 0371372for record_index, record in enumerate(filtered_records, 1):373print(f"[{record_index}] {record['timestamp'] or '-'} {record['session_id'] or Path(record['path']).name}")374print(f"path: {record['path']}")375print(f"cwd: {record['cwd'] or '-'}")376print()377for message in record["messages"]:378phase_suffix = ""379if message["role"] == "assistant" and message.get("phase"):380phase_suffix = f" ({message['phase']})"381print(f"[{message['role']}{phase_suffix}]")382print(message["text"].rstrip())383print()384if record_index != len(filtered_records):385print("=" * 80)386return 0387388389def main() -> int:390args = parse_args()391if args.command == "list":392return command_list(args)393if args.command == "messages":394return command_messages(args)395raise SystemExit(f"Unsupported command: {args.command}")396397398if __name__ == "__main__":399raise SystemExit(main())400