Eucalyptus日记2-实现思路

乐云一
  • 开发日记
  • 开发日记
  • AI
  • 数字分身
About 2331 wordsAbout 8 min

Eucalyptus 日记2 — 最终实现思路

上一篇聊了设计思路和链路推演,这篇来聊聊这些设计是怎么落地的

说实话,从设计到实现之间,差了十万八千里。很多在设计阶段觉得"应该没问题"的东西,真正跑起来才发现到处是坑。

这篇依然不聊代码细节,而是聚焦在实现的核心思路——时序怎么串的、流程怎么编排的、Agent之间怎么协作的。

vistask-ai的实现思路

DDD分层

后端采用的是DDD(领域驱动设计)分层架构。选DDD不是赶时髦,而是因为任务这个领域模型本身就有复杂的业务逻辑——状态流转、并发认领、进度追踪、重试机制——这些逻辑不应该散落在Service层里。

┌─────────────────────────────────────────┐
│  interfaces(接口层)                      │
│  ├── TaskController      REST接口        │
│  ├── McpTaskService      MCP工具暴露      │
│  └── TaskFeignClient     远程调用          │
├─────────────────────────────────────────┤
│  application(应用层)                     │
│  └── TaskApplicationService  业务编排      │
├─────────────────────────────────────────┤
│  domain(领域层)                          │
│  ├── Task               聚合根            │
│  ├── TaskRepository     仓储接口          │
│  └── TaskStatus         状态枚举          │
├─────────────────────────────────────────┤
│  infrastructure(基础设施层)              │
│  ├── TaskEntity         JPA实体           │
│  ├── TaskRepositoryImpl 仓储实现          │
│  └── TaskMapper         对象转换          │
└─────────────────────────────────────────┘

核心逻辑都内聚在Task这个聚合根里:

class Task:
    # 状态流转——所有状态变更的逻辑都在这里
    def claim(self, processor_id):
        assert self.status == PENDING
        self.status = IN_PROGRESS
        self.claim_time = now()
        self.processor_id = processor_id

    def complete(self, result):
        assert self.status == IN_PROGRESS
        self.status = COMPLETED
        self.result = result

    def fail(self, error_msg):
        assert self.status == IN_PROGRESS
        self.status = FAILED
        self.error_message = error_msg

这样做的好处是,不管是从REST API来的请求,还是从MCP工具来的请求,都走同一套业务逻辑,不会出现不一致的情况。

MCP工具暴露

vistask-ai通过Spring AI MCP Server框架,把任务操作暴露为MCP工具。这样任何支持MCP协议的AI Agent都能直接操作任务:

┌──────────────────────────────────────────┐
│          vistask-ai MCP Server            │
│          端点: /mcp                       │
├──────────────────────────────────────────┤
│                                          │
│  @McpTool claimTask()                    │
│    → AI主动认领待处理任务                  │
│    → 自动加锁,防止并发冲突                │
│                                          │
│  @McpTool completeTask(taskId, result)   │
│    → AI标记任务完成                       │
│    → 记录执行结果                         │
│                                          │
│  @McpTool failTask(taskId, errorMsg)     │
│    → AI标记任务失败                       │
│    → 记录错误信息,便于后续分析            │
│                                          │
└──────────────────────────────────────────┘

MCP工具的调用时序:

AI Agent              MCP Server            领域层             数据库
   │                     │                    │                  │
   │──claimTask()──────>│                    │                  │
   │                     │──task.claim()────>│                  │
   │                     │                    │──SELECT FOR UPD─>│
   │                     │                    │──UPDATE STATUS──>│
   │                     │<──claim result────│                  │
   │<──Task Info────────│                    │                  │

前端看板

前端用的是Vue3 + Vite,UI风格走的是赛博朋克工业风——霓虹色调、深色背景。说实话这个风格挺酷的,看任务状态一目了然。

核心组件就一个TaskList,负责展示所有任务的状态、进度、执行日志。

Eucalyptus的实现思路

技能体系

Eucalyptus的核心能力是通过**技能(Skill)**来组织的。41个技能,每个技能都是一个独立的提示词工程。

技能之间有明确的层级关系:

master-skill(总调度)
    │
    ├── fix-workflow(Bug修复工作流)
    │     ├── fix-triage Agent
    │     ├── fix-reproduce Agent
    │     ├── fix-diagnose Agent
    │     ├── fix-repair Agent
    │     ├── fix-deliver Agent
    │     └── fix-verify Agent
    │
    ├── feature-workflow(新需求工作流)
    │     └── 需求分析 → 设计 → 实现 → 测试
    │
    ├── refactor-workflow(重构工作流)
    │     └── 分析 → 规划 → 执行 → 验证
    │
    └── deploy-workflow(部署工作流)
          └── 构建 → 测试 → 部署 → 验证

项目路由

当任务进来时,第一个关键步骤是判断任务属于哪个项目。这通过project-router技能实现:

function project_router(task):
    // 从任务上下文中提取项目标识
    project = extract_project_info(task)

    // 加载对应项目的知识库
    knowledge = load_project_context(project.name)
    // knowledge包含:
    //   - context.yaml   项目上下文
    //   - tech-stack.md  技术栈
    //   - architecture.md 架构文档

    // 切换到项目工作目录
    switch_workspace(project.workspace)

    // 将知识库注入到后续执行的上下文中
    inject_context(knowledge)

    return project

Agent协作流水线

以Bug修复为例,六个Agent之间形成一条流水线。每个Agent完成自己的工作后,将结果写入共享上下文文件,下一个Agent从中读取:

时序:

fix-triage          fix-reproduce       fix-diagnose        fix-repair
    │                     │                   │                   │
    │ [分析Bug描述]       │                   │                   │
    │ [分类定级]          │                   │                   │
    │ [评估影响范围]      │                   │                   │
    │                     │                   │                   │
    │──写入triage结果──> context.md            │                   │
    │                     │                   │                   │
    │                     │ [读取triage结果]   │                   │
    │                     │ [尝试复现]         │                   │
    │                     │ [收集日志]         │                   │
    │                     │                   │                   │
    │                     │──写入reproduce结果─> context.md          │
    │                     │                   │                   │
    │                     │                   │ [读取reproduce结果]│
    │                     │                   │ [定位根因]         │
    │                     │                   │ [分析代码]         │
    │                     │                   │                   │
    │                     │                   │──写入diagnose结果─> context.md
    │                     │                   │                   │
    │                     │                   │                   │ [读取diagnose结果]
    │                     │                   │                   │ [编写修复代码]
    │                     │                   │                   │ [编写测试]
    │                     │                   │                   │

这种设计的好处是每个Agent职责单一、互不干扰,而且任何一个阶段失败了,只需要从失败的阶段重新开始,不用从头来过。

知识库管理

Eucalyptus的知识库结构:

memory/
├── projects/
│   ├── DMS/
│   │   ├── context.yaml       # 项目上下文(仓库地址、分支、环境)
│   │   ├── tech-stack.md      # 技术栈(Java、Spring Boot、Vue、Netty)
│   │   ├── architecture.md    # 架构文档(微服务、多仓库)
│   │   ├── conventions.md     # 代码规范
│   │   ├── recent-tasks.md    # 近期任务记录
│   │   └── known-bugs.md      # 已知Bug记录
│   ├── APM/
│   │   └── ...
│   └── AMS/
│       └── ...
└── templates/
    └── project-template/      # 新项目的知识库模板

知识库会在每次任务执行后自动更新——新的Bug记录、新的架构变更、新的代码规范都会被自动写入对应的知识库文件。

Task Processor的实现思路

执行引擎

Task Processor的核心执行逻辑,是把vistask-ai的任务转换为Eucalyptus能理解的指令:

function execute_task(task):
    // 1. 构建执行参数
    params = {
        "task_id": task.id,
        "prompt": task.prompt,
        "project": task.project,
        "context_file": task.context_file
    }

    // 2. 调用Claude CLI
    process = subprocess.Popen([
        "claude",
        "--print",                    // 非交互模式
        "--output-format", "stream-json",  // JSON流式输出
        "--session-id", task.session_id,    // 会话管理
        "/execute-task",              // 调用Eucalyptus技能
        json.dumps(params)            // 传入任务参数
    ])

    // 3. 实时解析输出流
    for line in process.stdout:
        event = json.loads(line)
        handle_event(event)

    return process.wait()

事件解析与进度上报

Claude CLI输出的JSON事件有几种类型,每种都需要不同的处理:

def handle_event(event):
    match event["type"]:
        case "TEXT":
            # AI输出的文本信息 → 作为进度描述上报
            report_progress(
                message=event["content"],
                percentage=estimate_progress()
            )

        case "TOOL_USE":
            # AI调用了工具(读文件、执行命令等)→ 记录操作步骤
            report_progress(
                message=f"执行操作: {event['tool']}",
                detail=json.dumps(event["input"])
            )

        case "TOOL_RESULT":
            # 工具返回结果 → 静默处理,不上报
            pass

        case "RESULT":
            # 最终结果 → 任务完成
            report_progress(
                message="任务执行完成",
                percentage=100
            )

        case "SYSTEM":
            # 系统消息 → 日志记录
            log_info(event["content"])

进度上报的时序:

Claude CLI          Task Processor          vistask-ai
    │                     │                     │
    │──TEXT event───────>│                     │
    │                     │──progress(15%)────>│
    │                     │                     │
    │──TOOL_USE event───>│                     │
    │                     │──progress(30%)────>│
    │                     │                     │
    │──TEXT event───────>│                     │
    │                     │──progress(60%)────>│
    │                     │                     │
    │──RESULT event─────>│                     │
    │                     │──progress(100%)───>│
    │                     │──complete()───────>│

异常处理

7x24运行最怕的就是挂掉。所以异常处理是重中之重:

function main_loop():
    while True:
        try:
            ensure_authenticated()    // 确保Token有效

            task = claim_task()       // 认领任务

            if task is None:
                sleep(IDLE_WAIT)      // 没活儿就等
                continue

            execute(task)             // 执行任务

        except RateLimitError:
            // 429限流 → 指数退避重试
            sleep(backoff_time)
            backoff_time *= 2

        except AuthError:
            // 认证失败 → 重新登录
            reauthenticate()

        except NetworkError:
            // 网络异常 → 等待后重试
            sleep(RETRY_WAIT)

        except TaskExecutionError as e:
            // 任务执行失败 → 上报失败状态
            fail_task(task_id, str(e))

        except Exception as e:
            // 未知异常 → 记录日志,继续循环
            log_error(e)
            sleep(ERROR_WAIT)

关键设计:不管出什么异常,主循环都不能断。每次异常处理后继续下一轮轮询。这就是7x24的底气。

文件上传链路

任务执行完成后,如果有产出文件(比如生成的报告、修复的代码补丁等),需要上传到文件服务器:

Task Processor              文件服务器
    │                           │
    │──计算文件MD5─────────────>│ (本地计算)
    │                           │
    │──判断文件大小             │
    │   ├── ≤ 5MB: 直接上传    │
    │   ├── ≤ 100MB: 10MB分片   │
    │   └── > 100MB: 20MB分片   │
    │                           │
    │──HMAC签名认证────────────>│
    │                           │
    │──分片上传(并发)──────────>│
    │   ├── chunk_1 ──────────>│
    │   ├── chunk_2 ──────────>│
    │   └── chunk_3 ──────────>│
    │                           │
    │──校验MD5─────────────────>│
    │<──上传完成───────────────│

上传完成后,把文件URL写回任务结果中。

全链路串起来

把所有组件的实现串在一起,完整的执行流程是这样的:

[阶段1: 任务创建]
人类/系统 → vistask-ai REST API → 创建Task(PENDING)

[阶段2: 任务认领]
Task Processor → 轮询 → SELECT FOR UPDATE → UPDATE IN_PROGRESS

[阶段3: 意图路由]
Task Processor → Claude CLI → Eucalyptus → master-skill → 路由到工作流

[阶段4: 项目切换]
project-router → 加载项目知识库 → 注入上下文

[阶段5: 工作流执行]
以fix-workflow为例:
  fix-triage → 分析 → 写入上下文
  fix-reproduce → 复现 → 写入上下文
  fix-diagnose → 诊断 → 写入上下文
  fix-repair → 修复 → 写入代码
  fix-deliver → 提交 → 创建PR

[阶段6: 进度上报]
每一步 → JSON事件流 → Process解析 → REST API上报

[阶段7: 结果上传]
产出文件 → HMAC认证 → 分片上传 → 文件服务器

[阶段8: 任务完成]
Process → completeTask() → vistask-ai → Task(COMPLETED)

[阶段9: 知识库更新]
Eucalyptus → 更新项目知识库 → 记录本次任务经验

小结

这篇日记主要梳理了落地的核心实现思路

  1. vistask-ai用DDD分层,把核心逻辑内聚在领域模型中
  2. Eucalyptus用技能+Agent的编排实现复杂工作流,通过共享上下文文件实现Agent间协作
  3. Task Processor用无限轮询+异常兜底实现7x24不间断运行
  4. 三个组件通过REST API、Claude CLI、文件上传三条链路串联

设计到实现,最大的感触就是:简单的东西能跑起来就是好东西

不要过度设计,先让链路跑通,再慢慢加功能。这才是工程化的正确打开方式。

下一篇,就该聊聊这个系统到底有多厉害了——数字分身的真正价值。

Last update:
Contributors: LeYunone
Comments
  • Latest
  • Oldest
  • Hottest
Powered by Waline v2.14.7