Springboot集成Kafka入门流程及示例代码

张开发
2026/5/23 9:43:10 15 分钟阅读
Springboot集成Kafka入门流程及示例代码
场景一、Kafka 是什么Apache Kafka 是一个分布式、高吞吐、可持久化、高可用的消息队列 / 事件流平台。主要用途异步解耦系统之间不直接调用通过消息通信削峰填谷流量突增时缓冲防止下游被打崩日志收集 / 数据同步如用户行为日志、埋点、CDC 数据实时流处理配合 Flink、Spark Streaming 做实时计算特点极高吞吐单机几十万 TPS消息持久化到磁盘可重复消费分布式可扩展支持多副本高可用消息顺序性保证二、核心架构角色1. BrokerKafka 服务器节点一个集群由多个 Broker 组成负责存储消息、处理读写请求2. Producer生产者发送消息到 Kafka 的客户端决定消息发往哪个分区轮询、哈希、自定义3. Consumer消费者从 Kafka 拉取消息的客户端主动拉取pull不是 Kafka 推送4. Consumer Group消费者组最重要概念之一一组消费者共同消费一个或多个 Topic同一个组内一条消息只会被一个消费者消费不同组可以重复消费同一条消息组内消费者数量 ≤ 分区数多了闲置5. Topic主题消息的分类类似 “消息队列名称”生产者发往 Topic消费者订阅 Topic6. Partition分区核心中的核心一个 Topic 可以分成多个 Partition消息被分散存储在不同分区同一分区内消息严格有序不同分区不保证全局有序分区越多并发消费能力越强7. Replica副本每个分区可以有多个副本备份分为Leader负责读写Follower只同步数据不对外提供服务副本机制保证高可用Leader 挂了 Follower 自动选举新 Leader8. Offset偏移量消息在分区内的唯一编号从 0 递增消费者通过 Offset 标记消费位置Offset 由消费者自己维护新版存在 Kafka 内部主题 __consumer_offsets三、消息存储机制消息持久化到磁盘不是内存以分段文件存储.log 数据文件 .index 索引文件可设置过期策略按时间 / 大小删除顺序写磁盘所以极快四、消费模式Pull 模式消费者主动拉取消费位点自主管理自动提交手动提交业务更可靠三种消费策略earliest从头开始消费latest只消费最新消息none无位移时报错五、核心特性1. 顺序保证分区内严格有序想要全局有序Topic 只设 1 个分区但失去并发2. 高可用多副本机制自动故障转移数据不丢配置acksall、min.insync.replicas23. 高吞吐批量发送页缓存 零拷贝顺序写磁盘4. 可扩展集群可水平扩容Topic 可增加分区5. 消息可靠性acks 机制acks0发完不管最快acks1Leader 写入即确认acksall所有 ISR 副本写入才确认最安全六、重要进阶概念1. ISRIn-Sync Replicas与 Leader 保持同步的副本集合只有 ISR 内的副本才有资格被选为新 Leader防止数据不一致2. 重平衡Rebalance组内消费者数量变化、分区变化时触发重新分配分区给消费者频繁 Rebalance 会影响性能要避免3. 事务消息保证生产者发送多条消息原子性消费者可设置读已提交模式避免脏数据4. 延迟队列 / 死信队列Kafka 原生不支持延迟队列但可通过多层 Topic 定时转移第三方插件实现消费失败的消息可丢入死信队列DLQ5. 消息回溯可以重置 Offset重新消费历史消息七、和其他 MQ 的简单对比注博客https://blog.csdn.net/badao_liumang_qizhi实现Windows 本地搭建 Kafka Zookeeper1. 下载安装 JDK 8Kafka 需要 Java 环境你本地必须装 JDK 并配置好环境变量。2. 下载 Kafka自带 Zookeeper不用单独下去官网下载https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz推荐 2.8.2 版本最稳定、Windows 兼容最好。下载后解压到一个无中文、无空格的目录3. 修改 Zookeeper 配置进入上面解压后config目录打开文件zookeeper.properties确认这一行默认就是对的dataDir/tmp/zookeeperWindows 可以不改也能跑。4. 修改 Kafka 配置必须改打开文件server.properties找到并修改两处① 日志路径Windows 必须改log.dirsD:\kafka_2.13-2.8.2\kafka-logs② 允许外部访问本地测试必须加在文件末尾加listenersPLAINTEXT://:9092 advertised.listenersPLAINTEXT://localhost:9092保存即可。5、启动 Zookeeper必须先启动打开 CMD 命令行进入 Kafka 目录执行启动命令bin\windows\zookeeper-server-start.bat config\zookeeper.properties看到输出Started AdminServer on address 0.0.0.0 at port 80806、启动 Kafka第二个 CMD 窗口重新开一个新的 CMD不要关 zk进入 Kafka 目录启动 Kafkabin\windows\kafka-server-start.bat config\server.properties看到[KafkaServer id0] started7、创建测试主题必须创建再开一个新 CMD进入目录执行创建主题命令bin\windows\kafka-topics.bat --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1看到Created topic test-topic.SpringBoot 集成 KafkaSpringBoot 官方提供 Kafka 集成 starter直接引入即可dependencies !-- SpringBoot Web用于测试接口发送消息 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency !-- SpringBoot Kafka 核心依赖 -- dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency /dependenciesapplication.yml 配置spring: kafka: # Kafka 服务地址 bootstrap-servers: 127.0.0.1:9092 # 生产者配置 producer: # 消息key序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息value序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 应答级别all保证消息不丢失 acks: 1 # 消费者配置 consumer: # 消费者组ID group-id: test-group # 消息key反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消息value反序列化 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 从头开始消费 auto-offset-reset: earliest # 自动提交偏移量 enable-auto-commit: true核心代码实现1. 生产者发送消息创建一个 Controller 接口用于测试发送消息到 Kafkaimport org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; /** * Kafka 生产者发送消息 */ RestController public class KafkaProducerController { Autowired private KafkaTemplateString, String kafkaTemplate; // 接口发送消息到 test-topic GetMapping(/send/{message}) public String sendMessage(PathVariable String message) { // 参数topic名称 | 消息内容 kafkaTemplate.send(test-topic, message); return 消息发送成功 message; } }2. 消费者监听并消费消息使用 KafkaListener 注解监听主题自动消费消息import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * Kafka 消费者监听主题消费消息 */ Component public class KafkaConsumerService { // 监听 test-topic 主题 KafkaListener(topics test-topic, groupId test-group) public void consumeMessage(String message) { System.out.println(【消费者收到消息】 message); } }启动 SpringBoot 项目浏览器访问http://localhost:885/send/Hello-Kafka验证结果

更多文章