Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Structured planning workflow that uses files to track tasks, decisions, and project progress.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/session-catchup.py
1#!/usr/bin/env python32"""3Session Catchup Script for planning-with-files45Analyzes the previous session to find unsynced context after the last6planning file update. Designed to run on SessionStart.78Usage: python3 session-catchup.py [project-path]9"""1011import json12import sys13import os14from pathlib import Path15from typing import Any, Dict, Iterable, List, Optional, Tuple1617try:18import orjson19except ImportError:20orjson = None2122PLANNING_FILES = ['task_plan.md', 'progress.md', 'findings.md']23MIN_SESSION_BYTES = 5000242526def json_loads(line: str) -> Optional[Dict[str, Any]]:27"""Prefer optional orjson while keeping the hook dependency-free."""28try:29if orjson is not None:30data = orjson.loads(line)31else:32data = json.loads(line)33except (ValueError, TypeError, UnicodeDecodeError):34return None35return data if isinstance(data, dict) else None363738def normalize_for_compare(path_value: str) -> str:39expanded = os.path.expanduser(path_value)40try:41return str(Path(expanded).resolve())42except (OSError, ValueError):43return os.path.abspath(expanded)444546def normalize_path(project_path: str) -> str:47"""Normalize project path to match Claude Code's internal representation.4849Claude Code stores session directories using the Windows-native path50(e.g., C:\\Users\\...) sanitized with separators replaced by dashes.51Git Bash passes /c/Users/... which produces a DIFFERENT sanitized52string. This function converts Git Bash paths to Windows paths first.53"""54p = project_path5556# Git Bash / MSYS2: /c/Users/... -> C:/Users/...57if len(p) >= 3 and p[0] == '/' and p[2] == '/':58p = p[1].upper() + ':' + p[2:]5960# Resolve to absolute path to handle relative paths and symlinks61try:62resolved = str(Path(p).resolve())63# On Windows, resolve() returns C:\Users\... which is what we want64if os.name == 'nt' or '\\' in resolved:65p = resolved66except (OSError, ValueError):67pass6869return p707172def get_claude_project_dir(project_path: str) -> Path:73"""Resolve Claude Code's project-specific session storage path."""74normalized = normalize_path(project_path)7576# Claude Code's sanitization: replace path separators and : with -77sanitized = normalized.replace('\\', '-').replace('/', '-').replace(':', '-')78sanitized = sanitized.replace('_', '-')79# Strip leading dash if present (Unix absolute paths start with /)80if sanitized.startswith('-'):81sanitized = sanitized[1:]8283return Path.home() / '.claude' / 'projects' / sanitized848586def get_sessions_sorted(project_dir: Path) -> List[Path]:87"""Get all session files sorted by modification time (newest first)."""88sessions = list(project_dir.glob('*.jsonl'))89main_sessions = [s for s in sessions if not s.name.startswith('agent-')]90return sorted(main_sessions, key=safe_stat_mtime, reverse=True)919293def safe_stat_mtime(path: Path) -> float:94try:95return path.stat().st_mtime96except OSError:97return 0.09899100def is_substantial_session(session: Path) -> bool:101try:102return session.stat().st_size > MIN_SESSION_BYTES103except OSError:104return False105106107def read_codex_meta(session_file: Path) -> Optional[Dict[str, Any]]:108"""Read the first session_meta; later meta records may be copied parent context."""109try:110with open(session_file, 'r', encoding='utf-8', errors='replace') as f:111for line in f:112data = json_loads(line)113if not data or data.get('type') != 'session_meta':114continue115payload = data.get('payload')116return payload if isinstance(payload, dict) else None117except OSError:118return None119return None120121122def codex_meta_cwd(meta: Dict[str, Any]) -> Optional[str]:123cwd = meta.get('cwd')124return cwd if isinstance(cwd, str) else None125126127def find_current_codex_session(sessions: List[Path]) -> Optional[Path]:128thread_id = os.getenv('CODEX_THREAD_ID', '').strip()129if not thread_id:130return None131132for session in sessions:133if thread_id in session.name:134return session135return None136137138def is_codex_project_session(session: Path, project_cmp: str) -> bool:139if not is_substantial_session(session):140return False141142meta = read_codex_meta(session)143if not meta:144return False145source = meta.get('source')146if isinstance(source, dict) and 'subagent' in source:147return False148cwd = codex_meta_cwd(meta)149return bool(cwd and normalize_for_compare(cwd) == project_cmp)150151152def get_codex_sessions(project_path: str) -> Iterable[Path]:153sessions_dir = Path(os.path.expanduser(os.getenv('CODEX_SESSIONS_DIR', '~/.codex/sessions')))154if not sessions_dir.exists():155return156157project_cmp = normalize_for_compare(project_path)158sessions = sorted(sessions_dir.rglob('rollout-*.jsonl'), key=safe_stat_mtime, reverse=True)159current = find_current_codex_session(sessions)160if current and is_codex_project_session(current, project_cmp):161yield current162163for session in sessions:164if session == current:165continue166if is_codex_project_session(session, project_cmp):167yield session168169170def get_session_candidates(project_path: str) -> Tuple[str, Iterable[Path]]:171if '/.codex/' in Path(__file__).resolve().as_posix().lower():172return 'codex', get_codex_sessions(project_path)173174claude_project_dir = get_claude_project_dir(project_path)175if claude_project_dir.exists():176return 'claude', get_sessions_sorted(claude_project_dir)177return 'claude', []178179180def parse_session_messages(session_file: Path) -> List[Dict[str, Any]]:181"""Parse all messages from a session file, preserving order."""182messages = []183with open(session_file, 'r', encoding='utf-8', errors='replace') as f:184for line_num, line in enumerate(f):185data = json_loads(line)186if data is not None:187data['_line_num'] = line_num188messages.append(data)189return messages190191192def planning_file_from_path(path_value: Any) -> Optional[str]:193if not isinstance(path_value, str):194return None195for pf in PLANNING_FILES:196if path_value.endswith(pf):197return pf198return None199200201def planning_file_from_paths(paths: Iterable[Any]) -> Optional[str]:202matches = {pf for path in paths if (pf := planning_file_from_path(path))}203for pf in PLANNING_FILES:204if pf in matches:205return pf206return None207208209def codex_planning_update(payload: Dict[str, Any]) -> Optional[str]:210"""Use Codex's structured apply_patch result instead of parsing tool text."""211if payload.get('type') != 'patch_apply_end' or payload.get('success') is not True:212return None213changes = payload.get('changes')214return planning_file_from_paths(changes.keys()) if isinstance(changes, dict) else None215216217def find_last_planning_update(messages: List[Dict[str, Any]]) -> Tuple[int, Optional[str]]:218"""219Find the last time a planning file was written/edited.220Returns (line_number, filename) or (-1, None) if not found.221"""222last_update_line = -1223last_update_file = None224225for msg in messages:226line_num = msg.get('_line_num')227if not isinstance(line_num, int):228continue229msg_type = msg.get('type')230231if msg_type == 'assistant':232content = msg.get('message', {}).get('content', [])233if isinstance(content, list):234for item in content:235if item.get('type') == 'tool_use':236tool_name = item.get('name', '')237tool_input = item.get('input', {})238if not isinstance(tool_input, dict):239tool_input = {}240241if tool_name in ('Write', 'Edit'):242planning_file = planning_file_from_path(tool_input.get('file_path', ''))243if planning_file:244last_update_line = line_num245last_update_file = planning_file246247elif msg_type == 'event_msg':248payload = msg.get('payload')249if isinstance(payload, dict):250planning_file = codex_planning_update(payload)251if planning_file:252last_update_line = line_num253last_update_file = planning_file254255return last_update_line, last_update_file256257258def text_content(content: Any) -> str:259if isinstance(content, str):260return content261if not isinstance(content, list):262return ''263return '\n'.join(264item.get('text', '')265for item in content266if isinstance(item, dict) and isinstance(item.get('text'), str)267)268269270def parse_codex_tool_args(payload: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:271raw_args = payload.get('arguments', payload.get('input', ''))272if isinstance(raw_args, dict):273return raw_args, json.dumps(raw_args, ensure_ascii=True)274if not isinstance(raw_args, str):275return {}, ''276decoded = json_loads(raw_args)277return (decoded, raw_args) if isinstance(decoded, dict) else ({}, raw_args)278279280def summarize_codex_tool(payload: Dict[str, Any]) -> str:281tool_name = payload.get('name', 'tool')282tool_args, raw_args = parse_codex_tool_args(payload)283if tool_name == 'exec_command':284command = tool_args.get('cmd', raw_args)285if isinstance(command, str):286return f"exec_command: {command[:80]}"287return str(tool_name)288289290def extract_messages_after(messages: List[Dict[str, Any]], after_line: int) -> List[Dict[str, Any]]:291"""Extract conversation messages after a certain line number."""292result = []293for msg in messages:294line_num = msg.get('_line_num')295if not isinstance(line_num, int) or line_num <= after_line:296continue297298msg_type = msg.get('type')299is_meta = msg.get('isMeta', False)300301if msg_type == 'user' and not is_meta:302content = text_content(msg.get('message', {}).get('content', ''))303304if content:305if content.startswith(('<local-command', '<command-', '<task-notification')):306continue307if len(content) > 20:308result.append({'role': 'user', 'content': content, 'line': line_num})309310elif msg_type == 'assistant':311msg_content = msg.get('message', {}).get('content', '')312text = text_content(msg_content)313tool_uses = []314315if isinstance(msg_content, list):316for item in msg_content:317if isinstance(item, dict) and item.get('type') == 'tool_use':318tool_name = item.get('name', '')319tool_input = item.get('input', {})320if not isinstance(tool_input, dict):321tool_input = {}322if tool_name == 'Edit':323tool_uses.append(f"Edit: {tool_input.get('file_path', 'unknown')}")324elif tool_name == 'Write':325tool_uses.append(f"Write: {tool_input.get('file_path', 'unknown')}")326elif tool_name == 'Bash':327cmd = tool_input.get('command', '')[:80]328tool_uses.append(f"Bash: {cmd}")329else:330tool_uses.append(f"{tool_name}")331332if text or tool_uses:333result.append({334'role': 'assistant',335'content': text[:600] if text else '',336'tools': tool_uses,337'line': line_num338})339340elif msg_type == 'response_item':341payload = msg.get('payload')342if not isinstance(payload, dict):343continue344345payload_type = payload.get('type')346if payload_type == 'message':347role = payload.get('role')348if role not in ('user', 'assistant'):349continue350content = text_content(payload.get('content'))351if role == 'user':352if content.startswith(('<local-command', '<command-', '<task-notification')):353continue354if len(content) > 20:355result.append({'role': 'user', 'content': content, 'line': line_num})356elif content:357result.append({358'role': 'assistant',359'content': content[:600],360'tools': [],361'line': line_num362})363elif payload_type in ('function_call', 'custom_tool_call'):364result.append({365'role': 'assistant',366'content': '',367'tools': [summarize_codex_tool(payload)],368'line': line_num369})370371return result372373374def main():375project_path = sys.argv[1] if len(sys.argv) > 1 else os.getcwd()376377# Check if planning files exist (indicates active task)378has_planning_files = any(379Path(project_path, f).exists() for f in PLANNING_FILES380)381if not has_planning_files:382# No planning files in this project; skip catchup to avoid noise.383return384385runtime_name, sessions = get_session_candidates(project_path)386387# Find a substantial previous session388target_session = None389for session in sessions:390if runtime_name == 'claude' and not is_substantial_session(session):391continue392target_session = session393break394395if not target_session:396return397398messages = parse_session_messages(target_session)399last_update_line, last_update_file = find_last_planning_update(messages)400401# No planning updates in the target session; skip catchup output.402if last_update_line < 0:403return404405# Only output if there's unsynced content406messages_after = extract_messages_after(messages, last_update_line)407408if not messages_after:409return410411# Output catchup report412print("\n[planning-with-files] SESSION CATCHUP DETECTED")413print(f"Previous session: {target_session.stem}")414print(f"Runtime: {runtime_name}")415416print(f"Last planning update: {last_update_file} at message #{last_update_line}")417print(f"Unsynced messages: {len(messages_after)}")418419print("\n--- UNSYNCED CONTEXT ---")420assistant_label = 'CODEX' if runtime_name == 'codex' else 'CLAUDE'421for msg in messages_after[-15:]: # Last 15 messages422if msg['role'] == 'user':423print(f"USER: {msg['content'][:300]}")424else:425if msg.get('content'):426print(f"{assistant_label}: {msg['content'][:300]}")427if msg.get('tools'):428print(f" Tools: {', '.join(msg['tools'][:4])}")429430print("\n--- RECOMMENDED ---")431print("1. Run: git diff --stat")432print("2. Read: task_plan.md, progress.md, findings.md")433print("3. Update planning files based on above context")434print("4. Continue with task")435436437if __name__ == '__main__':438main()439