0%

Pineapple v0.3.4 → v0.4.0:从可视化到数据并行的十次迭代

本文是「Pineapple」系列的第二篇,上一篇完整拆解了引擎的核心原理——数据冒险驱动的 DAG 构建、channel-per-node 并行调度、Lua 嵌入和控制流编译。之后的两天里,项目经历了 10 个版本迭代、27 次功能提交,沿三条主线推进:可观测性与调试体验运行时架构演进新算子与数据并行框架。本文逐一记录每个改动的背景、技术决策和收效。

DAG 可视化:让黑箱变成白盒

Pineapple 的 DAG 是引擎自动从算子的输入输出字段推导出来的——用户不手动画图。这带来了便利,但也带来了黑箱:当 pipeline 复杂到二三十个算子时,用户无法直观确认「引擎理解的依赖关系是不是我想的那样」。

DOT 与 Mermaid 双格式

internal/dag/visualize.go 提供了两种渲染格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func RenderDOT(g *Graph) string {
var b strings.Builder
b.WriteString("digraph pipeline {\n")
b.WriteString(" rankdir=TB;\n")
b.WriteString(" node [shape=box, style=filled, fontname=\"Helvetica\"];\n\n")

for _, node := range g.Nodes {
opType := types.OperatorType(node.Config.OperatorType)
color := dotColors[opType]
label := fmt.Sprintf("%s\\n[%s]", node.Name, opType)
fmt.Fprintf(&b, " %q [label=%q, fillcolor=%q];\n", node.Name, label, color)
}
// edges: 直接遍历 Node.Succs
for _, node := range g.Nodes {
for _, succ := range node.Succs {
fmt.Fprintf(&b, " %q -> %q;\n", node.Name, g.Nodes[succ].Name)
}
}
b.WriteString("}\n")
return b.String()
}

Graphviz DOT 适合本地渲染成 SVG,精确控制布局;Mermaid flowchart 适合直接嵌入 Markdown 文档和 PR description。节点按算子类型着色——Recall 绿、Transform 蓝、Filter 橙、Merge 紫、Reorder 黄、Observe 灰——一目了然。

通过 Engine.RenderDAG(format) 公共 API 暴露,HTTP 端点 GET /dag?format=dot|mermaid 直接服务。用户一条 curl 就能拿到 DAG 图:

1
curl -s localhost:8080/dag | dot -Tsvg -o dag.svg

传递性归约与纵向布局

初版可视化直接遍历所有推导出的边,包括被更长路径隐含的冗余边,导致图上出现大量交叉线。解决方案是传递性归约:遍历每条边 u→v,检查是否存在中间路径 u→...→v,有则跳过。同时将布局方向从左到右(LR)改为自上而下(TB),更符合 pipeline 的流水线直觉。

这个渲染层归约后来被更彻底的方案取代(见后文「执行图传递性归约」),渲染函数简化为直接遍历 Node.Succs 画边——因为 Succs 本身已经是归约后的最小边集。

资源配置热加载

Pineapple 在上一个版本已经支持引擎配置热加载——修改 JSON 配置文件后服务自动重建 Engine。但 ResourceManager(管理特征索引等需要定时刷新的外部数据)没有跟上,仍然需要重启服务才能更新资源配置。

解决方案是将 resources 从裸指针改为 atomic.Pointer[resource.Manager],与 enginePtr 对齐。pkg/server/server.go 中的热加载流程变为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func reloadConfig(path string) error {
data, err := os.ReadFile(path)
if err != nil {
return err
}

engine, err := pine.NewEngine(data)
if err != nil {
return err
}

newRM := resource.NewManager()
if err := newRM.LoadFromRootConfig(data); err != nil {
return err
}
if err := newRM.Start(context.Background()); err != nil {
return err
}

if err := resource.ValidateResourceDeps(data, newRM); err != nil {
newRM.Stop()
return err
}

enginePtr.Store(engine)
oldRM := resources.Swap(newRM)
if oldRM != nil {
oldRM.Stop()
}
return nil
}

关键设计:新 Manager 先 Start 成功再替换。如果新配置有问题(解析失败、资源拉取超时),旧配置保持不变,服务继续运行。这避免了「加载到一半,新的没起来、旧的已停掉」的窗口期。resources.Swap 原子替换后才 Stop 旧 Manager,读路径始终有效。

列存 DataFrame

行存的 GC 瓶颈

原有的 RowFrame[]map[string]any 存储每个 item——一行一个 map。推荐系统的 pipeline 经常处理上千个 item,每个 item 几十个字段,这意味着数万个小 map 分配,GC 压力显著。

列存设计

引入 Frame 接口,抽象出行存和列存两种实现:

1
2
3
4
5
6
7
8
9
10
type Frame interface {
Common(field string) any
SetCommon(field string, value any)
ItemCount() int
Item(index int, field string) any

BuildInput(commonFields, itemFields []string, commonDefaults, itemDefaults map[string]any) *types.OperatorInput
ApplyOutput(out *types.OperatorOutput, opName string, recall bool) error
ToResult(commonOut, itemOut []string) *types.Result
}

ColumnFramemap[string][]any 存储,每个字段一个 slice,item 按下标访问:

1
2
3
4
5
6
type ColumnFrame struct {
mu sync.RWMutex
common map[string]any
columns map[string][]any
rowCount int
}

一个设计选择值得一提:列存使用 []any 而非泛型 slice。原因是 Pineapple 的字段值类型在运行时才确定(JSON 解析后是 float64stringbool 等混合类型),泛型化无法避免 interface boxing,反而增加复杂度。

通过 JSON 配置的 "storage_mode": "row"|"column" 选择,Python DSL 侧同步支持 Flow(storage_mode="column")。列存在字段写入和构造上更高效(分配更少),但结构变更(删除行、重排序)因需遍历所有列而略慢——适用于「字段多、结构变更少」的 pipeline。

控制算子显式命名

Python DSL 中的 if_() / else_() / end_if_() 在编译时被降级为 transform_by_lua 算子。之前这些控制算子的名字是 transform_by_lua_XXXXXX(哈希后缀),在 DAG 可视化中完全看不出哪个节点是条件分支。

编译器改为生成 if_1elseif_2else_3 这样的显式名称。DAG 图中一眼就能识别控制流节点,不再淹没在一堆 transform_by_lua_* 里。这个改动看似微小,但对包含多层嵌套条件分支的复杂 pipeline 来说,可视化的可读性提升是质的飞跃。

运行时架构两项重构

v0.3.10 在同一个版本内完成了两项关联的架构改善。

Frame 并发自治

原来的调度器持有一把全局 sync.RWMutex,每次算子读写 DataFrame 都要经过调度器加锁。这个设计把「数据安全」的职责放在了错误的层级——调度器负责 DAG 编排,不应该关心数据访问的同步。

重构后锁下沉到 Frame 实现内部。RowFrameColumnFrame 各自持有一个 sync.RWMutex

1
2
3
4
5
6
7
8
9
10
11
12
type RowFrame struct {
mu sync.RWMutex
common map[string]any
items []map[string]any
}

func (f *RowFrame) Common(field string) any {
f.mu.RLock()
v := f.common[field]
f.mu.RUnlock()
return v
}

读操作(CommonItemBuildInput)取 RLock,写操作(SetCommonApplyOutput)取 Lock。调度器代码中移除所有锁操作。

过程中曾尝试更激进的方案:给 ColumnFrame 用双锁(commonMu + structMu),让 common 字段和 item 字段的并发访问互不干扰。但 benchmark 显示结构体从 24 字节膨胀到 72 字节后,cache line 压力导致结构变更操作约 1.8 倍退化。最终回退到单锁方案——简单且够用。

执行图传递性归约

前文提到的 DAG 可视化传递性归约做在渲染层,每次渲染都要重新计算。更重要的是,调度器遍历 Node.Preds 等待前驱时,仍然要遍历包含冗余边的完整边集——虽然语义正确(等待一个已经被间接等过的前驱只是多一次 channel receive),但浪费了 CPU。

解决方案是将归约从渲染层下沉到 dag.Build() 阶段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Build(sequence []string, operators map[string]config.OperatorConfig) (*Graph, error) {
// Phase 1: barrier edges
// Phase 2: data hazard edges
// Phase 3: merge source edges

// Phase 4: Transitive reduction
reduce(g)

// Validate: no cycles
if _, err := TopologicalSort(g); err != nil {
return nil, err
}
return g, nil
}

reduce 的实现对每条边 u→v 做 BFS 检查是否存在绕过直连的替代路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func reducedEdges(g *Graph) [][2]int {
n := len(g.Nodes)
adj := make([][]int, n)
for i, node := range g.Nodes {
adj[i] = node.Succs
}

var edges [][2]int
for u := 0; u < n; u++ {
for _, v := range adj[u] {
if !reachableWithout(adj, n, u, v) {
edges = append(edges, [2]int{u, v})
}
}
}
return edges
}

归约不改变执行语义:done[pred] channel 的 close 在 Go 内存模型下提供 happens-before 保证,可达性不变则执行顺序约束不变。渲染层代码因此大幅简化(删除 76 行归约逻辑),直接遍历 Node.Succs 画边即可。整个系统只做一次归约,调度器和可视化同时受益。

跨服务 Transform 算子

推荐系统中常见的模式是:主 pipeline 需要调用下游的特征服务获取额外字段。这本质上是一个「远程子 pipeline 调用」。之前用户只能写自定义 Go 算子来实现 HTTP 调用,缺乏统一抽象。

新增的 transform_by_remote_pineapple 算子调用下游 Pineapple 服务的 /execute 端点,核心挑战是字段映射:本地 frame 的字段名和下游服务的字段名可能不同。

按位置对应的映射模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 构造下游请求:local common_input[i] → remote common_request[i]
reqCommon := make(map[string]any, len(o.CommonInput))
for i, localField := range o.CommonInput {
if i < len(commonReqFields) {
reqCommon[commonReqFields[i]] = in.Common(localField)
}
}

// 解析下游响应:remote common_response[i] → local common_output[i]
for i, localField := range o.CommonOutput {
if i < len(commonRespFields) {
if val, ok := result.Common[commonRespFields[i]]; ok {
out.SetCommon(localField, val)
}
}
}

common_request[i]common_input[i] 按位置一一对应,item_request[i]item_input[i] 同理,响应侧亦然。这四组映射参数是可选的——当未提供时,直接使用 metadata 字段名(无映射),适用于上下游字段名一致的场景。

错误处理策略

1
2
3
4
5
6
7
func (o *RemotePineappleOp) handleError(out *pine.OperatorOutput, err error) error {
if o.failOnError {
return err
}
out.SetWarning(err)
return nil
}

fail_on_error 参数控制下游错误的处理方式:true 时下游错误为 fatal,整个 DAG 中止;false 时降级为 warning,pipeline 继续执行。这让业务团队可以根据下游服务的重要程度灵活配置——核心特征服务用 fatal,辅助增强服务用 warning。

Python DSL 中一行声明即可接入下游服务:

1
2
3
4
5
6
7
8
9
flow.transform_by_remote_pineapple(
common_input=["user_age"],
item_input=["item_id"],
item_output=["item_feature"],
host="feature-service",
port=8080,
item_request=["id"],
item_response=["feature"],
)

算子级数据并行框架

许多 Transform 算子的 per-item 计算是独立的(如特征归一化、远程调用、模型推理),天然可以并行加速。但之前没有统一机制——要么靠用户在算子内部自己开 goroutine,要么靠 DAG 层面的算子间并行。

三个关键设计决策

在动手实现之前,有三个问题需要先回答。

第一,哪些算子类型支持数据并行? 初看之下 Recall 和 Observe 似乎也可以,但仔细分析后发现只有 Transform 兼容:

类型 禁止原因
Recall 大多不依赖 item 输入,分片只会重复召回 N 次
Observe 只读,并行只是重复副作用
Filter/Merge/Reorder 屏障语义,需要全局视图

第二,数据并行时允许写 common 字段吗? 多个 shard 并行写 common 字段没有安全的 merge 语义——不同 shard 可能算出不同值,引擎无法决定谁胜出。而真正需要数据并行的场景(per-item 密集计算)天然只写 item 字段。因此:启用 data_parallel 时强制要求 common_output 为空。如果未来出现真实的 common 聚合需求,再讨论 map-reduce 方案。

第三,在哪一层实现? 不改 DAG 推导逻辑。数据并行是单个算子节点的运行时优化,对 DAG 拓扑完全透明。

编译期守门

这两条约束在 NewEngine 阶段强制校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func validateDataParallel(opName string, opCfg *config.OperatorConfig, opType types.OperatorType) error {
if opCfg.DataParallel > 1 {
if opType != types.OpTypeTransform {
return &ValidationError{
Message: fmt.Sprintf("operator %q: data_parallel=%d is only supported for Transform operators, got %s",
opName, opCfg.DataParallel, opType),
}
}
if len(opCfg.Meta.CommonOutput) > 0 {
return &ValidationError{
Message: fmt.Sprintf("operator %q: data_parallel=%d requires empty $metadata.common_output",
opName, opCfg.DataParallel),
}
}
}
return nil
}

编译期拒绝不合法配置,而非运行时静默失败。

分片、并行、合并

运行时实现在 internal/runtime/parallel.go,分三步。

分片——将 items 切成 N 份,尽量等分,余量分配给前几个 shard:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func splitInput(input *types.OperatorInput, n int) ([]*types.OperatorInput, []int) {
total := input.ItemCount()
common := input.RawCommon()
items := input.RawItems()

if n > total {
n = total
}

base := total / n
rem := total % n

parts := make([]*types.OperatorInput, n)
offsets := make([]int, n)
start := 0
for i := 0; i < n; i++ {
size := base
if i < rem {
size++
}
// ... copy shard items, share common
parts[i] = types.NewOperatorInput(common, shardItems)
offsets[i] = start
start += size
}
return parts, offsets
}

所有 shard 共享同一个 common map(只读),items 按下标范围独立拷贝。offsets 记录每个 shard 在原始 items 中的起始位置,供合并时重映射索引。

并行执行——每个 shard 一个 goroutine,带独立的 panic recovery:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func parallelExecute(ctx context.Context, cop *CompiledOperator, input *types.OperatorInput) (*types.OperatorOutput, error) {
parts, offsets := splitInput(input, cop.Config.DataParallel)

execCtx, cancel := context.WithCancel(ctx)
defer cancel()

outputs := make([]*types.OperatorOutput, len(parts))
var (
wg sync.WaitGroup
errOnce sync.Once
first error
)

for i := range parts {
wg.Add(1)
go func(idx int) {
defer wg.Done()
out := types.NewOperatorOutput()
err := executeWithRecovery(execCtx, cop, parts[idx], out)
if err != nil {
errOnce.Do(func() {
first = err
cancel() // 任一 shard 失败,取消所有
})
return
}
outputs[idx] = out
}(i)
}

wg.Wait()
if first != nil {
return nil, first
}
return mergeOutputs(outputs, offsets), nil
}

errOnce + cancel() 确保任一 shard 失败立即终止其他 shard。executeWithRecovery 将 panic 转化为 PanicError,与单线程路径行为一致。

合并——将各 shard 的 itemWrites 按偏移量重映射为绝对索引:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func mergeOutputs(outputs []*types.OperatorOutput, offsets []int) *types.OperatorOutput {
merged := types.NewOperatorOutput()
for i, out := range outputs {
if out == nil {
continue
}
if w := out.GetWarning(); w != nil {
merged.SetWarning(w)
}
offset := offsets[i]
for localIdx, fields := range out.GetItemWrites() {
absIdx := localIdx + offset
for field, value := range fields {
merged.SetItem(absIdx, field, value)
}
}
}
return merged
}

shard 0 写 [0, k) 的 item,shard 1 写 [k, m) 的 item……各 shard 的本地索引加上 offset 就是全局索引。合并后的 OperatorOutput 与单线程执行产生的完全一致。

调度器接入

调度器的改动极小——只需在执行前判断 DataParallel > 1

1
2
3
4
5
6
if cop.Config.DataParallel > 1 {
output, execErr = parallelExecute(ctx, cop, input)
} else {
output = types.NewOperatorOutput()
execErr = cop.Instance.Execute(ctx, input, output)
}

后续的 ValidateOutput、warning 收集、debug snapshot、ApplyOutput 全部保持原流程不变。

等价性验证

构造 100 个 items,同一个 doubleItemOp 分别以 data_parallel=1(基线)和 data_parallel=2,3,4,7 执行,逐 item 对比输出值,断言完全一致。测试覆盖了等分、非等分、shard 数大于 item 数等边界情况。

性能实测

使用 Horner 多项式(degree 5,per-item 做 5 次乘加)通过完整引擎路径测量:

items shards=1 shards=2 shards=4 shards=8
100 40 us 69 us 66 us 70 us
1,000 453 us 587 us 596 us 651 us
10,000 4.5 ms 5.9 ms 5.8 ms 5.9 ms

轻量 per-item 计算下,数据并行有负收益——goroutine 创建、shard 分配、Output 合并的固定开销(约 30-50%)大于并行节省的时间。这个结果符合预期:数据并行的目标场景是 per-item 计算量足够重的算子

以 10,000 items 为例,框架「税」约为 1.4 ms,折合每 item 约 0.14 us。这意味着只要单 item 计算时间超过这个水位线,并行就开始盈利。对于一次 HTTP 调用动辄几毫秒的场景(比如刚才介绍的 transform_by_remote_pineapple),这个开销可以忽略。算子完全不需要改代码,在 DSL 中加一行 data_parallel=4 就能让引擎自动分片并行。

小结

从 v0.3.4 到 v0.4.0 的十次迭代,表面上是功能列表的增长,背后是两条贯穿性的设计理念在反复兑现。

职责归位。资源热加载把 ResourceManager 的生命周期对齐到引擎级别;Frame 并发自治把锁从调度器下沉到数据层;传递性归约从渲染层下沉到构建阶段。每一次「下沉」都在回答同一个问题:这件事该由谁负责?当职责归位后,上层代码自然简化,下层代码自然内聚。

约束即能力。数据并行框架最有说服力的设计决策不是「支持什么」,而是「禁止什么」——只支持 Transform、禁止 common_output。正是这两条编译期硬约束,让分片-并行-合并的运行时实现变得直截了当:不需要处理 common 字段冲突,不需要担心屏障语义被破坏,不需要为每种算子类型写特化路径。约束越明确,实现越简单,出错的空间越小。

这个原则在 Pineapple 中反复出现:Recall 只能 AddItem 所以多路召回可以安全并行;六种类型的 Output 约束在运行时强制校验而非靠约定;data_parallel 的类型和字段约束在编译期拒绝而非运行时报错。每一条「做不到的事」,都是引擎可以自动做出正确决策的前提。

俗话说,投资效率是最好的投资。 如果您感觉我的文章质量不错,读后收获很大,预计能为您提高 10% 的工作效率,不妨小额捐助我一下,让我有动力继续写出更多好文章。