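Completed: 2026-05-20
Scope: n8n workflow execution engine · dify workflow/core/workflow node system · ComfyUI node registration and graph execution engine
Author: technical architect (based on a study of the source code, not only introductory documentation)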
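The V27 data-processing flow is hard-coded:
File persisted → DocumentConv → [NaiveRAG | GraphRAG×N] (in parallel)
This structure has the following problems:
- Adding a document-processing node (e.g. OCR, translation, structured extraction) requires changes to core code
- Different tenants cannot have different processing flows
- Files cannot be routed dynamically to different processing branches by file type
- The flow is not visualized, so administrators cannot adjust it themselves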
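Goal: replace the hard-coded document-processing flow with a visual, configurable DAG Pipeline engine that RegDesk administrators can configure themselves, within the following constraints: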
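| Constraint | Description |
|---|---|
| Pipeline layer | Orchestrates the processing of *data content*; it does not configure *upload methods* |
| Input boundary | After the File Store (the content of files that are already persisted) |
| Granularity | Per tenant (each tenant has its own default pipeline) |
| Node extension | Built-in nodes + registered third-party extensions |
| Target users | RegDesk administrators (technical users, not end customers) |
| Compatibility | Must be backward compatible with V27 (falls back to the V27 default flow when no DAG config exists) |
| Tech stack | Python 3.12+, asyncio, the existing FastAPI framework |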
```
┌─────────────────────────────────────────────────────────────┐
│ ragateway V27                                               │
│ ┌─────────────┐    ┌──────────────────────────────────┐     │
│ │ File Store  │───▶│ PipelineExecutor (NEW)           │     │
│ └─────────────┘    │ ┌──────────────────────────────┐ │     │
│                    │ │ DAG: nodes + edges JSON      │ │     │
│                    │ │ Engine (asyncio DAG walk)    │ │     │
│                    │ └──────────────────────────────┘ │     │
│                    └──────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────┘
```
PipelineExecutor replaces the following hard-coded calls in V27:
- the DocumentConverterPlugin call
- the NaiveRAGPlugin.ingest() call
- the GraphRAGPlugin[n].ingest() calls
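n8n: an open-source workflow-automation platform implemented in Node.js/TypeScript. It provides enterprise features such as complex conditional branching, retries, timeouts and credential management, and its ecosystem has thousands of community nodes.
Key takeaways: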
```typescript
// Adapted from n8n/packages/workflow/src/interfaces.ts (simplified)
interface INode {
  id: string;
  name: string;
  type: string;                 // node type identifier, e.g. "n8n-nodes-base.httpRequest"
  typeVersion: number;          // node version number
  position: [number, number];   // UI canvas coordinates
  disabled?: boolean;
  retryOnFail?: boolean;
  maxTries?: number;
  parameters: INodeParameters;
}

interface IConnections {
  [nodeName: string]: {         // source (output) node name
    [inputName: string]: Array<{
      node: string;             // target node name
      type: string;             // connection type (e.g. "main")
      index: number;            // target input port index
    }>;
  };
}
```
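Takeaways:
- A graph structure that keeps nodes and connections separate (nodes + connections)
- Node parameters decoupled from the node type (`type` carries no parameters; `parameters` holds the configuration)
- `type` + `typeVersion` enable node versioning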
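Core logic of n8n's WorkflowExecute engine:
1. Build an adjacency structure from `connections`
2. Execute each node with runNode(), running nodes at the same level in parallel
3. Handle node failures with the node's retry settings (retryOnFail / maxTries / waitBetweenTries)
4. Control error routing through the node's onError setting: continueErrorOutput / continueRegularOutput / stopWorkflow
Takeaway: three strategies for handling node failures (continue, skip, stop).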
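| Why not adopt n8n directly | Explanation |
|---|---|
| Tech-stack mismatch | n8n is Node.js/TypeScript; ragateway is Python FastAPI |
| Different target scenario | n8n targets workflow automation (HTTP requests, database operations), not document-processing pipelines |
| Over-engineered for our needs | n8n ships full credential management, webhooks and a trigger system; ragateway only needs DAG execution |
| Deployment complexity | Adopting n8n means running an extra Node.js service, which raises operational cost |

Conclusion: borrow n8n's JSON workflow structure, node-versioning design and error-handling strategies, but build our own Python execution engine.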
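dify: an open-source LLM application development platform (Python) with a full workflow system that supports knowledge-base retrieval, LLM calls, conditional branches, code execution and more.
Key takeaways:
Internally, dify has refactored its workflow engine into a standalone graphon subsystem: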
```python
# From dify api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py
# (excerpts; they reference dify internals and are not runnable on their own)
class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]):
    node_type = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL

    def _run(self) -> NodeRunResult:
        ...


class _NodeConfigDict(TypedDict):
    id: str
    width: int
    height: int
    type: str
    data: dict[str, Any]


class _EdgeConfigDict(TypedDict):
    source: str
    target: str
    sourceHandle: str
    targetHandle: str


def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
    node_mapping = get_node_type_classes_mapping().get(node_type)
    latest_node_class = node_mapping.get(LATEST_VERSION)
    matched_node_class = node_mapping.get(node_version)
    return matched_node_class or latest_node_class
```
dify passes data between nodes through a VariablePool:
- Variable identifier: `node_id:output_name`
- Node A's output → VariablePool → node B reads it from the VariablePool
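| Why not adopt dify directly | Explanation |
|---|---|
| Domain mismatch | dify's workflow orchestrates *LLM applications*, not *document-processing pipelines* |
| High RAG integration cost | dify's Knowledge Retrieval node targets dify's own knowledge base, not RAGFlow/Neo4j |
| Too many dependencies | dify depends on its own database schema, model management and vector stores; the coupling is too tight |

Conclusion: borrow dify's node base-class design (the `Node[NodeData]` ABC), its node-factory registration pattern and the VariablePool concept, while keeping our own implementation simpler.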
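ComfyUI: a modular, node-graph-based engine for generative AI (mainly diffusion-model image generation), with one of the largest custom-node ecosystems.
Key takeaways:
ComfyUI nodes are registered through a global dictionary: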
```python
NODE_CLASS_MAPPINGS = {
    "LoadImage": LoadImage,
    "KSampler": KSampler,
    "CLIPTextEncode": CLIPTextEncode,
}
# Output types are not registered in a separate mapping: each node class
# declares its own outputs via RETURN_TYPES (see LoadImage below).
```
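Takeaways:
- Registration through a global dictionary: simple, yet very powerful
- A third-party node package only exports its own NODE_CLASS_MAPPINGS, which ComfyUI merges when loading it; core code is never modified
- INPUT_TYPES / RETURN_TYPES declare strongly typed input and output constraints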
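In ragateway, the same dictionary-based registration could be wrapped in a decorator that sits on top of a per-type version table. The following is a minimal sketch, not the document's actual `NodeRegistry`. The class shape, the `OUTPUTS` class attribute, and the rule "fall back to the highest registered version" (modeled on dify's `matched_node_class or latest_node_class`) are assumptions. Versions are also assumed to be integer strings.

```python
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class NodeOutputDef:
    name: str
    type: str


class NodeRegistry:
    """Hypothetical registry: node type -> {version -> node class}."""

    def __init__(self) -> None:
        self._classes: dict[str, dict[str, type]] = {}

    def register(self, node_type: str, version: str = "1") -> Callable[[type], type]:
        def decorator(cls: type) -> type:
            versions = self._classes.setdefault(node_type, {})
            if version in versions:
                raise ValueError(f"{node_type} v{version} is already registered")
            cls.type, cls.version = node_type, version  # stamp identity onto the class
            versions[version] = cls
            return cls
        return decorator

    def get(self, node_type: str, version: str = "1") -> type:
        versions = self._classes.get(node_type)
        if not versions:
            raise KeyError(f"unknown node type: {node_type}")
        if version in versions:
            return versions[version]
        # Unknown version: fall back to the highest registered one (assumes numeric versions)
        return versions[max(versions, key=int)]

    def get_output_defs(self, node_type: str, version: str = "1") -> list[NodeOutputDef]:
        return list(getattr(self.get(node_type, version), "OUTPUTS", []))


registry = NodeRegistry()


@registry.register("splitter.fixed")
class SplitterFixedV1:
    OUTPUTS = [NodeOutputDef("chunks", "CHUNKS")]


@registry.register("splitter.fixed", version="2")
class SplitterFixedV2:
    OUTPUTS = [NodeOutputDef("chunks", "CHUNKS")]
```

A third-party package would only need to import `registry` and decorate its classes. Importing the package is then enough to make its nodes available, which mirrors ComfyUI's "no core changes" property.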
```python
# Simplified illustration of ComfyUI's node-class conventions
class LoadImage:
    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "image": ("IMAGE",),
            },
            "optional": {
                "channel": (["alpha", "RGB"],),
            },
        }

    RETURN_TYPES = ("IMAGE", "MASK")
    FUNCTION = "load_image"
```
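Takeaways:
- INPUT_TYPES enforces input types, so the UI editor only allows connections from compatible outputs
- When types do not match, the UI simply refuses to draw the connection, which reduces user errors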
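ragateway has no ComfyUI canvas to hide incompatible links, so the same rule can also be enforced on the server when a pipeline is saved. The sketch below assumes hypothetical data-type labels (`FILE`, `TEXT`, `CHUNKS`, ...). These labels are attached to the port names from this document's node catalog and are not part of the pipeline JSON defined here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PortSpec:
    inputs: dict[str, str]   # input name -> data type
    outputs: dict[str, str]  # output name -> data type


# Hypothetical type labels for a few catalog node types
PORTS = {
    "converter.mineru": PortSpec({"file": "FILE"},
                                 {"markdown": "TEXT", "toc": "TOC", "metadata": "METADATA"}),
    "splitter.recursive": PortSpec({"text": "TEXT"}, {"chunks": "CHUNKS"}),
    "sink.ragflow_naive": PortSpec({"chunks": "CHUNKS", "metadata": "METADATA"}, {"result": "RESULT"}),
}


def check_edge_types(nodes: list[dict], edges: list[dict]) -> list[str]:
    """Return one message per edge whose source output type differs from the target input type."""
    node_types = {n["id"]: n["type"] for n in nodes}
    problems = []
    for e in edges:
        src_ports = PORTS.get(node_types[e["source"]])
        dst_ports = PORTS.get(node_types[e["target"]])
        out_type = src_ports.outputs.get(e["source_output"]) if src_ports else None
        in_type = dst_ports.inputs.get(e["target_input"]) if dst_ports else None
        # Unknown ports count as mismatches, as does any type difference
        if out_type is None or out_type != in_type:
            problems.append(f"{e['id']}: {e['source']}.{e['source_output']} ({out_type}) -> "
                            f"{e['target']}.{e['target_input']} ({in_type})")
    return problems
```

The same `PORTS` data could also drive the editor, so the UI and the API reject the same connections.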
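| ComfyUI feature | Effect |
|---|---|
| Minimal registration | One Python file + two lines of registration code = one new node |
| Strong type constraints | The UI refuses to draw a connection when types do not match |
| No central registry | Nodes are not declared in a central registry file; each package's mappings are picked up when it is imported |
| Pure Python | No extra configuration; extensions are plain code |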
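| Why not adopt ComfyUI directly | Explanation |
|---|---|
| Synchronous execution model | ComfyUI is synchronous Python, which does not fit our asyncio-based scenario |
| No parallel branches | ComfyUI executes the graph in strict topological order, with no real parallel execution |
| No error routing | ComfyUI does not support conditional branches or error handling |

Conclusion: borrow ComfyUI's simple node-registration pattern (exposed in ragateway as a decorator) and its INPUT_TYPES/RETURN_TYPES strong-type system.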
| Dimension | n8n | dify | ComfyUI | ragateway-pipeline V28 |
|---|---|---|---|---|
| Tech stack | TypeScript | Python | Python | Python |
| Data passing | item-array stream | VariablePool | typed slot system | PipelineItem + VariablePool |
| Graph structure | {nodes, connections} | {nodes, edges} | {nodes, links} | {nodes, edges} |
| Node registration | INodeType interface | Node subclassing | global dict | decorator registration |
| Type constraints | none | none | strong | strong |
| Parallel branches | node execution stack | graphon engine | topological order + caching | asyncio parallel levels |
| Conditional routing | IF / Switch nodes | If/Else node | none | Router node |
| Node versioning | type + typeVersion | type + version | none | type + version |
| Third-party extensions | npm packages | Python packages | pip packages | pip packages + hot reload |
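Final design decisions:
- Graph structure: dify's {nodes, edges} combined with ComfyUI-style strong typing
- Node registration: ComfyUI-style registration (wrapped in a decorator) + dify's Node base class
- Data passing: dify's VariablePool concept + a simplified version of n8n's item stream (PipelineItem)
- Parallel execution: n8n-style level parallelism + asyncio
- Error handling: n8n's three strategies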
Pipeline definition example:

```json
{
  "$schema": "https://ragateway.regdesk.co/schemas/pipeline-v1.json",
  "version": "1",
  "pipeline_id": "pipeline-uuid-xxx",
  "tenant_id": "tenant-uuid-xxx",
  "name": "RegDesk standard document-processing pipeline",
  "description": "Standard flow for processing RegDesk medical-device documents",
  "config": {
    "max_concurrency": 4,
    "timeout_seconds": 300,
    "retry_on_node_fail": true,
    "max_retries": 2,
    "continue_on_error": false
  },
  "nodes": [
    {
      "id": "node-001",
      "type": "source.file_store",
      "version": "1",
      "name": "File source",
      "position": {"x": 0, "y": 100},
      "inputs": {},
      "outputs": ["file"],
      "config": {}
    },
    {
      "id": "node-002",
      "type": "converter.mineru",
      "version": "1",
      "name": "Document conversion",
      "position": {"x": 250, "y": 100},
      "inputs": {
        "file": {"from_node": "node-001", "from_output": "file"}
      },
      "outputs": ["markdown", "toc", "metadata"],
      "config": {
        "backend": "hybrid-auto-engine",
        "lang_list": ["en", "zh"],
        "timeout": 300
      }
    },
    {
      "id": "node-003",
      "type": "splitter.recursive",
      "version": "1",
      "name": "Text chunking",
      "position": {"x": 500, "y": 100},
      "inputs": {
        "text": {"from_node": "node-002", "from_output": "markdown"}
      },
      "outputs": ["chunks"],
      "config": {
        "chunk_size": 512,
        "chunk_overlap": 64
      }
    },
    {
      "id": "node-004",
      "type": "router.file_type",
      "version": "1",
      "name": "File-type router",
      "position": {"x": 500, "y": 280},
      "inputs": {
        "metadata": {"from_node": "node-002", "from_output": "metadata"}
      },
      "outputs": ["pdf_branch", "docx_branch", "default_branch"],
      "config": {
        "routes": [
          {"condition": "file_type == 'pdf'", "output": "pdf_branch"},
          {"condition": "file_type in ['docx','doc']", "output": "docx_branch"}
        ],
        "default_output": "default_branch"
      }
    },
    {
      "id": "node-005",
      "type": "sink.ragflow_naive",
      "version": "1",
      "name": "NaiveRAG ingestion",
      "position": {"x": 750, "y": 50},
      "inputs": {
        "chunks": {"from_node": "node-003", "from_output": "chunks"},
        "metadata": {"from_node": "node-002", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {"enabled": true}
    },
    {
      "id": "node-006",
      "type": "sink.ragflow_graphrag",
      "version": "1",
      "name": "GraphRAG ingestion",
      "position": {"x": 750, "y": 200},
      "inputs": {
        "markdown": {"from_node": "node-002", "from_output": "markdown"},
        "metadata": {"from_node": "node-002", "from_output": "metadata"}
      },
      "outputs": ["result"],
      "config": {"source": "ragflow_general", "enabled": true}
    }
  ],
  "edges": [
    {"id": "e1", "source": "node-001", "source_output": "file", "target": "node-002", "target_input": "file"},
    {"id": "e2", "source": "node-002", "source_output": "markdown", "target": "node-003", "target_input": "text"},
    {"id": "e3", "source": "node-002", "source_output": "metadata", "target": "node-004", "target_input": "metadata"},
    {"id": "e4", "source": "node-003", "source_output": "chunks", "target": "node-005", "target_input": "chunks"},
    {"id": "e5", "source": "node-002", "source_output": "metadata", "target": "node-005", "target_input": "metadata"},
    {"id": "e6", "source": "node-002", "source_output": "markdown", "target": "node-006", "target_input": "markdown"},
    {"id": "e7", "source": "node-002", "source_output": "metadata", "target": "node-006", "target_input": "metadata"}
  ]
}
```
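Each node's `inputs` already names its upstream `{from_node, from_output}`, so the `edges` array repeats the same information. The two can drift apart. In the DAGExecutor below, the topological sort reads only `edges` while input resolution reads only `inputs`, so a missing edge lets a node run before its upstream has produced anything. A hypothetical save-time check (function names are illustrative) can compare the two representations:

```python
from typing import Any

EdgeKey = tuple[str, str, str, str]  # (source, source_output, target, target_input)


def derive_edge_keys(nodes: list[dict[str, Any]]) -> set[EdgeKey]:
    """Edges implied by each node's inputs: {input_name: {from_node, from_output}}."""
    return {
        (ref["from_node"], ref["from_output"], node["id"], input_name)
        for node in nodes
        for input_name, ref in node.get("inputs", {}).items()
    }


def compare_edges(pipeline: dict[str, Any]) -> tuple[list[EdgeKey], list[EdgeKey]]:
    """Return (edges implied by inputs but missing from `edges`, edges with no matching input)."""
    declared = {(e["source"], e["source_output"], e["target"], e["target_input"])
                for e in pipeline["edges"]}
    derived = derive_edge_keys(pipeline["nodes"])
    return sorted(derived - declared), sorted(declared - derived)
```

Both lists should be empty before a pipeline is accepted. Alternatively, `edges` can simply be regenerated from `inputs` on save.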
JSON Schema:

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "RagatewayPipeline",
  "type": "object",
  "required": ["version", "pipeline_id", "nodes", "edges"],
  "properties": {
    "version": {"type": "string", "const": "1"},
    "pipeline_id": {"type": "string"},
    "tenant_id": {"type": "string"},
    "name": {"type": "string", "maxLength": 255},
    "description": {"type": "string", "maxLength": 1000},
    "config": {
      "type": "object",
      "properties": {
        "max_concurrency": {"type": "integer", "minimum": 1, "maximum": 16, "default": 4},
        "timeout_seconds": {"type": "integer", "minimum": 10, "maximum": 3600, "default": 300},
        "retry_on_node_fail": {"type": "boolean", "default": true},
        "max_retries": {"type": "integer", "minimum": 0, "maximum": 5, "default": 2},
        "continue_on_error": {"type": "boolean", "default": false}
      }
    },
    "nodes": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["id", "type", "version", "name", "inputs", "outputs", "config"],
        "properties": {
          "id": {"type": "string"},
          "type": {"type": "string", "pattern": "^[a-z]+\\.[a-z0-9_]+$"},
          "version": {"type": "string"},
          "name": {"type": "string"},
          "position": {"type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}}},
          "inputs": {
            "type": "object",
            "additionalProperties": {
              "type": "object",
              "required": ["from_node", "from_output"],
              "properties": {"from_node": {"type": "string"}, "from_output": {"type": "string"}}
            }
          },
          "outputs": {"type": "array", "items": {"type": "string"}},
          "config": {"type": "object"}
        }
      }
    },
    "edges": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["id", "source", "source_output", "target", "target_input"],
        "properties": {
          "id": {"type": "string"},
          "source": {"type": "string"},
          "source_output": {"type": "string"},
          "target": {"type": "string"},
          "target_input": {"type": "string"}
        }
      }
    }
  }
}
```

The node `type` pattern allows digits after the dot, so that types such as `sink.neo4j_wiki` validate.
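JSON Schema validates the shape of each object, but it cannot express cross-references such as "every edge points at an existing node" or "`source_output` is one of the source node's declared `outputs`". A hypothetical companion check to run after schema validation:

```python
from typing import Any


def check_references(pipeline: dict[str, Any]) -> list[str]:
    """Referential checks that JSON Schema cannot express; returns human-readable errors."""
    errors: list[str] = []
    ids = [n["id"] for n in pipeline["nodes"]]
    for dup in sorted({i for i in ids if ids.count(i) > 1}):
        errors.append(f"duplicate node id: {dup}")
    outputs = {n["id"]: set(n["outputs"]) for n in pipeline["nodes"]}
    for e in pipeline["edges"]:
        for end in ("source", "target"):
            if e[end] not in outputs:
                errors.append(f"{e['id']}: unknown {end} node {e[end]}")
        # Only check the output name when the source node itself exists
        if e["source"] in outputs and e["source_output"] not in outputs[e["source"]]:
            errors.append(f"{e['id']}: {e['source']} has no output {e['source_output']}")
    return errors
```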
Router node example (routing by file type):

```json
{
  "id": "node-router-001",
  "type": "router.file_type",
  "version": "1",
  "name": "File-type router",
  "position": {"x": 400, "y": 200},
  "inputs": {
    "metadata": {"from_node": "node-002", "from_output": "metadata"}
  },
  "outputs": ["pdf_branch", "docx_branch", "md_branch", "default_branch"],
  "config": {
    "routes": [
      {"condition": "file_type == 'pdf'", "output": "pdf_branch"},
      {"condition": "file_type in ['docx','doc']", "output": "docx_branch"},
      {"condition": "file_type == 'md'", "output": "md_branch"}
    ],
    "default_output": "default_branch"
  }
}
```
Splitter node example (semantic chunking):

```json
{
  "id": "node-splitter-001",
  "type": "splitter.semantic",
  "version": "1",
  "name": "Semantic chunking",
  "position": {"x": 500, "y": 150},
  "inputs": {
    "text": {"from_node": "node-002", "from_output": "markdown"}
  },
  "outputs": ["chunks"],
  "config": {
    "strategy": "semantic",
    "chunk_size": 512,
    "chunk_overlap": 64,
    "min_chunk_size": 50,
    "max_chunk_size": 2048,
    "embedding_model": "bge-m3",
    "split_by": "sentence",
    "language": "auto"
  }
}
```
Splitter config_schema (JSON Schema property definitions):

```json
{
  "chunk_size": {
    "type": "integer",
    "minimum": 50,
    "maximum": 4096,
    "default": 512,
    "description": "Maximum number of tokens per chunk (estimated)"
  },
  "chunk_overlap": {
    "type": "integer",
    "minimum": 0,
    "maximum": 512,
    "default": 64,
    "description": "Number of overlapping tokens between adjacent chunks"
  },
  "strategy": {
    "type": "string",
    "enum": ["fixed", "semantic", "sentence", "recursive"],
    "default": "recursive",
    "description": "Chunking strategy: fixed (fixed length) | semantic (embedding-based, uses embedding_model) | sentence (by sentence) | recursive (recursive character splitting)"
  },
  "embedding_model": {
    "type": "string",
    "default": "bge-m3",
    "description": "Embedding model used for semantic chunking"
  },
  "split_by": {
    "type": "string",
    "enum": ["character", "sentence", "paragraph", "markdown-header"],
    "default": "character",
    "description": "Smallest split unit for recursive chunking"
  }
}
```
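A sketch of the simplest strategy, `fixed`, shows how `chunk_size` and `chunk_overlap` interact: consecutive windows advance by `chunk_size - chunk_overlap`. The function name is hypothetical, and it counts characters rather than tokens (an assumption that keeps it dependency-free). The schema above allows `chunk_overlap` up to 512 while `chunk_size` may be as small as 50, so an overlap that is not smaller than the chunk size has to be rejected at run time.

```python
def split_fixed(text: str, chunk_size: int = 512, chunk_overlap: int = 64) -> list[str]:
    """Fixed-length windows of chunk_size characters; neighbours share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        # Otherwise the window would never advance (step <= 0)
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks
```

For example, `split_fixed("abcdefghij", 4, 1)` yields `["abcd", "defg", "ghij"]`: each window starts 3 characters after the previous one and repeats its last character.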
Enricher node example (metadata enrichment):

```json
{
  "id": "node-enricher-001",
  "type": "enricher.metadata",
  "version": "1",
  "name": "Metadata enrichment",
  "position": {"x": 500, "y": 100},
  "inputs": {
    "text": {"from_node": "node-002", "from_output": "markdown"},
    "metadata": {"from_node": "node-002", "from_output": "metadata"}
  },
  "outputs": ["metadata"],
  "config": {
    "extract_fields": ["title", "author", "date", "version"],
    "language_detection": {
      "enabled": true,
      "default_lang": "en",
      "fallback_lang": "en"
    },
    "custom_fields": [
      {"key": "document_source", "value": "mineru"},
      {"key": "processing_version", "value": "v1"}
    ]
  }
}
```
Enricher node config_schema:

```json
{
  "extract_fields": {
    "type": "array",
    "items": {"type": "string"},
    "default": ["title", "date"],
    "description": "Metadata fields extracted from the text content"
  },
  "language_detection": {
    "type": "object",
    "properties": {
      "enabled": {"type": "boolean", "default": true},
      "default_lang": {"type": "string", "default": "en"},
      "fallback_lang": {"type": "string", "default": "en"}
    },
    "description": "Language-detection settings; the detected language is written to metadata.language"
  },
  "custom_fields": {
    "type": "array",
    "items": {
      "type": "object",
      "properties": {
        "key": {"type": "string"},
        "value": {"type": "string"}
      }
    },
    "description": "Static custom metadata fields, injected as-is"
  }
}
```
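A minimal sketch of how `extract_fields` and `custom_fields` could be applied. `enrich_metadata` is hypothetical. The title heuristic (first level-1 Markdown heading) is an assumption, and language detection is left out because it needs an external detector.

```python
import re
from typing import Any


def enrich_metadata(text: str, metadata: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical core of enricher.metadata; returns a new metadata dict."""
    result = dict(metadata)  # never mutate the upstream item's metadata
    if "title" in config.get("extract_fields", ["title", "date"]):
        # Assumed heuristic: the first "# ..." heading is the document title
        match = re.search(r"^#\s+(.+?)\s*$", text, flags=re.MULTILINE)
        if match:
            result.setdefault("title", match.group(1))  # keep a title the converter already set
    # Static custom fields are injected last and override earlier values
    for item in config.get("custom_fields", []):
        result[item["key"]] = item["value"]
    return result
```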
| Category | Prefix | Description | Examples |
|---|---|---|---|
| Source | `source.*` | Pipeline entry point; reads files from the File Store | source.file_store |
| Converter | `converter.*` | Document format conversion | converter.mineru, converter.docling, converter.markitdown |
| Splitter | `splitter.*` | Text/content splitting | splitter.recursive, splitter.semantic, splitter.fixed |
| Enricher | `enricher.*` | Content enrichment/processing | enricher.metadata, enricher.ocr, enricher.translate |
| Router | `router.*` | Conditional routing / parallel branches | router.file_type, router.metadata, router.if_else |
| Sink | `sink.*` | Output to external systems | sink.ragflow_naive, sink.ragflow_graphrag, sink.neo4j_wiki |
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, TypedDict


class IOType(TypedDict):
    """Node input/output type definition, modeled on ComfyUI INPUT_TYPES."""
    name: str
    display_name: str
    description: str


@dataclass
class NodeInputDef:
    """Node input definition."""
    name: str
    type: str
    required: bool = True
    description: str = ""
    default: Any = None


@dataclass
class NodeOutputDef:
    """Node output definition."""
    name: str
    type: str
    description: str = ""


@dataclass
class NodeConfigField:
    """Node configuration field definition."""
    name: str
    type: str  # "string" | "integer" | "boolean" | "select" | "list"
    label: str
    required: bool = False
    default: Any = None
    options: list[Any] | None = None
    description: str = ""


class PipelineItem:
    """
    The unit of data that flows through a pipeline.
    Combines n8n's item model with dify's VariablePool concept.
    Treated as immutable: with_error() / with_metadata() return new items.
    """

    def __init__(
        self,
        id: str,
        data: dict[str, Any],
        metadata: dict[str, Any] | None = None,
        errors: list[str] | None = None,
        source_node_id: str | None = None,
    ):
        self.id = id
        self.data = data
        self.metadata = metadata or {}
        self.errors = errors or []
        self.source_node_id = source_node_id
        self.created_at: str | None = None  # ISO timestamp, filled in at execution time

    def with_error(self, error: str) -> "PipelineItem":
        return PipelineItem(
            id=self.id, data=self.data,
            metadata=self.metadata.copy(),
            errors=self.errors + [error],
            source_node_id=self.source_node_id,
        )

    def with_metadata(self, key: str, value: Any) -> "PipelineItem":
        return PipelineItem(
            id=self.id, data=self.data,
            metadata={**self.metadata, key: value},
            errors=self.errors,
            source_node_id=self.source_node_id,
        )


class PipelineNode(ABC):
    """Base class for pipeline node executors."""

    type: str = ""      # class attribute set by subclasses, e.g. "converter.mineru"
    version: str = "1"  # class attribute

    def __init__(
        self,
        node_id: str,
        config: dict,
        inputs: dict[str, list[PipelineItem]],
    ):
        self.node_id = node_id
        self.config = config
        self.inputs = inputs  # {input_name: [PipelineItem]}

    @abstractmethod
    async def run(self) -> dict[str, list[PipelineItem]]:
        """
        Execute the node logic.
        Returns a {output_name: [PipelineItem]} dict.
        """
        ...

    async def health_check(self) -> bool:
        return True
```
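To show what a subclass has to provide, here is a hypothetical `splitter.fixed` node built on the base-class contract above (overlap omitted for brevity). To keep the block self-contained it redeclares trimmed-down stand-ins for `PipelineItem` and `PipelineNode`. The chunk-id scheme `<item id>-c<n>` is an assumption.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class PipelineItem:  # trimmed-down stand-in for the PipelineItem class above
    id: str
    data: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)


class PipelineNode(ABC):  # same contract as the base class above
    type: str = ""
    version: str = "1"

    def __init__(self, node_id: str, config: dict, inputs: dict[str, list[PipelineItem]]):
        self.node_id = node_id
        self.config = config
        self.inputs = inputs

    @abstractmethod
    async def run(self) -> dict[str, list[PipelineItem]]: ...


class SplitterFixedNode(PipelineNode):
    """Hypothetical splitter.fixed: cuts each input text into chunk_size-character pieces."""
    type = "splitter.fixed"

    async def run(self) -> dict[str, list[PipelineItem]]:
        size = int(self.config.get("chunk_size", 512))
        chunks: list[PipelineItem] = []
        for item in self.inputs.get("text", []):
            text = item.data.get("text", "")
            for n, start in enumerate(range(0, len(text), size)):
                chunks.append(PipelineItem(
                    id=f"{item.id}-c{n}",
                    data={"text": text[start:start + size]},
                    metadata=dict(item.metadata),  # each chunk inherits the source item's metadata
                ))
        return {"chunks": chunks}


node = SplitterFixedNode(
    node_id="node-003",
    config={"chunk_size": 4},
    inputs={"text": [PipelineItem(id="md-1", data={"text": "abcdefghij"}, metadata={"doc_id": "d1"})]},
)
result = asyncio.run(node.run())
```

Here `result["chunks"]` holds three items with texts `"abcd"`, `"efgh"` and `"ij"`, all carrying `{"doc_id": "d1"}`.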
Source nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| source.file_store | Reads the file from the File Store | none | file |

Converter nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| converter.mineru | MinerU document conversion | file | markdown, toc, metadata |
| converter.docling | Docling document conversion | file | markdown, toc, metadata |
| converter.markitdown | Lightweight conversion with markitdown | file | markdown |

Splitter nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| splitter.recursive | Recursive character chunking | text | chunks |
| splitter.semantic | Semantic chunking (embedding-based) | text | chunks |
| splitter.fixed | Fixed-length chunking | text | chunks |
| splitter.section | Chunking by section | markdown, toc | chunks |

Enricher nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| enricher.metadata | Metadata enrichment (extracted from content) | text, metadata | metadata |
| enricher.ocr | OCR text recognition | file | text |
| enricher.translate | Document translation | text | text |
| enricher.chunk_meta | Attaches metadata to every chunk | chunks, metadata | chunks |

Router nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| router.file_type | Routes by file type | metadata | multiple named branches |
| router.metadata | Routes by a metadata field | metadata | multiple named branches |
| router.if_else | Jinja2 conditional routing | condition_input, metadata | true_branch, false_branch |

Sink nodes:

| Type | Description | Inputs | Outputs |
|---|---|---|---|
| sink.ragflow_naive | RAGFlow NaiveRAG ingestion | chunks, metadata | result |
| sink.ragflow_graphrag | RAGFlow GraphRAG ingestion | markdown, metadata | result |
| sink.neo4j_wiki | Neo4j WikiGraph ingestion | markdown, metadata | result |
```python
import asyncio


class VariablePool:
    """
    Variable pool shared between nodes (modeled on dify's VariablePool).
    Naming convention: {node_id}:{output_name}, e.g. "node-002:markdown".
    Each variable is a list of PipelineItem objects.
    """

    def __init__(self) -> None:
        self._variables: dict[str, list[PipelineItem]] = {}
        self._lock = asyncio.Lock()

    async def set(self, key: str, items: list[PipelineItem]) -> None:
        async with self._lock:
            self._variables[key] = items

    async def get(self, key: str) -> list[PipelineItem]:
        async with self._lock:
            return self._variables.get(key, [])

    async def get_first(self, key: str) -> PipelineItem | None:
        items = await self.get(key)
        return items[0] if items else None

    @staticmethod
    def make_key(node_id: str, output_name: str) -> str:
        return f"{node_id}:{output_name}"

    async def dump(self) -> dict[str, list[dict]]:
        """Export all variables (for debugging)."""
        async with self._lock:
            return {
                k: [{"id": i.id, "data_keys": list(i.data.keys())} for i in items]
                for k, items in self._variables.items()
            }
```
After the converter.mineru node (node-002) has run:

```python
# Contents of the VariablePool:
# "node-002:markdown" → [PipelineItem(id="md-1", data={"text": "..."})]
# "node-002:toc"      → [PipelineItem(id="toc-1", data={"toc": [...]})]
# "node-002:metadata" → [PipelineItem(id="meta-1", data={"file_type": "pdf", ...})]
```

When the splitter.recursive node (node-003) resolves its input:

```python
markdown_items = await variable_pool.get("node-002:markdown")
# markdown_items: [PipelineItem(id="md-1", data={"text": "..."})]
```
```python
import asyncio
from collections import defaultdict
from typing import Any


class DAGExecutor:
    """
    DAG execution engine.
    Core logic (modeled on n8n WorkflowExecute + ComfyUI execution.py):
    1. Build the adjacency list
    2. Topologically sort with Kahn's algorithm
    3. Group nodes into levels; run each level in parallel with asyncio.gather
    4. Write each node's outputs to the VariablePool when it finishes
    """

    def __init__(
        self,
        pipeline: dict,
        node_registry: "NodeRegistry",
        variable_pool: VariablePool,
        execution_id: str,
        config: dict | None = None,
    ):
        self.pipeline = pipeline
        self.nodes = {n["id"]: n for n in pipeline["nodes"]}
        self.edges = pipeline["edges"]
        self.node_registry = node_registry
        self.variable_pool = variable_pool
        self.execution_id = execution_id
        self.config = config or {}
        self._node_instances: dict[str, PipelineNode] = {}
        self._execution_order: list[list[str]] = []  # grouped by level

    # ── 4.1.1 Build the adjacency list ───────────────────────────────────
    def _build_adjacency(self) -> tuple[dict[str, list[str]], dict[str, int]]:
        """
        Build the adjacency list and the in-degree table.
        adj[source_id] = [target_id, ...]
        in_degree[target_id] = number of edges pointing at the node
        Edge shape: {source, source_output, target, target_input}
        """
        adj: dict[str, list[str]] = defaultdict(list)
        in_degree: dict[str, int] = {n["id"]: 0 for n in self.pipeline["nodes"]}
        for edge in self.edges:
            adj[edge["source"]].append(edge["target"])
            in_degree[edge["target"]] += 1
        return adj, in_degree

    def _topological_sort_levels(self) -> list[list[str]]:
        """
        Kahn's algorithm, grouped by level.
        Returns [[node_id, ...], [node_id, ...], ...]; nodes in one level can run in parallel.
        Every node appears in exactly one level. A Router does not prune the graph, so
        nodes on an unselected branch still run and receive empty input lists.
        Raises ValueError if the graph contains a cycle.
        """
        adj, in_degree = self._build_adjacency()
        queue = [nid for nid, deg in in_degree.items() if deg == 0]
        levels: list[list[str]] = []
        while queue:
            level = queue
            levels.append(level)
            queue = []
            for node_id in level:
                for successor in adj[node_id]:
                    in_degree[successor] -= 1
                    if in_degree[successor] == 0:
                        queue.append(successor)
        # Nodes that never reached in-degree 0 are part of a cycle
        if sum(len(level) for level in levels) != len(in_degree):
            raise ValueError("pipeline graph contains a cycle")
        return levels

    # ── 4.1.2 Resolve a node's inputs ────────────────────────────────────
    async def _resolve_inputs(self, node_id: str) -> dict[str, list[PipelineItem]]:
        """
        Resolve the actual values of all of a node's inputs: for each
        {from_node, from_output} in the node's inputs config, fetch the
        matching PipelineItem list from the VariablePool.
        """
        node = self.nodes[node_id]
        resolved = {}
        for input_name, input_ref in node.get("inputs", {}).items():
            key = VariablePool.make_key(input_ref["from_node"], input_ref["from_output"])
            resolved[input_name] = await self.variable_pool.get(key)
        return resolved

    # ── 4.1.3 Main execution loop ────────────────────────────────────────
    async def execute(self) -> dict[str, Any]:
        """
        Execute the whole pipeline:
        1. Topologically sort into levels (Source nodes form the first level)
        2. Execute level by level
        3. Track the execution status
        """
        levels = self._topological_sort_levels()
        self._execution_order = levels
        results = {}
        errors: dict[str, str] = {}
        stopped = False
        for node_ids in levels:
            # Run every node in the current level concurrently
            tasks = [self._execute_node(node_id, errors) for node_id in node_ids]
            level_results = await asyncio.gather(*tasks, return_exceptions=True)
            for node_id, result in zip(node_ids, level_results):
                if isinstance(result, Exception):
                    errors[node_id] = str(result)
                else:
                    results[node_id] = result
            # Stop strategy: do not start later levels once a node has failed
            if errors and not self.config.get("continue_on_error", False):
                stopped = True
                break
        if not errors:
            status = "completed"
        elif stopped or not results:
            status = "failed"
        else:
            status = "partial"
        return {
            "execution_id": self.execution_id,
            "status": status,
            "completed_nodes": list(results.keys()),
            "failed_nodes": list(errors.keys()),
            "errors": errors,
        }

    async def _execute_node(
        self, node_id: str, errors: dict[str, str]
    ) -> dict[str, list[PipelineItem]]:
        """Execute a single node (with retries, timeout and exponential backoff)."""
        node = self.nodes[node_id]
        retry_enabled = self.config.get("retry_on_node_fail", True)
        max_retries = self.config.get("max_retries", 2) if retry_enabled else 0
        timeout = self.config.get("timeout_seconds", 300)
        last_error: Exception | None = None
        for attempt in range(max_retries + 1):
            try:
                # Resolve upstream inputs (re-read on every attempt)
                inputs = await self._resolve_inputs(node_id)
                # Get or create the node instance
                instance = self._get_or_create_instance(node_id, node, inputs)
                # Run with a timeout
                result = await asyncio.wait_for(instance.run(), timeout=timeout)
                # Publish every output to the VariablePool
                for output_name, items in result.items():
                    key = VariablePool.make_key(node_id, output_name)
                    await self.variable_pool.set(key, items)
                return result
            except asyncio.TimeoutError:
                last_error = TimeoutError(f"Node {node_id} timed out after {timeout}s")
            except Exception as e:
                last_error = e
            if attempt < max_retries:
                await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
        # Every attempt failed
        errors[node_id] = str(last_error)
        raise last_error

    def _get_or_create_instance(
        self, node_id: str, node: dict, inputs: dict[str, list[PipelineItem]]
    ) -> PipelineNode:
        """Get or create the node instance (cached per node ID)."""
        if node_id not in self._node_instances:
            node_class = self.node_registry.get(node["type"], node.get("version", "1"))
            self._node_instances[node_id] = node_class(
                node_id=node_id,
                config=node.get("config", {}),
                inputs=inputs,
            )
        else:
            # Refresh inputs (they are re-resolved on every execution)
            self._node_instances[node_id].inputs = inputs
        return self._node_instances[node_id]

    async def _execute_parallel_branch(
        self,
        branch_nodes: list[str],
        shared_pool: VariablePool,
    ) -> dict[str, Any]:
        """
        Execute one parallel branch (several nodes at once).
        Scenario: a Router feeds several Sink nodes, which run their ingest concurrently.
        """
        tasks = [self._execute_node(node_id, {}) for node_id in branch_nodes]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {node_id: r for node_id, r in zip(branch_nodes, results)}
```
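A standalone version of the level-grouped Kahn sort, run on the edges implied by the sample pipeline's node `inputs`, shows which nodes share a level. `topo_layers` is a hypothetical helper operating on plain `(source, target)` pairs. Each level is sorted so that the output is deterministic.

```python
from collections import defaultdict

# (source, target) pairs implied by the sample pipeline's node inputs
SAMPLE_EDGES = [
    ("node-001", "node-002"),  # file
    ("node-002", "node-003"),  # markdown -> text
    ("node-002", "node-004"),  # metadata
    ("node-003", "node-005"),  # chunks
    ("node-002", "node-005"),  # metadata
    ("node-002", "node-006"),  # markdown
    ("node-002", "node-006"),  # metadata (a second edge between the same pair)
]


def topo_layers(node_ids: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Kahn's algorithm grouped into levels; raises ValueError on a cycle."""
    adj: dict[str, list[str]] = defaultdict(list)
    in_degree = {n: 0 for n in node_ids}
    for src, dst in edges:
        adj[src].append(dst)
        in_degree[dst] += 1  # duplicate edges count twice and are decremented twice
    layer = sorted(n for n, d in in_degree.items() if d == 0)
    layers: list[list[str]] = []
    seen = 0
    while layer:
        layers.append(layer)
        seen += len(layer)
        nxt = []
        for n in layer:
            for succ in adj[n]:
                in_degree[succ] -= 1
                if in_degree[succ] == 0:
                    nxt.append(succ)
        layer = sorted(nxt)
    if seen != len(in_degree):
        raise ValueError("pipeline graph contains a cycle")
    return layers
```

On the sample, the result is `[["node-001"], ["node-002"], ["node-003", "node-004", "node-006"], ["node-005"]]`. The router (node-004) and the GraphRAG sink (node-006) run alongside the splitter, and the NaiveRAG sink waits one more level for the chunks.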
```python
# Router node execution example: the if_else node
class RouterIfElseNode(PipelineNode):
    type = "router.if_else"
    version = "1"

    async def run(self) -> dict[str, list[PipelineItem]]:
        from jinja2.sandbox import SandboxedEnvironment

        condition_expr = self.config.get("condition_expr", "false")
        # Read the condition value from the inputs
        condition_input_items = self.inputs.get("condition_input", [])
        metadata_items = self.inputs.get("metadata", [])
        condition_value = (
            condition_input_items[0].data.get("text", "") if condition_input_items else ""
        )
        ctx = {"value": condition_value, "metadata": metadata_items[0].data if metadata_items else {}}
        try:
            # Evaluate the expression in Jinja2's sandbox; a plain jinja2.Template is not sandboxed
            expression = SandboxedEnvironment().compile_expression(condition_expr)
            is_true = bool(expression(**ctx))
        except Exception:
            is_true = False
        selected = "true_branch" if is_true else "false_branch"
        other = "false_branch" if is_true else "true_branch"
        return {
            selected: [PipelineItem(id=f"{self.node_id}-{selected}", data={"triggered": True})],
            other: [],  # the unselected branch outputs an empty list
        }
```
The Router node is the key branch-control node in the DAG. Its design draws on n8n's continueErrorOutput strategy and dify's If/Else node:
```python
import re


class RouterFileTypeNode(PipelineNode):
    """
    Routes by file type.
    Config routes: [{"condition": "file_type == 'pdf'", "output": "pdf_branch"}, ...]
    Input: metadata (containing a file_type field)
    Output: one named control-signal output per route, plus the default output
    """
    type = "router.file_type"
    version = "1"

    async def run(self) -> dict[str, list[PipelineItem]]:
        metadata_items = self.inputs.get("metadata", [])
        if not metadata_items:
            return {output: [] for output in self._output_names}
        file_type = metadata_items[0].data.get("file_type", "unknown")
        routes = self.config.get("routes", [])
        default_output = self.config.get("default_output", "default_branch")
        triggered_output = default_output
        for route in routes:
            condition = route.get("condition", "")
            # Simple condition parsing, e.g. file_type == 'pdf'
            if self._eval_simple_condition(condition, {"file_type": file_type}):
                triggered_output = route["output"]
                break
        # Only the triggered output carries an item; all other outputs are empty
        return {
            output: (
                [PipelineItem(id=f"{self.node_id}-{output}",
                              data={"file_type": file_type, "triggered": True})]
                if output == triggered_output else []
            )
            for output in self._output_names
        }

    def _eval_simple_condition(self, condition: str, ctx: dict) -> bool:
        """Evaluate a simple condition: `field == 'x'` or `field in ['x','y']`."""
        m = re.match(r"(\w+)\s*(==|in)\s*(.+)", condition.strip())
        if not m:
            return False
        field, op, value = m.group(1), m.group(2), m.group(3).strip()
        field_val = ctx.get(field, "")
        if op == "==":
            return str(field_val) == value.strip("'\"")
        if op == "in":
            # value looks like ['docx','doc']: drop the brackets, split on commas, strip quotes
            options = [v.strip().strip("'\"") for v in value[1:-1].split(",")]
            return str(field_val) in options
        return False

    @property
    def _output_names(self) -> list[str]:
        # Output names come from each route's "output" key, plus the default output
        names = [route["output"] for route in self.config.get("routes", [])]
        default_output = self.config.get("default_output", "default_branch")
        return names + ([default_output] if default_output not in names else [])
```
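The regex-based `_eval_simple_condition` breaks on inputs such as lists containing commas inside quotes. An alternative, sketched here under the assumption that conditions always have the form `<field> <op> <literal>`, is to let Python's `ast` module parse the expression and accept only a single comparison against a literal. Nothing is executed: `ast.literal_eval` only accepts literals, never calls. `eval_condition` is a hypothetical name.

```python
import ast
from typing import Any


def eval_condition(condition: str, ctx: dict[str, Any]) -> bool:
    """Evaluate `<field> <op> <literal>` where op is ==, !=, in or not in."""
    expr = ast.parse(condition, mode="eval").body
    if not (isinstance(expr, ast.Compare) and len(expr.ops) == 1
            and isinstance(expr.left, ast.Name)):
        raise ValueError(f"unsupported condition: {condition!r}")
    value = ctx.get(expr.left.id)
    right = ast.literal_eval(expr.comparators[0])  # raises ValueError for non-literals
    op = expr.ops[0]
    if isinstance(op, ast.Eq):
        return value == right
    if isinstance(op, ast.NotEq):
        return value != right
    if isinstance(op, ast.In):
        return value in right
    if isinstance(op, ast.NotIn):
        return value not in right
    raise ValueError(f"unsupported operator in: {condition!r}")
```

Malformed syntax still raises `SyntaxError`, so a pipeline-save validator would catch both exception types and reject the route.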
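Error handling follows n8n's three strategies:

| Strategy | n8n constant | Description | Pipeline config |
|---|---|---|---|
| Stop | stopWorkflow | The whole pipeline stops when a node fails | "continue_on_error": false |
| Continue | continueRegularOutput | The failed node outputs error-marked items and downstream nodes keep running | "continue_on_error": true |
| Skip | continueErrorOutput | The error is routed to an error-output branch | Handled by a Router node |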
```python
class ErrorHandlingStrategy:
    STOP = "stop"
    CONTINUE = "continue"
    SKIP = "skip"


# DAGExecutor method (shown at module level for brevity)
async def _execute_node_with_error_handling(
    self,
    node_id: str,
    strategy: str = ErrorHandlingStrategy.CONTINUE,
) -> dict[str, list[PipelineItem]]:
    """Execute a node and apply the error-handling strategy."""
    try:
        return await self._execute_node(node_id, {})
    except Exception as e:
        node = self.nodes[node_id]
        if strategy == ErrorHandlingStrategy.STOP:
            raise
        elif strategy == ErrorHandlingStrategy.CONTINUE:
            # Emit one error-marked item per declared output so downstream nodes can continue
            outputs = {}
            for output_def in self.node_registry.get_output_defs(node["type"]):
                outputs[output_def.name] = [PipelineItem(
                    id=f"error-{node_id}-{output_def.name}",
                    data={"error": str(e)},
                    errors=[str(e)],
                    source_node_id=node_id,
                )]
            return outputs
        elif strategy == ErrorHandlingStrategy.SKIP:
            # A Router node handles this signal
            return {"error_output": [PipelineItem(
                id=f"error-{node_id}",
                data={"error": str(e)},
                errors=[str(e)],
                source_node_id=node_id,
            )]}
        raise
```
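How the strategy decides whether later levels start can be sketched in isolation. In the hypothetical `run_layers` below, each layer runs through `asyncio.gather(..., return_exceptions=True)`. Under `"stop"`, no later layer is started after a failure; under `"continue"`, execution proceeds. For simplicity it does not emit the error-marked items that the CONTINUE branch above produces.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

NodeRunner = Callable[[str], Awaitable[dict[str, Any]]]


async def run_layers(layers: list[list[str]], run_node: NodeRunner,
                     strategy: str = "continue") -> tuple[list[str], dict[str, str]]:
    """Run layers in order; nodes inside a layer run concurrently."""
    done: list[str] = []
    failed: dict[str, str] = {}
    for layer in layers:
        results = await asyncio.gather(*(run_node(n) for n in layer), return_exceptions=True)
        for node_id, result in zip(layer, results):
            if isinstance(result, Exception):
                failed[node_id] = str(result)
            else:
                done.append(node_id)
        if failed and strategy == "stop":
            break  # STOP: later layers are never started
    return done, failed


async def demo_node(node_id: str) -> dict[str, Any]:
    """Stand-in node runner: node "b" always fails."""
    if node_id == "b":
        raise RuntimeError("boom")
    return {}
```

With layers `[["a", "b"], ["c"]]`, `"stop"` finishes only `a`, while `"continue"` also runs `c`. In both cases `a` completes, because it shares a layer with the failing node.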
The pipeline execution status is wired into V27's ragateway_documents.status state machine:
```python
import copy
import uuid
from typing import Any


class PipelineExecutionStatus:
    """
    Mirrors the ragateway_documents.status state machine:
    queued → running → indexed | failed
    PARTIAL extends the V27 state machine.
    """
    QUEUED = "queued"
    RUNNING = "running"
    INDEXED = "indexed"
    FAILED = "failed"
    PARTIAL = "partial"  # some nodes succeeded


class PipelineExecutor:
    """
    Pipeline executor (entry-point class).
    Replaces V27's hard-coded chain:
    DocumentConverterPlugin.convert() → NaiveRAG.ingest() → GraphRAG[n].ingest()
    """

    def __init__(
        self,
        tenant_id: str,
        pipeline: dict,
        node_registry: "NodeRegistry",
        db_session,
        config: dict | None = None,
    ):
        self.tenant_id = tenant_id
        self.pipeline = pipeline
        self.node_registry = node_registry
        self.db_session = db_session
        self.config = config or {}
        self.execution_id = str(uuid.uuid4())
        self.variable_pool = VariablePool()
        self._status = PipelineExecutionStatus.QUEUED

    async def execute(self, doc_id: str, file_path: str) -> dict[str, Any]:
        """
        Run the pipeline for one document:
        1. Seed the Source node's input
        2. Execute the DAG
        3. Update the ragateway_documents status
        """
        # 1. Mark the document as running
        await self._update_doc_status(doc_id, PipelineExecutionStatus.RUNNING)
        try:
            # 2. Inject the Source node's input
            await self.variable_pool.set(
                VariablePool.make_key("__source__", "file"),
                [PipelineItem(
                    id=f"file-{doc_id}",
                    data={"path": file_path, "doc_id": doc_id},
                    metadata={"doc_id": doc_id, "tenant_id": self.tenant_id},
                    source_node_id="__source__",
                )],
            )
            # 3. Point the pipeline's source.* nodes at the actual file
            pipeline = self._resolve_source_node(self.pipeline, file_path)
            # 4. Execute the DAG
            engine = DAGExecutor(
                pipeline=pipeline,
                node_registry=self.node_registry,
                variable_pool=self.variable_pool,
                execution_id=self.execution_id,
                config=self.pipeline.get("config", {}),
            )
            result = await engine.execute()
            # 5. Map the DAG result onto the document status
            if result["status"] == "completed":
                await self._update_doc_status(doc_id, PipelineExecutionStatus.INDEXED)
            elif result["status"] == "partial":
                await self._update_doc_status(doc_id, PipelineExecutionStatus.PARTIAL)
            else:
                await self._update_doc_status(doc_id, PipelineExecutionStatus.FAILED)
            return result
        except Exception:
            await self._update_doc_status(doc_id, PipelineExecutionStatus.FAILED)
            raise

    def _resolve_source_node(self, pipeline: dict, file_path: str) -> dict:
        """Inject the actual file path into the config of every source.* node (on a copy)."""
        resolved = copy.deepcopy(pipeline)
        for node in resolved["nodes"]:
            if node["type"].startswith("source."):
                node.setdefault("config", {})["file_path"] = file_path
        return resolved

    async def _update_doc_status(self, doc_id: str, status: str) -> None:
        """Update ragateway_documents.status (asyncpg-style $n placeholders with positional args)."""
        await self.db_session.execute(
            "UPDATE ragateway_documents SET status = $1, updated_at = NOW() WHERE doc_id = $2",
            status,
            doc_id,
        )
```
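Because `_update_doc_status` writes whatever it is given, a small guard can keep the stored status on the documented path (queued → running → indexed | failed, plus the new partial). The table and function names below are hypothetical. Treating the three end states as terminal (re-processing would need an explicit reset) is an assumption, not something the V27 state machine above specifies.

```python
# Hypothetical transition table: V27 transitions plus the new "partial" end state
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "queued": {"running"},
    "running": {"indexed", "failed", "partial"},
    "indexed": set(),
    "failed": set(),
    "partial": set(),
}


def check_transition(current: str, new: str) -> None:
    """Raise if the document status may not move from `current` to `new`."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal status transition: {current} -> {new}")


def document_status_for(dag_status: str) -> str:
    """Map a DAGExecutor result status to a ragateway_documents.status value."""
    return {"completed": "indexed", "partial": "partial"}.get(dag_status, "failed")
```

The same check could also be pushed into SQL by adding the expected current status to the `WHERE` clause of the `UPDATE`, so that concurrent workers cannot overwrite each other's transitions.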
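The following is a complete implementation intended for ragateway/pipeline/executor.py. It covers Kahn topological sorting, multi-level asyncio parallel execution, three-strategy error handling and status writes to PostgreSQL: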
# ragateway/pipeline/executor.py
import asyncio
import copy
import logging
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
from ragateway.pipeline.registry import NodeRegistry
from ragateway.pipeline.variable_pool import VariablePool
logger = logging.getLogger(__name__)
class ErrorStrategy(str, Enum):
STOP = "stop" # 节点失败立即停止整个 Pipeline
CONTINUE = "continue" # 节点失败输出错误 item,允许后续节点继续
SKIP = "skip" # 节点失败路由到错误分支(由 Router 处理)
class ExecutionStatus(str, Enum):
QUEUED = "queued"
RUNNING = "running"
INDEXED = "indexed"
FAILED = "failed"
PARTIAL = "partial"
@dataclass
class NodeExecutionResult:
node_id: str
status: str # "success" | "failed" | "skipped"
duration_ms: int
output_keys: list[str] = field(default_factory=list)
error: str | None = None
retry_count: int = 0
class DAGExecutor:
"""
DAG 执行引擎(完整实现)。
核心流程:
1. build_execution_layers() — Kahn 拓扑排序,按层级分组
2. execute() — 主入口,按层 asyncio.gather 并行
3. execute_node() — 单节点执行,含三策略错误处理
4. _write_status_to_db() — 执行状态写入 PostgreSQL
"""
def __init__(
self,
pipeline: dict,
node_registry: NodeRegistry,
variable_pool: VariablePool,
execution_id: str,
db_pool, # asyncpg Pool 或 SQLAlchemy async session
config: dict | None = None,
):
self.pipeline = pipeline
self.nodes: dict[str, dict] = {n["id"]: n for n in pipeline["nodes"]}
self.edges = pipeline["edges"]
self.node_registry = node_registry
self.variable_pool = variable_pool
self.execution_id = execution_id
self.db_pool = db_pool
self.config = config or {}
self._node_instances: dict[str, Any] = {}
self._execution_layers: list[list[str]] = []
self._node_results: dict[str, NodeExecutionResult] = {}
# ── 4.1 build_execution_layers:Kahn 拓扑排序,返回按层分组 ───────────────
def build_execution_layers(self) -> list[list[str]]:
"""
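Topologically sort the nodes with Kahn's algorithm and return node IDs grouped by layer.
Returns: [[layer_0_nodes], [layer_1_nodes], ...]
Nodes in the same layer do not depend on each other and can run in parallel.
Raises ValueError if the graph contains a cycle.
"""
# Build the adjacency list and the in-degree table
adj: dict[str, list[str]] = defaultdict(list)
in_degree: dict[str, int] = {nid: 0 for nid in self.nodes}
for edge in self.edges:
adj[edge["source"]].append(edge["target"])
in_degree[edge["target"]] += 1
# Kahn topological sort: each round takes every node whose in-degree has dropped to 0
queue = [nid for nid, deg in in_degree.items() if deg == 0]
layers: list[list[str]] = []
while queue:
layers.append(sorted(queue)) # all nodes of the current layer (sorted for a stable order)
next_queue = []
for node_id in queue:
for successor in adj[node_id]:
in_degree[successor] -= 1
if in_degree[successor] == 0:
next_queue.append(successor)
queue = next_queue
# Kahn's algorithm silently drops nodes that never reach in-degree 0: they are on or behind a cycle
if sum(len(layer) for layer in layers) != len(self.nodes):
unresolved = sorted(nid for nid, deg in in_degree.items() if deg > 0)
raise ValueError(f"Pipeline graph contains a cycle; unresolved nodes: {unresolved}")
self._execution_layers = layers
logger.info(
"[%s] Built %d execution layers (nodes per layer: %s)",
self.execution_id,
len(layers),
[len(n) for n in layers],
)
return layers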
# ── 4.2 execute: main entry point ───────────────────────────────────────────────
async def execute(self) -> dict[str, Any]:
"""
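Main entry point.
Flow:
1. build_execution_layers(): topological sort
2. Run the layers in order, executing each layer's nodes in parallel with asyncio.gather
3. Persist each node's status to PostgreSQL as soon as it finishes
4. Aggregate and persist the final status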
"""
import time
start_ts = time.monotonic()
layers = self.build_execution_layers()
# Record the pipeline start
await self._write_execution_start(layers)
errors: dict[str, str] = {}
# Pipeline-wide error strategy from config (default CONTINUE)
strategy = ErrorStrategy(self.config.get("error_strategy", "continue"))
for layer_idx, node_ids in enumerate(layers):
logger.info(
"[%s] Executing layer %d/%d with %d nodes: %s",
self.execution_id, layer_idx + 1, len(layers), len(node_ids), node_ids
)
tasks = [self._execute_node_with_strategy(node_id, strategy) for node_id in node_ids]
layer_results = await asyncio.gather(*tasks, return_exceptions=True)
for node_id, result in zip(node_ids, layer_results):
if isinstance(result, BaseException):
# An exception escaped the node (STOP strategy, or an unexpected bug)
errors[node_id] = str(result)
logger.error("[%s] Node %s failed: %s", self.execution_id, node_id, result)
elif isinstance(result, NodeExecutionResult):
self._node_results[node_id] = result
# CONTINUE/SKIP failures come back as results, so they count as errors too
if result.status != "success":
errors[node_id] = result.error or result.status
# Persist each node result as soon as it finishes
await self._write_node_result(result)
# STOP: do not start the next layer once a node has raised
if strategy == ErrorStrategy.STOP and errors:
break
# ── Aggregate the final status ────────────────────────────────────────────────
total_duration_ms = int((time.monotonic() - start_ts) * 1000)
succeeded = sum(1 for r in self._node_results.values() if r.status == "success")
if not errors:
final_status = ExecutionStatus.INDEXED
elif succeeded > 0:
final_status = ExecutionStatus.PARTIAL
else:
final_status = ExecutionStatus.FAILED
await self._write_execution_end(final_status, total_duration_ms, errors)
return {
"execution_id": self.execution_id,
"status": final_status.value,
"total_duration_ms": total_duration_ms,
"completed_nodes": [nid for nid, r in self._node_results.items() if r.status == "success"],
"failed_nodes": list(errors.keys()),
"node_results": {
nid: {"status": r.status, "duration_ms": r.duration_ms, "error": r.error}
for nid, r in self._node_results.items()
},
"errors": errors,
}
# ── 4.3 _execute_node_with_strategy: three error strategies ─────────────────────────────────────
async def _execute_node_with_strategy(
self, node_id: str, strategy: ErrorStrategy
) -> NodeExecutionResult:
"""
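Run a single node with retries (exponential backoff) and apply the error strategy.
Strategies:
- STOP: raise once retries are exhausted, terminating the pipeline
- CONTINUE: once retries are exhausted, write an error item to the VariablePool and return a failed NodeExecutionResult
- SKIP: once retries are exhausted, return a skipped NodeExecutionResult without blocking later nodes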
"""
import time
max_retries = self.config.get("max_retries", 2)
timeout_sec = self.config.get("timeout_seconds", 300)
node = self.nodes[node_id]
last_error: Exception | None = None
for attempt in range(max_retries + 1):
attempt_start = time.monotonic()
try:
# ① Resolve the inputs from upstream outputs
inputs = await self._resolve_inputs(node_id)
# ② Get (or lazily create) the node instance
instance = self._get_or_create_instance(node_id, node, inputs)
# ③ Run it with a timeout
result = await asyncio.wait_for(instance.run(), timeout=timeout_sec)
# ④ Write every output into the VariablePool
for output_name, items in result.items():
key = VariablePool.make_key(node_id, output_name)
await self.variable_pool.set(key, items)
duration_ms = int((time.monotonic() - attempt_start) * 1000)
return NodeExecutionResult(
node_id=node_id,
status="success",
duration_ms=duration_ms,
output_keys=list(result.keys()),
retry_count=attempt,
)
except asyncio.TimeoutError:
last_error = TimeoutError(
f"Node {node_id} timed out after {timeout_sec}s (attempt {attempt + 1})"
)
except Exception as e:
last_error = e
if attempt < max_retries:
wait = 2 ** attempt
logger.warning(
"[%s] Node %s attempt %d failed: %s — retrying in %ds",
self.execution_id, node_id, attempt + 1, last_error, wait
)
await asyncio.sleep(wait)
# ── All retries exhausted: apply the strategy ────────────────────────────────────────
duration_ms = int((time.monotonic() - attempt_start) * 1000)
error_str = str(last_error) if last_error else "unknown"
if strategy == ErrorStrategy.STOP:
# Raise; execute() receives the exception via asyncio.gather(return_exceptions=True)
raise RuntimeError(f"Node {node_id} failed after {max_retries + 1} attempts: {error_str}")
logger.error(
"[%s] Node %s exhausted retries (%s), applying strategy: %s — %s",
self.execution_id, node_id, max_retries + 1, strategy.value, error_str
)
if strategy == ErrorStrategy.CONTINUE:
# Emit an error item into the VariablePool so downstream nodes can continue
await self.variable_pool.set(
VariablePool.make_key(node_id, "_error"),
[PipelineItem(id=f"err-{node_id}", data={"error": error_str},
errors=[error_str], source_node_id=node_id)]
)
return NodeExecutionResult(
node_id=node_id, status="failed",
duration_ms=duration_ms, error=error_str, retry_count=max_retries + 1,
)
# SKIP: emit no data and mark the node as skipped
return NodeExecutionResult(
node_id=node_id, status="skipped",
duration_ms=duration_ms, error=error_str, retry_count=max_retries + 1,
)
async def _resolve_inputs(self, node_id: str) -> dict[str, list[PipelineItem]]:
"""Resolve the node's inputs: fetch the upstream PipelineItem lists from the VariablePool"""
node = self.nodes[node_id]
resolved = {}
for input_name, input_ref in node.get("inputs", {}).items():
key = VariablePool.make_key(input_ref["from_node"], input_ref["from_output"])
resolved[input_name] = await self.variable_pool.get(key)
return resolved
def _get_or_create_instance(self, node_id: str, node: dict, inputs: dict) -> PipelineNode:
"""Get or create the node instance (cached; inputs are re-injected on every run)"""
if node_id not in self._node_instances:
node_class = self.node_registry.get(node["type"], node.get("version", "1"))
self._node_instances[node_id] = node_class(
node_id=node_id,
config=node.get("config", {}),
inputs=inputs,
)
else:
self._node_instances[node_id].inputs = inputs
return self._node_instances[node_id]
# ── 4.4 Persisting status to PostgreSQL ───────────────────────────────────────────────
async def _write_execution_start(self, layers: list[list[str]]) -> None:
"""Insert the pipeline_executions start record"""
try:
async with self.db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO pipeline_executions
(execution_id, tenant_id, pipeline_id, doc_id, status, started_at, node_results)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
ON CONFLICT (execution_id) DO NOTHING
""",
self.execution_id,
self.pipeline.get("tenant_id"),
self.pipeline.get("pipeline_id"),
self.config.get("doc_id"),
ExecutionStatus.RUNNING.value,
datetime.now(timezone.utc),
"{}", # by default asyncpg passes jsonb parameters as JSON text, not dicts
)
except Exception as e:
logger.error("[%s] Failed to write execution start: %s", self.execution_id, e)
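async def _write_node_result(self, result: NodeExecutionResult) -> None:
"""
After each node finishes, merge its entry into pipeline_executions.node_results (JSONB).
The explicit casts are needed because jsonb_build_object takes VARIADIC "any",
so PostgreSQL cannot infer the parameter types on its own.
"""
try:
async with self.db_pool.acquire() as conn:
await conn.execute(
"""
UPDATE pipeline_executions
SET node_results = COALESCE(node_results, '{}'::jsonb) ||
jsonb_build_object($2::text, jsonb_build_object(
'status', $3::text, 'duration_ms', $4::int, 'error', $5::text, 'retry_count', $6::int
))
WHERE execution_id = $1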
""",
self.execution_id,
result.node_id,
result.status,
result.duration_ms,
result.error,
result.retry_count,
)
except Exception as e:
logger.error("[%s] Failed to write node result for %s: %s",
self.execution_id, result.node_id, e)
async def _write_execution_end(
self,
status: ExecutionStatus,
total_duration_ms: int,
errors: dict[str, str],
) -> None:
"""Update the final status once the pipeline finishes"""
try:
async with self.db_pool.acquire() as conn:
await conn.execute(
"""
UPDATE pipeline_executions
SET status = $2,
completed_at = $3,
total_duration_ms = $4,
error_message = $5
WHERE execution_id = $1
""",
self.execution_id,
status.value,
datetime.now(timezone.utc),
total_duration_ms,
"; ".join(f"{n}: {e}" for n, e in errors.items()) if errors else None,
)
except Exception as e:
logger.error("[%s] Failed to write execution end: %s", self.execution_id, e)
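The layering, gather and retry behavior described above can be exercised in isolation. The block below is a minimal standalone sketch, not ragateway code. Async "runner" callables stand in for node instances, and there is no VariablePool or database. `build_layers`, `run_with_retry`, `execute` and `make_fake_runner` are hypothetical names. The sketch replays the edges of the V27 default pipeline (ragateway/pipeline/defaults.py) with one transient and one permanent failure, under CONTINUE-style semantics.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

# A runner executes one node by ID and returns its outputs (stands in for a node instance)
Runner = Callable[[str], Awaitable[dict]]


def build_layers(node_ids: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Kahn's algorithm: each round emits every node whose in-degree has reached 0."""
    adj: dict[str, list[str]] = defaultdict(list)
    in_degree = {nid: 0 for nid in node_ids}
    for src, dst in edges:
        adj[src].append(dst)
        in_degree[dst] += 1
    queue = [nid for nid, deg in in_degree.items() if deg == 0]
    layers: list[list[str]] = []
    while queue:
        layers.append(sorted(queue))
        next_queue = []
        for nid in queue:
            for succ in adj[nid]:
                in_degree[succ] -= 1
                if in_degree[succ] == 0:
                    next_queue.append(succ)
        queue = next_queue
    if sum(len(layer) for layer in layers) != len(node_ids):
        raise ValueError("pipeline graph contains a cycle")
    return layers


async def run_with_retry(node_id: str, runner: Runner, max_retries: int, base_delay: float) -> dict:
    """Exponential backoff: waits base_delay * 2**attempt after each failed attempt."""
    for attempt in range(max_retries + 1):
        try:
            return await runner(node_id)
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: the caller applies its error strategy
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


async def execute(node_ids: list[str], edges: list[tuple[str, str]], runner: Runner,
                  max_retries: int = 2, base_delay: float = 0.0) -> dict[str, str]:
    """Run layer by layer with asyncio.gather; a failure never stops later layers (CONTINUE)."""
    statuses: dict[str, str] = {}
    for layer in build_layers(node_ids, edges):
        results = await asyncio.gather(
            *(run_with_retry(nid, runner, max_retries, base_delay) for nid in layer),
            return_exceptions=True,
        )
        for nid, res in zip(layer, results):
            statuses[nid] = "failed" if isinstance(res, BaseException) else "success"
    return statuses


def make_fake_runner() -> tuple[Runner, dict[str, int]]:
    """Test double: 'splitter' fails once and then succeeds; 'sink-graphrag' always fails."""
    attempts: dict[str, int] = defaultdict(int)

    async def runner(node_id: str) -> dict:
        attempts[node_id] += 1
        if node_id == "splitter" and attempts[node_id] == 1:
            raise RuntimeError("transient error")
        if node_id == "sink-graphrag":
            raise RuntimeError("permanent error")
        return {"out": [node_id]}

    return runner, attempts


# V27 default pipeline edges as (source, target); converter -> sink-graphrag appears twice (e4, e6)
V27_NODES = ["source", "converter", "splitter", "sink-naive", "sink-graphrag"]
V27_EDGES = [("source", "converter"), ("converter", "splitter"), ("converter", "sink-naive"),
             ("converter", "sink-graphrag"), ("splitter", "sink-naive"), ("converter", "sink-graphrag")]

if __name__ == "__main__":
    runner, attempts = make_fake_runner()
    print(build_layers(V27_NODES, V27_EDGES))
    print(asyncio.run(execute(V27_NODES, V27_EDGES, runner)))
```

The duplicated converter → sink-graphrag connection raises sink-graphrag's in-degree to 2. Kahn's algorithm decrements it twice, so the node still lands in the layer right after converter. Setting base_delay to 0 keeps the backoff sleeps negligible for the demo.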
All built-in nodes (Python modules under ragateway/pipeline/nodes/):
ragateway/pipeline/nodes/
├── __init__.py
├── source/
│ ├── __init__.py
│ └── file_store.py # source.file_store
├── converter/
│ ├── __init__.py
│ ├── mineru.py # converter.mineru
│ ├── docling.py # converter.docling
│ └── markitdown.py # converter.markitdown
├── splitter/
│ ├── __init__.py
│ ├── recursive.py # splitter.recursive
│ ├── semantic.py # splitter.semantic
│ ├── fixed.py # splitter.fixed
│ └── section.py # splitter.section
├── enricher/
│ ├── __init__.py
│ ├── metadata.py # enricher.metadata
│ ├── ocr.py # enricher.ocr
│ └── translate.py # enricher.translate
├── router/
│ ├── __init__.py
│ ├── file_type.py # router.file_type
│ ├── metadata.py # router.metadata
│ └── if_else.py # router.if_else
└── sink/
├── __init__.py
├── ragflow_naive.py # sink.ragflow_naive
├── ragflow_graphrag.py # sink.ragflow_graphrag
└── neo4j_wiki.py # sink.neo4j_wiki
The registry follows ComfyUI's NODE_CLASS_MAPPINGS pattern (a global type-to-class mapping), populated here through a decorator:
# ragateway/pipeline/registry.py
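from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable
# PipelineNode, PipelineItem, NodeInputDef, NodeOutputDef and NodeConfigField must also be
# defined or imported in this module (not shown); the third-party examples import them from here
type_t = type # typing has no `type` export; the builtin is subscriptable (type[X]) since Python 3.9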
class NodeRegistry:
"""
Node registry.
Modeled on ComfyUI's global NODE_CLASS_MAPPINGS registry and dify's NodeFactory.
Usage:
@register_node(node_type="converter.my_custom", version="1")
class MyCustomConverter(PipelineNode):
type = "converter.my_custom"
version = "1"
...
"""
_registry: dict[str, dict[str, type_t[PipelineNode]]] = {}
_metadata: dict[str, dict[str, "NodeMetadata"]] = {}
@classmethod
def register(
cls,
node_type: str,
version: str,
node_class: type_t[PipelineNode],
metadata: "NodeMetadata | None" = None,
) -> None:
"""Decorator callback that performs the actual registration"""
if node_type not in cls._registry:
cls._registry[node_type] = {}
cls._registry[node_type][version] = node_class
if metadata:
if node_type not in cls._metadata:
cls._metadata[node_type] = {}
cls._metadata[node_type][version] = metadata
@classmethod
def get(
cls, node_type: str, version: str | None = None
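) -> type_t[PipelineNode]:
"""Return the node class: the requested version, or the newest one if version is None"""
versions = cls._registry.get(node_type, {})
if not versions:
raise KeyError(f"Unknown node type: {node_type}")
if version is not None:
if version not in versions:
raise KeyError(f"Unknown version {version!r} of node type {node_type!r}")
return versions[version]
# No version requested: return the newest one (numeric versions compared as integers)
newest = max(versions, key=lambda v: (v.isdigit(), int(v) if v.isdigit() else 0, v))
return versions[newest]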
@classmethod
def list_types(cls) -> list[str]:
"""List all registered node types"""
return list(cls._registry.keys())
@classmethod
def get_metadata(cls, node_type: str, version: str = "1") -> "NodeMetadata | None":
"""Return the metadata of a node type"""
return cls._metadata.get(node_type, {}).get(version)
@dataclass
class NodeMetadata:
type: str
version: str = "1"
category: str = ""
display_name: str = ""
description: str = ""
icon: str = ""
input_defs: list[NodeInputDef] = field(default_factory=list)
output_defs: list[NodeOutputDef] = field(default_factory=list)
config_defs: list[NodeConfigField] = field(default_factory=list)
def register_node(
node_type: str,
version: str = "1",
category: str = "",
display_name: str = "",
description: str = "",
icon: str = "",
input_defs: list[NodeInputDef] | None = None,
output_defs: list[NodeOutputDef] | None = None,
config_defs: list[NodeConfigField] | None = None,
) -> "Callable[[type_t[PipelineNode]], type_t[PipelineNode]]":
"""
Node registration decorator.
Example:
@register_node(
node_type="converter.my_custom",
version="1",
category="Converter",
display_name="My custom converter",
description="Converts any document to Markdown",
icon="🔧",
)
class MyCustomConverter(PipelineNode):
type = "converter.my_custom"
version = "1"
async def run(self) -> dict[str, list[PipelineItem]]:
...
"""
def decorator(cls: type_t[PipelineNode]) -> type_t[PipelineNode]:
# Register the class together with its metadata in a single call
meta = NodeMetadata(
type=node_type,
version=version,
category=category,
display_name=display_name or cls.__name__,
description=description,
icon=icon,
input_defs=input_defs or [],
output_defs=output_defs or [],
config_defs=config_defs or [],
)
NodeRegistry.register(node_type, version, cls, metadata=meta)
# Stamp type/version onto the class
cls.type = node_type
cls.version = version
return cls
return decorator
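# ─── Built-in node imports (registered at startup) ─────────────────────────────────────────────
def load_builtin_nodes() -> None:
"""
Import every built-in node package so its @register_node decorators run.
Importing a package does not import its submodules, so each package's __init__.py
must import its node modules (e.g. converter/__init__.py imports mineru, docling, markitdown).
"""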
import ragateway.pipeline.nodes.source
import ragateway.pipeline.nodes.converter
import ragateway.pipeline.nodes.splitter
import ragateway.pipeline.nodes.enricher
import ragateway.pipeline.nodes.router
import ragateway.pipeline.nodes.sink
Third-party node example:
# third_party_node.py (a third-party node)
from ragateway.pipeline.registry import register_node, NodeInputDef, NodeOutputDef, NodeConfigField, PipelineNode, PipelineItem
@register_node(
node_type="converter.tesseract_ocr",
version="1",
category="Converter",
display_name="Tesseract OCR converter",
description="Recognizes text with Tesseract OCR",
icon="🔍",
input_defs=[
NodeInputDef(name="image", type="file", required=True),
],
output_defs=[
NodeOutputDef(name="text", type="text"),
],
config_defs=[
NodeConfigField(name="lang", type="select", label="Language",
options=["eng", "chi_sim", "jpn"], default="eng"),
],
)
class TesseractOCRNode(PipelineNode):
type = "converter.tesseract_ocr"
version = "1"
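async def run(self) -> dict[str, list[PipelineItem]]:
import asyncio
import pytesseract
from PIL import Image
image_items = self.inputs.get("image", [])
lang = self.config.get("lang", "eng")
def ocr_file(path: str) -> str:
# Blocking call: open the image, run Tesseract, and close the file again
with Image.open(path) as img:
return pytesseract.image_to_string(img, lang=lang)
results = []
for item in image_items:
image_path = item.data["path"]
# Run OCR in a worker thread so it does not block the event loop
text = await asyncio.to_thread(ocr_file, image_path)
results.append(PipelineItem(
id=f"ocr-{item.id}",
data={"text": text},
metadata=item.metadata,
source_node_id=self.node_id,
))
return {"text": results}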
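Below is a complete third-party node example that implements an enricher.paddle_ocr node (based on PaddleOCR). It shows:
- full use of the @register_node decorator
- input_defs / output_defs (typed ports in the spirit of ComfyUI's INPUT_TYPES / RETURN_TYPES)
- config_defs (plus an illustrative JSON Schema fragment), which drive the node configuration form in the UI
- an async run() implementation with error handling
- fail_strategy handling and error-item propagation (retries themselves are done by DAGExecutor)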
# third_party_nodes/paddle_ocr_node.py
"""
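Third-party node example: PaddleOCR text recognition node
Install: pip install paddlepaddle paddleocr
Usage: put this file at site-packages/ragateway_nodes/paddle_ocr_node.py.
ragateway discovers and registers it during its node scan; re-running the scan picks up new modules without restarting the process (hot loading).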
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Any
from ragateway.pipeline.registry import register_node, NodeInputDef, NodeOutputDef
from ragateway.pipeline.registry import NodeConfigField, PipelineNode, PipelineItem
logger = logging.getLogger(__name__)
@dataclass
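class PaddleOCRConfigSchema:
"""Illustrative JSON Schema fragment for the PaddleOCR config (UI rendering basis); register_node below is driven by config_defs"""
lang: dict | None = None # filled in by __post_init__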
def __post_init__(self):
self.lang = {
"type": "string",
"enum": ["ch", "en", "japan", "korean", "chinese_cht"],
"default": "ch",
"description": "OCR 识别语言",
}
@register_node(
node_type="enricher.paddle_ocr",
version="1",
category="Enricher",
display_name="PaddleOCR text recognition",
description="PaddleOCR-based text recognition for images and PDFs, with multi-language support",
icon="🖼️",
input_defs=[
NodeInputDef(
name="image",
type="file",
required=True,
description="Path to an image file or to a temporarily extracted PDF page",
),
NodeInputDef(
name="metadata",
type="metadata",
required=False,
description="Upstream metadata, passed through to the output",
),
],
output_defs=[
NodeOutputDef(
name="text",
type="text",
description="Recognized text (joined line by line)",
),
NodeOutputDef(
name="word_boxes",
type="json",
description="Bounding box of each recognized text line (for later layout analysis)",
),
NodeOutputDef(
name="metadata",
type="metadata",
description="Passed-through metadata",
),
],
config_defs=[
NodeConfigField(
name="lang",
type="select",
label="Recognition language",
default="ch",
options=["ch", "en", "japan", "korean", "chinese_cht"],
description="PaddleOCR model language: ch = Chinese, en = English",
),
NodeConfigField(
name="use_angle_cls",
type="boolean",
label="Auto-rotate",
default=True,
description="Enable the text-direction classifier (useful for scans that contain rotated text)",
),
NodeConfigField(
name="det_limit_side_len",
type="integer",
label="Detection side-length limit",
default=960,
description="Maximum side length (pixels) for text detection; larger is slower but more accurate",
),
NodeConfigField(
name="merge_spacing",
type="boolean",
label="Remove spaces",
default=False,
description="Strip all spaces from the recognized text (useful for CJK output with spurious spaces)",
),
NodeConfigField(
name="fail_strategy",
type="select",
label="Failure strategy",
default="continue",
options=["stop", "continue", "skip"],
description="What to do when recognition fails (stop = stop the pipeline, continue = emit an error item, skip = skip silently)",
),
],
)
class PaddleOCRNode(PipelineNode):
"""
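PaddleOCR node: text recognition for images and PDF pages.
Inputs:
image: file path (str)
metadata: metadata dict passed through from upstream
Outputs:
text: recognized text (str)
word_boxes: list of text-line bounding boxes (list[dict])
metadata: passed-through metadata (dict)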
"""
type = "enricher.paddle_ocr"
version = "1"
async def run(self) -> dict[str, list[PipelineItem]]:
import os
from pathlib import Path
image_items = self.inputs.get("image", [])
metadata_items = self.inputs.get("metadata", [])
config = self.config
if not image_items:
logger.warning("[%s] No image input items, returning empty", self.node_id)
return {
"text": [],
"word_boxes": [],
"metadata": metadata_items or [],
}
# ── Initialize PaddleOCR (lazily, so startup stays fast) ──────────────────────
lang = config.get("lang", "ch")
use_angle_cls = config.get("use_angle_cls", True)
det_limit = config.get("det_limit_side_len", 960)
try:
from paddleocr import PaddleOCR
except ImportError:
logger.error(
"[%s] PaddleOCR not installed. Run: pip install paddlepaddle paddleocr",
self.node_id
)
return await self._handle_failure(
"PaddleOCR package not installed",
metadata_items,
config.get("fail_strategy", "continue"),
)
# Class-level cache of PaddleOCR engines keyed by every init parameter (avoids reloading models)
cache_key = f"paddle_ocr_{lang}_{use_angle_cls}_{det_limit}"
if not hasattr(PaddleOCRNode, "_ocr_cache"):
PaddleOCRNode._ocr_cache = {}
if cache_key not in PaddleOCRNode._ocr_cache:
logger.info("[%s] Initializing PaddleOCR (lang=%s, cls=%s, det_limit=%s)",
self.node_id, lang, use_angle_cls, det_limit)
PaddleOCRNode._ocr_cache[cache_key] = PaddleOCR(
lang=lang,
use_angle_cls=use_angle_cls,
det_limit_side_len=det_limit,
show_log=False,
)
ocr = PaddleOCRNode._ocr_cache[cache_key]
# ── Run OCR file by file ──────────────────────────────────────────────
all_texts: list[str] = []
all_word_boxes: list[dict] = []
errors: list[str] = []
for item in image_items:
file_path = item.data.get("path", "") if isinstance(item.data, dict) else str(item.data)
if not file_path or not Path(file_path).exists():
errors.append(f"File not found: {file_path}")
logger.error("[%s] File not found: %s", self.node_id, file_path)
continue
try:
# PaddleOCR.ocr() is synchronous; run it in a worker thread so it does not block the event loop
ocr_result = await asyncio.to_thread(ocr.ocr, file_path, cls=use_angle_cls)
if not ocr_result or not ocr_result[0]:
logger.warning("[%s] No text found in %s", self.node_id, file_path)
continue
# ocr_result[0]: [[[[x1,y1],[x2,y2],[x3,y3],[x4,y4]], (text, confidence)], ...]
file_lines = 0
file_confidences: list[float] = []
for line_result in ocr_result[0]:
if not line_result:
continue
box = line_result[0] # the 4 corner points of the text box
text_info = line_result[1]
is_pair = isinstance(text_info, (tuple, list))
text = text_info[0] if is_pair else str(text_info)
confidence = text_info[1] if is_pair and len(text_info) > 1 else None
all_texts.append(text)
all_word_boxes.append({
"text": text,
"box": box,
"confidence": confidence,
"source_file": file_path,
"source_item_id": item.id,
})
file_lines += 1
if confidence is not None:
file_confidences.append(confidence)
logger.info(
"[%s] OCR completed for %s: %d lines, %.3f avg confidence",
self.node_id, Path(file_path).name, file_lines,
sum(file_confidences) / max(1, len(file_confidences)),
)
except Exception as e:
logger.error("[%s] OCR error for %s: %s", self.node_id, file_path, e)
fail_strategy = config.get("fail_strategy", "continue")
if fail_strategy == "stop":
raise
if fail_strategy == "skip":
continue # skip this file without recording the error
# continue: record the error and move on to the next file
errors.append(f"OCR failed for {file_path}: {e}")
# ── Assemble the output PipelineItems ─────────────────────────────────────────
merge_spacing = config.get("merge_spacing", False)
final_text = "\n".join(all_texts)
if merge_spacing:
final_text = final_text.replace(" ", "")
# Pass the upstream metadata through (copied so the upstream item is not mutated)
output_metadata = dict(metadata_items[0].data) if metadata_items else {}
output_metadata["ocr_lang"] = lang
output_metadata["ocr_lines"] = len(all_texts)
output_metadata["ocr_errors"] = errors if errors else None
return {
"text": [
PipelineItem(
id=f"ocr-text-{self.node_id}",
data={"text": final_text},
metadata=output_metadata,
source_node_id=self.node_id,
)
],
"word_boxes": [
PipelineItem(
id=f"ocr-boxes-{self.node_id}",
data={"word_boxes": all_word_boxes},
metadata=output_metadata,
source_node_id=self.node_id,
)
],
"metadata": [
PipelineItem(
id=f"ocr-meta-{self.node_id}",
data=output_metadata,
metadata=output_metadata,
source_node_id=self.node_id,
)
],
}
async def _handle_failure(
self, error_msg: str, metadata_items: list[PipelineItem], fail_strategy: str
) -> dict[str, list[PipelineItem]]:
"""Handle a node-level failure according to fail_strategy"""
if fail_strategy == "stop":
raise RuntimeError(f"[{self.node_id}] {error_msg}")
logger.warning("[%s] OCR failure (strategy=%s): %s",
self.node_id, fail_strategy, error_msg)
error_item = PipelineItem(
id=f"ocr-error-{self.node_id}",
data={"error": error_msg},
errors=[error_msg],
source_node_id=self.node_id,
)
if fail_strategy == "skip":
return {"text": [], "word_boxes": [], "metadata": metadata_items}
# continue: emit an error item so downstream nodes can keep going
return {
"text": [error_item],
"word_boxes": [error_item],
"metadata": metadata_items,
}
Node registration flow (hot loading):
# ragateway/pipeline/hot_loader.py
import importlib
import pkgutil
import logging
logger = logging.getLogger(__name__)
def discover_and_register_third_party_nodes() -> int:
"""
扫描 site-packages/ragateway_nodes/ 下的所有 Python 模块,
自动导入触发 @register_node 装饰器注册。
返回:注册成功的节点数量。
"""
import sys
from pathlib import Path
count = 0
for path in sys.path:
node_dir = Path(path) / "ragateway_nodes"
if not node_dir.exists():
continue
logger.info("Discovering third-party nodes in: %s", node_dir)
for _, module_name, _ in pkgutil.iter_modules([str(node_dir)]):
try:
full_name = f"ragateway_nodes.{module_name}"
importlib.import_module(full_name)
count += 1
logger.info(" ✓ Loaded: %s", full_name)
except Exception as e: # a broken plugin (ImportError or any error raised at import time) must not abort the scan
logger.warning(" ✗ Failed to import %s: %s", module_name, e)
return count
import importlib
import logging
import pkgutil
from pathlib import Path
from ragateway.pipeline.registry import NodeRegistry, load_builtin_nodes
class HotReloadableNodeRegistry(NodeRegistry):
"""
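Node registry with hot loading.
Third-party node modules on the extra search paths (see add_extra_path) are discovered
and registered whenever discover_nodes() runs, without restarting the ragateway process.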
"""
_extra_paths: list[Path] = []
@classmethod
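def add_extra_path(cls, path: Path) -> None:
"""Add a search path for third-party nodes"""
cls._extra_paths.append(path)
@classmethod
def discover_nodes(cls) -> None:
"""Scan and load all nodes (built-in + third-party)"""
import sys
# 1. Load the built-in nodes
load_builtin_nodes()
# 2. Scan the third-party node paths
for extra_path in cls._extra_paths:
if not extra_path.exists():
continue
# walk_packages yields names relative to extra_path, so the path must be on sys.path to import them
if str(extra_path) not in sys.path:
sys.path.append(str(extra_path))
importlib.invalidate_caches() # make modules added since the last scan visible
for _, module_name, _ in pkgutil.walk_packages([str(extra_path)]):
try:
importlib.import_module(module_name)
except Exception as e: # one broken module must not abort the scan
logging.getLogger(__name__).warning("Failed to import node module %s: %s", module_name, e)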
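The discovery step boils down to pkgutil plus importlib. The self-contained sketch below uses hypothetical helpers `discover` and `write_demo_package`, with a throwaway package in a temporary directory. It shows one broken plugin being skipped while a working one loads. Note that importlib.import_module returns a module that is already in sys.modules without re-executing it. Rescanning therefore only picks up new modules; changes to an already-loaded module would need importlib.reload.

```python
import importlib
import pkgutil
import sys
import tempfile
from pathlib import Path


def discover(package_name: str) -> list[str]:
    """Import every module directly inside `package_name`; return those that imported cleanly."""
    importlib.invalidate_caches()  # make files written after interpreter start visible to finders
    package = importlib.import_module(package_name)
    loaded: list[str] = []
    for info in pkgutil.iter_modules(package.__path__, prefix=f"{package_name}."):
        try:
            # Already-imported modules are returned from sys.modules, not re-executed
            importlib.import_module(info.name)
            loaded.append(info.name)
        except Exception as exc:  # a broken plugin must not abort discovery
            print(f"skipped {info.name}: {exc!r}")
    return loaded


def write_demo_package(root: Path, name: str) -> None:
    """Hypothetical plugin package: one working module and one with a missing dependency."""
    pkg = root / name
    pkg.mkdir()
    (pkg / "__init__.py").write_text("")
    (pkg / "good_node.py").write_text('NODE_TYPE = "enricher.demo"\n')
    (pkg / "broken_node.py").write_text("import a_dependency_that_is_not_installed\n")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        write_demo_package(Path(tmp), "demo_ragateway_nodes")
        sys.path.insert(0, tmp)
        try:
            print(discover("demo_ragateway_nodes"))
        finally:
            sys.path.remove(tmp)
```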
-- Pipeline metadata table
CREATE TABLE tenant_pipelines (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
pipeline_id UUID NOT NULL UNIQUE, -- externally exposed ID
name VARCHAR(255) NOT NULL,
description TEXT,
dag_json JSONB NOT NULL, -- full pipeline DAG configuration
version INTEGER NOT NULL DEFAULT 1, -- pipeline version (optimistic locking)
is_active BOOLEAN NOT NULL DEFAULT false, -- whether this is the currently active pipeline
is_default BOOLEAN NOT NULL DEFAULT false, -- whether this is the default pipeline
created_by UUID REFERENCES users(id),
updated_by UUID REFERENCES users(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Constraint: each tenant has at most one default pipeline
CREATE UNIQUE INDEX uq_tenant_pipelines_default ON tenant_pipelines (tenant_id) WHERE is_default;
-- Pipeline execution history table
CREATE TABLE pipeline_executions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
execution_id VARCHAR(64) NOT NULL UNIQUE,
tenant_id UUID NOT NULL REFERENCES tenants(id),
pipeline_id UUID NOT NULL REFERENCES tenant_pipelines(pipeline_id),
doc_id UUID REFERENCES ragateway_documents(doc_id),
status VARCHAR(32) NOT NULL, -- queued/running/indexed/failed/partial
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
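node_results JSONB NOT NULL DEFAULT '{}'::jsonb, -- {node_id: {status, duration_ms, error, retry_count}}
total_duration_ms INTEGER,
error_message TEXT
);
CREATE INDEX idx_pipeline_executions_tenant_status ON pipeline_executions (tenant_id, status);
CREATE INDEX idx_pipeline_executions_doc_id ON pipeline_executions (doc_id);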
-- Pipeline version history table (for auditing)
CREATE TABLE pipeline_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_id UUID NOT NULL REFERENCES tenant_pipelines(pipeline_id),
version INTEGER NOT NULL,
dag_json JSONB NOT NULL,
changed_by UUID REFERENCES users(id),
changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
change_summary TEXT,
UNIQUE (pipeline_id, version)
);
POST /api/admin/pipelines # Create a pipeline
GET /api/admin/pipelines # List all pipelines (including their DAGs)
GET /api/admin/pipelines/{id} # Get one pipeline's details
PUT /api/admin/pipelines/{id} # Update a pipeline (DAG)
DELETE /api/admin/pipelines/{id} # Delete a pipeline
POST /api/admin/pipelines/{id}/activate # Make it the current pipeline
POST /api/admin/pipelines/{id}/duplicate # Duplicate a pipeline
GET /api/admin/pipelines/{id}/versions # Version history
GET /api/admin/node-types # List all available node types (with metadata)
GET /api/admin/node-types/{type} # Get details of one node type
GET /api/admin/executions/{id} # Get an execution result
POST /api/admin/pipelines/{id}/dry-run # Dry run: validate DAG reachability
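The dry-run endpoint is described only as validating DAG reachability. One plausible set of checks is sketched below as a hypothetical `validate_dag` helper over this document's pipeline JSON format. It checks that edges reference declared nodes, that every declared input has a matching edge, that every node is reachable from a source.* node, and that the graph is acyclic.

```python
from collections import defaultdict, deque


def validate_dag(pipeline: dict) -> list[str]:
    """Return human-readable problems; an empty list means every check passed."""
    problems: list[str] = []
    node_ids = {n["id"] for n in pipeline["nodes"]}
    adj: dict[str, list[str]] = defaultdict(list)
    in_degree = {nid: 0 for nid in node_ids}

    # 1. Every edge must connect two declared nodes
    for e in pipeline["edges"]:
        if e["source"] not in node_ids or e["target"] not in node_ids:
            problems.append(f"edge {e['id']} references an unknown node")
            continue
        adj[e["source"]].append(e["target"])
        in_degree[e["target"]] += 1

    # 2. Every declared input must be backed by an edge with the same ports
    edge_ports = {(e["source"], e["source_output"], e["target"], e["target_input"])
                  for e in pipeline["edges"]}
    for n in pipeline["nodes"]:
        for name, ref in n.get("inputs", {}).items():
            if (ref["from_node"], ref["from_output"], n["id"], name) not in edge_ports:
                problems.append(f"input {n['id']}.{name} has no matching edge")

    # 3. Reachability: breadth-first search from every source.* node
    sources = [n["id"] for n in pipeline["nodes"] if n["type"].startswith("source.")]
    seen, queue = set(sources), deque(sources)
    while queue:
        for succ in adj[queue.popleft()]:
            if succ not in seen:
                seen.add(succ)
                queue.append(succ)
    problems += [f"node {nid} is unreachable from any source" for nid in sorted(node_ids - seen)]

    # 4. Acyclicity: Kahn's algorithm must visit every node
    ready = deque(nid for nid, deg in in_degree.items() if deg == 0)
    visited = 0
    while ready:
        nid = ready.popleft()
        visited += 1
        for succ in adj[nid]:
            in_degree[succ] -= 1
            if in_degree[succ] == 0:
                ready.append(succ)
    if visited != len(node_ids):
        problems.append("graph contains a cycle")
    return problems
```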
API request/response examples:
# POST /api/admin/pipelines
{
"name": "RegDesk medical document standard pipeline",
"description": "Default pipeline for medical device registration documents",
"dag_json": { ... }, # full pipeline JSON
"is_default": true
}
# Response:
{
"pipeline_id": "pipeline-uuid-xxx",
"status": "draft",
"message": "Pipeline created. Activate it to use."
}
# POST /api/admin/pipelines/{id}/activate
# Response:
{
"pipeline_id": "pipeline-uuid-xxx",
"is_active": true,
"previous_pipeline_id": "pipeline-uuid-yyy", # the pipeline that was active before the switch
"message": "Pipeline activated. All new uploads will use this pipeline."
}
# GET /api/admin/node-types
# Response:
{
"node_types": [
{
"type": "converter.mineru",
"version": "1",
"category": "Converter",
"display_name": "MinerU document converter",
"description": "Converts PDF, DOCX and other documents to Markdown with the MinerU engine",
"icon": "📄",
"inputs": [
{"name": "file", "type": "file", "required": true, "description": "Path to the original file"}
],
"outputs": [
{"name": "markdown", "type": "markdown"},
{"name": "toc", "type": "toc"},
{"name": "metadata", "type": "metadata"}
],
"config_fields": [
{"name": "backend", "type": "select", "label": "Conversion engine",
"options": ["hybrid-auto-engine", "vlm-auto-engine", "pipeline"]}
]
},
...
]
}
When a tenant has not configured a pipeline of its own, the global default pipeline is used:
# ragateway/pipeline/defaults.py
V27_DEFAULT_PIPELINE = {
"version": "1",
"pipeline_id": "__v27_default__",
"tenant_id": "__system__",
"name": "V27-compatible default Pipeline",
"description": "Backward-compatible with the hard-coded V27 flow",
"config": {"max_concurrency": 4, "timeout_seconds": 600},
"nodes": [
{
"id": "source",
"type": "source.file_store",
"version": "1",
"name": "File source",
"inputs": {},
"outputs": ["file"],
"config": {},
},
{
"id": "converter",
"type": "converter.mineru",
"version": "1",
"name": "Document conversion",
"inputs": {"file": {"from_node": "source", "from_output": "file"}},
"outputs": ["markdown", "toc", "metadata"],
"config": {"backend": "hybrid-auto-engine", "lang_list": ["en", "zh"]},
},
{
"id": "splitter",
"type": "splitter.recursive",
"version": "1",
"name": "Text chunking",
"inputs": {"text": {"from_node": "converter", "from_output": "markdown"}},
"outputs": ["chunks"],
"config": {"chunk_size": 512, "chunk_overlap": 64},
},
{
"id": "sink-naive",
"type": "sink.ragflow_naive",
"version": "1",
"name": "NaiveRAG ingestion",
"inputs": {
"chunks": {"from_node": "splitter", "from_output": "chunks"},
"metadata": {"from_node": "converter", "from_output": "metadata"},
},
"outputs": ["result"],
"config": {"enabled": True},
},
{
"id": "sink-graphrag",
"type": "sink.ragflow_graphrag",
"version": "1",
"name": "GraphRAG ingestion",
"inputs": {
"markdown": {"from_node": "converter", "from_output": "markdown"},
"metadata": {"from_node": "converter", "from_output": "metadata"},
},
"outputs": ["result"],
"config": {"source": "ragflow_general", "enabled": True},
},
],
"edges": [
{"id": "e1", "source": "source", "source_output": "file", "target": "converter", "target_input": "file"},
{"id": "e2", "source": "converter", "source_output": "markdown", "target": "splitter", "target_input": "text"},
{"id": "e3", "source": "converter", "source_output": "metadata", "target": "sink-naive", "target_input": "metadata"},
{"id": "e4", "source": "converter", "source_output": "metadata", "target": "sink-graphrag", "target_input": "metadata"},
{"id": "e5", "source": "splitter", "source_output": "chunks", "target": "sink-naive", "target_input": "chunks"},
{"id": "e6", "source": "converter", "source_output": "markdown", "target": "sink-graphrag", "target_input": "markdown"},
],
}
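In this format each connection is stated twice: once in the target node's `inputs` and again in `edges`. The two copies can drift apart when a pipeline is edited by hand. The sketch below shows one way to keep them consistent, assuming node inputs are treated as the source of truth. `edges_from_inputs` and `edges_match_inputs` are hypothetical helpers.

```python
def edges_from_inputs(nodes: list[dict]) -> list[dict]:
    """Derive the edge list from each node's `inputs` references (one edge per input port)."""
    edges: list[dict] = []
    for node in nodes:
        for input_name, ref in node.get("inputs", {}).items():
            edges.append({
                "id": f"e{len(edges) + 1}",
                "source": ref["from_node"],
                "source_output": ref["from_output"],
                "target": node["id"],
                "target_input": input_name,
            })
    return edges


def _ports(edges: list[dict]) -> set[tuple[str, str, str, str]]:
    return {(e["source"], e["source_output"], e["target"], e["target_input"]) for e in edges}


def edges_match_inputs(pipeline: dict) -> bool:
    """True when the stored edges describe exactly the connections declared in node inputs."""
    return _ports(pipeline["edges"]) == _ports(edges_from_inputs(pipeline["nodes"]))
```

Edge IDs are ignored in the comparison, so a pipeline whose edges were written in a different order or with different IDs (as in V27_DEFAULT_PIPELINE) still matches.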
Core logic change in the V27 upsert endpoint (POST /api/file/upsert):
# V27, hard-coded (ragateway/api/file_upsert.py):
async def handle_upsert(file, metadata, converter_name):
# 1. Convert
converter = await plugin_manager.get_converter(converter_name)
result = await converter.convert(file.path)
# 2. Chunk (hard-coded inside ragateway)
chunks = await chunk_text(result.markdown, chunk_size=512, overlap=64)
# 3. NaiveRAG ingest
await naive_plugin.ingest(doc_id, chunks, metadata)
# 4. GraphRAG ingest (in parallel)
await asyncio.gather(*[
gp.ingest(doc_id, result.markdown, metadata)
for gp in graphrag_plugins
])
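# V28 replaces it with:
async def handle_upsert(file, metadata, tenant_id):
# 1. Load the tenant's active pipeline
pipeline = await get_active_pipeline(tenant_id)
if not pipeline:
pipeline = V27_DEFAULT_PIPELINE # backward compatibility with V27
# 2. Run it with PipelineExecutor (doc_id and db come from the request context, not shown)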
executor = PipelineExecutor(
tenant_id=tenant_id,
pipeline=pipeline,
node_registry=HotReloadableNodeRegistry,
db_session=db,
)
result = await executor.execute(doc_id=doc_id, file_path=file.path)
return result
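| Scenario | Behavior |
|---|---|
| Tenant has no pipeline configured | V27_DEFAULT_PIPELINE is used |
| converter request parameter still passed | Converted into the converter node's config in the pipeline |
| ragateway_documents.status state machine | Updated by PipelineExecutor, exactly as in V27 |
| Existing /api/file/upsert signature | Unchanged; internally it now calls PipelineExecutor |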
import uuid
# Map a V27 pending.yaml configuration to a V28 Pipeline
def migrate_v27_config_to_pipeline(v27_config: dict, tenant_id: str) -> dict:
"""
Convert the V27 converter/naive_rag/graphrag settings into a V28 Pipeline DAG.
Used when V28 is first enabled, to generate a Pipeline from the existing V27 configuration.
"""
nodes = []
edges = []
node_counter = 1
# Source
source_id = f"node-{node_counter:03d}"
nodes.append({
"id": source_id, "type": "source.file_store", "version": "1",
"name": "File source", "inputs": {}, "outputs": ["file"], "config": {}
})
node_counter += 1
# Converter
conv_id = f"node-{node_counter:03d}"
active_conv = v27_config.get("converter", {}).get("active", "mineru")
conv_cfg = v27_config.get("converter", {}).get(active_conv, {})
nodes.append({
"id": conv_id, "type": f"converter.{active_conv}", "version": "1",
"name": "Document conversion",
"inputs": {"file": {"from_node": source_id, "from_output": "file"}},
"outputs": ["markdown", "toc", "metadata"],
"config": conv_cfg,
})
edges.append({"id": f"e{len(edges)+1}", "source": source_id,
"source_output": "file", "target": conv_id, "target_input": "file"})
node_counter += 1
# Splitter
splitter_id = f"node-{node_counter:03d}"
nodes.append({
"id": splitter_id, "type": "splitter.recursive", "version": "1",
"name": "Text chunking",
"inputs": {"text": {"from_node": conv_id, "from_output": "markdown"}},
"outputs": ["chunks"],
"config": {"chunk_size": 512, "chunk_overlap": 64},
})
edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
"source_output": "markdown", "target": splitter_id, "target_input": "text"})
node_counter += 1
# NaiveRAG
if v27_config.get("naive_rag", {}).get("active"):
naive_id = f"node-{node_counter:03d}"
nodes.append({
"id": naive_id, "type": "sink.ragflow_naive", "version": "1",
"name": "NaiveRAG",
"inputs": {
"chunks": {"from_node": splitter_id, "from_output": "chunks"},
"metadata": {"from_node": conv_id, "from_output": "metadata"},
},
"outputs": ["result"],
"config": {"enabled": True},
})
edges.append({"id": f"e{len(edges)+1}", "source": splitter_id,
"source_output": "chunks", "target": naive_id, "target_input": "chunks"})
edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
"source_output": "metadata", "target": naive_id, "target_input": "metadata"})
node_counter += 1
# GraphRAG
active_graphrags = v27_config.get("graphrag", {}).get("active", [])
for i, gr_name in enumerate(active_graphrags):
gr_id = f"node-{node_counter:03d}"
nodes.append({
"id": gr_id, "type": "sink.ragflow_graphrag", "version": "1",
"name": f"GraphRAG-{gr_name}",
"inputs": {
"markdown": {"from_node": conv_id, "from_output": "markdown"},
"metadata": {"from_node": conv_id, "from_output": "metadata"},
},
"outputs": ["result"],
"config": {"source": gr_name, "enabled": True},
})
edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
"source_output": "markdown", "target": gr_id, "target_input": "markdown"})
edges.append({"id": f"e{len(edges)+1}", "source": conv_id,
"source_output": "metadata", "target": gr_id, "target_input": "metadata"})
node_counter += 1
return {
"version": "1",
"pipeline_id": str(uuid.uuid4()),
"tenant_id": tenant_id,
"name": "V27 config migration Pipeline",
"description": "由 V27 配置自动生成",
"config": {"max_concurrency": 4, "timeout_seconds": 600},
"nodes": nodes,
"edges": edges,
}
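Each generated node records its wiring twice, once in its own inputs map and once in the edge list, so a quick cross-check catches drift between the two before a migrated pipeline is stored. A minimal sketch; check_pipeline_consistency is a hypothetical helper, not part of the documented API:

```python
from typing import Any


def check_pipeline_consistency(pipeline: dict[str, Any]) -> list[str]:
    """Cross-check node-level input bindings against the edge list; [] means consistent."""
    problems: list[str] = []
    nodes = {node["id"]: node for node in pipeline["nodes"]}
    bindings: dict[tuple[str, str], tuple[str, str]] = {}
    for edge in pipeline["edges"]:
        src, dst = nodes.get(edge["source"]), nodes.get(edge["target"])
        if src is None or dst is None:
            problems.append(f"edge {edge['id']}: unknown endpoint")
            continue
        if edge["source_output"] not in src["outputs"]:
            problems.append(f"edge {edge['id']}: {src['id']} has no output {edge['source_output']!r}")
        bindings[(dst["id"], edge["target_input"])] = (src["id"], edge["source_output"])
    # Every declared input must be backed by an edge carrying the same (node, output) pair.
    for node in nodes.values():
        for name, ref in node.get("inputs", {}).items():
            if bindings.get((node["id"], name)) != (ref["from_node"], ref["from_output"]):
                problems.append(f"node {node['id']}: input {name!r} has no matching edge")
    return problems


# A two-node fragment shaped like the migration output (converter -> splitter).
PIPELINE = {
    "nodes": [
        {"id": "node-002", "outputs": ["markdown", "metadata"], "inputs": {}},
        {"id": "node-003", "outputs": ["chunks"],
         "inputs": {"text": {"from_node": "node-002", "from_output": "markdown"}}},
    ],
    "edges": [{"id": "e1", "source": "node-002", "source_output": "markdown",
               "target": "node-003", "target_input": "text"}],
}
print(check_pipeline_consistency(PIPELINE))
```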
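| Option | Pros | Cons |
|---|---|---|
| React Flow ⭐ | Purpose-built for node-graph/DAG editors, written in TypeScript, highly customizable nodes, fits a React stack | Requires React |
| JointJS | Mature diagramming library with a broad feature set | Older API, high cost to customize the UI |
| D3.js | Flexible enough to draw any graph | Large amount of work, not suited to DAG editing |
| BPMN.js | Designed for business-process modeling | Not a good fit for RAG pipeline scenarios |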
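Choice: React Flow
Rationale:
- React Flow custom nodes can expose several named Handles (ports), which map naturally onto the strongly typed INPUT_TYPES / OUTPUT_TYPES design; React Flow itself does not check port types, so the compatibility check has to live in onConnect or the isValidConnection prop
- Animated edges can visually highlight the active branch behind a Router node (the animation is purely visual; the routing decision is still made by the backend executor)
- MiniMap, Controls and a grid Background are built in and work out of the box
- Active community and good TypeScript support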
Frontend editor components (node types are loaded dynamically from /api/admin/node-types):
// ragateway-ui/src/components/PipelineEditor/
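// - PipelineCanvas.tsx (main React Flow canvas)
// - NodePalette.tsx (node palette, left)
// - NodeConfigPanel.tsx (node config panel, right)
// - EdgeValidation.tsx (connection validation logic)
import React, { useCallback, useEffect, useState } from "react";
import ReactFlow, {
Node,
Controls,
Background,
MiniMap,
addEdge,
Connection,
ReactFlowInstance,
useNodesState,
useEdgesState,
MarkerType,
} from "reactflow";
import "reactflow/dist/style.css";
// Sibling components from the file list above (named exports assumed)
import { SourceNode, ConverterNode, SplitterNode, EnricherNode, RouterNode, SinkNode } from "./customNodes";
import { NodePalette } from "./NodePalette";
import { NodeConfigPanel } from "./NodeConfigPanel";
// Node-type metadata as returned by GET /api/admin/node-types (only the fields the editor uses)
interface PortDef { name: string; type: string; }
interface NodeTypeMeta {
type: string; // e.g. "converter.mineru"
category: string;
display_name: string;
description: string;
input_defs: PortDef[];
output_defs: PortDef[];
config_defs: any[];
}
// React Flow looks up nodeTypes[node.type] by exact key, so wildcard keys such as "converter.*"
// never match. node.type is therefore the category prefix; the full type id lives in node.data.nodeType.
const NODE_TYPE_COMPONENTS = {
source: SourceNode,
converter: ConverterNode,
splitter: SplitterNode,
enricher: EnricherNode,
router: RouterNode,
sink: SinkNode,
};
const findPort = (meta: NodeTypeMeta | undefined, kind: "input_defs" | "output_defs", name?: string | null) =>
meta?.[kind].find(p => p.name === name);
export function PipelineCanvas() {
const [nodes, setNodes, onNodesChange] = useNodesState([]);
const [edges, setEdges, onEdgesChange] = useEdgesState([]);
const [nodeTypes, setNodeTypes] = useState<NodeTypeMeta[]>([]);
const [rfInstance, setRfInstance] = useState<ReactFlowInstance | null>(null);
const [selectedNodeId, setSelectedNodeId] = useState<string | null>(null);
// Load node-type metadata from the backend
useEffect(() => {
fetch("/api/admin/node-types")
.then(r => r.json())
.then(data => setNodeTypes(data.node_types));
}, []);
// Allow palette items to be dropped onto the canvas
const onDragOver = useCallback((event: React.DragEvent) => {
event.preventDefault();
event.dataTransfer.dropEffect = "move";
}, []);
const onDrop = useCallback((event: React.DragEvent) => {
event.preventDefault();
const typeId = event.dataTransfer.getData("application/reactflow"); // e.g. "converter.mineru"
const meta = nodeTypes.find(nt => nt.type === typeId);
if (!meta || !rfInstance) return;
// Convert screen coordinates to canvas coordinates (accounts for pan/zoom; reactflow >= 11.10)
const position = rfInstance.screenToFlowPosition({ x: event.clientX, y: event.clientY });
const newNode: Node = {
id: `node-${Date.now()}`,
type: typeId.split(".")[0],
position,
data: {
label: meta.display_name,
nodeType: typeId,
inputs: meta.input_defs.map(p => p.name),
outputs: meta.output_defs.map(p => p.name),
config: {},
},
};
setNodes(nds => [...nds, newNode]);
}, [nodeTypes, rfInstance, setNodes]);
// Connection check: add the edge only if both ports exist and carry the same data type
const onConnect = useCallback((connection: Connection) => {
if (!connection.source || !connection.target) return;
const sourceNode = nodes.find(n => n.id === connection.source);
const targetNode = nodes.find(n => n.id === connection.target);
if (!sourceNode || !targetNode) return;
const outputDef = findPort(nodeTypes.find(nt => nt.type === sourceNode.data.nodeType), "output_defs", connection.sourceHandle);
const inputDef = findPort(nodeTypes.find(nt => nt.type === targetNode.data.nodeType), "input_defs", connection.targetHandle);
if (!outputDef || !inputDef || outputDef.type !== inputDef.type) return;
setEdges(eds => addEdge({
...connection,
animated: true,
markerEnd: { type: MarkerType.ArrowClosed },
style: { stroke: "#6366f1" },
}, eds));
}, [nodes, nodeTypes, setEdges]);
// Merge a partial config update into a node's data
const updateNodeConfig = useCallback((nodeId: string, patch: Record<string, unknown>) => {
setNodes(nds => nds.map(n => n.id === nodeId
? { ...n, data: { ...n.data, config: { ...n.data.config, ...patch } } }
: n));
}, [setNodes]);
const selectedNode = nodes.find(n => n.id === selectedNodeId);
return (
<div style={{ height: "100vh", display: "flex" }}>
{/* Left: node palette */}
<NodePalette nodeTypes={nodeTypes} />
{/* Center: canvas */}
<div style={{ flex: 1 }}>
<ReactFlow
nodes={nodes}
edges={edges}
nodeTypes={NODE_TYPE_COMPONENTS}
onNodesChange={onNodesChange}
onEdgesChange={onEdgesChange}
onConnect={onConnect}
onInit={setRfInstance}
onNodeClick={(_, node) => setSelectedNodeId(node.id)}
onPaneClick={() => setSelectedNodeId(null)}
onDragOver={onDragOver}
onDrop={onDrop}
>
<Controls />
<MiniMap />
<Background />
</ReactFlow>
</div>
{/* Right: config panel for the selected node */}
{selectedNode && (
<NodeConfigPanel
node={selectedNode}
nodeType={nodeTypes.find(nt => nt.type === selectedNode.data.nodeType)}
onUpdate={(patch) => updateNodeConfig(selectedNode.id, patch)}
/>
)}
</div>
);
}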
Node configuration panel:
// NodeConfigPanel.tsx
// Renders a form from the node type's config_defs (field name used by GET /api/admin/node-types)
export function NodeConfigPanel({ node, nodeType, onUpdate }) {
if (!nodeType) return null;
return (
<div style={{ width: 320, borderLeft: "1px solid #e5e7eb", padding: 16 }}>
<h3>{nodeType.display_name}</h3>
<p style={{ color: "#6b7280", fontSize: 13 }}>{nodeType.description}</p>
{nodeType.config_defs.map(field => (
<div key={field.name} style={{ marginBottom: 12 }}>
<label style={{ display: "block", fontSize: 13, fontWeight: 500 }}>
{field.label}
{field.required && <span style={{ color: "red" }}>*</span>}
</label>
{field.type === "select" && (
<select
value={node.data.config?.[field.name] ?? field.default}
onChange={e => onUpdate({ [field.name]: e.target.value })}
style={{ width: "100%", padding: 6, borderRadius: 6 }}
>
{field.options.map(opt => (
<option key={opt} value={opt}>{opt}</option>
))}
</select>
)}
{field.type === "string" && (
<input
type="text"
value={node.data.config?.[field.name] ?? field.default ?? ""}
onChange={e => onUpdate({ [field.name]: e.target.value })}
style={{ width: "100%", padding: 6, borderRadius: 6 }}
/>
)}
{field.type === "integer" && (
<input
type="number"
min={field.min}
max={field.max}
value={node.data.config?.[field.name] ?? field.default ?? 0}
onChange={e => {
const v = Number.parseInt(e.target.value, 10);
if (!Number.isNaN(v)) onUpdate({ [field.name]: v }); // ignore empty/invalid input
}}
style={{ width: "100%", padding: 6, borderRadius: 6 }}
/>
)}
{/* Scalar lists (e.g. lang_list) as comma-separated text; structured lists such as router routes need a dedicated editor */}
{field.type === "list" && (
<input
type="text"
value={(node.data.config?.[field.name] ?? field.default ?? []).join(", ")}
onChange={e => onUpdate({ [field.name]: e.target.value.split(",").map(s => s.trim()).filter(Boolean) })}
style={{ width: "100%", padding: 6, borderRadius: 6 }}
/>
)}
</div>
))}
</div>
);
}
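The form above only constrains input in the browser, so the backend presumably has to re-validate node config against the same config_defs when a pipeline is saved. A minimal sketch, assuming the field semantics shown in the node-types response (type, required, default, options, min/max); validate_config is a hypothetical helper:

```python
import copy
from typing import Any


def validate_config(config_defs: list[dict[str, Any]],
                    config: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
    """Fill defaults and check values against config_defs; undeclared keys are dropped."""
    merged: dict[str, Any] = {}
    errors: list[str] = []
    for d in config_defs:
        name, kind = d["name"], d["type"]
        if name not in config:
            if d.get("required"):
                errors.append(f"{name}: required")
            # Copy so callers never mutate a shared default list.
            merged[name] = copy.deepcopy(d.get("default"))
            continue
        value = config[name]
        if kind == "integer":
            # bool is a subclass of int in Python, so reject it explicitly.
            if not isinstance(value, int) or isinstance(value, bool):
                errors.append(f"{name}: expected integer")
            elif value < d.get("min", value) or value > d.get("max", value):
                errors.append(f"{name}: out of range [{d.get('min')}, {d.get('max')}]")
        elif kind == "select" and value not in d.get("options", []):
            errors.append(f"{name}: must be one of {d.get('options')}")
        elif kind == "string" and not isinstance(value, str):
            errors.append(f"{name}: expected string")
        elif kind == "list" and not isinstance(value, list):
            errors.append(f"{name}: expected list")
        merged[name] = value
    return merged, errors


# config_defs of converter.mineru, trimmed from the node-types response
MINERU_CONFIG_DEFS = [
    {"name": "backend", "type": "select", "required": False, "default": "hybrid-auto-engine",
     "options": ["hybrid-auto-engine", "vlm-auto-engine", "pipeline"]},
    {"name": "lang_list", "type": "list", "required": False, "default": ["en", "zh"]},
    {"name": "timeout", "type": "integer", "required": False, "default": 300, "min": 60, "max": 1800},
]
print(validate_config(MINERU_CONFIG_DEFS, {"timeout": 30}))
```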
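// PipelineMonitor.tsx
// Shows DAG execution state: each node's color reflects its status (pending/running/success/failed/skipped)
import React, { useEffect, useState } from "react";
export const NODE_STATUS_COLORS: Record<string, string> = {
pending: "#e5e7eb", // gray
running: "#60a5fa", // blue (BaseNode also shows a pulsing dot)
success: "#34d399", // green
failed: "#f87171", // red
skipped: "#fbbf24", // yellow
};
// Per-node events carry no "status" field; the status is implied by the event type
const EVENT_TO_STATUS: Record<string, string> = {
node_started: "running",
node_completed: "success",
node_failed: "failed",
node_skipped: "skipped",
};
export function PipelineMonitor({ executionId }: { executionId: string }) {
const [status, setStatus] = useState<Record<string, string>>({});
// Receive node status updates in real time over the documented WebSocket endpoint
useEffect(() => {
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const ws = new WebSocket(`${protocol}//${window.location.host}/api/pipeline/executions/${executionId}/ws`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
const nodeStatus = EVENT_TO_STATUS[data.event];
if (nodeStatus) setStatus(prev => ({ ...prev, [data.node_id]: nodeStatus }));
};
return () => ws.close();
}, [executionId]);
return (
<div>
<h3>Pipeline execution monitor</h3>
<div style={{ display: "grid", gridTemplateColumns: "repeat(5, 1fr)", gap: 8 }}>
{Object.entries(status).map(([nodeId, nodeStatus]) => (
<div
key={nodeId}
style={{
padding: 12,
borderRadius: 8,
background: NODE_STATUS_COLORS[nodeStatus],
color: "#1f2937", // dark text stays readable on the light gray/yellow backgrounds
textAlign: "center",
}}
>
<div>{nodeId}</div>
<div style={{ fontSize: 12 }}>{nodeStatus}</div>
</div>
))}
</div>
</div>
);
}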
Full JSON response structure of GET /api/admin/node-types (abridged: only two of the node types are shown):
{
"node_types": [
{
"type": "converter.mineru",
"version": "1",
"category": "Converter",
"display_name": "MinerU document converter",
"description": "Converts PDF/DOCX and other documents to Markdown with the MinerU engine; supports tables, formulas and figures",
"icon": "📄",
"input_defs": [
{
"name": "file",
"type": "file",
"required": true,
"description": "Original file path (provided by the Source node)",
"default": null
}
],
"output_defs": [
{"name": "markdown", "type": "markdown", "description": "Converted Markdown text"},
{"name": "toc", "type": "toc", "description": "Table of contents (hierarchical JSON)"},
{"name": "metadata", "type": "metadata", "description": "Document metadata (page count, word count, language, etc.)"}
],
"config_defs": [
{
"name": "backend",
"type": "select",
"label": "Conversion engine",
"required": false,
"default": "hybrid-auto-engine",
"options": ["hybrid-auto-engine", "vlm-auto-engine", "pipeline"],
"description": "Internal MinerU conversion engine"
},
{
"name": "lang_list",
"type": "list",
"label": "Languages",
"required": false,
"default": ["en", "zh"],
"description": "List of document languages"
},
{
"name": "timeout",
"type": "integer",
"label": "Timeout (s)",
"required": false,
"default": 300,
"min": 60,
"max": 1800,
"description": "Conversion timeout"
}
]
},
{
"type": "router.file_type",
"version": "1",
"category": "Router",
"display_name": "File type router",
"description": "Routes content to different processing branches based on metadata.file_type",
"icon": "🔀",
"input_defs": [
{"name": "metadata", "type": "metadata", "required": true, "description": "Metadata containing file_type"}
],
"output_defs": [
{"name": "pdf_branch", "type": "control", "description": "PDF file branch"},
{"name": "docx_branch", "type": "control", "description": "Word file branch"},
{"name": "md_branch", "type": "control", "description": "Markdown file branch"},
{"name": "default_branch", "type": "control", "description": "Default branch (when no rule matches)"}
],
"config_defs": [
{
"name": "routes",
"type": "list",
"label": "Routing rules",
"required": false,
"default": [],
"description": "List of condition → branch mappings"
}
]
}
],
"categories": ["Source", "Converter", "Splitter", "Enricher", "Router", "Sink"],
"total_count": 24,
"generated_at": "2026-05-20T10:00:00Z"
}
API endpoints:
| Method | Path | Description |
|---|---|---|
| GET | /api/admin/node-types | List all registered node types (built-in + third-party) |
| GET | /api/admin/node-types?category=Converter | Filter by category |
| GET | /api/admin/node-types/{type} | Get the details of one node type (including its full config_defs) |
| GET | /api/admin/node-types/{type}/versions | List all versions of the type |
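WebSocket connection:
ws://ragateway-host/api/pipeline/executions/{execution_id}/ws
Server → client event format (JSON). Each sample below illustrates one event type on its own, so node IDs and values are not meant to line up across samples:
// Event 1: pipeline started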
{
"event": "execution_started",
"execution_id": "exec-uuid-xxx",
"pipeline_id": "pipeline-uuid-xxx",
"total_nodes": 6,
"started_at": "2026-05-20T10:30:00Z"
}
// Event 2: node started
{
"event": "node_started",
"execution_id": "exec-uuid-xxx",
"node_id": "node-002",
"node_name": "Document conversion",
"node_type": "converter.mineru",
"layer_index": 1,
"started_at": "2026-05-20T10:30:01Z"
}
// Event 3: node succeeded
{
"event": "node_completed",
"execution_id": "exec-uuid-xxx",
"node_id": "node-002",
"node_type": "converter.mineru",
"duration_ms": 8450,
"outputs": {
"markdown": {"item_count": 12, "total_chars": 45230},
"toc": {"item_count": 1},
"metadata": {"item_count": 1}
}
}
// Event 4: node failed
{
"event": "node_failed",
"execution_id": "exec-uuid-xxx",
"node_id": "node-003",
"node_type": "splitter.semantic",
"error": "LLM timeout after 120s",
"strategy_applied": "continue",
"failed_at": "2026-05-20T10:30:15Z"
}
// Event 5: node skipped (SKIP strategy)
{
"event": "node_skipped",
"execution_id": "exec-uuid-xxx",
"node_id": "node-005",
"reason": "Upstream node node-003 failed with strategy=skip",
"skipped_at": "2026-05-20T10:30:16Z"
}
// Event 6: pipeline execution completed
{
"event": "execution_completed",
"execution_id": "exec-uuid-xxx",
"status": "indexed",
"total_duration_ms": 52340,
"completed_nodes": 5,
"failed_nodes": 1,
"node_summary": [
{"node_id": "node-001", "status": "success", "duration_ms": 120},
{"node_id": "node-002", "status": "success", "duration_ms": 8450},
{"node_id": "node-003", "status": "success", "duration_ms": 3200},
{"node_id": "node-004", "status": "success", "duration_ms": 50},
{"node_id": "node-005", "status": "failed", "duration_ms": 120000, "error": "timeout"},
{"node_id": "node-006", "status": "skipped", "duration_ms": 0}
],
"completed_at": "2026-05-20T10:30:52Z"
}
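Per-node events carry no status field; the status follows from the event type. A minimal Python sketch of folding the event stream into an execution snapshot, which can be handy for server-side tests of the emitted event sequence; apply_event and EVENT_TO_STATUS are hypothetical names:

```python
from typing import Any

# Status implied by each per-node event type
EVENT_TO_STATUS = {
    "node_started": "running",
    "node_completed": "success",
    "node_failed": "failed",
    "node_skipped": "skipped",
}


def apply_event(state: dict[str, Any], event: dict[str, Any]) -> dict[str, Any]:
    """Fold one WebSocket event into an execution snapshot, returning a new dict."""
    kind = event["event"]
    if kind == "execution_started":
        return {"execution_id": event["execution_id"], "total_nodes": event["total_nodes"],
                "status": "running", "nodes": {}}
    if kind in EVENT_TO_STATUS:
        nodes = dict(state.get("nodes", {}))
        entry = dict(nodes.get(event["node_id"], {}))  # keep earlier fields such as started_at
        entry["status"] = EVENT_TO_STATUS[kind]
        for key in ("started_at", "duration_ms", "error"):
            if key in event:
                entry[key] = event[key]
        nodes[event["node_id"]] = entry
        return {**state, "nodes": nodes}
    if kind == "execution_completed":
        return {**state, "status": event["status"]}
    return state  # ignore unknown event types for forward compatibility


state: dict[str, Any] = {}
for ev in [
    {"event": "execution_started", "execution_id": "exec-1", "total_nodes": 2},
    {"event": "node_started", "execution_id": "exec-1", "node_id": "node-002", "started_at": "t1"},
    {"event": "node_completed", "execution_id": "exec-1", "node_id": "node-002", "duration_ms": 8450},
]:
    state = apply_event(state, ev)
print(state["nodes"])
```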
Client example (a React hook that consumes the WebSocket events in the frontend):
// ragateway-ui/src/hooks/usePipelineExecution.ts
import { useEffect, useRef, useState } from "react";
interface NodeStatus {
nodeId: string;
status: "pending" | "running" | "success" | "failed" | "skipped";
startedAt?: string;
durationMs?: number;
error?: string;
}
interface ExecutionProgress {
executionId: string;
totalNodes: number;
nodes: Record<string, NodeStatus>;
}
export function usePipelineExecution(executionId: string) {
const [progress, setProgress] = useState<ExecutionProgress | null>(null);
const wsRef = useRef<WebSocket | null>(null);
useEffect(() => {
if (!executionId) return;
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const wsUrl = `${protocol}//${window.location.host}/api/pipeline/executions/${executionId}/ws`;
const ws = new WebSocket(wsUrl);
wsRef.current = ws;
ws.onopen = () => {
console.log(`[WS] Connected to execution ${executionId}`);
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.event) {
case "execution_started":
setProgress({
executionId: data.execution_id,
totalNodes: data.total_nodes,
nodes: {},
});
break;
case "node_started":
setProgress(prev => prev ? {
...prev,
nodes: {
...prev.nodes,
[data.node_id]: { nodeId: data.node_id, status: "running", startedAt: data.started_at },
},
} : prev);
break;
case "node_completed":
setProgress(prev => prev ? {
...prev,
nodes: {
...prev.nodes,
[data.node_id]: {
...prev.nodes[data.node_id], // keep startedAt recorded by node_started
nodeId: data.node_id,
status: "success",
durationMs: data.duration_ms,
},
},
} : prev);
break;
case "node_failed":
setProgress(prev => prev ? {
...prev,
nodes: {
...prev.nodes,
[data.node_id]: {
...prev.nodes[data.node_id],
nodeId: data.node_id,
status: "failed",
error: data.error,
},
},
} : prev);
break;
case "node_skipped":
setProgress(prev => prev ? {
...prev,
nodes: {
...prev.nodes,
[data.node_id]: { ...prev.nodes[data.node_id], nodeId: data.node_id, status: "skipped" },
},
} : prev);
break;
case "execution_completed":
// Everything has finished; the final summary can be rendered in the UI here
console.log(`[WS] Execution completed: ${data.status}`, data.node_summary);
break;
}
};
ws.onerror = (err) => {
console.error("[WS] Error:", err);
};
ws.onclose = () => {
console.log(`[WS] Disconnected from execution ${executionId}`);
};
return () => ws.close();
}, [executionId]);
return progress;
}
// ragateway-ui/src/components/PipelineEditor/customNodes/
// - SourceNode.tsx
// - ConverterNode.tsx
// - SplitterNode.tsx
// - EnricherNode.tsx
// - RouterNode.tsx (multiple output ports)
// - SinkNode.tsx
import React, { memo } from "react";
import { Handle, Position, NodeProps } from "reactflow";
import { NODE_STATUS_COLORS } from "../PipelineMonitor";
// ── Base node template ────────────────────────────────────────────────────
// Expected node.data: { label, status?, inputs?: string[], outputs?: string[], config? }.
// Port names are used as Handle ids, so connection.sourceHandle / targetHandle name the port.
type BaseNodeProps = NodeProps & { icon: string; label: string };
const handleTop = (idx: number) => `${44 + idx * 20}px`;
const BaseNode = memo(({ data, icon, label }: BaseNodeProps) => {
const status: string | undefined = data.status;
const inputs: string[] = data.inputs ?? [];
const outputs: string[] = data.outputs ?? [];
return (
<div style={{
background: "white",
border: `2px solid ${(status && NODE_STATUS_COLORS[status]) || "#e5e7eb"}`,
borderRadius: 10,
minWidth: 160,
boxShadow: "0 2px 8px rgba(0,0,0,0.08)",
transition: "border-color 0.3s",
}}>
{/* One input Handle per input port (left side) */}
{inputs.map((name, idx) => (
<Handle
key={name}
type="target"
position={Position.Left}
id={name}
style={{ top: handleTop(idx), background: "#6366f1", width: 10, height: 10 }}
/>
))}
{/* Node header */}
<div style={{
padding: "8px 12px",
borderBottom: "1px solid #f0f4f8",
display: "flex",
alignItems: "center",
gap: 8,
background: "#f8fafc",
borderRadius: "8px 8px 0 0",
}}>
<span style={{ fontSize: 18 }}>{icon}</span>
<span style={{ fontWeight: 600, fontSize: 13, color: "#334155" }}>{label}</span>
{status === "running" && (
<span style={{
marginLeft: "auto",
width: 8, height: 8,
borderRadius: "50%",
background: "#60a5fa",
animation: "pulse 1s infinite", // assumes a global @keyframes pulse rule
}} />
)}
</div>
{/* Node body (summary of up to three config entries) */}
<div style={{ padding: "8px 12px", fontSize: 12, color: "#64748b" }}>
{Object.entries(data.config || {}).slice(0, 3).map(([k, v]) => (
<div key={k}>
<span style={{ color: "#94a3b8" }}>{k}:</span> {String(v).slice(0, 20)}
</div>
))}
</div>
{/* One output Handle per output port (right side) */}
{outputs.map((name, idx) => (
<Handle
key={name}
type="source"
position={Position.Right}
id={name}
style={{ top: handleTop(idx), background: "#6366f1", width: 10, height: 10 }}
/>
))}
</div>
);
});
// ── Single-purpose category nodes ─────────────────────────────────────────
export const SourceNode = memo((props: NodeProps) => (
<BaseNode {...props} icon="📁" label={props.data.label || "Source"} />
));
export const ConverterNode = memo((props: NodeProps) => (
<BaseNode {...props} icon="📄" label={props.data.label || "Converter"} />
));
export const SplitterNode = memo((props: NodeProps) => (
<BaseNode {...props} icon="✂️" label={props.data.label || "Splitter"} />
));
export const EnricherNode = memo((props: NodeProps) => (
<BaseNode {...props} icon="🏷️" label={props.data.label || "Enricher"} />
));
// Sink node (RAG ingestion)
export const SinkNode = memo((props: NodeProps) => (
<BaseNode {...props} icon="📥" label={props.data.label || "Sink"} />
));
// ── Router node (multiple output ports) ───────────────────────────────────
export const RouterNode = memo((props: NodeProps) => {
// Branch names double as output Handle ids (e.g. pdf_branch, default_branch)
const outputBranches: string[] = props.data.outputs ?? props.data.outputBranches ?? ["output"];
const inputName: string = props.data.inputs?.[0] ?? "metadata";
return (
<div style={{
background: "white",
border: "2px solid #fbbf24",
borderRadius: 10,
minWidth: 180,
boxShadow: "0 2px 8px rgba(0,0,0,0.08)",
}}>
{/* Single input Handle */}
<Handle
type="target"
position={Position.Left}
id={inputName}
style={{ background: "#6366f1", width: 10, height: 10 }}
/>
{/* Node header */}
<div style={{
padding: "8px 12px",
background: "#fefce8",
borderBottom: "1px solid #fef08a",
borderRadius: "8px 8px 0 0",
display: "flex",
alignItems: "center",
gap: 8,
}}>
<span style={{ fontSize: 18 }}>🔀</span>
<span style={{ fontWeight: 600, fontSize: 13 }}>{props.data.label || "Router"}</span>
</div>
{/* Branch labels */}
<div style={{ padding: "6px 12px" }}>
{outputBranches.map((branch) => (
<div key={branch} style={{
display: "flex",
alignItems: "center",
marginBottom: 4,
fontSize: 12,
color: "#92400e",
}}>
<span style={{ color: "#fbbf24", marginRight: 6 }}>●</span>
{branch}
</div>
))}
</div>
{/* One output Handle per branch (right side) */}
{outputBranches.map((branch, idx) => (
<Handle
key={branch}
type="source"
position={Position.Right}
id={branch}
style={{
top: `${40 + idx * 28}px`,
background: "#f59e0b",
width: 10,
height: 10,
}}
/>
))}
</div>
);
});
// Node type registry for React Flow. React Flow matches node.type against these keys exactly,
// so the keys are category prefixes ("converter"), not wildcards ("converter.*");
// the full type id (e.g. "converter.mineru") is kept in node.data.nodeType.
export const CUSTOM_NODE_TYPES = {
source: SourceNode,
converter: ConverterNode,
splitter: SplitterNode,
enricher: EnricherNode,
router: RouterNode,
sink: SinkNode,
};
Estimated per module, assuming one senior backend engineer and one frontend engineer.
| Module | Tasks | Person-days |
|---|---|---|
| Core framework | NodeRegistry + registration decorator + built-in node skeletons | 3 |
| DAGExecutor | Topological sort, parallel execution, timeout control | 4 |
| VariablePool | Variable pool implementation + PipelineItem data structure | 2 |
| Error handling | Three strategies (Stop/Continue/Skip) + retry logic | 2 |
| Built-in nodes: Source/Converter | source.file_store, converter.mineru/docling/markitdown | 3 |
| Built-in nodes: Splitter/Enricher | splitter.recursive/semantic/fixed, enricher.metadata | 3 |
| Built-in nodes: Router | router.file_type, router.metadata, router.if_else | 3 |
| Built-in nodes: Sink | sink.ragflow_naive, sink.ragflow_graphrag, sink.neo4j_wiki | 3 |
| API layer | Pipeline CRUD + activate + node-types endpoints | 3 |
| Database | tenant_pipelines table + pipeline_executions table | 2 |
| V27 compatibility layer | V27_DEFAULT_PIPELINE + config migration function | 2 |
| Integration tests | End-to-end pipeline tests + failure-case tests | 3 |
| Backend total | | 33 |
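| Module | Tasks | Person-days |
|---|---|---|
| React Flow integration | Canvas, drag and drop, node moving, save/load | 4 |
| Node palette | Dynamic loading from /api/admin/node-types + grouping by category | 2 |
| Node config panel | Dynamic form (select/string/integer) | 3 |
| Connection validation | Type compatibility check + rejection of invalid connections | 2 |
| Router node UI | Multiple output ports + color-coded branches | 2 |
| Pipeline list page | CRUD list + activation status display | 2 |
| Execution monitor panel | Real-time WebSocket status + node color changes | 3 |
| DAG import/export | JSON export + import validation | 2 |
| Frontend total | | 20 |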
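| Role | Person-days |
|---|---|
| Senior backend engineer | 33 |
| Frontend engineer | 20 |
| Total | 53 person-days (about 10.6 person-weeks at 5 days/week; if both tracks run fully in parallel, roughly 6.6 calendar weeks, set by the 33-day backend track) |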
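The totals can be rechecked mechanically. This assumes 5 working days per week and, for the calendar figure, that the two tracks run fully in parallel with no hand-off delays:

```python
# Person-day figures copied from the backend and frontend tables
BACKEND = [3, 4, 2, 2, 3, 3, 3, 3, 3, 2, 2, 3]
FRONTEND = [4, 2, 3, 2, 2, 2, 3, 2]
DAYS_PER_WEEK = 5  # assumption: 5 working days per week


def estimate(backend: list[int], frontend: list[int]) -> dict[str, float]:
    total = sum(backend) + sum(frontend)
    return {
        "backend_days": sum(backend),
        "frontend_days": sum(frontend),
        "person_days": total,
        "person_weeks": total / DAYS_PER_WEEK,
        # With both tracks fully in parallel, the longer track sets the duration.
        "calendar_weeks": max(sum(backend), sum(frontend)) / DAYS_PER_WEEK,
    }


print(estimate(BACKEND, FRONTEND))
```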
| Version | Date | Author | Changes |
|---|---|---|---|
| V28 draft | 2026-05-20 | Technical architect | Initial version, based on a source-code study of n8n/dify/ComfyUI |
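Below are three typical pipeline configurations from RegDesk customers, each matching a different business scenario and priority.
Business context: a medical-device customer processes FDA 510(k) submission documents as PDFs containing complex tables, formulas and figures. Requirements:
- Preserve the original structure completely (tables, heading hierarchy)
- Support semantic chunking (split by paragraph/section)
- Build both a NaiveRAG index (precise matching) and a GraphRAG General index (global relationships)
Pipeline configuration: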
{
"version": "1",
"pipeline_id": "pipeline-510k-quality",
"tenant_id": "tenant-abc-123",
"name": "510(k) document processing (quality first)",
"description": "High-quality processing pipeline for FDA 510(k) submission documents",
"config": {
"max_concurrency": 2,
"timeout_seconds": 900,
"retry_on_node_fail": true,
"max_retries": 2,
"continue_on_error": false
},
"nodes": [
{
"id": "src-001",
"type": "source.file_store",
"version": "1",
"name": "File source",
"position": {"x": 0, "y": 200},
"inputs": {},
"outputs": ["file"],
"config": {}
},
{
"id": "conv-001",
"type": "converter.mineru",
"version": "1",
"name": "MinerU conversion (tables + formulas)",
"position": {"x": 250, "y": 200},
"inputs": {"file": {"from_node": "src-001", "from_output": "file"}},
"outputs": ["markdown", "toc", "metadata"],
"config": {
"backend": "hybrid-auto-engine",
"lang_list": ["en"],
"timeout": 600,
"extract_tables": true,
"extract_formulas": true,
"preserve_layout": true
}
},
{
"id": "enr-001",
"type": "enricher.metadata",
"version": "1",
"name": "Metadata extraction",
"position": {"x": 500, "y": 200},
"inputs": {
"text": {"from_node": "conv-001", "from_output": "markdown"},
"metadata": {"from_node": "conv-001", "from_output": "metadata"}
},
"outputs": ["metadata"],
"config": {
"extract_fields": ["title", "date", "version", "document_id"],
"language_detection": {"enabled": true, "default_lang": "en"}
}
},
{
"id": "split-001",
"type": "splitter.semantic",
"version": "1",
"name": "Semantic chunking (512 tokens)",
"position": {"x": 750, "y": 200},
"inputs": {"text": {"from_node": "conv-001", "from_output": "markdown"}},
"outputs": ["chunks"],
"config": {
"strategy": "semantic",
"chunk_size": 512,
"chunk_overlap": 64,
"embedding_model": "bge-m3",
"split_by": "sentence"
}
},
{
"id": "enr-002",
"type": "enricher.chunk_meta",
"version": "1",
"name": "Chunk metadata injection",
"position": {"x": 1000, "y": 200},
"inputs": {
"chunks": {"from_node": "split-001", "from_output": "chunks"},
"metadata": {"from_node": "enr-001", "from_output": "metadata"}
},
"outputs": ["chunks"],
"config": {
"inject_fields": ["document_id", "title", "language", "chunk_index"]
}
},
{
"id": "sink-naive",
"type": "sink.ragflow_naive",
"version": "1",
"name": "NaiveRAG ingestion",
"position": {"x": 1250, "y": 100},
"inputs": {
"chunks": {"from_node": "enr-002", "from_output": "chunks"},
"metadata": {"from_node": "enr-001", "from_output": "metadata"}
},
"outputs": ["result"],
"config": {"enabled": true, "dedup": true}
},
{
"id": "sink-graphrag",
"type": "sink.ragflow_graphrag",
"version": "1",
"name": "GraphRAG General ingestion",
"position": {"x": 1250, "y": 300},
"inputs": {
"markdown": {"from_node": "conv-001", "from_output": "markdown"},
"metadata": {"from_node": "enr-001", "from_output": "metadata"}
},
"outputs": ["result"],
"config": {"source": "ragflow_general", "enabled": true}
}
],
"edges": [
{"id": "e1", "source": "src-001", "source_output": "file", "target": "conv-001", "target_input": "file"},
{"id": "e2", "source": "conv-001", "source_output": "markdown", "target": "enr-001", "target_input": "text"},
{"id": "e3", "source": "conv-001", "source_output": "metadata", "target": "enr-001", "target_input": "metadata"},
{"id": "e4", "source": "conv-001", "source_output": "markdown", "target": "split-001", "target_input": "text"},
{"id": "e5", "source": "enr-001", "source_output": "metadata", "target": "enr-002", "target_input": "metadata"},
{"id": "e6", "source": "split-001", "source_output": "chunks", "target": "enr-002", "target_input": "chunks"},
{"id": "e7", "source": "enr-002", "source_output": "chunks", "target": "sink-naive", "target_input": "chunks"},
{"id": "e8", "source": "enr-001", "source_output": "metadata", "target": "sink-naive", "target_input": "metadata"},
{"id": "e9", "source": "conv-001", "source_output": "markdown", "target": "sink-graphrag", "target_input": "markdown"},
{"id": "e10", "source": "enr-001", "source_output": "metadata", "target": "sink-graphrag", "target_input": "metadata"}
]
}
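Node execution layers (result of Kahn's topological sort):
- Layer 0: src-001
- Layer 1: conv-001
- Layer 2: enr-001, split-001 (parallel)
- Layer 3: enr-002, sink-graphrag (parallel; sink-graphrag depends only on conv-001 and enr-001)
- Layer 4: sink-naive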
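Kahn's algorithm, run layer by layer, derives this grouping from the edges alone. Since sink-graphrag depends only on conv-001 and enr-001, it becomes ready in the same layer as enr-002, one layer before sink-naive. A minimal sketch; kahn_layers is a hypothetical name, and a real executor could instead start each node as soon as its own inputs are ready rather than waiting for the whole layer:

```python
from collections import defaultdict


def kahn_layers(node_ids: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Group nodes into layers; every node's dependencies sit in earlier layers."""
    indegree = {n: 0 for n in node_ids}
    children: dict[str, list[str]] = defaultdict(list)
    # Several port-level edges between the same two nodes count as one dependency.
    for src, dst in sorted(set(edges)):
        children[src].append(dst)
        indegree[dst] += 1
    layer = sorted(n for n in node_ids if indegree[n] == 0)
    layers: list[list[str]] = []
    while layer:
        layers.append(layer)
        ready = []
        for n in layer:
            for child in children[n]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        layer = sorted(ready)
    if sum(map(len, layers)) != len(node_ids):
        raise ValueError("pipeline graph contains a cycle")
    return layers


# (source, target) pairs of edges e1..e10 in Example 1
EXAMPLE_1 = [
    ("src-001", "conv-001"), ("conv-001", "enr-001"), ("conv-001", "enr-001"),
    ("conv-001", "split-001"), ("enr-001", "enr-002"), ("split-001", "enr-002"),
    ("enr-002", "sink-naive"), ("enr-001", "sink-naive"),
    ("conv-001", "sink-graphrag"), ("enr-001", "sink-graphrag"),
]
NODES = ["src-001", "conv-001", "enr-001", "split-001", "enr-002", "sink-naive", "sink-graphrag"]
for i, layer in enumerate(kahn_layers(NODES, EXAMPLE_1)):
    print(f"Layer {i}: {', '.join(layer)}")
```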
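Business context: a customer processes hundreds of regulatory-standard PDFs (ISO/IEC/AAMI, etc.) from multiple countries every day, aiming to "ingest fast, searchable early". Processing speed takes priority at the cost of some accuracy.
Pipeline configuration: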
{
"version": "1",
"pipeline_id": "pipeline-regulatory-speed",
"tenant_id": "tenant-def-456",
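"name": "Regulatory standards processing (speed first)",
"description": "Processes large volumes of regulatory standard documents quickly, prioritizing ingestion speed",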
"config": {
"max_concurrency": 8,
"timeout_seconds": 300,
"retry_on_node_fail": true,
"max_retries": 1,
"continue_on_error": true
},
"nodes": [
{
"id": "src-001",
"type": "source.file_store",
"version": "1",
"name": "File source",
"position": {"x": 0, "y": 200},
"inputs": {},
"outputs": ["file"],
"config": {}
},
{
"id": "conv-001",
"type": "converter.mineru",
"version": "1",
"name": "MinerU fast conversion",
"position": {"x": 250, "y": 200},
"inputs": {"file": {"from_node": "src-001", "from_output": "file"}},
"outputs": ["markdown", "metadata"],
"config": {
"backend": "pipeline",
"lang_list": ["en", "zh"],
"timeout": 120,
"skip_tables": false
}
},
{
"id": "split-001",
"type": "splitter.fixed",
"version": "1",
"name": "Fixed-size chunking (256 characters)",
"position": {"x": 500, "y": 200},
"inputs": {"text": {"from_node": "conv-001", "from_output": "markdown"}},
"outputs": ["chunks"],
"config": {
"strategy": "fixed",
"chunk_size": 256,
"chunk_overlap": 32,
"split_by": "character"
}
},
{
"id": "sink-graphrag-light",
"type": "sink.ragflow_graphrag",
"version": "1",
"name": "GraphRAG Light ingestion",
"position": {"x": 750, "y": 200},
"inputs": {
"markdown": {"from_node": "conv-001", "from_output": "markdown"},
"metadata": {"from_node": "conv-001", "from_output": "metadata"}
},
"outputs": ["result"],
"config": {
"source": "ragflow_light",
"enabled": true
}
}
],
"edges": [
{"id": "e1", "source": "src-001", "source_output": "file", "target": "conv-001", "target_input": "file"},
{"id": "e2", "source": "conv-001", "source_output": "markdown", "target": "split-001", "target_input": "text"},
{"id": "e3", "source": "conv-001", "source_output": "markdown", "target": "sink-graphrag-light", "target_input": "markdown"},
{"id": "e4", "source": "conv-001", "source_output": "metadata", "target": "sink-graphrag-light", "target_input": "metadata"}
]
}
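Optimization strategy:
- max_concurrency: 8: high concurrency to make full use of the CPU
- splitter.fixed: fixed 256-character chunks, much faster than semantic chunking (no model calls required)
- GraphRAG Light: skips community detection and builds the graph and vectors directly, for an expected 3-5x speedup
- continue_on_error: true: a failure on an individual file does not affect the rest of the batch
- Note: as configured, the chunks output of split-001 has no downstream consumer (sink-graphrag-light reads the markdown of conv-001 directly), so its work is discarded; either connect it to a chunk-consuming sink such as sink.ragflow_naive or remove it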
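A simple lint pass over a pipeline can flag non-sink nodes whose outputs no edge consumes. Applied to Example 2, it reports split-001, because sink-graphrag-light reads the converter's markdown rather than the chunks. A minimal sketch; dangling_nodes is a hypothetical helper, and treating every sink.* node as terminal is an assumption:

```python
def dangling_nodes(pipeline: dict) -> list[str]:
    """Return non-sink nodes whose outputs feed no other node (work computed, then dropped)."""
    consumed = {edge["source"] for edge in pipeline["edges"]}
    return sorted(
        node["id"]
        for node in pipeline["nodes"]
        # Assumption: sink.* nodes are terminal by design, so they are exempt.
        if not node["type"].startswith("sink.") and node["id"] not in consumed
    )


# Node ids/types and edge endpoints copied from Example 2 (other fields omitted)
EXAMPLE_2 = {
    "nodes": [
        {"id": "src-001", "type": "source.file_store"},
        {"id": "conv-001", "type": "converter.mineru"},
        {"id": "split-001", "type": "splitter.fixed"},
        {"id": "sink-graphrag-light", "type": "sink.ragflow_graphrag"},
    ],
    "edges": [
        {"source": "src-001", "target": "conv-001"},
        {"source": "conv-001", "target": "split-001"},
        {"source": "conv-001", "target": "sink-graphrag-light"},
        {"source": "conv-001", "target": "sink-graphrag-light"},
    ],
}
print(dangling_nodes(EXAMPLE_2))
```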
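Business context: a customer stores internal SOPs and technical specifications as Markdown, so no PDF conversion is needed and documents are chunked and ingested directly.
Pipeline configuration: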
{
"version": "1",
"pipeline_id": "pipeline-markdown-kb",
"tenant_id": "tenant-ghi-789",
"name": "Markdown knowledge-base processing",
"description": "Internal Markdown document knowledge base; no format conversion needed",
"config": {
"max_concurrency": 4,
"timeout_seconds": 120,
"retry_on_node_fail": false,
"max_retries": 0,
"continue_on_error": false
},
"nodes": [
{
"id": "src-001",
"type": "source.file_store",
"version": "1",
"name": "File source",
"position": {"x": 0, "y": 200},
"inputs": {},
"outputs": ["file"],
"config": {"file_pattern": "*.md"}
},
{
"id": "conv-001",
"type": "converter.skip",
"version": "1",
"name": "Skip conversion (.md passed through)",
"position": {"x": 250, "y": 200},
"inputs": {"file": {"from_node": "src-001", "from_output": "file"}},
"outputs": ["markdown", "metadata"],
"config": {
"pass_through": true,
"extract_frontmatter": true
}
},
{
"id": "enr-001",
"type": "enricher.metadata",
"version": "1",
"name": "Markdown metadata extraction",
"position": {"x": 500, "y": 200},
"inputs": {
"text": {"from_node": "conv-001", "from_output": "markdown"},
"metadata": {"from_node": "conv-001", "from_output": "metadata"}
},
"outputs": ["metadata"],
"config": {
"extract_fields": ["title", "date", "author", "tags"],
"language_detection": {"enabled": true, "default_lang": "auto"}
}
},
{
"id": "split-001",
"type": "splitter.fixed",
"version": "1",
"name": "Markdown fixed-size chunking (1024 characters)",
"position": {"x": 750, "y": 200},
"inputs": {"text": {"from_node": "conv-001", "from_output": "markdown"}},
"outputs": ["chunks"],
"config": {
"strategy": "fixed",
"chunk_size": 1024,
"chunk_overlap": 128,
"split_by": "markdown-header"
}
},
{
"id": "enr-002",
"type": "enricher.chunk_meta",
"version": "1",
"name": "Chunk title injection",
"position": {"x": 1000, "y": 200},
"inputs": {
"chunks": {"from_node": "split-001", "from_output": "chunks"},
"metadata": {"from_node": "enr-001", "from_output": "metadata"}
},
"outputs": ["chunks"],
"config": {
"inject_fields": ["title", "author", "language"]
}
},
{
"id": "sink-naive",
"type": "sink.ragflow_naive",
"version": "1",
"name": "NaiveRAG ingestion",
"position": {"x": 1250, "y": 200},
"inputs": {
"chunks": {"from_node": "enr-002", "from_output": "chunks"},
"metadata": {"from_node": "enr-001", "from_output": "metadata"}
},
"outputs": ["result"],
"config": {"enabled": true}
}
],
"edges": [
{"id": "e1", "source": "src-001", "source_output": "file", "target": "conv-001", "target_input": "file"},
{"id": "e2", "source": "conv-001", "source_output": "markdown", "target": "enr-001", "target_input": "text"},
{"id": "e3", "source": "conv-001", "source_output": "metadata", "target": "enr-001", "target_input": "metadata"},
{"id": "e4", "source": "conv-001", "source_output": "markdown", "target": "split-001", "target_input": "text"},
{"id": "e5", "source": "enr-001", "source_output": "metadata", "target": "enr-002", "target_input": "metadata"},
{"id": "e6", "source": "split-001", "source_output": "chunks", "target": "enr-002", "target_input": "chunks"},
{"id": "e7", "source": "enr-002", "source_output": "chunks", "target": "sink-naive", "target_input": "chunks"},
{"id": "e8", "source": "enr-001", "source_output": "metadata", "target": "sink-naive", "target_input": "metadata"}
]
}
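The converter.skip setting extract_frontmatter: true implies that a leading front-matter block is split off into metadata before the body is passed on. A minimal sketch, assuming flat key: value pairs between --- fences; real front matter is YAML, so nested values or lists would need a YAML parser. split_frontmatter is a hypothetical helper:

```python
def split_frontmatter(text: str) -> tuple[dict[str, str], str]:
    """Split a leading '---' block of flat 'key: value' lines from the Markdown body."""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return {}, text
    for end in range(1, len(lines)):
        if lines[end].strip() == "---":
            meta: dict[str, str] = {}
            for line in lines[1:end]:
                key, sep, value = line.partition(":")  # split on the first colon only
                if sep and key.strip():
                    meta[key.strip()] = value.strip()
            return meta, "\n".join(lines[end + 1:])
    return {}, text  # unterminated block: treat the whole file as body


meta, body = split_frontmatter("---\ntitle: SOP-1\nauthor: QA\n---\n# Body")
print(meta, repr(body))
```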
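Key node notes:
- converter.skip: skips the conversion step and passes the file through directly as Markdown (saves the MinerU conversion time)
- split_by: markdown-header: splits on Markdown headings (#, ##, ###) to keep the document structure intact
- chunk_size: 1024: Markdown documents use larger chunks because Markdown is already naturally divided into sections
- NaiveRAG only: the internal knowledge base relies mainly on precise keyword matching and does not need GraphRAG's global-relationship capability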
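One plausible reading of split_by: markdown-header combined with chunk_size / chunk_overlap is: cut before every heading, then window any section longer than chunk_size by characters with the given overlap. A minimal sketch under that assumption (the actual splitter.fixed behavior is not specified here); split_by_markdown_header is a hypothetical name, and headings inside fenced code blocks are not special-cased:

```python
import re


def split_by_markdown_header(text: str, chunk_size: int = 1024, chunk_overlap: int = 128) -> list[str]:
    """Cut before every ATX heading, then window long sections by characters with overlap."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks: list[str] = []
    # Zero-width split at the start of every line that begins with 1-6 '#' and a space.
    for section in re.split(r"(?m)^(?=#{1,6} )", text):
        section = section.strip()
        start = 0
        while section:
            chunks.append(section[start:start + chunk_size])
            if start + chunk_size >= len(section):
                break
            start += step
    return chunks


print(split_by_markdown_header("# SOP\nintro\n## Scope\nbody"))
```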
n8n key source files:
| File | Purpose |
|---|---|
| packages/workflow/src/interfaces.ts | INode, IConnections, IWorkflowBase interface definitions |
| packages/workflow/src/workflow.ts | Workflow class, node-graph operations |
| packages/core/src/executors/workflow-execute.ts | WorkflowExecute execution engine |
| packages/workflow/src/errors/ | Error type definitions |
dify key source files:
| File | Purpose |
|---|---|
| api/core/workflow/node_factory.py | resolve_workflow_node_class, node factory |
| api/core/workflow/workflow_entry.py | WorkflowEntry, SingleNodeGraphDict |
| api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py | KnowledgeRetrievalNode, reference implementation |
| graphon/nodes/base/node.py | Node base class (graphon subsystem after dify's refactor) |
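ComfyUI designs borrowed:
| Design | Location | What we borrow |
|---|---|---|
| NODE_CLASS_MAPPINGS | Global dict | Node registration (reworked here as decorator-based registration) |
| INPUT_TYPES | Classmethod | Strongly typed input constraints |
| RETURN_TYPES | Class attribute | Output type declarations |
| Execution cache | execution.py / comfy_execution/caching.py (comfy/model_management.py handles model loading and VRAM, not result caching) | Node-result caching mechanism |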