返回 Skill 列表
extension
分类: 其它无需 API Key

智能工作流

agent-workflow-orchestrator

person作者: user_af28addahubcommunity

Agent Workflow Orchestrator v2.0

AI Agent 原生工作流编排器


定位对比

| 特性 | Airflow/DataWorks | 虾评工作流引擎 | 本技能 | |------|-------------------|---------------|-----------| | 定位 | 企业级调度平台 | 通用任务编排 | AI Agent 工作流 + 数据处理 | | 数据处理 | 依赖Operator | 无 | 内置8种数据节点 | | 执行模式 | 串行/后台 | 串行 | 串行/并行/条件路由 | | AI能力 | 无 | 无 | 自然语言生成DAG | | 技能调用 | 无 | 有 | 支持OpenClaw技能 | | 安全验证 | 基础 | 基础 | SQL白名单+路径校验+资源限制 | | 部署 | 需安装/配置 | 云端 | 零依赖+单文件运行 | | 适用场景 | 大规模ETL | 日常自动化 | 数据处理流水线+AI Agent编排 |

独特价值

用户需求 → 自然语言 → AI生成DAG → 串行/并行/条件执行 → 技能调用 → 结果输出
                      ↓
              数据清洗 → 数据转换 → 数据聚合 → 可视化
                              ↓
                        [调用数据分析技能]

快速开始

1. 执行工作流

# 从JSON文件执行
python workflow_cli.py run workflow.json

# 并行模式执行
python workflow_cli.py run workflow.json --mode parallel

# 仅验证
python workflow_cli.py run workflow.json --validate-only

# 模拟运行
python workflow_cli.py run workflow.json --dry-run

2. 从自然语言生成

python workflow_cli.py generate \
  "从sales.csv读取数据,过滤status=completed,按category聚合求和,输出到summary.csv"

3. 生成可视化

python workflow_cli.py visualize workflow.json -o workflow.html

DAG 定义格式

{
  "workflow_id": "my_workflow",
  "name": "我的工作流",
  "version": "2.0.0",
  "execution_mode": "auto",
  "on_failure": "stop",
  
  "nodes": [
    {
      "id": "node_1",
      "type": "DataIn",
      "name": "读取数据",
      "config": {
        "path": "data/sales.csv",
        "format": "csv"
      }
    },
    {
      "id": "node_2",
      "type": "Filter",
      "name": "过滤数据",
      "depends_on": ["node_1"],
      "config": {
        "conditions": [
          {"field": "status", "operator": "==", "value": "completed"}
        ],
        "logic": "and"
      }
    },
    {
      "id": "node_3",
      "type": "DataOut",
      "name": "输出数据",
      "depends_on": ["node_2"],
      "config": {
        "path": "output/summary.csv"
      }
    }
  ],
  
  "edges": [
    {"from": "node_1", "to": "node_2"},
    {"from": "node_2", "to": "node_3"}
  ]
}

节点类型

| 类型 | 说明 | 配置示例 | |------|------|----------| | DataIn | 数据输入 | {"path": "data.csv", "format": "csv"} | | DataOut | 数据输出 | {"path": "output.csv", "format": "csv"} | | Filter | 数据过滤 | {"conditions": [{"field": "status", "operator": "==", "value": "active"}]} | | Join | 数据关联 | {"left_key": "id", "right_key": "id", "join_type": "left"} | | Aggregate | 数据聚合 | {"group_by": ["category"], "aggregations": [{"func": "sum", "field": "amount"}]} | | Python | Python脚本 | {"code": "output_data = input_data * 2"} | | Branch | 条件分支 | {"condition": "result > 0", "true_branch": "node_a", "false_branch": "node_b"} | | CLI | 外部命令 | {"command": "echo hello"} | | Skill | 技能调用 | {"skill": "smart-data-analyzer"} |


执行模式

1. 串行流水线 (sequential)

按拓扑排序顺序依次执行每个节点。

{
  "execution_mode": "sequential"
}

2. 并行协作 (parallel)

无依赖的节点并发执行。

{
  "execution_mode": "parallel"
}

3. 自动模式 (auto)

根据DAG结构自动选择最优执行方式:

  • 存在条件分支 → 条件路由
  • 存在并行机会 → 并行执行
  • 简单链式 → 串行执行
{
  "execution_mode": "auto"
}

条件路由

支持 if-else 分支,根据上游节点输出决定执行路径。

{
  "nodes": [
    {"id": "check", "type": "DataIn", "name": "数据检查"},
    {"id": "branch", "type": "Branch", "depends_on": ["check"]},
    {"id": "process_valid", "type": "Filter"},
    {"id": "process_invalid", "type": "Filter"}
  ],
  "edges": [
    {"from": "check", "to": "branch"},
    {"from": "branch", "to": "process_valid", "condition": true},
    {"from": "branch", "to": "process_invalid", "condition": false}
  ]
}

技能调用

支持调用 OpenClaw 平台的技能。

{
  "id": "analyze",
  "type": "Skill",
  "name": "数据分析",
  "skill": "smart-data-analyzer",
  "config": {
    "query": "SELECT * FROM sales"
  }
}

支持的技能

| 技能名 | 说明 | |--------|------| | smart-data-analyzer | DuckDB数据分析 | | echarts-viz | ECharts可视化 | | chartjs-reporter | Chart.js报表 | | excel-workbench | Excel工作台 | | duckdb-data-analysis | DuckDB分析 | | mysql-analytics | MySQL分析 | | postgres-analytics | PostgreSQL分析 | | data-visualization | 数据可视化 |


安全机制

1. SQL白名单

# 允许的关键字
ALLOWED_KEYWORDS = {'SELECT', 'INSERT', 'UPDATE', 'CREATE', 'ALTER'}

# 禁止的关键字
FORBIDDEN_KEYWORDS = {'DROP', 'DELETE', 'TRUNCATE', 'EXEC', 'GRANT'}

2. 路径校验

禁止路径遍历攻击:

  • ../ 模式
  • URL编码的遍历
  • 符号链接到敏感目录

3. 资源限制

| 限制项 | 默认值 | |--------|--------| | 最大执行时间 | 3600秒 | | 最大节点数 | 100 | | 最大文件大小 | 100MB | | 最大内存 | 512MB |


API 使用

Python API

from dag_runner import DAGRunner, ExecutionMode
from node_executor import NodeExecutor

# 创建执行器
executor = NodeExecutor()

# 加载工作流
runner = DAGRunner(workflow=workflow, execution_mode=ExecutionMode.AUTO)

# 注册节点执行器
for node_type in executor.list_types():
    runner.register_executor(node_type, executor.execute)

# 执行
result = runner.run()

print(result['status'])
print(result['summary'])

AI生成

from ai_dag_generator import AIDAGGenerator

generator = AIDAGGenerator()
workflow = generator.generate("从sales.csv读取数据,过滤status=completed,按category聚合")

print(workflow)

可视化

from dag_visualizer import DAGVisualizer

visualizer = DAGVisualizer()
html = visualizer.render(workflow)

with open('workflow.html', 'w') as f:
    f.write(html)

示例工作流

数据ETL

{
  "workflow_id": "etl_pipeline",
  "name": "数据ETL流程",
  "execution_mode": "parallel",
  "nodes": [
    {
      "id": "load_sales",
      "type": "DataIn",
      "name": "读取销售数据",
      "config": {"path": "data/sales.csv"}
    },
    {
      "id": "load_products",
      "type": "DataIn",
      "name": "读取产品数据",
      "config": {"path": "data/products.csv"}
    },
    {
      "id": "join_data",
      "type": "Join",
      "name": "关联数据",
      "depends_on": ["load_sales", "load_products"],
      "config": {"left_key": "product_id", "right_key": "id", "join_type": "left"}
    },
    {
      "id": "aggregate",
      "type": "Aggregate",
      "name": "按产品聚合",
      "depends_on": ["join_data"],
      "config": {
        "group_by": ["product_name"],
        "aggregations": [
          {"field": "amount", "func": "sum", "alias": "total_sales"},
          {"field": "quantity", "func": "count", "alias": "order_count"}
        ]
      }
    },
    {
      "id": "output",
      "type": "DataOut",
      "name": "输出结果",
      "depends_on": ["aggregate"],
      "config": {"path": "output/summary.csv"}
    }
  ]
}

条件分支

{
  "workflow_id": "conditional_flow",
  "name": "条件分支流程",
  "execution_mode": "auto",
  "nodes": [
    {"id": "start", "type": "DataIn", "name": "读取数据"},
    {"id": "check", "type": "Branch", "depends_on": ["start"]},
    {"id": "path_a", "type": "Filter", "name": "路径A"},
    {"id": "path_b", "type": "Filter", "name": "路径B"},
    {"id": "merge", "type": "Join", "depends_on": ["path_a", "path_b"]}
  ],
  "edges": [
    {"from": "start", "to": "check"},
    {"from": "check", "to": "path_a", "condition": true},
    {"from": "check", "to": "path_b", "condition": false},
    {"from": "path_a", "to": "merge"},
    {"from": "path_b", "to": "merge"}
  ]
}

目录结构

agent-workflow-orchestrator/
├── SKILL.md                    # 本文档
├── scripts/
│   ├── __init__.py
│   ├── dag_runner.py           # DAG执行引擎
│   ├── node_executor.py        # 节点执行器
│   ├── skills_executor.py      # 技能调用器
│   ├── ai_dag_generator.py     # AI DAG生成
│   ├── lineage_tracker.py      # 数据血缘追踪
│   ├── dag_visualizer.py       # DAG可视化
│   ├── security_guard.py       # 安全守卫
│   ├── state_manager.py        # 状态管理
│   ├── config_exporter.py      # 配置导出
│   └── workflow_cli.py         # CLI入口
├── references/
│   ├── workflows/             # 工作流示例
│   └── node_templates/         # 节点模板
└── tests/
    ├── test_dag_runner.py
    ├── test_node_executor.py
    ├── test_skills_executor.py
    ├── test_security_guard.py
    └── test_dag_visualizer.py

版本信息

  • 版本: 2.0.0
  • 更新: 2026-04-27
  • 作者: captain_ai