AI agent实现打字机效果

张开发
2026/4/4 3:25:46 15 分钟阅读
AI agent实现打字机效果
搭建AI核心工作流程1、流式传输实现AI打字机效果1、首先进行流式调用大模型参考相关api。FluxChatResponse streamResponse chatModel.stream(new Prompt(new UserMessage(prompt)));/** * 调用 LLM流式输出 */ private String callLlmWithStreaming(String prompt, ConsumerString streamHandler, SseMessageTypeEnum messageType) { StringBuilder contentBuilder new StringBuilder(); FluxChatResponse streamResponse chatModel.stream(new Prompt(new UserMessage(prompt))); streamResponse .doOnNext(response - { String chunk response.getResult().getOutput().getText(); if (chunk ! null !chunk.isEmpty()) { contentBuilder.append(chunk); streamHandler.accept(messageType.getStreamingPrefix() chunk); } }) .doOnError(error - log.error(LLM 流式调用失败, messageType{}, messageType, error)) .blockLast(); return contentBuilder.toString(); }2、在接收流式调用时传递一个回调函数对象。【ConsumerString streamHandler】1、该对象accept信息时会触发回调。2、回调一直往上传传递到初始的定义方【import java.util.function.Consumer;】【public voidexecuteArticleGeneration(ArticleState state, ConsumerString streamHandler) {】【agent2GenerateOutline(state, streamHandler);】3、在往上传就是调用方。再调用方产生了回调。【// 执行智能体编排,并通过 SSE 推送进度articleAgentService.executeArticleGeneration(state, message - {handleAgentMessage(taskId, message, state);});】4、handleAgentMessage回调函数会调用SSE进行流式推送到前端页面。3、SSE流式调用管理代码package com.panda.multiagent.manager; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import static com.panda.multiagent.constant.ArticleConstant.SSE_RECONNECT_TIME_MS; import static com.panda.multiagent.constant.ArticleConstant.SSE_TIMEOUT_MS; /** * Author panda * Date 2026-04-02 * Des */ Component Slf4j public class SseEmitterManager { /** * 存储所有的 SseEmitter */ private final MapString, SseEmitter emitterMap new ConcurrentHashMap(); /** * 创建 SseEmitter * * param taskId 任务ID * return SseEmitter */ public SseEmitter createEmitter(String taskId) { SseEmitter emitter new SseEmitter(SSE_TIMEOUT_MS); // 设置超时回调 emitter.onTimeout(() - { log.warn(SSE 连接超时, taskId{}, taskId); emitterMap.remove(taskId); }); // 设置完成回调 emitter.onCompletion(() - { log.info(SSE 连接完成, taskId{}, taskId); emitterMap.remove(taskId); }); // 设置错误回调 emitter.onError((e) - { log.error(SSE 连接错误, taskId{}, taskId, e); emitterMap.remove(taskId); }); emitterMap.put(taskId, emitter); log.info(SSE 连接已创建, taskId{}, taskId); return emitter; } /** * 发送消息 * * param taskId 任务ID * param message 消息内容 */ public void send(String taskId, String message) { SseEmitter emitter emitterMap.get(taskId); if (emitter null) { log.warn(SSE Emitter 不存在, taskId{}, taskId); return; } try { SseEmitter.SseEventBuilder builder SseEmitter.event(); builder.data(message).reconnectTime(SSE_RECONNECT_TIME_MS); emitter.send(builder); log.debug(SSE 消息发送成功, taskId{}, message{}, taskId, message); } catch (IOException e) { log.error(SSE 消息发送失败, taskId{}, taskId, e); emitterMap.remove(taskId); } } /** * 完成连接 * * param taskId 任务ID */ public void complete(String taskId) { SseEmitter emitter emitterMap.get(taskId); if (emitter null) { log.warn(SSE Emitter 不存在, taskId{}, taskId); return; } try { emitter.complete(); log.info(SSE 连接已完成, taskId{}, taskId); } catch (Exception e) { log.error(SSE 连接完成失败, taskId{}, taskId, e); } finally { emitterMap.remove(taskId); } } /** * 检查 Emitter 是否存在 * * param taskId 任务ID * return 是否存在 */ public boolean exists(String taskId) { return emitterMap.containsKey(taskId); } }

更多文章