ragateway-pipeline V28: Data-Processing Pipeline DAG Engine

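Completed: 2026-05-20
Research scope: n8n workflow execution engine · dify workflow/core/workflow node system · ComfyUI node registration and graph execution engine
Author: technical architect (based on a source-code study, not merely an introductory overview)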


1. Positioning and Scope

1.1 Problem Statement

In V27 the data-processing flow is hard-coded:

File persisted → DocumentConv → [NaiveRAG | GraphRAG×N] (parallel)

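This structure has the following problems:
- Adding a document-processing node (e.g. OCR, translation, structured extraction) requires changing core code
- Different tenants cannot have different processing flows
- Files cannot be routed dynamically to different branches by file type
- The flow is not visual, so administrators cannot adjust it themselves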

1.2 V28 Goals

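Replace the hard-coded document-processing flow with a visual, configurable DAG pipeline engine that lets RegDesk administrators customize each tenant's processing pipeline.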

1.3 Design Constraints

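| Constraint | Description |
|---|---|
| Pipeline layer | Orchestrates processing of the data content; it does not configure the upload mechanism |
| Input boundary | After File Store (the content of files already persisted) |
| Granularity | Per tenant (each tenant has its own default pipeline) |
| Node extension | Built-in nodes + third-party registered extensions |
| Target users | RegDesk administrators (technical users, not end customers) |
| Compatibility | Must be backward compatible with V27 (tenants without a DAG config use the V27 default flow) |
| Tech stack | Python 3.12+, asyncio, the existing FastAPI framework |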

1.4 Relationship to the V27 Architecture

┌─────────────────────────────────────────────────────────────┐
│                      ragateway V27                          │
│  ┌─────────────┐    ┌──────────────────────────────────┐   │
│  │ File Store  │───▶│     PipelineExecutor (NEW)       │   │
│  └─────────────┘    │  ┌─────────────────────────────┐ │   │
│                     │  │  DAG: nodes + edges JSON     │ │   │
│                     │  │  engine: asyncio DAG walk    │ │   │
│                     │  └─────────────────────────────┘ │   │
│                     └──────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

PipelineExecutor replaces these hard-coded V27 calls:
- DocumentConverterPlugin invocation
- NaiveRAGPlugin.ingest()
- GraphRAGPlugin[n].ingest()
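Backward compatibility relies on the V27 flow being expressible as an ordinary DAG (the V27_DEFAULT_PIPELINE static constant named in the 1.5 design principles). The sketch below is one possible shape of that constant plus the tenant fallback; the node ids, the splitter placed before the NaiveRAG sink (because sink.ragflow_naive consumes chunks), the single GraphRAG sink standing in for GraphRAG×N, and the resolve_pipeline helper are all assumptions, and nodes/edges are abbreviated to the fields needed here.

```python
import copy

# Hypothetical, abbreviated DAG form of the V27 flow:
# File Store -> DocumentConv -> [NaiveRAG | GraphRAG] (parallel)
V27_DEFAULT_PIPELINE: dict = {
    "version": "1",
    "pipeline_id": "v27-default",
    "nodes": [
        {"id": "src", "type": "source.file_store"},
        {"id": "conv", "type": "converter.mineru"},
        {"id": "split", "type": "splitter.recursive"},  # assumed: naive sink takes chunks
        {"id": "naive", "type": "sink.ragflow_naive"},
        {"id": "graph", "type": "sink.ragflow_graphrag"},
    ],
    "edges": [
        {"source": "src", "target": "conv"},
        {"source": "conv", "target": "split"},
        {"source": "split", "target": "naive"},
        {"source": "conv", "target": "graph"},
    ],
}


def resolve_pipeline(tenant_id: str, tenant_pipelines: dict[str, dict]) -> dict:
    """Return the tenant's pipeline, or a copy of the V27 default if none is configured."""
    pipeline = tenant_pipelines.get(tenant_id)
    if pipeline is None:
        # Deep copy so a caller that mutates the result cannot corrupt the shared constant
        return copy.deepcopy(V27_DEFAULT_PIPELINE)
    return pipeline


custom = {"pipeline_id": "tenant-a-custom", "nodes": [], "edges": []}
print(resolve_pipeline("tenant-a", {"tenant-a": custom})["pipeline_id"])  # tenant-a-custom
print(resolve_pipeline("tenant-b", {"tenant-a": custom})["pipeline_id"])  # v27-default
```

Here a plain dict stands in for the tenant_pipelines table; a real lookup would query PostgreSQL.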

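1.5 Overall Architecture Diagram

[Figure: ragateway-pipeline V28, data-processing DAG engine. Forward data path: upload → process → ingest. Reverse config path: administrator customizes → change takes effect.]

- Upload entry points (V27 File Store layer, outside V28 scope): HTTP upload (POST /api/file/upsert) · FTP/SFTP (batch download via source_url) · Batch (POST /api/file/batch-upsert)
- File Store (V27): name_hash multi-version storage · PostgreSQL metadata · content_hash dedup; passes file content bytes downstream
- PipelineExecutor (new in V28): DAG scheduling · state tracking · error handling
  1. Look up the tenant pipeline in the tenant_pipelines table → default DAG JSON
  2. Kahn topological sort to build execution levels → parallel execution via asyncio.gather
  3. Write node-level progress to PostgreSQL → ragateway_documents
- DAG node types (registered with the @register_node decorator):
  - Converter: MinerU · Docling · markitdown · skip
  - Splitter: fixed · semantic · sentence
  - Router: by file type · by metadata condition
  - Enricher: metadata injection · language detection · entities
  - Sink: NaiveRAG · GraphRAG × N
- Typical pipelines (customizable per tenant):
  - 510(k) submissions: MinerU → Semantic Split 512 → NaiveRAG and GraphRAG in parallel
  - Regulatory standards: MinerU → Fixed Split 256 → GraphRAG Light
  - Markdown knowledge base: Skip Converter → Fixed Split 1024 → NaiveRAG only
- Pipeline config: stored in the tenant_pipelines table; hot-reloaded without a restart
- ragateway-ui :7891, pipeline management console (new in V28), the administrator entry point:
  - Pipeline editor: React Flow drag-and-drop at /admin/pipelines
  - Execution monitor: real-time, node-level progress over WebSocket
  - Node panel: GET /api/admin/node-types, built-in + third-party nodes
- External backend services (unchanged from V27):
  - Document Convert: MinerU · Docling · markitdown
  - Naive RAG backend: RAGFlow Naive · sparse+dense
  - GraphRAG backends (multi-select): General · Light · LLM Wiki

Key design principles:
- Input boundary: the pipeline starts after File Store; upload protocols (HTTP/FTP/SFTP) are out of scope.
- Tenant isolation: each tenant has its own default pipeline, stored in the PostgreSQL tenant_pipelines table.
- Backward compatibility: tenants without a DAG config fall back to the static V27_DEFAULT_PIPELINE constant, with zero migration cost.
- Node extensibility: @register_node decorator (ComfyUI-style); third-party nodes register without core changes.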

2. Prior-Art Analysis

2.1 n8n: What We Borrow

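What n8n is: an open-source workflow-automation platform written in Node.js/TypeScript, with enterprise features such as complex conditional branching, retries, timeouts, and credential management. Its ecosystem has thousands of community nodes.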

Key takeaways:

2.1.1 Workflow JSON structure (IWorkflowBase)

// From n8n/packages/workflow/src/interfaces.ts (simplified)
interface INode {
  id: string;
  name: string;
  type: string;              // node type identifier, e.g. "n8n-nodes-base.httpRequest"
  typeVersion: number;       // node version number
  position: [number, number]; // UI canvas coordinates
  disabled?: boolean;
  retryOnFail?: boolean;
  maxTries?: number;
  parameters: INodeParameters;
}

interface IConnection {
  node: string;              // target node name
  type: string;              // connection type (e.g. "main")
  index: number;             // target input index
}

interface IConnections {
  [sourceNodeName: string]: {        // source node name
    [connectionType: string]:        // e.g. "main"
      Array<IConnection[] | null>;   // outer index = source output index
  };
}

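Takeaways:
- A graph structure that separates nodes from connections (nodes + connections)
- Node parameters decoupled from the node type (type carries no parameters; parameters holds the configuration)
- type + typeVersion enables per-node versioning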

2.1.2 Execution engine (WorkflowExecute)

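Core logic of n8n's WorkflowExecute engine:
1. Build an adjacency structure from connections
2. Call runNode() for each node; nodes at the same level can run in parallel
3. Handle node failures with the retryOnFail / maxTries / waitBetweenTries settings
4. Route errors through the node's onError setting: continueErrorOutput / continueRegularOutput / stopWorkflow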

Takeaway: the three node-failure strategies (route to an error output, continue with the regular output, stop the workflow).
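The three strategies differ only in what happens after a node raises. A minimal sketch of how a Python executor might apply them; the enum values reuse n8n's onError names, but the output shapes ("main", "error") and the run_with_policy helper are assumptions of this example.

```python
import asyncio
from collections.abc import Awaitable, Callable
from enum import Enum


class OnError(Enum):
    # Names follow n8n's onError values; the behavior is a simplified reading
    STOP_WORKFLOW = "stopWorkflow"
    CONTINUE_REGULAR_OUTPUT = "continueRegularOutput"
    CONTINUE_ERROR_OUTPUT = "continueErrorOutput"


async def run_with_policy(
    run: Callable[[], Awaitable[dict[str, list]]], on_error: OnError
) -> dict[str, list]:
    """Run one node coroutine and apply a failure strategy if it raises."""
    try:
        return await run()
    except Exception as exc:
        if on_error is OnError.STOP_WORKFLOW:
            raise  # propagate; the executor aborts the remaining levels
        if on_error is OnError.CONTINUE_REGULAR_OUTPUT:
            return {"main": []}  # downstream nodes run with empty input
        # CONTINUE_ERROR_OUTPUT: hand the error to a dedicated "error" output
        return {"main": [], "error": [{"message": str(exc)}]}


async def failing_node() -> dict[str, list]:
    raise RuntimeError("converter timed out")


print(asyncio.run(run_with_policy(failing_node, OnError.CONTINUE_ERROR_OUTPUT)))
# {'main': [], 'error': [{'message': 'converter timed out'}]}
```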

2.1.3 Why not use n8n directly

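| Reason | Explanation |
|---|---|
| Tech-stack mismatch | n8n is Node.js/TypeScript; ragateway is Python FastAPI |
| Different target scenario | n8n targets workflow automation (HTTP requests, database operations), not document-processing pipelines |
| Over-engineered for our needs | n8n ships full credential management, webhook, and trigger systems; ragateway only needs DAG execution |
| Deployment complexity | Adopting n8n means running one more Node.js service, which raises operational cost |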

Conclusion: borrow n8n's workflow JSON structure, node-versioning design, and error-handling strategies, but build our own Python execution engine.


2.2 dify: What We Borrow

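What dify is: an open-source LLM application development platform (Python) with a full workflow system that supports knowledge-base retrieval, LLM calls, conditional branching, code execution, and more.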

Key takeaways:

2.2.1 The graphon subsystem

Internally, dify has refactored its workflow engine into a standalone graphon subsystem:

# From dify api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py
class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]):
    node_type = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL

    def _run(self) -> NodeRunResult:
        ...

2.2.2 graph_config structure

class _NodeConfigDict(TypedDict):
    id: str
    width: int
    height: int
    type: str
    data: dict[str, Any]

class _EdgeConfigDict(TypedDict):
    source: str
    target: str
    sourceHandle: str
    targetHandle: str

2.2.3 Node factory pattern (NodeFactory)

def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
    node_mapping = get_node_type_classes_mapping().get(node_type)
    latest_node_class = node_mapping.get(LATEST_VERSION)
    matched_node_class = node_mapping.get(node_version)
    return matched_node_class or latest_node_class
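The excerpt above resolves an exact version and falls back to the latest one, but calls .get on node_mapping without checking that the type is registered. A minimal sketch of the same resolution rule with explicit errors; resolve_node_class, the "latest" sentinel key, and the MinerU classes are illustrative assumptions.

```python
LATEST_VERSION = "latest"  # sentinel key, mirroring LATEST_VERSION in the excerpt


def resolve_node_class(
    mapping: dict[str, dict[str, type]], node_type: str, node_version: str
) -> type:
    """Exact version if registered, else the latest; unknown types fail loudly."""
    versions = mapping.get(node_type)
    if versions is None:
        raise KeyError(f"unknown node type: {node_type}")
    cls = versions.get(node_version) or versions.get(LATEST_VERSION)
    if cls is None:
        raise KeyError(f"no class for {node_type}@{node_version} and no latest version")
    return cls


class MinerUv1: ...
class MinerUv2: ...


MAPPING = {"converter.mineru": {"1": MinerUv1, "2": MinerUv2, LATEST_VERSION: MinerUv2}}
print(resolve_node_class(MAPPING, "converter.mineru", "1").__name__)  # MinerUv1
print(resolve_node_class(MAPPING, "converter.mineru", "9").__name__)  # MinerUv2
```

Silently falling back to the latest version keeps old pipelines loadable, but it can change behavior without notice; logging the fallback is a reasonable addition.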

2.2.4 The VariablePool

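In dify, data moves between nodes through a VariablePool:
- A variable is addressed by its node ID plus output name (written node_id:output_name in this document)
- Node A's output → VariablePool → node B reads it from the VariablePool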

2.2.5 Why not use dify directly

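| Reason | Explanation |
|---|---|
| Domain mismatch | dify workflows orchestrate LLM applications, not document-processing pipelines |
| High RAG integration cost | dify's Knowledge Retrieval Node talks to dify's own knowledge base, not RAGFlow/Neo4j |
| Too many dependencies | dify depends on its own database schema, model management, and vector store; the coupling is too tight |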

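Conclusion: borrow dify's node base-class design (the Node[NodeData] ABC), its node-factory registration pattern, and the VariablePool concept, while keeping our own implementation simpler.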


2.3 ComfyUI: What We Borrow

What ComfyUI is: a modular, graph-based engine for running generative AI workflows, with one of the largest custom-node ecosystems.

Key takeaways:

2.3.1 Node registration (NODE_CLASS_MAPPINGS)

ComfyUI registers nodes through a global dictionary:

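NODE_CLASS_MAPPINGS = {
    "LoadImage": LoadImage,
    "KSampler": KSampler,
    "CLIPTextEncode": CLIPTextEncode,
}
# Optional human-readable names shown in the UI; output types are declared
# per class through RETURN_TYPES, not through a global dictionary
NODE_DISPLAY_NAME_MAPPINGS = {
    "LoadImage": "Load Image",
}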

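Takeaways:
- A global dictionary (which V28 wraps in a registration decorator): simple yet very effective
- A third-party node only needs to be imported and added to NODE_CLASS_MAPPINGS; no core code changes
- INPUT_TYPES / RETURN_TYPES declare strongly typed input and output constraints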
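In V28 the global dictionary becomes the NodeRegistry that DAGExecutor (section 4.1) calls as node_registry.get(type, version), filled by the @register_node decorator. A minimal sketch, assuming nodes declare type and version as class attributes like PipelineNode does; the duplicate check and list_types (a possible backing for GET /api/admin/node-types) are illustrative choices.

```python
from typing import TypeVar

T = TypeVar("T", bound=type)


class NodeRegistry:
    """Global {type: {version: class}} table filled by the @register_node decorator."""

    def __init__(self) -> None:
        self._classes: dict[str, dict[str, type]] = {}

    def register(self, cls: T) -> T:
        node_type = getattr(cls, "type", "")
        version = getattr(cls, "version", "1")
        if not node_type:
            raise ValueError(f"{cls.__name__} must define a non-empty `type`")
        versions = self._classes.setdefault(node_type, {})
        if version in versions:
            raise ValueError(f"duplicate registration: {node_type}@{version}")
        versions[version] = cls
        return cls  # returning the class keeps the decorator transparent

    def get(self, node_type: str, version: str = "1") -> type:
        try:
            return self._classes[node_type][version]
        except KeyError:
            raise KeyError(f"node type not registered: {node_type}@{version}") from None

    def list_types(self) -> list[str]:
        return sorted(self._classes)


REGISTRY = NodeRegistry()
register_node = REGISTRY.register


# A third-party package only imports register_node; importing its module registers the node
@register_node
class MarkitdownConverter:
    type = "converter.markitdown"
    version = "1"


print(REGISTRY.get("converter.markitdown") is MarkitdownConverter)  # True
print(REGISTRY.list_types())  # ['converter.markitdown']
```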

2.3.2 Strongly typed connections

class LoadImage:
    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "image": ("IMAGE",),
            },
            "optional": {
                "channel": (["alpha", "RGB"],),
            }
        }
    RETURN_TYPES = ("IMAGE", "MASK")
    FUNCTION = "load_image"

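Takeaways:
- INPUT_TYPES enforces input types, so the UI editor only lets users connect compatible outputs
- On a type mismatch the UI simply refuses to draw the link, which reduces user errors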
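The same compatibility rule can run on the backend when a pipeline is saved, so a hand-edited JSON cannot bypass the editor. A minimal sketch; the Port class, the port type names ("MARKDOWN", "CHUNKS", ...), and the OUTPUTS/INPUTS tables are hypothetical, since this document does not fix a type vocabulary.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Port:
    name: str
    type: str  # illustrative type names: "MARKDOWN", "TOC", "METADATA", "CHUNKS"


# Hypothetical port declarations for a few built-in node types
OUTPUTS = {
    "converter.mineru": [Port("markdown", "MARKDOWN"), Port("toc", "TOC"), Port("metadata", "METADATA")],
    "splitter.recursive": [Port("chunks", "CHUNKS")],
}
INPUTS = {
    "splitter.recursive": [Port("text", "MARKDOWN")],
    "sink.ragflow_naive": [Port("chunks", "CHUNKS"), Port("metadata", "METADATA")],
}


def can_connect(src_type: str, src_output: str, dst_type: str, dst_input: str) -> bool:
    """True only if both ports exist and their declared types match exactly."""
    out = {p.name: p.type for p in OUTPUTS.get(src_type, [])}
    inp = {p.name: p.type for p in INPUTS.get(dst_type, [])}
    return src_output in out and dst_input in inp and out[src_output] == inp[dst_input]


print(can_connect("converter.mineru", "markdown", "splitter.recursive", "text"))  # True
print(can_connect("converter.mineru", "toc", "sink.ragflow_naive", "chunks"))     # False
```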

2.3.3 Why ComfyUI's node ecosystem thrives

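| Property | Effect |
|---|---|
| Minimal registration | One Python file + two lines of registration code = one new node |
| Strong type constraints | The UI refuses to draw a link between mismatched types |
| No central registry service | A custom node is picked up when its module is imported; it does not depend on a central registry |
| Pure Python | No extra configuration; extending is just writing code |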

2.3.4 Why not use ComfyUI directly

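| Reason | Explanation |
|---|---|
| Synchronous execution model | ComfyUI executes in synchronous Python, which does not suit an asyncio workload |
| No parallel branches | Graph execution follows a strict topological order without true parallelism |
| No error routing | ComfyUI lacks conditional branching and error-handling routes |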

Conclusion: borrow ComfyUI's node-registration model (exposed in V28 as a decorator) and its INPUT_TYPES/RETURN_TYPES strong-typing system.


2.4 Comparison and Trade-offs

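| Dimension | n8n | dify | ComfyUI | ragateway-pipeline V28 |
|---|---|---|---|---|
| Tech stack | TypeScript | Python | Python | Python |
| Data passing | item-array stream | VariablePool | typed slot system | PipelineItem + VariablePool |
| Graph structure | {nodes, connections} | {nodes, edges} | {nodes, links} | {nodes, edges} |
| Node registration | INodeType interface | Node subclassing | global dictionary | decorator registration |
| Type constraints | | | strong typing | strong typing |
| Parallel branches | same-level parallel (see 2.1.2) | graphon engine | topological order + caching | asyncio parallel levels |
| Conditional routing | error-output strategies | If/Else node | none | Router node |
| Node versioning | type + typeVersion | type + version | | type + version |
| Third-party extension | npm packages | Python packages | pip packages | pip packages + hot loading |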

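Final design decisions:
- Graph structure: dify's {nodes, edges} combined with ComfyUI-style strong typing
- Node registration: ComfyUI-style registration (as a decorator) + dify's Node base class
- Data passing: dify's VariablePool concept + a simplified form of n8n's item stream (PipelineItem)
- Parallel execution: n8n-style level parallelism on asyncio
- Error handling: n8n's three strategies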


3. Core Concepts

3.1 Pipeline Definition (DAG JSON Schema)

{
  "$schema": "https://ragateway.regdesk.co/schemas/pipeline-v1.json",
  "version": "1",
  "pipeline_id": "pipeline-uuid-xxx",
  "tenant_id": "tenant-uuid-xxx",
  "name": "RegDesk standard document-processing pipeline",
  "description": "Standard flow for processing RegDesk medical-device documents",
  "config": {
    "max_concurrency": 4,
    "timeout_seconds": 300,
    "retry_on_node_fail": true,
    "max_retries": 2,
    "continue_on_error": false
  },
  "nodes": [
    {
      "id": "node-001",
      "type": "source.file_store",
      "version": "1",
      "name": "File source",
      "position": {"x": 0, "y": 100},
      "inputs": {},
      "outputs": ["file"],
      "config": {}
    },
    {
      "id": "node-002",
      "type": "converter.mineru",
      "version": "1",
      "name": "Document conversion",
      "position": {"x": 250, "y": 100},
      "inputs": {
        "file": {"from_node": "node-001", "from_output": "file"}
      },
      "outputs": ["markdown", "toc", "metadata"],
      "config": {
        "backend": "hybrid-auto-engine",
        "lang_list": ["en", "zh"],
        "timeout": 300
      }
    },
    {
      "id": "node-003",
      "type": "splitter.recursive",
      "version": "1",
      "name": "文本分块",
      "position": {"x": 500, "y": 100},
      "inputs": {
        "text": {"from_node": "node-002", "from_output": "markdown"}
      },
      "outputs": ["chunks"],
      "config": {
        "chunk_size": 512,
        "chunk_overlap": 64
      }
    },
    {
      "id": "node-004",
      "type": "router.file_type",
      "version": "1",
      "name": "File-type router",
      "position": {"x": 500, "y": 280},
      "inputs": {
        "metadata": {"from_node": "node-002", "from_output": "metadata"}
      },
      "outputs": ["pdf_branch", "docx_branch", "default_branch"],
      "config": {
        "routes": [
          {"condition": "file_type == 'pdf'", "output": "pdf_branch"},
          {"condition": "file_type in ['docx','doc']", "output": "docx_branch"}
        ],
        "default_output": "default_branch"
      }
    },
    {
      "id": "node-005",
      "type": "sink.ragflow_naive",
      "version": "1",
      "name": "NaiveRAG ingest",
      "position": {"x": 750, "y": 50},
      "inputs": {
        "chunks": {"from_node": "node-003", "from_output": "chunks"},
        "metadata": {"from_node": "node-002", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {"enabled": true}
    },
    {
      "id": "node-006",
      "type": "sink.ragflow_graphrag",
      "version": "1",
      "name": "GraphRAG ingest",
      "position": {"x": 750, "y": 200},
      "inputs": {
        "markdown": {"from_node": "node-002", "from_output": "markdown"},
        "metadata": {"from_node": "node-002", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {"source": "ragflow_general", "enabled": true}
    }
  ],
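  "edges": [
    {"id": "e1", "source": "node-001", "source_output": "file", "target": "node-002", "target_input": "file"},
    {"id": "e2", "source": "node-002", "source_output": "markdown", "target": "node-003", "target_input": "text"},
    {"id": "e3", "source": "node-002", "source_output": "metadata", "target": "node-004", "target_input": "metadata"},
    {"id": "e4", "source": "node-003", "source_output": "chunks", "target": "node-005", "target_input": "chunks"},
    {"id": "e5", "source": "node-002", "source_output": "metadata", "target": "node-005", "target_input": "metadata"},
    {"id": "e6", "source": "node-002", "source_output": "markdown", "target": "node-006", "target_input": "markdown"},
    {"id": "e7", "source": "node-002", "source_output": "metadata", "target": "node-006", "target_input": "metadata"}
  ]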
}

JSON Schema:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "RagatewayPipeline",
  "type": "object",
  "required": ["version", "pipeline_id", "nodes", "edges"],
  "properties": {
    "version": {"type": "string", "const": "1"},
    "pipeline_id": {"type": "string"},
    "tenant_id": {"type": "string"},
    "name": {"type": "string", "maxLength": 255},
    "description": {"type": "string", "maxLength": 1000},
    "config": {
      "type": "object",
      "properties": {
        "max_concurrency": {"type": "integer", "minimum": 1, "maximum": 16, "default": 4},
        "timeout_seconds": {"type": "integer", "minimum": 10, "maximum": 3600, "default": 300},
        "retry_on_node_fail": {"type": "boolean", "default": true},
        "max_retries": {"type": "integer", "minimum": 0, "maximum": 5, "default": 2},
        "continue_on_error": {"type": "boolean", "default": false}
      }
    },
    "nodes": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["id", "type", "version", "name", "inputs", "outputs", "config"],
        "properties": {
          "id": {"type": "string"},
          "type": {"type": "string", "pattern": "^[a-z]+\\.[a-z_]+$"},
          "version": {"type": "string"},
          "name": {"type": "string"},
          "position": {"type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}}},
          "inputs": {"type": "object", "additionalProperties": {
            "type": "object", "required": ["from_node", "from_output"],
            "properties": {"from_node": {"type": "string"}, "from_output": {"type": "string"}}
          }},
          "outputs": {"type": "array", "items": {"type": "string"}},
          "config": {"type": "object"}
        }
      }
    },
    "edges": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["id", "source", "source_output", "target", "target_input"],
        "properties": {
          "id": {"type": "string"},
          "source": {"type": "string"},
          "source_output": {"type": "string"},
          "target": {"type": "string"},
          "target_input": {"type": "string"}
        }
      }
    }
  }
}
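The format stores each connection twice: in a node's inputs ({from_node, from_output}) and in the top-level edges list. The executor in section 4.1 orders nodes from edges only, so an input without a matching edge lets a node run before its upstream output exists. A minimal load-time consistency check, complementary to JSON Schema validation; the function name and message wording are hypothetical.

```python
def check_edges_match_inputs(pipeline: dict) -> list[str]:
    """Report disagreements between node `inputs` references and the `edges` list."""
    node_ids = {n["id"] for n in pipeline["nodes"]}
    # Every (source, source_output, target, target_input) implied by the nodes' inputs
    from_inputs = {
        (ref["from_node"], ref["from_output"], n["id"], name)
        for n in pipeline["nodes"]
        for name, ref in n.get("inputs", {}).items()
    }
    from_edges = {
        (e["source"], e["source_output"], e["target"], e["target_input"])
        for e in pipeline["edges"]
    }
    problems = [f"missing edge for input {t}.{i} <- {s}.{o}"
                for (s, o, t, i) in sorted(from_inputs - from_edges)]
    problems += [f"edge {s}.{o} -> {t}.{i} has no matching input"
                 for (s, o, t, i) in sorted(from_edges - from_inputs)]
    problems += [f"unknown node id: {x}"
                 for e in pipeline["edges"] for x in (e["source"], e["target"])
                 if x not in node_ids]
    return problems


pipe = {
    "nodes": [
        {"id": "a", "inputs": {}},
        {"id": "b", "inputs": {"text": {"from_node": "a", "from_output": "markdown"}}},
    ],
    "edges": [],
}
print(check_edges_match_inputs(pipe))
# ['missing edge for input b.text <- a.markdown']
```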

Supplement 1-A: complete conditional-routing node example (Router node, branching by file type)

{
  "id": "node-router-001",
  "type": "router.file_type",
  "version": "1",
  "name": "File-type router",
  "position": {"x": 400, "y": 200},
  "inputs": {
    "metadata": {"from_node": "node-002", "from_output": "metadata"}
  },
  "outputs": ["pdf_branch", "docx_branch", "md_branch", "default_branch"],
  "config": {
    "routes": [
      {"condition": "file_type == 'pdf'", "output": "pdf_branch"},
      {"condition": "file_type in ['docx','doc']", "output": "docx_branch"},
      {"condition": "file_type == 'md'", "output": "md_branch"}
    ],
    "default_output": "default_branch"
  }
}
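Route conditions such as "file_type in ['docx','doc']" are stored as strings, and evaluating them with eval() would let a pipeline config execute arbitrary code. A minimal sketch of a restricted evaluator that supports only `field == literal` and `field in [literals]`, parsing the right-hand side with ast.literal_eval; the grammar and the helper names are assumptions, not the engine's defined condition language.

```python
import ast
import re

# Supported grammar: <field> == <literal>   |   <field> in <list literal>
_COND = re.compile(r"^\s*([A-Za-z_]\w*)\s*(==|\bin\b)\s*(.+?)\s*$")


def eval_simple_condition(condition: str, ctx: dict) -> bool:
    """Evaluate a route condition without eval(); unsupported syntax yields False."""
    m = _COND.match(condition)
    if not m:
        return False
    field, op, rhs = m.groups()
    try:
        expected = ast.literal_eval(rhs)  # literals only: no names, calls, or attributes
    except (ValueError, SyntaxError):
        return False
    value = ctx.get(field)
    if op == "==":
        return value == expected
    return isinstance(expected, (list, tuple, set)) and value in expected


def pick_route(routes: list[dict], default_output: str, ctx: dict) -> str:
    # The first matching route wins, following the order of the "routes" list
    for route in routes:
        if eval_simple_condition(route["condition"], ctx):
            return route["output"]
    return default_output


ROUTES = [
    {"condition": "file_type == 'pdf'", "output": "pdf_branch"},
    {"condition": "file_type in ['docx','doc']", "output": "docx_branch"},
    {"condition": "file_type == 'md'", "output": "md_branch"},
]
print(pick_route(ROUTES, "default_branch", {"file_type": "doc"}))  # docx_branch
print(pick_route(ROUTES, "default_branch", {"file_type": "txt"}))  # default_branch
```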

Supplement 1-B: complete config_schema for a Splitter node

{
  "id": "node-splitter-001",
  "type": "splitter.semantic",
  "version": "1",
  "name": "Semantic chunking",
  "position": {"x": 500, "y": 150},
  "inputs": {
    "text": {"from_node": "node-002", "from_output": "markdown"}
  },
  "outputs": ["chunks"],
  "config": {
    "strategy": "semantic",
    "chunk_size": 512,
    "chunk_overlap": 64,
    "min_chunk_size": 50,
    "max_chunk_size": 2048,
    "embedding_model": "bge-m3",
    "split_by": "sentence",
    "language": "auto"
  }
}

config_schema reference (JSON Schema definitions):

{
  "chunk_size": {
    "type": "integer",
    "minimum": 50,
    "maximum": 4096,
    "default": 512,
    "description": "Maximum number of tokens per chunk (estimated)"
  },
  "chunk_overlap": {
    "type": "integer",
    "minimum": 0,
    "maximum": 512,
    "default": 64,
    "description": "Number of overlapping tokens between adjacent chunks"
  },
  "strategy": {
    "type": "string",
    "enum": ["fixed", "semantic", "sentence", "recursive"],
    "default": "recursive",
    "description": "Chunking strategy: fixed (fixed length) | semantic (embedding/LLM-driven) | sentence (by sentence) | recursive (recursive character splitting)"
  },
  "embedding_model": {
    "type": "string",
    "default": "bge-m3",
    "description": "Embedding model used for semantic chunking"
  },
  "split_by": {
    "type": "string",
    "enum": ["character", "sentence", "paragraph", "markdown-header"],
    "default": "character",
    "description": "Smallest split unit for recursive chunking"
  }
}
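The chunk_size / chunk_overlap pair defines a sliding window that advances by chunk_size - chunk_overlap each step. Note that the schema bounds each field separately (chunk_size ≥ 50, chunk_overlap ≤ 512), so a config such as chunk_size 50 with chunk_overlap 64 passes validation even though the window could never advance; the sketch below rejects that at runtime. It approximates tokens with whitespace-separated words, an assumption that works poorly for unsegmented Chinese text, and is not the splitter.fixed implementation.

```python
def split_fixed(text: str, chunk_size: int = 512, chunk_overlap: int = 64) -> list[str]:
    """Fixed-length chunking with overlap; whitespace-separated words stand in for tokens."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    words = text.split()
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reaches the end of the text
    return chunks


print(split_fixed("a b c d e f g", chunk_size=3, chunk_overlap=1))
# ['a b c', 'c d e', 'e f g']
```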

Supplement 1-C: Enricher node example (metadata injection + language detection)

{
  "id": "node-enricher-001",
  "type": "enricher.metadata",
  "version": "1",
  "name": "Metadata enrichment",
  "position": {"x": 500, "y": 100},
  "inputs": {
    "text": {"from_node": "node-002", "from_output": "markdown"},
    "metadata": {"from_node": "node-002", "from_output": "metadata"}
  },
  "outputs": ["metadata"],
  "config": {
    "extract_fields": ["title", "author", "date", "version"],
    "language_detection": {
      "enabled": true,
      "default_lang": "en",
      "fallback_lang": "en"
    },
    "custom_fields": [
      {"key": "document_source", "value": "mineru"},
      {"key": "processing_version", "value": "v1"}
    ]
  }
}

Enricher node config_schema:

{
  "extract_fields": {
    "type": "array",
    "items": {"type": "string"},
    "default": ["title", "date"],
    "description": "Metadata fields to extract from the text content"
  },
  "language_detection": {
    "type": "object",
    "properties": {
      "enabled": {"type": "boolean", "default": true},
      "default_lang": {"type": "string", "default": "en"},
      "fallback_lang": {"type": "string", "default": "en"}
    },
    "description": "Language-detection settings; the result is written to metadata.language"
  },
  "custom_fields": {
    "type": "array",
    "items": {
      "type": "object",
      "properties": {
        "key": {"type": "string"},
        "value": {"type": "string"}
      }
    },
    "description": "Custom static metadata fields, injected as-is"
  }
}

3.2 Node Type System

3.2.1 Node categories (6 categories)

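| Category | Prefix | Description | Examples |
|---|---|---|---|
| Source | source.* | Pipeline entry; reads files from File Store | source.file_store |
| Converter | converter.* | Document format conversion | converter.mineru, converter.docling, converter.markitdown |
| Splitter | splitter.* | Text/content splitting | splitter.recursive, splitter.semantic, splitter.fixed |
| Enricher | enricher.* | Content enrichment/processing | enricher.metadata, enricher.ocr, enricher.translate |
| Router | router.* | Conditional routing / parallel branching | router.file_type, router.metadata, router.if_else |
| Sink | sink.* | Output to external systems | sink.ragflow_naive, sink.ragflow_graphrag, sink.neo4j_wiki |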

3.2.2 Node ABC definitions

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, TypedDict


class IOType(TypedDict):
    """Node input/output type definition, modeled on ComfyUI INPUT_TYPES"""
    name: str
    display_name: str
    description: str


@dataclass
class NodeInputDef:
    """Node input definition"""
    name: str
    type: str
    required: bool = True
    description: str = ""
    default: Any = None


@dataclass
class NodeOutputDef:
    """Node output definition"""
    name: str
    type: str
    description: str = ""


@dataclass
class NodeConfigField:
    """Node configuration field definition"""
    name: str
    type: str  # "string" | "integer" | "boolean" | "select" | "list"
    label: str
    required: bool = False
    default: Any = None
    options: list[Any] | None = None
    description: str = ""


class PipelineItem:
    """
    A unit of data flowing through the pipeline.
    Combines the n8n item model with dify's VariablePool concept.
    """
    def __init__(
        self,
        id: str,
        data: dict[str, Any],
        metadata: dict[str, Any] | None = None,
        errors: list[str] | None = None,
        source_node_id: str | None = None,
    ):
        self.id = id
        self.data = data
        self.metadata = metadata or {}
        self.errors = errors or []
        self.source_node_id = source_node_id
        self.created_at = None  # ISO timestamp, filled in at execution time

    def with_error(self, error: str) -> "PipelineItem":
        return PipelineItem(
            id=self.id, data=self.data,
            metadata=self.metadata.copy(),
            errors=self.errors + [error],
            source_node_id=self.source_node_id,
        )

    def with_metadata(self, key: str, value: Any) -> "PipelineItem":
        return PipelineItem(
            id=self.id, data=self.data,
            metadata={**self.metadata, key: value},
            errors=self.errors,
            source_node_id=self.source_node_id,
        )


class PipelineNode(ABC):
    """Base class for pipeline node executors"""

    type: str = ""        # class attribute set by subclasses, e.g. "converter.mineru"
    version: str = "1"    # class attribute

    def __init__(
        self,
        node_id: str,
        config: dict,
        inputs: dict[str, list[PipelineItem]],
    ):
        self.node_id = node_id
        self.config = config
        self.inputs = inputs  # {input_name: [PipelineItem]}

    @abstractmethod
    async def run(self) -> dict[str, list[PipelineItem]]:
        """
        Run the node logic.
        Returns a {output_name: [PipelineItem]} dict.
        """
        ...

    async def health_check(self) -> bool:
        return True
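To show how a concrete node plugs into this ABC, here is a minimal hypothetical splitter.fixed node. To stay self-contained it redefines trimmed stand-ins for PipelineItem and PipelineNode, and it splits by characters rather than tokens; both simplifications are assumptions of this sketch. The pattern to note: read named inputs, produce new PipelineItems, carry upstream metadata forward, and return outputs keyed by output name.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


# Trimmed stand-ins for the PipelineItem / PipelineNode classes above
@dataclass
class PipelineItem:
    id: str
    data: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)


class PipelineNode(ABC):
    type: str = ""
    version: str = "1"

    def __init__(self, node_id: str, config: dict, inputs: dict[str, list[PipelineItem]]):
        self.node_id, self.config, self.inputs = node_id, config, inputs

    @abstractmethod
    async def run(self) -> dict[str, list[PipelineItem]]: ...


class FixedSplitterNode(PipelineNode):
    """Hypothetical splitter.fixed: each text item becomes several chunk items."""
    type = "splitter.fixed"

    async def run(self) -> dict[str, list[PipelineItem]]:
        size = int(self.config.get("chunk_size", 512))
        chunks: list[PipelineItem] = []
        for item in self.inputs.get("text", []):
            text = item.data.get("text", "")
            for n, start in enumerate(range(0, len(text), size)):
                chunks.append(PipelineItem(
                    id=f"{item.id}-chunk-{n}",
                    data={"text": text[start:start + size]},
                    metadata={**item.metadata, "chunk_index": n},  # carry upstream metadata
                ))
        return {"chunks": chunks}


node = FixedSplitterNode("node-003", {"chunk_size": 4},
                         {"text": [PipelineItem("doc-1", {"text": "abcdefghij"})]})
out = asyncio.run(node.run())
print([c.data["text"] for c in out["chunks"]])  # ['abcd', 'efgh', 'ij']
```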

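3.2.3 Built-in node catalog

Source nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| source.file_store | Reads the file from File Store | (none) | file |

Converter nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| converter.mineru | MinerU document conversion | file | markdown, toc, metadata |
| converter.docling | Docling document conversion | file | markdown, toc, metadata |
| converter.markitdown | markitdown lightweight conversion | file | markdown |

Splitter nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| splitter.recursive | Recursive character chunking | text | chunks |
| splitter.semantic | Semantic chunking (LLM-driven) | text | chunks |
| splitter.fixed | Fixed-length chunking | text | chunks |
| splitter.section | Chunking by section | markdown, toc | chunks |

Enricher nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| enricher.metadata | Metadata enrichment (extracted from content) | text, metadata | metadata |
| enricher.ocr | OCR text recognition | file | text |
| enricher.translate | Document translation | text | text |
| enricher.chunk_meta | Attaches metadata to every chunk | chunks, metadata | chunks |

Router nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| router.file_type | Routes by file type | metadata | multiple named branches |
| router.metadata | Routes by a metadata field | metadata | multiple named branches |
| router.if_else | Jinja2 conditional routing | condition_input, metadata | true_branch, false_branch |

Sink nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| sink.ragflow_naive | RAGFlow NaiveRAG ingest | chunks, metadata | result |
| sink.ragflow_graphrag | RAGFlow GraphRAG ingest | markdown, metadata | result |
| sink.neo4j_wiki | Neo4j WikiGraph ingest | markdown, metadata | result |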

3.3 Inter-Node Data Model

3.3.1 VariablePool

import asyncio


class VariablePool:
    """
    Variable pool shared across nodes (modeled on dify's VariablePool).

    Naming convention: {node_id}:{output_name}
    e.g. "node-002:markdown"

    Each variable holds a list of PipelineItem.
    """

    def __init__(self):
        self._variables: dict[str, list[PipelineItem]] = {}
        self._lock = asyncio.Lock()

    async def set(self, key: str, items: list[PipelineItem]) -> None:
        async with self._lock:
            self._variables[key] = items

    async def get(self, key: str) -> list[PipelineItem]:
        async with self._lock:
            return self._variables.get(key, [])

    async def get_first(self, key: str) -> PipelineItem | None:
        items = await self.get(key)
        return items[0] if items else None

    @staticmethod
    def make_key(node_id: str, output_name: str) -> str:
        return f"{node_id}:{output_name}"

    async def dump(self) -> dict[str, list[dict]]:
        """Export all variables (for debugging)"""
        async with self._lock:
            return {
                k: [{"id": i.id, "data_keys": list(i.data.keys())} for i in items]
                for k, items in self._variables.items()
            }

3.3.2 Data-passing example

After the converter.mineru node runs:

# Contents of the VariablePool:
# "node-002:markdown" → [PipelineItem(id="chunk-1", data={"text": "..."})]
# "node-002:toc"      → [PipelineItem(id="toc-1", data={"toc": [...]})]
# "node-002:metadata"  → [PipelineItem(id="meta-1", data={"file_type": "pdf", ...})]

When the splitter.recursive node fetches its input:

markdown_items = await variable_pool.get("node-002:markdown")
# markdown_items: [PipelineItem(id="chunk-1", data={"text": "..."})]

4. Execution Engine Design

4.1 DAG Topological Sort and Execution Order

import asyncio
from collections import defaultdict
from typing import Any


class DAGExecutor:
    """
    DAG 执行引擎。

    核心逻辑(参考 n8n WorkflowExecute + ComfyUI execution.py):
    1. 构建邻接表
    2. Kahn 算法拓扑排序
    3. 按层级分组,同一层级 asyncio.gather 并行执行
    4. 每个节点完成后写入 VariablePool
    """

    def __init__(
        self,
        pipeline: dict,
        node_registry: "NodeRegistry",
        variable_pool: VariablePool,
        execution_id: str,
        config: dict | None = None,
    ):
        self.pipeline = pipeline
        self.nodes = {n["id"]: n for n in pipeline["nodes"]}
        self.edges = pipeline["edges"]
        self.node_registry = node_registry
        self.variable_pool = variable_pool
        self.execution_id = execution_id
        self.config = config or {}
        self._node_instances: dict[str, PipelineNode] = {}
        self._execution_order: list[list[str]] = []  # grouped by level

    # ── 4.1.1 Build the adjacency list ───────────────────────────────────
    def _build_adjacency(self) -> tuple[dict[str, list[str]], dict[str, int]]:
        """
        Build the adjacency list and the in-degree table.

        adj[source_id] = [target_id, ...]
        in_degree[target_id] = number of edges pointing at it

        Edge shape: {source, source_output, target, target_input}
        """
        adj = defaultdict(list)
        in_degree: dict[str, int] = {n["id"]: 0 for n in self.pipeline["nodes"]}

        for edge in self.edges:
            adj[edge["source"]].append(edge["target"])
            in_degree[edge["target"]] += 1

        return adj, in_degree

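    def _topological_sort_levels(self) -> list[list[str]]:
        """
        Kahn's algorithm, grouped into levels.

        Returns [[node_id, ...], [node_id, ...], ...];
        nodes within one level do not depend on each other and can run in parallel.

        Note: every node appears in exactly one level. A Router's branches are
        ordinary downstream nodes; the Router decides at runtime which branch
        receives items (unselected branches get an empty list).

        Raises ValueError if the graph contains a cycle.
        """
        adj, in_degree = self._build_adjacency()
        queue = [nid for nid, deg in in_degree.items() if deg == 0]
        levels: list[list[str]] = []

        while queue:
            level = queue
            levels.append(level)
            queue = []

            for node_id in level:
                for successor in adj[node_id]:
                    in_degree[successor] -= 1
                    if in_degree[successor] == 0:
                        queue.append(successor)

        # Nodes on a cycle never reach in-degree 0 and are never emitted
        if sum(len(level) for level in levels) != len(in_degree):
            raise ValueError("Pipeline graph contains a cycle")

        return levels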

    # ── 4.1.2 Resolve a node's upstream inputs ───────────────────────────
    async def _resolve_inputs(self, node_id: str) -> dict[str, list[PipelineItem]]:
        """
        Resolve the actual values of all of a node's inputs.

        For each {from_node, from_output} in the node's inputs config,
        fetch the matching PipelineItem list from the VariablePool.
        """
        node = self.nodes[node_id]
        resolved = {}

        for input_name, input_ref in node.get("inputs", {}).items():
            key = VariablePool.make_key(input_ref["from_node"], input_ref["from_output"])
            resolved[input_name] = await self.variable_pool.get(key)

        return resolved

    # ── 4.1.3 Main execution loop ────────────────────────────────────────
    async def execute(self) -> dict[str, Any]:
        """
        Execute the whole pipeline.

        Flow:
        1. Topologically sort into levels (Source nodes, with in-degree 0, come first)
        2. Execute level by level
        3. Track execution state; stop after a failing level unless continue_on_error
        """
        # Topological sort
        levels = self._topological_sort_levels()
        self._execution_order = levels
        continue_on_error = self.config.get("continue_on_error", False)

        results = {}
        errors: dict[str, str] = {}

        for node_ids in levels:
            # Run every node in the current level concurrently
            tasks = [self._execute_node(node_id, errors) for node_id in node_ids]
            level_results = await asyncio.gather(*tasks, return_exceptions=True)

            # Collect results
            for node_id, result in zip(node_ids, level_results):
                if isinstance(result, BaseException):
                    errors[node_id] = str(result)
                else:
                    results[node_id] = result

            # stopWorkflow strategy: skip all remaining levels after a failure
            if errors and not continue_on_error:
                break

        if not errors:
            status = "completed"
        elif continue_on_error:
            status = "partial"
        else:
            status = "failed"

        return {
            "execution_id": self.execution_id,
            "status": status,
            "completed_nodes": list(results.keys()),
            "failed_nodes": list(errors.keys()),
            "errors": errors,
        }

    async def _execute_node(
        self, node_id: str, errors: dict[str, str]
    ) -> dict[str, list[PipelineItem]]:
        """
        Execute a single node, with retries and a per-attempt timeout.
        """
        node = self.nodes[node_id]
        retry = self.config.get("retry_on_node_fail", True)
        max_retries = self.config.get("max_retries", 2) if retry else 0
        timeout = self.config.get("timeout_seconds", 300)
        last_error: Exception | None = None

        for attempt in range(max_retries + 1):
            try:
                # Resolve upstream inputs
                inputs = await self._resolve_inputs(node_id)

                # Get or create the node instance
                instance = self._get_or_create_instance(node_id, node, inputs)

                # Run with a timeout
                result = await asyncio.wait_for(instance.run(), timeout=timeout)

                # Write outputs into the VariablePool
                for output_name, items in result.items():
                    key = VariablePool.make_key(node_id, output_name)
                    await self.variable_pool.set(key, items)

                return result

            except asyncio.TimeoutError:
                last_error = TimeoutError(f"Node {node_id} timed out after {timeout}s")
            except Exception as e:
                last_error = e

            if attempt < max_retries:
                await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...

        # Every attempt failed
        errors[node_id] = str(last_error)
        raise last_error

    def _get_or_create_instance(
        self, node_id: str, node: dict, inputs: dict[str, list[PipelineItem]]
    ) -> PipelineNode:
        """Get or create the node instance (cached)"""
        if node_id not in self._node_instances:
            node_class = self.node_registry.get(node["type"], node.get("version", "1"))
            self._node_instances[node_id] = node_class(
                node_id=node_id,
                config=node.get("config", {}),
                inputs=inputs,
            )
        else:
            # Refresh inputs (they are re-resolved on every attempt)
            self._node_instances[node_id].inputs = inputs
        return self._node_instances[node_id]
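The level-by-level scheme above can be exercised in isolation. This self-contained sketch reuses the Kahn grouping and asyncio.gather pattern with toy nodes (asyncio.sleep stands in for HTTP calls to MinerU or RAGFlow); levels_of and run_levels are illustrative names, not part of DAGExecutor. The output also exposes a trade-off of level-synchronous scheduling: "naive" depends only on "split", yet it cannot start until the possibly slow "graph" sink in the same level has finished. A scheduler that starts each node as soon as its own predecessors finish would remove that barrier.

```python
import asyncio
from collections import defaultdict


def levels_of(node_ids: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Kahn's algorithm grouped by level; raises on cycles."""
    adj: dict[str, list[str]] = defaultdict(list)
    indeg = {n: 0 for n in node_ids}
    for src, dst in edges:
        adj[src].append(dst)
        indeg[dst] += 1
    level = [n for n in node_ids if indeg[n] == 0]
    levels: list[list[str]] = []
    while level:
        levels.append(level)
        nxt: list[str] = []
        for n in level:
            for m in adj[n]:
                indeg[m] -= 1
                if indeg[m] == 0:
                    nxt.append(m)
        level = nxt
    if sum(map(len, levels)) != len(node_ids):
        raise ValueError("cycle detected")
    return levels


async def run_levels(levels: list[list[str]], delay: float = 0.05) -> list[str]:
    """Run each level with asyncio.gather; return the order in which nodes finished."""
    finished: list[str] = []

    async def fake_node(node_id: str) -> None:
        await asyncio.sleep(delay)  # stands in for I/O-bound backend calls
        finished.append(node_id)

    for level in levels:
        await asyncio.gather(*(fake_node(n) for n in level))
    return finished


nodes = ["src", "conv", "split", "naive", "graph"]
edges = [("src", "conv"), ("conv", "split"), ("split", "naive"), ("conv", "graph")]
print(levels_of(nodes, edges))
# [['src'], ['conv'], ['split', 'graph'], ['naive']]
```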

4.2 Parallel Branch Execution (asyncio)

async def _execute_parallel_branch(
    self,
    branch_nodes: list[str],
    shared_pool: VariablePool,
) -> dict[str, Any]:
    """
    Execute a parallel branch (several nodes at once).

    Scenario: a Router feeds several Sink nodes, and the Sinks
    run their ingest in parallel.
    """
    tasks = [
        self._execute_node(node_id, {})
        for node_id in branch_nodes
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {node_id: r for node_id, r in zip(branch_nodes, results)}


# Router 节点执行示例:if_else 节点
class RouterIfElseNode(PipelineNode):
    type = "router.if_else"
    version = "1"

    async def run(self) -> dict[str, list[PipelineItem]]:
        from jinja2.sandbox import SandboxedEnvironment

        condition_expr = self.config.get("condition_expr", "false")

        # Read the condition value from the inputs
        condition_input_items = self.inputs.get("condition_input", [])
        metadata_items = self.inputs.get("metadata", [])

        condition_value = (
            condition_input_items[0].data.get("text", "") if condition_input_items else ""
        )
        ctx = {"value": condition_value, "metadata": metadata_items[0].data if metadata_items else {}}

        try:
            # A plain jinja2.Template is not sandboxed; evaluate the
            # admin-authored expression in a SandboxedEnvironment instead
            env = SandboxedEnvironment()
            result = env.from_string(f"{{{{ {condition_expr} }}}}").render(**ctx)
            is_true = result.strip().lower() in ("true", "1", "yes")
        except Exception:
            # Any render error counts as false
            is_true = False

        output = "true_branch" if is_true else "false_branch"
        signal = [PipelineItem(id=f"{self.node_id}-{output}", data={"triggered": True})]

        # Emit both declared outputs; the branch not taken gets an empty list
        return {
            "true_branch": signal if is_true else [],
            "false_branch": [] if is_true else signal,
        }
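Both `_execute_parallel_branch` and the per-layer loop rely on `asyncio.gather(..., return_exceptions=True)` so that one failing branch does not cancel its siblings. A minimal sketch of that behavior in isolation; `run_branches` and the sink names are hypothetical.

```python
import asyncio
from collections.abc import Awaitable
from typing import Any


async def run_branches(
    branches: dict[str, Awaitable[Any]],
) -> tuple[dict[str, Any], dict[str, BaseException]]:
    """Await every branch concurrently and split successes from failures.

    With return_exceptions=True, gather() does not cancel the remaining
    branches when one fails; the exception object is placed in that slot.
    """
    names = list(branches)
    results = await asyncio.gather(*(branches[n] for n in names), return_exceptions=True)
    succeeded = {n: r for n, r in zip(names, results) if not isinstance(r, BaseException)}
    failed = {n: r for n, r in zip(names, results) if isinstance(r, BaseException)}
    return succeeded, failed
```

Splitting the results this way lets the caller record a partial outcome (for example the Naive RAG sink succeeded while a GraphRAG sink failed) instead of losing every result to the first exception.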

4.3 Conditional routing (to different branches by file type / metadata)

The Router node is the key branch-control node in the DAG. Its design draws on n8n's continueErrorOutput strategy and dify's If/Else node:

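class RouterFileTypeNode(PipelineNode):
    """
    Route by file type.

    Config routes: [{"condition": "file_type == 'pdf'", "output": "pdf_branch"}, ...]
    Input: metadata (must contain a file_type field)
    Outputs: one named control-signal output per route, plus default_output;
    the first matching route wins
    """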
    type = "router.file_type"
    version = "1"

    async def run(self) -> dict[str, list[PipelineItem]]:
        metadata_items = self.inputs.get("metadata", [])
        if not metadata_items:
            return {output: [] for output in self._output_names}

        file_type = metadata_items[0].data.get("file_type", "unknown")
        routes = self.config.get("routes", [])
        default_output = self.config.get("default_output", "default_branch")

        triggered_output = default_output
        for route in routes:
            condition = route.get("condition", "")
            # Simple condition parsing, e.g. file_type == 'pdf'
            if self._eval_simple_condition(condition, {"file_type": file_type}):
                triggered_output = route["output"]
                break

        # Only the triggered output carries a signal item; every other declared output is empty
        result: dict[str, list[PipelineItem]] = {output: [] for output in self._output_names}
        result[triggered_output] = [PipelineItem(
            id=f"{self.node_id}-{triggered_output}",
            data={"file_type": file_type, "triggered": True},
        )]
        return result

    def _eval_simple_condition(self, condition: str, ctx: dict) -> bool:
        """Evaluate a simple condition: `field == 'x'` or `field in ['x', 'y']`."""
        import re
        m = re.match(r"(\w+)\s*(==|in)\s*(.+)", condition.strip())
        if not m:
            return False
        field, op, value = m.group(1), m.group(2), m.group(3).strip()
        field_val = ctx.get(field, "")
        if op == "==":
            return str(field_val) == value.strip("'\"")
        elif op == "in":
            # value[1:-1] drops the brackets around the option list
            options = [v.strip().strip("'\"") for v in value[1:-1].split(",")]
            return str(field_val) in options
        return False

    @property
    def _output_names(self) -> list[str]:
        # routes is a list of dicts; each route contributes its "output" name
        routes = self.config.get("routes", [])
        return [route["output"] for route in routes] + [self.config.get("default_output", "default_branch")]
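The file-type router boils down to "first matching route wins, else the default output". A standalone sketch of that selection logic, reusing the same two-operator condition grammar; `eval_condition`, `select_output` and `output_names` are hypothetical helper names, not part of the node API.

```python
import re

# Grammar: field == 'value'   or   field in ['a', 'b']
_CONDITION = re.compile(r"(\w+)\s*(==|in)\s*(.+)")


def eval_condition(condition: str, ctx: dict) -> bool:
    """Evaluate one route condition against ctx; unparseable conditions are false."""
    m = _CONDITION.fullmatch(condition.strip())
    if not m:
        return False
    field, op, raw = m.groups()
    actual = str(ctx.get(field, ""))
    raw = raw.strip()
    if op == "==":
        return actual == raw.strip("'\"")
    # op == "in": drop the surrounding brackets, then split on commas
    options = [v.strip().strip("'\"") for v in raw[1:-1].split(",")]
    return actual in options


def select_output(routes: list[dict], ctx: dict, default_output: str = "default_branch") -> str:
    """Return the output of the first route whose condition holds, else default_output."""
    for route in routes:
        if eval_condition(route.get("condition", ""), ctx):
            return route["output"]
    return default_output


def output_names(routes: list[dict], default_output: str = "default_branch") -> list[str]:
    """Every output the router declares: one per route plus the default."""
    return [route["output"] for route in routes] + [default_output]
```

Because routes are checked in order, a specific rule (for example scanned PDFs) should be listed before a broader one that would also match.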

4.4 Error handling and retries

Modeled on n8n's three error-handling strategies:

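Strategy | n8n constant | Behavior | Pipeline config
Stop | stopWorkflow | The whole Pipeline stops when a node fails | "error_strategy": "stop"
Continue | continueRegularOutput | The failed node emits error items instead of data; downstream nodes keep running | "error_strategy": "continue"
Skip | continueErrorOutput | The error is routed to an error output branch | "error_strategy": "skip" (handled by a Router node)
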
class ErrorHandlingStrategy:
    STOP = "stop"
    CONTINUE = "continue"
    SKIP = "skip"


async def _execute_node_with_error_handling(
    self,
    node_id: str,
    strategy: str = ErrorHandlingStrategy.CONTINUE,
) -> dict[str, list[PipelineItem]]:
    """
    Execute a node and apply the given error-handling strategy.
    """
    try:
        return await self._execute_node(node_id, {})
    except Exception as e:
        node = self.nodes[node_id]

        if strategy == ErrorHandlingStrategy.STOP:
            raise

        elif strategy == ErrorHandlingStrategy.CONTINUE:
            # Emit an error-carrying item on every declared output so downstream nodes can still run
            meta = self.node_registry.get_metadata(node["type"], node.get("version", "1"))
            outputs = {}
            for output_def in (meta.output_defs if meta else []):
                outputs[output_def.name] = [PipelineItem(
                    id=f"error-{node_id}-{output_def.name}",
                    data={"error": str(e)},
                    errors=[str(e)],
                    source_node_id=node_id,
                )]
            return outputs

        elif strategy == ErrorHandlingStrategy.SKIP:
            # Route the failure to error_output; a downstream Router node handles it
            return {"error_output": [PipelineItem(
                id=f"error-{node_id}",
                data={"error": str(e)},
                errors=[str(e)],
                source_node_id=node_id,
            )]}

        raise
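The three strategies differ only in what a failed node hands to the rest of the DAG. A minimal sketch of that decision as a pure function; `Strategy`, `Item` (a hypothetical stand-in for PipelineItem) and `on_node_failure` are illustrative names. Unlike the SKIP branch above, this sketch also returns the regular outputs as empty lists, so a downstream node can tell "no data" apart from "output missing".

```python
from dataclasses import dataclass, field
from enum import Enum


class Strategy(str, Enum):
    STOP = "stop"
    CONTINUE = "continue"
    SKIP = "skip"


@dataclass
class Item:
    """Hypothetical stand-in for PipelineItem (only the fields used here)."""
    id: str
    data: dict
    errors: list[str] = field(default_factory=list)


def on_node_failure(
    node_id: str, output_names: list[str], exc: Exception, strategy: Strategy
) -> dict[str, list[Item]]:
    """Turn a node failure into outputs (or an exception) according to strategy."""
    if strategy is Strategy.STOP:
        # Abort the run; the original error is kept as __cause__
        raise RuntimeError(f"node {node_id} failed: {exc}") from exc
    message = str(exc)
    if strategy is Strategy.CONTINUE:
        # Every declared output carries an error item so downstream nodes still run
        return {
            name: [Item(id=f"error-{node_id}-{name}", data={"error": message}, errors=[message])]
            for name in output_names
        }
    # SKIP: regular outputs stay empty; the error travels on error_output
    outputs: dict[str, list[Item]] = {name: [] for name in output_names}
    outputs["error_output"] = [Item(id=f"error-{node_id}", data={"error": message}, errors=[message])]
    return outputs
```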

4.5 Execution status tracking (integration with the ragateway_documents state machine)

Pipeline execution status maps onto the V27 ragateway_documents.status state machine:

class PipelineExecutionStatus:
    """
    Maps onto the ragateway_documents.status state machine:
    queued → running → indexed | partial | failed
    """
    QUEUED = "queued"
    RUNNING = "running"
    INDEXED = "indexed"
    FAILED = "failed"
    PARTIAL = "partial"  # some nodes succeeded, others failed


class PipelineExecutor:
    """
    Pipeline executor (entry point).
    Replaces the hard-coded V27 sequence:
    DocumentConverterPlugin.convert() → NaiveRAG.ingest() → GraphRAG[n].ingest()
    """

    def __init__(
        self,
        tenant_id: str,
        pipeline: dict,
        node_registry: "NodeRegistry",
        db_pool,  # asyncpg.Pool, shared with DAGExecutor
        config: dict | None = None,
    ):
        self.tenant_id = tenant_id
        self.pipeline = pipeline
        self.node_registry = node_registry
        self.db_pool = db_pool
        self.config = config or {}
        self.execution_id = str(uuid.uuid4())
        self.variable_pool = VariablePool()
        self._status = PipelineExecutionStatus.QUEUED

    async def execute(self, doc_id: str, file_path: str) -> dict[str, Any]:
        """
        Run the Pipeline for one document.

        1. Seed the Source node input
        2. Execute the DAG
        3. Update the ragateway_documents status
        """
        execution_id = self.execution_id

        # 1. Mark the document as running
        await self._update_doc_status(doc_id, PipelineExecutionStatus.RUNNING)

        try:
            # 2. Inject the Source node input
            await self.variable_pool.set(
                VariablePool.make_key("__source__", "file"),
                [PipelineItem(
                    id=f"file-{doc_id}",
                    data={"path": file_path, "doc_id": doc_id},
                    metadata={"doc_id": doc_id, "tenant_id": self.tenant_id},
                    source_node_id="__source__",
                )]
            )

            # 3. Point every source.* node at the actual file
            pipeline = self._resolve_source_node(self.pipeline, file_path)
            pipeline.setdefault("tenant_id", self.tenant_id)

            # 4. Execute the DAG; doc_id goes into config for the pipeline_executions record
            engine = DAGExecutor(
                pipeline=pipeline,
                node_registry=self.node_registry,
                variable_pool=self.variable_pool,
                execution_id=execution_id,
                db_pool=self.db_pool,
                config={**self.pipeline.get("config", {}), "doc_id": doc_id},
            )
            result = await engine.execute()

            # 5. Map the DAG result onto the document status
            #    (DAGExecutor reports "indexed" | "partial" | "failed")
            if result["status"] == PipelineExecutionStatus.INDEXED:
                await self._update_doc_status(doc_id, PipelineExecutionStatus.INDEXED)
            elif result["status"] == PipelineExecutionStatus.PARTIAL:
                await self._update_doc_status(doc_id, PipelineExecutionStatus.PARTIAL)
            else:
                await self._update_doc_status(doc_id, PipelineExecutionStatus.FAILED)

            return result

        except Exception:
            await self._update_doc_status(doc_id, PipelineExecutionStatus.FAILED)
            raise

    def _resolve_source_node(self, pipeline: dict, file_path: str) -> dict:
        """Return a copy of the pipeline with every source.* node pointed at the real file path."""
        resolved = copy.deepcopy(pipeline)
        for node in resolved["nodes"]:
            if node["type"].startswith("source."):
                node.setdefault("config", {})["file_path"] = file_path
        return resolved

    async def _update_doc_status(self, doc_id: str, status: str) -> None:
        """Update the ragateway_documents status (asyncpg uses $n placeholders)."""
        async with self.db_pool.acquire() as conn:
            await conn.execute(
                "UPDATE ragateway_documents SET status = $1, updated_at = NOW() WHERE doc_id = $2",
                status, doc_id,
            )
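The status values above form a small state machine, and the executor should only ever move a document forward along it. A minimal sketch of a transition check plus the DAG-result mapping; `ALLOWED_TRANSITIONS`, `check_transition` and `doc_status_for` are hypothetical names, and treating indexed/partial/failed as terminal is an assumption (re-ingesting a new file version would need an explicit path back to queued, which this section does not define).

```python
# Forward transitions of ragateway_documents.status as listed in PipelineExecutionStatus.
ALLOWED_TRANSITIONS: dict[str, frozenset[str]] = {
    "queued": frozenset({"running"}),
    "running": frozenset({"indexed", "partial", "failed"}),
    # Terminal in this sketch (assumption)
    "indexed": frozenset(),
    "partial": frozenset(),
    "failed": frozenset(),
}


def check_transition(current: str, new: str) -> None:
    """Raise ValueError unless current -> new is a legal status change."""
    if new not in ALLOWED_TRANSITIONS.get(current, frozenset()):
        raise ValueError(f"illegal status transition {current!r} -> {new!r}")


def doc_status_for(dag_status: str) -> str:
    """Map a DAGExecutor result status onto the document status; anything unknown counts as failed."""
    return dag_status if dag_status in ("indexed", "partial") else "failed"
```

The same rule could also be enforced in SQL by adding the expected current status to the UPDATE's WHERE clause, so that two concurrent executions cannot overwrite each other's terminal state.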

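Supplement 2: complete DAGExecutor implementation in Python (80+ lines)

The following complete implementation is intended to drop into ragateway/pipeline/executor.py. It covers Kahn topological sorting, layer-by-layer asyncio parallel execution, three-strategy error handling, and PostgreSQL status writes: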

# ragateway/pipeline/executor.py
import asyncio
import copy
import logging
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any

from ragateway.pipeline.registry import NodeRegistry, PipelineItem, PipelineNode
from ragateway.pipeline.variable_pool import VariablePool


logger = logging.getLogger(__name__)


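class ErrorStrategy(str, Enum):
    STOP = "stop"          # a failed node stops the whole Pipeline
    CONTINUE = "continue"  # a failed node emits an error item; downstream nodes keep running
    SKIP = "skip"          # a failed node is marked skipped with no outputs; downstream nodes keep running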


class ExecutionStatus(str, Enum):
    QUEUED   = "queued"
    RUNNING  = "running"
    INDEXED  = "indexed"
    FAILED   = "failed"
    PARTIAL  = "partial"


@dataclass
class NodeExecutionResult:
    node_id: str
    status: str          # "success" | "failed" | "skipped"
    duration_ms: int
    output_keys: list[str] = field(default_factory=list)
    error: str | None = None
    retry_count: int = 0


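class DAGExecutor:
    """
    DAG execution engine (complete implementation).

    Core flow:
    1. build_execution_layers()       Kahn topological sort, grouped into layers
    2. execute()                      main entry point; runs each layer with asyncio.gather
    3. _execute_node_with_strategy()  single-node execution with three-strategy error handling
    4. _write_execution_start / _write_node_result / _write_execution_end
                                      execution status written to PostgreSQL
    """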

    def __init__(
        self,
        pipeline: dict,
        node_registry: NodeRegistry,
        variable_pool: VariablePool,
        execution_id: str,
        db_pool,           # asyncpg.Pool (the _write_* methods call .acquire())
        config: dict | None = None,
    ):
        self.pipeline = pipeline
        self.nodes: dict[str, dict] = {n["id"]: n for n in pipeline["nodes"]}
        self.edges = pipeline["edges"]
        self.node_registry = node_registry
        self.variable_pool = variable_pool
        self.execution_id = execution_id
        self.db_pool = db_pool
        self.config = config or {}
        self._node_instances: dict[str, Any] = {}
        self._execution_layers: list[list[str]] = []
        self._node_results: dict[str, NodeExecutionResult] = {}

    # ── 4.1 build_execution_layers: Kahn topological sort, grouped into layers ──
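    def build_execution_layers(self) -> list[list[str]]:
        """
        Topologically sort the DAG with Kahn's algorithm, grouping node IDs by layer.

        Returns [[layer_0_nodes], [layer_1_nodes], ...]
        Nodes within one layer do not depend on each other and can run in parallel.
        Raises ValueError if an edge names an unknown node or the graph has a cycle.
        """
        # Build the adjacency list and the in-degree table
        adj: dict[str, list[str]] = defaultdict(list)
        in_degree: dict[str, int] = {nid: 0 for nid in self.nodes}

        for edge in self.edges:
            if edge["source"] not in in_degree or edge["target"] not in in_degree:
                raise ValueError(f"Edge references unknown node: {edge['source']} -> {edge['target']}")
            adj[edge["source"]].append(edge["target"])
            in_degree[edge["target"]] += 1

        # Kahn topological sort, one layer per pass
        queue = [nid for nid, deg in in_degree.items() if deg == 0]
        layers: list[list[str]] = []

        while queue:
            layers.append(sorted(queue))   # sort for a deterministic order within the layer
            next_queue = []

            for node_id in queue:
                for successor in adj[node_id]:
                    in_degree[successor] -= 1
                    if in_degree[successor] == 0:
                        next_queue.append(successor)

            queue = next_queue

        # A node that never reached in-degree 0 sits on, or behind, a cycle
        if sum(len(layer) for layer in layers) != len(self.nodes):
            stuck = sorted(nid for nid, deg in in_degree.items() if deg > 0)
            raise ValueError(f"Pipeline DAG contains a cycle; unreachable nodes: {stuck}")

        self._execution_layers = layers
        logger.info(
            "[%s] Built %d execution layers, sizes: %s",
            self.execution_id,
            len(layers),
            [len(layer) for layer in layers],
        )
        return layers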

    # ── 4.2 execute: main entry point ───────────────────────────────────────
    async def execute(self) -> dict[str, Any]:
        """
        Main entry point.

        Flow:
        1. build_execution_layers() topological sort
        2. Run each layer in parallel with asyncio.gather
        3. Write each node's result to PostgreSQL as soon as it finishes
        4. Aggregate the final status and write it
        """
        import time
        start_ts = time.monotonic()

        layers = self.build_execution_layers()

        # Record that the Pipeline has started
        await self._write_execution_start(layers)

        errors: dict[str, str] = {}
        # One error-handling strategy for the whole run (default CONTINUE)
        strategy = ErrorStrategy(self.config.get("error_strategy", "continue"))

        for layer_idx, node_ids in enumerate(layers):
            logger.info(
                "[%s] Executing layer %d/%d with %d nodes: %s",
                self.execution_id, layer_idx + 1, len(layers), len(node_ids), node_ids
            )

            tasks = [self._execute_node_with_strategy(node_id, strategy) for node_id in node_ids]
            layer_results = await asyncio.gather(*tasks, return_exceptions=True)

            for node_id, result in zip(node_ids, layer_results):
                if isinstance(result, BaseException):
                    # Only STOP raises; return_exceptions=True delivers the exception here
                    errors[node_id] = str(result)
                    logger.error("[%s] Node %s failed: %s", self.execution_id, node_id, result)
                elif isinstance(result, NodeExecutionResult):
                    self._node_results[node_id] = result
                    if result.status != "success":
                        # CONTINUE/SKIP failures also count toward the final status
                        errors[node_id] = result.error or result.status
                    # Write each node result to the DB as soon as it is known
                    await self._write_node_result(result)

            # STOP: do not start the next layer once any node in this layer has failed
            if strategy == ErrorStrategy.STOP and errors:
                break

        # ── Aggregate the final status ──────────────────────────────────────
        total_duration_ms = int((time.monotonic() - start_ts) * 1000)
        succeeded = sum(1 for r in self._node_results.values() if r.status == "success")
        if not errors:
            final_status = ExecutionStatus.INDEXED
        elif succeeded:
            final_status = ExecutionStatus.PARTIAL
        else:
            final_status = ExecutionStatus.FAILED

        await self._write_execution_end(final_status, total_duration_ms, errors)

        return {
            "execution_id": self.execution_id,
            "status": final_status.value,
            "total_duration_ms": total_duration_ms,
            "completed_nodes": [nid for nid, r in self._node_results.items() if r.status == "success"],
            "failed_nodes": list(errors.keys()),
            "node_results": {
                nid: {"status": r.status, "duration_ms": r.duration_ms, "error": r.error}
                for nid, r in self._node_results.items()
            },
            "errors": errors,
        }

    # ── 4.3 _execute_node_with_strategy: three-strategy error handling ──────
    async def _execute_node_with_strategy(
        self, node_id: str, strategy: ErrorStrategy
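    ) -> NodeExecutionResult:
        """
        Execute a single node and apply the error-handling strategy.

        Strategies:
        - STOP:     once retries are exhausted, raise; the exception propagates to execute()
        - CONTINUE: once retries are exhausted, write an error item to the node's _error output
                    and return a failed NodeExecutionResult
        - SKIP:     once retries are exhausted, return a skipped NodeExecutionResult with no
                    outputs; later nodes are not blocked
        """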
        import time
        max_retries = self.config.get("max_retries", 2)
        timeout_sec = self.config.get("timeout_seconds", 300)
        node = self.nodes[node_id]
        last_error: Exception | None = None

        for attempt in range(max_retries + 1):
            attempt_start = time.monotonic()
            try:
                # ① Resolve upstream inputs
                inputs = await self._resolve_inputs(node_id)

                # ② Get (or create) the node instance
                instance = self._get_or_create_instance(node_id, node, inputs)

                # ③ Run, bounded by the timeout
                result = await asyncio.wait_for(instance.run(), timeout=timeout_sec)

                # ④ Write the outputs to the VariablePool
                for output_name, items in result.items():
                    key = VariablePool.make_key(node_id, output_name)
                    await self.variable_pool.set(key, items)

                duration_ms = int((time.monotonic() - attempt_start) * 1000)
                return NodeExecutionResult(
                    node_id=node_id,
                    status="success",
                    duration_ms=duration_ms,
                    output_keys=list(result.keys()),
                    retry_count=attempt,
                )

            except asyncio.TimeoutError:
                last_error = TimeoutError(
                    f"Node {node_id} timed out after {timeout_sec}s (attempt {attempt + 1})"
                )
            except Exception as e:
                last_error = e

            if attempt < max_retries:
                wait = 2 ** attempt
                logger.warning(
                    "[%s] Node %s attempt %d failed: %s — retrying in %ds",
                    self.execution_id, node_id, attempt + 1, last_error, wait
                )
                await asyncio.sleep(wait)

        # ── All retries exhausted: apply the strategy ───────────────────────
        duration_ms = int((time.monotonic() - attempt_start) * 1000)
        error_str = str(last_error) if last_error else "unknown"

        if strategy == ErrorStrategy.STOP:
            # Raise immediately; the exception propagates to execute()
            raise RuntimeError(f"Node {node_id} failed after {max_retries + 1} attempts: {error_str}")

        logger.error(
            "[%s] Node %s exhausted retries (%s), applying strategy: %s — %s",
            self.execution_id, node_id, max_retries + 1, strategy.value, error_str
        )

        if strategy == ErrorStrategy.CONTINUE:
            # Write an error item to the VariablePool so downstream nodes can continue
            await self.variable_pool.set(
                VariablePool.make_key(node_id, "_error"),
                [PipelineItem(id=f"err-{node_id}", data={"error": error_str},
                             errors=[error_str], source_node_id=node_id)]
            )
            return NodeExecutionResult(
                node_id=node_id, status="failed",
                duration_ms=duration_ms, error=error_str, retry_count=max_retries + 1,
            )

        # SKIP: emit no data and mark the node as skipped
        return NodeExecutionResult(
            node_id=node_id, status="skipped",
            duration_ms=duration_ms, error=error_str, retry_count=max_retries + 1,
        )

    async def _resolve_inputs(self, node_id: str) -> dict[str, list[PipelineItem]]:
        """Resolve node inputs: fetch the upstream PipelineItem lists from the VariablePool."""
        node = self.nodes[node_id]
        resolved = {}
        for input_name, input_ref in node.get("inputs", {}).items():
            key = VariablePool.make_key(input_ref["from_node"], input_ref["from_output"])
            resolved[input_name] = await self.variable_pool.get(key)
        return resolved

    def _get_or_create_instance(self, node_id: str, node: dict, inputs: dict) -> PipelineNode:
        """Get or create the node instance (cached; inputs are re-injected on every run)."""
        if node_id not in self._node_instances:
            node_class = self.node_registry.get(node["type"], node.get("version", "1"))
            self._node_instances[node_id] = node_class(
                node_id=node_id,
                config=node.get("config", {}),
                inputs=inputs,
            )
        else:
            self._node_instances[node_id].inputs = inputs
        return self._node_instances[node_id]

    # ── 4.4 PostgreSQL status writes ────────────────────────────────────────
    async def _write_execution_start(self, layers: list[list[str]]) -> None:
        """Insert the pipeline_executions start record."""
        try:
            async with self.db_pool.acquire() as conn:
                # asyncpg has no default dict -> jsonb codec, so the empty
                # node_results object is written as a SQL literal
                await conn.execute(
                    """
                    INSERT INTO pipeline_executions
                        (execution_id, tenant_id, pipeline_id, doc_id, status, started_at, node_results)
                    VALUES ($1, $2, $3, $4, $5, $6, '{}'::jsonb)
                    ON CONFLICT (execution_id) DO NOTHING
                    """,
                    self.execution_id,
                    self.pipeline.get("tenant_id"),
                    self.pipeline.get("pipeline_id"),
                    self.config.get("doc_id"),
                    ExecutionStatus.RUNNING.value,
                    datetime.now(timezone.utc),
                )
        except Exception as e:
            logger.error("[%s] Failed to write execution start: %s", self.execution_id, e)

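    async def _write_node_result(self, result: NodeExecutionResult) -> None:
        """After each node finishes, merge its result into pipeline_executions.node_results (JSONB)."""
        try:
            async with self.db_pool.acquire() as conn:
                # jsonb_build_object takes VARIADIC "any", so PostgreSQL cannot infer
                # the parameter types on its own; the explicit casts supply them
                await conn.execute(
                    """
                    UPDATE pipeline_executions
                    SET node_results = COALESCE(node_results, '{}'::jsonb) ||
                        jsonb_build_object($2::text, jsonb_build_object(
                            'status', $3::text, 'duration_ms', $4::int,
                            'error', $5::text, 'retry_count', $6::int
                        ))
                    WHERE execution_id = $1
                    """,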
                    self.execution_id,
                    result.node_id,
                    result.status,
                    result.duration_ms,
                    result.error,
                    result.retry_count,
                )
        except Exception as e:
            logger.error("[%s] Failed to write node result for %s: %s",
                         self.execution_id, result.node_id, e)

    async def _write_execution_end(
        self,
        status: ExecutionStatus,
        total_duration_ms: int,
        errors: dict[str, str],
    ) -> None:
        """Update the final status once the Pipeline run has finished."""
        try:
            async with self.db_pool.acquire() as conn:
                await conn.execute(
                    """
                    UPDATE pipeline_executions
                    SET status = $2,
                        completed_at = $3,
                        total_duration_ms = $4,
                        error_message = $5
                    WHERE execution_id = $1
                    """,
                    self.execution_id,
                    status.value,
                    datetime.now(timezone.utc),
                    total_duration_ms,
                    "; ".join(f"{n}: {e}" for n, e in errors.items()) if errors else None,
                )
        except Exception as e:
            logger.error("[%s] Failed to write execution end: %s", self.execution_id, e)
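Layering is the part of the executor that decides what may run concurrently. The same Kahn layering, including the unknown-node and cycle checks, as a standalone function over plain `(source, target)` tuples; `build_layers` is a hypothetical name and the tuple edge format is a simplification of the `{"source", "target"}` dicts used by the pipeline JSON.

```python
from collections import defaultdict


def build_layers(node_ids: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Group nodes into layers; every node's dependencies sit in earlier layers."""
    in_degree = {n: 0 for n in node_ids}
    adj: dict[str, list[str]] = defaultdict(list)
    for src, dst in edges:
        if src not in in_degree or dst not in in_degree:
            raise ValueError(f"edge references unknown node: {src} -> {dst}")
        adj[src].append(dst)
        in_degree[dst] += 1

    layers: list[list[str]] = []
    frontier = sorted(n for n, d in in_degree.items() if d == 0)
    while frontier:
        layers.append(frontier)
        nxt = []
        for n in frontier:
            for succ in adj[n]:
                in_degree[succ] -= 1
                if in_degree[succ] == 0:  # last dependency finished in this layer
                    nxt.append(succ)
        frontier = sorted(nxt)

    # Nodes never released sit on (or downstream of) a cycle
    if sum(len(layer) for layer in layers) != len(in_degree):
        stuck = sorted(n for n, d in in_degree.items() if d > 0)
        raise ValueError(f"pipeline contains a cycle; unreachable nodes: {stuck}")
    return layers
```

For the V27-equivalent pipeline (source → converter → Naive RAG and GraphRAG sinks) this yields three layers, with both sinks in the last one, which is exactly the parallel fan-out the hard-coded V27 flow performed.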

5. Node registration mechanism

5.1 Built-in node catalog

All built-in nodes (Python modules under ragateway/pipeline/nodes/):

ragateway/pipeline/nodes/
├── __init__.py
├── source/
│   ├── __init__.py
│   └── file_store.py           # source.file_store
├── converter/
│   ├── __init__.py
│   ├── mineru.py               # converter.mineru
│   ├── docling.py              # converter.docling
│   └── markitdown.py           # converter.markitdown
├── splitter/
│   ├── __init__.py
│   ├── recursive.py            # splitter.recursive
│   ├── semantic.py             # splitter.semantic
│   ├── fixed.py                # splitter.fixed
│   └── section.py              # splitter.section
├── enricher/
│   ├── __init__.py
│   ├── metadata.py             # enricher.metadata
│   ├── ocr.py                  # enricher.ocr
│   └── translate.py            # enricher.translate
├── router/
│   ├── __init__.py
│   ├── file_type.py            # router.file_type
│   ├── metadata.py             # router.metadata
│   └── if_else.py              # router.if_else
└── sink/
    ├── __init__.py
    ├── ragflow_naive.py        # sink.ragflow_naive
    ├── ragflow_graphrag.py     # sink.ragflow_graphrag
    └── neo4j_wiki.py           # sink.neo4j_wiki

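5.2 Third-party node SDK (Python ABC + registration decorator)

Modeled on ComfyUI's NODE_CLASS_MAPPINGS registry (a module-level dict from node name to node class), wrapped here in a registration decorator: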

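# ragateway/pipeline/registry.py

from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Type as type_t  # typing has no lowercase "type"; Type is the generic alias

# PipelineNode, PipelineItem, NodeInputDef, NodeOutputDef and NodeConfigField are assumed
# to be defined or re-exported in this module (definitions omitted here).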


class NodeRegistry:
    """
    Node registry.
    Modeled on ComfyUI's global NODE_CLASS_MAPPINGS registry plus dify's NodeFactory.

    Usage:

    @register_node(node_type="converter.my_custom", version="1")
    class MyCustomConverter(PipelineNode):
        type = "converter.my_custom"
        version = "1"
        ...
    """
    _registry: dict[str, dict[str, type_t[PipelineNode]]] = {}
    _metadata: dict[str, dict[str, "NodeMetadata"]] = {}

    @classmethod
    def register(
        cls,
        node_type: str,
        version: str,
        node_class: type_t[PipelineNode],
        metadata: "NodeMetadata | None" = None,
    ) -> None:
        """Called by the decorator; performs the actual registration."""
        if node_type not in cls._registry:
            cls._registry[node_type] = {}
        cls._registry[node_type][version] = node_class

        if metadata:
            if node_type not in cls._metadata:
                cls._metadata[node_type] = {}
            cls._metadata[node_type][version] = metadata

    @classmethod
    def get(
        cls, node_type: str, version: str | None = None
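    ) -> type_t[PipelineNode]:
        """Return the node class; version=None selects the highest registered version."""
        versions = cls._registry.get(node_type, {})
        if not versions:
            raise KeyError(f"Unknown node type: {node_type}")
        if version is not None:
            if version not in versions:
                raise KeyError(f"Unknown version {version!r} for node type {node_type}")
            return versions[version]

        # Compare dotted numeric versions ("2" < "10"), not plain strings
        def _version_key(v: str) -> tuple[int, ...]:
            return tuple(int(p) if p.isdigit() else 0 for p in v.split("."))

        return versions[max(versions, key=_version_key)]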

    @classmethod
    def list_types(cls) -> list[str]:
        """列出所有已注册的节点类型"""
        return list(cls._registry.keys())

    @classmethod
    def get_metadata(cls, node_type: str, version: str = "1") -> "NodeMetadata | None":
        """Return the node's metadata."""
        return cls._metadata.get(node_type, {}).get(version)


@dataclass
class NodeMetadata:
    type: str
    version: str = "1"
    category: str = ""
    display_name: str = ""
    description: str = ""
    icon: str = ""
    input_defs: list[NodeInputDef] = field(default_factory=list)
    output_defs: list[NodeOutputDef] = field(default_factory=list)
    config_defs: list[NodeConfigField] = field(default_factory=list)


def register_node(
    node_type: str,
    version: str = "1",
    category: str = "",
    display_name: str = "",
    description: str = "",
    icon: str = "",
    input_defs: list[NodeInputDef] | None = None,
    output_defs: list[NodeOutputDef] | None = None,
    config_defs: list[NodeConfigField] | None = None,
) -> "Callable[[type_t[PipelineNode]], type_t[PipelineNode]]":
    """
    Node registration decorator.

    Example:

    @register_node(
        node_type="converter.my_custom",
        version="1",
        category="Converter",
        display_name="My Custom Converter",
        description="Converts any document to Markdown",
        icon="🔧",
    )
    class MyCustomConverter(PipelineNode):
        type = "converter.my_custom"
        version = "1"

        async def run(self) -> dict[str, list[PipelineItem]]:
            ...
    """
    def decorator(cls: type_t[PipelineNode]) -> type_t[PipelineNode]:
        # Build the metadata, then register the class and metadata in a single call
        meta = NodeMetadata(
            type=node_type,
            version=version,
            category=category,
            display_name=display_name or cls.__name__,
            description=description,
            icon=icon,
            input_defs=input_defs or [],
            output_defs=output_defs or [],
            config_defs=config_defs or [],
        )
        NodeRegistry.register(node_type, version, cls, metadata=meta)

        # Keep the class attributes consistent with the registration
        cls.type = node_type
        cls.version = version

        return cls

    return decorator


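# ─── Built-in node imports (registered at startup) ─────────────────────────
def load_builtin_nodes() -> None:
    """Import every built-in node package to trigger its @register_node decorators.

    Importing a package only runs its __init__.py, so each package's __init__.py
    must in turn import its node modules (e.g. converter/__init__.py imports
    mineru, docling and markitdown).
    """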
    import ragateway.pipeline.nodes.source
    import ragateway.pipeline.nodes.converter
    import ragateway.pipeline.nodes.splitter
    import ragateway.pipeline.nodes.enricher
    import ragateway.pipeline.nodes.router
    import ragateway.pipeline.nodes.sink
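Because registration happens as a side effect of importing a module, discovery reduces to "import every module under a package". A sketch using only the standard library (`importlib` plus `pkgutil.walk_packages`); `import_submodules` is a hypothetical helper, and using it for `load_builtin_nodes` or for the `ragateway_nodes` third-party package is an assumption, not the documented loader.

```python
import importlib
import pkgutil


def import_submodules(package_name: str) -> list[str]:
    """Import a package and, recursively, every module beneath it.

    Importing a module runs its top level, which is what fires the
    @register_node decorators. Returns the dotted names that were imported.
    """
    package = importlib.import_module(package_name)
    if not hasattr(package, "__path__"):
        return []  # a plain module has no submodules to walk
    imported: list[str] = []
    for info in pkgutil.walk_packages(package.__path__, prefix=f"{package.__name__}."):
        importlib.import_module(info.name)
        imported.append(info.name)
    return imported
```

With this helper, `load_builtin_nodes()` could become `import_submodules("ragateway.pipeline.nodes")`, and a new node file would register itself without anyone editing an `__init__.py`.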

Third-party node example:

# third_party_node.py (a third-party node)
import asyncio

from ragateway.pipeline.registry import register_node, NodeInputDef, NodeOutputDef, NodeConfigField, PipelineNode, PipelineItem


@register_node(
    node_type="converter.tesseract_ocr",
    version="1",
    category="Converter",
    display_name="Tesseract OCR Converter",
    description="OCR text recognition with Tesseract",
    icon="🔍",
    input_defs=[
        NodeInputDef(name="image", type="file", required=True),
    ],
    output_defs=[
        NodeOutputDef(name="text", type="text"),
    ],
    config_defs=[
        NodeConfigField(name="lang", type="select", label="Language",
                        options=["eng", "chi_sim", "jpn"], default="eng"),
    ],
)
class TesseractOCRNode(PipelineNode):
    type = "converter.tesseract_ocr"
    version = "1"

    async def run(self) -> dict[str, list[PipelineItem]]:
        import pytesseract
        from PIL import Image

        image_items = self.inputs.get("image", [])
        lang = self.config.get("lang", "eng")

        results = []
        for item in image_items:
            image_path = item.data["path"]
            # image_to_string blocks (it shells out to tesseract); run it in a worker thread
            with Image.open(image_path) as image:
                text = await asyncio.to_thread(pytesseract.image_to_string, image, lang=lang)
            results.append(PipelineItem(
                id=f"ocr-{item.id}",
                data={"text": text},
                metadata=item.metadata,
                source_node_id=self.node_id,
            ))

        return {"text": results}
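OCR engines are blocking and CPU-heavy, so calling them directly inside `async def run()` would stall every other node sharing the event loop. A sketch of offloading such calls with `asyncio.to_thread` while capping how many run at once; `map_blocking` and `max_concurrency` are hypothetical names, and the right cap would depend on the host's cores and memory.

```python
import asyncio
from collections.abc import Callable, Iterable
from typing import TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def map_blocking(
    fn: Callable[[T], R], items: Iterable[T], max_concurrency: int = 2
) -> list[R]:
    """Run a blocking fn over items in worker threads, at most max_concurrency at a time.

    asyncio.to_thread keeps the event loop responsive while fn runs; the
    semaphore bounds how many heavy calls (such as OCR) overlap. Results
    keep the input order because gather() preserves it.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: T) -> R:
        async with semaphore:
            return await asyncio.to_thread(fn, item)

    return list(await asyncio.gather(*(run_one(item) for item in items)))
```

Inside a node, `fn` would wrap the per-image OCR call, so a multi-page document is processed a few pages at a time instead of strictly one after another.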

Supplement 3: complete third-party OCR node SDK example (developing a custom node)

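The following complete third-party node example implements an enricher.paddle_ocr node (based on PaddleOCR) and shows:
- Full usage of the @register_node decorator
- input_defs / output_defs declarations (analogous to ComfyUI's INPUT_TYPES / RETURN_TYPES)
- A config schema (JSON Schema style, the basis for rendering the node-configuration UI) via config_defs and the PaddleOCRConfigSchema class
- An async run() implementation with error handling
- Retry mechanism and error-item propagation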

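# third_party_nodes/paddle_ocr_node.py
"""
Third-party node example: PaddleOCR text recognition node
Install: pip install paddlepaddle paddleocr
Usage: place this file at site-packages/ragateway_nodes/paddle_ocr_node.py;
     ragateway discovers and registers it automatically at startup, and the design
     calls for hot loading so that no process restart is required.
"""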

import asyncio
import logging
from dataclasses import dataclass
from typing import Any

from ragateway.pipeline.registry import register_node, NodeInputDef, NodeOutputDef
from ragateway.pipeline.registry import NodeConfigField, PipelineNode, PipelineItem

logger = logging.getLogger(__name__)


@dataclass
class PaddleOCRConfigSchema:
    """PaddleOCR node config schema (JSON Schema definition the UI renders from)"""
    lang: dict | None = None  # populated in __post_init__

    def __post_init__(self):
        self.lang = {
            "type": "string",
            "enum": ["ch", "en", "japan", "korean", "chinese_cht"],
            "default": "ch",
            "description": "OCR 识别语言",
        }


@register_node(
    node_type="enricher.paddle_ocr",
    version="1",
    category="Enricher",
    display_name="PaddleOCR Text Recognition",
    description="Multilingual image/PDF text recognition based on PaddleOCR",
    icon="🖼️",
    input_defs=[
        NodeInputDef(
            name="image",
            type="file",
            required=True,
            description="Image file path, or the temporary path of an extracted PDF page",
        ),
        NodeInputDef(
            name="metadata",
            type="metadata",
            required=False,
            description="Upstream metadata, passed through to the output",
        ),
    ],
    output_defs=[
        NodeOutputDef(
            name="text",
            type="text",
            description="OCR result text (aggregated line by line)",
        ),
        NodeOutputDef(
            name="word_boxes",
            type="json",
            description="Bounding-box coordinates of each word (for downstream layout analysis)",
        ),
        NodeOutputDef(
            name="metadata",
            type="metadata",
            description="Passed-through metadata",
        ),
    ],
    config_defs=[
        NodeConfigField(
            name="lang",
            type="select",
            label="Recognition language",
            default="ch",
            options=["ch", "en", "japan", "korean", "chinese_cht"],
            description="PaddleOCR model language: ch=Chinese, en=English",
        ),
        NodeConfigField(
            name="use_angle_cls",
            type="boolean",
            label="Auto rotation correction",
            default=True,
            description="Enable the direction classifier (suited to scans containing rotated text)",
        ),
        NodeConfigField(
            name="det_limit_side_len",
            type="integer",
            label="Detection side-length limit",
            default=960,
            description="Maximum side length (pixels) for text detection; larger is slower but more accurate",
        ),
        NodeConfigField(
            name="merge_spacing",
            type="boolean",
            label="Merge paragraph spacing",
            default=False,
            description="Replace inter-line spaces with paragraph separators; suited to long documents",
        ),
        NodeConfigField(
            name="fail_strategy",
            type="select",
            label="Failure strategy",
            default="continue",
            options=["stop", "continue", "skip"],
            description="How recognition failures are handled (stop=stop the Pipeline, continue=emit an error item, skip=skip silently)",
        ),
    ],
)
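class PaddleOCRNode(PipelineNode):
    """
    PaddleOCR node: text recognition for images and PDF pages.

    Inputs:
        image: file path (str)
        metadata: metadata dict passed through from upstream

    Outputs:
        text: recognized text (str)
        word_boxes: list of word bounding boxes (list[dict])
        metadata: passed-through metadata (dict)
    """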

    type = "enricher.paddle_ocr"
    version = "1"

    async def run(self) -> dict[str, list[PipelineItem]]:
        from pathlib import Path

        image_items = self.inputs.get("image", [])
        metadata_items = self.inputs.get("metadata", [])
        config = self.config
        fail_strategy = config.get("fail_strategy", "continue")

        if not image_items:
            logger.warning("[%s] No image input items, returning empty", self.node_id)
            return {
                "text": [],
                "word_boxes": [],
                "metadata": metadata_items or [],
            }

        # ── Initialize PaddleOCR (lazy-loaded to keep startup fast) ──────────
        lang = config.get("lang", "ch")
        use_angle_cls = config.get("use_angle_cls", True)
        det_limit = config.get("det_limit_side_len", 960)

        try:
            from paddleocr import PaddleOCR
        except ImportError:
            logger.error(
                "[%s] PaddleOCR not installed. Run: pip install paddlepaddle paddleocr",
                self.node_id
            )
            return await self._handle_failure(
                "PaddleOCR package not installed",
                metadata_items,
                fail_strategy,
            )

        # Per-process instance cache (avoids re-initializing models). The key must
        # include every constructor argument, det_limit included; otherwise an
        # instance built with a different limit would be silently reused.
        # Note: a shared instance may not be thread-safe; if several OCR nodes run
        # concurrently, calls on the same instance should be serialized.
        if not hasattr(PaddleOCRNode, "_ocr_cache"):
            PaddleOCRNode._ocr_cache = {}

        cache_key = (lang, use_angle_cls, det_limit)
        if cache_key not in PaddleOCRNode._ocr_cache:
            logger.info("[%s] Initializing PaddleOCR (lang=%s, cls=%s, det_limit=%s)",
                        self.node_id, lang, use_angle_cls, det_limit)
            PaddleOCRNode._ocr_cache[cache_key] = PaddleOCR(
                lang=lang,
                use_angle_cls=use_angle_cls,
                det_limit_side_len=det_limit,
                show_log=False,
            )
        ocr = PaddleOCRNode._ocr_cache[cache_key]

        # ── Run OCR file by file ───────────────────────────────────────────
        all_texts: list[str] = []
        all_word_boxes: list[dict] = []
        errors: list[str] = []

        for item in image_items:
            file_path = item.data.get("path", "") if isinstance(item.data, dict) else str(item.data)

            try:
                if not file_path or not Path(file_path).exists():
                    raise FileNotFoundError(f"File not found: {file_path}")

                # PaddleOCR.ocr() is synchronous; run it in a worker thread so it
                # does not block the event loop
                ocr_result = await asyncio.to_thread(ocr.ocr, file_path, cls=use_angle_cls)

                # PaddleOCR 2.x returns one entry per page (a single entry for an image):
                # [ [ [[x1,y1],[x2,y2],[x3,y3],[x4,y4]], (text, confidence) ], ... ]
                # Pages with no detected text may be None.
                file_lines = 0
                file_confidences: list[float] = []
                for page in ocr_result or []:
                    for line_result in page or []:
                        if not line_result:
                            continue
                        box = line_result[0]     # four corner points
                        text_info = line_result[1]
                        if isinstance(text_info, (tuple, list)):
                            text = str(text_info[0])
                            confidence = float(text_info[1]) if len(text_info) > 1 else None
                        else:
                            text, confidence = str(text_info), None

                        all_texts.append(text)
                        all_word_boxes.append({
                            "text": text,
                            "box": box,
                            "confidence": confidence,
                            "source_file": file_path,
                            "source_item_id": item.id,
                        })
                        file_lines += 1
                        if confidence is not None:
                            file_confidences.append(confidence)

                if file_lines == 0:
                    logger.warning("[%s] No text found in %s", self.node_id, file_path)
                    continue

                # Per-file statistics (not cumulative across files)
                avg_confidence = sum(file_confidences) / len(file_confidences) if file_confidences else 0.0
                logger.info(
                    "[%s] OCR completed for %s: %d lines, %.3f avg confidence",
                    self.node_id, Path(file_path).name, file_lines, avg_confidence,
                )

            except Exception as e:
                logger.error("[%s] OCR error for %s: %s", self.node_id, file_path, e)
                if fail_strategy == "stop":
                    raise
                if fail_strategy == "skip":
                    continue  # drop this file without recording it in ocr_errors
                # continue mode: record the error and move on to the next file
                errors.append(f"OCR failed for {file_path}: {e}")

        # ── Assemble output PipelineItems ──────────────────────────────────
        # merge_spacing: separate lines with a blank line (paragraph break)
        # instead of a single newline
        separator = "\n\n" if config.get("merge_spacing", False) else "\n"
        final_text = separator.join(all_texts)

        # Pass upstream metadata through; copy it so the upstream item is not mutated
        upstream_metadata = metadata_items[0].data if metadata_items else None
        output_metadata = dict(upstream_metadata) if isinstance(upstream_metadata, dict) else {}
        output_metadata["ocr_lang"] = lang
        output_metadata["ocr_lines"] = len(all_texts)
        output_metadata["ocr_errors"] = errors or None

        return {
            "text": [
                PipelineItem(
                    id=f"ocr-text-{self.node_id}",
                    data={"text": final_text},
                    metadata=output_metadata,
                    source_node_id=self.node_id,
                )
            ],
            "word_boxes": [
                PipelineItem(
                    id=f"ocr-boxes-{self.node_id}",
                    data={"word_boxes": all_word_boxes},
                    metadata=output_metadata,
                    source_node_id=self.node_id,
                )
            ],
            "metadata": [
                PipelineItem(
                    id=f"ocr-meta-{self.node_id}",
                    data=output_metadata,
                    metadata=output_metadata,
                    source_node_id=self.node_id,
                )
            ],
        }

    async def _handle_failure(
        self, error_msg: str, metadata_items: list[PipelineItem], fail_strategy: str
    ) -> dict[str, list[PipelineItem]]:
        """Handle a node-level failure according to fail_strategy."""
        if fail_strategy == "stop":
            raise RuntimeError(f"[{self.node_id}] {error_msg}")

        logger.warning("[%s] OCR failure (strategy=%s): %s",
                       self.node_id, fail_strategy, error_msg)

        error_item = PipelineItem(
            id=f"ocr-error-{self.node_id}",
            data={"error": error_msg},
            errors=[error_msg],
            source_node_id=self.node_id,
        )

        if fail_strategy == "skip":
            return {"text": [], "word_boxes": [], "metadata": metadata_items}

        # continue: emit an error item so downstream nodes can still run
        return {
            "text": [error_item],
            "word_boxes": [error_item],
            "metadata": metadata_items,
        }

Node registration flow (hot loading):

# ragateway/pipeline/hot_loader.py
import importlib
import pkgutil
import logging

logger = logging.getLogger(__name__)


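def discover_and_register_third_party_nodes() -> int:
    """
    Scan the ragateway_nodes/ directory under every sys.path entry (typically
    site-packages) and import each module found there, which triggers its
    @register_node decorators.

    Returns the number of modules imported successfully (one module may
    register several nodes).
    """
    import sys
    from pathlib import Path

    count = 0
    seen: set[Path] = set()
    for path in sys.path:
        node_dir = Path(path) / "ragateway_nodes"
        if not node_dir.is_dir() or node_dir.resolve() in seen:
            continue  # missing, or already scanned via another sys.path entry
        seen.add(node_dir.resolve())

        logger.info("Discovering third-party nodes in: %s", node_dir)
        for _, module_name, _ in pkgutil.iter_modules([str(node_dir)]):
            full_name = f"ragateway_nodes.{module_name}"
            try:
                importlib.import_module(full_name)
                count += 1
                logger.info("  ✓ Loaded: %s", full_name)
            except Exception as e:  # a broken plugin must not abort discovery
                logger.warning("  ✗ Failed to import %s: %s", full_name, e)

    return count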

5.3 Node Discovery and Hot Loading

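import importlib
import logging
import pkgutil
import sys
from pathlib import Path

logger = logging.getLogger(__name__)


class HotReloadableNodeRegistry(NodeRegistry):
    """
    Node registry with hot loading.

    Third-party nodes installed into an extra search path (e.g. via pip into
    site-packages/ragateway_nodes/) are discovered and registered the next time
    discover_nodes() runs, without restarting the ragateway process.
    Note: only newly added modules are picked up; code changes to modules that
    are already imported are not reloaded (that would need importlib.reload).
    """

    _extra_paths: list[Path] = []

    @classmethod
    def add_extra_path(cls, path: Path) -> None:
        """Add a search path for third-party nodes."""
        cls._extra_paths.append(path)

    @classmethod
    def discover_nodes(cls) -> None:
        """Scan and load all nodes (built-in + third-party)."""
        # 1. Load built-in nodes
        load_builtin_nodes()

        # 2. Drop stale import-finder caches so modules installed after startup are visible
        importlib.invalidate_caches()

        # 3. Scan third-party node paths
        for extra_path in cls._extra_paths:
            if not extra_path.exists():
                continue
            # walk_packages yields names relative to extra_path, so that
            # directory must be on sys.path for import_module to resolve them
            if str(extra_path) not in sys.path:
                sys.path.append(str(extra_path))
            for _, module_name, _ in pkgutil.walk_packages([str(extra_path)]):
                try:
                    importlib.import_module(module_name)
                except Exception as e:  # a broken plugin must not abort discovery
                    logger.warning("Failed to import node module %s: %s", module_name, e)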
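To see why importing a module is enough to register its nodes, here is a self-contained sketch of the discovery pattern. It writes a throwaway registry module and a plugin package (one valid module, one that raises on import) into a temporary directory, then imports every module in the package. `demo_registry`, `demo_nodes`, and `discover_plugins` are hypothetical names; the real `@register_node` and `NodeRegistry` are defined elsewhere in ragateway.

```python
import importlib
import pkgutil
import sys
import tempfile
import textwrap
from pathlib import Path


def discover_plugins(root: Path, package: str) -> list[str]:
    """Import every top-level module in <root>/<package>; return the names imported."""
    if str(root) not in sys.path:
        sys.path.insert(0, str(root))
    importlib.invalidate_caches()  # pick up files created after interpreter start
    pkg = importlib.import_module(package)
    loaded: list[str] = []
    for info in pkgutil.iter_modules(pkg.__path__, prefix=f"{package}."):
        try:
            importlib.import_module(info.name)  # runs the module's @register_node calls
            loaded.append(info.name)
        except Exception as exc:  # one broken plugin must not stop discovery
            print(f"failed to import {info.name}: {exc!r}")
    return loaded


def build_demo_tree(root: Path) -> None:
    """Write a hypothetical registry module plus a plugin package with a good and a broken module."""
    (root / "demo_registry.py").write_text(textwrap.dedent("""
        NODE_REGISTRY = {}
        def register_node(type_name):
            def decorator(cls):
                NODE_REGISTRY[type_name] = cls
                return cls
            return decorator
    """))
    pkg = root / "demo_nodes"
    pkg.mkdir()
    (pkg / "__init__.py").write_text("")
    (pkg / "ocr.py").write_text(textwrap.dedent("""
        from demo_registry import register_node

        @register_node("enricher.demo_ocr")
        class DemoOCRNode:
            pass
    """))
    (pkg / "broken.py").write_text("raise RuntimeError('boom')\n")


root = Path(tempfile.mkdtemp())
build_demo_tree(root)
loaded = discover_plugins(root, "demo_nodes")
registry = importlib.import_module("demo_registry").NODE_REGISTRY
print(loaded, sorted(registry))
```

Catching `Exception` rather than only `ImportError` matters here: a plugin that fails at import time with any other error would otherwise abort discovery for every module after it.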

6. Tenant-Level Pipeline Management

6.1 Database Design (tenant_pipelines table)

-- Pipeline metadata table
CREATE TABLE tenant_pipelines (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    pipeline_id     UUID NOT NULL UNIQUE,           -- externally exposed ID
    name            VARCHAR(255) NOT NULL,
    description     TEXT,
    dag_json        JSONB NOT NULL,                 -- full pipeline DAG configuration
    version         INTEGER NOT NULL DEFAULT 1,     -- pipeline version (optimistic locking)
    is_active       BOOLEAN NOT NULL DEFAULT false, -- currently active pipeline for the tenant
    is_default      BOOLEAN NOT NULL DEFAULT false, -- default pipeline for the tenant
    created_by      UUID REFERENCES users(id),
    updated_by      UUID REFERENCES users(id),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Constraint: at most one default pipeline per tenant. A partial unique index
-- expresses this directly; an EXCLUDE USING gist constraint on (uuid, boolean)
-- would additionally require the btree_gist extension.
CREATE UNIQUE INDEX uq_tenant_pipelines_default
    ON tenant_pipelines (tenant_id) WHERE is_default;

-- Constraint: at most one active pipeline per tenant (the target of /activate)
CREATE UNIQUE INDEX uq_tenant_pipelines_active
    ON tenant_pipelines (tenant_id) WHERE is_active;

-- Pipeline execution history
CREATE TABLE pipeline_executions (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    execution_id        VARCHAR(64) NOT NULL UNIQUE,
    tenant_id           UUID NOT NULL REFERENCES tenants(id),
    pipeline_id         UUID NOT NULL REFERENCES tenant_pipelines(pipeline_id),
    doc_id              UUID REFERENCES ragateway_documents(doc_id),
    status              VARCHAR(32) NOT NULL,  -- queued/running/indexed/failed/partial
    started_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at        TIMESTAMPTZ,
    node_results        JSONB,   -- {node_id: {status, duration_ms, error}}
    total_duration_ms   INTEGER,
    error_message       TEXT
);

-- PostgreSQL has no inline INDEX clause in CREATE TABLE; indexes are created separately
CREATE INDEX idx_pipeline_executions_tenant_status ON pipeline_executions (tenant_id, status);
CREATE INDEX idx_pipeline_executions_doc_id ON pipeline_executions (doc_id);

-- Pipeline version history (for auditing)
CREATE TABLE pipeline_versions (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_id     UUID NOT NULL REFERENCES tenant_pipelines(pipeline_id),
    version         INTEGER NOT NULL,
    dag_json        JSONB NOT NULL,
    changed_by      UUID REFERENCES users(id),
    changed_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    change_summary  TEXT,
    UNIQUE (pipeline_id, version)
);
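The one-default-pipeline-per-tenant rule can be expressed as a partial unique index and exercised without a PostgreSQL server. A minimal sketch using Python's built-in sqlite3 as a stand-in (SQLite also supports partial unique indexes), assuming a trimmed-down column set; `add_pipeline` is a hypothetical helper.

```python
import sqlite3
import uuid

# In-memory SQLite stands in for PostgreSQL; only the columns the rule needs are kept
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tenant_pipelines (
        pipeline_id TEXT PRIMARY KEY,
        tenant_id   TEXT NOT NULL,
        is_default  INTEGER NOT NULL DEFAULT 0
    );
    -- Partial unique index: uniqueness is enforced only on rows where is_default = 1
    CREATE UNIQUE INDEX uq_tenant_default_pipeline
        ON tenant_pipelines (tenant_id) WHERE is_default = 1;
""")


def add_pipeline(tenant_id: str, is_default: bool) -> bool:
    """Insert a pipeline; return False if it would create a second default for the tenant."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO tenant_pipelines VALUES (?, ?, ?)",
                (str(uuid.uuid4()), tenant_id, int(is_default)),
            )
        return True
    except sqlite3.IntegrityError:
        return False


results = [
    add_pipeline("tenant-a", True),   # first default for tenant-a
    add_pipeline("tenant-a", False),  # non-default rows are unconstrained
    add_pipeline("tenant-a", True),   # second default for tenant-a: rejected
    add_pipeline("tenant-b", True),   # other tenants are independent
]
print(results)
```

Because the index covers only default rows, a tenant can keep any number of non-default drafts while the database itself guarantees a single default.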

6.2 API Design (CRUD + Activate/Switch)

POST   /api/admin/pipelines                  # Create a new pipeline
GET    /api/admin/pipelines                  # List all pipelines (including DAGs)
GET    /api/admin/pipelines/{id}             # Get a single pipeline's details
PUT    /api/admin/pipelines/{id}             # Update a pipeline (DAG)
DELETE /api/admin/pipelines/{id}             # Delete a pipeline
POST   /api/admin/pipelines/{id}/activate    # Activate as the current pipeline
POST   /api/admin/pipelines/{id}/duplicate   # Duplicate a pipeline
GET    /api/admin/pipelines/{id}/versions    # Version history

GET    /api/admin/node-types                 # List all available node types (with metadata)
GET    /api/admin/node-types/{type}          # Get details for one node type
GET    /api/admin/executions/{id}            # Get an execution result

POST   /api/admin/pipelines/{id}/dry-run     # Dry run: validate DAG reachability without processing data

API request/response examples:

# POST /api/admin/pipelines
{
  "name": "RegDesk standard medical-document pipeline",
  "description": "Default pipeline for medical device registration documents",
  "dag_json": { ... },  # full pipeline DAG JSON
  "is_default": true
}

# Response:
{
  "pipeline_id": "pipeline-uuid-xxx",
  "status": "draft",
  "message": "Pipeline created. Activate it to use."
}

# POST /api/admin/pipelines/{id}/activate
# Response:
{
  "pipeline_id": "pipeline-uuid-xxx",
  "is_active": true,
  "previous_pipeline_id": "pipeline-uuid-yyy",  # pipeline that was active before the switch
  "message": "Pipeline activated. All new uploads will use this pipeline."
}

# GET /api/admin/node-types
# Response:
{
  "node_types": [
    {
      "type": "converter.mineru",
      "version": "1",
      "category": "Converter",
      "display_name": "MinerU document converter",
      "description": "Converts PDF/DOCX and similar documents to Markdown with the MinerU engine",
      "icon": "📄",
      "inputs": [
        {"name": "file", "type": "file", "required": true, "description": "Original file path"}
      ],
      "outputs": [
        {"name": "markdown", "type": "markdown"},
        {"name": "toc", "type": "toc"},
        {"name": "metadata", "type": "metadata"}
      ],
      "config_fields": [
        {"name": "backend", "type": "select", "label": "Conversion engine",
         "options": ["hybrid-auto-engine", "vlm-auto-engine", "pipeline"]}
      ]
    },
    ...
  ]
}
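The activate endpoint has to switch `is_active` off on the old pipeline and on for the new one atomically, and report which pipeline it replaced (`previous_pipeline_id` in the activate response). A minimal sketch of that transaction, again with sqlite3 standing in for PostgreSQL and assuming at most one active pipeline per tenant (enforced here by a partial unique index); `activate_pipeline` is a hypothetical helper. A production version would also lock the tenant's rows (e.g. `SELECT ... FOR UPDATE` in PostgreSQL) against concurrent activations.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; trimmed-down, hypothetical column set
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tenant_pipelines (
        pipeline_id TEXT PRIMARY KEY,
        tenant_id   TEXT NOT NULL,
        is_active   INTEGER NOT NULL DEFAULT 0
    );
    CREATE UNIQUE INDEX uq_tenant_active_pipeline
        ON tenant_pipelines (tenant_id) WHERE is_active = 1;
    INSERT INTO tenant_pipelines VALUES
        ('p-old', 't1', 1),
        ('p-new', 't1', 0);
""")


def activate_pipeline(tenant_id: str, pipeline_id: str) -> dict:
    """Make pipeline_id the tenant's only active pipeline, in one transaction."""
    with conn:  # commit on success, roll back if anything raises
        row = conn.execute(
            "SELECT 1 FROM tenant_pipelines WHERE tenant_id = ? AND pipeline_id = ?",
            (tenant_id, pipeline_id),
        ).fetchone()
        if row is None:
            raise LookupError(f"pipeline {pipeline_id} not found for tenant {tenant_id}")
        prev = conn.execute(
            "SELECT pipeline_id FROM tenant_pipelines WHERE tenant_id = ? AND is_active = 1",
            (tenant_id,),
        ).fetchone()
        # Deactivate first, then activate: the unique index never sees two active rows
        conn.execute(
            "UPDATE tenant_pipelines SET is_active = 0 WHERE tenant_id = ? AND is_active = 1",
            (tenant_id,),
        )
        conn.execute(
            "UPDATE tenant_pipelines SET is_active = 1 WHERE pipeline_id = ?",
            (pipeline_id,),
        )
    return {
        "pipeline_id": pipeline_id,
        "is_active": True,
        "previous_pipeline_id": prev[0] if prev else None,
    }


print(activate_pipeline("t1", "p-new"))
```

Ordering the two UPDATEs as "deactivate, then activate" keeps the non-deferrable unique index satisfied after every statement, so no deferred constraint is needed.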

6.3 Global Default Pipeline (Backward Compatibility with V27)

When a tenant has not configured its own pipeline, the global default pipeline is used:

# ragateway/pipeline/defaults.py

V27_DEFAULT_PIPELINE = {
    "version": "1",
    "pipeline_id": "__v27_default__",
    "tenant_id": "__system__",
    "name": "V27-compatible default pipeline",
    "description": "Reproduces the hardcoded V27 flow for backward compatibility",
    "config": {"max_concurrency": 4, "timeout_seconds": 600},
    "nodes": [
        {
            "id": "source",
            "type": "source.file_store",
            "version": "1",
            "name": "File source",
            "inputs": {},
            "outputs": ["file"],
            "config": {},
        },
        {
            "id": "converter",
            "type": "converter.mineru",
            "version": "1",
            "name": "Document conversion",
            "inputs": {"file": {"from_node": "source", "from_output": "file"}},
            "outputs": ["markdown", "toc", "metadata"],
            "config": {"backend": "hybrid-auto-engine", "lang_list": ["en", "zh"]},
        },
        {
            "id": "splitter",
            "type": "splitter.recursive",
            "version": "1",
            "name": "Text chunking",
            "inputs": {"text": {"from_node": "converter", "from_output": "markdown"}},
            "outputs": ["chunks"],
            "config": {"chunk_size": 512, "chunk_overlap": 64},
        },
        {
            "id": "sink-naive",
            "type": "sink.ragflow_naive",
            "version": "1",
            "name": "NaiveRAG ingest",
            "inputs": {
                "chunks": {"from_node": "splitter", "from_output": "chunks"},
                "metadata": {"from_node": "converter", "from_output": "metadata"},
            },
            "outputs": ["result"],
            "config": {"enabled": True},
        },
        {
            "id": "sink-graphrag",
            "type": "sink.ragflow_graphrag",
            "version": "1",
            "name": "GraphRAG ingest",
            "inputs": {
                "markdown": {"from_node": "converter", "from_output": "markdown"},
                "metadata": {"from_node": "converter", "from_output": "metadata"},
            },
            "outputs": ["result"],
            "config": {"source": "ragflow_general", "enabled": True},
        },
    ],
    "edges": [
        {"id": "e1", "source": "source", "source_output": "file", "target": "converter", "target_input": "file"},
        {"id": "e2", "source": "converter", "source_output": "markdown", "target": "splitter", "target_input": "text"},
        {"id": "e3", "source": "converter", "source_output": "metadata", "target": "sink-naive", "target_input": "metadata"},
        {"id": "e4", "source": "converter", "source_output": "metadata", "target": "sink-graphrag", "target_input": "metadata"},
        {"id": "e5", "source": "splitter", "source_output": "chunks", "target": "sink-naive", "target_input": "chunks"},
        {"id": "e6", "source": "converter", "source_output": "markdown", "target": "sink-graphrag", "target_input": "markdown"},
    ],
}
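The executor runs a pipeline like this one layer by layer: a Kahn topological sort builds the execution layers, and `asyncio.gather` runs each layer in parallel. A minimal sketch of the layering step over the `edges` list, assuming edges are counted individually (the converter → sink-graphrag pair appears twice, once per port) and that ids within a layer are sorted for stable output. `topo_layers` is a hypothetical name, not PipelineExecutor's actual method.

```python
from collections import defaultdict


def topo_layers(node_ids: list[str], edges: list[dict]) -> list[list[str]]:
    """Group nodes into layers with Kahn's algorithm; nodes in one layer can run in parallel.

    Raises ValueError if an edge references an unknown node or the graph has a cycle.
    """
    known = set(node_ids)
    indegree = {n: 0 for n in node_ids}
    successors: dict[str, list[str]] = defaultdict(list)
    for e in edges:
        src, dst = e["source"], e["target"]
        if src not in known or dst not in known:
            raise ValueError(f"edge {e.get('id')} references an unknown node")
        successors[src].append(dst)
        indegree[dst] += 1  # one increment per edge, even for parallel edges

    layers: list[list[str]] = []
    ready = sorted(n for n, d in indegree.items() if d == 0)
    visited = 0
    while ready:
        layers.append(ready)
        visited += len(ready)
        next_ready: list[str] = []
        for n in ready:
            for m in successors[n]:
                indegree[m] -= 1  # matching decrement per edge
                if indegree[m] == 0:
                    next_ready.append(m)
        ready = sorted(next_ready)
    if visited != len(node_ids):
        raise ValueError("pipeline contains a cycle")
    return layers


# Edges of the V27 default pipeline above, reduced to source/target
edges = [
    {"id": "e1", "source": "source", "target": "converter"},
    {"id": "e2", "source": "converter", "target": "splitter"},
    {"id": "e3", "source": "converter", "target": "sink-naive"},
    {"id": "e4", "source": "converter", "target": "sink-graphrag"},
    {"id": "e5", "source": "splitter", "target": "sink-naive"},
    {"id": "e6", "source": "converter", "target": "sink-graphrag"},
]
nodes = ["source", "converter", "splitter", "sink-naive", "sink-graphrag"]
print(topo_layers(nodes, edges))
# [['source'], ['converter'], ['sink-graphrag', 'splitter'], ['sink-naive']]
```

For this pipeline the GraphRAG sink can start as soon as conversion finishes, in parallel with chunking, while the NaiveRAG sink waits for both the splitter and the converter.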

7. Integration with V27

7.1 Replacing the Hardcoded V27 Flow

How the core logic of the V27 upsert endpoint (POST /api/file/upsert) changes:

# V27 hardcoded flow (ragateway/api/file_upsert.py):
async def handle_upsert(file, metadata, converter_name):
    # doc_id comes from the request context (omitted here)

    # 1. Convert
    converter = await plugin_manager.get_converter(converter_name)
    result = await converter.convert(file.path)

    # 2. Chunk (hardcoded inside ragateway)
    chunks = await chunk_text(result.markdown, chunk_size=512, overlap=64)

    # 3. NaiveRAG ingest
    await naive_plugin.ingest(doc_id, chunks, metadata)

    # 4. GraphRAG ingest (all GraphRAG plugins in parallel)
    await asyncio.gather(*[
        gp.ingest(doc_id, result.markdown, metadata)
        for gp in graphrag_plugins
    ])

# V28 replacement:
async def handle_upsert(file, metadata, tenant_id):
    # doc_id and the database session db come from the request context (omitted here)

    # 1. Fetch the tenant's currently active pipeline
    pipeline = await get_active_pipeline(tenant_id)
    if not pipeline:
        pipeline = V27_DEFAULT_PIPELINE  # backward compatibility with V27

    # 2. Run it through PipelineExecutor
    executor = PipelineExecutor(
        tenant_id=tenant_id,
        pipeline=pipeline,
        node_registry=HotReloadableNodeRegistry,
        db_session=db,
    )
    result = await executor.execute(doc_id=doc_id, file_path=file.path)
    return result

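7.2 Backward Compatibility Strategy

Scenario | Behavior
Tenant has no pipeline configured | Use V27_DEFAULT_PIPELINE
The converter request parameter is still passed | Converted into the configuration of the pipeline's converter node
ragateway_documents.status state machine | Updated by PipelineExecutor, exactly as in V27
Existing /api/file/upsert endpoint signature | Unchanged; internally it now calls PipelineExecutor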

7.3 Configuration Migration Path

# Map a V27 pending.yaml configuration onto a V28 pipeline
import uuid


def migrate_v27_config_to_pipeline(v27_config: dict, tenant_id: str) -> dict:
    """
    Convert the V27 converter/naive_rag/graphrag configuration into a V28 pipeline DAG.

    Used the first time V28 is enabled, to generate a pipeline from the existing V27 configuration automatically.
    """
    nodes = []
    edges = []
    node_counter = 1

    # Source
    source_id = f"node-{node_counter:03d}"
    nodes.append({
        "id": source_id, "type": "source.file_store", "version": "1",
        "name": "File source", "inputs": {}, "outputs": ["file"], "config": {}
    })
    node_counter += 1

    # Converter
    conv_id = f"node-{node_counter:03d}"
    active_conv = v27_config.get("converter", {}).get("active", "mineru")
    conv_cfg = v27_config.get("converter", {}).get(active_conv, {})
    nodes.append({
        "id": conv_id, "type": f"converter.{active_conv}", "version": "1",
        "name": "Document conversion",
        "inputs": {"file": {"from_node": source_id, "from_output": "file"}},
        "outputs": ["markdown", "toc", "metadata"],
        "config": conv_cfg,
    })
    edges.append({"id": f"e{len(edges)+1}", "source": source_id,
                   "source_output": "file", "target": conv_id, "target_input": "file"})
    node_counter += 1

    # Splitter
    splitter_id = f"node-{node_counter:03d}"
    nodes.append({
        "id": splitter_id, "type": "splitter.recursive", "version": "1",
        "name": "Text chunking",
        "inputs": {"text": {"from_node": conv_id, "from_output": "markdown"}},
        "outputs": ["chunks"],
        "config": {"chunk_size": 512, "chunk_overlap": 64},
    })
    edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
                   "source_output": "markdown", "target": splitter_id, "target_input": "text"})
    node_counter += 1

    # NaiveRAG
    if v27_config.get("naive_rag", {}).get("active"):
        naive_id = f"node-{node_counter:03d}"
        nodes.append({
            "id": naive_id, "type": "sink.ragflow_naive", "version": "1",
            "name": "NaiveRAG",
            "inputs": {
                "chunks": {"from_node": splitter_id, "from_output": "chunks"},
                "metadata": {"from_node": conv_id, "from_output": "metadata"},
            },
            "outputs": ["result"],
            "config": {"enabled": True},
        })
        edges.append({"id": f"e{len(edges)+1}", "source": splitter_id,
                       "source_output": "chunks", "target": naive_id, "target_input": "chunks"})
        edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
                       "source_output": "metadata", "target": naive_id, "target_input": "metadata"})
        node_counter += 1

    # GraphRAG
    active_graphrags = v27_config.get("graphrag", {}).get("active", [])
    for gr_name in active_graphrags:
        gr_id = f"node-{node_counter:03d}"
        nodes.append({
            "id": gr_id, "type": "sink.ragflow_graphrag", "version": "1",
            "name": f"GraphRAG-{gr_name}",
            "inputs": {
                "markdown": {"from_node": conv_id, "from_output": "markdown"},
                "metadata": {"from_node": conv_id, "from_output": "metadata"},
            },
            "outputs": ["result"],
            "config": {"source": gr_name, "enabled": True},
        })
        edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
                       "source_output": "markdown", "target": gr_id, "target_input": "markdown"})
        edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
                       "source_output": "metadata", "target": gr_id, "target_input": "metadata"})
        node_counter += 1

    return {
        "version": "1",
        "pipeline_id": str(uuid.uuid4()),
        "tenant_id": tenant_id,
        "name": "V27 migrated pipeline",
        "description": "Generated automatically from the V27 configuration",
        "config": {"max_concurrency": 4, "timeout_seconds": 600},
        "nodes": nodes,
        "edges": edges,
    }
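Each generated node records its wiring twice: once in its own `inputs` map and once in the top-level `edges` list. A migrated pipeline is only valid if the two agree and every referenced output is declared by its source node. A minimal sketch of such a consistency check, which could run on the migration output or in the dry-run endpoint. `check_wiring` is a hypothetical name.

```python
def check_wiring(pipeline: dict) -> list[str]:
    """Return human-readable mismatches between node 'inputs' and the 'edges' list."""
    # (source node, source output, target node, target input) as declared by each node
    from_inputs = {
        (ref["from_node"], ref["from_output"], node["id"], input_name)
        for node in pipeline["nodes"]
        for input_name, ref in node.get("inputs", {}).items()
    }
    # The same tuples as declared by the edge list
    from_edges = {
        (e["source"], e["source_output"], e["target"], e["target_input"])
        for e in pipeline["edges"]
    }
    problems = [f"edge missing for input {t[2]}.{t[3]} <- {t[0]}.{t[1]}"
                for t in sorted(from_inputs - from_edges)]
    problems += [f"input missing for edge {t[0]}.{t[1]} -> {t[2]}.{t[3]}"
                 for t in sorted(from_edges - from_inputs)]
    # Every output used by an edge must be declared by its source node
    outputs = {n["id"]: set(n.get("outputs", [])) for n in pipeline["nodes"]}
    for src, out, dst, inp in sorted(from_edges):
        if out not in outputs.get(src, set()):
            problems.append(f"{src} does not declare output '{out}' used by {dst}.{inp}")
    return problems


# Usage: the second edge has no matching input and uses an undeclared output
pipeline = {
    "nodes": [
        {"id": "a", "outputs": ["file"], "inputs": {}},
        {"id": "b", "outputs": ["markdown"],
         "inputs": {"file": {"from_node": "a", "from_output": "file"}}},
    ],
    "edges": [
        {"source": "a", "source_output": "file", "target": "b", "target_input": "file"},
        {"source": "b", "source_output": "toc", "target": "a", "target_input": "x"},
    ],
}
for problem in check_wiring(pipeline):
    print(problem)
```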

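8. ragateway-ui Editor

8.1 Technology Choice (React Flow vs. Alternatives)

Option | Pros | Cons
React Flow | Built for node-graph/DAG editors; written in TypeScript; highly customizable nodes; fits the React stack | Requires React
JointJS | Long-established diagramming library; feature-complete | Dated API; costly UI customization
D3.js | Flexible; can render any graph | Large implementation effort; not suited to DAG editing
BPMN.js | Aimed at business-process modeling | Not a fit for RAG pipelines

Choice: React Flow

Rationale:
- React Flow's custom-node API (each node exposes named input/output handles) maps directly onto the strongly typed INPUT_TYPES / OUTPUT_TYPES port design
- Custom and animated edges make Router branches easy to visualize (the routing decision itself still happens in the backend)
- MiniMap, Controls, and a background grid are built in and work out of the box
- Active community and good TypeScript support

8.2 Node Palette Design (Loaded Dynamically from /api/admin/node-types)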

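// ragateway-ui/src/components/PipelineEditor/
//   - PipelineCanvas.tsx    (main React Flow canvas)
//   - NodePalette.tsx       (left-hand node palette)
//   - NodeConfigPanel.tsx   (right-hand node configuration panel)
//   - EdgeValidation.tsx    (connection validation logic)

import React, { useCallback, useEffect, useMemo, useState } from "react";
import ReactFlow, {
  Node,
  Controls,
  Background,
  MiniMap,
  addEdge,
  Connection,
  useNodesState,
  useEdgesState,
  MarkerType,
  NodeTypes,
  ReactFlowInstance,
} from "reactflow";
import "reactflow/dist/style.css";
import NodePalette from "./NodePalette";
import NodeConfigPanel from "./NodeConfigPanel";
// Hypothetical module: one custom node component per category
import {
  SourceNodeComponent,
  ConverterNodeComponent,
  SplitterNodeComponent,
  EnricherNodeComponent,
  RouterNodeComponent,
  SinkNodeComponent,
} from "./nodes";

// Fields of a GET /api/admin/node-types entry that the editor uses
interface PortDef {
  name: string;
  type: string;
  required?: boolean;
  description?: string;
}

interface NodeType {
  type: string;
  display_name: string;
  description: string;
  inputs: PortDef[];
  outputs: PortDef[];
  config_fields: any[];
}

type NodeData = { label: string; config: Record<string, unknown> };

// Component per type prefix ("converter.mineru" -> "converter"). React Flow looks up
// node.type by exact key, so wildcard keys such as "converter.*" would never match;
// the map is expanded into concrete type names once node types are loaded.
const CATEGORY_COMPONENTS: Record<string, React.ComponentType<any>> = {
  source: SourceNodeComponent,
  converter: ConverterNodeComponent,
  splitter: SplitterNodeComponent,
  enricher: EnricherNodeComponent,
  router: RouterNodeComponent,
  sink: SinkNodeComponent,
};

// Assumed widening rule between port types: a Markdown output may feed a text input
const COMPATIBLE_TYPES: Record<string, string[]> = { markdown: ["text"] };
const isCompatible = (outType: string, inType: string) =>
  outType === inType || (COMPATIBLE_TYPES[outType] ?? []).includes(inType);

function PipelineCanvas() {
  const [nodes, setNodes, onNodesChange] = useNodesState<NodeData>([]);
  const [edges, setEdges, onEdgesChange] = useEdgesState([]);
  const [nodeTypes, setNodeTypes] = useState<NodeType[]>([]);
  const [selectedNodeId, setSelectedNodeId] = useState<string | null>(null);
  const [rfInstance, setRfInstance] = useState<ReactFlowInstance | null>(null);

  // Load node types from the backend
  useEffect(() => {
    fetch("/api/admin/node-types")
      .then(r => r.json())
      .then(data => setNodeTypes(data.node_types));
  }, []);

  // Concrete type -> component map; memoized so React Flow gets a stable nodeTypes object
  const flowNodeTypes = useMemo(() => {
    const entries = nodeTypes
      .map(nt => [nt.type, CATEGORY_COMPONENTS[nt.type.split(".")[0]]] as const)
      .filter(([, component]) => component !== undefined);
    return Object.fromEntries(entries) as NodeTypes;
  }, [nodeTypes]);

  const onDragOver = useCallback((event: React.DragEvent) => {
    event.preventDefault();
    event.dataTransfer.dropEffect = "move";
  }, []);

  // Add a node dropped from the palette
  const onDrop = useCallback((event: React.DragEvent) => {
    event.preventDefault();
    const nodeType = event.dataTransfer.getData("application/reactflow");
    if (!nodeType || !rfInstance) return;

    // Convert screen coordinates into canvas coordinates (accounts for pan and zoom);
    // screenToFlowPosition is available from reactflow 11.10 (older versions use project())
    const position = rfInstance.screenToFlowPosition({ x: event.clientX, y: event.clientY });
    const matchedType = nodeTypes.find(nt => nt.type === nodeType);

    const newNode: Node<NodeData> = {
      id: `node-${Date.now()}`,
      type: nodeType,
      position,
      data: { label: matchedType?.display_name ?? nodeType, config: {} },
    };
    setNodes(nds => nds.concat(newNode));
  }, [rfInstance, nodeTypes, setNodes]);

  // Connection check: only link an output to an input whose port type is compatible
  const onConnect = useCallback((connection: Connection) => {
    if (!connection.source || !connection.target) return;

    const sourceNode = nodes.find(n => n.id === connection.source);
    const targetNode = nodes.find(n => n.id === connection.target);
    if (!sourceNode || !targetNode) return;

    const sourceMeta = nodeTypes.find(nt => nt.type === sourceNode.type);
    const targetMeta = nodeTypes.find(nt => nt.type === targetNode.type);
    const outputDef = sourceMeta?.outputs.find(o => o.name === connection.sourceHandle);
    const inputDef = targetMeta?.inputs.find(i => i.name === connection.targetHandle);
    if (!outputDef || !inputDef || !isCompatible(outputDef.type, inputDef.type)) return;

    setEdges(eds => addEdge({
      ...connection,
      animated: true,
      markerEnd: { type: MarkerType.ArrowClosed },
      style: { stroke: "#6366f1" },
    }, eds));
  }, [nodes, nodeTypes, setEdges]);

  // Merge a partial config update into one node's data
  const updateNodeConfig = useCallback((nodeId: string, patch: Record<string, unknown>) => {
    setNodes(nds => nds.map(n =>
      n.id === nodeId ? { ...n, data: { ...n.data, config: { ...n.data.config, ...patch } } } : n
    ));
  }, [setNodes]);

  const selectedNode = nodes.find(n => n.id === selectedNodeId);

  return (
    <div style={{ height: "100vh", display: "flex" }}>
      {/* Left: node palette */}
      <NodePalette nodeTypes={nodeTypes} />

      {/* Center: canvas */}
      <div style={{ flex: 1 }}>
        <ReactFlow
          nodes={nodes}
          edges={edges}
          nodeTypes={flowNodeTypes}
          onNodesChange={onNodesChange}
          onEdgesChange={onEdgesChange}
          onConnect={onConnect}
          onInit={setRfInstance}
          onDragOver={onDragOver}
          onDrop={onDrop}
          onNodeClick={(_, node) => setSelectedNodeId(node.id)}
          onPaneClick={() => setSelectedNodeId(null)}
        >
          <Controls />
          <MiniMap />
          <Background />
        </ReactFlow>
      </div>

      {/* Right: node configuration panel */}
      {selectedNode && (
        <NodeConfigPanel
          node={selectedNode}
          nodeType={nodeTypes.find(nt => nt.type === selectedNode.type)}
          onUpdate={(config) => updateNodeConfig(selectedNode.id, config)}
        />
      )}
    </div>
  );
}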
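The editor's connection check runs only in the browser. A client can still submit any dag_json, so the backend (for example the dry-run endpoint) would need to repeat the check. A minimal Python sketch, assuming node-type metadata shaped like the Supplement 4-A response (`input_defs` / `output_defs`) and a hypothetical compatibility table in which a `markdown` output may feed a `text` input. The splitter's input type is not specified in this document, so the metadata in the usage example is illustrative.

```python
# Hypothetical widening rules: output type -> extra input types it may feed
COMPATIBLE: dict[str, set[str]] = {"markdown": {"text"}}


def port_types_compatible(out_type: str, in_type: str) -> bool:
    return out_type == in_type or in_type in COMPATIBLE.get(out_type, set())


def validate_edges(pipeline: dict, node_types: dict[str, dict]) -> list[str]:
    """Check every edge's port names and types against node-type metadata.

    node_types maps a type name (e.g. "converter.mineru") to an entry with
    'input_defs' and 'output_defs' lists.
    """
    type_of = {n["id"]: n["type"] for n in pipeline["nodes"]}
    errors: list[str] = []
    for e in pipeline["edges"]:
        src_meta = node_types.get(type_of.get(e["source"], ""))
        dst_meta = node_types.get(type_of.get(e["target"], ""))
        if src_meta is None or dst_meta is None:
            errors.append(f"{e['id']}: unknown node or node type")
            continue
        out = next((o for o in src_meta["output_defs"] if o["name"] == e["source_output"]), None)
        inp = next((i for i in dst_meta["input_defs"] if i["name"] == e["target_input"]), None)
        if out is None or inp is None:
            errors.append(f"{e['id']}: unknown port")
        elif not port_types_compatible(out["type"], inp["type"]):
            errors.append(f"{e['id']}: {out['type']} output cannot feed {inp['type']} input")
    return errors


# Illustrative metadata and a two-edge pipeline (e2 wires metadata into a text input)
node_types = {
    "converter.mineru": {"input_defs": [{"name": "file", "type": "file"}],
                         "output_defs": [{"name": "markdown", "type": "markdown"},
                                         {"name": "metadata", "type": "metadata"}]},
    "splitter.recursive": {"input_defs": [{"name": "text", "type": "text"}],
                           "output_defs": [{"name": "chunks", "type": "chunks"}]},
}
pipeline = {
    "nodes": [{"id": "conv", "type": "converter.mineru"},
              {"id": "split", "type": "splitter.recursive"}],
    "edges": [
        {"id": "e1", "source": "conv", "source_output": "markdown",
         "target": "split", "target_input": "text"},
        {"id": "e2", "source": "conv", "source_output": "metadata",
         "target": "split", "target_input": "text"},
    ],
}
print(validate_edges(pipeline, node_types))
```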

Node configuration panel:

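// NodeConfigPanel.tsx
import React from "react";
import type { Node } from "reactflow";

// Subset of a config_fields entry from GET /api/admin/node-types
interface ConfigField {
  name: string;
  type: string; // "select" | "string" | "integer" | "boolean" | ...
  label: string;
  required?: boolean;
  default?: unknown;
  options?: string[];
}

interface NodeConfigPanelProps {
  node: Node<{ label: string; config?: Record<string, unknown> }>;
  nodeType?: { display_name: string; description: string; config_fields: ConfigField[] };
  onUpdate: (patch: Record<string, unknown>) => void;
}

export default function NodeConfigPanel({ node, nodeType, onUpdate }: NodeConfigPanelProps) {
  if (!nodeType) return null;

  // Current value: the node's own config first, then the field default
  const valueOf = (field: ConfigField) => node.data.config?.[field.name] ?? field.default;
  const inputStyle = { width: "100%", padding: 6, borderRadius: 6 };

  return (
    <div style={{ width: 320, borderLeft: "1px solid #e5e7eb", padding: 16 }}>
      <h3>{nodeType.display_name}</h3>
      <p style={{ color: "#6b7280", fontSize: 13 }}>{nodeType.description}</p>

      {nodeType.config_fields.map(field => (
        <div key={field.name} style={{ marginBottom: 12 }}>
          <label style={{ display: "block", fontSize: 13, fontWeight: 500 }}>
            {field.label}
            {field.required && <span style={{ color: "red" }}>*</span>}
          </label>

          {field.type === "select" && (
            <select
              value={String(valueOf(field) ?? "")}
              onChange={e => onUpdate({ [field.name]: e.target.value })}
              style={inputStyle}
            >
              {(field.options ?? []).map(opt => (
                <option key={opt} value={opt}>{opt}</option>
              ))}
            </select>
          )}
          {field.type === "string" && (
            <input
              type="text"
              value={String(valueOf(field) ?? "")}
              onChange={e => onUpdate({ [field.name]: e.target.value })}
              style={inputStyle}
            />
          )}
          {field.type === "integer" && (
            <input
              type="number"
              value={Number(valueOf(field) ?? 0)}
              onChange={e => {
                // parseInt returns NaN for an empty or partial entry; never store NaN
                const parsed = Number.parseInt(e.target.value, 10);
                if (!Number.isNaN(parsed)) onUpdate({ [field.name]: parsed });
              }}
              style={inputStyle}
            />
          )}
          {field.type === "boolean" && (
            <input
              type="checkbox"
              checked={Boolean(valueOf(field))}
              onChange={e => onUpdate({ [field.name]: e.target.checked })}
            />
          )}
        </div>
      ))}
    </div>
  );
}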
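Values from the panel arrive as whatever the form produced, so the backend would still need to check a node's config against its definitions before saving. A minimal sketch covering the field attributes that appear in the node-types response (`type`, `required`, `default`, `options`, `min`, `max`). `normalize_config` is a hypothetical helper, and it handles only the field types used in this document.

```python
import copy
from typing import Any


def normalize_config(config_defs: list[dict], config: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
    """Apply defaults and validate a node config; return (normalized_config, errors)."""
    out: dict[str, Any] = {}
    errors: list[str] = []
    for field in config_defs:
        name, kind = field["name"], field["type"]
        if name not in config:
            if field.get("required"):
                errors.append(f"{name}: required")
            elif "default" in field:
                out[name] = copy.deepcopy(field["default"])  # don't share mutable defaults
            continue
        value = config[name]
        if kind == "integer":
            if not isinstance(value, int) or isinstance(value, bool):  # bool is an int subclass
                errors.append(f"{name}: expected integer")
            elif "min" in field and value < field["min"]:
                errors.append(f"{name}: below min {field['min']}")
            elif "max" in field and value > field["max"]:
                errors.append(f"{name}: above max {field['max']}")
        elif kind == "boolean" and not isinstance(value, bool):
            errors.append(f"{name}: expected boolean")
        elif kind == "select" and value not in field.get("options", []):
            errors.append(f"{name}: must be one of {field.get('options', [])}")
        elif kind == "list" and not isinstance(value, list):
            errors.append(f"{name}: expected list")
        elif kind == "string" and not isinstance(value, str):
            errors.append(f"{name}: expected string")
        out[name] = value
    known = {field["name"] for field in config_defs}
    errors += [f"{name}: unknown field" for name in sorted(set(config) - known)]
    return out, errors


# Usage with the MinerU config fields from the node-types response
mineru_defs = [
    {"name": "backend", "type": "select", "default": "hybrid-auto-engine",
     "options": ["hybrid-auto-engine", "vlm-auto-engine", "pipeline"]},
    {"name": "lang_list", "type": "list", "default": ["en", "zh"]},
    {"name": "timeout", "type": "integer", "default": 300, "min": 60, "max": 1800},
]
cfg, errs = normalize_config(mineru_defs, {"timeout": 30, "backend": "vlm-auto-engine"})
print(cfg, errs)
```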

8.3 Runtime Monitoring Panel (DAG Execution Visualization)

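// PipelineMonitor.tsx
// Shows DAG execution state: each node's color reflects its status (pending/running/success/failed/skipped)
import React, { useEffect, useState } from "react";

type NodeRunStatus = "pending" | "running" | "success" | "failed" | "skipped";

const NODE_STATUS_COLORS: Record<NodeRunStatus, string> = {
  pending: "#e5e7eb",   // gray
  running: "#60a5fa",   // blue
  success: "#34d399",   // green
  failed: "#f87171",    // red
  skipped: "#fbbf24",   // yellow
};

// Node-level events of the WebSocket protocol (Supplement 4-B) mapped to display statuses
const EVENT_TO_STATUS: Record<string, NodeRunStatus> = {
  node_started: "running",
  node_completed: "success",
  node_failed: "failed",
  node_skipped: "skipped",
};

function PipelineMonitor({ executionId }: { executionId: string }) {
  const [status, setStatus] = useState<Record<string, NodeRunStatus>>({});

  // Receive node status updates in real time over WebSocket
  useEffect(() => {
    // Build an absolute ws:// or wss:// URL from the page origin
    const scheme = window.location.protocol === "https:" ? "wss" : "ws";
    const ws = new WebSocket(
      `${scheme}://${window.location.host}/api/pipeline/executions/${executionId}/ws`
    );
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      const nextStatus = EVENT_TO_STATUS[data.event];
      // Pipeline-level events (execution_started/completed) carry no node_id and are ignored here
      if (nextStatus && data.node_id) {
        setStatus(prev => ({ ...prev, [data.node_id]: nextStatus }));
      }
    };
    return () => ws.close();
  }, [executionId]);

  return (
    <div>
      <h3>Pipeline execution monitor</h3>
      <div style={{ display: "grid", gridTemplateColumns: "repeat(5, 1fr)", gap: 8 }}>
        {Object.entries(status).map(([nodeId, nodeStatus]) => (
          <div
            key={nodeId}
            style={{
              padding: 12,
              borderRadius: 8,
              background: NODE_STATUS_COLORS[nodeStatus],
              color: "#111827",  // dark text stays readable on the light status colors
              textAlign: "center",
            }}
          >
            <div>{nodeId}</div>
            <div style={{ fontSize: 12 }}>{nodeStatus}</div>
          </div>
        ))}
      </div>
    </div>
  );
}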

Supplement 4: Node Palette API Protocol + WebSocket Monitoring Protocol + React Flow Node Components

Supplement 4-A: Full JSON Response Structure of GET /api/admin/node-types
{
  "node_types": [
    {
      "type": "converter.mineru",
      "version": "1",
      "category": "Converter",
      "display_name": "MinerU document converter",
      "description": "Converts PDF/DOCX and similar documents to Markdown with the MinerU engine; supports tables, formulas, and charts",
      "icon": "📄",
      "input_defs": [
        {
          "name": "file",
          "type": "file",
          "required": true,
          "description": "Original file path (provided by the Source node)",
          "default": null
        }
      ],
      "output_defs": [
        {"name": "markdown", "type": "markdown", "description": "Converted Markdown text"},
        {"name": "toc", "type": "toc", "description": "Table of contents (hierarchical JSON)"},
        {"name": "metadata", "type": "metadata", "description": "Document metadata (page count, word count, language, etc.)"}
      ],
      "config_defs": [
        {
          "name": "backend",
          "type": "select",
          "label": "Conversion engine",
          "required": false,
          "default": "hybrid-auto-engine",
          "options": ["hybrid-auto-engine", "vlm-auto-engine", "pipeline"],
          "description": "MinerU's internal conversion backend"
        },
        {
          "name": "lang_list",
          "type": "list",
          "label": "Supported languages",
          "required": false,
          "default": ["en", "zh"],
          "description": "Document language list"
        },
        {
          "name": "timeout",
          "type": "integer",
          "label": "Timeout (seconds)",
          "required": false,
          "default": 300,
          "min": 60,
          "max": 1800,
          "description": "Conversion timeout"
        }
      ]
    },
    {
      "type": "router.file_type",
      "version": "1",
      "category": "Router",
      "display_name": "File type router",
      "description": "Routes content to different branches based on metadata.file_type",
      "icon": "🔀",
      "input_defs": [
        {"name": "metadata", "type": "metadata", "required": true, "description": "Metadata containing file_type"}
      ],
      "output_defs": [
        {"name": "pdf_branch", "type": "control", "description": "PDF file branch"},
        {"name": "docx_branch", "type": "control", "description": "Word file branch"},
        {"name": "md_branch", "type": "control", "description": "Markdown file branch"},
        {"name": "default_branch", "type": "control", "description": "Default branch (when no rule matches)"}
      ],
      "config_defs": [
        {
          "name": "routes",
          "type": "list",
          "label": "Routing rules",
          "required": false,
          "default": [],
          "description": "List of condition → branch mappings"
        }
      ]
    }
  ],
  "categories": ["Source", "Converter", "Splitter", "Enricher", "Router", "Sink"],
  "total_count": 24,
  "generated_at": "2026-05-20T10:00:00Z"
}

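API endpoints:

Method | Path | Description
GET | /api/admin/node-types | List all registered node types (built-in + third-party)
GET | /api/admin/node-types?category=Converter | Filter by category
GET | /api/admin/node-types/{type} | Get one node type's details (including the full config_schema)
GET | /api/admin/node-types/{type}/versions | List all versions of that node type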
Supplement 4-B: WebSocket Real-Time Execution Progress Protocol

WebSocket connection:

ws://ragateway-host/api/pipeline/executions/{execution_id}/ws

Server → client event format (JSON):

// Event 1: pipeline started
{
  "event": "execution_started",
  "execution_id": "exec-uuid-xxx",
  "pipeline_id": "pipeline-uuid-xxx",
  "total_nodes": 6,
  "started_at": "2026-05-20T10:30:00Z"
}

// Event 2: node started
{
  "event": "node_started",
  "execution_id": "exec-uuid-xxx",
  "node_id": "node-002",
  "node_name": "Document conversion",
  "node_type": "converter.mineru",
  "layer_index": 1,
  "started_at": "2026-05-20T10:30:01Z"
}

// Event 3: node succeeded
{
  "event": "node_completed",
  "execution_id": "exec-uuid-xxx",
  "node_id": "node-002",
  "node_type": "converter.mineru",
  "duration_ms": 8450,
  "outputs": {
    "markdown": {"item_count": 12, "total_chars": 45230},
    "toc": {"item_count": 1},
    "metadata": {"item_count": 1}
  }
}

// Event 4: node failed
{
  "event": "node_failed",
  "execution_id": "exec-uuid-xxx",
  "node_id": "node-003",
  "node_type": "splitter.semantic",
  "error": "LLM timeout after 120s",
  "strategy_applied": "skip",
  "failed_at": "2026-05-20T10:30:15Z"
}

// Event 5: node skipped (SKIP strategy)
{
  "event": "node_skipped",
  "execution_id": "exec-uuid-xxx",
  "node_id": "node-005",
  "reason": "Upstream node node-003 failed with strategy=skip",
  "skipped_at": "2026-05-20T10:30:16Z"
}

// Event 6: pipeline execution completed
{
  "event": "execution_completed",
  "execution_id": "exec-uuid-xxx",
  "status": "indexed",
  "total_duration_ms": 52340,
  "completed_nodes": 5,
  "failed_nodes": 1,
  "node_summary": [
    {"node_id": "node-001", "status": "success", "duration_ms": 120},
    {"node_id": "node-002", "status": "success", "duration_ms": 8450},
    {"node_id": "node-003", "status": "success", "duration_ms": 3200},
    {"node_id": "node-004", "status": "success", "duration_ms": 50},
    {"node_id": "node-005", "status": "failed", "duration_ms": 120000, "error": "timeout"},
    {"node_id": "node-006", "status": "skipped", "duration_ms": 0}
  ],
  "completed_at": "2026-05-20T10:30:52Z"
}
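Under the SKIP strategy (event 5), the executor must decide which nodes downstream of a failure receive node_skipped. The following sketch assumes that every node reachable from the failed node is skipped, even if it also has other inputs that succeeded. The function name is hypothetical. Edges use the pipeline JSON shape from section 11.

```python
from collections import deque


def nodes_to_skip(failed_node: str, edges: list[dict]) -> list[str]:
    """Breadth-first walk from the failed node; every reachable node is skipped."""
    children: dict[str, list[str]] = {}
    for e in edges:
        children.setdefault(e["source"], []).append(e["target"])
    seen: set[str] = set()
    queue = deque(children.get(failed_node, []))
    while queue:
        node = queue.popleft()
        if node in seen:  # diamonds can reach a node twice
            continue
        seen.add(node)
        queue.extend(children.get(node, []))
    return sorted(seen)
```

Under the CONTINUE strategy, by contrast, this walk would not run and downstream nodes would still be scheduled.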

Client SDK example (frontend consuming the WebSocket events):

// ragateway-ui/src/hooks/usePipelineExecution.ts
import { useEffect, useRef, useState } from "react";

interface NodeStatus {
  nodeId: string;
  status: "pending" | "running" | "success" | "failed" | "skipped";
  startedAt?: string;
  durationMs?: number;
  error?: string;
}

interface ExecutionProgress {
  executionId: string;
  totalNodes: number;
  nodes: Record<string, NodeStatus>;
}

export function usePipelineExecution(executionId: string) {
  const [progress, setProgress] = useState<ExecutionProgress | null>(null);
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    if (!executionId) return;

    const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
    const wsUrl = `${protocol}//${window.location.host}/api/pipeline/executions/${executionId}/ws`;

    const ws = new WebSocket(wsUrl);
    wsRef.current = ws;

    ws.onopen = () => {
      console.log(`[WS] Connected to execution ${executionId}`);
    };

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.event) {
        case "execution_started":
          setProgress({
            executionId: data.execution_id,
            totalNodes: data.total_nodes,
            nodes: {},
          });
          break;

        case "node_started":
          setProgress(prev => prev ? {
            ...prev,
            nodes: {
              ...prev.nodes,
              [data.node_id]: { nodeId: data.node_id, status: "running", startedAt: data.started_at },
            },
          } : prev);
          break;

        case "node_completed":
          setProgress(prev => prev ? {
            ...prev,
            nodes: {
              ...prev.nodes,
              // Merge with the existing entry so startedAt from node_started is preserved
              [data.node_id]: {
                ...prev.nodes[data.node_id],
                nodeId: data.node_id,
                status: "success",
                durationMs: data.duration_ms,
              },
            },
          } : prev);
          break;

        case "node_failed":
          setProgress(prev => prev ? {
            ...prev,
            nodes: {
              ...prev.nodes,
              [data.node_id]: {
                ...prev.nodes[data.node_id],
                nodeId: data.node_id,
                status: "failed",
                error: data.error,
              },
            },
          } : prev);
          break;

        case "node_skipped":
          setProgress(prev => prev ? {
            ...prev,
            nodes: {
              ...prev.nodes,
              [data.node_id]: { nodeId: data.node_id, status: "skipped" },
            },
          } : prev);
          break;

        case "execution_completed":
          // All nodes have finished; the UI can show the final summary here
          console.log(`[WS] Execution completed: ${data.status}`, data.node_summary);
          break;
      }
    };

    ws.onerror = (err) => {
      console.error("[WS] Error:", err);
    };

    ws.onclose = () => {
      console.log(`[WS] Disconnected from execution ${executionId}`);
    };

    return () => ws.close();
  }, [executionId]);

  return progress;
}

Supplement 4-C: React Flow Custom Node Components (Pseudocode)
// ragateway-ui/src/components/PipelineEditor/customNodes/
//   - SourceNode.tsx
//   - ConverterNode.tsx
//   - SplitterNode.tsx
//   - EnricherNode.tsx
//   - RouterNode.tsx  (multiple output ports)
//   - SinkNode.tsx

import React, { memo } from "react";
import { Handle, Position, NodeProps } from "reactflow";
import { NODE_STATUS_COLORS } from "../PipelineMonitor";

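// ── Base node component template ──────────────────────────────────────────
// React Flow passes props such as id, data and selected, but no top-level `status`;
// the run status is assumed to be written into data.status by the execution monitor.
const BaseNode = memo(({ data, icon, label }: any) => {
  const status = data?.status;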
  return (
    <div style={{
      background: "white",
      border: `2px solid ${NODE_STATUS_COLORS[status] || "#e5e7eb"}`,
      borderRadius: 10,
      minWidth: 160,
      boxShadow: "0 2px 8px rgba(0,0,0,0.08)",
      transition: "border-color 0.3s",
    }}>
      {/* Input handle (left) */}
      <Handle
        type="target"
        position={Position.Left}
        style={{ background: "#6366f1", width: 10, height: 10 }}
      />

      {/* Node header */}
      <div style={{
        padding: "8px 12px",
        borderBottom: "1px solid #f0f4f8",
        display: "flex",
        alignItems: "center",
        gap: 8,
        background: "#f8fafc",
        borderRadius: "8px 8px 0 0",
      }}>
        <span style={{ fontSize: 18 }}>{icon}</span>
        <span style={{ fontWeight: 600, fontSize: 13, color: "#334155" }}>{label}</span>
        {status === "running" && (
          <span style={{
            marginLeft: "auto",
            width: 8, height: 8,
            borderRadius: "50%",
            background: "#60a5fa",
            animation: "pulse 1s infinite",
          }} />
        )}
      </div>

      {/* Node body (config summary) */}
      <div style={{ padding: "8px 12px", fontSize: 12, color: "#64748b" }}>
        {Object.entries(data.config || {}).slice(0, 3).map(([k, v]) => (
          <div key={k}>
            <span style={{ color: "#94a3b8" }}>{k}:</span> {String(v).slice(0, 20)}
          </div>
        ))}
      </div>

      {/* Output handle (right) */}
      <Handle
        type="source"
        position={Position.Right}
        style={{ background: "#6366f1", width: 10, height: 10 }}
      />
    </div>
  );
});

// ── Converter node ───────────────────────────────────────────────────────
export const ConverterNode = memo((props: NodeProps) => (
  <BaseNode {...props} icon="📄" label={props.data.label || "Converter"} />
));

// ── Sink node (RAG ingestion) ────────────────────────────────────────────
export const SinkNode = memo((props: NodeProps) => (
  <BaseNode {...props} icon="📥" label={props.data.label || "Sink"} />
));

// ── Router node (multiple output ports) ──────────────────────────────────
export const RouterNode = memo((props: NodeProps) => {
  const outputBranches = props.data.outputBranches || ["output"];

  return (
    <div style={{
      background: "white",
      border: "2px solid #fbbf24",
      borderRadius: 10,
      minWidth: 180,
      boxShadow: "0 2px 8px rgba(0,0,0,0.08)",
    }}>
      {/* Single input handle */}
      <Handle
        type="target"
        position={Position.Left}
        id="input"
        style={{ background: "#6366f1", width: 10, height: 10 }}
      />

      {/* Node header */}
      <div style={{
        padding: "8px 12px",
        background: "#fefce8",
        borderBottom: "1px solid #fef08a",
        borderRadius: "8px 8px 0 0",
        display: "flex",
        alignItems: "center",
        gap: 8,
      }}>
        <span style={{ fontSize: 18 }}>🔀</span>
        <span style={{ fontWeight: 600, fontSize: 13 }}>{props.data.label || "Router"}</span>
      </div>

      {/* Branch labels */}
      <div style={{ padding: "6px 12px" }}>
        {outputBranches.map((branch: string, idx: number) => (
          <div key={branch} style={{
            display: "flex",
            alignItems: "center",
            marginBottom: 4,
            fontSize: 12,
            color: "#92400e",
          }}>
            <span style={{ color: "#fbbf24", marginRight: 6 }}>●</span>
            {branch}
          </div>
        ))}
      </div>

      {/* One output handle per branch (right side) */}
      {outputBranches.map((branch: string, idx: number) => (
        <Handle
          key={branch}
          type="source"
          position={Position.Right}
          id={branch}
          style={{
            top: `${40 + idx * 28}px`,
            background: "#f59e0b",
            width: 10,
            height: 10,
          }}
        />
      ))}
    </div>
  );
});

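// Node type registration for React Flow. React Flow looks up nodeTypes by the exact
// value of node.type (no wildcard matching), so register one component per category
// prefix and set each React Flow node's type to that prefix ("converter.mineru" -> "converter").
// SourceNode / SplitterNode / EnricherNode follow the BaseNode pattern; icons are placeholders.
export const SourceNode = memo((props: NodeProps) => (
  <BaseNode {...props} icon="📁" label={props.data.label || "Source"} />
));

export const SplitterNode = memo((props: NodeProps) => (
  <BaseNode {...props} icon="✂️" label={props.data.label || "Splitter"} />
));

export const EnricherNode = memo((props: NodeProps) => (
  <BaseNode {...props} icon="🏷️" label={props.data.label || "Enricher"} />
));

export const CUSTOM_NODE_TYPES = {
  source:    SourceNode,
  converter: ConverterNode,
  splitter:  SplitterNode,
  enricher:  EnricherNode,
  router:    RouterNode,
  sink:      SinkNode,
};

// Maps a DAG node type to its nodeTypes key.
export const toFlowNodeType = (dagType: string): string => dagType.split(".")[0];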
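A loader that turns the stored pipeline JSON into canvas elements must pick an exact nodeTypes key for each node. It must also set handle ids only where the components declare them. In the components above, only RouterNode has handle ids: "input" plus one per branch. The following sketch assumes nodeTypes is keyed by category prefix. The helper name is hypothetical, and running it server-side (for example, at export time) is only one option.

```python
def to_react_flow(pipeline: dict) -> dict:
    """Map pipeline JSON (section 11 shape) to React Flow {nodes, edges} (hypothetical helper)."""
    dag_type = {n["id"]: n["type"] for n in pipeline["nodes"]}

    def is_router(node_id: str) -> bool:
        return dag_type[node_id].startswith("router.")

    nodes = [
        {
            "id": n["id"],
            # Exact nodeTypes key: the category prefix of the DAG type.
            "type": n["type"].split(".", 1)[0],
            "position": n.get("position", {"x": 0, "y": 0}),
            "data": {"label": n.get("name", n["type"]), "config": n.get("config", {})},
        }
        for n in pipeline["nodes"]
    ]
    edges = []
    for e in pipeline["edges"]:
        edge = {"id": e["id"], "source": e["source"], "target": e["target"]}
        # Single-port BaseNode handles have no id, so handle fields are set only for routers.
        if is_router(e["source"]):
            edge["sourceHandle"] = e["source_output"]  # branch name == handle id
        if is_router(e["target"]):
            edge["targetHandle"] = "input"
        edges.append(edge)
    return {"nodes": nodes, "edges": edges}
```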

9. Effort Estimate

Estimated per module, assuming one senior backend engineer and one frontend engineer.

9.1 Backend modules

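Module | Tasks | Person-days
Core framework | NodeRegistry + registration decorator + built-in node skeletons | 3
DAGExecutor | Topological sort, parallel execution, timeout control | 4
VariablePool | Variable pool implementation + PipelineItem data structure | 2
Error handling | Three strategies (Stop/Continue/Skip) + retry logic | 2
Built-in nodes: Source/Converter | source.file_store, converter.mineru/docling/markitdown | 3
Built-in nodes: Splitter/Enricher | splitter.recursive/semantic/fixed, enricher.metadata | 3
Built-in nodes: Router | router.file_type, router.metadata, router.if_else | 3
Built-in nodes: Sink | sink.ragflow_naive, sink.ragflow_graphrag, sink.neo4j_wiki | 3
API layer | Pipeline CRUD + activate + node-types endpoints | 3
Database | tenant_pipelines table + pipeline_executions table | 2
V27 compatibility layer | V27_DEFAULT_PIPELINE + config migration function | 2
Integration tests | End-to-end pipeline tests + failure-case tests | 3
Backend total | | 33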

9.2 Frontend modules

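Module | Tasks | Person-days
React Flow integration | Canvas, drag-and-drop, node moving, save/load | 4
Node palette | Dynamic loading from /api/admin/node-types + grouping by category | 2
Node config panel | Dynamic forms (select/string/integer) | 3
Connection validation | Type-compatibility checks + filtering of invalid connections | 2
Router node UI | Multiple output ports + per-branch color coding | 2
Pipeline list page | CRUD list + activation status display | 2
Execution monitor panel | Real-time status over WebSocket + node color changes | 3
DAG import/export | JSON export + import validation | 2
Frontend total | | 20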

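9.3 Total effort

Role | Person-days
Senior backend engineer | 33
Frontend engineer | 20
Total | 53 person-days (about 10.6 person-weeks at 5 working days per week)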
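The totals in 9.1 to 9.3 can be re-derived from the per-module figures. The 5-day working week is an assumption. The calendar-time figure is only a lower bound: it assumes both engineers work fully in parallel and ignores hand-offs between the two tracks.

```python
# Per-module person-days copied from the 9.1 and 9.2 tables.
BACKEND = {
    "Core framework": 3, "DAGExecutor": 4, "VariablePool": 2, "Error handling": 2,
    "Source/Converter nodes": 3, "Splitter/Enricher nodes": 3, "Router nodes": 3,
    "Sink nodes": 3, "API layer": 3, "Database": 2, "V27 compatibility layer": 2,
    "Integration tests": 3,
}
FRONTEND = {
    "React Flow integration": 4, "Node palette": 2, "Node config panel": 3,
    "Connection validation": 2, "Router node UI": 2, "Pipeline list page": 2,
    "Execution monitor panel": 3, "DAG import/export": 2,
}
WORKDAYS_PER_WEEK = 5  # assumption

backend_days = sum(BACKEND.values())            # 33
frontend_days = sum(FRONTEND.values())          # 20
total_days = backend_days + frontend_days       # 53
person_weeks = total_days / WORKDAYS_PER_WEEK   # 10.6
# With both engineers in parallel, calendar time is bounded below by the longer track.
calendar_weeks = max(backend_days, frontend_days) / WORKDAYS_PER_WEEK  # 6.6
```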

10. Change Log

Version | Date | Author | Changes
V28 initial draft | 2026-05-20 | Technical architect | Initial version, based on a source-code study of n8n, dify and ComfyUI

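11. Typical Usage Scenarios (RegDesk Customer Pipeline Configurations)

Below are three typical RegDesk customer pipeline configurations, each for a different business scenario and priority.

Scenario 1: 510(k) document processing (quality first)

Business context: A medical-device customer processes FDA 510(k) submission documents. These are PDFs containing complex tables, formulas, and figures. Requirements:
- Preserve the original structure in full (tables, heading hierarchy)
- Support semantic chunking (split by paragraph/section)
- Build both NaiveRAG (precise matching) and GraphRAG General (global associations)

Pipeline configuration: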

{
  "version": "1",
  "pipeline_id": "pipeline-510k-quality",
  "tenant_id": "tenant-abc-123",
  "name": "510(k) Document Processing (Quality First)",
  "description": "High-quality processing flow for FDA 510(k) submission documents",
  "config": {
    "max_concurrency": 2,
    "timeout_seconds": 900,
    "retry_on_node_fail": true,
    "max_retries": 2,
    "continue_on_error": false
  },
  "nodes": [
    {
      "id": "src-001",
      "type": "source.file_store",
      "version": "1",
      "name": "File source",
      "position": {"x": 0, "y": 200},
      "inputs": {},
      "outputs": ["file"],
      "config": {}
    },
    {
      "id": "conv-001",
      "type": "converter.mineru",
      "version": "1",
      "name": "MinerU conversion (tables + formulas)",
      "position": {"x": 250, "y": 200},
      "inputs": {"file": {"from_node": "src-001", "from_output": "file"}},
      "outputs": ["markdown", "toc", "metadata"],
      "config": {
        "backend": "hybrid-auto-engine",
        "lang_list": ["en"],
        "timeout": 600,
        "extract_tables": true,
        "extract_formulas": true,
        "preserve_layout": true
      }
    },
    {
      "id": "enr-001",
      "type": "enricher.metadata",
      "version": "1",
      "name": "Metadata extraction",
      "position": {"x": 500, "y": 200},
      "inputs": {
        "text": {"from_node": "conv-001", "from_output": "markdown"},
        "metadata": {"from_node": "conv-001", "from_output": "metadata"}
      },
      "outputs": ["metadata"],
      "config": {
        "extract_fields": ["title", "date", "version", "document_id"],
        "language_detection": {"enabled": true, "default_lang": "en"}
      }
    },
    {
      "id": "split-001",
      "type": "splitter.semantic",
      "version": "1",
      "name": "Semantic chunking (512 tokens)",
      "position": {"x": 750, "y": 200},
      "inputs": {"text": {"from_node": "conv-001", "from_output": "markdown"}},
      "outputs": ["chunks"],
      "config": {
        "strategy": "semantic",
        "chunk_size": 512,
        "chunk_overlap": 64,
        "embedding_model": "bge-m3",
        "split_by": "sentence"
      }
    },
    {
      "id": "enr-002",
      "type": "enricher.chunk_meta",
      "version": "1",
      "name": "Chunk metadata injection",
      "position": {"x": 1000, "y": 200},
      "inputs": {
        "chunks": {"from_node": "split-001", "from_output": "chunks"},
        "metadata": {"from_node": "enr-001", "from_output": "metadata"}
      },
      "outputs": ["chunks"],
      "config": {
        "inject_fields": ["document_id", "title", "language", "chunk_index"]
      }
    },
    {
      "id": "sink-naive",
      "type": "sink.ragflow_naive",
      "version": "1",
      "name": "NaiveRAG ingestion",
      "position": {"x": 1250, "y": 100},
      "inputs": {
        "chunks": {"from_node": "enr-002", "from_output": "chunks"},
        "metadata": {"from_node": "enr-001", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {"enabled": true, "dedup": true}
    },
    {
      "id": "sink-graphrag",
      "type": "sink.ragflow_graphrag",
      "version": "1",
      "name": "GraphRAG General ingestion",
      "position": {"x": 1250, "y": 300},
      "inputs": {
        "markdown": {"from_node": "conv-001", "from_output": "markdown"},
        "metadata": {"from_node": "enr-001", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {"source": "ragflow_general", "enabled": true}
    }
  ],
  "edges": [
    {"id": "e1", "source": "src-001", "source_output": "file", "target": "conv-001", "target_input": "file"},
    {"id": "e2", "source": "conv-001", "source_output": "markdown", "target": "enr-001", "target_input": "text"},
    {"id": "e3", "source": "conv-001", "source_output": "metadata", "target": "enr-001", "target_input": "metadata"},
    {"id": "e4", "source": "conv-001", "source_output": "markdown", "target": "split-001", "target_input": "text"},
    {"id": "e5", "source": "enr-001", "source_output": "metadata", "target": "enr-002", "target_input": "metadata"},
    {"id": "e6", "source": "split-001", "source_output": "chunks", "target": "enr-002", "target_input": "chunks"},
    {"id": "e7", "source": "enr-002", "source_output": "chunks", "target": "sink-naive", "target_input": "chunks"},
    {"id": "e8", "source": "enr-001", "source_output": "metadata", "target": "sink-naive", "target_input": "metadata"},
    {"id": "e9", "source": "conv-001", "source_output": "markdown", "target": "sink-graphrag", "target_input": "markdown"},
    {"id": "e10", "source": "enr-001", "source_output": "metadata", "target": "sink-graphrag", "target_input": "metadata"}
  ]
}

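Node execution layers (result of the Kahn topological sort):
- Layer 0: src-001
- Layer 1: conv-001
- Layer 2: enr-001, split-001 (parallel)
- Layer 3: enr-002, sink-graphrag (parallel; sink-graphrag needs only conv-001 and enr-001, so it is ready as soon as layer 2 finishes)
- Layer 4: sink-naive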
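The layering comes from a layered variant of Kahn's algorithm. Each round collects every node whose in-degree has dropped to zero, and those nodes can be awaited together (e.g., with asyncio.gather). The following sketch is minimal: the function name is hypothetical, and ties are sorted only to make the output deterministic. On scenario 1's edges it yields five layers. sink-graphrag becomes ready in layer 3 alongside enr-002, because both of its predecessors (conv-001 and enr-001) have finished by the end of layer 2.

```python
def kahn_layers(node_ids: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Group nodes into layers; every node's predecessors sit in earlier layers."""
    indegree = {n: 0 for n in node_ids}
    children: dict[str, list[str]] = {n: [] for n in node_ids}
    for src, dst in edges:  # parallel edges (two ports between the same nodes) count twice
        children[src].append(dst)
        indegree[dst] += 1
    layer = sorted(n for n, d in indegree.items() if d == 0)
    layers: list[list[str]] = []
    placed = 0
    while layer:
        layers.append(layer)
        placed += len(layer)
        ready = []
        for n in layer:
            for child in children[n]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        layer = sorted(ready)
    if placed != len(node_ids):
        raise ValueError("pipeline graph contains a cycle")
    return layers


# (source, target) pairs from scenario 1's edges e1..e10.
SCENARIO_1_EDGES = [
    ("src-001", "conv-001"), ("conv-001", "enr-001"), ("conv-001", "enr-001"),
    ("conv-001", "split-001"), ("enr-001", "enr-002"), ("split-001", "enr-002"),
    ("enr-002", "sink-naive"), ("enr-001", "sink-naive"),
    ("conv-001", "sink-graphrag"), ("enr-001", "sink-graphrag"),
]
SCENARIO_1_NODES = ["src-001", "conv-001", "enr-001", "split-001",
                    "enr-002", "sink-naive", "sink-graphrag"]
```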


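Scenario 2: Regulatory standards processing (speed first)

Business context: A customer processes hundreds of regulatory standard PDFs from various jurisdictions (ISO/IEC/AAMI, etc.) every day, with the goal of "ingest fast, searchable early". Processing speed takes priority, at some cost in accuracy.

Pipeline configuration: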

{
  "version": "1",
  "pipeline_id": "pipeline-regulatory-speed",
  "tenant_id": "tenant-def-456",
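  "name": "Regulatory Standards Processing (Speed First)",
  "description": "Fast processing of large volumes of regulatory standard documents, prioritizing ingestion speed",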
  "config": {
    "max_concurrency": 8,
    "timeout_seconds": 300,
    "retry_on_node_fail": true,
    "max_retries": 1,
    "continue_on_error": true
  },
  "nodes": [
    {
      "id": "src-001",
      "type": "source.file_store",
      "version": "1",
      "name": "File source",
      "position": {"x": 0, "y": 200},
      "inputs": {},
      "outputs": ["file"],
      "config": {}
    },
    {
      "id": "conv-001",
      "type": "converter.mineru",
      "version": "1",
      "name": "MinerU fast conversion",
      "position": {"x": 250, "y": 200},
      "inputs": {"file": {"from_node": "src-001", "from_output": "file"}},
      "outputs": ["markdown", "metadata"],
      "config": {
        "backend": "pipeline",
        "lang_list": ["en", "zh"],
        "timeout": 120,
        "skip_tables": false
      }
    },
    {
      "id": "split-001",
      "type": "splitter.fixed",
      "version": "1",
      "name": "Fixed-size chunking (256 characters)",
      "position": {"x": 500, "y": 200},
      "inputs": {"text": {"from_node": "conv-001", "from_output": "markdown"}},
      "outputs": ["chunks"],
      "config": {
        "strategy": "fixed",
        "chunk_size": 256,
        "chunk_overlap": 32,
        "split_by": "character"
      }
    },
    {
      "id": "sink-graphrag-light",
      "type": "sink.ragflow_graphrag",
      "version": "1",
      "name": "GraphRAG Light ingestion",
      "position": {"x": 750, "y": 200},
      "inputs": {
        "markdown": {"from_node": "conv-001", "from_output": "markdown"},
        "metadata": {"from_node": "conv-001", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {
        "source": "ragflow_light",
        "enabled": true
      }
    }
  ],
  "edges": [
    {"id": "e1", "source": "src-001", "source_output": "file", "target": "conv-001", "target_input": "file"},
    {"id": "e2", "source": "conv-001", "source_output": "markdown", "target": "split-001", "target_input": "text"},
    {"id": "e3", "source": "conv-001", "source_output": "markdown", "target": "sink-graphrag-light", "target_input": "markdown"},
    {"id": "e4", "source": "conv-001", "source_output": "metadata", "target": "sink-graphrag-light", "target_input": "metadata"}
  ]
}
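Each connection in these configurations appears twice: once in the target node's inputs and once in edges. A save-time check can therefore catch drift between the two, and it can also flag non-sink nodes whose outputs go nowhere. The following sketch is minimal; validate_pipeline is a hypothetical helper, not an existing ragateway API. Applied to scenario 2 above, it reports split-001, whose chunks output is not wired to any node. Scenario 1 passes cleanly.

```python
def validate_pipeline(p: dict) -> list[str]:
    """Return human-readable problems; an empty list means the wiring is consistent."""
    problems: list[str] = []
    ids = {n["id"] for n in p["nodes"]}
    # (from_node, from_output, to_node, to_input) as declared by edges ...
    edge_keys = {(e["source"], e["source_output"], e["target"], e["target_input"]) for e in p["edges"]}
    # ... and as declared by each node's inputs.
    input_keys = {
        (ref["from_node"], ref["from_output"], n["id"], port)
        for n in p["nodes"]
        for port, ref in n.get("inputs", {}).items()
    }
    for key in sorted(edge_keys - input_keys):
        problems.append(f"edge without matching node input: {key}")
    for key in sorted(input_keys - edge_keys):
        problems.append(f"node input without matching edge: {key}")
    for e in p["edges"]:
        if e["source"] not in ids or e["target"] not in ids:
            problems.append(f"edge {e['id']} references an unknown node")
    # Anything other than a sink should feed at least one downstream node.
    sources = {e["source"] for e in p["edges"]}
    for n in p["nodes"]:
        if not n["type"].startswith("sink.") and n["id"] not in sources:
            problems.append(f"node {n['id']} ({n['type']}) has no downstream consumer")
    return problems
```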

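Optimization notes:
- max_concurrency: 8: high concurrency to make full use of the CPU
- splitter.fixed: fixed 256-character chunks, much faster than semantic chunking because no LLM or embedding calls are needed. As wired above, however, split-001's chunks output is not consumed by any node; sink-graphrag-light ingests the full markdown from conv-001.
- GraphRAG Light: skips community detection and builds the graph and vectors directly, for a 3-5x speedup
- continue_on_error: true: a failure on an individual file does not affect the rest of the batch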
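max_concurrency and continue_on_error can be read as a semaphore around node execution plus a stop check after each layer. The following sketch works under those assumptions only: run_layers and the status strings are hypothetical. The real executor's three strategies (Stop/Continue/Skip) and retries are richer than this single flag.

```python
import asyncio
from collections.abc import Awaitable, Callable

NodeFn = Callable[[str], Awaitable[object]]


async def run_layers(layers: list[list[str]], run_node: NodeFn,
                     max_concurrency: int, continue_on_error: bool) -> dict[str, str]:
    """Run Kahn layers in order; nodes within a layer run concurrently, capped by a semaphore."""
    sem = asyncio.Semaphore(max_concurrency)
    status: dict[str, str] = {}

    async def guarded(node_id: str) -> None:
        async with sem:  # at most max_concurrency nodes in flight
            try:
                await run_node(node_id)
                status[node_id] = "success"
            except Exception:  # CancelledError is a BaseException and still propagates
                status[node_id] = "failed"

    for layer in layers:
        await asyncio.gather(*(guarded(n) for n in layer))
        if not continue_on_error and any(status[n] == "failed" for n in layer):
            break  # stop before starting the next layer
    for layer in layers:  # anything that never ran is reported as skipped
        for n in layer:
            status.setdefault(n, "skipped")
    return status
```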


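Scenario 3: Markdown knowledge base (internal documents)

Business context: A customer keeps internal SOPs and technical specifications in Markdown. No PDF conversion is needed, so documents are chunked and ingested directly.

Pipeline configuration: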

{
  "version": "1",
  "pipeline_id": "pipeline-markdown-kb",
  "tenant_id": "tenant-ghi-789",
  "name": "Markdown Knowledge Base Processing",
  "description": "Internal Markdown knowledge base; no format conversion needed",
  "config": {
    "max_concurrency": 4,
    "timeout_seconds": 120,
    "retry_on_node_fail": false,
    "max_retries": 0,
    "continue_on_error": false
  },
  "nodes": [
    {
      "id": "src-001",
      "type": "source.file_store",
      "version": "1",
      "name": "File source",
      "position": {"x": 0, "y": 200},
      "inputs": {},
      "outputs": ["file"],
      "config": {"file_pattern": "*.md"}
    },
    {
      "id": "conv-001",
      "type": "converter.skip",
      "version": "1",
      "name": "Skip conversion (.md passed through)",
      "position": {"x": 250, "y": 200},
      "inputs": {"file": {"from_node": "src-001", "from_output": "file"}},
      "outputs": ["markdown", "metadata"],
      "config": {
        "pass_through": true,
        "extract_frontmatter": true
      }
    },
    {
      "id": "enr-001",
      "type": "enricher.metadata",
      "version": "1",
      "name": "Markdown metadata extraction",
      "position": {"x": 500, "y": 200},
      "inputs": {
        "text": {"from_node": "conv-001", "from_output": "markdown"},
        "metadata": {"from_node": "conv-001", "from_output": "metadata"}
      },
      "outputs": ["metadata"],
      "config": {
        "extract_fields": ["title", "date", "author", "tags"],
        "language_detection": {"enabled": true, "default_lang": "auto"}
      }
    },
    {
      "id": "split-001",
      "type": "splitter.fixed",
      "version": "1",
      "name": "Markdown fixed-size chunking (1024 characters)",
      "position": {"x": 750, "y": 200},
      "inputs": {"text": {"from_node": "conv-001", "from_output": "markdown"}},
      "outputs": ["chunks"],
      "config": {
        "strategy": "fixed",
        "chunk_size": 1024,
        "chunk_overlap": 128,
        "split_by": "markdown-header"
      }
    },
    {
      "id": "enr-002",
      "type": "enricher.chunk_meta",
      "version": "1",
      "name": "Chunk title injection",
      "position": {"x": 1000, "y": 200},
      "inputs": {
        "chunks": {"from_node": "split-001", "from_output": "chunks"},
        "metadata": {"from_node": "enr-001", "from_output": "metadata"}
      },
      "outputs": ["chunks"],
      "config": {
        "inject_fields": ["title", "author", "language"]
      }
    },
    {
      "id": "sink-naive",
      "type": "sink.ragflow_naive",
      "version": "1",
      "name": "NaiveRAG ingestion",
      "position": {"x": 1250, "y": 200},
      "inputs": {
        "chunks": {"from_node": "enr-002", "from_output": "chunks"},
        "metadata": {"from_node": "enr-001", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {"enabled": true}
    }
  ],
  "edges": [
    {"id": "e1", "source": "src-001", "source_output": "file", "target": "conv-001", "target_input": "file"},
    {"id": "e2", "source": "conv-001", "source_output": "markdown", "target": "enr-001", "target_input": "text"},
    {"id": "e3", "source": "conv-001", "source_output": "metadata", "target": "enr-001", "target_input": "metadata"},
    {"id": "e4", "source": "conv-001", "source_output": "markdown", "target": "split-001", "target_input": "text"},
    {"id": "e5", "source": "enr-001", "source_output": "metadata", "target": "enr-002", "target_input": "metadata"},
    {"id": "e6", "source": "split-001", "source_output": "chunks", "target": "enr-002", "target_input": "chunks"},
    {"id": "e7", "source": "enr-002", "source_output": "chunks", "target": "sink-naive", "target_input": "chunks"},
    {"id": "e8", "source": "enr-001", "source_output": "metadata", "target": "sink-naive", "target_input": "metadata"}
  ]
}

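Key node notes:
- converter.skip: skips the conversion step and passes the file through directly as Markdown (saving MinerU conversion time)
- split_by: markdown-header: splits at Markdown headings (# through ######) to keep the document structure intact
- chunk_size: 1024: larger chunks for Markdown, since the format is already naturally divided into sections
- NaiveRAG only: the internal knowledge base relies mainly on precise keyword matching and does not need GraphRAG's global association capability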


Appendix A: Research Notes

A.1 Key n8n source files

File | Purpose
packages/workflow/src/interfaces.ts | INode, IConnections, IWorkflowBase interface definitions
packages/workflow/src/workflow.ts | Workflow class, node-graph operations
packages/core/src/executors/workflow-execute.ts | WorkflowExecute execution engine
packages/workflow/src/errors/ | Error type definitions

A.2 Key dify source files

File | Purpose
api/core/workflow/node_factory.py | resolve_workflow_node_class, the node factory
api/core/workflow/workflow_entry.py | WorkflowEntry, SingleNodeGraphDict
api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py | KnowledgeRetrievalNode, as a reference implementation
graphon/nodes/base/node.py | Node base class (the graphon subsystem after dify's refactor)

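A.3 Key ComfyUI designs

Design | Location | What we borrow
NODE_CLASS_MAPPINGS | Global dict | Decorator-based registration
INPUT_TYPES | Classmethod | Strong type constraints
RETURN_TYPES | Class attribute | Output type declarations
Execution cache | execution.py (model_management.py handles model loading and device memory, not node outputs) | Node result caching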
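The following sketch shows how the ComfyUI ideas above could combine in this engine. A module-level mapping (like NODE_CLASS_MAPPINGS) is filled by a decorator. Each class declares INPUT_TYPES as a classmethod returning {"required": {...}} and RETURN_TYPES / RETURN_NAMES as tuples, following ComfyUI's conventions. A connection check then enforces exact type matches. The decorator, the port type names ("file", "markdown", "metadata") and can_connect are assumptions for illustration, not ComfyUI's API.

```python
NODE_CLASS_MAPPINGS: dict[str, type] = {}  # same idea as ComfyUI's global dict


def register_node(type_name: str):
    """Decorator that records the class under its pipeline type name (hypothetical)."""
    def decorator(cls: type) -> type:
        if type_name in NODE_CLASS_MAPPINGS:
            raise ValueError(f"duplicate node type: {type_name}")
        NODE_CLASS_MAPPINGS[type_name] = cls
        return cls
    return decorator


@register_node("converter.skip")
class SkipConverter:
    RETURN_TYPES = ("markdown", "metadata")  # output port types, in order
    RETURN_NAMES = ("markdown", "metadata")  # output port names, same order

    @classmethod
    def INPUT_TYPES(cls) -> dict:
        return {"required": {"file": ("file",)}}


@register_node("splitter.fixed")
class FixedSplitter:
    RETURN_TYPES = ("chunks",)
    RETURN_NAMES = ("chunks",)

    @classmethod
    def INPUT_TYPES(cls) -> dict:
        return {"required": {"text": ("markdown",)}}


def can_connect(src_type: str, src_output: str, dst_type: str, dst_input: str) -> bool:
    """Strong typing: the source port's declared type must equal the target input's type."""
    src = NODE_CLASS_MAPPINGS[src_type]
    dst = NODE_CLASS_MAPPINGS[dst_type]
    if src_output not in src.RETURN_NAMES:
        return False
    out_type = src.RETURN_TYPES[src.RETURN_NAMES.index(src_output)]
    required = dst.INPUT_TYPES().get("required", {})
    return dst_input in required and required[dst_input][0] == out_type
```

The frontend's "connection validation" task could run the same check against the node-types API response before accepting an edge.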