feat: decouple ACP protocol with AgentProtocolGateway abstraction#190
feat: decouple ACP protocol with AgentProtocolGateway abstraction#190prassanna-ravishankar wants to merge 2 commits intomainfrom
Conversation
Introduce protocol abstraction layer so future protocols (A2A, etc.) can be added without touching business logic. AgentACPService now implements AgentProtocolGateway ABC, and all consumers depend on the interface rather than the concrete class.
There was a problem hiding this comment.
seems like a legacy change that I needed for CI to pass
|
The "After: Protocol Abstraction" section in the diagram has "ACPGateway" after ABC which should ideally be "AgentACPService". Right? |
|
So how will we resolve the right gateway at runtime - per-request or at DI time? |
|
The abstract gateway abstracts the outbound communications, but the entire inbound stack — registration API, RPC schema, streaming format, ACP type validation, and URL resolution — remains ACP-specific. When an A2A agent registers, it would still need to provide acp_url and acp_type. Should we scope the abstraction to also cover the inbound/registration path, or is the plan to build a separate A2A entry point (e.g., /agents/{id}/a2a) that bypasses the ACP-specific layers? |
|
Nice questions @paulretinraj
|
The Problem
Agentex currently speaks one language to downstream agents: ACP, a JSON-RPC 2.0 protocol that handles task creation, message passing, cancellation, and event delivery over HTTP. Every layer of the backend — from the task service that forwards work to agents, through the use case that dispatches RPC methods, down to the Temporal health check workflows — assumes ACP as the only way to communicate. The protocol's concepts are woven directly into domain services, with
AgentACPServiceinjected as a concrete dependency,acp_urlparameters threaded through method signatures, and a staticACP_TYPE_TO_ALLOWED_RPC_METHODSdictionary hard-wiring which operations each agent type supports.This tight coupling means that supporting a second protocol — whether that's Google's A2A, a future internal protocol, or a lightweight webhook-based approach — would require invasive changes across the entire service layer. Business logic that has nothing to do with wire format (task lifecycle management, message persistence, delta accumulation for streaming) would need to be touched, tested, and re-validated, even though the logic itself wouldn't change.
The Approach
This PR introduces a protocol abstraction layer by extracting an
AgentProtocolGatewayabstract base class that captures the essential operations any agent communication protocol must support: creating tasks, sending messages (both synchronous and streaming), cancelling tasks, sending events, and checking health. The interface uses protocol-neutral parameter names (service_urlrather thanacp_url) and accepts only domain entities — no JSON-RPC concepts leak through the boundary.AgentACPServicenow implements this ABC, and bothAgentTaskServiceandAgentsACPUseCasedepend on the interface rather than the concrete class. FastAPI's dependency injection resolvesDAgentProtocolGatewaytoAgentACPServicetoday, but swapping that target is a one-line change when a second protocol arrives. The health check Temporal workflow, previously making rawhttpxcalls to/healthz, now delegates toprotocol_gateway.check_health(), which accepts a lightweightagent_idstring rather than requiring a full entity fetch from the database.The PR also adds an
AgentProtocolenum (ACP = "acp") toAgentEntitywith a corresponding database migration, establishing the per-agent protocol field that will drive gateway resolution in the future.Key design decisions
The ABC deliberately excludes ACP-specific concepts.
get_allowed_methods(), which mapsACPTypeto permitted RPC methods, lives onAgentACPServiceas a class-level constant rather than on the protocol-neutral interface — a non-ACP protocol would have no use for ACP type enums. Similarly,check_health()acceptsagent_id: strinstead ofAgentEntity, avoiding an unnecessary database round-trip on every health check tick. Thesend_event()method carries a defaultNotImplementedErrorsince not all protocols support the event concept, while the four core operations (create, send, stream, cancel) remain abstract.What Changed
src/domain/services/agent_protocol_gateway.py— the ABC defining the protocol-neutral interfaceAgentACPService: implements the ABC, addscheck_health()andACP_ALLOWED_METHODSclass constant, renamesacp_url→service_urlin all public method signaturesAgentTaskService: depends onDAgentProtocolGatewayinstead ofDAgentACPService, method renames (forward_task_to_acp→forward_task,create_event_and_forward_to_acp→create_event_and_forward)AgentsACPUseCase: depends on the ABC, renames_resolve_acp_url→_resolve_service_url,acp_url_override→service_url_overrideAgentEntity: newAgentProtocolenum andprotocolfield (default"acp")protocolcolumn onagentstable withserver_default="acp"HealthCheckActivitiesusesprotocol_gateway.check_health(agent_id, service_url)instead of raw httpx, eliminating a per-tick DB queryFuture: A2A Protocol Support
The primary motivation for this decoupling is enabling support for Google's Agent-to-Agent (A2A) protocol. A2A differs from ACP in fundamental ways: tasks are created implicitly via
SendMessagerather than through an explicit creation step, agents advertise capabilities through discoverable Agent Cards at well-known endpoints rather than push-based registration, streaming uses SSE rather than NDJSON, and the task state machine includes states likeINPUT_REQUIREDandAUTH_REQUIREDthat formalize human-in-the-loop patterns.Adding A2A support now becomes a contained effort rather than a cross-cutting refactor. The work involves creating an
A2AGateway(AgentProtocolGateway)that maps between A2A'sParttypes and ourTaskMessageContentEntitymodel, handles SSE streaming behind the sameAsyncIterator[TaskMessageUpdateEntity]interface, and implements Agent Card-based discovery incheck_health(). TheAgentProtocolenum gets a newA2A = "a2a"variant, and aProtocolGatewayResolverreplaces the current direct DI alias to route requests based onagent.protocol. The business logic layer — task lifecycle, message persistence, delta accumulation, authorization — remains entirely untouched.Test plan
ACPType.AGENTICandACPType.ASYNCbehavior preserved)ruff checkandruff formatclean🤖 Generated with Claude Code
Greptile Summary
This PR introduces an
AgentProtocolGatewayABC to decouple ACP-specific wire-format logic from the domain service and use-case layers, and adds anAgentProtocolenum plus aprotocolcolumn onagentsto enable per-agent protocol routing in the future. The Temporal health-check workflow is also migrated from rawhttpxcalls toprotocol_gateway.check_health(), eliminating a per-tick DB fetch.Confidence Score: 5/5
Safe to merge; all remaining findings are P2 style/naming suggestions with no correctness impact
The abstraction is well-structured and backwards-compatible (11/11 ACP type tests pass). The migration is correct, DI wiring is sound, and no logic bugs were found. The three P2 findings — duplicate allowlist, leftover acp_url naming in the Temporal layer, and non-abstract check_health — are cleanup items that do not affect current runtime behavior and can be addressed in a follow-up.
agent_protocol_gateway.py (check_health abstractness), agents_rpc.py (deprecated dict still actively used), healthcheck_activities.py (acp_url parameter naming)
Important Files Changed
Class Diagram
%%{init: {'theme': 'neutral'}}%% classDiagram class AgentProtocolGateway { <<abstract>> +create_task(agent, task, service_url, params)* +send_message(agent, task, content, service_url)* +send_message_stream(agent, task, content, service_url)* +cancel_task(agent, task, service_url)* +send_event(agent, event, task, service_url) +check_health(agent_id, service_url) } class AgentACPService { +ACP_ALLOWED_METHODS dict +create_task() +send_message() +send_message_stream() +cancel_task() +send_event() +check_health() +get_allowed_methods(acp_type) } class AgentTaskService { -protocol_gateway DAgentProtocolGateway +forward_task() +send_message() +send_message_stream() +cancel_task() +create_event_and_forward() } class AgentsACPUseCase { -protocol_gateway DAgentProtocolGateway +handle_rpc_request() -_validate_rpc_method_for_acp_type()$ -_resolve_service_url() } class HealthCheckActivities { -protocol_gateway AgentProtocolGateway +check_status_activity(agent_id, acp_url) +update_agent_status_activity(agent_id, status) } AgentProtocolGateway <|-- AgentACPService : implements AgentTaskService --> AgentProtocolGateway : DAgentProtocolGateway AgentsACPUseCase --> AgentProtocolGateway : DAgentProtocolGateway HealthCheckActivities --> AgentProtocolGateway : injectedPrompt To Fix All With AI
Reviews (2): Last reviewed commit: "fix: rename acp_url→service_url in use c..." | Re-trigger Greptile