Skip to content

feat: decouple ACP protocol with AgentProtocolGateway abstraction#190

Open
prassanna-ravishankar wants to merge 2 commits intomainfrom
feat/decouple-acp
Open

feat: decouple ACP protocol with AgentProtocolGateway abstraction#190
prassanna-ravishankar wants to merge 2 commits intomainfrom
feat/decouple-acp

Conversation

@prassanna-ravishankar
Copy link
Copy Markdown
Member

@prassanna-ravishankar prassanna-ravishankar commented Apr 7, 2026

The Problem

Agentex currently speaks one language to downstream agents: ACP, a JSON-RPC 2.0 protocol that handles task creation, message passing, cancellation, and event delivery over HTTP. Every layer of the backend — from the task service that forwards work to agents, through the use case that dispatches RPC methods, down to the Temporal health check workflows — assumes ACP as the only way to communicate. The protocol's concepts are woven directly into domain services, with AgentACPService injected as a concrete dependency, acp_url parameters threaded through method signatures, and a static ACP_TYPE_TO_ALLOWED_RPC_METHODS dictionary hard-wiring which operations each agent type supports.

This tight coupling means that supporting a second protocol — whether that's Google's A2A, a future internal protocol, or a lightweight webhook-based approach — would require invasive changes across the entire service layer. Business logic that has nothing to do with wire format (task lifecycle management, message persistence, delta accumulation for streaming) would need to be touched, tested, and re-validated, even though the logic itself wouldn't change.

image

The Approach

This PR introduces a protocol abstraction layer by extracting an AgentProtocolGateway abstract base class that captures the essential operations any agent communication protocol must support: creating tasks, sending messages (both synchronous and streaming), cancelling tasks, sending events, and checking health. The interface uses protocol-neutral parameter names (service_url rather than acp_url) and accepts only domain entities — no JSON-RPC concepts leak through the boundary.

AgentACPService now implements this ABC, and both AgentTaskService and AgentsACPUseCase depend on the interface rather than the concrete class. FastAPI's dependency injection resolves DAgentProtocolGateway to AgentACPService today, but swapping that target is a one-line change when a second protocol arrives. The health check Temporal workflow, previously making raw httpx calls to /healthz, now delegates to protocol_gateway.check_health(), which accepts a lightweight agent_id string rather than requiring a full entity fetch from the database.

The PR also adds an AgentProtocol enum (ACP = "acp") to AgentEntity with a corresponding database migration, establishing the per-agent protocol field that will drive gateway resolution in the future.

Key design decisions

The ABC deliberately excludes ACP-specific concepts. get_allowed_methods(), which maps ACPType to permitted RPC methods, lives on AgentACPService as a class-level constant rather than on the protocol-neutral interface — a non-ACP protocol would have no use for ACP type enums. Similarly, check_health() accepts agent_id: str instead of AgentEntity, avoiding an unnecessary database round-trip on every health check tick. The send_event() method carries a default NotImplementedError since not all protocols support the event concept, while the four core operations (create, send, stream, cancel) remain abstract.

What Changed

  • New file: src/domain/services/agent_protocol_gateway.py — the ABC defining the protocol-neutral interface
  • AgentACPService: implements the ABC, adds check_health() and ACP_ALLOWED_METHODS class constant, renames acp_urlservice_url in all public method signatures
  • AgentTaskService: depends on DAgentProtocolGateway instead of DAgentACPService, method renames (forward_task_to_acpforward_task, create_event_and_forward_to_acpcreate_event_and_forward)
  • AgentsACPUseCase: depends on the ABC, renames _resolve_acp_url_resolve_service_url, acp_url_overrideservice_url_override
  • AgentEntity: new AgentProtocol enum and protocol field (default "acp")
  • ORM + migration: new protocol column on agents table with server_default="acp"
  • Health check: HealthCheckActivities uses protocol_gateway.check_health(agent_id, service_url) instead of raw httpx, eliminating a per-tick DB query
  • Tests: updated constructor args, method names, and mock fixtures across 8 test files

Future: A2A Protocol Support

The primary motivation for this decoupling is enabling support for Google's Agent-to-Agent (A2A) protocol. A2A differs from ACP in fundamental ways: tasks are created implicitly via SendMessage rather than through an explicit creation step, agents advertise capabilities through discoverable Agent Cards at well-known endpoints rather than push-based registration, streaming uses SSE rather than NDJSON, and the task state machine includes states like INPUT_REQUIRED and AUTH_REQUIRED that formalize human-in-the-loop patterns.

Adding A2A support now becomes a contained effort rather than a cross-cutting refactor. The work involves creating an A2AGateway(AgentProtocolGateway) that maps between A2A's Part types and our TaskMessageContentEntity model, handles SSE streaming behind the same AsyncIterator[TaskMessageUpdateEntity] interface, and implements Agent Card-based discovery in check_health(). The AgentProtocol enum gets a new A2A = "a2a" variant, and a ProtocolGatewayResolver replaces the current direct DI alias to route requests based on agent.protocol. The business logic layer — task lifecycle, message persistence, delta accumulation, authorization — remains entirely untouched.

Test plan

  • Backwards compatibility tests pass (11/11 — ACPType.AGENTIC and ACPType.ASYNC behavior preserved)
  • ruff check and ruff format clean
  • No secrets detected (TruffleHog)
  • Unit tests with Docker (testcontainers needed for Postgres/MongoDB/Redis)
  • Integration tests with Docker
  • Manual verification: agent registration, RPC dispatch, streaming responses, health check workflow

🤖 Generated with Claude Code

Greptile Summary

This PR introduces an AgentProtocolGateway ABC to decouple ACP-specific wire-format logic from the domain service and use-case layers, and adds an AgentProtocol enum plus a protocol column on agents to enable per-agent protocol routing in the future. The Temporal health-check workflow is also migrated from raw httpx calls to protocol_gateway.check_health(), eliminating a per-tick DB fetch.

Confidence Score: 5/5

Safe to merge; all remaining findings are P2 style/naming suggestions with no correctness impact

The abstraction is well-structured and backwards-compatible (11/11 ACP type tests pass). The migration is correct, DI wiring is sound, and no logic bugs were found. The three P2 findings — duplicate allowlist, leftover acp_url naming in the Temporal layer, and non-abstract check_health — are cleanup items that do not affect current runtime behavior and can be addressed in a follow-up.

agent_protocol_gateway.py (check_health abstractness), agents_rpc.py (deprecated dict still actively used), healthcheck_activities.py (acp_url parameter naming)

Important Files Changed

Filename Overview
agentex/src/domain/services/agent_protocol_gateway.py New ABC defining the protocol-neutral gateway interface; check_health and send_event use NotImplementedError rather than @AbstractMethod, weakening the compile-time contract
agentex/src/domain/services/agent_acp_service.py Correctly implements AgentProtocolGateway; adds check_health via /healthz and ACP_ALLOWED_METHODS class constant; all acp_url→service_url renames applied
agentex/src/domain/services/task_service.py Switches dependency from DAgentACPService to DAgentProtocolGateway; method renames (forward_task, create_event_and_forward) applied cleanly
agentex/src/domain/use_cases/agents_acp_use_case.py Depends on DAgentProtocolGateway; _validate_rpc_method_for_acp_type @staticmethod still uses deprecated ACP_TYPE_TO_ALLOWED_RPC_METHODS alongside new ACP_ALLOWED_METHODS on AgentACPService
agentex/src/temporal/activities/healthcheck_activities.py Delegates to protocol_gateway.check_health() correctly; activity parameter still named acp_url rather than service_url, inconsistent with rest of PR
agentex/src/domain/entities/agents_rpc.py ACP_TYPE_TO_ALLOWED_RPC_METHODS marked deprecated but still actively used by AgentsACPUseCase; two allowlist definitions now coexist
agentex/database/migrations/alembic/versions/2026_04_07_1538_add_agent_protocol_field_a064de6df78e.py Adds protocol column with server_default='acp'; upgrade and downgrade both correct
agentex/src/domain/entities/agents.py Adds AgentProtocol enum (ACP='acp') and protocol field with default AgentProtocol.ACP to AgentEntity; clean addition

Class Diagram

%%{init: {'theme': 'neutral'}}%%
classDiagram
    class AgentProtocolGateway {
        <<abstract>>
        +create_task(agent, task, service_url, params)*
        +send_message(agent, task, content, service_url)*
        +send_message_stream(agent, task, content, service_url)*
        +cancel_task(agent, task, service_url)*
        +send_event(agent, event, task, service_url)
        +check_health(agent_id, service_url)
    }
    class AgentACPService {
        +ACP_ALLOWED_METHODS dict
        +create_task()
        +send_message()
        +send_message_stream()
        +cancel_task()
        +send_event()
        +check_health()
        +get_allowed_methods(acp_type)
    }
    class AgentTaskService {
        -protocol_gateway DAgentProtocolGateway
        +forward_task()
        +send_message()
        +send_message_stream()
        +cancel_task()
        +create_event_and_forward()
    }
    class AgentsACPUseCase {
        -protocol_gateway DAgentProtocolGateway
        +handle_rpc_request()
        -_validate_rpc_method_for_acp_type()$
        -_resolve_service_url()
    }
    class HealthCheckActivities {
        -protocol_gateway AgentProtocolGateway
        +check_status_activity(agent_id, acp_url)
        +update_agent_status_activity(agent_id, status)
    }
    AgentProtocolGateway <|-- AgentACPService : implements
    AgentTaskService --> AgentProtocolGateway : DAgentProtocolGateway
    AgentsACPUseCase --> AgentProtocolGateway : DAgentProtocolGateway
    HealthCheckActivities --> AgentProtocolGateway : injected
Loading
Prompt To Fix All With AI
This is a comment left during a code review.
Path: agentex/src/domain/entities/agents_rpc.py
Line: 88-102

Comment:
**Duplicate allowlist requires manual sync**

`ACP_TYPE_TO_ALLOWED_RPC_METHODS` is marked deprecated here, but `AgentsACPUseCase._validate_rpc_method_for_acp_type` still imports and uses it. `AgentACPService.ACP_ALLOWED_METHODS` is now the canonical definition and both contain identical data — any future addition of a new RPC method or ACP type must be made in both places with no guard against divergence. Because `_validate_rpc_method_for_acp_type` is a `@staticmethod` it cannot call the service instance; consider either converting it to an instance method that delegates to `AgentACPService.ACP_ALLOWED_METHODS`, or removing this dict and inlining `AgentACPService.ACP_ALLOWED_METHODS` directly in the use case.

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: agentex/src/temporal/activities/healthcheck_activities.py
Line: 46-60

Comment:
**Protocol-neutral rename incomplete at the Temporal layer**

The activity parameter is still named `acp_url` (line 46), and `healthcheck_workflow.py` passes it as `workflow_args["acp_url"]`, while every other layer in this PR now uses `service_url`. Consider renaming the parameter to `service_url` here and updating the workflow args dict key to match, so the Temporal boundary is consistently protocol-agnostic.

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: agentex/src/domain/services/agent_protocol_gateway.py
Line: 69-75

Comment:
**`check_health` should be `@abstractmethod`**

`check_health` raises `NotImplementedError` instead of being declared `@abstractmethod`. Because the Temporal health-check workflow invokes it unconditionally on every tick, a future `AgentProtocolGateway` subclass that omits this method will pass class instantiation silently and only fail at runtime during a live health-check cycle. Marking it `@abstractmethod` surfaces the omission at instantiation, consistent with the other four core operations.

```suggestion
    @abstractmethod
    async def check_health(
        self,
        agent_id: str,
        service_url: str,
    ) -> bool:
        """Check if the agent server is healthy."""
        ...
```

How can I resolve this? If you propose a fix, please make it concise.

Reviews (2): Last reviewed commit: "fix: rename acp_url→service_url in use c..." | Re-trigger Greptile

Introduce protocol abstraction layer so future protocols (A2A, etc.)
can be added without touching business logic. AgentACPService now
implements AgentProtocolGateway ABC, and all consumers depend on the
interface rather than the concrete class.
@prassanna-ravishankar prassanna-ravishankar requested a review from a team as a code owner April 7, 2026 17:00
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like a legacy change that I needed for CI to pass

@paulretinraj paulretinraj self-assigned this Apr 7, 2026
@paulretinraj
Copy link
Copy Markdown

The "After: Protocol Abstraction" section in the diagram has "ACPGateway" after ABC which should ideally be "AgentACPService". Right?

@paulretinraj
Copy link
Copy Markdown

So how will we resolve the right gateway at runtime - per-request or at DI time?
Today i see that DAgentProtocolGateway is a static DI alias to AgentACPService. When A2A arrives, we need per-agent resolution since agent.protocol isn't known until we fetch the agent from the DB. What's the plan — a ProtocolGatewayResolver injected into the use case, or a factory function in the DI layer?

@paulretinraj
Copy link
Copy Markdown

The abstract gateway abstracts the outbound communications, but the entire inbound stack — registration API, RPC schema, streaming format, ACP type validation, and URL resolution — remains ACP-specific. When an A2A agent registers, it would still need to provide acp_url and acp_type. Should we scope the abstraction to also cover the inbound/registration path, or is the plan to build a separate A2A entry point (e.g., /agents/{id}/a2a) that bypasses the ACP-specific layers?

@prassanna-ravishankar
Copy link
Copy Markdown
Member Author

prassanna-ravishankar commented Apr 8, 2026

Nice questions @paulretinraj

  1. Updated the image
  2. combine with below
  3. In this PR, the ingress interface is ACP and the egress interface is made flexible. The next PR would be to make ingress interface flexible, and then we have the question on <figure out gateway> on request. Few approaches there, but we can abstract that away as routing policies . Will be after this PR though

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants