Java实现MQTT协议完整示例

一、基础环境配置

1.1 依赖引入(Maven)

<!-- POM.xml 配置 -->
<dependencies>
    <!-- Eclipse Paho MQTT客户端 -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
    
    <!-- 日志框架 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.7</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.8</version>
    </dependency>
</dependencies>
图片[1]_Java实现MQTT协议完整示例_知途无界

二、MQTT客户端核心实现

2.1 连接配置类

public class MqttConfig {
    private String brokerUrl = "tcp://mqtt.eclipse.org:1883";
    private String clientId = "JavaClient_" + System.currentTimeMillis();
    private String username = null;
    private String password = null;
    private int connectionTimeout = 30;
    private int keepAliveInterval = 60;
    private boolean cleanSession = true;
    private int qos = 1;
    
    // 构造方法和getter/setter省略...
    
    public MqttConnectOptions getConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password != null ? password.toCharArray() : null);
        options.setConnectionTimeout(connectionTimeout);
        options.setKeepAliveInterval(keepAliveInterval);
        options.setCleanSession(cleanSession);
        options.setAutomaticReconnect(true); // 自动重连
        return options;
    }
}

2.2 客户端封装类

public class MqttClientWrapper {
    private static final Logger logger = LoggerFactory.getLogger(MqttClientWrapper.class);
    
    private IMqttClient client;
    private MqttConfig config;
    private MqttCallback callback;
    
    public MqttClientWrapper(MqttConfig config) throws MqttException {
        this.config = config;
        this.client = new MqttClient(
            config.getBrokerUrl(), 
            config.getClientId(),
            new MemoryPersistence()
        );
        
        this.callback = new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                logger.warn("连接丢失", cause);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                logger.info("收到消息 - Topic: {}, QoS: {}, Payload: {}",
                    topic, message.getQos(), new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                logger.debug("消息投递完成: {}", token.getMessageId());
            }
        };
        
        client.setCallback(callback);
    }
    
    public void connect() throws MqttException {
        if (!client.isConnected()) {
            client.connect(config.getConnectOptions());
            logger.info("MQTT客户端已连接");
        }
    }
    
    public void disconnect() throws MqttException {
        if (client.isConnected()) {
            client.disconnect();
            logger.info("MQTT客户端已断开");
        }
    }
    
    public void subscribe(String topic) throws MqttException {
        subscribe(topic, config.getQos());
    }
    
    public void subscribe(String topic, int qos) throws MqttException {
        client.subscribe(topic, qos);
        logger.info("已订阅主题: {}, QoS: {}", topic, qos);
    }
    
    public void unsubscribe(String topic) throws MqttException {
        client.unsubscribe(topic);
        logger.info("已取消订阅主题: {}", topic);
    }
    
    public void publish(String topic, String payload) throws MqttException {
        publish(topic, payload, config.getQos(), false);
    }
    
    public void publish(String topic, String payload, int qos, boolean retained) 
            throws MqttException {
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        client.publish(topic, message);
        logger.debug("已发布消息 - Topic: {}, QoS: {}, Retained: {}",
            topic, qos, retained);
    }
    
    public boolean isConnected() {
        return client != null && client.isConnected();
    }
    
    public void close() throws MqttException {
        if (client != null) {
            client.close();
            logger.info("MQTT客户端已关闭");
        }
    }
}

三、高级功能实现

3.1 消息处理器接口

public interface MessageHandler {
    void handleMessage(String topic, MqttMessage message);
}

public class CustomMqttCallback implements MqttCallback {
    private final Map<String, MessageHandler> handlers = new ConcurrentHashMap<>();
    
    public void registerHandler(String topicFilter, MessageHandler handler) {
        handlers.put(topicFilter, handler);
    }
    
    public void unregisterHandler(String topicFilter) {
        handlers.remove(topicFilter);
    }
    
    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("连接丢失: " + cause.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        handlers.entrySet().stream()
            .filter(entry -> topic.matches(entry.getKey()))
            .forEach(entry -> entry.getValue().handleMessage(topic, message));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("消息投递完成: " + token.getMessageId());
    }
}

3.2 断线重连机制

public class ReconnectThread extends Thread {
    private static final int MAX_RETRIES = 5;
    private static final long RETRY_INTERVAL = 5000;
    
    private final MqttClientWrapper client;
    private int retryCount = 0;
    
    public ReconnectThread(MqttClientWrapper client) {
        this.client = client;
        setName("MQTT-Reconnect-Thread");
    }
    
    @Override
    public void run() {
        while (retryCount < MAX_RETRIES && !isInterrupted()) {
            try {
                Thread.sleep(RETRY_INTERVAL);
                logger.info("尝试重新连接...({}/{})", retryCount + 1, MAX_RETRIES);
                
                if (client.connect()) {
                    logger.info("重新连接成功");
                    return;
                }
                
                retryCount++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("重连线程被中断");
                break;
            } catch (Exception e) {
                logger.error("重连失败", e);
            }
        }
        
        if (retryCount >= MAX_RETRIES) {
            logger.error("达到最大重试次数,放弃重连");
        }
    }
}

四、SSL/TLS安全连接

4.1 SSL配置类

public class SslConfig {
    private String caCertificateFile;
    private String clientCertificateFile;
    private String clientPrivateKeyFile;
    private String keyPassword;
    
    public SSLSocketFactory getSocketFactory() throws Exception {
        // 加载CA证书
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        Certificate caCert = cf.generateCertificate(
            new FileInputStream(caCertificateFile));
        
        // 创建KeyStore
        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setCertificateEntry("ca", caCert);
        
        // 加载客户端证书和私钥
        KeyStore clientKeyStore = KeyStore.getInstance("PKCS12");
        clientKeyStore.load(
            new FileInputStream(clientCertificateFile),
            keyPassword.toCharArray());
        
        // 创建KeyManagerFactory
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(
            KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(clientKeyStore, keyPassword.toCharArray());
        
        // 创建TrustManagerFactory
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(
            TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(keyStore);
        
        // 创建SSLContext
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(
            kmf.getKeyManagers(),
            tmf.getTrustManagers(),
            new SecureRandom());
        
        return sslContext.getSocketFactory();
    }
    
    // 构造方法和getter/setter省略...
}

4.2 安全连接配置

public class SecureMqttConfig extends MqttConfig {
    private SslConfig sslConfig;
    
    @Override
    public MqttConnectOptions getConnectOptions() {
        MqttConnectOptions options = super.getConnectOptions();
        try {
            options.setSocketFactory(sslConfig.getSocketFactory());
            options.setHttpsHostnameVerificationEnabled(false); // 禁用主机名验证
        } catch (Exception e) {
            throw new RuntimeException("SSL配置失败", e);
        }
        return options;
    }
    
    // 构造方法和getter/setter省略...
}

五、完整使用示例

5.1 发布者示例

public class MqttPublisher {
    private static final Logger logger = LoggerFactory.getLogger(MqttPublisher.class);
    
    public static void main(String[] args) {
        MqttConfig config = new MqttConfig();
        config.setBrokerUrl("tcp://localhost:1883");
        config.setClientId("JavaPublisher");
        
        try (MqttClientWrapper client = new MqttClientWrapper(config)) {
            client.connect();
            
            // 发布10条测试消息
            for (int i = 1; i <= 10; i++) {
                String topic = "test/topic";
                String payload = "消息 " + i + " - " + LocalDateTime.now();
                
                client.publish(topic, payload);
                logger.info("已发布: {}", payload);
                
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            logger.error("发布消息出错", e);
        }
    }
}

5.2 订阅者示例

public class MqttSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(MqttSubscriber.class);
    
    public static void main(String[] args) {
        MqttConfig config = new MqttConfig();
        config.setBrokerUrl("tcp://localhost:1883");
        config.setClientId("JavaSubscriber");
        
        try (MqttClientWrapper client = new MqttClientWrapper(config)) {
            client.connect();
            
            // 订阅主题并添加处理器
            String topic = "test/#";
            client.subscribe(topic);
            
            logger.info("订阅成功,等待消息...");
            
            // 保持运行状态
            while (true) {
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            logger.error("订阅消息出错", e);
        }
    }
}

六、Spring Boot集成方案

6.1 Spring配置类

@Configuration
public class MqttConfig {
    
    @Value("${mqtt.broker.url}")
    private String brokerUrl;
    
    @Value("${mqtt.client.id}")
    private String clientId;
    
    @Bean
    public IMqttClient mqttClient() throws MqttException {
        IMqttClient client = new MqttClient(
            brokerUrl, 
            clientId, 
            new MemoryPersistence()
        );
        
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        
        client.connect(options);
        return client;
    }
    
    @Bean
    public MqttTemplate mqttTemplate(IMqttClient client) {
        return new MqttTemplate(client);
    }
}

6.2 MQTT模板类

@Component
public class MqttTemplate {
    private final IMqttClient client;
    
    public MqttTemplate(IMqttClient client) {
        this.client = client;
    }
    
    public void publish(String topic, String payload) throws MqttException {
        publish(topic, payload, 1, false);
    }
    
    public void publish(String topic, String payload, int qos, boolean retained) 
            throws MqttException {
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        client.publish(topic, message);
    }
    
    public void subscribe(String topic, MessageHandler handler) 
            throws MqttException {
        client.subscribe(topic);
        ((MqttCallback)client.getCallback())
            .registerHandler(topic, handler);
    }
}

七、性能优化建议

7.1 连接池配置

public class MqttConnectionPool {
    private final BlockingQueue<IMqttClient> pool;
    private final int maxSize;
    
    public MqttConnectionPool(String brokerUrl, int poolSize) 
            throws MqttException {
        this.maxSize = poolSize;
        this.pool = new LinkedBlockingQueue<>(maxSize);
        
        for (int i = 0; i < poolSize; i++) {
            IMqttClient client = new MqttClient(
                brokerUrl, 
                "PooledClient_" + i,
                new MemoryPersistence()
            );
            client.connect();
            pool.offer(client);
        }
    }
    
    public IMqttClient borrowClient() throws InterruptedException {
        return pool.take();
    }
    
    public void returnClient(IMqttClient client) {
        if (client.isConnected()) {
            pool.offer(client);
        }
    }
    
    public void close() throws MqttException {
        IMqttClient client;
        while ((client = pool.poll()) != null) {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        }
    }
}

7.2 批量消息处理

public class MqttBatchProcessor {
    private final MqttClientWrapper client;
    private final Queue<MqttMessage> messageQueue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService executor;
    
    public MqttBatchProcessor(MqttClientWrapper client) {
        this.client = client;
        this.executor = Executors.newSingleThreadScheduledExecutor();
        
        // 每5秒处理一次批量消息
        executor.scheduleAtFixedRate(this::processBatch, 5, 5, TimeUnit.SECONDS);
    }
    
    public void addMessage(String topic, String payload) {
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(1);
        messageQueue.add(message);
    }
    
    private void processBatch() {
        if (messageQueue.isEmpty()) return;
        
        List<MqttMessage> batch = new ArrayList<>();
        messageQueue.drainTo(batch, 100); // 每次最多处理100条
        
        if (!batch.isEmpty()) {
            try {
                IMqttDeliveryToken[] tokens = new IMqttDeliveryToken[batch.size()];
                for (int i = 0; i < batch.size(); i++) {
                    tokens[i] = client.publish("batch/topic", batch.get(i));
                }
                
                // 等待所有消息确认
                for (IMqttDeliveryToken token : tokens) {
                    token.waitForCompletion();
                }
            } catch (Exception e) {
                // 处理失败的消息重新入队
                batch.forEach(messageQueue::add);
            }
        }
    }
    
    public void shutdown() {
        executor.shutdown();
    }
}

八、常见问题解决方案

8.1 连接问题排查

问题现象可能原因解决方案
连接超时网络不通/防火墙检查网络连接和端口(1883/8883)
认证失败用户名密码错误验证凭据,检查MQTT broker配置
SSL握手失败证书不匹配确保证书链完整且受信任
频繁断开KeepAlive设置不当调整keepAliveInterval参数

8.2 消息可靠性保障

graph TD
    A[消息发送] --> B{QoS级别}
    B -->|QoS 0| C[最多一次]
    B -->|QoS 1| D[至少一次]
    B -->|QoS 2| E[恰好一次]
    
    D --> F[确认机制]
    E --> G[握手协议]
    
    F --> H[重发机制]
    G --> I[消息去重]

核心要点总结​:

  1. Eclipse Paho是Java实现MQTT协议的标准选择
  2. 连接管理和消息处理需要完善的异常处理
  3. QoS级别选择直接影响消息可靠性
  4. SSL/TLS为通信提供安全保障
  5. 生产环境应考虑连接池和批量处理优化

最佳实践建议​:

  1. 使用连接池管理MQTT客户端连接
  2. 根据业务需求选择合适的QoS级别
  3. 实现完善的断线重连机制
  4. 重要业务消息实现本地持久化和重试
  5. 监控关键指标:连接状态、消息吞吐量、延迟等
© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞16 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容