Skip to content

Commit 4ae542c

Browse files
[Example] Add MCP server example for AI agent integration
Closes #302 Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 7e74cd6 commit 4ae542c

1 file changed

Lines changed: 200 additions & 0 deletions

File tree

examples/mcp_server.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
"""
22+
MCP (Model Context Protocol) server for Apache Pulsar.
23+
24+
Exposes three tools that allow AI agents (Cursor, Claude Desktop, etc.)
25+
to produce, consume, and inspect Pulsar topics via natural language.
26+
27+
Prerequisites:
28+
pip install "mcp[cli]" pulsar-client
29+
30+
Usage:
31+
PULSAR_BROKER_URL=pulsar://localhost:6650 python3 examples/mcp_server.py
32+
33+
Configure in Cursor (~/.cursor/mcp.json) or Claude Desktop:
34+
{
35+
"mcpServers": {
36+
"pulsar": {
37+
"command": "python3",
38+
"args": ["examples/mcp_server.py"],
39+
"env": {
40+
"PULSAR_BROKER_URL": "pulsar://localhost:6650",
41+
"PULSAR_TOKEN": ""
42+
}
43+
}
44+
}
45+
}
46+
"""
47+
48+
import os
49+
from datetime import timedelta
50+
from typing import Optional
51+
52+
import pulsar
53+
from mcp.server.fastmcp import FastMCP
54+
55+
mcp = FastMCP("pulsar")
56+
57+
_broker_url = os.getenv("PULSAR_BROKER_URL", "pulsar://localhost:6650")
58+
_token = os.getenv("PULSAR_TOKEN", "")
59+
60+
61+
def _make_client() -> pulsar.Client:
62+
if _token:
63+
return pulsar.Client(
64+
_broker_url,
65+
authentication=pulsar.AuthenticationToken(_token),
66+
)
67+
return pulsar.Client(_broker_url)
68+
69+
70+
@mcp.tool()
71+
def pulsar_publish(
72+
topic: str,
73+
payload: str,
74+
properties: Optional[dict] = None,
75+
partition_key: Optional[str] = None,
76+
delay_seconds: Optional[int] = None,
77+
) -> dict:
78+
"""
79+
Publish a message to a Pulsar topic.
80+
81+
Args:
82+
topic: Full topic name, e.g. persistent://public/default/my-topic
83+
payload: Message body as a string
84+
properties: Optional key-value string properties
85+
partition_key: Optional routing key for partitioned topics
86+
delay_seconds: Deliver the message after this many seconds
87+
"""
88+
client = _make_client()
89+
try:
90+
producer = client.create_producer(topic)
91+
kwargs = {"properties": properties or {}}
92+
if partition_key:
93+
kwargs["partition_key"] = partition_key
94+
if delay_seconds and delay_seconds > 0:
95+
kwargs["deliver_after"] = timedelta(seconds=delay_seconds)
96+
msg_id = producer.send(payload.encode("utf-8"), **kwargs)
97+
producer.close()
98+
return {"message_id": str(msg_id)}
99+
finally:
100+
client.close()
101+
102+
103+
@mcp.tool()
104+
def pulsar_consume(
105+
topic: str,
106+
subscription: str,
107+
subscription_type: str = "Shared",
108+
max_messages: int = 10,
109+
timeout_ms: int = 3000,
110+
) -> dict:
111+
"""
112+
Consume messages from a Pulsar topic, auto-acknowledging each one.
113+
114+
Args:
115+
topic: Full topic name, e.g. persistent://public/default/my-topic
116+
subscription: Subscription name
117+
subscription_type: Exclusive | Shared | Failover | Key_Shared
118+
max_messages: Maximum number of messages to return (default 10)
119+
timeout_ms: Milliseconds to wait for each message (default 3000)
120+
"""
121+
sub_type_map = {
122+
"Exclusive": pulsar.ConsumerType.Exclusive,
123+
"Shared": pulsar.ConsumerType.Shared,
124+
"Failover": pulsar.ConsumerType.Failover,
125+
"Key_Shared": pulsar.ConsumerType.KeyShared,
126+
}
127+
consumer_type = sub_type_map.get(subscription_type, pulsar.ConsumerType.Shared)
128+
129+
client = _make_client()
130+
try:
131+
consumer = client.subscribe(topic, subscription, consumer_type=consumer_type)
132+
messages = []
133+
for _ in range(max_messages):
134+
try:
135+
msg = consumer.receive(timeout_millis=timeout_ms)
136+
messages.append({
137+
"message_id": str(msg.message_id()),
138+
"topic": msg.topic_name(),
139+
"publish_timestamp": msg.publish_timestamp(),
140+
"properties": msg.properties(),
141+
"payload": msg.data().decode("utf-8", errors="replace"),
142+
})
143+
consumer.acknowledge(msg)
144+
except Exception:
145+
break
146+
consumer.close()
147+
return {"messages": messages, "count": len(messages)}
148+
finally:
149+
client.close()
150+
151+
152+
@mcp.tool()
153+
def pulsar_peek(
154+
topic: str,
155+
start_from: str = "earliest",
156+
max_messages: int = 10,
157+
timeout_ms: int = 2000,
158+
) -> dict:
159+
"""
160+
Read messages from a topic using a Reader (no subscription, no ack).
161+
Useful for inspecting topic contents without affecting consumer state.
162+
163+
Args:
164+
topic: Full topic name, e.g. persistent://public/default/my-topic
165+
start_from: "earliest" or "latest" (default "earliest")
166+
max_messages: Maximum messages to return (default 10)
167+
timeout_ms: Milliseconds to wait for each message (default 2000)
168+
"""
169+
start_id = (
170+
pulsar.MessageId.earliest
171+
if start_from == "earliest"
172+
else pulsar.MessageId.latest
173+
)
174+
175+
client = _make_client()
176+
try:
177+
reader = client.create_reader(topic, start_id)
178+
messages = []
179+
for _ in range(max_messages):
180+
if not reader.has_message_available():
181+
break
182+
try:
183+
msg = reader.read_next(timeout_millis=timeout_ms)
184+
messages.append({
185+
"message_id": str(msg.message_id()),
186+
"topic": msg.topic_name(),
187+
"publish_timestamp": msg.publish_timestamp(),
188+
"properties": msg.properties(),
189+
"payload": msg.data().decode("utf-8", errors="replace"),
190+
})
191+
except Exception:
192+
break
193+
reader.close()
194+
return {"messages": messages, "count": len(messages)}
195+
finally:
196+
client.close()
197+
198+
199+
if __name__ == "__main__":
200+
mcp.run()

0 commit comments

Comments
 (0)