Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
997a648
Add DAG-based message architecture spec
cboos Feb 12, 2026
232a73d
Add DAG infrastructure module (Phase A)
cboos Feb 13, 2026
70425ec
Integrate DAG ordering into directory-mode loading (Phase B)
cboos Feb 13, 2026
b5fe77f
Add hierarchical session navigation with parent/child relationships (…
cboos Feb 14, 2026
2268d16
Make session backlinks navigable with #msg-d-{N} anchors
cboos Feb 15, 2026
5d3a502
Remove unused extract_working_directories (review clean-up)
cboos Feb 16, 2026
deed202
Remove unused has_cache_changes (review clean-up)
cboos Feb 16, 2026
ec1d2c1
Fix progress entries breaking DAG chain by repairing parent pointers
cboos Feb 17, 2026
72cf3bf
Add within-session fork (rewind) visualization
cboos Feb 17, 2026
0427177
Add message previews to fork/branch nav items and headers
cboos Feb 17, 2026
5f14b23
Improve fork/branch presentation with context and visual hierarchy
cboos Feb 17, 2026
d656d2b
Fix false forks from context compaction replays and tool-result side-…
cboos Feb 18, 2026
b7fb95d
Add debug UUID toggle to show uuid/parentUuid on each message
cboos Feb 19, 2026
922dc4e
Handle tool-result variant 2: User continues, Assistant subtree dead-…
cboos Feb 19, 2026
7bfbf8e
Fix false orphans caused by over-aggressive user text deduplication
cboos Feb 22, 2026
ae15389
Fix false forks from context compaction replays and tool-result side-…
cboos Feb 22, 2026
98347ed
Thread session_tree through individual session file generation
cboos Feb 22, 2026
c7f1fed
Remove unused import (ruff fix)
cboos Feb 22, 2026
7d597e1
Fix pyright errors: type narrowing and protected access
cboos Feb 23, 2026
9325bab
Fix ty check warnings: narrow TranscriptEntry union before accessing …
cboos Mar 3, 2026
e198d23
Address CodeRabbit review feedback
cboos Mar 5, 2026
2005ee3
Review response: items not addressed (by design)
cboos Mar 5, 2026
2eeacec
Thread session_tree through end-to-end integration tests
cboos Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 164 additions & 50 deletions claude_code_log/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
from .parser import parse_timestamp
from .factories import create_transcript_entry
from .models import (
BaseTranscriptEntry,
TranscriptEntry,
AssistantTranscriptEntry,
QueueOperationTranscriptEntry,
SummaryTranscriptEntry,
SystemTranscriptEntry,
UserTranscriptEntry,
ToolResultContent,
)
from .dag import SessionTree, build_dag_from_entries, traverse_session_tree
from .renderer import get_renderer, is_html_outdated


Expand All @@ -47,6 +50,105 @@ def get_file_extension(format: str) -> str:
return "md" if format in ("md", "markdown") else format


# =============================================================================
# Progress Chain Repair
# =============================================================================


def _scan_file_progress(path: Path, chain: dict[str, Optional[str]]) -> None:
    """Extract progress-entry uuid -> parentUuid mappings from one JSONL file.

    Scans *path* line by line and, for every JSON object with
    ``"type": "progress"`` and a string ``uuid``, records
    ``chain[uuid] = parentUuid``.  Mutates *chain* in place.

    Args:
        path: JSONL transcript file to scan.
        chain: Accumulator mapping progress uuid -> parentUuid (or None).
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                # Fast pre-filter: avoid json.loads on lines that cannot
                # possibly contain a progress entry.
                if "progress" not in line:
                    continue
                line = line.strip()
                if not line:
                    continue
                try:
                    raw = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(raw, dict):
                    continue
                d = cast(dict[str, Any], raw)
                if d.get("type") == "progress":
                    uuid = d.get("uuid")
                    if isinstance(uuid, str):
                        # Normalize malformed parentUuid values: anything
                        # other than a string is treated as "no parent" so
                        # chain repair never stores a non-UUID value.
                        parent = d.get("parentUuid")
                        chain[uuid] = parent if isinstance(parent, str) else None
    except FileNotFoundError:
        pass  # Race condition: file may have been deleted


def _scan_progress_chains(*paths: Path) -> dict[str, Optional[str]]:
    """Collect progress-entry uuid -> parentUuid mappings across JSONL inputs.

    Each argument may be a single JSONL file or a directory; a directory
    contributes its top-level ``*.jsonl`` files plus any
    ``*/subagents/*.jsonl`` files.  Other paths are ignored.
    """
    mapping: dict[str, Optional[str]] = {}
    for entry in paths:
        if entry.is_file():
            candidates = [entry]
        elif entry.is_dir():
            # Top-level session files first, then subagent transcripts.
            candidates = list(entry.glob("*.jsonl"))
            candidates.extend(entry.glob("*/subagents/*.jsonl"))
        else:
            candidates = []
        for jsonl_file in candidates:
            _scan_file_progress(jsonl_file, mapping)
    return mapping


def _scan_sidechain_uuids(directory: Path) -> set[str]:
    """Collect UUIDs from sidechain/subagent files not loaded into the DAG.

    Some subagent files (e.g. aprompt_suggestion) are never referenced
    via agentId in the main session, so they aren't loaded by
    load_transcript(). Their UUIDs are needed to suppress false orphan
    warnings when main-chain entries reference sidechain parents.
    """
    found: set[str] = set()
    for jsonl_path in directory.glob("*/subagents/*.jsonl"):
        try:
            with open(jsonl_path, "r", encoding="utf-8", errors="replace") as fh:
                for raw_line in fh:
                    stripped = raw_line.strip()
                    if not stripped:
                        continue
                    try:
                        parsed = json.loads(stripped)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(parsed, dict):
                        continue
                    candidate = cast(dict[str, Any], parsed).get("uuid")
                    if isinstance(candidate, str):
                        found.add(candidate)
        except FileNotFoundError:
            # File may vanish between glob() and open() (live session).
            pass
    return found


def _repair_parent_chains(
    messages: list[TranscriptEntry],
    progress_chain: dict[str, Optional[str]],
) -> None:
    """Repair parentUuid fields that point to progress entries.

    Progress entries are not loaded as messages, so a parentUuid that
    references one leaves a gap in the DAG.  For each affected message,
    walk the progress chain to the nearest non-progress ancestor and
    rewrite parentUuid in place (Pydantic v2 models are mutable by default).
    A cycle in the chain severs the link (parentUuid becomes None).
    """
    if not progress_chain:
        return
    for entry in messages:
        original_parent = getattr(entry, "parentUuid", None)
        if not original_parent or original_parent not in progress_chain:
            continue
        visited: set[str] = set()
        ancestor: Optional[str] = original_parent
        while ancestor is not None and ancestor in progress_chain:
            if ancestor in visited:
                # Defensive: a cycle would loop forever; drop the parent.
                ancestor = None
                break
            visited.add(ancestor)
            ancestor = progress_chain[ancestor]
        entry.parentUuid = ancestor  # type: ignore[union-attr]


# =============================================================================
# Transcript Loading Functions
# =============================================================================
Expand Down Expand Up @@ -315,8 +417,12 @@ def load_directory_transcripts(
from_date: Optional[str] = None,
to_date: Optional[str] = None,
silent: bool = False,
) -> list[TranscriptEntry]:
"""Load all JSONL transcript files from a directory and combine them."""
) -> tuple[list[TranscriptEntry], SessionTree]:
"""Load all JSONL transcript files from a directory and combine them.

Returns (messages, session_tree) — the tree is reused by the renderer
to avoid rebuilding the DAG.
"""
all_messages: list[TranscriptEntry] = []

# Find all .jsonl files, excluding agent files (they are loaded via load_transcript
Expand All @@ -331,14 +437,35 @@ def load_directory_transcripts(
)
all_messages.extend(messages)

# Sort all messages chronologically
def get_timestamp(entry: TranscriptEntry) -> str:
if hasattr(entry, "timestamp"):
return entry.timestamp # type: ignore
return ""
# Repair parent chains: progress entries create UUID gaps
progress_chain = _scan_progress_chains(directory_path)
_repair_parent_chains(all_messages, progress_chain)

all_messages.sort(key=get_timestamp)
return all_messages
# Partition: sidechain entries excluded from DAG (Phase C scope)
sidechain_entries = [e for e in all_messages if getattr(e, "isSidechain", False)]
main_entries = [e for e in all_messages if not getattr(e, "isSidechain", False)]

# Collect sidechain UUIDs so DAG build can suppress orphan warnings
# for parents that exist in sidechain data (will be integrated in Phase C)
sidechain_uuids: set[str] = {
e.uuid for e in sidechain_entries if isinstance(e, BaseTranscriptEntry)
}
# Also scan unloaded subagent files (e.g. aprompt_suggestion agents
# that are never referenced via agentId in the main session)
sidechain_uuids |= _scan_sidechain_uuids(directory_path)

# Build DAG and traverse (entries grouped by session, depth-first)
tree = build_dag_from_entries(main_entries, sidechain_uuids=sidechain_uuids)
dag_ordered = traverse_session_tree(tree)

# Re-add summaries/queue-ops (excluded from DAG since they lack uuid)
non_dag_entries: list[TranscriptEntry] = [
e
for e in main_entries
if isinstance(e, (SummaryTranscriptEntry, QueueOperationTranscriptEntry))
]

return dag_ordered + sidechain_entries + non_dag_entries, tree


# =============================================================================
Expand Down Expand Up @@ -406,9 +533,11 @@ def deduplicate_messages(messages: list[TranscriptEntry]) -> list[TranscriptEntr
content_key = item.tool_use_id
break
else:
# No tool result found - this is a user text message
# No tool result found - this is a user text message.
# Use uuid to keep distinct messages (even at same timestamp)
# so DAG parent references remain valid.
is_user_text = True
# content_key stays empty (dedupe by timestamp alone)
content_key = message.uuid
elif isinstance(message, SummaryTranscriptEntry):
# Summaries have no timestamp or uuid - use leafUuid to keep them distinct
content_key = message.leafUuid
Expand Down Expand Up @@ -698,6 +827,7 @@ def _generate_paginated_html(
session_data: Dict[str, SessionCacheData],
working_directories: List[str],
silent: bool = False,
session_tree: Optional[SessionTree] = None,
) -> Path:
"""Generate paginated HTML files for combined transcript.

Expand Down Expand Up @@ -856,6 +986,7 @@ def _generate_paginated_html(
page_title,
page_info=page_info,
page_stats=page_stats,
session_tree=session_tree,
)
page_file.write_text(html_content, encoding="utf-8")

Expand Down Expand Up @@ -948,11 +1079,18 @@ def convert_jsonl_to(
# Initialize working_directories for both branches (used by pagination in directory mode)
working_directories: List[str] = []

# session_tree is populated in directory mode (DAG already built);
# None in single-file mode (renderer builds it on demand)
session_tree: Optional[SessionTree] = None

if input_path.is_file():
# Single file mode - cache only available for directory mode
if output_path is None:
output_path = input_path.with_suffix(f".{ext}")
messages = load_transcript(input_path, silent=silent)
# Repair progress chain gaps for single-file mode
progress_chain = _scan_progress_chains(input_path)
_repair_parent_chains(messages, progress_chain)
title = f"Claude Transcript - {input_path.stem}"
cache_was_updated = False # No cache in single file mode
else:
Expand Down Expand Up @@ -988,7 +1126,7 @@ def convert_jsonl_to(
return output_path

# Phase 2: Load messages (will use fresh cache when available)
messages = load_directory_transcripts(
messages, session_tree = load_directory_transcripts(
input_path, cache_manager, from_date, to_date, silent
)

Expand Down Expand Up @@ -1065,6 +1203,7 @@ def convert_jsonl_to(
session_data,
working_directories,
silent=silent,
session_tree=session_tree,
)
else:
# Use single-file generation for small projects or filtered views
Expand All @@ -1091,7 +1230,9 @@ def convert_jsonl_to(
if should_regenerate:
# For referenced images, pass the output directory
output_dir = output_path.parent
content = renderer.generate(messages, title, output_dir=output_dir)
content = renderer.generate(
messages, title, output_dir=output_dir, session_tree=session_tree
)
assert content is not None
output_path.write_text(content, encoding="utf-8")

Expand All @@ -1117,44 +1258,12 @@ def convert_jsonl_to(
cache_was_updated,
image_export_mode,
silent=silent,
session_tree=session_tree,
)

return output_path


def has_cache_changes(
    project_dir: Path,
    cache_manager: Optional[CacheManager],
    from_date: Optional[str] = None,
    to_date: Optional[str] = None,
) -> bool:
    """Check if cache needs updating (fast mtime comparison only).

    Returns True if there are modified files or cache is stale.
    Does NOT load any messages - that's deferred to ensure_fresh_cache.
    """
    if cache_manager is None:
        # Without a cache, everything must be (re)processed.
        return True

    jsonl_files = list(project_dir.glob("*.jsonl"))
    if not jsonl_files:
        # Nothing to render, so nothing can be stale.
        return False

    # Gather both invalidation inputs up front, mirroring the checks below.
    cached = cache_manager.get_cached_project_data()
    modified = cache_manager.get_modified_files(jsonl_files)

    if cached is None or bool(modified):
        return True
    # Date filtering always bypasses the cache.
    if from_date is not None or to_date is not None:
        return True
    # A zero cached message count alongside existing files means stale data.
    return cached.total_message_count == 0 and bool(jsonl_files)


def ensure_fresh_cache(
project_dir: Path,
cache_manager: Optional[CacheManager],
Expand All @@ -1165,7 +1274,6 @@ def ensure_fresh_cache(
"""Ensure cache is fresh and populated. Returns True if cache was updated.

This does the heavy lifting of loading and parsing files.
Call has_cache_changes() first for a fast check.
"""
if cache_manager is None:
return False
Expand Down Expand Up @@ -1201,7 +1309,7 @@ def ensure_fresh_cache(
# Load and process messages to populate cache
if not silent:
print(f"Updating cache for {project_dir.name}...")
messages = load_directory_transcripts(
messages, _tree = load_directory_transcripts(
project_dir, cache_manager, from_date, to_date, silent
)

Expand Down Expand Up @@ -1479,6 +1587,7 @@ def _generate_individual_session_files(
cache_was_updated: bool = False,
image_export_mode: Optional[str] = None,
silent: bool = False,
session_tree: Optional[SessionTree] = None,
) -> int:
"""Generate individual files for each session in the specified format.

Expand Down Expand Up @@ -1577,7 +1686,12 @@ def _generate_individual_session_files(
if should_regenerate_session:
# Generate session content
session_content = renderer.generate_session(
messages, session_id, session_title, cache_manager, output_dir
messages,
session_id,
session_title,
cache_manager,
output_dir,
session_tree=session_tree,
)
assert session_content is not None
# Write session file
Expand Down Expand Up @@ -1897,7 +2011,7 @@ def process_projects_hierarchy(
print(
f"Warning: No cached data available for {project_dir.name}, using fallback processing"
)
messages = load_directory_transcripts(
messages, _tree = load_directory_transcripts(
project_dir, cache_manager, from_date, to_date, silent=silent
)
# Ensure cache is populated with session data (including working directories)
Expand Down
Loading
Loading