MQTT

使用 Java SDK 接入

<!-- Maven coordinates for the Eclipse Paho MQTT v3 client artifact, pinned to version 1.2.5. -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
1
2
3
4
5
/**
 * Spring configuration that exposes a connected {@link MqttClient} bean.
 *
 * <p>The client connects to the broker, installs a {@link SampleCallback} and
 * subscribes to {@code test/topic} before it is handed to the container.
 */
@Slf4j
@Configuration
public class MqttConfig {
    /**
     * Creates, connects and subscribes the MQTT client.
     *
     * @return a client that is connected and subscribed to {@code test/topic}
     * @throws MqttException if the client cannot be created, connected or subscribed;
     *     in the latter two cases the partially initialised client is released first
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = MqttClient.generateClientId();
        // In-memory persistence for in-flight messages
        MemoryPersistence persistence = new MemoryPersistence();
        // MQTT connection options
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // Authentication settings
        // NOTE(review): sample credentials are hard-coded here; presumably they should
        // come from external configuration in a real deployment.
        connOpts.setUserName("emqx_user");
        connOpts.setPassword("emqx_password".toCharArray());

        MqttClient client = new MqttClient(broker, clientId, persistence);
        try {
            // Install the callback before connecting so no event is missed
            client.setCallback(new SampleCallback());
            // Establish the connection
            client.connect(connOpts);
            // Subscribe to the topic
            client.subscribe("test/topic");
            return client;
        } catch (MqttException e) {
            // The bean will never reach the container, so nobody else can release the
            // client: clean it up here instead of leaking it.
            try {
                if (client.isConnected()) {
                    // subscribe() failed after a successful connect()
                    client.disconnect();
                }
                client.close();
            } catch (MqttException cleanupFailure) {
                // Keep the original failure as the primary error
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Sample {@link MqttCallback} that prints client events to standard output.
 */
public class SampleCallback implements MqttCallback {
    /**
     * Reports that the connection was lost.
     *
     * @param cause the reason the connection was lost
     */
    public void connectionLost(Throwable cause) {
        System.out.println("connection lost:" + cause.getMessage());
    }

    /**
     * Prints the topic, QoS and payload of a received message.
     *
     * @param topic the topic the message was published to
     * @param message the received message
     */
    public void messageArrived(String topic, MqttMessage message) {
        // Decode with an explicit charset: the no-charset String constructor uses the
        // platform default, so the same payload could print differently per host.
        // The payload is assumed to be UTF-8 text.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Received message: \n  topic:" + topic + "\n  Qos:" + message.getQos() + "\n  payload:" + payload);
    }

    /**
     * Reports that delivery of a published message has completed.
     *
     * @param token the delivery token of the completed message
     */
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
上次更新: 2025-03-17 10:13:56