Browse Source

Merge remote-tracking branch 'origin/dev' into dev

tags/v0.8.5^2
yk 2 years ago
parent
commit
3fdc2af963
10 changed files with 595 additions and 0 deletions
  1. +133
    -0
      xueyi-common/xueyi-common-core/src/main/java/com/xueyi/common/core/utils/core/SpringUtils.java
  2. +18
    -0
      xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/annotation/MqttService.java
  3. +17
    -0
      xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/annotation/MqttTopic.java
  4. +79
    -0
      xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/configure/MqttConfigV2.java
  5. +68
    -0
      xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/configure/MqttProperties.java
  6. +22
    -0
      xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/connection/MqttTemplate.java
  7. +11
    -0
      xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/service/IMqttMessageHandler.java
  8. +75
    -0
      xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/comfiguration/MqttReceiveConfig.java
  9. +133
    -0
      xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/handler/MqttMessageHandle.java
  10. +39
    -0
      xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/handler/MqttTopicHandle.java

+ 133
- 0
xueyi-common/xueyi-common-core/src/main/java/com/xueyi/common/core/utils/core/SpringUtils.java View File

@@ -0,0 +1,133 @@
package com.xueyi.common.core.utils.core;

import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.Map;

public class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {

/** Spring应用上下文环境 */
private static ConfigurableListableBeanFactory beanFactory;

private static ApplicationContext applicationContext;

public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{

return beanFactory.getBeansWithAnnotation(clsName);
}

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtils.beanFactory = beanFactory;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.applicationContext = applicationContext;
}

/**
* 获取对象
*
* @param name
* @return Object 一个以所给名字注册的bean的实例
* @throws org.springframework.beans.BeansException
*
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException
{
return (T) beanFactory.getBean(name);
}

/**
* 获取类型为requiredType的对象
*
* @param clz
* @return
* @throws org.springframework.beans.BeansException
*
*/
public static <T> T getBean(Class<T> clz) throws BeansException
{
T result = (T) beanFactory.getBean(clz);
return result;
}

/**
* 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
*
* @param name
* @return boolean
*/
public static boolean containsBean(String name)
{
return beanFactory.containsBean(name);
}

/**
* 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
*
* @param name
* @return boolean
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.isSingleton(name);
}

/**
* @param name
* @return Class 注册对象的类型
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.getType(name);
}

/**
* 如果给定的bean名字在bean定义中有别名,则返回这些别名
*
* @param name
* @return
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.getAliases(name);
}

/**
* 获取aop代理对象
*
* @param invoker
* @return
*/
@SuppressWarnings("unchecked")
public static <T> T getAopProxy(T invoker)
{
return (T) AopContext.currentProxy();
}

/**
* 获取当前的环境配置,无配置返回null
*
* @return 当前的环境配置
*/
public static String[] getActiveProfiles()
{
return applicationContext.getEnvironment().getActiveProfiles();
}

}

+ 18
- 0
xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/annotation/MqttService.java View File

@@ -0,0 +1,18 @@
package com.xueyi.common.mqtt.annotation;

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {

@AliasFor(annotation = Component.class)
String value() default "";
}

+ 17
- 0
xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/annotation/MqttTopic.java View File

@@ -0,0 +1,17 @@
package com.xueyi.common.mqtt.annotation;

import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {
/**
* 主题名称
*/
String value() default "";
}

+ 79
- 0
xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/configure/MqttConfigV2.java View File

@@ -0,0 +1,79 @@
package com.xueyi.common.mqtt.configure;

import com.xueyi.common.mqtt.service.IMqttMessageHandler;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* mqtt config
*/
@Configuration
public class MqttConfigV2 {

@Autowired
private MqttProperties mqttProperties;

//Mqtt 客户端工厂
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttProperties.getHostUrl().split(","));
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}


// 消息生产者
// @Bean
// public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
// adapter.setCompletionTimeout(5000);
// adapter.setConverter(new DefaultPahoMessageConverter());
// //入站投递的通道
// adapter.setOutputChannel(mqttInboundChannel());
// adapter.setQos(1);
// return IntegrationFlows.from(adapter)
//// .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
//// .handle(mqttMessageHandler)
// .get();
// }

//出站消息管道,
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}



// 出站处理器 (向 mqtt 发送消息 生产者)
@Bean
public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {

MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
handler.setAsync(true);
handler.setConverter(new DefaultPahoMessageConverter());
handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
}
}

+ 68
- 0
xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/configure/MqttProperties.java View File

@@ -0,0 +1,68 @@
package com.xueyi.common.mqtt.configure;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Data
@Component
public class MqttProperties {

/**
* 用户名
*/
@Value("${mqtt.username}")
private String username;

/**
* 密码
*/
@Value("${mqtt.password}")
private String password;

/**
* 连接地址
*/
@Value("${mqtt.host-url}")
private String hostUrl;

/**
* 进-客户Id
*/
@Value("${mqtt.in-client-id}")
private String inClientId;

/**
* 出-客户Id
*/
@Value("${mqtt.out-client-id}")
private String outClientId;

/**
* 客户Id
*/
@Value("${mqtt.client-id}")
private String clientId;

/**
* 默认连接话题
*/
@Value("${mqtt.default-topic}")
private String defaultTopic;

/**
* 超时时间
*/
@Value("${mqtt.timeout}")
private int timeout;

/**
* 保持连接数
*/
@Value("${mqtt.keepalive}")
private int keepalive;

/**是否清除session*/
@Value("${mqtt.clearSession}")
private boolean clearSession;
}

+ 22
- 0
xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/connection/MqttTemplate.java View File

@@ -0,0 +1,22 @@
package com.xueyi.common.mqtt.connection;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
* MqttGateway
*
* @author yinruoxi
* @date 2022/8/23
*/

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttTemplate {

void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}

+ 11
- 0
xueyi-common/xueyi-common-mqtt/src/main/java/com/xueyi/common/mqtt/service/IMqttMessageHandler.java View File

@@ -0,0 +1,11 @@
package com.xueyi.common.mqtt.service;

import org.springframework.messaging.MessageHandler;

/**
* @author yk
* @description
* @date 2023-05-10 19:05
*/
public interface IMqttMessageHandler extends MessageHandler {
}

+ 75
- 0
xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/comfiguration/MqttReceiveConfig.java View File

@@ -0,0 +1,75 @@
package com.xueyi.message.comfiguration;

import com.xueyi.common.mqtt.configure.MqttProperties;
import com.xueyi.common.mqtt.service.IMqttMessageHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class MqttReceiveConfig {

@Autowired
private MqttProperties mqttProperties;
@Autowired
private IMqttMessageHandler mqttMessageHandler;


// 入站消息管道
@Bean
public MessageChannel mqttInboundChannel(){
return new DirectChannel();
}

// Mqtt 管道适配器
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
}

@Bean
public IntegrationFlow mqttMessageInbound(MqttPahoMessageDrivenChannelAdapter adapter){
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//入站投递的通道
adapter.setOutputChannel(mqttInboundChannel());
adapter.setQos(1);
return IntegrationFlows.from(adapter)
.channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
.handle(mqttMessageHandler)
.get();
}

@Bean
public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 最大可创建的线程数
int maxPoolSize = 200;
executor.setMaxPoolSize(maxPoolSize);
// 核心线程池大小
int corePoolSize = 50;
executor.setCorePoolSize(corePoolSize);
// 队列最大长度
int queueCapacity = 1000;
executor.setQueueCapacity(queueCapacity);
// 线程池维护线程所允许的空闲时间
int keepAliveSeconds = 300;
executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程池对拒绝任务(无线程可用)的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}

+ 133
- 0
xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/handler/MqttMessageHandle.java View File

@@ -0,0 +1,133 @@
package com.xueyi.message.handler;

import com.xueyi.common.core.utils.core.SpringUtils;
import com.xueyi.common.mqtt.annotation.MqttService;
import com.xueyi.common.mqtt.annotation.MqttTopic;
import com.xueyi.common.mqtt.service.IMqttMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
* mqtt消息处理类
* 这里的代码是在spring-integration-mqtt的基础上修改的
* 他的代码是在接收到消息后 直接调用了方法
* 我这里是把消息放到线程池里面去处理
* 这样就不会阻塞mqtt的消息接收
*/
@Component
public class MqttMessageHandle implements IMqttMessageHandler {

private static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);

// 包含 @MqttService注解 的类(Component)
public static Map<String, Object> mqttServices;

/**
* 所有mqtt到达的消息都会在这里处理
* 要注意这个方法是在线程池里面运行的
* @param message message
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
getMqttTopicService(message);

}

public Map<String, Object> getMqttServices(){
if(mqttServices==null){
mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
}
return mqttServices;
}

public void getMqttTopicService(Message<?> message){
// 在这里 我们根据不同的 主题 分发不同的消息
String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
if(receivedTopic==null || "".equals(receivedTopic)){
return;
}
for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
// 把所有带有 @MqttService 的类遍历
Class<?> clazz = entry.getValue().getClass();
// 获取他所有方法
Method[] methods = clazz.getDeclaredMethods();
for ( Method method: methods ){
if (method.isAnnotationPresent(MqttTopic.class)){
// 如果这个方法有 这个注解
MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
if(isMatch(receivedTopic,handleTopic.value())){
// 并且 这个 topic 匹配成功
try {
method.invoke(SpringUtils.getBean(clazz),message);
return;
} catch (IllegalAccessException e) {
e.printStackTrace();
log.error("代理发生问题");
} catch (InvocationTargetException e) {
log.error("执行 {} 方法出现错误",handleTopic.value(),e);
}
}
}
}
}
}

/**
* mqtt 订阅的主题与我实际的主题是否匹配
* @param topic 是实际的主题
* @param pattern 是我订阅的主题 可以是通配符模式
* @return 是否匹配
*/
public static boolean isMatch(String topic, String pattern){

if((topic==null) || (pattern==null) ){
return false;
}

if(topic.equals(pattern)){
// 完全相等是肯定匹配的
return true;
}

if("#".equals(pattern)){
// # 号代表所有主题 肯定匹配的
return true;
}
String[] splitTopic = topic.split("/");
String[] splitPattern = pattern.split("/");

boolean match = true;

// 如果包含 # 则只需要判断 # 前面的
for (int i = 0; i < splitPattern.length; i++) {
if(!"#".equals(splitPattern[i])){
// 不是# 号 正常判断
if(i>=splitTopic.length){
// 此时长度不相等 不匹配
match = false;
break;
}
if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
// 不相等 且不等于 +
match = false;
break;
}
}
else {
// 是# 号 肯定匹配的
break;
}
}

return match;
}

}

+ 39
- 0
xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/handler/MqttTopicHandle.java View File

@@ -0,0 +1,39 @@
package com.xueyi.message.handler;

import com.xueyi.common.mqtt.annotation.MqttService;
import com.xueyi.common.mqtt.annotation.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

@MqttService
public class MqttTopicHandle {

private static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);

/**
* 以下内容是使用范例
// 这里的 # 号是通配符
@MqttTopic("test/#")
public void test(Message<?> message){
log.info("test="+message.getPayload());
}

// 这里的 + 号是通配符
@MqttTopic("topic/+/+/up")
public void up(Message<?> message){
log.info("up="+message.getPayload());
}

// 注意 你必须先订阅
@MqttTopic("topic/1/2/down")
public void down(Message<?> message){
log.info("down="+message.getPayload());
}
*/

@MqttTopic("digital_man/+/reply/setup")
public void setup(Message message){
log.info("down="+message.getPayload());
}
}

Loading…
Cancel
Save