Running dynamically generated agents with ParallelAgent #4346
Replies: 3 comments 2 replies
-
Response from ADK Answering Agent (experimental, answer may be inaccurate)
TLDR: Your current approach of creating "orphan" agents at runtime is discouraged. It can interfere with essential ADK features like state management, introspection, and observability. The recommended pattern is to manage the parallel execution of your tasks within a single custom agent's
Hi! That's a great question about handling dynamic, parallel workloads with ADK.
Your concern is valid. The documentation snippet you highlighted is key: ADK's framework relies on knowing the hierarchy of agents at initialization time. [1] Creating agents dynamically at runtime, as you are doing with
Why the "Orphan" Agent Approach Is Discouraged
By creating an unmanaged agent, you bypass several features that are critical for building robust and maintainable agent-based systems:
Recommended Pattern for Dynamic Parallel Tasks
Instead of dynamically creating ADK agents, the recommended approach is to contain the dynamic parallelism inside a single, well-defined custom agent. Your
Here's how you could structure it:
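A minimal sketch of that idea, using only asyncio and plain dicts as stand-ins for ADK events. `process_item`, `DynamicFanOutAgent`, and the event shape are hypothetical names chosen for illustration and are not part of the ADK API; a real custom agent would yield ADK events from its own run method instead.

```python
import asyncio
from typing import Any, AsyncIterator


async def process_item(item: int) -> int:
    """Hypothetical unit of work; stands in for an LLM or tool call."""
    await asyncio.sleep(0)  # yield control so the tasks actually interleave
    return item * item


class DynamicFanOutAgent:
    """Stand-in for a single custom agent. This is NOT an ADK class."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def run(self, items: list[int]) -> AsyncIterator[dict[str, Any]]:
        # Fan out: one coroutine per item; the count is only known at runtime.
        results = await asyncio.gather(*(process_item(i) for i in items))
        # Gather: emit one event-like dict that carries every result.
        yield {"author": self.name, "results": dict(zip(items, results))}


async def main() -> list[dict[str, Any]]:
    agent = DynamicFanOutAgent("fan_out")
    return [event async for event in agent.run([1, 2, 3])]


events = asyncio.run(main())
print(events)
```

Because `asyncio.gather` returns results in the order the awaitables were passed, pairing them back with `items` via `zip` is safe here. No extra agent objects are created per run, so the agent hierarchy stays fixed while the amount of parallel work varies.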
This pattern allows you to leverage Python's native async capabilities for your dynamic workload while ensuring that your agent remains fully compliant with the ADK framework, benefiting from proper lifecycle management, state persistence, and observability.
This response was generated by an AI assistant. Please leave a 👍 if it was helpful.
-
@adk-bot maybe this solution is better?
import random
import secrets
from typing import ClassVar, List

from google.adk.events import Event, EventActions
from google.adk.agents import BaseAgent, ParallelAgent, SequentialAgent
from google.genai import types


class Worker(BaseAgent):
    """Simple worker that calculates n²."""

    def __init__(self, *, name: str, run_id: str):
        super().__init__(name=name)
        self._run_id = run_id

    async def _run_async_impl(self, ctx):
        n = ctx.session.state.get(f"task:{self._run_id}:{self.name}", 0)
        result = n * n
        yield Event(
            author=self.name,
            content=types.Content(
                role=self.name, parts=[types.Part(text=f"{n}² = {result}")]
            ),
            actions=EventActions(
                state_delta={f"result:{self._run_id}:{self.name}": result}
            ),
        )


class PlannerAndRunner(BaseAgent):
    """Distributes tasks and dynamically creates a ParallelAgent."""

    POOL: ClassVar[List[str]] = ["w0", "w1", "w2"]

    async def _run_async_impl(self, ctx):
        run_id = secrets.token_hex(2)
        picked = random.sample(self.POOL, k=random.randint(1, len(self.POOL)))
        task_delta = {f"task:{run_id}:{name}": random.randint(1, 9) for name in picked}
        yield Event(
            author=self.name,
            content=types.Content(
                role=self.name,
                parts=[types.Part(text=f"Run {run_id} tasks {task_delta}")],
            ),
            actions=EventActions(state_delta={"current_run": run_id, **task_delta}),
        )
        parallel = ParallelAgent(
            name=f"block_{run_id}",
            sub_agents=[Worker(name=n, run_id=run_id) for n in picked],
        )
        async for ev in parallel.run_async(ctx):
            yield ev


class Aggregator(BaseAgent):
    """Aggregates results from workers."""

    async def _run_async_impl(self, ctx):
        run_id = ctx.session.state.get("current_run")
        vals = [
            v
            for k, v in ctx.session.state.items()
            if run_id and k.startswith(f"result:{run_id}:")
        ]
        yield Event(
            author=self.name,
            content=types.Content(
                role=self.name, parts=[types.Part(text=f"Sum = {sum(vals)}")]
            ),
            actions=EventActions(escalate=True),
        )


root_agent = SequentialAgent(
    name="root",
    sub_agents=[PlannerAndRunner(name="planner"), Aggregator(name="collector")],
)
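The part of this snippet that keeps runs apart is the key convention `task:{run_id}:{name}` / `result:{run_id}:{name}`. A small sketch of just that convention, with a plain dict standing in for session state; `task_key`, `result_key`, and `collect_results` are hypothetical helpers introduced here for illustration, not ADK functions.

```python
def task_key(run_id: str, worker: str) -> str:
    """Build the state key under which a worker's input is stored."""
    return f"task:{run_id}:{worker}"


def result_key(run_id: str, worker: str) -> str:
    """Build the state key under which a worker's output is stored."""
    return f"result:{run_id}:{worker}"


def collect_results(state: dict, run_id: str) -> list:
    """Return only the results that belong to the given run."""
    prefix = f"result:{run_id}:"
    return [value for key, value in state.items() if key.startswith(prefix)]


# A plain dict stands in for session state shared by two separate runs.
state: dict = {}
runs = {"a1b2": {"w0": 2, "w1": 3}, "c3d4": {"w0": 5}}
for run_id, tasks in runs.items():
    for worker, n in tasks.items():
        state[task_key(run_id, worker)] = n
        state[result_key(run_id, worker)] = n * n

first_total = sum(collect_results(state, "a1b2"))
second_total = sum(collect_results(state, "c3d4"))
print(first_total, second_total)
```

Results from earlier runs stay in the state, so the aggregation only stays correct as long as run ids do not repeat. `secrets.token_hex(2)` produces just four hex characters (65,536 possible values), so a longer token may be worth considering for long-lived sessions.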
-
Response from ADK Answering Agent (experimental, answer may be inaccurate)
TLDR: Your current "orphan agent" approach is discouraged because it breaks framework features like resumability and introspection. The recommended pattern is to define a single, reusable sub-agent for the parallel task and orchestrate the concurrent calls within your custom agent using
Hi! This is an excellent question that touches on a common and powerful pattern: dynamic fan-out/gather workflows.
Your current implementation is clever, but your intuition is correct: creating "orphan" agents at runtime is discouraged, and the documentation snippet you found explains exactly why. Let's break down your questions.
Is the runtime
-
Hi!
I'm working on a project where we need to perform the following steps:
My current implementation uses a custom agent that:
ParallelAgent whose sub-agents are defined dynamically based on the output of the first agent.
Here's a simple example:
Note that in this case, the ParallelAgent is an orphan agent, since it needs to be recreated in every run.
The reason we recreate the ParallelAgent in every run is that if we set it as a sub-agent of the GeneratorWorkflow, it would be shared across different sessions, potentially resulting in race conditions.
While this seems to work well, the documentation explicitly mentions that:
My questions are:
I'm happy to provide more details or examples if needed. Thanks!
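To make the race-condition concern concrete, here is a small illustration using stand-in classes only (no ADK code). `SharedParallel`, `run_shared`, and `run_isolated` are hypothetical names; the sketch assumes that a long-lived agent's sub-agent list would be overwritten per session, which is the situation described above.

```python
import asyncio


class SharedParallel:
    """Stand-in for an agent object whose sub-agent list is set per run."""

    def __init__(self) -> None:
        self.sub_agents: list[str] = []


async def run_shared(shared: SharedParallel, session: str, workers: list[str]):
    shared.sub_agents = workers  # session-specific write to a shared object
    await asyncio.sleep(0)       # another session can run at this point
    return session, list(shared.sub_agents)


async def run_isolated(session: str, workers: list[str]):
    local = SharedParallel()     # fresh object per run, nothing is shared
    local.sub_agents = workers
    await asyncio.sleep(0)
    return session, list(local.sub_agents)


async def main():
    shared = SharedParallel()
    shared_runs = await asyncio.gather(
        run_shared(shared, "s1", ["w0"]),
        run_shared(shared, "s2", ["w1", "w2"]),
    )
    isolated_runs = await asyncio.gather(
        run_isolated("s1", ["w0"]),
        run_isolated("s2", ["w1", "w2"]),
    )
    return shared_runs, isolated_runs


shared_runs, isolated_runs = asyncio.run(main())
print(shared_runs)
print(isolated_runs)
```

In the shared case, session `s1` ends up seeing the workers that `s2` wrote, because both sessions mutate the same object between suspension points. Creating the object per run avoids that, which is the motivation for the per-run ParallelAgent in the question.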