本文是「Pineapple」系列的第二篇,上一篇完整拆解了引擎的核心原理——数据冒险驱动的 DAG 构建、channel-per-node 并行调度、Lua 嵌入和控制流编译。之后的两天里,项目经历了 10 个版本迭代、27 次功能提交,沿三条主线推进:可观测性与调试体验、运行时架构演进、新算子与数据并行框架。本文逐一记录每个改动的背景、技术决策和收效。
DAG 可视化:让黑箱变成白盒
Pineapple 的 DAG 是引擎自动从算子的输入输出字段推导出来的——用户不手动画图。这带来了便利,但也带来了黑箱:当 pipeline 复杂到二三十个算子时,用户无法直观确认「引擎理解的依赖关系是不是我想的那样」。
DOT 与 Mermaid 双格式
internal/dag/visualize.go 提供了两种渲染格式:
1 | func RenderDOT(g *Graph) string { |
Graphviz DOT 适合本地渲染成 SVG,精确控制布局;Mermaid flowchart 适合直接嵌入 Markdown 文档和 PR description。节点按算子类型着色——Recall 绿、Transform 蓝、Filter 橙、Merge 紫、Reorder 黄、Observe 灰——一目了然。
通过 Engine.RenderDAG(format) 公共 API 暴露,HTTP 端点 GET /dag?format=dot|mermaid 直接服务。用户一条 curl 就能拿到 DAG 图:
1 | curl -s localhost:8080/dag | dot -Tsvg -o dag.svg |
传递性归约与纵向布局
初版可视化直接遍历所有推导出的边,包括被更长路径隐含的冗余边,导致图上出现大量交叉线。解决方案是传递性归约:遍历每条边 u→v,检查是否存在中间路径 u→...→v,有则跳过。同时将布局方向从左到右(LR)改为自上而下(TB),更符合 pipeline 的流水线直觉。
这个渲染层归约后来被更彻底的方案取代(见后文「执行图传递性归约」),渲染函数简化为直接遍历 Node.Succs 画边——因为 Succs 本身已经是归约后的最小边集。
资源配置热加载
Pineapple 在上一个版本已经支持引擎配置热加载——修改 JSON 配置文件后服务自动重建 Engine。但 ResourceManager(管理特征索引等需要定时刷新的外部数据)没有跟上,仍然需要重启服务才能更新资源配置。
解决方案是将 resources 从裸指针改为 atomic.Pointer[resource.Manager],与 enginePtr 对齐。pkg/server/server.go 中的热加载流程变为:
1 | func reloadConfig(path string) error { |
关键设计:新 Manager 先 Start 成功再替换。如果新配置有问题(解析失败、资源拉取超时),旧配置保持不变,服务继续运行。这避免了「加载到一半,新的没起来、旧的已停掉」的窗口期。resources.Swap 原子替换后才 Stop 旧 Manager,读路径始终有效。
列存 DataFrame
行存的 GC 瓶颈
原有的 RowFrame 以 []map[string]any 存储每个 item——一行一个 map。推荐系统的 pipeline 经常处理上千个 item,每个 item 几十个字段,这意味着数万个小 map 分配,GC 压力显著。
列存设计
引入 Frame 接口,抽象出行存和列存两种实现:
1 | type Frame interface { |
ColumnFrame 以 map[string][]any 存储,每个字段一个 slice,item 按下标访问:
1 | type ColumnFrame struct { |
一个设计选择值得一提:列存使用 []any 而非泛型 slice。原因是 Pineapple 的字段值类型在运行时才确定(JSON 解析后是 float64、string、bool 等混合类型),泛型化无法避免 interface boxing,反而增加复杂度。
通过 JSON 配置的 "storage_mode": "row"|"column" 选择,Python DSL 侧同步支持 Flow(storage_mode="column")。列存在字段写入和构造上更高效(分配更少),但结构变更(删除行、重排序)因需遍历所有列而略慢——适用于「字段多、结构变更少」的 pipeline。
控制算子显式命名
Python DSL 中的 if_() / else_() / end_if_() 在编译时被降级为 transform_by_lua 算子。之前这些控制算子的名字是 transform_by_lua_XXXXXX(哈希后缀),在 DAG 可视化中完全看不出哪个节点是条件分支。
编译器改为生成 if_1、elseif_2、else_3 这样的显式名称。DAG 图中一眼就能识别控制流节点,不再淹没在一堆 transform_by_lua_* 里。这个改动看似微小,但对包含多层嵌套条件分支的复杂 pipeline 来说,可视化的可读性提升是质的飞跃。
运行时架构两项重构
v0.3.10 在同一个版本内完成了两项关联的架构改善。
Frame 并发自治
原来的调度器持有一把全局 sync.RWMutex,每次算子读写 DataFrame 都要经过调度器加锁。这个设计把「数据安全」的职责放在了错误的层级——调度器负责 DAG 编排,不应该关心数据访问的同步。
重构后锁下沉到 Frame 实现内部。RowFrame 和 ColumnFrame 各自持有一个 sync.RWMutex:
1 | type RowFrame struct { |
读操作(Common、Item、BuildInput)取 RLock,写操作(SetCommon、ApplyOutput)取 Lock。调度器代码中移除所有锁操作。
过程中曾尝试更激进的方案:给 ColumnFrame 用双锁(commonMu + structMu),让 common 字段和 item 字段的并发访问互不干扰。但 benchmark 显示结构体从 24 字节膨胀到 72 字节后,cache line 压力导致结构变更操作约 1.8 倍退化。最终回退到单锁方案——简单且够用。
执行图传递性归约
前文提到的 DAG 可视化传递性归约做在渲染层,每次渲染都要重新计算。更重要的是,调度器遍历 Node.Preds 等待前驱时,仍然要遍历包含冗余边的完整边集——虽然语义正确(等待一个已经被间接等过的前驱只是多一次 channel receive),但浪费了 CPU。
解决方案是将归约从渲染层下沉到 dag.Build() 阶段:
1 | func Build(sequence []string, operators map[string]config.OperatorConfig) (*Graph, error) { |
reduce 的实现对每条边 u→v 做 BFS 检查是否存在绕过直连的替代路径:
1 | func reducedEdges(g *Graph) [][2]int { |
归约不改变执行语义:done[pred] channel 的 close 在 Go 内存模型下提供 happens-before 保证,可达性不变则执行顺序约束不变。渲染层代码因此大幅简化(删除 76 行归约逻辑),直接遍历 Node.Succs 画边即可。整个系统只做一次归约,调度器和可视化同时受益。
跨服务 Transform 算子
推荐系统中常见的模式是:主 pipeline 需要调用下游的特征服务获取额外字段。这本质上是一个「远程子 pipeline 调用」。之前用户只能写自定义 Go 算子来实现 HTTP 调用,缺乏统一抽象。
新增的 transform_by_remote_pineapple 算子调用下游 Pineapple 服务的 /execute 端点,核心挑战是字段映射:本地 frame 的字段名和下游服务的字段名可能不同。
按位置对应的映射模型
1 | // 构造下游请求:local common_input[i] → remote common_request[i] |
common_request[i] 与 common_input[i] 按位置一一对应,item_request[i] 与 item_input[i] 同理,响应侧亦然。这四组映射参数是可选的——当未提供时,直接使用 metadata 字段名(无映射),适用于上下游字段名一致的场景。
错误处理策略
1 | func (o *RemotePineappleOp) handleError(out *pine.OperatorOutput, err error) error { |
fail_on_error 参数控制下游错误的处理方式:true 时下游错误为 fatal,整个 DAG 中止;false 时降级为 warning,pipeline 继续执行。这让业务团队可以根据下游服务的重要程度灵活配置——核心特征服务用 fatal,辅助增强服务用 warning。
Python DSL 中一行声明即可接入下游服务:
1 | flow.transform_by_remote_pineapple( |
算子级数据并行框架
许多 Transform 算子的 per-item 计算是独立的(如特征归一化、远程调用、模型推理),天然可以并行加速。但之前没有统一机制——要么靠用户在算子内部自己开 goroutine,要么靠 DAG 层面的算子间并行。
三个关键设计决策
在动手实现之前,有三个问题需要先回答。
第一,哪些算子类型支持数据并行? 初看之下 Recall 和 Observe 似乎也可以,但仔细分析后发现只有 Transform 兼容:
| 类型 | 禁止原因 |
|---|---|
| Recall | 大多不依赖 item 输入,分片只会重复召回 N 次 |
| Observe | 只读,并行只是重复副作用 |
| Filter/Merge/Reorder | 屏障语义,需要全局视图 |
第二,数据并行时允许写 common 字段吗? 多个 shard 并行写 common 字段没有安全的 merge 语义——不同 shard 可能算出不同值,引擎无法决定谁胜出。而真正需要数据并行的场景(per-item 密集计算)天然只写 item 字段。因此:启用 data_parallel 时强制要求 common_output 为空。如果未来出现真实的 common 聚合需求,再讨论 map-reduce 方案。
第三,在哪一层实现? 不改 DAG 推导逻辑。数据并行是单个算子节点的运行时优化,对 DAG 拓扑完全透明。
编译期守门
这两条约束在 NewEngine 阶段强制校验:
1 | func validateDataParallel(opName string, opCfg *config.OperatorConfig, opType types.OperatorType) error { |
编译期拒绝不合法配置,而非运行时静默失败。
分片、并行、合并
运行时实现在 internal/runtime/parallel.go,分三步。
分片——将 items 切成 N 份,尽量等分,余量分配给前几个 shard:
1 | func splitInput(input *types.OperatorInput, n int) ([]*types.OperatorInput, []int) { |
所有 shard 共享同一个 common map(只读),items 按下标范围独立拷贝。offsets 记录每个 shard 在原始 items 中的起始位置,供合并时重映射索引。
并行执行——每个 shard 一个 goroutine,带独立的 panic recovery:
1 | func parallelExecute(ctx context.Context, cop *CompiledOperator, input *types.OperatorInput) (*types.OperatorOutput, error) { |
errOnce + cancel() 确保任一 shard 失败立即终止其他 shard。executeWithRecovery 将 panic 转化为 PanicError,与单线程路径行为一致。
合并——将各 shard 的 itemWrites 按偏移量重映射为绝对索引:
1 | func mergeOutputs(outputs []*types.OperatorOutput, offsets []int) *types.OperatorOutput { |
shard 0 写 [0, k) 的 item,shard 1 写 [k, m) 的 item……各 shard 的本地索引加上 offset 就是全局索引。合并后的 OperatorOutput 与单线程执行产生的完全一致。
调度器接入
调度器的改动极小——只需在执行前判断 DataParallel > 1:
1 | if cop.Config.DataParallel > 1 { |
后续的 ValidateOutput、warning 收集、debug snapshot、ApplyOutput 全部保持原流程不变。
等价性验证
构造 100 个 items,同一个 doubleItemOp 分别以 data_parallel=1(基线)和 data_parallel=2,3,4,7 执行,逐 item 对比输出值,断言完全一致。测试覆盖了等分、非等分、shard 数大于 item 数等边界情况。
性能实测
使用 Horner 多项式(degree 5,per-item 做 5 次乘加)通过完整引擎路径测量:
| items | shards=1 | shards=2 | shards=4 | shards=8 |
|---|---|---|---|---|
| 100 | 40 us | 69 us | 66 us | 70 us |
| 1,000 | 453 us | 587 us | 596 us | 651 us |
| 10,000 | 4.5 ms | 5.9 ms | 5.8 ms | 5.9 ms |
轻量 per-item 计算下,数据并行有负收益——goroutine 创建、shard 分配、Output 合并的固定开销(约 30-50%)大于并行节省的时间。这个结果符合预期:数据并行的目标场景是 per-item 计算量足够重的算子。
以 10,000 items 为例,框架「税」约为 1.4 ms,折合每 item 约 0.14 us。这意味着只要单 item 计算时间超过这个水位线,并行就开始盈利。对于一次 HTTP 调用动辄几毫秒的场景(比如刚才介绍的 transform_by_remote_pineapple),这个开销可以忽略。算子完全不需要改代码,在 DSL 中加一行 data_parallel=4 就能让引擎自动分片并行。
小结
从 v0.3.4 到 v0.4.0 的十次迭代,表面上是功能列表的增长,背后是两条贯穿性的设计理念在反复兑现。
职责归位。资源热加载把 ResourceManager 的生命周期对齐到引擎级别;Frame 并发自治把锁从调度器下沉到数据层;传递性归约从渲染层下沉到构建阶段。每一次「下沉」都在回答同一个问题:这件事该由谁负责?当职责归位后,上层代码自然简化,下层代码自然内聚。
约束即能力。数据并行框架最有说服力的设计决策不是「支持什么」,而是「禁止什么」——只支持 Transform、禁止 common_output。正是这两条编译期硬约束,让分片-并行-合并的运行时实现变得直截了当:不需要处理 common 字段冲突,不需要担心屏障语义被破坏,不需要为每种算子类型写特化路径。约束越明确,实现越简单,出错的空间越小。
这个原则在 Pineapple 中反复出现:Recall 只能 AddItem 所以多路召回可以安全并行;六种类型的 Output 约束在运行时强制校验而非靠约定;data_parallel 的类型和字段约束在编译期拒绝而非运行时报错。每一条「做不到的事」,都是引擎可以自动做出正确决策的前提。