MQTT
使用 Java SDK 接入
<!-- Maven dependency on the Eclipse Paho MQTT v3 Java client, version 1.2.5. -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
1
2
3
4
5
2
3
4
5
/**
 * Spring configuration that exposes a connected, subscribed Paho {@link MqttClient} bean.
 *
 * <p>NOTE(review): the broker URL, credentials and topic are hard-coded sample values;
 * presumably they should come from external configuration in a real deployment.
 */
@Slf4j
@Configuration
public class MqttConfig {

    /**
     * Creates an MQTT client, connects it to the broker and subscribes to {@code test/topic}.
     *
     * <p>If connecting or subscribing fails, the partially initialised client is released
     * before the failure is rethrown, so a failed start-up does not leave a client behind.
     *
     * @return the connected client, already subscribed to {@code test/topic}
     * @throws MqttException if the client cannot be created, connected or subscribed
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = MqttClient.generateClientId();
        // Persistence store handed to the client (in-memory implementation).
        MemoryPersistence persistence = new MemoryPersistence();
        // MQTT connection options.
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // Authentication credentials.
        connOpts.setUserName("emqx_user");
        connOpts.setPassword("emqx_password".toCharArray());

        MqttClient client = new MqttClient(broker, clientId, persistence);
        try {
            // Register the callback before connecting so no event is missed.
            client.setCallback(new SampleCallback());
            // Establish the connection.
            client.connect(connOpts);
            // Subscribe to the topic.
            client.subscribe("test/topic");
            return client;
        } catch (MqttException | RuntimeException e) {
            releaseAfterFailure(client, e);
            throw e;
        }
    }

    /**
     * Best-effort release of a client whose start-up failed. Any error raised while
     * cleaning up is attached to {@code failure} as a suppressed exception so the
     * original cause is the one that propagates.
     */
    private static void releaseAfterFailure(MqttClient client, Exception failure) {
        try {
            if (client.isConnected()) {
                // The connection must be dropped before close() is attempted.
                client.disconnectForcibly();
            }
            client.close();
        } catch (MqttException | RuntimeException cleanupFailure) {
            failure.addSuppressed(cleanupFailure);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SampleCallback implements MqttCallback {
// 连接丢失
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
// 收到消息
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
// 消息传递成功
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
编辑
上次更新: 2025-03-17 10:13:56