芥末
发布于 2025-10-11 / 0 阅读
0
0

使用 Spring AI Alibaba 构建后台定时运行的 AI Agent

常见的 AI(人工智能)Agent 大多以 Chat 形态出现:用户发一句话,Agent 理解意图、调用工具、生成结果,再把结果返回给用户。这种模式适合问答、辅助写作、代码解释、临时查询等交互场景,但不适合所有业务。

很多企业任务并不是由用户临时发起的,而是固定周期运行,或者由外部事件触发。例如:

  • 每天早上 8 点生成门店经营日报;
  • 每小时扫描用户评价,发现投诉风险后通知负责人;
  • 定时汇总供应商舆情,遇到高风险事件再进入人工审核;
  • 周期性分析金融资讯、社交媒体、公告数据,识别潜在风险;
  • 后台处理长耗时的数据分析任务,把结果提前加工好。

这些任务的共同点是:Agent 不应该一直等用户输入,而应该在后台按计划工作。它更像一个带有大语言模型能力的业务批处理器,可以自动采集数据、分析数据、调用工具、生成消息,并在关键节点让人介入。

Spring AI Alibaba(SAA)提供了基于图的 Agent 编排能力,可以用 StateGraph 定义工作流,用 CompiledGraph 编译运行图,并通过 schedule(ScheduleConfig config) 把 Agent 注册成定时任务。

后台定时 Agent 解决什么问题

Chat 模式的 Agent 强依赖用户主动发起请求。用户不说话,Agent 就不会执行。这对后台业务并不友好,因为很多任务天然是周期性的。

后台定时 Agent 的核心价值在于让 Agent 具备自主运行能力:

场景Chat Agent 的问题后台定时 Agent 的做法
经营日报每天都要人工询问一次到点自动读取业务数据并生成日报
舆情监控用户不知道什么时候该查定时扫描评价、新闻、社媒内容
风险告警只能被动回答风险情况发现风险后主动发送通知
长周期分析对话等待时间太长后台异步处理,完成后推送结果
人工审批全自动执行风险较高只在关键节点请求人工确认

后台定时 Agent 并不是简单地给一个 Chat 接口套上定时器。真正可用的实现至少要处理四件事:

  1. 周期触发:按 Cron 表达式或业务计划启动任务。
  2. 工作流编排:把数据加载、模型分析、结果发送、人工确认拆成节点。
  3. 状态管理:在不同节点之间传递中间结果,例如原始数据、分析结果、告警内容。
  4. 可控执行:支持最大迭代次数、条件分支、异常处理、人工介入。

定时 Agent 的运行结构

在 SAA 中,一个典型的后台定时 Agent 可以拆成三层:

  • StateGraph:定义 Agent 的节点和边,也就是工作流结构。
  • CompiledGraph:把工作流编译成可运行对象。
  • ScheduledAgentManager:管理被注册的定时 Agent,并在触发时间执行它们。

整体关系可以这样理解:

flowchart LR
    A[Cron 表达式] --> B[ScheduleConfig]
    B --> C[CompiledGraph.schedule]
    C --> D[ScheduledAgentManager]
    D --> E[触发 Agent 执行]
    E --> F[StateGraph 工作流]
    F --> G[数据加载节点]
    F --> H[LLM 分析节点]
    F --> I[消息发送节点]
    F --> J[人工确认节点]

StateGraph 负责描述“做什么”和“怎么流转”,ScheduleConfig 负责描述“什么时候做”。当 CompiledGraph 调用 schedule 后,这个 Agent 就会被定时任务管理器接管,到达指定时间后自动执行。

在单机应用中,默认实现可以直接在进程内管理这些任务。进入分布式部署后,需要进一步考虑任务去重、分布式锁、调度中心、失败重试和任务状态持久化,否则多个实例可能同时执行同一个 Agent。

Chat、低代码平台、Agent 开发框架的差异

定时 AI 任务大致有三种实现形态。

形态使用方式适合场景局限
Chat 任务在聊天窗口里描述周期任务简单提醒、周期性摘要、个人助手复杂业务集成能力弱,流程控制有限
低代码平台配置定时触发器和流程节点快速搭建标准化流程深度定制、复杂状态管理、企业系统集成受平台能力限制
Agent 开发框架用代码定义工作流、节点、状态和调度复杂企业级 Agent、强业务逻辑、可测试可维护开发成本高,需要工程化能力

如果任务只是“每天提醒我看天气”,Chat 任务就够了。如果任务需要接入订单系统、评价系统、消息系统、审批系统,并根据模型结果走不同分支,代码化的 Agent 框架会更合适。

定时 Agent 的核心编程模型

SAA 的图式 Agent 通常围绕几个概念组织代码。

概念作用
StateGraph定义 Agent 的节点、边、条件分支和状态策略
CompiledGraph编译后的可执行图
ScheduleConfig定义定时运行规则,常用 Cron 表达式
AsyncNodeAction异步节点动作,适合执行数据读取、业务处理、工具调用
LlmNode调用 LLM(大语言模型,Large Language Model)完成分析、生成、分类等任务
KeyStrategy定义状态字段写入策略,例如覆盖、追加等
HumanFeedbackNode等待人工反馈或确认
HumanActionNode根据人工决策执行后续动作

一个定时 Agent 的基本代码结构通常长这样:

StateGraph graph = new StateGraph("AgentName", stateStrategyFactory)
    .addNode("node_a", nodeA)
    .addNode("node_b", nodeB)
    .addEdge(StateGraph.START, "node_a")
    .addEdge("node_a", "node_b")
    .addEdge("node_b", StateGraph.END);

CompiledGraph compiledGraph = graph.compile();

ScheduleConfig scheduleConfig = ScheduleConfig.builder()
    .cronExpression("0 0 8 * * ?")
    .build();

compiledGraph.schedule(scheduleConfig);

Cron 表达式 "0 0 8 * * ?" 表示每天 8 点执行一次。和普通定时任务不同的是,这里执行的不是一个单独方法,而是一整个 Agent 工作流。

实践一:店铺经营日报 Agent

店铺经营日报适合用后台定时 Agent 实现。它的任务不是回答用户某个临时问题,而是每天固定时间汇总经营数据,生成结构化报告,并发送给指定人员。

这个 Agent 可以从多个维度加载数据:

  • 交易订单:销售额、订单数、客单价、退款情况;
  • 商品信息:热销商品、滞销商品、库存情况;
  • 客户画像:新客、老客、复购情况;
  • 门店信息:门店基础指标、营业时段表现;
  • 客户反馈:评价、投诉、建议等非结构化文本。

传统报表擅长展示准确数值,但不擅长解释非结构化反馈。LLM 可以把用户评价、投诉内容、销售变化等信息整理成经营结论,例如“哪个商品被频繁吐槽”“哪个时段服务压力较高”“下一步应该优先处理什么”。

工作流可以设计成三步:

flowchart LR
    A[定时触发] --> B[加载经营数据]
    B --> C[LLM 生成经营日报]
    C --> D[发送日报消息]
    D --> E[结束]

核心代码如下:

@Bean
public CompiledGraph dailyReportAgent(ChatModel chatModel) throws GraphStateException {
    ChatClient chatClient = ChatClient.builder(chatModel)
        .defaultAdvisors(new SimpleLoggerAdvisor())
        .build();

    AsyncNodeAction dataLoaderNode = node_async(state -> {
        /*
         * 从业务系统读取指定周期的数据,例如:
         * - 订单销量数据
         * - 门店商品信息
         * - 用户评价反馈
         * - 客户画像数据
         */
        Map<String, Object> dataSummary = loadDailyOperationData();

        return Map.of("data_summary", dataSummary);
    });

    LlmNode dataAnalysisNode = LlmNode.builder()
        .chatClient(chatClient)
        .paramsKey("data_summary")
        .outputKey("summary_message_to_sender")
        .userPromptTemplate(DAILY_REPORT_PROMPT)
        .build();

    StateGraph stateGraph = new StateGraph("OperationAnalysisAgent", () -> {
        Map<String, KeyStrategy> strategies = new HashMap<>();
        strategies.put("data_summary", new ReplaceStrategy());
        strategies.put("summary_message_to_sender", new ReplaceStrategy());
        strategies.put("message_sender_result", new ReplaceStrategy());
        strategies.put("access_token", new ReplaceStrategy());
        return strategies;
    })
        .addNode("data_loader", dataLoaderNode)
        .addNode("data_analysis", node_async(dataAnalysisNode))
        .addNode("message_sender", node_async(generateMessageSender()))
        .addEdge(StateGraph.START, "data_loader")
        .addEdge("data_loader", "data_analysis")
        .addEdge("data_analysis", "message_sender")
        .addEdge("message_sender", StateGraph.END);

    CompiledGraph compiledGraph = stateGraph.compile();
    compiledGraph.setMaxIterations(100);

    ScheduleConfig scheduleConfig = ScheduleConfig.builder()
        .cronExpression("0 0 8 * * ?")
        .build();

    compiledGraph.schedule(scheduleConfig);

    return compiledGraph;
}

这个实现有几个关键点。

data_loader 不应该把所有原始明细一股脑塞给模型,而应该先在业务侧做必要聚合。比如订单明细可以先计算总销售额、商品排行、退款比例,再把聚合结果传给 LLM。这样既能减少 Token 消耗,也能降低模型误读原始数据的概率。

data_analysis 负责生成日报。Prompt 模板应该明确报告结构,例如:

你是门店经营分析助手。
请根据输入数据生成一份经营日报,要求包含:

1. 今日核心指标
2. 销售变化原因分析
3. 用户反馈摘要
4. 需要关注的问题
5. 明日行动建议

要求:
- 不要编造输入数据中不存在的数值
- 对异常指标说明可能原因
- 行动建议要具体,可执行

message_sender 可以对接企业微信、钉钉、邮件或内部通知系统。发送前最好保留一份报告记录,方便后续追踪某天的分析依据和发送状态。

实践二:评价舆情分析 Agent

评价舆情分析比经营日报更复杂,因为它不只是定时生成报告,还需要在发现风险时通知人工决策。

假设系统每小时扫描一次用户评价。Agent 需要判断哪些评价是投诉,提炼投诉原因,并在出现投诉时生成告警。如果负责人认为需要处理,再执行后续动作,例如创建工单、通知门店、生成整改建议。

流程可以设计成这样:

flowchart TD
    A[每小时定时触发] --> B[加载评价和舆情数据]
    B --> C[逐条调用 LLM 分类]
    C --> D[汇总投诉数量和风险点]
    D --> E{是否存在投诉}
    E -- 否 --> F[结束]
    E -- 是 --> G[生成告警报告]
    G --> H[发送给负责人]
    H --> I[等待人工反馈]
    I --> J{是否忽略}
    J -- 是 --> F
    J -- 否 --> K[执行人工选择的处理动作]
    K --> F

这类 Agent 的价值在于,它不再只依赖关键词匹配。传统监控通常会写规则,例如包含“难吃”“投诉”“退款”就报警,但真实评价会更复杂:

  • “等了半小时才上菜,以后不会再来了。”
  • “包装漏了,客服一直没回复。”
  • “味道还行,但孩子吃完不舒服。”

这些内容不一定命中固定关键词,却可能代表服务、履约、食品安全等风险。LLM 可以从语义层面做分类和摘要。

核心代码可以这样组织:

@Bean
public CompiledGraph evaluationAnalysisAgent(
        ChatModel chatModel,
        FeedbackMapper feedbackMapper
) throws GraphStateException {

    ChatClient chatClient = ChatClient.builder(chatModel)
        .defaultAdvisors(new SimpleLoggerAdvisor())
        .build();

    EvaluationClassifierNode evaluationClassifierNode =
        EvaluationClassifierNode.builder()
            .chatClient(chatClient)
            .inputTextKey("iterator_item")
            .outputKey("session_analysis_result")
            .categories(List.of("yes", "no"))
            .classificationInstructions(List.of(
                "要求返回纯 JSON 字符串,禁止包含非 JSON 格式内容。",
                "JSON 字段包含 user、time、complaint、satisfaction、summary。",
                "complaint 表示当前评价是否为店铺或产品投诉,取值范围为 yes 或 no。",
                "satisfaction 表示用户实际消费满意度。",
                "summary 提炼本条评价的核心问题和可改进方向。"
            ))
            .build();

    StateGraph singleEvaluationAnalysisGraph =
        new StateGraph("single_evaluation_analysis", subStateFactory())
            .addNode("classifier", node_async(evaluationClassifierNode))
            .addEdge(StateGraph.START, "classifier")
            .addEdge("classifier", StateGraph.END);

    AsyncNodeAction sessionLoaderNode = node_async(state -> {
        List<String> evaluations = loadRecentEvaluations();
        return Map.of("evaluation_list", evaluations);
    });

    AsyncNodeAction iterationNode = node_async(state -> {
        List<String> evaluations = state.value("evaluation_list", List.of());

        List<Map<String, Object>> analysisResults = new ArrayList<>();
        for (String evaluation : evaluations) {
            Map<String, Object> result = runSubGraph(
                singleEvaluationAnalysisGraph,
                Map.of("iterator_item", evaluation)
            );
            analysisResults.add(result);
        }

        return Map.of("evaluation_analysis_results", analysisResults);
    });

    AsyncNodeAction resultSummaryNode = node_async(state -> {
        List<Map<String, Object>> results =
            state.value("evaluation_analysis_results", List.of());

        long complaintCount = results.stream()
            .filter(item -> "yes".equals(item.get("complaint")))
            .count();

        Map<String, Object> summary = buildComplaintSummary(results);

        return Map.of(
            "complaint", complaintCount,
            "summary_message", summary
        );
    });

    LlmNode alertReportNode = LlmNode.builder()
        .chatClient(chatClient)
        .paramsKey("summary_message")
        .outputKey("summary_message_to_sender")
        .systemPromptTemplate(EVALUATION_ALERT_PROMPT)
        .build();

    StateGraph stateGraph = new StateGraph("ReviewAnalysisAgent", () -> {
        Map<String, KeyStrategy> strategies = new HashMap<>();
        strategies.put("evaluation_list", new ReplaceStrategy());
        strategies.put("evaluation_analysis_results", new ReplaceStrategy());
        strategies.put("complaint", new ReplaceStrategy());
        strategies.put("summary_message", new ReplaceStrategy());
        strategies.put("summary_message_to_sender", new ReplaceStrategy());
        strategies.put("ignore", new ReplaceStrategy());
        strategies.put("human_action_result", new ReplaceStrategy());
        return strategies;
    })
        .addNode("session_loader", sessionLoaderNode)
        .addNode("iteration_analysis", iterationNode)
        .addNode("result_summary", resultSummaryNode)
        .addNode("alert_report", node_async(alertReportNode))
        .addNode("message_sender", node_async(generateMessageSender()))
        .addNode("human_feedback", node_async(new HumanFeedbackNode(feedbackMapper)))
        .addNode("human_action", node_async(new HumanActionNode()))

        .addEdge(StateGraph.START, "session_loader")
        .addEdge("session_loader", "iteration_analysis")
        .addEdge("iteration_analysis", "result_summary")

        .addConditionalEdges(
            "result_summary",
            AsyncEdgeAction.edge_async(state -> {
                Long complaint = state.value("complaint", 0L);
                return complaint > 0 ? "alert_report" : StateGraph.END;
            }),
            Map.of(
                "alert_report", "alert_report",
                StateGraph.END, StateGraph.END
            )
        )

        .addEdge("alert_report", "message_sender")
        .addEdge("message_sender", "human_feedback")

        .addConditionalEdges(
            "human_feedback",
            AsyncEdgeAction.edge_async(state -> {
                boolean ignore = state.value("ignore", true);
                return ignore ? StateGraph.END : "human_action";
            }),
            Map.of(
                "human_action", "human_action",
                StateGraph.END, StateGraph.END
            )
        )

        .addEdge("human_action", StateGraph.END);

    CompiledGraph compiledGraph = stateGraph.compile();
    compiledGraph.setMaxIterations(1000);

    ScheduleConfig scheduleConfig = ScheduleConfig.builder()
        .cronExpression("0 0 */1 * * ?")
        .build();

    compiledGraph.schedule(scheduleConfig);

    return compiledGraph;
}

这里有两个分支控制点。

第一个分支在 result_summary 后面:

return complaint > 0 ? "alert_report" : StateGraph.END;

如果没有投诉,任务直接结束,不发送无意义通知。如果发现投诉,才生成告警报告。

第二个分支在 human_feedback 后面:

return ignore ? StateGraph.END : "human_action";

负责人可以选择忽略,也可以要求 Agent 继续执行动作。这个设计让 Agent 保持自主性,但不会在高风险环节完全自动决策。

人工介入节点应该怎么设计

后台 Agent 越自动化,越需要明确哪些节点必须由人确认。适合加入人工确认的场景包括:

场景为什么需要人工确认
批量通知用户错发会造成用户投诉
商品下架可能影响销售和库存
风险定级模型判断可能存在误报
对外发布报告内容需要合规审核
财务或清算动作后果不可轻易回滚

人工节点不应该只是一个“暂停按钮”,还应该记录以下信息:

  • 谁处理了这个任务;
  • 处理时间是什么;
  • 选择了哪个动作;
  • 是否填写了备注;
  • Agent 当时给出的判断依据是什么;
  • 后续动作是否执行成功。

这样做的好处是,出了问题可以追溯,不会只看到“模型建议处理”,却不知道人为什么同意执行。

生产环境需要注意的坑

1. 定时任务要保证幂等

后台任务可能因为重试、实例重启、网络抖动而重复执行。经营日报、舆情告警、工单创建这类动作都需要幂等控制。

常见做法是给每次执行生成业务唯一键:

agentName + businessDate + scheduleSlot

例如:

OperationAnalysisAgent:2026-06-07:08
ReviewAnalysisAgent:2026-06-07:13

发送消息或创建工单前先检查这个唯一键是否已经处理过,避免重复推送。

2. 分布式部署要避免多实例重复执行

如果应用部署了 3 个实例,而每个实例都注册同一个定时 Agent,到点后可能会同时执行 3 次。解决方式通常有两类:

方案做法适合场景
分布式锁执行前抢锁,抢到锁的实例运行任务中小规模任务
调度中心由统一调度系统分发任务多应用、多任务、需要可视化管理

单机进程内调度适合开发、测试和小规模应用。进入生产环境后,定时 Agent 最好接入统一任务调度或加分布式锁。

3. LLM 输出必须校验

评价分类节点要求模型返回 JSON(JavaScript Object Notation),但模型可能输出 Markdown 代码块、解释文本或字段缺失。不要直接信任模型输出。

可以增加解析和校验逻辑:

EvaluationResult result = jsonParser.parse(rawText, EvaluationResult.class);

if (result.getComplaint() == null) {
    throw new IllegalArgumentException("complaint 字段不能为空");
}

if (!List.of("yes", "no").contains(result.getComplaint())) {
    throw new IllegalArgumentException("complaint 字段取值非法");
}

对关键任务来说,模型输出应该经过结构化解析、字段校验和兜底处理。

4. 控制最大迭代次数

compiledGraph.setMaxIterations(1000) 用来限制图执行的最大步数。复杂 Agent 如果有循环、条件分支或子图调用,没有上限可能导致异常情况下长时间运行。

设置上限时要结合任务规模估算。例如每小时最多处理 500 条评价,每条评价走一个子图,最大迭代次数就不能只设置成几十。

5. 把长耗时任务拆开

如果一个 Agent 每次要处理大量数据,不建议把所有逻辑塞进一次调度执行里。可以拆成多个阶段:

flowchart LR
    A[定时触发] --> B[生成待处理任务]
    B --> C[分批处理数据]
    C --> D[汇总结果]
    D --> E[生成报告]
    E --> F[发送通知]

分批处理可以降低单次执行时间,也方便失败重试。某一批失败时,只需要重跑这一批,不必从头处理全部数据。

6. 记录每次执行状态

后台 Agent 至少要记录这些运行信息:

字段说明
agent_nameAgent 名称
schedule_time计划触发时间
start_time实际开始时间
end_time结束时间
status成功、失败、运行中、等待人工
input_snapshot输入数据摘要
output_snapshot输出结果摘要
error_message异常信息
human_operator人工处理人
human_decision人工处理结论

没有运行记录,排查问题会非常困难,尤其是涉及模型判断和人工决策的任务。

适合和不适合的场景

后台定时 Agent 适合处理“周期性、可拆解、需要分析判断”的任务。

适合使用不适合使用
每日经营分析毫秒级实时交易链路
舆情和评价扫描强确定性的简单定时脚本
多数据源汇总报告不允许模型参与判断的合规流程
长周期风险监控没有明确输入输出边界的开放式任务
需要人工确认的自动化流程执行失败无法补偿的危险动作

如果一个任务只是每天删除临时文件,用普通定时任务更简单。如果任务需要读业务数据、理解文本、生成判断、触发人工决策,定时 Agent 才能发挥价值。

落地时抓住四件事

后台定时 Agent 可以理解为“定时任务 + 工作流 + LLM + 人工确认”的组合。SAA 的 StateGraph 负责把业务流程拆成节点,CompiledGraph 负责运行工作流,ScheduleConfig 负责让 Agent 按计划触发。

落地时重点关注四个方面:

  1. 周期触发要可靠,生产环境要处理分布式重复执行问题。
  2. 工作流节点要清晰,数据加载、模型分析、消息发送、人工确认不要混在一起。
  3. 模型输出要校验,尤其是 JSON、分类结果和风险判断。
  4. 高风险动作要有人确认,并记录完整审计信息。

这样构建出来的 Agent 不只是一个会聊天的助手,而是可以在后台持续工作的业务自动化单元。


评论