vLLM V1 Block
This chapter is based on the V1 code on the v0.9.0 branch and analyzes, bottom-up, the block management behind vLLM's signature feature: the KV cache with PagedAttention. For complementary reading, see kaiyuan's detailed analysis "vLLM 的 prefix cache 为何零开销" (why vLLM's prefix cache is zero-overhead).
Bottom layer: KVCacheBlock and FreeKVCacheBlockQueue
At the lowest level, KVCacheBlock carries a reference count and a pair of doubly-linked-list pointers; _block_hash = None marks a block that is not yet full (and therefore not yet hashed).
```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class KVCacheBlock:
    block_id: int
    ref_cnt: int = 0
    # None while the block is not yet full (not hashed yet).
    _block_hash: Optional[BlockHashType] = None
    # Doubly-linked-list pointers used by FreeKVCacheBlockQueue.
    prev_free_block: Optional["KVCacheBlock"] = None
    next_free_block: Optional["KVCacheBlock"] = None
```
FreeKVCacheBlockQueue is one of the standout designs of the V1 architecture. It keeps the blocks list plus head/tail pointers internally and supports the basic doubly-linked-list operations: remove() on any existing block, append() of a new tail block, and popleft() of the head block. The design philosophy behind it: one structure that uniformly tracks the blocks whose reference count has dropped to zero (all associated requests have finished) but which may still be reusable later. When a new request tries to reuse the cache, it matches against this queue (by hash); on a hit, the block's cached result is reused and the KV cache need not be recomputed. When allocating new blocks and the supply of truly empty blocks runs short, older blocks can be evicted from this list.
```python
class FreeKVCacheBlockQueue:
    def __init__(self, blocks: list[KVCacheBlock]) -> None:
        self.num_free_blocks = len(blocks)
        self.free_list_head: Optional[KVCacheBlock] = blocks[0]
        self.free_list_tail: Optional[KVCacheBlock] = blocks[-1]
        for i in range(self.num_free_blocks):
            if i > 0:
                blocks[i].prev_free_block = blocks[i - 1]
            if i < self.num_free_blocks - 1:
                blocks[i].next_free_block = blocks[i + 1]
```
The official docs describe this as Zero-Overhead Prefix Caching, because the doubly linked list yields the following benefits:
- O(1) removal of a block from anywhere in the queue when it gets reused (see the sketch after this list).
- A natural LRU eviction policy: blocks are appended to the tail when a request is freed, and popped from the head when space is reclaimed.
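For intuition, here is a minimal sketch of the O(1) removal, assuming the KVCacheBlock fields above (a paraphrase for illustration, not vLLM's verbatim code):

```python
def remove(self, block: KVCacheBlock) -> None:
    # Detach `block` in O(1) by rewiring its neighbors' pointers;
    # patch the head/tail pointers when the block sits at either end.
    if block.prev_free_block is not None:
        block.prev_free_block.next_free_block = block.next_free_block
    else:
        self.free_list_head = block.next_free_block
    if block.next_free_block is not None:
        block.next_free_block.prev_free_block = block.prev_free_block
    else:
        self.free_list_tail = block.prev_free_block
    block.prev_free_block = None
    block.next_free_block = None
    self.num_free_blocks -= 1
```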
Middle layer: BlockPool
BlockPool maintains the full set of blocks in its blocks list, the free-but-reusable blocks in the free_block_queue described above, and a cached_block_hash_to_block dict that maps block hashes back to blocks. According to the official comments, they deliberately allow multiple blocks to share the same hash value and use block_id as a secondary key to keep the index unique, which I find rather curious.
```python
class BlockPool:
    """BlockPool that manages KVCacheBlocks.
    It provides methods to allocate, free and cache the kv cache blocks. The
    free_block_queue stores the free blocks in eviction order to enable
    allocation, free, and cache eviction. The cached_block_hash_to_block
    maps between block hash and cached block to support finding cached
    blocks by their block hash.
    """

    def __init__(
        self,
        num_gpu_blocks: int,
        enable_caching: bool,
        enable_kv_cache_events: bool = False,
    ):
        assert isinstance(num_gpu_blocks, int) and num_gpu_blocks > 0
        self.num_gpu_blocks = num_gpu_blocks
        self.enable_caching = enable_caching
        self.blocks: list[KVCacheBlock] = [
            KVCacheBlock(idx) for idx in range(num_gpu_blocks)
        ]
        self.free_block_queue = FreeKVCacheBlockQueue(self.blocks)
        self.cached_block_hash_to_block: dict[BlockHashType, dict[
            int, KVCacheBlock]] = defaultdict(dict)
        self.null_block = self.free_block_queue.popleft()
        self.enable_kv_cache_events = enable_kv_cache_events
        self.kv_event_queue: list[KVCacheEvent] = []
```
BlockPool exposes a few key methods:
- get_cached_block(block_hash): looks up a block with the given hash via the mapping; if several blocks share the hash, the first one is returned by default.
- cache_full_blocks(request, blocks, block_hashes, num_cached_blocks, num_full_blocks, ...): hashes a run of full blocks and records them in the cache. The hash function is configurable; the default is Python's builtin hash, with sha256 as an opt-in alternative. There is an optimization here: the current request may already have computed its prefix hashes during cache matching, so the block_hashes parameter accepts those cached results and the function only computes hashes from the first uncached block onward. One gripe about the request parameter: BlockPool should not need to know the request's concrete structure, and should instead receive just the pieces it needs (mainly for hashing).
- get_new_blocks(num_blocks): claims num_blocks new blocks by popping them from the head of free_block_queue and incrementing their reference counts by 1. If a popped block still carries a hash, the hash must be cleared and the block removed from cached_block_hash_to_block.
- touch(blocks): pins the blocks in the list (they temporarily cannot be freed), used when the current request scores a hash hit. It walks the list, incrementing each reference count by 1; any count going from 0 to 1 means the block must be removed from free_block_queue.
- free_blocks(ordered_blocks): releases the blocks in ordered_blocks, used when the current request has finished. It walks the list in order, decrementing each reference count by 1; any count going from 1 to 0 means the block is appended to the tail of free_block_queue. A sketch of the last three methods follows this list.
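Since the list above describes the reference-counting choreography only in prose, here is a minimal sketch of get_new_blocks(), touch(), and free_blocks() as BlockPool methods, assuming the structures defined earlier (vLLM's actual code additionally special-cases the null block and emits KV cache events):

```python
from collections.abc import Iterable

def get_new_blocks(self, num_blocks: int) -> list[KVCacheBlock]:
    # Pop from the head of the free queue (LRU order). If the recycled
    # block still carries a hash, evict the stale cache entry so it can
    # no longer be matched.
    ret: list[KVCacheBlock] = []
    while len(ret) < num_blocks:
        block = self.free_block_queue.popleft()
        block_hash = block._block_hash  # exposed via a property upstream
        if block_hash is not None:
            cached = self.cached_block_hash_to_block[block_hash]
            del cached[block.block_id]
            if not cached:
                del self.cached_block_hash_to_block[block_hash]
            block._block_hash = None
        block.ref_cnt += 1
        ret.append(block)
    return ret

def touch(self, blocks: list[KVCacheBlock]) -> None:
    for block in blocks:
        # A cache hit may land on a block still sitting in the free
        # queue (ref_cnt == 0): pull it out before pinning it.
        if block.ref_cnt == 0:
            self.free_block_queue.remove(block)
        block.ref_cnt += 1

def free_blocks(self, ordered_blocks: Iterable[KVCacheBlock]) -> None:
    for block in ordered_blocks:
        block.ref_cnt -= 1
        if block.ref_cnt == 0:
            # Keep the hash so future requests can still hit the block;
            # appending at the tail approximates LRU eviction order.
            self.free_block_queue.append(block)
```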
Note that BlockPool also maintains a list of KVCacheEvent items: storing a newly hashed block triggers a BlockStored event, and removing a hashed block from the mapping triggers BlockRemoved, each recording all the relevant parameters.
Upper layer: SingleTypeKVCacheManager
SingleTypeKVCacheManager is the upper-level structure that wraps BlockPool. The odd SingleType prefix presumably exists to support multiple KV cache types: each instance handles a single type according to the KVCacheSpec it receives. Two subclasses are currently implemented: FullAttentionManager and SlidingWindowManager.
```python
def __init__(
    self,
    kv_cache_spec: KVCacheSpec,
    block_pool: BlockPool,
    use_eagle: bool,
    num_kv_cache_groups: int,
    caching_hash_fn: Callable,
) -> None:
    self.block_size = kv_cache_spec.block_size
    self.kv_cache_spec = kv_cache_spec
    self.block_pool = block_pool
    self.use_eagle = use_eagle
    self.req_to_blocks: defaultdict[str, list[KVCacheBlock]] = \
        defaultdict(list)
    self.num_cached_block: dict[str, int] = {}
    self.num_kv_cache_groups = num_kv_cache_groups
    self.caching_hash_fn = caching_hash_fn
```
Besides the core BlockPool instance, SingleTypeKVCacheManager maintains two dicts:
- self.req_to_blocks: maps each request to the list of blocks allocated to it; per the official comment, it is used to locate the blocks to free when the request finishes.
- self.num_cached_block: records the number of cached blocks per request. The official comment notes it only tracks requests in the Running state, not Preempted ones.
I think this is a reasonable layer at which to start being request-aware. The two dicts are confusing at first, though, because the latter looks like just the len() of the former. Reading the code shows they are not kept in sync in real time: self.req_to_blocks faithfully records every block a request has reused or newly allocated, whereas self.num_cached_block only counts the blocks of each request that have reached the reusable, hashed stage. The snippet below illustrates the difference.
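A quick numeric illustration (the numbers are assumed for this example, not taken from vLLM):

```python
# Assume block_size = 16 and a request holding slots for 52 tokens.
block_size = 16
num_tokens = 52

num_allocated = -(-num_tokens // block_size)  # cdiv -> 4 blocks
num_hashed = num_tokens // block_size         # floor -> 3 blocks

# req_to_blocks would track all 4 blocks (including the partly filled
# 4th one, needed for freeing later), while num_cached_block would be 3,
# because only the 3 full blocks have been hashed by cache_blocks().
assert (num_allocated, num_hashed) == (4, 3)
```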
The core methods of SingleTypeKVCacheManager are as follows:
```python
def get_num_blocks_to_allocate(
        self, request_id: str, num_tokens: int,
        new_computed_blocks: list[KVCacheBlock]) -> int:
    num_required_blocks = cdiv(num_tokens, self.block_size)
    num_new_blocks = (num_required_blocks - len(new_computed_blocks) -
                      len(self.req_to_blocks[request_id]))
    # Cache-hit blocks with ref_cnt == 0 still sit in the free queue and
    # count against the free budget once this request claims them.
    num_evictable_computed_blocks = sum(blk.ref_cnt == 0
                                        for blk in new_computed_blocks)
    return ((num_new_blocks + num_evictable_computed_blocks) *
            self.num_kv_cache_groups)

def save_new_computed_blocks(
        self, request_id: str,
        new_computed_blocks: list[KVCacheBlock]) -> None:
    if request_id not in self.num_cached_block:
        # A new request: record the prefix-cache hits as its first blocks.
        req_blocks = self.req_to_blocks[request_id]
        assert len(req_blocks) == 0
        req_blocks.extend(new_computed_blocks)
        self.num_cached_block[request_id] = len(new_computed_blocks)
    else:
        # A running request never brings new computed blocks.
        assert len(new_computed_blocks) == 0

def allocate_new_blocks(self, request_id: str,
                        num_tokens: int) -> list[KVCacheBlock]:
    req_blocks = self.req_to_blocks[request_id]
    num_required_blocks = cdiv(num_tokens, self.block_size)
    num_new_blocks = num_required_blocks - len(req_blocks)
    if num_new_blocks <= 0:
        return []
    else:
        new_blocks = self.block_pool.get_new_blocks(
            num_new_blocks * self.num_kv_cache_groups)
        req_blocks.extend(new_blocks)
        return new_blocks

def cache_blocks(self, request: Request,
                 block_hashes: list[BlockHashType],
                 num_tokens: int) -> None:
    num_cached_blocks = self.num_cached_block[request.request_id]
    num_full_blocks = num_tokens // self.block_size
    self.block_pool.cache_full_blocks(
        request=request,
        blocks=self.req_to_blocks[request.request_id],
        block_hashes=block_hashes,
        num_cached_blocks=num_cached_blocks,
        num_full_blocks=num_full_blocks,
        block_size=self.block_size,
        hash_fn=self.caching_hash_fn,
    )
    self.num_cached_block[request.request_id] = num_full_blocks

def free(self, request_id: str) -> None:
    req_blocks = self.req_to_blocks.pop(request_id, [])
    # Free in reverse so the request's tail blocks enter the free queue
    # first and are therefore evicted first under LRU.
    ordered_blocks = reversed(req_blocks)
    self.block_pool.free_blocks(ordered_blocks)
    self.num_cached_block.pop(request_id, None)
```
Subclasses of SingleTypeKVCacheManager supply the cache-hit lookup appropriate to their attention type. Take FullAttentionManager as an example:
```python
def find_longest_cache_hit(
        self, block_hashes: list[BlockHashType],
        max_length: int) -> list[KVCacheBlock]:
    computed_blocks: list[KVCacheBlock] = []
    max_num_blocks = max_length // self.block_size
    for i in range(max_num_blocks):
        block_hash = block_hashes[i]
        # Full attention needs a contiguous prefix of hits, so stop at
        # the first miss.
        if cached_block := self.block_pool.get_cached_block(block_hash):
            computed_blocks.append(cached_block)
        else:
            break
    if self.use_eagle and len(computed_blocks) > 0:
        # EAGLE needs the last matched block recomputed, so drop it
        # from the hit.
        computed_blocks.pop()
    return computed_blocks
```
Wrapper layer: KVCacheManager
KVCacheManager layers on more business-level semantics: it is aware of PagedAttention's block_size and block count, and it owns the hash function, logging, and metrics. Purely in terms of block management, beyond the core SingleTypeKVCacheManager field it maintains one extra field, self.req_to_block_hashes, which caches a request's block hashes across different calls to avoid redundant computation.
```python
class KVCacheManager:

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        enable_caching: bool = True,
        caching_hash_algo: str = "builtin",
        use_eagle: bool = False,
        log_stats: bool = False,
        enable_kv_cache_events: bool = False,
    ) -> None:
        assert len(kv_cache_config.kv_cache_groups) == 1
        kv_cache_spec = kv_cache_config.kv_cache_groups[0].kv_cache_spec
        self.block_size = kv_cache_spec.block_size
        self.num_gpu_blocks = kv_cache_config.num_blocks
        self.max_model_len = max_model_len

        self.enable_caching = enable_caching
        self.caching_hash_fn = sha256 if caching_hash_algo == "sha256" else hash
        self.use_eagle = use_eagle
        self.log_stats = log_stats
        self.prefix_cache_stats = PrefixCacheStats() if log_stats else None

        self.block_pool = BlockPool(self.num_gpu_blocks, enable_caching,
                                    enable_kv_cache_events)
        self.single_type_manager = get_manager_for_kv_cache_spec(
            kv_cache_spec=kv_cache_spec,
            block_pool=self.block_pool,
            use_eagle=self.use_eagle,
            num_kv_cache_groups=1,
            caching_hash_fn=self.caching_hash_fn,
        )

        self.req_to_block_hashes: defaultdict[
            str, list[BlockHashType]] = defaultdict(list)
```
There are only two core functions, covering exactly the two halves of prefix caching: prefix lookup and actual allocation.
```python
def get_computed_blocks(self,
                        request: Request) -> tuple[KVCacheBlocks, int]:
    # Skip the lookup when caching is off or prompt logprobs are needed.
    if (not self.enable_caching
            or request.sampling_params.prompt_logprobs is not None):
        return KVCacheBlocks.create_empty(), 0

    # Reuse hashes computed for this request in an earlier call, if any.
    block_hashes = self.req_to_block_hashes[request.request_id]
    if not block_hashes:
        block_hashes = hash_request_tokens(self.caching_hash_fn,
                                           self.block_size, request)
        self.req_to_block_hashes[request.request_id] = block_hashes

    if self.log_stats:
        assert self.prefix_cache_stats is not None
        self.prefix_cache_stats.requests += 1

    # Cap at num_tokens - 1 so at least one token is left to compute.
    max_cache_hit_length = request.num_tokens - 1
    computed_blocks = self.single_type_manager.find_longest_cache_hit(
        block_hashes, max_cache_hit_length)
    num_computed_tokens = len(computed_blocks) * self.block_size

    if self.log_stats:
        assert self.prefix_cache_stats is not None
        self.prefix_cache_stats.queries += request.num_tokens
        self.prefix_cache_stats.hits += num_computed_tokens

    return KVCacheBlocks(computed_blocks), num_computed_tokens

def allocate_slots(
    self,
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: Optional[KVCacheBlocks] = None,
    num_draft_tokens: int = 0,
    num_lookahead_tokens: int = 0,
    delay_cache_blocks: bool = False,
) -> Optional[KVCacheBlocks]:
    if num_new_tokens == 0:
        raise ValueError("num_new_tokens must be greater than 0")

    if new_computed_blocks is not None:
        new_computed_block_list = new_computed_blocks.blocks
    else:
        new_computed_block_list = []

    num_computed_tokens = (request.num_computed_tokens +
                           num_new_computed_tokens)
    num_tokens_need_slot = min(
        num_computed_tokens + num_new_tokens + num_lookahead_tokens,
        self.max_model_len)

    num_blocks_to_allocate = (
        self.single_type_manager.get_num_blocks_to_allocate(
            request_id=request.request_id,
            num_tokens=num_tokens_need_slot,
            new_computed_blocks=new_computed_block_list,
        ))
    # Not enough free blocks: report failure to the scheduler.
    if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
        return None

    if self.enable_caching:
        # Pin the cache-hit blocks before allocating, so they cannot be
        # evicted while this request runs.
        self.block_pool.touch(new_computed_block_list)
    else:
        assert not new_computed_block_list, (
            "Computed blocks should be empty when prefix caching is "
            "disabled")

    self.single_type_manager.save_new_computed_blocks(
        request.request_id, new_computed_block_list)

    new_blocks = self.single_type_manager.allocate_new_blocks(
        request.request_id, num_tokens_need_slot)

    if not self.enable_caching or delay_cache_blocks:
        return KVCacheBlocks(new_blocks)

    # Eagerly register hashes for the newly full blocks (see discussion
    # below); draft tokens are excluded since they may be rejected.
    self.single_type_manager.cache_blocks(
        request, self.req_to_block_hashes[request.request_id],
        num_computed_tokens + num_new_tokens - num_draft_tokens)

    return KVCacheBlocks(new_blocks)
```
One tricky aspect of this code: as soon as allocate_slots() finishes claiming new blocks, it immediately registers the hashes of the newly full blocks. Consider two requests arriving back to back with identical token lists: logically, the second one will directly reuse the first one's hashed blocks, even though the GPU worker runner has not actually computed them yet. My analysis of why this is safe:
- Because of Python's GIL, the Scheduler under each EngineCore handles concurrent requests on a single thread, so there is no concurrency hazard at this stage.
- When the GPU worker runner executes, even if the two requests land in the same batch, each one still triggers its own forward pass to compute the corresponding KV cache, and both end up writing identical contents into the same list of GPU blocks, which is likewise harmless. The sketch below walks through this scenario.
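A hypothetical walk-through of that scenario (req_a and req_b are assumed Request objects with identical 32-token prompts, block_size is 16, kvm is a KVCacheManager built as above, and use_eagle is off; this is illustrative pseudocode around the API shown earlier, not a runnable test):

```python
# req_a arrives first: no cache hit, so it allocates fresh blocks, and
# allocate_slots() eagerly hashes its two full blocks on the spot.
blocks_a = kvm.allocate_slots(req_a, num_new_tokens=32)

# req_b arrives next with the same tokens. The lookup is capped at
# num_tokens - 1 = 31 tokens (31 // 16 = 1 block), so req_b reuses one
# full block of req_a even though the GPU worker may not have written
# its KV values yet.
hit_blocks, num_hit_tokens = kvm.get_computed_blocks(req_b)
assert num_hit_tokens == 16
```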