Claude Code 实战(三):任务运行时 — 任务系统、后台任务与定时调度
AI 教程教程进阶12 分钟阅读
学习路径:Claude Code 实战

Claude Code 实战(三):任务运行时 — 任务系统、后台任务与定时调度

AI Agent任务运行时实战!详解任务系统生命周期管理、后台任务异步执行、Cron定时调度系统,让Agent从单线程变多线程。

Claude Code 实战(三):任务运行时 — 任务系统、后台任务与定时调度

当 Agent 开始处理真正的工作负载,同步执行就不够了。本文覆盖任务运行时阶段的 3 个机制。

前言

第一篇第二篇中,我们搭建了 Agent 的核心闭环并完成了系统加固。

现在系统已经能安全、稳定地工作了。但还有一个问题:所有操作都是同步的。

当 agent 开始处理真正的工作负载时,会遇到:

  • 一个任务拆出 8 个子步骤,要跟踪每个步骤的状态
  • 有些操作要跑几分钟(测试套件、构建),主循环不能卡住等
  • 有些操作不是现在做,而是"每天晚上跑一次"、"30 分钟后提醒我"

本文解决这 3 个问题:

  1. 任务系统(s12)— 把工作拆成可追踪的任务单元
  2. 后台任务(s13)— 让慢操作异步执行,不卡主循环
  3. 定时调度(s14)— 让时间也成为触发工作的入口

第 12 章:任务系统

"任务不是待办清单的升级版,而是一等公民工作单元"

问题

核心闭环阶段有了 todo_write 做待办清单。但当工作变得更复杂时,简单的 todo 列表不够了:

  • 需要跟踪任务之间的依赖关系
  • 需要记录每个任务用了什么工具、产生了什么输出
  • 需要支持任务的中断和恢复

任务 vs Todo 的区别

Todo Task
轻量级文本列表 结构化工作单元
只有 pending/in_progress/completed 有完整生命周期
不记录执行历史 记录输入、输出、工具调用
不支持依赖关系 支持任务间依赖

任务生命周期

created -> pending -> in_progress -> completed
                      |
                      +-> failed
                      |
                      +-> cancelled

关键数据结构

task = {
    "id": "task_001",
    "title": "Fix authentication bug",
    "status": "in_progress",
    "dependencies": ["task_000"],       # 依赖哪些任务先完成
    "created_at": 1710000000.0,
    "started_at": 1710000060.0,
    "tool_calls": [...],                # 执行过的工具调用记录
    "output": "...",                    # 任务输出
}

最小实现

class TaskManager:
    def __init__(self):
        self.tasks = {}

    def create(self, title: str, deps: list = None) -> dict:
        task_id = f"task_{len(self.tasks) + 1:03d}"
        task = {
            "id": task_id,
            "title": title,
            "status": "pending",
            "dependencies": deps or [],
            "created_at": time.time(),
        }
        self.tasks[task_id] = task
        return task

    def start(self, task_id: str):
        task = self.tasks[task_id]
        # 检查依赖是否都完成了
        for dep_id in task["dependencies"]:
            if self.tasks[dep_id]["status"] != "completed":
                raise ValueError(f"Dependency {dep_id} not completed")
        task["status"] = "in_progress"
        task["started_at"] = time.time()

    def complete(self, task_id: str, output: str = ""):
        task = self.tasks[task_id]
        task["status"] = "completed"
        task["output"] = output
        task["completed_at"] = time.time()

    def next_ready(self) -> dict:
        """找到下一个可以开始的任务"""
        for task in self.tasks.values():
            if task["status"] == "pending":
                deps_met = all(
                    self.tasks[d]["status"] == "completed"
                    for d in task["dependencies"]
                )
                if deps_met:
                    return task
        return None

任务系统的价值

  • 可追踪:每个工作单元有完整记录
  • 可恢复:任务中断后知道从哪里继续
  • 可依赖:支持任务间的先后关系
  • 可审计:记录了每个任务用了什么工具、产生了什么结果

第 13 章:后台任务

"已经启动的慢操作,结果什么时候回来?"

问题

有些操作天然就很慢:

  • 跑完整的测试套件(3-5 分钟)
  • 构建一个大型项目(5-10 分钟)
  • 批量处理数据

如果主循环卡在这些操作上,整个 agent 就被阻塞了,无法响应其他请求。

最小心智模型

主循环                     后台工作线程
  |                           |
  | -> 提交慢命令              |
  |    (不等待)                |
  | <- 返回 task_id            |
  |                           | -> 开始执行
  | (继续处理其他工作)          |    ...
  |                           | -> 完成
  | <- 轮询/通知:任务完成了     |
  | -> 获取结果                |

关键数据结构

background_task = {
    "id": "bg_001",
    "command": "pytest tests/",
    "status": "running",     # running / completed / failed
    "started_at": 1710000000.0,
    "output": "",            # 执行输出(实时追加)
    "exit_code": None,       # 退出码
}

最小实现

import threading
import subprocess

class BackgroundRunner:
    def __init__(self):
        self.tasks = {}
        self.lock = threading.Lock()

    def submit(self, command: str, cwd: str = None) -> str:
        """提交一个后台任务,立即返回 task_id"""
        task_id = f"bg_{len(self.tasks) + 1:03d}"
        task = {
            "id": task_id,
            "command": command,
            "status": "running",
            "started_at": time.time(),
            "output": "",
            "exit_code": None,
        }
        self.tasks[task_id] = task

        # 在后台线程中执行
        thread = threading.Thread(
            target=self._run, args=(task_id, command, cwd)
        )
        thread.daemon = True
        thread.start()
        return task_id

    def _run(self, task_id, command, cwd):
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, cwd=cwd
        )
        with self.lock:
            self.tasks[task_id]["output"] = result.stdout + result.stderr
            self.tasks[task_id]["exit_code"] = result.returncode
            self.tasks[task_id]["status"] = "completed" if result.returncode == 0 else "failed"

    def poll(self, task_id: str) -> dict:
        """查看任务状态"""
        return self.tasks.get(task_id)

    def result(self, task_id: str) -> str:
        """获取任务结果"""
        task = self.tasks[task_id]
        if task["status"] == "running":
            return f"[仍在运行] 当前输出:\n{task['output'][-500:]}"
        return f"[退出码 {task['exit_code']}] 输出:\n{task['output']}"

后台任务接入主循环

# 模型调用 bash 工具时,判断是否需要后台执行
def run_bash(command, timeout=30):
    if is_long_running(command):
        task_id = bg_runner.submit(command)
        return f"[后台任务已提交] ID: {task_id}\n使用 poll_background 查看状态"
    
    # 短命令同步执行
    result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout)
    return result.stdout + result.stderr

关键

  • 后台任务让主循环不被慢操作阻塞
  • 通过 task_id 关联,随时可以查看状态和获取结果
  • 模型可以继续处理其他工作,不必干等

第 14 章:定时调度

"一件事应该在未来什么时候开始?"

问题

后台任务解决的是"现在启动,稍后拿结果"。但很多需求不是现在做,而是:

  • 每天晚上跑一次测试
  • 每周一早上生成报告
  • 30 分钟后提醒我检查结果

定时调度 = 把一条未来要执行的意图先记下来,等时间到了再触发。

最小心智模型

schedule_create(...)
  -> 把记录写到列表或文件里
  -> 后台检查器每分钟看一次"现在是否匹配"
  -> 如果匹配,就把 prompt 放进通知队列
  -> 主循环下一轮把它当成新的用户消息喂给模型

关键理解:定时调度不是另一套 agent,它最终还是回到同一条主循环。

Cron 表达式

分 时 日 月 周

*/5 * * * *     每 5 分钟
0 9 * * 1       每周一 9 点
30 14 * * *     每天 14:30

关键数据结构

schedule = {
    "id": "job_001",
    "cron": "0 9 * * 1",
    "prompt": "Run the weekly status report.",
    "recurring": True,      # 是否反复触发
    "durable": True,        # 是否落盘保存(程序重启后还在)
    "created_at": 1710000000.0,
    "last_fired_at": None,
}

最小实现

class Scheduler:
    def __init__(self):
        self.jobs = []
        self.queue = []   # 通知队列

    def create(self, cron_expr, prompt, recurring=True):
        job = {
            "id": f"job_{len(self.jobs) + 1:03d}",
            "cron": cron_expr,
            "prompt": prompt,
            "recurring": recurring,
            "created_at": time.time(),
            "last_fired_at": None,
        }
        self.jobs.append(job)
        return job

    def check_loop(self):
        """后台线程:每分钟检查一次"""
        while True:
            now = datetime.now()
            for job in self.jobs:
                if cron_matches(job["cron"], now):
                    # 避免同一分钟重复触发
                    if job["last_fired_at"]:
                        last = datetime.fromtimestamp(job["last_fired_at"])
                        if last.minute == now.minute:
                            continue
                    self.queue.append({
                        "type": "scheduled_prompt",
                        "schedule_id": job["id"],
                        "prompt": job["prompt"],
                    })
                    job["last_fired_at"] = now.timestamp()
            time.sleep(60)

    def drain(self) -> list:
        """主循环调用:取走所有待处理通知"""
        notifications = self.queue[:]
        self.queue.clear()
        return notifications

接入主循环

# 主循环每轮开始时,先处理定时通知
notifications = scheduler.drain()
for item in notifications:
    messages.append({
        "role": "user",
        "content": f"[定时任务:{item['schedule_id']}] {item['prompt']}",
    })

后台任务 vs 定时调度

机制 回答的问题 触发方式
后台任务 "已启动的慢操作,结果什么时候回来?" 用户主动触发
定时调度 "一件事应该在未来什么时候开始?" 时间自动触发

总结

任务运行时阶段让 Agent 从"单线程"变成"多线程工作":

章节 机制 核心价值
s12 任务系统 结构化工作单元,可追踪可恢复
s13 后台任务 慢操作不阻塞主循环
s14 定时调度 时间成为触发入口

下一篇也是最后一篇,我们将进入"多 Agent 平台"阶段:Agent 团队、团队协议、自主代理、Worktree 隔离、MCP 与插件。


常见问题

Q: 任务系统和普通 TODO 列表有什么区别?

TODO 是扁平的文本列表,只能标记完成/未完成。任务系统是结构化的工作单元,有生命周期管理、依赖关系、执行历史记录。可以理解成 TODO 的"企业版"。

Q: 后台任务失败了怎么办?

后台任务失败后状态变为 failed,主循环可以通过 poll 接口查到。你可以让模型根据错误信息决定是重试、换方案还是报告给用户。

Q: Cron 表达式太难记了怎么办?

记住几个常用模式就够了:*/5 * * * * = 每5分钟,0 9 * * * = 每天9点,0 9 * * 1 = 每周一9点。更复杂的可以用在线 Cron 生成器。

Q: 定时调度重启后还在吗?

取决于实现。如果 durable=True,调度记录会落盘保存,程序重启后自动恢复。如果存在内存中,重启就没了。生产环境建议用持久化存储。

Q: 这套任务系统支持分布式吗?

本文的实现是单机版。如果需要分布式,建议用 Celery、RQ 等成熟框架。核心概念是通用的。入门推荐先看 Claude Code 配置教程


本文基于 Learn Claude Code 开源项目(GitHub: shareAI-lab/learn-claude-code)整理改编。

相关阅读:

相关阅读: