[XPU] fix zmq err catch #7844
Thanks for your contribution!
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## develop #7844 +/- ##
==========================================
Coverage ? 63.28%
==========================================
Files ? 462
Lines ? 64387
Branches ? 9873
==========================================
Hits ? 40745
Misses ? 20871
Partials ? 2771
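CI report generated from the following code (updated every 30 minutes):
1 Task overview
2 Task status summary
2.1 Required tasks: 8/10 passed
2.2 Optional tasks: 29/32 passed
3 Failure details (required only)
Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage: insufficient code coverage (confidence: high)
Root cause details: Key logs: Fix suggestions:
Fix suggestion summary: add unit tests for the new exception branches in zmq_server.py
Related changes:
Run Four Cards Tests / run_4_cards_tests: infrastructure error (confidence: high)
Failed cases:
Root cause details: Key logs: Fix suggestions:
Fix suggestion summary: environment issue, please rerun
Related changes: none (the PR only modifies the ZMQ module and is unrelated to NCCL/GPU)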
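CI report generated from the following code (updated every 30 minutes):
1 Task overview
❌ 1 required task failed; it must be fixed before the PR can be merged.
2 Task status summary
2.1 Required tasks: 9/10 passed
2.2 Optional tasks: 28/31 passed
3 Failure details (required only)
Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage: test failure (confidence: high)
Failed cases:
Root cause details: The PR Key logs: Fix suggestions:
Fix suggestion summary: update _FakeSocket.recv() to accept a copy parameter and return a frame object
Related changes: Link: View logs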
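CI report generated from the following code (updated every 30 minutes):
1 Task overview
1 required task failed and 1 required task is still running; both must be resolved before the PR can be merged.
2 Task status summary
2.1 Required tasks: 8/10 passed
2.2 Optional tasks: 28/31 passed
3 Failure details (required only)
xpu_8cards_case_test / run_xpu_8cards_cases: test failure (confidence: medium)
Failed cases:
Root cause details: Key logs: Fix suggestions:
Fix suggestion summary: check whether the error handling in zmq_server.py affects startup of the PD-disaggregated service, or rerun
Related changes: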
PaddlePaddle-bot
left a comment
🤖 Paddle-CI-Agent | pr_review |
2026-05-19 17:40:05
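📋 Review summary
PR overview: fixes missing error handling when message deserialization fails in the ZMQ communication layer, and a resource leak in close() caused by the socket not being unbound correctly
Scope of change: fastdeploy/inter_communicator/zmq_server.py
Impact tags: [XPU] [Engine]
Issues
| Severity | File | Summary |
|---|---|---|
| 🔴 Bug | zmq_server.py:595 | If unbind() raises inside close(), socket.close() is skipped and the socket resource leaks |
| 🟡 Suggestion | zmq_server.py:117 | recv_json writes the raw message bytes to the warning log, which may expose sensitive data |
| 🟡 Suggestion | zmq_server.py:163 | recv_pyobj writes the raw message bytes to the warning log, which may expose sensitive data |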
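📝 PR convention check
The PR title carries the official [XPU] tag and its format is compliant. However, the Motivation, Modifications, Usage or Command, and Accuracy Tests sections are all empty (they contain only the template's comment placeholders), and no Checklist item is ticked, so the description does not meet the template requirements.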
Suggested title (can be copied directly):
[BugFix] Fix zmq error handling and resource cleanup in inter_communicator
Suggested PR description (can be copied directly):
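## Motivation
Fix the following issues in `fastdeploy/inter_communicator/zmq_server.py`:
1. `recv_json` / `recv_pyobj` have no error handling when message deserialization fails, so the exception propagates upward without any log entry and is hard to diagnose
2. `recv_control_cmd` unpacks a fixed number of frames and raises as soon as the frame count is unexpected; it now receives the frames first, then reads them by index, with a frame-count guard
3. `close()` does not explicitly unbind before closing, and if unbind raises, socket.close() is skipped and the resource leaks
## Modifications
- `fastdeploy/inter_communicator/zmq_server.py`:
  - `recv_json`: wrap `_deserialize` in try/except, catch `UnicodeDecodeError` and `ValueError`, log a warning, then re-raise
  - `recv_pyobj`: wrap `ForkingPickler.loads` in try/except, catch `UnicodeDecodeError`, `ValueError`, and `pickle.UnpicklingError`, log a warning, then re-raise
  - `recv_control_cmd`: receive all frames into `frames` first, check the frame count (discard if < 2), then read `client = frames[0]` and `task_data = frames[-1]` by index
  - `close`: add a `socket.unbind(self.address)` call (guarded by `if self.address:`) so the socket is unbound before it is closed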
## Usage or Command
N/A
## Accuracy Tests
N/A
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.

Overall assessment
The error handling and the frame-count guard are on the right track, but unbind() in close() is not caught independently, so the socket may never be closed. Wrap unbind in its own try/except so that socket.close() always runs.
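The frame-count guard mentioned above can be sketched as a small pure function. This is an assumption-laden illustration rather than the PR's code: `split_control_frames` is a hypothetical helper, the frames are assumed to be the list returned by a multipart receive, and the sample identities and payloads are invented.

```python
from typing import Optional, Sequence, Tuple


def split_control_frames(frames: Sequence[bytes]) -> Optional[Tuple[bytes, bytes]]:
    """Return (client, task_data), or None when the message has too few frames."""
    if len(frames) < 2:
        # Malformed message: the caller is expected to drop it instead of crashing.
        return None
    # First frame identifies the client, last frame carries the payload;
    # any delimiter frames in between are ignored.
    return frames[0], frames[-1]


with_delimiter = split_control_frames([b"client-1", b"", b'{"cmd": "example"}'])
without_delimiter = split_control_frames([b"client-2", b"payload"])
too_short = split_control_frames([b"only-one-frame"])
```

Indexing with `frames[0]` and `frames[-1]` tolerates both two-frame and three-frame messages, whereas fixed-arity tuple unpacking raises `ValueError` whenever the frame count differs from what the code expects.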
zmq_server.py:595

    try:
        if self.socket is not None and not self.socket.closed:
            if self.address:
                self.socket.unbind(self.address)
🔴 Bug: if unbind() raises, socket.close() is skipped and the ZMQ socket's file descriptor leaks.
Suggested fix:
    if self.socket is not None and not self.socket.closed:
        if self.address:
            try:
                self.socket.unbind(self.address)
            except Exception as e:
                llm_logger.warning(f"Failed to unbind socket from {self.address}: {e}")
        self.socket.close()

zmq_server.py:163

    try:
        envelope = ForkingPickler.loads(data_bytes)
    except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e:
        llm_logger.warning(f"recv_pyobj decode failed, msg={data_bytes}, err={e}")
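🟡 Suggestion: the raw message bytes in data_bytes are written straight to the warning log. If the message contains prompt or token data, this may leak sensitive information, and large message bodies will bloat the log.
Log only the byte length instead: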
    llm_logger.warning(f"recv_pyobj decode failed, msg_len={len(data_bytes)}, err={e}")

zmq_server.py:117

    try:
        data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf))
    except (UnicodeDecodeError, ValueError) as e:
        llm_logger.warning(f"recv_json decode failed, msg={msg}, err={e}")
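🟡 Suggestion: the raw message bytes in msg are written straight to the warning log. If the message contains sensitive content such as prompts, this may leak information, and large message bodies will bloat the log.
Log only the byte length instead:

    llm_logger.warning(f"recv_json decode failed, msg_len={len(msg)}, err={e}")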
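Putting the two suggestions together, a decode helper that logs only the payload length and then re-raises could look like the sketch below. It deliberately swaps in the standard-library `json` and `logging` modules for the `jsonapi` and `llm_logger` objects used in the PR, and `decode_json_message` is a hypothetical name, so treat it as an illustration of the pattern rather than the actual `recv_json` body.

```python
import json
import logging

logger = logging.getLogger("zmq_decode_sketch")


def decode_json_message(msg: bytes):
    """Decode a JSON payload; on failure log only its length, then re-raise."""
    try:
        return json.loads(msg)
    except (UnicodeDecodeError, ValueError) as exc:
        # Only the size is logged, so prompt or token content never reaches the log.
        logger.warning("recv_json decode failed, msg_len=%d, err=%s", len(msg), exc)
        raise


decoded = decode_json_message(b'{"ok": 1}')
```

A unit test for the new exception branch, which the coverage job asks for, would then only need to feed the helper an invalid payload and assert that `ValueError` propagates (`json.JSONDecodeError` and `UnicodeDecodeError` are both subclasses of it).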
Motivation
Modifications
Usage or Command
Accuracy Tests
Checklist
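- [ ] Add at least a tag in the PR title.
  - Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
  - You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.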