feat(durable): Add durable session persistence layer for long-horizon agents #4351
base: main
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Response from ADK Triaging Agent Hello @caohy1988, thank you for creating this PR! Before we can proceed with the review, could you please address the following items from our contribution guidelines:
This information will help us to review your PR more efficiently. Thanks!
Summary of Changes

Hello @caohy1988, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances ADK's capabilities by introducing a robust durable session persistence layer. This new feature allows long-running agent tasks to maintain their state across process boundaries and system failures, ensuring continuity and reliability for complex, time-consuming operations. By leveraging BigQuery and Google Cloud Storage, it provides an auditable and scalable solution for managing agent progress, effectively overcoming limitations previously faced in cloud environments.

Highlights
Code Review
This pull request introduces a comprehensive and well-designed durable session persistence layer, which is a significant feature for enabling long-horizon agents. The use of BigQuery for metadata and GCS for blobs is a robust pattern, and the implementation correctly includes key features like two-phase commits and lease-based concurrency. The accompanying demo is excellent for showcasing the functionality. My review identifies a few important issues to address, primarily concerning security (a hardcoded API key and a potential path traversal vulnerability), a race condition in session creation, and several opportunities for code refinement and improved maintainability. Overall, this is a strong feature addition, and addressing these points will make it even more robust.
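The review above calls out lease-based concurrency control for safe resume. The sketch below is only a minimal in-memory illustration of that pattern, not the PR's BigQuery-backed implementation; the class and method names here are hypothetical.

```python
import time
import uuid


class LeaseRegistry:
    """Toy in-memory lease table; a hypothetical stand-in for a durable lease store."""

    def __init__(self):
        self._leases = {}  # session_id -> (lease_id, expiry timestamp)

    def acquire(self, session_id: str, ttl_s: float = 30.0, now=None):
        """Return a new lease id if the session is free or its lease expired, else None."""
        now = time.time() if now is None else now
        current = self._leases.get(session_id)
        if current is not None and current[1] > now:
            return None  # another worker still holds an unexpired lease
        lease_id = uuid.uuid4().hex
        self._leases[session_id] = (lease_id, now + ttl_s)
        return lease_id

    def release(self, session_id: str, lease_id: str) -> bool:
        """Release only if the caller still holds the lease (guards against stale holders)."""
        current = self._leases.get(session_id)
        if current is not None and current[0] == lease_id:
            del self._leases[session_id]
            return True
        return False
```

The expiry check is what makes resume safe after a crash: a worker that dies without releasing simply lets its lease lapse, and the next worker can acquire.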
```python
GOOGLE_CLOUD_API_KEY = os.environ.get(
    "GOOGLE_CLOUD_API_KEY",
    "AQ.Ab8RN6L12XpDo1x7Gf2w87EfspguWGrjZPW6XocNy2og_-z_jg",
)
```
A default API key is hardcoded as a fallback value. This is a significant security risk, as it could be accidentally committed and exposed. Even for a demo, it's best practice to avoid hardcoding secrets. The application should fail explicitly if the key is not provided in the environment, rather than falling back to a hardcoded value.
```python
GOOGLE_CLOUD_API_KEY = os.environ.get("GOOGLE_CLOUD_API_KEY")
if not GOOGLE_CLOUD_API_KEY:
    raise ValueError("GOOGLE_CLOUD_API_KEY environment variable not set.")
```
reverted
```python
existing = await self.get_session(session_id=session_id)
if existing:
    raise ValueError(f"Session {session_id} already exists")
```
There is a race condition here. Two concurrent requests could both check for an existing session, find none, and then both attempt to create it. Since BigQuery PRIMARY KEY constraints are not enforced, this could lead to duplicate session entries. The session creation logic should be made idempotent. One approach is to use a unique ID for the BigQuery insert job, which makes the insertion retryable and idempotent within a certain window.
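With the BigQuery streaming API, one way to get the idempotency described above is to pass a deterministic insert id via the `row_ids` argument of `insert_rows_json`, so a retried insert is deduplicated within BigQuery's best-effort dedup window. A sketch; the row schema and table name here are illustrative, not the PR's actual ones:

```python
import hashlib


def session_row_with_id(session_id: str, app_name: str):
    """Build a session row plus a deterministic insert id for best-effort dedup."""
    row = {"session_id": session_id, "app_name": app_name}
    # Same session -> same insert id, so a concurrent or retried insert of the
    # same session is collapsed by the streaming-insert dedup window.
    row_id = hashlib.sha256(f"session:{session_id}".encode()).hexdigest()
    return row, row_id


# Hypothetical usage with google-cloud-bigquery (not executed here):
# errors = client.insert_rows_json(table_ref, [row], row_ids=[row_id])
```

Note the dedup window is time-limited, so this narrows the race rather than eliminating it; a `MERGE`-based upsert is the stronger option if load-job latency is acceptable.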
```python
safe_members = [
    m for m in tar.getmembers() if not m.name.startswith(("/", ".."))
]
```
The check to prevent path traversal attacks (tar-slip) is insufficient. An attacker could craft a filename like a/../../etc/passwd which would bypass the current check. A more robust approach is to resolve the real path of each member and ensure it is within the intended destination directory before extraction. Using tar.extractall with a filtered list is risky; it's safer to iterate through members and extract them individually with proper path validation.
```python
for member in tar.getmembers():
    member_path = os.path.join(self._workspace_dir, member.name)
    # Resolve the absolute path and ensure it's within the workspace.
    workspace_root = os.path.realpath(self._workspace_dir)
    if os.path.commonpath(
        [os.path.realpath(member_path), workspace_root]
    ) == workspace_root:
        tar.extract(member, self._workspace_dir)
    else:
        logger.warning("Skipping potentially unsafe path in tarball: %s", member.name)
```

(A plain `startswith` prefix check would also accept a sibling directory such as `/workspace2` when the workspace is `/workspace`; comparing with `os.path.commonpath` avoids that.)

```python
async def list_sessions():
    """List all sessions from BigQuery."""
    try:
        client = checkpoint_store._get_bq_client()
```
```python
    except Exception as e:
        return {"sessions": [], "error": str(e)}
```
Catching a broad Exception can hide bugs and make debugging difficult. It's better to catch more specific exceptions that you expect to handle (e.g., exceptions from the BigQuery client). Additionally, returning a 200 OK status with an error message in the body for a failed API call is not standard practice. Consider raising an HTTPException with a 5xx status code to provide a more accurate API response.
```diff
-    except Exception as e:
-        return {"sessions": [], "error": str(e)}
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to list sessions: {e}")
```
```python
async def run_task_with_checkpoints(session_id: str, duration: int, resume: bool = False):
    """Run a long-running task with periodic checkpoints."""
    import random
```
```python
with open("/tmp/lifecycle.json", "w") as f:
    f.write(lifecycle_config)
```
Using a hardcoded path like /tmp/lifecycle.json can be problematic in environments where /tmp is not writable or has specific restrictions (e.g., some serverless environments). It's more robust to use Python's tempfile module to create temporary files in a secure and platform-independent manner.
```python
import tempfile
# ...
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp_file:
    tmp_file.write(lifecycle_config)
    lifecycle_path = tmp_file.name
run_command(
    [
        "gsutil",
        "lifecycle",
        "set",
        lifecycle_path,
        f"gs://{GCS_BUCKET}",
    ],
    check=False,
)
os.remove(lifecycle_path)
```

```python
active_lease_id=row.active_lease_id,
lease_expiry=row.lease_expiry,
ttl_expiry=row.ttl_expiry,
metadata=row.metadata if isinstance(row.metadata, dict) else (json.loads(row.metadata) if row.metadata else None),
```
… agents

This PR implements a durable session persistence layer for ADK, enabling cross-process checkpoint-based recovery for long-running agent tasks.

## Key Features

- **DurableSessionConfig**: Configuration for durable cross-process checkpointing
- **BigQueryCheckpointStore**: Two-phase commit checkpoint storage (BQ metadata + GCS blobs)
- **CheckpointableAgentState**: Abstract interface for agents supporting durability
- **WorkspaceSnapshotter**: GCS-based workspace directory snapshotting

## Implementation Details

- Two-phase commit: GCS blob upload → BigQuery metadata insert
- SHA-256 checkpoint integrity verification
- Lease-based concurrency control for safe resume
- Async-first API design for non-blocking I/O

## Demo

A fully functional demo is deployed on Cloud Run showcasing:

- Real-time checkpoint visualization
- Task failure simulation and recovery
- BigQuery metadata queries
- Final task output display

Demo URL: https://durable-demo-201486563047.us-central1.run.app

## Files Added

- src/google/adk/durable/ - Core durable module
- contributing/samples/long_running_task/ - Demo agent and UI
- tests/unittests/durable/ - Unit tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
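As a rough illustration of the two-phase commit described above (blob write first, metadata record second, with a SHA-256 digest tying the two together), here is a local-filesystem sketch. The real PR uses GCS for phase 1 and BigQuery for phase 2; every name below is illustrative only.

```python
import hashlib
import json
import os


def save_checkpoint(blob_dir: str, meta_path: str, session_id: str, step: int, payload: bytes) -> str:
    """Phase 1: write the blob; Phase 2: record metadata only if phase 1 succeeded."""
    digest = hashlib.sha256(payload).hexdigest()
    blob_path = os.path.join(blob_dir, f"{session_id}-{step}-{digest}.bin")
    with open(blob_path, "wb") as f:   # phase 1: durable blob (stand-in for GCS)
        f.write(payload)
    record = {"session_id": session_id, "step": step, "sha256": digest, "blob": blob_path}
    with open(meta_path, "a") as f:    # phase 2: metadata row (stand-in for BigQuery)
        f.write(json.dumps(record) + "\n")
    return digest


def load_latest(meta_path: str, session_id: str) -> bytes:
    """Read back the newest checkpoint for a session and verify its integrity."""
    with open(meta_path) as f:
        records = [
            r for r in (json.loads(line) for line in f)
            if r["session_id"] == session_id
        ]
    latest = max(records, key=lambda r: r["step"])
    with open(latest["blob"], "rb") as f:
        payload = f.read()
    if hashlib.sha256(payload).hexdigest() != latest["sha256"]:
        raise ValueError("checkpoint blob failed integrity check")
    return payload
```

The ordering matters: a crash between the phases leaves an orphaned blob (garbage-collectable later) rather than a metadata row pointing at data that was never written.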
Force-pushed 99e7726 to 7d946ed
Summary
This PR implements a durable session persistence layer for ADK, enabling cross-process checkpoint-based recovery for long-running agent tasks. This addresses the "12-minute barrier" problem where agents lose state during long BigQuery jobs or other async operations.
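This recovery model can be pictured as a step loop that persists progress after each unit of work and, on restart, skips steps already checkpointed. A toy sketch; the function names are made up for illustration and are not the PR's API:

```python
def run_with_resume(total_steps: int, state: dict, do_step, save):
    """Execute steps, checkpointing after each; resume from the last saved step."""
    start = state.get("completed_steps", 0)  # 0 on a fresh run
    for step in range(start, total_steps):
        do_step(step)
        state["completed_steps"] = step + 1
        save(state)                          # persist progress before moving on
    return state
```

A worker that crashes mid-run restarts with the persisted state and re-enters the loop at the first unfinished step, which is exactly how the checkpoint store lets an agent outlive a single process.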
Key Features
Implementation Highlights
@experimental

Files Added
Core Module (src/google/adk/durable/)

- config.py - DurableSessionConfig
- checkpointable_state.py - CheckpointableAgentState ABC
- stores/base_checkpoint_store.py - DurableSessionStore ABC
- stores/bigquery_checkpoint_store.py - BigQuery + GCS implementation
- workspace_snapshotter.py - GCS workspace snapshots

Demo (contributing/samples/long_running_task/)

- agent.py - Demo agent with durable config
- demo_server.py - FastAPI server with checkpoint APIs
- demo_ui.html - Real-time visualization UI
- long_running_task_design.md - Detailed design document

Tests (tests/unittests/durable/)
A fully functional demo is deployed on Cloud Run:
URL: https://durable-demo-201486563047.us-central1.run.app
The demo showcases:
Infrastructure:

- BigQuery dataset: test-project-0728-467323.adk_metadata
- GCS bucket: gs://test-project-0728-467323-adk-checkpoints

Test plan
Design Document
See contributing/samples/long_running_task/long_running_task_design.md for the full design including:

🤖 Generated with Claude Code