springboot集成mqtt的订阅端

张开发
2026/4/7 19:05:16 15 分钟阅读

分享文章

springboot集成mqtt的订阅端
环境JDK21springboot3.3.5EMQX5.8.5MQTT协议5.0引入的mqtt jar包dependency groupIdcom.hivemq/groupId artifactIdhivemq-mqtt-client/artifactId version1.3.0/version /dependencyyml中mqtt的配置$share/my-group/test/topic共享订阅也就是负载均衡$share是固定写法my-group共享组共享组内的消费者只能有一个消费mqtt: host: 192.168.1.2 port: 1883 client-id: springboot-listener-001 username: admin password: xxyy1212 qos: 1 topics: - device/up - device/data - $share/my-group/test/topic读取yml的配置文件import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.List; Component ConfigurationProperties(prefix mqtt) public class MqttProperties { private String host; private int port; private String clientId; private String username; private String password; private int qos; private ListString topics; // ✅ 列表正确读取 // 自动生成 getter setter public String getHost() {return host;} public void setHost(String host) {this.host host;} public int getPort() {return port;} public void setPort(int port) {this.port port;} public String getClientId() {return clientId;} public void setClientId(String clientId) {this.clientId clientId;} public String getUsername() {return username;} public void setUsername(String username) {this.username username;} public String getPassword() {return password;} public void setPassword(String password) {this.password password;} public int getQos() {return qos;} public void setQos(int qos) {this.qos qos;} public ListString getTopics() {return topics;} public void setTopics(ListString topics) {this.topics topics;} }接收消息1、自动检测链接状态2、连接中断后自动连接package com.crazy.shopping.listener; import com.crazy.shopping.config.MqttProperties; import com.hivemq.client.mqtt.MqttClientState; import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.*; Component public class Mqtt5MessageListener { Autowired private MqttProperties mqttProperties; private MqttQos mqttQos; private Mqtt5AsyncClient client; private final ScheduledExecutorService scheduler Executors.newSingleThreadScheduledExecutor(); private final ExecutorService messageExecutor new ThreadPoolExecutor( 3, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy() ); PostConstruct public void start() { mqttQos MqttQos.fromCode(mqttProperties.getQos()); // 每3秒检查重连 强制绑定监听 scheduler.scheduleAtFixedRate(this::checkAndConnect, 0, 3, TimeUnit.SECONDS); } private void checkAndConnect() { if (client ! null client.getState() MqttClientState.CONNECTED) { return; } System.out.println(⏳ 重连中...); // 每次重连都新建客户端彻底解决监听丢失 client Mqtt5Client.builder() .serverHost(mqttProperties.getHost()) .serverPort(mqttProperties.getPort()) .identifier(mqttProperties.getClientId()) .buildAsync(); // // 关键连接之前 先注册监听 // registerMessageListener(); // 连接 client.connectWith() .cleanStart(false) .sessionExpiryInterval(86400L) .keepAlive(60) .simpleAuth() .username(mqttProperties.getUsername()) .password(mqttProperties.getPassword().getBytes(StandardCharsets.UTF_8)) .applySimpleAuth() .send() .whenComplete((ack, t) - { if (t null) { System.out.println(✅ 连接成功); subscribeAllTopics(); } }); } // // 每次重连都重新注册监听绝杀 // private void registerMessageListener() { client.toAsync().publishes(MqttGlobalPublishFilter.ALL, this::handleMessage); } // 消息处理可以进行幂等性校验结合redis--TODO private void handleMessage(Mqtt5Publish publish) { messageExecutor.submit(() - { try { String topic publish.getTopic().toString(); String msg new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8); System.out.println(); System.out.println( 收到消息 topic); System.out.println(内容 msg); System.out.println(); } catch (Exception e) { e.printStackTrace(); } }); } // 订阅 private void subscribeAllTopics() { for (String topic : mqttProperties.getTopics()) { client.subscribeWith() .topicFilter(topic) .qos(mqttQos) .send() .whenComplete((sub, t) - { if (t null) { System.out.println(✅ 已订阅 topic); } }); } } PreDestroy public void close() { try { scheduler.shutdownNow(); messageExecutor.shutdown(); if (client ! null client.getState().isConnected()) { client.disconnect(); } } catch (Exception ignored) {} } }效果

更多文章