浏览代码

yinruoxi:

新增
    1.Message模块新增SocketIO
tags/B0810-test
kira 2 年前
父节点
当前提交
d379372cc2
共有 5 个文件被更改,包括 267 次插入1 次删除
  1. +16
    -0
      xueyi-api/xueyi-api-message/src/main/java/com/xueyi/message/api/transfer/domain/vo/Message.java
  2. +54
    -0
      xueyi-common/xueyi-common-core/src/main/java/com/xueyi/common/core/utils/InetAddressUtils.java
  3. +11
    -1
      xueyi-modules/xueyi-message/pom.xml
  4. +19
    -0
      xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/transfer/service/IMessageQueueService.java
  5. +167
    -0
      xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/transfer/service/impl/MessageQueueServiceImpl.java

+ 16
- 0
xueyi-api/xueyi-api-message/src/main/java/com/xueyi/message/api/transfer/domain/vo/Message.java 查看文件

@@ -0,0 +1,16 @@
package com.xueyi.message.api.transfer.domain.vo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.*;

import java.io.Serializable;

@Data
@ToString
@Builder
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Message implements Serializable,Cloneable{


}

+ 54
- 0
xueyi-common/xueyi-common-core/src/main/java/com/xueyi/common/core/utils/InetAddressUtils.java 查看文件

@@ -0,0 +1,54 @@
package com.xueyi.common.core.utils;


import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Enumeration;

/**
* @author yinruoxi
* @version V1.0
* @className InetAddressUtils
* @description TO DO
* @Date 2023/8/10 14:19 PM
*/
public class InetAddressUtils {

public static InetAddress getLocalHostLANAddress() throws UnknownHostException {
try {
InetAddress candidateAddress = null;
// 遍历所有的网络接口
for (Enumeration ifaces = NetworkInterface.getNetworkInterfaces(); ifaces.hasMoreElements();) {
NetworkInterface iface = (NetworkInterface) ifaces.nextElement();
// 在所有的接口下再遍历IP
for (Enumeration inetAddrs = iface.getInetAddresses(); inetAddrs.hasMoreElements();) {
InetAddress inetAddr = (InetAddress) inetAddrs.nextElement();
if (!inetAddr.isLoopbackAddress()) {// 排除loopback类型地址
if (inetAddr.isSiteLocalAddress()) {
// 如果是site-local地址,就是它了
return inetAddr;
} else if (candidateAddress == null) {
// site-local类型的地址未被发现,先记录候选地址
candidateAddress = inetAddr;
}
}
}
}
if (candidateAddress != null) {
return candidateAddress;
}
// 如果没有发现 non-loopback地址.只能用最次选的方案
InetAddress jdkSuppliedAddress = InetAddress.getLocalHost();
if (jdkSuppliedAddress == null) {
throw new UnknownHostException("The JDK InetAddress.getLocalHost() method unexpectedly returned null.");
}
return jdkSuppliedAddress;
} catch (Exception e) {
UnknownHostException unknownHostException = new UnknownHostException(
"Failed to determine LAN address: " + e);
unknownHostException.initCause(e);
throw unknownHostException;
}
}
}

+ 11
- 1
xueyi-modules/xueyi-message/pom.xml 查看文件

@@ -47,7 +47,16 @@
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.14</version>
</dependency>

<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.13</version>
</dependency>
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>1.0.0</version>
</dependency>

<!-- XueYi Common Log -->
<dependency>
@@ -55,6 +64,7 @@
<artifactId>xueyi-common-log</artifactId>
</dependency>


<!-- XueYi Common Web -->
<dependency>
<groupId>com.xueyi</groupId>


+ 19
- 0
xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/transfer/service/IMessageQueueService.java 查看文件

@@ -0,0 +1,19 @@
package com.xueyi.message.transfer.service;

import com.corundumstudio.socketio.SocketIOClient;
import com.xueyi.message.api.transfer.domain.vo.Message;

import java.util.Collection;
import java.util.List;

public interface IMessageQueueService {

// public List<Message> getAll();
//
// public boolean add(Message message);

public void broadcast(String channel, String message);

public Collection<SocketIOClient> getAllClients();

}

+ 167
- 0
xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/transfer/service/impl/MessageQueueServiceImpl.java 查看文件

@@ -0,0 +1,167 @@
package com.xueyi.message.transfer.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import com.corundumstudio.socketio.listener.ExceptionListener;
import com.xueyi.common.core.utils.InetAddressUtils;
import com.xueyi.message.api.transfer.domain.vo.Message;
import com.xueyi.message.transfer.service.IMessageQueueService;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class MessageQueueServiceImpl implements IMessageQueueService {

@Autowired
RedisTemplate<String, Serializable> redisTemplate;
private Integer port = 9901;

private SocketIOServer server;
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();

MessageQueueServiceImpl(){
startServer(port);
}

// @Override
// public List<Message> getAll(){
// List<Message> list = new ArrayList<Message>();
// Iterator<Map.Entry<String, Message>> entries = messageMap.entrySet().iterator();
// while(entries.hasNext()){
// Map.Entry<String, Message> entry = entries.next();
// Message value = entry.getValue();
// if(System.currentTimeMillis()- value.getTimestamp() > 10000){
// messageMap.remove(entry.getKey());
// }else{
// list.add(value);
// }
// }
// if(list.size()>0){
// return list;
// }
// return null;
// }
//
// @Override
// public boolean add(Message message){
//
// messageMap.put(message.getVid(),message);
// return true;
// }

@Override
public void broadcast(String channel, String message){
server.getBroadcastOperations().sendEvent(channel,message);
}

@Override
public Collection<SocketIOClient> getAllClients(){
return server.getAllClients();
}


private void startServer(int port){
Configuration config = new Configuration();
ExceptionListener exceptionListener = new ExceptionListener() {
@Override
public void onEventException(Exception e, List<Object> list, SocketIOClient socketIOClient) {

}

@Override
public void onDisconnectException(Exception e, SocketIOClient socketIOClient) {

}

@Override
public void onConnectException(Exception e, SocketIOClient socketIOClient) {

}

@Override
public boolean exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
channelHandlerContext.close();
return true;
}
};
try {
System.out.println("启动");
config.setHostname(InetAddressUtils.getLocalHostLANAddress().getHostAddress());

} catch (Exception e) {
e.printStackTrace();
}
config.setPort(port);
config.setExceptionListener(exceptionListener);
server = new SocketIOServer(config);
server.addConnectListener(new ConnectListener() {
// 添加客户端连接监听器
@Override
public void onConnect(SocketIOClient client) {
System.out.println(client.getRemoteAddress().toString());
if(client.getRemoteAddress().toString().contains("100.117") || client.getRemoteAddress().toString().contains("100.171")){
client.disconnect();
return;
}
client.sendEvent("connected", "hello");
}
});

server.addEventListener("client_info", String.class, new DataListener<String>(){
@Override
public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws ClassNotFoundException {
//客户端推送advert_info事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型
String sa = client.getRemoteAddress().toString();
String clientIp = sa.substring(1,sa.indexOf(":"));//获取客户端连接的ip
Map params = client.getHandshakeData().getUrlParams();//获取客户端url参数
System.out.println(clientIp+":客户端:************"+data);
}
});
// server.addEventListener("remove_vinfromrsu", String.class, new DataListener<String>() {
// @Override
// public void onData(SocketIOClient socketIOClient, String s, AckRequest ackRequest) throws Exception {
// System.out.println("remove_vinfromrsu : " + s);
// List<RsuEventSimulatorMessage> list = JSON.parseArray(s, RsuEventSimulatorMessage.class);
// for (RsuEventSimulatorMessage item : list) {
// for (String vout : item.getVinouts()) {
// redisTemplate.opsForSet().remove("rsu_" + item.getRsuId(),vout);
// }
// }
// }
// });
// server.addEventListener("screen_info", String.class, new DataListener<String>(){
// @Override
// public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws ClassNotFoundException {
// VehicleGridCountItem item = ((JSONObject)JSON.parse(data)).toJavaObject(VehicleGridCountItem.class);
// gridCountItemMap.put(client,item);
// }
// });
//添加客户端断开连接事件
server.addDisconnectListener(new DisconnectListener(){
@Override
public void onDisconnect(SocketIOClient client) {
String sa = client.getRemoteAddress().toString();
String clientIp = sa.substring(1,sa.indexOf(":"));//获取设备ip
System.out.println(clientIp+"-------------------------"+"客户端已断开连接");
//给客户端发送消息
client.sendEvent("advert_info",clientIp+"客户端你好,我是服务端,期待下次和你见面");
}
});
server.start();
}

}

正在加载...
取消
保存