Claude Code 实战(三):任务运行时 — 任务系统、后台任务与定时调度
当 Agent 开始处理真正的工作负载,同步执行就不够了。本文覆盖任务运行时阶段的 3 个机制。
前言
在第一篇和第二篇中,我们搭建了 Agent 的核心闭环并完成了系统加固。
现在系统已经能安全、稳定地工作了。但还有一个问题:所有操作都是同步的。
当 agent 开始处理真正的工作负载时,会遇到:
- 一个任务拆出 8 个子步骤,要跟踪每个步骤的状态
- 有些操作要跑几分钟(测试套件、构建),主循环不能卡住等
- 有些操作不是现在做,而是"每天晚上跑一次"、"30 分钟后提醒我"
本文解决这 3 个问题:
- 任务系统(s12)— 把工作拆成可追踪的任务单元
- 后台任务(s13)— 让慢操作异步执行,不卡主循环
- 定时调度(s14)— 让时间也成为触发工作的入口
第 12 章:任务系统
"任务不是待办清单的升级版,而是一等公民工作单元"
问题
核心闭环阶段有了 todo_write 做待办清单。但当工作变得更复杂时,简单的 todo 列表不够了:
- 需要跟踪任务之间的依赖关系
- 需要记录每个任务用了什么工具、产生了什么输出
- 需要支持任务的中断和恢复
任务 vs Todo 的区别
| Todo | Task |
|---|---|
| 轻量级文本列表 | 结构化工作单元 |
| 只有 pending/in_progress/completed | 有完整生命周期 |
| 不记录执行历史 | 记录输入、输出、工具调用 |
| 不支持依赖关系 | 支持任务间依赖 |
任务生命周期
created -> pending -> in_progress -> completed
|
+-> failed
|
+-> cancelled
关键数据结构
task = {
"id": "task_001",
"title": "Fix authentication bug",
"status": "in_progress",
"dependencies": ["task_000"], # 依赖哪些任务先完成
"created_at": 1710000000.0,
"started_at": 1710000060.0,
"tool_calls": [...], # 执行过的工具调用记录
"output": "...", # 任务输出
}
最小实现
class TaskManager:
def __init__(self):
self.tasks = {}
def create(self, title: str, deps: list = None) -> dict:
task_id = f"task_{len(self.tasks) + 1:03d}"
task = {
"id": task_id,
"title": title,
"status": "pending",
"dependencies": deps or [],
"created_at": time.time(),
}
self.tasks[task_id] = task
return task
def start(self, task_id: str):
task = self.tasks[task_id]
# 检查依赖是否都完成了
for dep_id in task["dependencies"]:
if self.tasks[dep_id]["status"] != "completed":
raise ValueError(f"Dependency {dep_id} not completed")
task["status"] = "in_progress"
task["started_at"] = time.time()
def complete(self, task_id: str, output: str = ""):
task = self.tasks[task_id]
task["status"] = "completed"
task["output"] = output
task["completed_at"] = time.time()
def next_ready(self) -> dict:
"""找到下一个可以开始的任务"""
for task in self.tasks.values():
if task["status"] == "pending":
deps_met = all(
self.tasks[d]["status"] == "completed"
for d in task["dependencies"]
)
if deps_met:
return task
return None
任务系统的价值
- 可追踪:每个工作单元有完整记录
- 可恢复:任务中断后知道从哪里继续
- 可依赖:支持任务间的先后关系
- 可审计:记录了每个任务用了什么工具、产生了什么结果
第 13 章:后台任务
"已经启动的慢操作,结果什么时候回来?"
问题
有些操作天然就很慢:
- 跑完整的测试套件(3-5 分钟)
- 构建一个大型项目(5-10 分钟)
- 批量处理数据
如果主循环卡在这些操作上,整个 agent 就被阻塞了,无法响应其他请求。
最小心智模型
主循环 后台工作线程
| |
| -> 提交慢命令 |
| (不等待) |
| <- 返回 task_id |
| | -> 开始执行
| (继续处理其他工作) | ...
| | -> 完成
| <- 轮询/通知:任务完成了 |
| -> 获取结果 |
关键数据结构
background_task = {
"id": "bg_001",
"command": "pytest tests/",
"status": "running", # running / completed / failed
"started_at": 1710000000.0,
"output": "", # 执行输出(实时追加)
"exit_code": None, # 退出码
}
最小实现
import threading
import subprocess
class BackgroundRunner:
def __init__(self):
self.tasks = {}
self.lock = threading.Lock()
def submit(self, command: str, cwd: str = None) -> str:
"""提交一个后台任务,立即返回 task_id"""
task_id = f"bg_{len(self.tasks) + 1:03d}"
task = {
"id": task_id,
"command": command,
"status": "running",
"started_at": time.time(),
"output": "",
"exit_code": None,
}
self.tasks[task_id] = task
# 在后台线程中执行
thread = threading.Thread(
target=self._run, args=(task_id, command, cwd)
)
thread.daemon = True
thread.start()
return task_id
def _run(self, task_id, command, cwd):
result = subprocess.run(
command, shell=True, capture_output=True, text=True, cwd=cwd
)
with self.lock:
self.tasks[task_id]["output"] = result.stdout + result.stderr
self.tasks[task_id]["exit_code"] = result.returncode
self.tasks[task_id]["status"] = "completed" if result.returncode == 0 else "failed"
def poll(self, task_id: str) -> dict:
"""查看任务状态"""
return self.tasks.get(task_id)
def result(self, task_id: str) -> str:
"""获取任务结果"""
task = self.tasks[task_id]
if task["status"] == "running":
return f"[仍在运行] 当前输出:\n{task['output'][-500:]}"
return f"[退出码 {task['exit_code']}] 输出:\n{task['output']}"
后台任务接入主循环
# 模型调用 bash 工具时,判断是否需要后台执行
def run_bash(command, timeout=30):
if is_long_running(command):
task_id = bg_runner.submit(command)
return f"[后台任务已提交] ID: {task_id}\n使用 poll_background 查看状态"
# 短命令同步执行
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout)
return result.stdout + result.stderr
关键
- 后台任务让主循环不被慢操作阻塞
- 通过 task_id 关联,随时可以查看状态和获取结果
- 模型可以继续处理其他工作,不必干等
第 14 章:定时调度
"一件事应该在未来什么时候开始?"
问题
后台任务解决的是"现在启动,稍后拿结果"。但很多需求不是现在做,而是:
- 每天晚上跑一次测试
- 每周一早上生成报告
- 30 分钟后提醒我检查结果
定时调度 = 把一条未来要执行的意图先记下来,等时间到了再触发。
最小心智模型
schedule_create(...)
-> 把记录写到列表或文件里
-> 后台检查器每分钟看一次"现在是否匹配"
-> 如果匹配,就把 prompt 放进通知队列
-> 主循环下一轮把它当成新的用户消息喂给模型
关键理解:定时调度不是另一套 agent,它最终还是回到同一条主循环。
Cron 表达式
分 时 日 月 周
*/5 * * * * 每 5 分钟
0 9 * * 1 每周一 9 点
30 14 * * * 每天 14:30
关键数据结构
schedule = {
"id": "job_001",
"cron": "0 9 * * 1",
"prompt": "Run the weekly status report.",
"recurring": True, # 是否反复触发
"durable": True, # 是否落盘保存(程序重启后还在)
"created_at": 1710000000.0,
"last_fired_at": None,
}
最小实现
class Scheduler:
def __init__(self):
self.jobs = []
self.queue = [] # 通知队列
def create(self, cron_expr, prompt, recurring=True):
job = {
"id": f"job_{len(self.jobs) + 1:03d}",
"cron": cron_expr,
"prompt": prompt,
"recurring": recurring,
"created_at": time.time(),
"last_fired_at": None,
}
self.jobs.append(job)
return job
def check_loop(self):
"""后台线程:每分钟检查一次"""
while True:
now = datetime.now()
for job in self.jobs:
if cron_matches(job["cron"], now):
# 避免同一分钟重复触发
if job["last_fired_at"]:
last = datetime.fromtimestamp(job["last_fired_at"])
if last.minute == now.minute:
continue
self.queue.append({
"type": "scheduled_prompt",
"schedule_id": job["id"],
"prompt": job["prompt"],
})
job["last_fired_at"] = now.timestamp()
time.sleep(60)
def drain(self) -> list:
"""主循环调用:取走所有待处理通知"""
notifications = self.queue[:]
self.queue.clear()
return notifications
接入主循环
# 主循环每轮开始时,先处理定时通知
notifications = scheduler.drain()
for item in notifications:
messages.append({
"role": "user",
"content": f"[定时任务:{item['schedule_id']}] {item['prompt']}",
})
后台任务 vs 定时调度
| 机制 | 回答的问题 | 触发方式 |
|---|---|---|
| 后台任务 | "已启动的慢操作,结果什么时候回来?" | 用户主动触发 |
| 定时调度 | "一件事应该在未来什么时候开始?" | 时间自动触发 |
总结
任务运行时阶段让 Agent 从"单线程"变成"多线程工作":
| 章节 | 机制 | 核心价值 |
|---|---|---|
| s12 | 任务系统 | 结构化工作单元,可追踪可恢复 |
| s13 | 后台任务 | 慢操作不阻塞主循环 |
| s14 | 定时调度 | 时间成为触发入口 |
下一篇也是最后一篇,我们将进入"多 Agent 平台"阶段:Agent 团队、团队协议、自主代理、Worktree 隔离、MCP 与插件。
常见问题
Q: 任务系统和普通 TODO 列表有什么区别?
TODO 是扁平的文本列表,只能标记完成/未完成。任务系统是结构化的工作单元,有生命周期管理、依赖关系、执行历史记录。可以理解成 TODO 的"企业版"。
Q: 后台任务失败了怎么办?
后台任务失败后状态变为 failed,主循环可以通过 poll 接口查到。你可以让模型根据错误信息决定是重试、换方案还是报告给用户。
Q: Cron 表达式太难记了怎么办?
记住几个常用模式就够了:*/5 * * * * = 每5分钟,0 9 * * * = 每天9点,0 9 * * 1 = 每周一9点。更复杂的可以用在线 Cron 生成器。
Q: 定时调度重启后还在吗?
取决于实现。如果 durable=True,调度记录会落盘保存,程序重启后自动恢复。如果存在内存中,重启就没了。生产环境建议用持久化存储。
Q: 这套任务系统支持分布式吗?
本文的实现是单机版。如果需要分布式,建议用 Celery、RQ 等成熟框架。核心概念是通用的。入门推荐先看 Claude Code 配置教程。
本文基于 Learn Claude Code 开源项目(GitHub: shareAI-lab/learn-claude-code)整理改编。
相关阅读:
相关阅读:
