
[XPU] fix zmq err catch#7844

Open
cmcamdy wants to merge 4 commits into PaddlePaddle:develop from cmcamdy:fix_zmq_server

Conversation

Collaborator

@cmcamdy cmcamdy commented May 18, 2026

Motivation

💡 If this PR is a Cherry Pick, the PR title needs to follow the format by adding the [Cherry-Pick] label at the very beginning and appending the original PR ID at the end. For example, [Cherry-Pick][CI] Add check trigger and logic(#5191)


Modifications

Usage or Command

Accuracy Tests

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

PaddlePaddle-bot

This comment was marked as outdated.


paddle-bot Bot commented May 18, 2026

Thanks for your contribution!


codecov-commenter commented May 18, 2026

Codecov Report

❌ Patch coverage is 42.10526% with 11 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@bda1756). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| fastdeploy/inter_communicator/zmq_server.py | 42.10% | 10 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #7844   +/-   ##
==========================================
  Coverage           ?   63.28%           
==========================================
  Files              ?      462           
  Lines              ?    64387           
  Branches           ?     9873           
==========================================
  Hits               ?    40745           
  Misses             ?    20871           
  Partials           ?     2771           
| Flag | Coverage Δ |
|---|---|
| GPU | 72.38% <42.10%> (?) |
| XPU | 7.12% <5.26%> (?) |



PaddlePaddle-bot commented May 18, 2026

🤖 Paddle-CI-Agent | ci_status_monitor | 2026-05-19 20:57:06

This CI report was generated from the following code (refreshed every 30 minutes):


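1 Job overview

⚠️ 2 required jobs failed; they must be fixed before merging.

| Total runs (reruns) | Total jobs | ✅ Passed | ❌ Failed | ⏳ Running | ⏸️ Waiting | Skipped |
|---|---|---|---|---|---|---|
| 42 (0) | 42 | 37 | 4 | 0 | 1 | 0 |

2 Job status summary

2.1 Required jobs: 8/10 passed

Required jobs block merging; fix their failures first.

| Status | Job | Duration | Root cause | Suggested fix | Log | Rerun |
|---|---|---|---|---|---|---|
| ❌ | run_tests_with_coverage | 1h21m | PR issue: coverage of newly added code is 47%, below the 80% threshold | Add unit tests for the new exception-handling code | Job | - |
| ❌ | run_4_cards_tests | 19m24s | Environment issue: NCCL CUDA error; worker processes failed to launch | Environment issue; please rerun | Job | - |
| ✅ | The other 8 required jobs passed | - | - | - | - | - |

2.2 Optional jobs: 29/32 passed

Optional jobs do not block merging; their failures are informational only.

| Status | Job | Duration | Log | Rerun |
|---|---|---|---|---|
| ❌ | run_iluvatar_cases | 1m14s | Job | - |
| ❌ | Check PR Template | 17s | Job | - |
| ⏸️ | CI_HPU | - | - | - |
| ✅ | The other 29 optional jobs passed | - | - | - |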

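3 Failure details (required jobs only)

Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage: insufficient code coverage (confidence: high)

  • Status: ❌ Failed
  • Error type: insufficient code coverage
  • Confidence: high
  • Root cause summary: the PR adds 19 lines to zmq_server.py; diff coverage is 47%, below the 80% threshold

Root cause details:
This PR adds 19 lines to fastdeploy/inter_communicator/zmq_server.py (exception handling, frame validation, and unbind logic), but the unit tests do not exercise the new branches, so diff coverage is only 47% (< 80% threshold). The uncovered lines are the except block in recv_json (L116-118), the except block in recv_pyobj (L162-164), the frame-count check in recv_control_cmd (L553-554), and the unbind call in close (L594-595).

Key logs:

GPU Patch Coverage Details:
  fastdeploy/inter_communicator/zmq_server.py:
    percent_covered: 47.37%
    violation_lines: [116, 117, 118, 162, 163, 164, 553, 554, 594, 595]
    total_num_lines: 19 | total_num_violations: 10
##[error]Process completed with exit code 9.

Suggested fixes:

  1. Under tests/, add unit tests for the new exception-handling paths in zmq_server.py, covering the UnicodeDecodeError/ValueError/UnpicklingError branches (L116-118, L162-164) and the too-few-frames case (L553-554)
  2. If tests cannot be added (for example, because they depend on the underlying ZMQ environment), request a coverage exemption from CI

Fix summary: add unit tests for the new exception branches in zmq_server.py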
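
The first suggestion can be illustrated with a small, self-contained test. This is only a sketch: `decode_pyobj` is a hypothetical stand-in for the decode step inside `recv_pyobj`, it uses `pickle.loads` instead of the `ForkingPickler.loads` call shown in the PR, and the logger name is invented for the example. A real test would go through the server class and its fake socket.

```python
import logging
import pickle
import unittest

logger = logging.getLogger("zmq_server_sketch")  # hypothetical logger name


def decode_pyobj(data_bytes):
    """Hypothetical stand-in for the decode step inside recv_pyobj."""
    try:
        return pickle.loads(data_bytes)
    except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e:
        # Log only the length, then re-raise so the caller still sees the error.
        logger.warning("recv_pyobj decode failed, msg_len=%d, err=%s", len(data_bytes), e)
        raise


class DecodeFailureTest(unittest.TestCase):
    def test_corrupt_payload_logs_and_reraises(self):
        # The except branch must both emit a warning and propagate the error.
        with self.assertLogs(logger, level="WARNING") as captured:
            with self.assertRaises(pickle.UnpicklingError):
                decode_pyobj(b"not a pickle")
        self.assertIn("recv_pyobj decode failed", captured.output[0])

    def test_valid_payload_roundtrip(self):
        self.assertEqual(decode_pyobj(pickle.dumps({"a": 1})), {"a": 1})
```

Both the failing and the succeeding path are exercised, which is what the coverage tool checks for on the new lines.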

Related changes: fastdeploy/inter_communicator/zmq_server.py L111-118, L157-164, L548-554, L591-595
Link: View log

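Run Four Cards Tests / run_4_cards_tests: infrastructure error (confidence: high)

  • Status: ❌ Failed
  • Error type: infrastructure
  • Confidence: high
  • Root cause summary: NCCL CUDA error; all_gather failed while CustomAllreduce was initializing, and the test timed out

Failed cases:

| Test | Error | Root cause |
|---|---|---|
| test_determinism_long.py::test_deterministic_long_sequence[0.0-100] | OSError: NCCL unhandled cuda error | GPU/NCCL environment fault, unrelated to the PR |

Root cause details:
During CustomAllreduce create_shared_buffer initialization, test_determinism_long.py calls dist.all_gather_object, which triggers a low-level NCCL CUDA error (process_group_nccl.cc:912 'unhandled cuda error') and prevents the worker processes from starting. The error has no connection to the ZMQ communication module changed by this PR; it is a GPU/NCCL environment fault on the CI machine.

Key logs:

ERROR engine.py[line:168] Failed to launch worker processes
OSError: (External) Failed, NCCL error
  process_group_nccl.cc:912 'unhandled cuda error'
Pytest timeout (10 min) for: test_determinism_long.py

Suggested fixes:

  1. Environment issue; simply rerun the job

Fix summary: environment issue; please rerun

Related changes: none (the PR only touches the ZMQ module, which is unrelated to NCCL/GPU)
Link: View log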

@PaddlePaddle-bot

🤖 Paddle-CI-Agent | ci_status_monitor | 2026-05-19 06:39:34

This CI report was generated from the following code (refreshed every 30 minutes):


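1 Job overview

1 required job failed; it must be fixed before merging.

| Total runs (reruns) | Total jobs | ✅ Passed | ❌ Failed | ⏳ Running | ⏸️ Waiting | Skipped |
|---|---|---|---|---|---|---|
| 41 (0) | 41 | 37 | 3 | 0 | 1 | 0 |

2 Job status summary

2.1 Required jobs: 9/10 passed

Required jobs block merging; fix their failures first.

| Status | Job | Duration | Root cause | Suggested fix | Log | Rerun |
|---|---|---|---|---|---|---|
| ❌ | Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage | 1h21m | PR issue: recv() now passes copy=False, _FakeSocket was not adapted, 5 cases fail (4 with TypeError) | Update _FakeSocket.recv() to accept the copy parameter and return a frame object | Job | - |
| ✅ | The other 9 required jobs passed | - | - | - | - | - |

2.2 Optional jobs: 28/31 passed

Optional jobs do not block merging; their failures are informational only.

| Status | Job | Duration | Log | Rerun |
|---|---|---|---|---|
| ❌ | Run iluvatar Tests / run_iluvatar_cases | 22m21s | Job | - |
| ❌ | Check PR Template | 14s | Job | - |
| ⏸️ | CI_HPU | - | - | - |
| ✅ | The other 28 optional jobs passed | - | - | - |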

3 Failure details (required jobs only)

Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage: test failure (confidence: high)

  • Status: ❌ Failed
  • Error type: test failure
  • Confidence: high
  • Root cause summary: the PR adds a copy=False argument to the recv() calls, the _FakeSocket mock was not adapted, and 5 cases fail
  • Analyzer: ci_analyze_unittest_fastdeploy

Failed cases:

| Test | Error | Root cause |
|---|---|---|
| test_zmq_server.py::TestZmqServerBase::test_recv_json_returns_raw_payload | TypeError: unexpected keyword argument 'copy' | _FakeSocket.recv() does not accept the copy parameter |
| test_zmq_server.py::TestZmqServerBase::test_send_and_recv_json_roundtrip | TypeError: unexpected keyword argument 'copy' | _FakeSocket.recv() does not accept the copy parameter |
| test_zmq_server.py::TestZmqServerBase::test_recv_pyobj_meta_envelope | TypeError: unexpected keyword argument 'copy' | _FakeSocket.recv() does not accept the copy parameter |
| test_zmq_server.py::TestZmqServerBase::test_send_pyobj_and_recv_pyobj_fallback | TypeError: unexpected keyword argument 'copy' | _FakeSocket.recv() does not accept the copy parameter |
| test_zmq_server.py::TestZmqServers::test_zmq_ipc_server_close_cleans_ipc | AssertionError: True is not false | IPC file was not cleaned up |

Root cause details:

In zmq_server.py, the PR changes the calls in recv_json() (L113) and recv_pyobj() (L160) from socket.recv(flags=flags) to socket.recv(copy=False, flags=flags) and reads the data through frame.bytes. The _FakeSocket.recv() used by the tests has no copy parameter, so 4 cases raise TypeError. In addition, the PR adds a self.socket.unbind(self.address) call to ZmqIpcServer.close() (L505-507). _FakeSocket either has no unbind() or the call fails, so the IPC file cleanup that follows never runs and the fifth test's assertFalse(os.path.exists(...)) assertion fails.

Key logs:

frame = self.socket.recv(copy=False, flags=flags)
TypeError: _FakeSocket.recv() got an unexpected keyword argument 'copy'
fastdeploy/inter_communicator/zmq_server.py:113 / zmq_server.py:160

server.close()
AssertionError: True is not false  (IPC socket file still exists)
tests/inter_communicator/test_zmq_server.py:542

Suggested fixes:

  1. In tests/inter_communicator/test_zmq_server.py, change the _FakeSocket.recv() signature to def recv(self, copy=True, flags=0) and, when copy=False, return a frame object with a .bytes attribute (types.SimpleNamespace(bytes=self.recv_payload) works as a wrapper)
  2. Add an unbind(self, addr) method to _FakeSocket (an empty implementation is enough), and confirm that ZmqIpcServer.close() still contains the os.remove(self.file_name) cleanup logic

Fix summary: update _FakeSocket.recv() to accept the copy parameter and return a frame object
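
A minimal sketch of the two test-double changes suggested above. The `recv_payload` attribute and the `recv`/`unbind` signatures come from the suggestion; the constructor, the `closed` flag, and the `unbound` list are hypothetical additions for illustration and are not taken from the actual test file.

```python
import json
import types


class _FakeSocket:
    """Stand-in for a zmq socket, intended for unit tests only."""

    def __init__(self, recv_payload=b""):
        self.recv_payload = recv_payload
        self.closed = False
        self.unbound = []  # hypothetical: records unbind() calls for assertions

    def recv(self, copy=True, flags=0):
        # With copy=False, pyzmq returns a Frame whose .bytes holds the data,
        # so the fake mimics that with a lightweight namespace object.
        if copy:
            return self.recv_payload
        return types.SimpleNamespace(bytes=self.recv_payload)

    def unbind(self, addr):
        # Never raises, so close() can continue to its file cleanup.
        self.unbound.append(addr)

    def close(self):
        self.closed = True


sock = _FakeSocket(json.dumps({"req_id": 1}).encode())
frame = sock.recv(copy=False, flags=0)
payload = json.loads(frame.bytes)
```

Keeping `copy=True` as the default preserves the old behavior for any test that still calls `recv()` without arguments.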

Related changes: fastdeploy/inter_communicator/zmq_server.py L113, L160 (recv call change); L505-507 (new unbind call)

Link: View log

@PaddlePaddle-bot

🤖 Paddle-CI-Agent | ci_status_monitor | 2026-05-19 12:10:50

This CI report was generated from the following code (refreshed every 30 minutes):


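1 Job overview

1 required job failed and 1 required job is still running; both must be resolved before merging.

| Total runs (reruns) | Total jobs | ✅ Passed | ❌ Failed | ⏳ Running | ⏸️ Waiting | Skipped |
|---|---|---|---|---|---|---|
| 41 (0) | 41 | 36 | 3 | 2 | 0 | 0 |

2 Job status summary

2.1 Required jobs: 8/10 passed

Required jobs block merging; fix their failures first.

| Status | Job | Duration | Root cause | Suggested fix | Log | Rerun |
|---|---|---|---|---|---|---|
| ❌ | xpu_8cards_case_test / run_xpu_8cards_cases | 18m11s | PR issue: the XPU PD disaggregation service failed to start; XCCL RDMA communication between ranks hit a network error | Check the RDMA network, or verify whether the zmq_server.py change affects PD disaggregation startup | Job | - |
| ⏳ | Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage | - | Running | - | Job | - |
| ✅ | The other 8 required jobs passed | - | - | - | - | - |

2.2 Optional jobs: 28/31 passed

Optional jobs do not block merging; their failures are informational only.

| Status | Job | Duration | Log | Rerun |
|---|---|---|---|---|
| ❌ | Run iluvatar Tests / run_iluvatar_cases | 10m0s | Job | - |
| ❌ | Check PR Template | 11s | Job | - |
| ⏳ | CI_HPU | - | Job | - |
| ✅ | The other 28 optional jobs passed | - | - | - |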

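3 Failure details (required jobs only)

xpu_8cards_case_test / run_xpu_8cards_cases: test failure (confidence: medium)

  • Status: ❌ Failed
  • Error type: test failure
  • Confidence: medium
  • Root cause summary: the XPU PD disaggregation service failed to start; XCCL RDMA communication between ranks hit a network error
  • Analyzer: ci_analyze_unittest_fastdeploy

Failed cases:

| Test | Error | Root cause |
|---|---|---|
| test_pd_p_tp4ep4_d_tp1ep4.py::test_pd_separation | Failed: PD disaggregation service failed to start | XCCL RDMA communication between ranks failed |

Root cause details:
The test_pd_separation case starts an 8-card XPU PD disaggregation service (TP4EP4 Prefill + TP1EP4 Decode). During startup, RDMA network communication failed in the XCCL initialization phase: rank 0 could not receive data from rank 3 (host 10.214.181.222) (net_socket_recv failed, error code=2). This PR changes the ZMQ error-catching logic in zmq_server.py (+19/-3), which may have altered error handling during service startup, but the network error in the XCCL RDMA layer may equally be caused by unstable underlying infrastructure.

Key logs:

[11:02:22][WARN][XCCL][net_socket.cpp:509] NET/Socket : recv data size 0 errno 0
[11:02:22][WARN][XCCL][init_common.cpp:48] rank 0 net_socket_recv failed,
    please check log in rank[3], host 10.214.181.222, expected 0, error code=2
tests/xpu_ci/8cards_cases/test_pd_p_tp4ep4_d_tp1ep4.py:292: in test_pd_separation
E   Failed: PD分离服务启动失败
FAILED tests/xpu_ci/8cards_cases/test_pd_p_tp4ep4_d_tp1ep4.py::test_pd_separation
=================== 1 failed, 3 passed in 887.80s (0:14:47) ====================

Suggested fixes:

  1. Check whether the new error-catching logic in fastdeploy/inter_communicator/zmq_server.py changes how errors propagate while the PD disaggregation service initializes (in particular, when self.close() is called in receive_json_once / receive_pyobj_once)
  2. Check RDMA network connectivity between the XPU nodes (rank 0 and rank 3 / host 10.214.181.222); if it is an infrastructure problem, rerun

Fix summary: check whether the zmq_server.py error catching affects PD disaggregation startup, or rerun

Related changes: fastdeploy/inter_communicator/zmq_server.py (+19/-3)
Link: View log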


@PaddlePaddle-bot PaddlePaddle-bot left a comment


🤖 Paddle-CI-Agent | pr_review | 2026-05-19 17:40:05

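📋 Review summary

PR overview: fixes missing error handling when message deserialization fails in the ZMQ communication layer, and a resource leak caused by close() not unbinding the socket correctly
Scope of changes: fastdeploy/inter_communicator/zmq_server.py
Impact tags: [XPU] [Engine]

Issues

| Severity | File | Summary |
|---|---|---|
| 🔴 Bug | zmq_server.py:595 | In close(), if unbind() raises, socket.close() is skipped and the socket resource leaks |
| 🟡 Suggestion | zmq_server.py:117 | recv_json writes the raw message bytes to the warning log, which may expose sensitive data |
| 🟡 Suggestion | zmq_server.py:163 | recv_pyobj writes the raw message bytes to the warning log, which may expose sensitive data |

📝 PR convention check

The PR title contains the official [XPU] tag and its format is compliant. However, the Motivation, Modifications, Usage or Command, and Accuracy Tests sections are all empty (they contain only the template's placeholder comments), and none of the Checklist items are ticked, so the description does not meet the template requirements.

Suggested title (copy as-is):

  • [BugFix] Fix zmq error handling and resource cleanup in inter_communicator

Suggested PR description (copy as-is):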

## Motivation
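Fixes the following issues in `fastdeploy/inter_communicator/zmq_server.py`:
1. `recv_json` / `recv_pyobj` had no error handling when message deserialization failed, so exceptions propagated upward without any log entry and were hard to diagnose
2. `recv_control_cmd` unpacked a fixed number of frames and raised on an unexpected frame count; it now receives the frames first, indexes into them, and guards on the frame count
3. `close()` did not explicitly unbind before closing, and if unbind raises, socket.close() is skipped and the resource leaks

## Modifications
- `fastdeploy/inter_communicator/zmq_server.py`
  - `recv_json`: wrap `_deserialize` in try/except, catch `UnicodeDecodeError` and `ValueError`, log a warning, then re-raise
  - `recv_pyobj`: wrap `ForkingPickler.loads` in try/except, catch `UnicodeDecodeError`, `ValueError`, and `pickle.UnpicklingError`, log a warning, then re-raise
  - `recv_control_cmd`: receive all frames into `frames` first, validate the frame count (discard if < 2), then index `client = frames[0]` and `task_data = frames[-1]`
  - `close`: add a `socket.unbind(self.address)` call (guarded by `if self.address:`) so the socket is unbound before it is closed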

## Usage or Command
N/A

## Accuracy Tests
N/A

## Checklist

- [x] Add at least a tag in the PR title.
  - Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
  - You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.
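
The frame-count guard described for `recv_control_cmd` in the Modifications list can be sketched as a small pure function. This is an illustration under assumptions, not the PR's code: `split_control_frames` is a hypothetical helper name, and `frames` stands for the list of byte frames a multipart receive would produce.

```python
def split_control_frames(frames):
    """Return (client, task_data), or None when the message has too few frames."""
    if len(frames) < 2:
        # Malformed message: nothing to route and no payload, so drop it.
        return None
    # First frame identifies the client; the last frame carries the payload.
    # Indexing tolerates extra frames in between (for example an empty delimiter).
    return frames[0], frames[-1]


ok = split_control_frames([b"client-1", b"", b'{"cmd": "pause"}'])
dropped = split_control_frames([b"client-1"])
```

Indexing instead of fixed-arity unpacking is what lets a 2-frame and a 3-frame message take the same path without raising.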

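Overall assessment

The error catching and the frame-count guard go in the right direction, but unbind() in close() is not wrapped in its own exception handler, so the socket may be left unclosed. Wrap unbind in a separate try/except so that socket.close() always runs.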

try:
    if self.socket is not None and not self.socket.closed:
        if self.address:
            self.socket.unbind(self.address)

🔴 Bug: if unbind() raises, socket.close() is skipped and the ZMQ socket's file descriptor leaks.

Suggested fix:

if self.socket is not None and not self.socket.closed:
    if self.address:
        try:
            self.socket.unbind(self.address)
        except Exception as e:
            llm_logger.warning(f"Failed to unbind socket from {self.address}: {e}")
    self.socket.close()
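
To see the effect of the suggested fix in isolation, the following sketch runs the same control flow against a fake socket whose `unbind()` always fails. `_FailingUnbindSocket`, `ServerSketch`, the logger name, and the address are all hypothetical; the PR itself logs through `llm_logger`.

```python
import logging

logger = logging.getLogger("zmq_server_sketch")  # hypothetical logger name


class _FailingUnbindSocket:
    """Hypothetical fake socket whose unbind() always raises."""

    def __init__(self):
        self.closed = False

    def unbind(self, addr):
        raise RuntimeError("unbind failed")

    def close(self):
        self.closed = True


class ServerSketch:
    """Hypothetical holder for a socket and the address it is bound to."""

    def __init__(self, socket, address):
        self.socket = socket
        self.address = address

    def close(self):
        if self.socket is not None and not self.socket.closed:
            if self.address:
                try:
                    self.socket.unbind(self.address)
                except Exception as e:
                    logger.warning("Failed to unbind socket from %s: %s", self.address, e)
            # Reached whether or not unbind() succeeded.
            self.socket.close()


server = ServerSketch(_FailingUnbindSocket(), "ipc:///tmp/sketch.ipc")
server.close()
```

After `server.close()` returns, the fake socket reports `closed`, which is the property a regression test for this bug would assert.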

try:
    envelope = ForkingPickler.loads(data_bytes)
except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e:
    llm_logger.warning(f"recv_pyobj decode failed, msg={data_bytes}, err={e}")

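🟡 Suggestion: the raw message bytes data_bytes are written directly to the warning log. If the message contains prompt or token data, this may leak sensitive information, and large message bodies will produce very large log entries.

Log only the byte length instead: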

llm_logger.warning(f"recv_pyobj decode failed, msg_len={len(data_bytes)}, err={e}")

try:
    data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf))
except (UnicodeDecodeError, ValueError) as e:
    llm_logger.warning(f"recv_json decode failed, msg={msg}, err={e}")

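🟡 Suggestion: the raw message bytes msg are written directly to the warning log. If the message contains sensitive content such as prompts, this may leak information, and large message bodies will produce very large log entries.

Log only the byte length instead: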

llm_logger.warning(f"recv_json decode failed, msg_len={len(msg)}, err={e}")
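
A small sketch of why the length-only message matters, assuming a truncated JSON payload that carries a prompt. `decode_json`, `_ListHandler`, and the logger name are hypothetical, and `json.loads` stands in for the `jsonapi.loads` call used in the PR.

```python
import json
import logging

logger = logging.getLogger("zmq_json_sketch")  # hypothetical logger name


class _ListHandler(logging.Handler):
    """Collects formatted log messages so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def decode_json(msg: bytes):
    """Hypothetical stand-in for the decode step inside recv_json."""
    try:
        return json.loads(msg)
    except (UnicodeDecodeError, ValueError) as e:
        # Only the size is logged; the payload itself never reaches the log.
        logger.warning("recv_json decode failed, msg_len=%d, err=%s", len(msg), e)
        raise


handler = _ListHandler()
logger.addHandler(handler)

bad = b'{"prompt": "secret text"'  # truncated JSON, missing the closing brace
try:
    decode_json(bad)
except ValueError:
    pass  # the error is still re-raised to the caller
```

The captured warning contains the message length and the parser's position information, but none of the prompt text.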


4 participants