OffloadingConnectorScheduler 功能
__init__
初始化调度器:按 offloading spec 初始化 KV group 分组(full-attention、sliding-window、mamba)、
block 大小因子、lookup 策略;创建 manager(CPU 或 Tiering);初始化数据结构——
_req_status(每个请求的 offload 状态)、_jobs(在途 transfer job)、
_blocks_being_loaded(正在加载的 block,防止并发加载)、
_block_id_to_pending_jobs(block→job 的映射,用于 block 被重新分配时触发 flush)、
_stalled_job_ids(stalled job 的 worker 计数)以及 stalled 检测参数(300s timeout, 10s 间隔)。
TransferJobStatus:每个 in-flight job 的状态记录,创建时自动生成。 字段:req_id(属于哪个请求)、pending_count(还剩几个 worker 没报告完成)、keys(传输的 block keys)、is_store(store 还是 load)、sliding_window_block_ids/non_sliding_window_block_ids(引用的 GPU block)、submitted_at(创建时间戳,用于 stalled 检测)。_complete_job的status参数就是这个。
_remove_pending_job
从 _block_id_to_pending_jobs 中删除指定 job_id 在给定 block_ids 上的条目。
如果某个 block 的 pending job 集合被清空,连该 block 的条目也一并删除。
pending job:还没被 worker 确认完成的 store/load 任务。
_block_id_to_pending_jobs记录了每个 GPU block 被哪些 store job 引用。 如果这个 block 被 KV cache manager 重新分配给了另一个请求,而 store job 还没写完, 就需要把这个 store job 加入_current_batch_jobs_to_flush, 通过 metadata 告诉 worker”别往这个 block 写了,它已经被回收了”。
_check_stalled_jobs
遍历所有在途 job,比较 submitted_at 与当前时间的差值。
超过 300s timeout 的 job 视为卡死,交给 _handle_stalled_job 清理。
self._jobs:所有还在传输中(GPU→CPU 的 store 或 CPU→GPU 的 load)的任务字典。 key 是 job_id,value 是TransferJobStatus(req_id、pending_count、keys、is_store 等)。 worker 没报告完成之前,job 就一直在这里。
_handle_stalled_job
委托给 _complete_job(success=False) 清理卡死的 job(success=False 会导致 manager 丢弃残缺数据,
不标记为已存储、不级联到 secondary tier),然后将 job_id 记入 _stalled_job_ids 并附带剩余 worker 计数,
用于跳过后续到达的 late worker 报告。
self._stalled_job_ids:stalled 检测清理 job 后,worker 的报告可能还会陆续到达。 这个 dict 记录”已经处理过的 stalled job 还剩多少个 worker 的报告没到”。 key 是 job_id,value 是剩余 worker 数。每个 late report 到达时递减,到 0 就删除。
_complete_job
统一的 job 完成清理路径,正常和 stalled 共享。执行 5 步:
- 通知 manager(
complete_store(success=...)或complete_load) - 清理
_blocks_being_loaded(仅 load) - 清理
_block_id_to_pending_jobs(block→job 反向索引) - 从
_jobs删除 job - 从
req_status.transfer_jobs移除 job;如果是最后一个 job 且请求已完成,删除_req_status
不调用 on_request_finished——那是 request_finished() 的职责。
manager 做什么:
OffloadingManager,负责真正的 KV cache 读写。complete_store:标记 block 已从 GPU 传到 CPU,更新 ref_cnt,标记为可驱逐。complete_load:标记 block 已从 CPU 加载回 GPU。
_blocks_being_loaded:set,记录当前正在从 CPU 加载回 GPU 的 block keys。 另一个请求如果要加载同样的 block,发现已经在 set 中就跳过——避免重复加载。
difference_update:Python set 方法,A.difference_update(B)等价于A = A - B, 从 A 中删除所有 B 里有的元素。
_maximal_prefix_lookup
从 block 0 开始逐个调用 manager.lookup(),统计连续 HIT 的数量。
遇到 HIT_PENDING/RETRY 不中断(继续扫描让 manager 发起异步 lookup),遇到 MISS 停止。
返回连续命中数,如果有 deferred lookup 则返回 None。
_sliding_window_lookup
从 block 序列末尾向前扫描,统计连续 HIT 的 streak。 遇到连续 HIT 数 ≥ sliding_window_size 时停止,返回该 streak 的起始索引。 用于滑动窗口 attention 场景——只有末尾的 window_size 个 block 可能被命中。
_touch
标记指定请求的所有 offload_keys 的 access_time,用于 LRU 驱逐策略。
在 update_state_after_alloc 中调用,表示这些 block 的 GPU 副本已被访问。
manager.touch:标记 block 的最近访问时间。CPU 内存满了需要驱逐 block 时, 优先驱逐最久没被访问的。touch 就是在说”这些 block 刚被 GPU 读过了,不要驱逐它们”。
_lookup
对请求的 offload_keys 调用 manager.lookup(),
先对 full-attention groups 调用 _maximal_prefix_lookup,
再对 sliding-window groups 调用 _sliding_window_lookup。
返回能命中的 token 数(None 表示需要延迟,等异步 lookup 完成)。
为什么复杂:同时处理三种情况:
- Full attention:从 block 0 开始找连续命中的前缀,命中多少就跳过多少 token
- Sliding window:只有末尾 window_size 个 block 可能被命中,必须从尾向前扫
- 异步 lookup:block 缓存状态不确定(HIT_PENDING / RETRY)时不能立刻返回, 需要 manager 先发起异步查询,defer 结果等下次再查
_lookup是入口,内部调用_maximal_prefix_lookup和_sliding_window_lookup, 分别处理 full-attention 和 sliding-window 的命中查询。
on_new_request
请求首次加入调度器时调用:创建 ReqContext、调用 manager.on_new_request()
获取 offload policy(block_level / request_level)、初始化 RequestOffloadState 并加入 _req_status。
get_num_new_matched_tokens
返回该请求可以加载的额外 token 数。如果还有在途 transfer job 则返回 None(延迟处理)。
调用 _lookup 查询 prefix cache 命中情况,返回命中 token 数 + 是否需要异步加载。
update_state_after_alloc
请求在 KV cache manager 中分配了 block 后调用。更新 _req_status 中的 block_ids 分组,
处理 sliding-window 的 stale 条目(被重新分配的 block 标记为 0),并将新分配的 block_ids
加入 _current_batch_allocated_block_ids。
为什么需要:防止往已经被回收的 block 里写东西。 场景:请求 A 的 block 5 被滑动窗口机制释放,KV cache manager 把 block 5 重新分配给了请求 B。 但请求 A 可能还有 store job 等着往 block 5 写数据!
这个函数做的事:
- 更新请求 A 的 block_ids(标记 block 5 已不属于它)
- 把 block 5 的条目设为 0(stale entry)
- 后续
_update_req_states扫描时发现block_ids[j] == 0→ 跳过这个 block,不往它写数据
_update_req_states
在 build_connector_meta 之前调用,决定哪些 block 需要 store、构建 store jobs。
遍历所有活跃请求,计算可 offload 的 token 数,调用 manager.prepare_store() 获取
GPU→CPU 的传输参数,创建 TransferJob 加入 _jobs 和 req_status.transfer_jobs。
对滑动窗口 block 注册到 _block_id_to_pending_jobs。
_build_store_jobs
(内部方法,被 _update_req_states 调用)为一个请求构建具体的 store job:
收集待 offload 的 keys,调用 manager.prepare_store(),生成 GPULoadStoreSpec,
分配 job_id,关联 block→job 映射(包括 sliding-window block 的反向索引注册)。
build_connector_meta
构造当前 engine step 的 OffloadingConnectorMetadata:
包含待提交的 load jobs(key + 传输参数)、store jobs(key + GPULoadStoreSpec)、
以及需要 flush(取消)的 store job 集合。被 scheduler 在每个 step 中调用,
然后 connector 将 metadata 分发给 worker。
has_pending_push_work
返回是否有未完成的 offload 工作(store jobs 待处理或 _req_status 中还有未完成的请求)。 用于判断 engine 是否需要继续处理该 connector。
update_connector_output
处理 worker 报告的 transfer 完成:遍历 meta.completed_jobs,
递减 pending_count,当 pending_count == 0 时调用 _complete_job 清理 job。
末尾运行 stalled 检测(10s 间隔节流)。
跳过 _stalled_job_ids 中的 late worker 报告和 reset_cache 之前的 stale job。
get_stats
收集并返回 offloading connector 的统计指标(store/load 次数、耗时、大小、cache 使用率)。
合并 _connector_stats(来自 worker)和 manager.get_stats()(来自 manager)。
request_finished
请求生成结束(is_finished)时调用:
- 调用
manager.on_request_finished()—— 通知 manager 不会再有新的 submit - 如果该请求没有在途 job,立即删除
_req_status - 如果有在途 job,保留
_req_status(等_complete_job清理最后一个 job 时才删除), 并将 non-sliding-window block 注册到_block_id_to_pending_jobs
take_events
从 OffloadingEventsTracker 和 manager 收集 KV cache 事件(BlockStored/BlockRemoved),
供引擎转发给 vLLM core 的 KV cache manager。
reset_cache
全停操作:清空所有缓存 block。清空 _req_status 中的已完请求、重置 manager(丢弃所有 block)、
重置 store 进度、清空 _jobs 和 _block_id_to_pending_jobs、设置 _stale_job_threshold
以丢弃 reset 之前的 worker 响应、重置 events tracker 和 stalled 检测状态。
shutdown
关闭调度器:调用 manager.shutdown() 释放后端资源(停止传输线程池、关闭 NIXL agent 等)。
CPUOffloadingManager
单层 offloading manager,直接管理 CPU 内存中的 KV cache block。
complete_store(keys, req_context, success=True)
标记 block 已从 GPU 传输到 CPU。
success=True(正常完成):遍历每个 key → 找到对应的 block → ref_cnt = 0(解除保护)→ 计入可驱逐计数 → 标记为 evictable(LRU 驱逐时可被回收)→ 记录到事件流。
Block 变为”已存储 + 可被其他请求加载”。
success=False(stalled 清理):遍历每个 key → 从 _policy 索引中删除 → self._free_block(block)(立即释放 CPU 内存)。
Block 被丢弃,不标记为已存储,其他请求无法加载它。
complete_load(keys, req_context)
标记 block 已从 CPU 加载回 GPU。增加 block 的 ref_cnt(保护 GPU 副本不被驱逐),
记录到事件流(BlockStored 表示 GPU 现在有了这份数据)。
TieringOffloadingManager
多层 offloading manager,primary tier 是 CPU,secondary tier 可以是磁盘/S3。 在 CPU manager 基础上增加了级联传输:GPU→CPU 完成后自动触发 CPU→磁盘。
complete_store(keys, req_context, success=True)
Step 1 — Primary tier 完成:调用 primary_tier.complete_store(keys, success),
逻辑和 CPU manager 一样(成功则标记 evictable,失败则释放内存)。
Step 2 — 级联到所有 secondary tier(仅 success=True 时):
对每个 secondary tier:调用 primary.prepare_read(keys) 获取 LoadStoreSpec 并增加 ref_cnt
(保护 block 在异步传输期间不被驱逐)→ tier.submit_store(job) 启动异步传输 primary→磁盘。
Step 3 — 递减 pending_primary_stores 计数:
prepare_store 时加了 1,这里减 1。当计数归零时调用 _maybe_finalize_request:
如果请求已结束(is_finished=True)且所有 primary store 都完成了,
对每个 secondary tier 调用 on_request_finished() 清理 lookup 索引。
pending_primary_stores的作用:延迟 secondary tier 的 finalize, 确保 GPU→CPU 的 store 全部完成后才开始清理 secondary tier 的请求状态。 如果提前 finalize,后续的complete_store可能还会触发新的级联传输, 但 secondary tier 已经认为”这个请求结束了”,状态不一致。
success=False 时:只执行 Step 1(primary tier 丢弃数据),跳过 Step 2 级联。
残数据不会传播到 secondary tier。
complete_load(keys, req_context)
只调用 primary_tier.complete_load(keys, req_context),
不涉及 secondary tier(load 只是 CPU→GPU,和磁盘无关)。