[PD] PD send cache via storage & Refine swap_cache_layout op#7839
[PD] PD send cache via storage & Refine swap_cache_layout op#7839juncaipeng wants to merge 6 commits into
Conversation
|
Thanks for your contribution! |
There was a problem hiding this comment.
Pull request overview
本 PR 在 PD 分离场景中新增「Storage Pool 模式」:P 侧不再通过 RDMA/IPC 直接把 KVCache 推到 D 侧 GPU,而是写入全局存储池,由 D 侧从存储池读回,依靠环境变量 FD_PD_TRANSFER_VIA_STORAGE 开关。同时对 swap_cache_layout.cu 进行重构,引入按 swap block 聚合所有 layer 的 D2H/H2D kernel 与全局 staging/指针缓冲,以提升交换性能。
Changes:
- 新增
FD_PD_TRANSFER_VIA_STORAGE环境变量及在 P 侧token_processor、cache_messager、D 侧common_engine、resource_manager_v1中的对应分支,配套增加prefix_cache_manager.write_all_cache_to_storage/read_cache_from_storage_for_pd。 - 重构
swap_cache_layout.cu,引入swap_d2h_kernel/scatter_blocks_kernel与文件级 staging/指针缓存,针对 cpu_block_ids 连续的常见路径走优化分支。 - 调整若干调度调试日志(
schedule()内强制update_metrics(True)、scheduled_reqs日志移出守卫),更新examples/cache_storage/run_03b_pd_storage.sh中MOONCAKE_GLOBAL_SEGMENT_SIZE。
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| fastdeploy/envs.py | 新增 FD_PD_TRANSFER_VIA_STORAGE 开关 |
| fastdeploy/output/token_processor.py | P 侧在 send_first_token 前把全量 cache 写入存储 |
| fastdeploy/cache_manager/cache_messager.py | Storage 模式下跳过 RDMA 传输直接标记完成 |
| fastdeploy/cache_manager/prefix_cache_manager.py | 新增 PD 存储池模式专用的 write/read 流程及 :partial:N key |
| fastdeploy/engine/common_engine.py | D 侧在加入运行队列前同步从存储池读取 cache |
| fastdeploy/engine/sched/resource_manager_v1.py | finish 路径走新写法 + 调度内增加 verbose metrics/debug 日志 |
| custom_ops/gpu_ops/swap_cache_layout.cu | 重构为基于全局缓冲与 kernel 的批量 swap 实现 |
| examples/cache_storage/run_03b_pd_storage.sh | 调大示例脚本中的 Mooncake segment size |
Comments suppressed due to low confidence (2)
fastdeploy/engine/common_engine.py:1854
- 这里直接以
self.resource_manager.requests[request_id]取请求对象,没有任何防御性判断。在 D 流程中,前面add_prefilled_request之前的几个分支(first token eos、error_code != 200)已经会走pre_recycle_resource然后continue,但是当存储读取失败时本函数自己又会调用pre_recycle_resource(request_id)然后continue,紧接着外层循环可能再次拿到同一request_id(例如重试或者错误重发场景),或者上游已经清理过requests字典时,就会抛KeyError并把整个_process_prefilled_requests异常吞掉,导致后续请求处理被阻塞。建议使用self.resource_manager.requests.get(request_id),并在为None时记录错误并跳过/回收,而不是依赖外层 try/except。
request = self.resource_manager.requests[request_id]
fastdeploy/cache_manager/cache_messager.py:695
- 此处的 log 字符串
"[PD Storage] Skip RDMA transfer, mark as finished, " f"req_id: {task['request_id']}"实际是两个相邻字面量拼接,第二个才是 f-string。代码功能没问题,但写法容易让读者误以为前半段也是 f-string;建议直接合并为一个 f-string。类似写法在本 PR 的common_engine.py1859、1866、1873 行也出现,建议一并统一。
logger.info(
f"[PD Storage] Skip RDMA transfer, mark as finished, " f"req_id: {task['request_id']}"
)
8aa93f4 to
2c1334e
Compare
2c1334e to
100061d
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (1)
fastdeploy/cache_manager/prefix_cache_manager.py:1410
write_all_cache_to_storage与read_cache_from_storage_for_pd两个方法中,partial block 的 key 构造、token_ids 还原、block_size 切分逻辑几乎完全一致(lines 1304–1331 与 1385–1410)。建议抽出一个私有方法(例如_compute_storage_keys_with_partial(input_token_ids, request))返回 keys 列表,避免后续修改 hash 算法时两处遗漏导致 P/D 端 key 不一致而读不到 cache。
# 2. Calculate cache keys using same algorithm as write_all_cache_to_storage
keys = []
prefix_block_key = []
block_size = self.config.cache_config.block_size
mm_idx = 0
for i in range(0, len(input_token_ids), block_size):
block_token_ids = input_token_ids[i : i + block_size]
actual_token_num = len(block_token_ids)
if actual_token_num < block_size:
key = get_hash_str(block_token_ids, prefix_block_key)
key = f"{key}:partial:{actual_token_num}"
keys.append(key)
else:
mm_idx, extra_keys = self.get_block_hash_extra_keys(
request=request,
start_idx=i,
end_idx=i + block_size,
mm_idx=mm_idx,
)
prefix_block_key.extend(extra_keys)
key = get_hash_str(block_token_ids, prefix_block_key)
keys.append(key)
prefix_block_key = [key]
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## release/2.6 #7839 +/- ##
==============================================
Coverage ? 72.30%
==============================================
Files ? 381
Lines ? 54317
Branches ? 8494
==============================================
Hits ? 39274
Misses ? 12274
Partials ? 2769
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
CI报告基于以下代码生成(30分钟更新一次): 1 任务总览存在 1 个 Required 失败任务需要优先处理(Approval 审批未通过)。
2 任务状态汇总2.1 Required任务 : 3/4 通过
2.2 可选任务 — 11/14 通过
3 失败详情(仅 required)Approval — 代码规范/审批不通过(置信度: 高)Approval
根因详情: 关键日志: 修复建议:
修复建议摘要: 请FastDeploy RD进行Approve(共需满足2项审批要求) 链接: 查看日志 |
This comment was marked as outdated.
This comment was marked as outdated.
PaddlePaddle-bot
left a comment
There was a problem hiding this comment.
🤖 Paddle-CI-Agent | pr_review |
2026-05-18 17:46:07
📋 Review 摘要
PR 概述:新增 PD 分离 Storage Pool 传输模式(通过 Mooncake 存储解耦 P/D 直连),并重构 swap_cache_layout CUDA 算子(连续 Block 合并 DMA + Scatter Kernel 优化)
变更范围:custom_ops/gpu_ops/、fastdeploy/cache_manager/、fastdeploy/engine/、fastdeploy/envs.py、fastdeploy/model_executor/layers/attention/
影响面 Tag:[OP] [KVCache] [Engine] [PD Disaggregation]
问题
| 级别 | 文件 | 概述 |
|---|---|---|
| 🔴 Bug | custom_ops/gpu_ops/swap_cache_layout.cu:76 |
三个全局 CUDA 缓冲区(g_staging_buffer 等)只增不减,进程退出前无 cudaFree,符合 §C 显存泄漏信号 |
| 🟡 建议 | custom_ops/gpu_ops/swap_cache_layout.cu:32 |
num_vec_per_layer 使用整除,block_stride * sizeof(T) 不能被 16 整除时尾部数据静默丢失 |
| ❓ 疑问 | fastdeploy/cache_manager/cache_transfer_manager.py:1302 |
swap_to_storage_barrier.reset() 被移除,需确认 Barrier 实现是否自动重置,否则并发请求可能死锁 |
📝 PR 规范检查
存在两处标题不合规:
- 使用
[PD](非官方 Tag),官方对应 Tag 为[PD Disaggregation] - 目标分支为
release/2.6(非 develop),必须使用[Cherry-Pick]格式;Checklist 第 5 条(cherry-pick)未勾选,且未在描述中注明 develop PR 编号
标题建议(可直接复制):
[Cherry-Pick][PD Disaggregation] PD send cache via storage & Refine swap_cache_layout op (#原始PR号)
注:
#原始PR号请替换为已合入 develop 分支的对应 PR 编号。
总体评价
Storage Pool 模式整体方案设计合理,TP 多 rank 结果聚合(min(saved_results, key=len))逻辑清晰,错误处理链路也有明显改善。但 CUDA 侧引入了三个永不释放的全局缓冲区,在 GPU 显存紧张的生产环境会造成显存碎片,属于 §C 必报项,需修复后合入。
Motivation
在 PD 分离场景下,原方案 P 实例通过 RDMA/IPC 直传 KV Cache 到 D 实例 GPU,耦合度较高。本 PR 新增 Storage Pool 模式(
FD_PD_TRANSFER_VIA_STORAGE=1),P 将 KV Cache 写入全局存储池(Mooncake),D 从存储读取,解耦 P/D 直连依赖,提升部署灵活性。同时重构swap_cache_layoutCUDA 算子,CPU Block ID 连续时合并为单次 DMA + Scatter Kernel,降低 PCIe 传输开销。Modifications
fastdeploy/envs.py:新增FD_PD_TRANSFER_VIA_STORAGE(0=直传,1=存储池)fastdeploy/cache_manager/prefix_cache_manager.py:新增write_all_cache_to_storage()(支持最后不完整 block,key 加:partial:N后缀)和read_cache_from_storage_for_pd()fastdeploy/cache_manager/cache_messager.py:存储池模式下跳过 RDMA 传输,直接 mark finishedfastdeploy/output/token_processor.py:P 端 prefill 结束后直接写存储再发 first_tokenfastdeploy/engine/common_engine.py:D 端收到 first_token 后从存储读 Cache,失败返回 502fastdeploy/engine/common_engine_prepare_mixin.py:存储池模式跳过 send_cache_info_to_messagerfastdeploy/engine/sched/resource_manager_v1.py:存储池模式仅 D 端写 Cachefastdeploy/model_executor/layers/attention/*.py:存储池模式跳过 kv_signal 初始化custom_ops/gpu_ops/swap_cache_layout.cu:重构算子,连续 Block 走 D2H kernel / H2D staging+scatter 优化路径Usage or Command
Accuracy Tests
N/A
Checklist
[FDConfig],[APIServer],[Engine],[Scheduler],[PD Disaggregation],[Executor],[Graph Optimization],[Speculative Decoding],[RL],[Models],[Quantization],[Loader],[OP],[KVCache],[DataProcessor],[BugFix],[Docs],[CI],[Optimization],[Feature],[Benchmark],[Others],[XPU],[HPU],[GCU],[DCU],[Iluvatar],[Metax]]pre-commitbefore commit.releasebranch, make sure the PR has been submitted to thedevelopbranch, then cherry-pick it to thereleasebranch with the[Cherry-Pick]PR tag.