Skip to content

Commit bd4523f

Browse files
[Example] Add MCP server example for AI agent integration
Closes #302 Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 7e74cd6 commit bd4523f

1 file changed

Lines changed: 233 additions & 0 deletions

File tree

examples/mcp_server.py

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
"""
22+
MCP (Model Context Protocol) server for Apache Pulsar.
23+
24+
Exposes three tools that allow AI agents (Cursor, Claude Desktop, etc.)
25+
to produce, consume, and inspect Pulsar topics via natural language.
26+
27+
Prerequisites:
28+
pip install "mcp[cli]" pulsar-client
29+
30+
Usage:
31+
PULSAR_BROKER_URL=pulsar://localhost:6650 python3 examples/mcp_server.py
32+
33+
Configure in Cursor (~/.cursor/mcp.json) or Claude Desktop:
34+
{
35+
"mcpServers": {
36+
"pulsar": {
37+
"command": "python3",
38+
"args": ["examples/mcp_server.py"],
39+
"env": {
40+
"PULSAR_BROKER_URL": "pulsar://localhost:6650",
41+
"PULSAR_TOKEN": ""
42+
}
43+
}
44+
}
45+
}
46+
"""
47+
48+
import logging
49+
import os
50+
import sys
51+
from datetime import timedelta
52+
from typing import Optional
53+
54+
import pulsar
55+
from mcp.server.fastmcp import FastMCP
56+
57+
mcp = FastMCP("pulsar")
58+
59+
_broker_url = os.getenv("PULSAR_BROKER_URL", "pulsar://localhost:6650")
60+
_token = os.getenv("PULSAR_TOKEN", "")
61+
62+
# The C++ client logs to stdout by default, which would corrupt the
63+
# MCP stdio JSON-RPC stream. Route logs to stderr instead.
64+
_pulsar_logger = logging.getLogger("pulsar")
65+
_pulsar_logger.setLevel(logging.INFO)
66+
if not _pulsar_logger.handlers:
67+
_handler = logging.StreamHandler(sys.stderr)
68+
_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
69+
_pulsar_logger.addHandler(_handler)
70+
71+
72+
def _make_client() -> pulsar.Client:
73+
if _token:
74+
return pulsar.Client(
75+
_broker_url,
76+
authentication=pulsar.AuthenticationToken(_token),
77+
logger=_pulsar_logger,
78+
)
79+
return pulsar.Client(_broker_url, logger=_pulsar_logger)
80+
81+
82+
@mcp.tool()
83+
def pulsar_publish(
84+
topic: str,
85+
payload: str,
86+
properties: Optional[dict] = None,
87+
partition_key: Optional[str] = None,
88+
delay_seconds: Optional[int] = None,
89+
) -> dict:
90+
"""
91+
Publish a message to a Pulsar topic.
92+
93+
Args:
94+
topic: Full topic name, e.g. persistent://public/default/my-topic
95+
payload: Message body as a string
96+
properties: Optional key-value string properties
97+
partition_key: Optional routing key for partitioned topics
98+
delay_seconds: Deliver the message after this many seconds
99+
"""
100+
client = _make_client()
101+
try:
102+
producer = client.create_producer(topic)
103+
msg_id = producer.send(
104+
payload.encode("utf-8"),
105+
properties=properties or {},
106+
partition_key=partition_key,
107+
deliver_after=(
108+
timedelta(seconds=delay_seconds)
109+
if delay_seconds and delay_seconds > 0
110+
else None
111+
),
112+
)
113+
producer.close()
114+
return {"message_id": str(msg_id)}
115+
finally:
116+
client.close()
117+
118+
119+
@mcp.tool()
120+
def pulsar_consume(
121+
topic: str,
122+
subscription: str,
123+
subscription_type: str = "Shared",
124+
initial_position: str = "earliest",
125+
max_messages: int = 10,
126+
timeout_ms: int = 3000,
127+
) -> dict:
128+
"""
129+
Consume messages from a Pulsar topic, auto-acknowledging each one.
130+
131+
Args:
132+
topic: Full topic name, e.g. persistent://public/default/my-topic
133+
subscription: Subscription name
134+
subscription_type: Exclusive | Shared | Failover | Key_Shared
135+
initial_position: Where a NEW subscription starts: "earliest" or "latest"
136+
(ignored if the subscription already exists)
137+
max_messages: Maximum number of messages to return (default 10)
138+
timeout_ms: Milliseconds to wait for each message (default 3000)
139+
"""
140+
sub_type_map = {
141+
"Exclusive": pulsar.ConsumerType.Exclusive,
142+
"Shared": pulsar.ConsumerType.Shared,
143+
"Failover": pulsar.ConsumerType.Failover,
144+
"Key_Shared": pulsar.ConsumerType.KeyShared,
145+
}
146+
consumer_type = sub_type_map.get(subscription_type, pulsar.ConsumerType.Shared)
147+
init_pos = (
148+
pulsar.InitialPosition.Earliest
149+
if initial_position == "earliest"
150+
else pulsar.InitialPosition.Latest
151+
)
152+
153+
client = _make_client()
154+
try:
155+
consumer = client.subscribe(
156+
topic,
157+
subscription,
158+
consumer_type=consumer_type,
159+
initial_position=init_pos,
160+
)
161+
messages = []
162+
for _ in range(max_messages):
163+
try:
164+
msg = consumer.receive(timeout_millis=timeout_ms)
165+
messages.append(
166+
{
167+
"message_id": str(msg.message_id()),
168+
"topic": msg.topic_name(),
169+
"publish_timestamp": msg.publish_timestamp(),
170+
"properties": msg.properties(),
171+
"payload": msg.data().decode("utf-8", errors="replace"),
172+
}
173+
)
174+
consumer.acknowledge(msg)
175+
except Exception:
176+
break
177+
consumer.close()
178+
return {"messages": messages, "count": len(messages)}
179+
finally:
180+
client.close()
181+
182+
183+
@mcp.tool()
184+
def pulsar_peek(
185+
topic: str,
186+
start_from: str = "earliest",
187+
max_messages: int = 10,
188+
timeout_ms: int = 2000,
189+
) -> dict:
190+
"""
191+
Read messages from a topic using a Reader (no subscription, no ack).
192+
Useful for inspecting topic contents without affecting consumer state.
193+
194+
Args:
195+
topic: Full topic name, e.g. persistent://public/default/my-topic
196+
start_from: "earliest" or "latest" (default "earliest")
197+
max_messages: Maximum messages to return (default 10)
198+
timeout_ms: Milliseconds to wait for each message (default 2000)
199+
"""
200+
start_id = (
201+
pulsar.MessageId.earliest
202+
if start_from == "earliest"
203+
else pulsar.MessageId.latest
204+
)
205+
206+
client = _make_client()
207+
try:
208+
reader = client.create_reader(topic, start_id)
209+
messages = []
210+
for _ in range(max_messages):
211+
if not reader.has_message_available():
212+
break
213+
try:
214+
msg = reader.read_next(timeout_millis=timeout_ms)
215+
messages.append(
216+
{
217+
"message_id": str(msg.message_id()),
218+
"topic": msg.topic_name(),
219+
"publish_timestamp": msg.publish_timestamp(),
220+
"properties": msg.properties(),
221+
"payload": msg.data().decode("utf-8", errors="replace"),
222+
}
223+
)
224+
except Exception:
225+
break
226+
reader.close()
227+
return {"messages": messages, "count": len(messages)}
228+
finally:
229+
client.close()
230+
231+
232+
if __name__ == "__main__":
233+
mcp.run()

0 commit comments

Comments
 (0)