Pregel基于Checkpoint的持久化机制是实现Agent应用高可用性和长期记忆的基础它本质上是将 不断向前推进的图在“Superstep”之间将其状态固化的过程。和很多数据库持久化类似Pregel采用基于全量数据的状态快照基于增量更新的操作日志的持久化策略。1. 持久化Channel状态Pregel将状态“焊死”在Channel上这使持久化变得很简单它只要针对每个Superstep将每个Channel状态存下来就可以了。为了提高性能它只需要考虑有过更新的Channel而确Channel是否更新可以利用它的版本来决定。每个Channel都具有一个不断更新的版本如果某个Channel在某个Superstep内有过更新版本会往前更替。至于这个版本采用何种格式具体如何管理执行引擎将其下放到具体的Checkpointer实现中。作为Checkpointer的基类BaseCheckpointSaver将基于更新快照的存储实现在如下所示的put方法中。待持久化的数据被封装在一个Checkpoint对象中以checkpoint参数传入该方法config和metadata参数提供描述该Checkpoint的配置和元数据而new_versions以一个字典的形式提供了涉及的每个Channel的版本。config参数提供的RunnableConfig主要提供标识当前调用会话的Thread ID和Checkpoint命名空间。方法会返回的RunnableConfig对象一般会携带Thread ID、Checkpoint命名空间和Checkpoint ID。/* by yours.tools - online tools website : yours.tools/zh/navtiveunicode.html */ class BaseCheckpointSaver(Generic[V]): def put( self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions, ) - RunnableConfig async def aput( self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions, ) - RunnableConfig … ChannelVersions dict[str, str | int | float]1.1 CheckpointMetadata描述Checkpoint元数据的CheckpointMetadata类型定义如下其三个成员构成了图执行的谱系追踪Lineage Tracking和控制流导航的核心。它们不参与业务逻辑计算但决定了如何管理、回溯和审计图的状态。/* by yours.tools - online tools website : yours.tools/zh/navtiveunicode.html */ class CheckpointMetadata(TypedDict, totalFalse): source: Literal[input, loop, update, fork] step: int parents: dict[str, str]CheckpointMetadata的step字段返回Superstep编号代表当前Checkpoint在逻辑时间轴上的位置。source字段定义了当前这个Checkpoint是由哪种类型的操作触发生成的它是理解图生命历程的关键具体的选项包括input: 首次调用invoke或stream方法时触发代表图的“创世点”。这是由外部初始数据输入产生的第一个CheckpointSuperstep序号为 -1loop内部根据 Node 和 Channel 的订阅关系进行迭代时触发代表图在正常执行流程中的自动化流转。大多数中间步骤的 source 都是此值。update用户手动调用了update_state方法时触发代表一种“非自然”的状态变更。这通常用于人为干预、修正数据或在中断后注入信息。fork: 当用户从历史中的某个非最新Checkpoint重新启动执行时触发代表图产生了分支。它标记了执行流从主线脱离开启了一个独立的时间线。CheckpointMetadata的parents字段返回一个字典记录了当前Checkpoint与之前Checkpoint之间的拓扑关系其结果通常为通常形式为dict[namespace, parent_checkpoint_id]。由于 采用增量持久化当我们需要恢复一个完整的状态视图时引擎必须知道去哪里找那些没变动的数据parents字典提供了回溯路径。如果当前 Checkpoint 没有 某个Channel的值引擎就会根据parents指引跳转到父级Checkpoint去查找直到找到该 Channel最近一次被更新的版本。在包含子图的复杂场景中parents字段会记录父图命名空间对应的Checkpoint ID确保父子图之间的状态逻辑能够跨层级对齐。当source为“fork”时parents字段指向的是那个被分叉的历史点而不是时间线上的物理前一个点。当我们查看一个CheckpointMetadata对象时可以构建出如下逻辑这个状态是由于source产生的目前处于第step步。如果你想知道这个状态从何而来或者想找回那些没变的数据请根据parents列表向回追溯。1.2 Channel版本执行引擎将Channel版本的格式化权力下放给具体的Checkpointer实现它们通过重写如下这个get_next_version方法提供某个Channel的下一个版本。如果表示当前版本的current参数为None该方法会返回Channel的初始版本。class BaseCheckpointSaver(Generic[V]): def get_next_version(self, current: V | None, channel: None) - V以InMemorySaver为例它会将Channel版本格式化成一个由51个字符组成的字符串具体格式为f{sequence:032}.{random:016}。sequence从1开始递增代表了“物理状态的演进顺序”。在顺序执行时反映了Superstep的进度但是在遇到人为干预、中断、分叉或重试的情况下物理序列号仅能反映“存盘的次数”。Superstep序号反映的是“算法迭代的深度”所以不能将两者等同起来前者是大于或等于Superstep序号的。“random”部分是一个0-1之间的随机数。如果说Channel版本前 32 位第一部分是时间轴上的大刻度那么这第二部分就是确保状态在微观层面绝对唯一且可追溯的防伪码。由于 CPU 处理速度极快多个并行任务可能在极短的微秒级时间内尝试触发写入。如果仅依靠前 32 位的递增序列号在Superstep内部的多次写入可能会因为序列号来不及递增或在高并发下产生冲突。第二部分包含的随机数源自浮点数的小数位确保了即使第一部分相同物理上的 Checkpoint ID 也是全球唯一的这保证了UUID 级别的碰撞安全性。从支持时间旅行与状态分叉的角度来看第二部分就显得更加重要了。当我们从某个历史点重启时流程时假设从历史上的“00...001”处分叉出两条不同的执行路径两条路径的序列号可能都会递增到“00...002”但通过第二部分的随机随机值系统能以如下形式物理隔离这两条路径。所以版本的第二部分的内容不仅仅为了解决冲突而存在它使得持久化层可以同时存储同一逻辑步下的多个平行宇宙而不会发生覆盖。路径 A: 00...002.0.8494...路径 B: 00...002.0.1234...有的实现会严格采用类似于{ sequence}.{step_index}.{random_entropy}这样的三段式的版本格式化第二部分通常包含了.0. 或.1.这样的前缀它还兼具如下的功能子图导航当主图调用子图时子图产生的Checkpoint会通过第二部分的特定位来标识它属于哪个父级任务的“逻辑分支”。任务索引在同一个Superstep中如果一个Node产生了多条Pending Write第二部分可以用来索引这些写入的先后次序确保在恢复合并时不会错位。1.3 Checkpoint如下所示的是Checkpoint类型的定义。它的v字段表示决定Checkpoint结构的版本号用于后向兼容性。如果未来改变了存储格式运行时会根据这个值决定如何正确地反序列化旧数据。id和ts分别表示Checkpoint的唯一标识和生成时间戳。updated_channels字段返回的本Superstep内涉及更新的Channel列表。引擎根据订阅它们的Node来创建下一步执行的任务。channel_values字段存储了“涉及更新”的每个Channel的更新值。channel_versions字段返回所有Channel的版本。class Checkpoint(TypedDict): v : int id : str ts : str channel_values : dict[str, Any] channel_versions : ChannelVersions versions_seen : dict[str, ChannelVersions] updated_channels : list[str] | NoneNode并不能实时观察到Channel的变化versions_seen字段以{ Node名: { 依赖Channel名: 版本ID } }这样的结构返回每个Node执行时所能“看到”的Channel版本 它记录了Node完成计算时的前置条件。在中断恢复时引擎对比versions_seen如果Node看到的输入版本没变且它已经有了输出记录那么就无需重复执行所以这是实现因果一致性和幂等性的关键。如下的JSON是由Pregel生成的一个Checkpoint对象序列化后的结果。{ v: 4, ts: 2026-01-18T13:42:07.54215500:00, id: 1f0f4737-b4b1-6bbb-8001-1e44d720a9df, channel_versions: { foo: 00000000000000000000000000000001.0.6943525017042773, bar: 00000000000000000000000000000002.0.24038201058058928, baz: 00000000000000000000000000000003.0.8444674692332181 }, versions_seen: { __input__: {}, foo: { foo: 00000000000000000000000000000001.0.6943525017042773 }, bar: { bar: 00000000000000000000000000000002.0.24038201058058928 } }, updated_channels: [ baz ], channel_values: { foo: begin, bar: bar, baz: baz } }1.4 存储结构接下来我们以InMemorySaver为例看看Checkpoint会采用怎样的存储结构以及以此结构基础的读取方式。InMemorySave针对Checkpoint的存储涉及两个字典。一个名为blobs的字典用于存储Channel的荷载内容值采用的Key是由Thread ID、Checkpoint命名空间、Channel名称和版本构成的四元组。另一个名为storage的字典时一个具有四层结构的字典具体类型为defaultdict[str, dict[str, dict[str, tuple[tuple[str, bytes], tuple[str, bytes], str | None]]]]每一层字典的Key顶如下。我们可以认为blobs用于存储数据storage为索引表。层级Key 类型说明第一层Thread_id会话隔离区分不同的用户或对话流第二层Checkpoint_ns命名空间隔离支持子图或不同模块的独立状态空间第三层checkpoint_id版本隔离用于定位特定版本的快照最后一层字典的值是一个三元组tuple[tuple[str, bytes], tuple[str, bytes], str | None]它们将数据序列化后存储以确保内存中的数据是不可变且易于复制的三个部分包括tuple[str, bytes]前一部分表示的序列化类型通常是json或msgpack第二部分表示经过序列化后的Checkpoint对象由于具体的值已经存储在blobs中了所以此时的Checkpoint的channel_values字段已经被移除。这里不再存储 Python 字典对象而是存储字节流。这模拟了数据库存取过程并防止了Node在内存中意外修改已存盘的状态实现对象的深度隔离。tuple[str, bytes]Checkpoint元数据前一部分同样表示序列化类型第二部分为经过序列化后的CheckpointMetadata对象。str | None父级checkpoint_id这里的Parent与是否以子图形式执行没有关系这里代表作为调用者的NodeInMemorySaver可以通过这个 ID在内存中顺着链条向上追溯从而在恢复时合并增量状态。2. 持久化Pending WriteCheckpoint是在Superstep成功结束时针对Channel状态创建的它并不能反映一个尚未结束Superstep内的真实状态。Pregel在执行过程中可以能出现不可预期的错误或者需要人为介入导致可预期的中断并行执行的任务就会出现部分部分成功、部分失败和中断的情况。对于成功执行的操作它们针对目标Channel的写入并没有通过一个Checkpoint固定下来仅仅属于一个Pending Write。如果这种中间状态没有被持久化等下次恢复执行的时候本来已经成功执行的任务还会重复执行这是无法接受的。如果某个任务涉及到多次人为中断每次恢复执行都需要提供Resume Value。如果这些Resume Value没有持久化那么每次恢复调用提供的Resume Value永远都会提供给第一个中断多次中断根本就没法实现所以提供的Resume Value也需要以Pending Write的形式存储下来。持久化不仅仅需要将Superstep完成时将Channel的状态以Checkpoint固定下来还需要将涉及到的所有Pending Write按照先后顺序记录下来。Pending Write不仅仅限于描述成功任务针对目标Channel的写入和依序提供Resume Value任务在执行中抛出的异常和中断也会以Pending Write的形式被记录下来。实际上这种基于全量基础数据和增量操作日志相结合的持久化形式在很多内存数据库中得到了广泛的应用。以Redis为例它会采用相应的策略每隔一段时间将当前时间点的内存快照以RDB形式固化下来同时针对数据库所作的每个操作都会按照时间顺序以AOL的形式存储下来。对于Pregel来说Checkpoint就是RDBPending Wrtes就是AOL。当Pregel以恢复形式执行的时候它会先提取并应用指定的Checkpoint快照然后对状态为成功执行的Pending Write进行重放就能恢复中断时的状态。针对Pending Write的持久化通过调用BaseCheckpointSaver如下所示的put_writes/aput_writes方法完成。Pending Write的持久化是基于任务进行的所以我们需要指定任务的ID和路径。config参数提供RunnableConfig对象携带了所需的Thread ID Checkpoint命名空间和Checkpoint ID。具体针对Channel的Pending Write由writes参数提供的 这是一个由Channel名称和值的二元组组成的序列。class BaseCheckpointSaver(Generic[V]): def put_writes( self, config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str , ) - None async def aput_writes( self, config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str , ) - None对于InMemorySaver来说它将PendingWrite存储于一个结构为defaultdict[tuple[str, str, str], dict[tuple[str, int], tuple[str, str, tuple[str, bytes], str]]]的两层字典中。第一层字典的Key为Thread IDCheckpoint命名空间和Checkpoint ID三元组。第二层元组的第一个部分为Task ID第二部分是当前Pending Write在writes序列中的索引。真正存储的内容是由如下四部分组成的元组task_id冗余存储任务 ID便于快速检索。channel Channel的名称。tuple[str, bytes]前部分表示序列化格式如json或pickle。后一部分为序列化后的字节。task_path任务在图结构中的完整路径。