Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 142 additions & 141 deletions api/export_api.py
Comment thread
clean6378-max-it marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,141 +1,142 @@
"""
API route for export — produces per-chat Markdown in a zip download.
POST /api/export { since: "all"|"last", zip: true }
GET /api/export/state — returns last export time
"""

from __future__ import annotations

import io
import json
import logging
import os
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Literal

from flask import Blueprint, Response, request

from api.flask_config import exclusion_rules, json_response
from services.export_engine import collect_export_entries, read_last_export_ms
from services.workspace_db import global_storage_db_path
from utils.workspace_path import resolve_workspace_path

bp = Blueprint("export_api", __name__)
_logger = logging.getLogger(__name__)


def _get_state_dir() -> str:
return os.path.join(str(Path.home()), ".cursor-chat-browser")


def _get_export_state() -> dict[str, Any]:
"""Read the export state file."""
state_path = os.path.join(_get_state_dir(), "export_state.json")
if os.path.isfile(state_path):
try:
with open(state_path, "r", encoding="utf-8") as f:
parsed = json.load(f)
if isinstance(parsed, dict):
return parsed
_logger.warning(
"Export state in %s is not a JSON object (got %s); ignoring",
state_path,
type(parsed).__name__,
)
except (json.JSONDecodeError, ValueError, OSError) as e:
_logger.warning(
"Could not read export state from %s: %s",
state_path,
e,
)
return {}


def _save_export_state(count: int) -> None:
"""Save export state after an export."""
state_dir = _get_state_dir()
os.makedirs(state_dir, exist_ok=True)
state = {
"lastExportTime": datetime.now().isoformat(),
"exportedCount": count,
}
state_path = os.path.join(state_dir, "export_state.json")
with open(state_path, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2)


@bp.route("/api/export/state")
def get_export_state() -> Response:
"""Return the last export timestamp."""
state = _get_export_state()
return json_response(state)


@bp.route("/api/export", methods=["POST"])
def export_chats() -> tuple[Response, int] | Response:
"""Export chats as a zip archive.

Exclusion rules (``EXCLUSION_RULES`` app config key) are evaluated against
each chat's project name, title, and model. Rules are loaded once at
application startup; an app restart is required to pick up changes to the
exclusion rules file.
"""
try:
body = request.get_json(silent=True)
if not isinstance(body, dict):
return json_response({"error": "request body must be a JSON object"}, 400)
since: Literal["all", "last"] = (
"last" if body.get("since") == "last" else "all"
)

workspace_path = resolve_workspace_path()
gdb = global_storage_db_path(workspace_path)
if not os.path.isfile(gdb):
return json_response({"error": "Cursor global storage not found"}, 404)

exported = collect_export_entries(
workspace_path=workspace_path,
exclusion_rules=exclusion_rules(),
since=since,
last_export_ms=read_last_export_ms(since, state=_get_export_state()),
out_dir="",
include_composer=True,
include_cli=False,
)
count = len(exported)
if count == 0:
return json_response(
{"error": "No conversations to export" + (
" since last export" if since == "last" else ""
)},
404,
)

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for entry in exported:
zf.writestr(entry["rel_path"], entry["content"])

buf.seek(0)
_save_export_state(count)

filename = "cursor-export.zip"
return Response(
buf.getvalue(),
mimetype="application/zip",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
"X-Export-Count": str(count),
},
)

except Exception as e:
_logger.error(
"Export failed: %s (%s)",
e,
type(e).__name__,
exc_info=True,
)
return json_response({"error": "Export failed"}, 500)
"""
API route for export — produces per-chat Markdown in a zip download.
POST /api/export { since: "all"|"last", zip: true }
GET /api/export/state — returns last export time
"""

from __future__ import annotations

import io
import json
import logging
import os
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Literal

from flask import Blueprint, Response, request

from api.flask_config import exclusion_rules, json_response
from services.export_engine import collect_export_entries, read_last_export_ms
from services.workspace_db import global_storage_db_path
from utils.workspace_path import resolve_workspace_path

bp = Blueprint("export_api", __name__)
_logger = logging.getLogger(__name__)


def _get_state_dir() -> str:
return os.path.join(str(Path.home()), ".cursor-chat-browser")


def _get_export_state() -> dict[str, Any]:
"""Read the export state file."""
state_path = os.path.join(_get_state_dir(), "export_state.json")
if os.path.isfile(state_path):
try:
with open(state_path, "r", encoding="utf-8") as f:
parsed = json.load(f)
if isinstance(parsed, dict):
return parsed
_logger.warning(
"Export state in %s is not a JSON object (got %s); ignoring",
state_path,
type(parsed).__name__,
)
except (json.JSONDecodeError, ValueError, OSError) as e:
_logger.warning(
"Could not read export state from %s: %s",
state_path,
e,
)
return {}


def _save_export_state(count: int) -> None:
"""Save export state after an export."""
state_dir = _get_state_dir()
os.makedirs(state_dir, exist_ok=True)
state = {
"lastExportTime": datetime.now().isoformat(),
"exportedCount": count,
}
state_path = os.path.join(state_dir, "export_state.json")
with open(state_path, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2)


@bp.route("/api/export/state")
def get_export_state() -> Response:
"""Return the last export timestamp."""
state = _get_export_state()
return json_response(state)


@bp.route("/api/export", methods=["POST"])
def export_chats() -> tuple[Response, int] | Response:
"""Export chats as a zip archive.

Exclusion rules (``EXCLUSION_RULES`` app config key) are evaluated against
each chat's project name, title, and model. Rules are loaded once at
application startup; an app restart is required to pick up changes to the
exclusion rules file.
"""
try:
body = request.get_json(silent=True)
if not isinstance(body, dict):
return json_response({"error": "request body must be a JSON object"}, 400)
since: Literal["all", "last"] = (
"last" if body.get("since") == "last" else "all"
)

workspace_path = resolve_workspace_path()
gdb = global_storage_db_path(workspace_path)
if not os.path.isfile(gdb):
return json_response({"error": "Cursor global storage not found"}, 404)

exported = collect_export_entries(
workspace_path=workspace_path,
exclusion_rules=exclusion_rules(),
since=since,
last_export_ms=read_last_export_ms(since, state=_get_export_state()),
out_dir="",
include_composer=True,
include_cli=False,
)
count = len(exported)
if count == 0:
return json_response(
{"error": "No conversations to export" + (
" since last export" if since == "last" else ""
)},
404,
)

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for entry in exported:
zf.writestr(entry["rel_path"], entry["content"])

buf.seek(0)
_save_export_state(count)

filename = "cursor-export.zip"
return Response(
buf.getvalue(),
mimetype="application/zip",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
"X-Export-Count": str(count),
},
)

except Exception as e:
_logger.error(
"Export failed: %s (%s)",
e,
type(e).__name__,
exc_info=True,
)
return json_response({"error": "Export failed"}, 500)

Expand Down
17 changes: 13 additions & 4 deletions services/export_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,17 @@ def _collect_ide_export_entries(
ctx = orch.ctx
exported: list[CollectedExportEntry] = []
for row in db_data.ide_composer_rows:
composer_id = row["key"].split(":")[1]
row_key = row["key"]
if ":" not in row_key:
_logger.debug(
"Skipping composer row with malformed key %r",
row_key,
)
continue
composer_id = row_key.split(":", 1)[1]
try:
cd = json.loads(row["value"])
except (json.JSONDecodeError, ValueError) as parse_err:
except (json.JSONDecodeError, TypeError, ValueError) as parse_err:
_logger.debug(
"Skipping corrupt composerData row %s: %s",
composer_id,
Expand Down Expand Up @@ -383,9 +390,11 @@ def _collect_cli_export_entries(
continue

for session in cp["sessions"]:
meta = session.get("meta", {})
raw_meta = session.get("meta")
meta = raw_meta if isinstance(raw_meta, dict) else {}
session_id = session["session_id"]
created_ms: int = meta.get("createdAt") or int(
created_raw = meta.get("createdAt")
created_ms = to_epoch_ms(created_raw) if created_raw else int(
datetime.now().timestamp() * 1000,
)
session_name = meta.get("name") or f"Session {session_id[:8]}"
Expand Down
26 changes: 22 additions & 4 deletions services/workspace_listing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
from models import Bubble, ParseWarningCollector
from services.export_engine import WorkspaceOrchestration, prepare_workspace_orchestration
from services.summary_cache import (
fingerprint_workspace_storage,
get_cached_projects,
nocache_enabled,
set_cached_projects,
)
from services.workspace_db import (
COMPOSER_ROWS_WITH_HEADERS_SQL,
collect_workspace_entries,
global_storage_db_path,
load_project_layouts_for_composer,
load_project_layouts_map,
open_global_db,
Expand Down Expand Up @@ -91,14 +94,29 @@ def list_workspace_projects(
:meth:`models.ParseWarningCollector.to_api_list`; empty when no skips.
"""
effective_nocache = nocache_enabled(request_nocache=nocache)
orch = prepare_workspace_orchestration(
workspace_path, rules, nocache=effective_nocache,
)
workspace_entries: list[dict[str, Any]] | None = None
if not effective_nocache:
cached = get_cached_projects(orch.fingerprint)
workspace_entries = collect_workspace_entries(workspace_path)
gdb = global_storage_db_path(workspace_path)
cli_path = get_cli_chats_path()
fingerprint = fingerprint_workspace_storage(
workspace_path,
workspace_entries,
global_db_path=gdb if os.path.isfile(gdb) else None,
rules=rules,
cli_chats_path=cli_path if os.path.isdir(cli_path) else None,
)
cached = get_cached_projects(fingerprint)
if cached is not None:
return cached

orch = prepare_workspace_orchestration(
workspace_path,
rules,
nocache=effective_nocache,
workspace_entries=workspace_entries,
)

projects, warnings = _build_workspace_projects_uncached(
workspace_path, rules, orch,
)
Expand Down
Loading
Loading