OffloadingConnectorScheduler 功能

__init__

初始化调度器:按 offloading spec 初始化 KV group 分组(full-attention、sliding-window、mamba)、 block 大小因子、lookup 策略;创建 manager(CPU 或 Tiering);初始化数据结构—— _req_status(每个请求的 offload 状态)、_jobs(在途 transfer job)、 _blocks_being_loaded(正在加载的 block,防止并发加载)、 _block_id_to_pending_jobs(block→job 的映射,用于 block 被重新分配时触发 flush)、 _stalled_job_ids(stalled job 的 worker 计数)以及 stalled 检测参数(300s timeout, 10s 间隔)。

TransferJobStatus:每个 in-flight job 的状态记录,创建时自动生成。 字段:req_id(属于哪个请求)、pending_count(还剩几个 worker 没报告完成)、 keys(传输的 block keys)、is_store(store 还是 load)、 sliding_window_block_ids / non_sliding_window_block_ids(引用的 GPU block)、 submitted_at(创建时间戳,用于 stalled 检测)。 _complete_jobstatus 参数就是这个。

_remove_pending_job

_block_id_to_pending_jobs 中删除指定 job_id 在给定 block_ids 上的条目。 如果某个 block 的 pending job 集合被清空,连该 block 的条目也一并删除。

pending job:还没被 worker 确认完成的 store/load 任务。 _block_id_to_pending_jobs 记录了每个 GPU block 被哪些 store job 引用。 如果这个 block 被 KV cache manager 重新分配给了另一个请求,而 store job 还没写完, 就需要把这个 store job 加入 _current_batch_jobs_to_flush, 通过 metadata 告诉 worker”别往这个 block 写了,它已经被回收了”。

_check_stalled_jobs

遍历所有在途 job,比较 submitted_at 与当前时间的差值。 超过 300s timeout 的 job 视为卡死,交给 _handle_stalled_job 清理。

self._jobs:所有还在传输中(GPU→CPU 的 store 或 CPU→GPU 的 load)的任务字典。 key 是 job_id,value 是 TransferJobStatus(req_id、pending_count、keys、is_store 等)。 worker 没报告完成之前,job 就一直在这里。

_handle_stalled_job

委托给 _complete_job(success=False) 清理卡死的 job(success=False 会导致 manager 丢弃残缺数据, 不标记为已存储、不级联到 secondary tier),然后将 job_id 记入 _stalled_job_ids 并附带剩余 worker 计数, 用于跳过后续到达的 late worker 报告。

self._stalled_job_ids:stalled 检测清理 job 后,worker 的报告可能还会陆续到达。 这个 dict 记录”已经处理过的 stalled job 还剩多少个 worker 的报告没到”。 key 是 job_id,value 是剩余 worker 数。每个 late report 到达时递减,到 0 就删除。

_complete_job

统一的 job 完成清理路径,正常和 stalled 共享。执行 5 步:

  1. 通知 manager(complete_store(success=...)complete_load
  2. 清理 _blocks_being_loaded(仅 load)
  3. 清理 _block_id_to_pending_jobs(block→job 反向索引)
  4. _jobs 删除 job
  5. req_status.transfer_jobs 移除 job;如果是最后一个 job 且请求已完成,删除 _req_status

不调用 on_request_finished——那是 request_finished() 的职责。

manager 做什么OffloadingManager,负责真正的 KV cache 读写。 complete_store:标记 block 已从 GPU 传到 CPU,更新 ref_cnt,标记为可驱逐。 complete_load:标记 block 已从 CPU 加载回 GPU。

_blocks_being_loaded:set,记录当前正在从 CPU 加载回 GPU 的 block keys。 另一个请求如果要加载同样的 block,发现已经在 set 中就跳过——避免重复加载。

difference_update:Python set 方法,A.difference_update(B) 等价于 A = A - B, 从 A 中删除所有 B 里有的元素。

_maximal_prefix_lookup

从 block 0 开始逐个调用 manager.lookup(),统计连续 HIT 的数量。 遇到 HIT_PENDING/RETRY 不中断(继续扫描让 manager 发起异步 lookup),遇到 MISS 停止。 返回连续命中数,如果有 deferred lookup 则返回 None。

_sliding_window_lookup

从 block 序列末尾向前扫描,统计连续 HIT 的 streak。 遇到连续 HIT 数 ≥ sliding_window_size 时停止,返回该 streak 的起始索引。 用于滑动窗口 attention 场景——只有末尾的 window_size 个 block 可能被命中。

_touch

标记指定请求的所有 offload_keys 的 access_time,用于 LRU 驱逐策略。 在 update_state_after_alloc 中调用,表示这些 block 的 GPU 副本已被访问。

manager.touch:标记 block 的最近访问时间。CPU 内存满了需要驱逐 block 时, 优先驱逐最久没被访问的。touch 就是在说”这些 block 刚被 GPU 读过了,不要驱逐它们”。

_lookup

对请求的 offload_keys 调用 manager.lookup(), 先对 full-attention groups 调用 _maximal_prefix_lookup, 再对 sliding-window groups 调用 _sliding_window_lookup。 返回能命中的 token 数(None 表示需要延迟,等异步 lookup 完成)。

为什么复杂:同时处理三种情况:

  1. Full attention:从 block 0 开始找连续命中的前缀,命中多少就跳过多少 token
  2. Sliding window:只有末尾 window_size 个 block 可能被命中,必须从尾向前扫
  3. 异步 lookup:block 缓存状态不确定(HIT_PENDING / RETRY)时不能立刻返回, 需要 manager 先发起异步查询,defer 结果等下次再查

_lookup 是入口,内部调用 _maximal_prefix_lookup_sliding_window_lookup, 分别处理 full-attention 和 sliding-window 的命中查询。

on_new_request

请求首次加入调度器时调用:创建 ReqContext、调用 manager.on_new_request() 获取 offload policy(block_level / request_level)、初始化 RequestOffloadState 并加入 _req_status

get_num_new_matched_tokens

返回该请求可以加载的额外 token 数。如果还有在途 transfer job 则返回 None(延迟处理)。 调用 _lookup 查询 prefix cache 命中情况,返回命中 token 数 + 是否需要异步加载。

update_state_after_alloc

请求在 KV cache manager 中分配了 block 后调用。更新 _req_status 中的 block_ids 分组, 处理 sliding-window 的 stale 条目(被重新分配的 block 标记为 0),并将新分配的 block_ids 加入 _current_batch_allocated_block_ids

为什么需要:防止往已经被回收的 block 里写东西。 场景:请求 A 的 block 5 被滑动窗口机制释放,KV cache manager 把 block 5 重新分配给了请求 B。 但请求 A 可能还有 store job 等着往 block 5 写数据!

这个函数做的事:

  1. 更新请求 A 的 block_ids(标记 block 5 已不属于它)
  2. 把 block 5 的条目设为 0(stale entry)
  3. 后续 _update_req_states 扫描时发现 block_ids[j] == 0 → 跳过这个 block,不往它写数据

_update_req_states

build_connector_meta 之前调用,决定哪些 block 需要 store、构建 store jobs。 遍历所有活跃请求,计算可 offload 的 token 数,调用 manager.prepare_store() 获取 GPU→CPU 的传输参数,创建 TransferJob 加入 _jobsreq_status.transfer_jobs。 对滑动窗口 block 注册到 _block_id_to_pending_jobs

_build_store_jobs

(内部方法,被 _update_req_states 调用)为一个请求构建具体的 store job: 收集待 offload 的 keys,调用 manager.prepare_store(),生成 GPULoadStoreSpec, 分配 job_id,关联 block→job 映射(包括 sliding-window block 的反向索引注册)。

build_connector_meta

构造当前 engine step 的 OffloadingConnectorMetadata: 包含待提交的 load jobs(key + 传输参数)、store jobs(key + GPULoadStoreSpec)、 以及需要 flush(取消)的 store job 集合。被 scheduler 在每个 step 中调用, 然后 connector 将 metadata 分发给 worker。

has_pending_push_work

返回是否有未完成的 offload 工作(store jobs 待处理或 _req_status 中还有未完成的请求)。 用于判断 engine 是否需要继续处理该 connector。

update_connector_output

处理 worker 报告的 transfer 完成:遍历 meta.completed_jobs, 递减 pending_count,当 pending_count == 0 时调用 _complete_job 清理 job。 末尾运行 stalled 检测(10s 间隔节流)。 跳过 _stalled_job_ids 中的 late worker 报告和 reset_cache 之前的 stale job。

get_stats

收集并返回 offloading connector 的统计指标(store/load 次数、耗时、大小、cache 使用率)。 合并 _connector_stats(来自 worker)和 manager.get_stats()(来自 manager)。

request_finished

请求生成结束(is_finished)时调用:

  1. 调用 manager.on_request_finished() —— 通知 manager 不会再有新的 submit
  2. 如果该请求没有在途 job,立即删除 _req_status
  3. 如果有在途 job,保留 _req_status(等 _complete_job 清理最后一个 job 时才删除), 并将 non-sliding-window block 注册到 _block_id_to_pending_jobs

take_events

OffloadingEventsTrackermanager 收集 KV cache 事件(BlockStored/BlockRemoved), 供引擎转发给 vLLM core 的 KV cache manager。

reset_cache

全停操作:清空所有缓存 block。清空 _req_status 中的已完请求、重置 manager(丢弃所有 block)、 重置 store 进度、清空 _jobs_block_id_to_pending_jobs、设置 _stale_job_threshold 以丢弃 reset 之前的 worker 响应、重置 events tracker 和 stalled 检测状态。

shutdown

关闭调度器:调用 manager.shutdown() 释放后端资源(停止传输线程池、关闭 NIXL agent 等)。

CPUOffloadingManager

单层 offloading manager,直接管理 CPU 内存中的 KV cache block。

complete_store(keys, req_context, success=True)

标记 block 已从 GPU 传输到 CPU。

success=True(正常完成):遍历每个 key → 找到对应的 block → ref_cnt = 0(解除保护)→ 计入可驱逐计数 → 标记为 evictable(LRU 驱逐时可被回收)→ 记录到事件流。 Block 变为”已存储 + 可被其他请求加载”。

success=False(stalled 清理):遍历每个 key → 从 _policy 索引中删除 → self._free_block(block)(立即释放 CPU 内存)。 Block 被丢弃,不标记为已存储,其他请求无法加载它。

complete_load(keys, req_context)

标记 block 已从 CPU 加载回 GPU。增加 block 的 ref_cnt(保护 GPU 副本不被驱逐), 记录到事件流(BlockStored 表示 GPU 现在有了这份数据)。


TieringOffloadingManager

多层 offloading manager,primary tier 是 CPU,secondary tier 可以是磁盘/S3。 在 CPU manager 基础上增加了级联传输:GPU→CPU 完成后自动触发 CPU→磁盘。

complete_store(keys, req_context, success=True)

Step 1 — Primary tier 完成:调用 primary_tier.complete_store(keys, success), 逻辑和 CPU manager 一样(成功则标记 evictable,失败则释放内存)。

Step 2 — 级联到所有 secondary tier(仅 success=True 时): 对每个 secondary tier:调用 primary.prepare_read(keys) 获取 LoadStoreSpec 并增加 ref_cnt (保护 block 在异步传输期间不被驱逐)→ tier.submit_store(job) 启动异步传输 primary→磁盘。

Step 3 — 递减 pending_primary_stores 计数prepare_store 时加了 1,这里减 1。当计数归零时调用 _maybe_finalize_request: 如果请求已结束(is_finished=True)且所有 primary store 都完成了, 对每个 secondary tier 调用 on_request_finished() 清理 lookup 索引。

pending_primary_stores 的作用:延迟 secondary tier 的 finalize, 确保 GPU→CPU 的 store 全部完成后才开始清理 secondary tier 的请求状态。 如果提前 finalize,后续的 complete_store 可能还会触发新的级联传输, 但 secondary tier 已经认为”这个请求结束了”,状态不一致。

success=False:只执行 Step 1(primary tier 丢弃数据),跳过 Step 2 级联。 残数据不会传播到 secondary tier。

complete_load(keys, req_context)

只调用 primary_tier.complete_load(keys, req_context), 不涉及 secondary tier(load 只是 CPU→GPU,和磁盘无关)。