|
|
|
@@ -1,108 +0,0 @@ |
|
|
|
package com.xueyi.common.mqtt.service; |
|
|
|
|
|
|
|
import com.baomidou.mybatisplus.core.toolkit.StringUtils; |
|
|
|
import com.xueyi.common.mqtt.configure.MqttConfig; |
|
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttCallback; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttClient; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttException; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
import java.nio.charset.StandardCharsets; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
/** |
|
|
|
* spring mqtt 工具类 |
|
|
|
* |
|
|
|
* @author xueyi |
|
|
|
**/ |
|
|
|
@Component |
|
|
|
@SuppressWarnings(value = {"unchecked", "rawtypes"}) |
|
|
|
public class MqttService implements MqttCallback{ |
|
|
|
|
|
|
|
private List<MqttMessageHandler> observers = new ArrayList<>(); |
|
|
|
|
|
|
|
public void addObserver(MqttMessageHandler observer) { |
|
|
|
observers.add(observer); |
|
|
|
} |
|
|
|
|
|
|
|
public void removeObserver(MqttMessageHandler observer) { |
|
|
|
observers.remove(observer); |
|
|
|
} |
|
|
|
@Autowired |
|
|
|
private MqttClient mqttClient; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private MqttConfig mqttConfig; |
|
|
|
|
|
|
|
/** |
|
|
|
* 发布消息到指定 topic |
|
|
|
* |
|
|
|
* @param topic 消息主题 |
|
|
|
* @param message 消息内容 |
|
|
|
*/ |
|
|
|
public void sendMessage(String topic, Object message) { |
|
|
|
System.out.println("发送消息到 topic: " + topic); |
|
|
|
System.out.println("消息内容: " + message); |
|
|
|
MqttMessage msg = new MqttMessage(); |
|
|
|
msg.setPayload(message.toString().getBytes()); |
|
|
|
try { |
|
|
|
mqttClient.publish(topic, msg); |
|
|
|
} catch (MqttException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
System.err.println(e.getMessage()); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// @PostConstruct |
|
|
|
public void subscribe() { |
|
|
|
mqttClient.setCallback(this); |
|
|
|
for (String topic : Arrays.stream(mqttConfig.getTopics().split(",")).toList()) { |
|
|
|
if (StringUtils.isNotEmpty(topic)) { |
|
|
|
try { |
|
|
|
mqttClient.subscribe(topic, 0); |
|
|
|
} catch (MqttException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
System.err.println(e.getMessage()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void connectionLost(Throwable throwable) { |
|
|
|
throwable.printStackTrace(); |
|
|
|
System.out.println("Connection lost: " + throwable.getMessage()); |
|
|
|
try { |
|
|
|
mqttClient.reconnect(); |
|
|
|
subscribe(); |
|
|
|
System.out.println("Reconnected to MQTT broker"); |
|
|
|
} catch (MqttException e) { |
|
|
|
System.out.println("Reconnect failed: " + e.getMessage()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void messageArrived(String s, MqttMessage mqttMessage) { |
|
|
|
try{ |
|
|
|
for (MqttMessageHandler observer : observers) { |
|
|
|
observer.handleMessage(s, new String(mqttMessage.getPayload(), StandardCharsets.UTF_8)); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
e.printStackTrace(); |
|
|
|
System.err.println(e.getMessage()); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
|
|
|
|
|
|
|
} |
|
|
|
} |