Преглед изворни кода

change mqtt service

tags/v0.8.5^2
yk пре 2 година
родитељ
комит
2ccdc0738c
3 измењених фајлова са 176 додато и 10 уклоњено
  1. +30
    -0
      xueyi-api/xueyi-api-system/src/main/java/com/xueyi/system/api/digitalmans/feign/RemoteDigitalmanMqttService.java
  2. +21
    -2
      xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/handler/MqttTopicHandle.java
  3. +125
    -8
      xueyi-modules/xueyi-system/src/main/java/com/xueyi/system/digitalmans/controller/DmDigitalmanController.java

+ 30
- 0
xueyi-api/xueyi-api-system/src/main/java/com/xueyi/system/api/digitalmans/feign/RemoteDigitalmanMqttService.java Прегледај датотеку

@@ -0,0 +1,30 @@
package com.xueyi.system.api.digitalmans.feign;

import com.xueyi.common.core.constant.basic.SecurityConstants;
import com.xueyi.common.core.constant.basic.ServiceConstants;
import com.xueyi.system.api.organize.feign.factory.RemoteUserFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestParam;

/**
* 一体机mqtt服务
*
* @author yk
*/
@FeignClient(contextId = "remoteDigitalmanService", value = ServiceConstants.SYSTEM_SERVICE, fallbackFactory = RemoteUserFallbackFactory.class)
public interface RemoteDigitalmanMqttService {

/**
* 同步数字人信息 | 内部调用
*/
@GetMapping("/man/inner/mqtt/heartbeat/{message}")
public void heartbeat(@RequestParam(value = "message") String message);


@GetMapping("/man/inner/mqtt/log-upload/{message}")
public void logUpload(@RequestParam(value = "message") String message,@RequestHeader(SecurityConstants.ENTERPRISE_ID) Long enterpriseId, @RequestHeader(SecurityConstants.SOURCE_NAME) String sourceName, @RequestHeader(SecurityConstants.FROM_SOURCE) String source);


}

+ 21
- 2
xueyi-modules/xueyi-message/src/main/java/com/xueyi/message/handler/MqttTopicHandle.java Прегледај датотеку

@@ -1,7 +1,12 @@
package com.xueyi.message.handler;

import com.alibaba.fastjson2.JSONObject;
import com.xueyi.common.core.constant.basic.SecurityConstants;
import com.xueyi.common.mqtt.annotation.MqttService;
import com.xueyi.common.mqtt.annotation.MqttTopic;
import com.xueyi.system.api.device.domain.vo.DeviceTenantSourceMergeVo;
import com.xueyi.system.api.device.feign.RemoteDeviceTenantMergeService;
import com.xueyi.system.api.digitalmans.feign.RemoteDigitalmanMqttService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
@@ -9,6 +14,9 @@ import org.springframework.messaging.Message;
@MqttService
public class MqttTopicHandle {

private RemoteDeviceTenantMergeService remoteDeviceTenantMergeService;

private RemoteDigitalmanMqttService remoteDigitalmanMqttService;
private static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);

/**
@@ -39,13 +47,24 @@ public class MqttTopicHandle {

@MqttTopic("/digital_man/heart_beat")
public void heartbeat(Message message){
//TODO. heartbeat mqtt 处理逻辑
System.err.println("rev heartbeat mqtt msg:"+message.getPayload().toString());

String messageStr = message.getPayload().toString();
JSONObject heartBeatObj = JSONObject.parseObject(messageStr);
String devId = heartBeatObj.getString("devId");
DeviceTenantSourceMergeVo vo = remoteDeviceTenantMergeService.selectDeviceTenantSourceMerge(devId);
remoteDigitalmanMqttService.heartbeat(messageStr);

}

@MqttTopic("/digital_man/log_upload_notification")
public void logUpload(Message message){
//TODO. 预警上报 mqtt 处理逻辑
System.err.println("rev logUpload mqtt msg:"+message.getPayload().toString());

String messageStr = message.getPayload().toString();
JSONObject heartBeatObj = JSONObject.parseObject(messageStr);
String devId = heartBeatObj.getString("devId");
DeviceTenantSourceMergeVo vo = remoteDeviceTenantMergeService.selectDeviceTenantSourceMerge(devId);
remoteDigitalmanMqttService.logUpload(messageStr, vo.getTenantId(), vo.getSourceSlave(), SecurityConstants.INNER);
}
}

+ 125
- 8
xueyi-modules/xueyi-system/src/main/java/com/xueyi/system/digitalmans/controller/DmDigitalmanController.java Прегледај датотеку

@@ -1,8 +1,10 @@
package com.xueyi.system.digitalmans.controller;

import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xueyi.common.core.constant.basic.BaseConstants;
import com.xueyi.common.core.constant.basic.SecurityConstants;
import com.xueyi.common.core.constant.digitalman.InitConstants;
import com.xueyi.common.core.constant.basic.SqlConstants;
import com.xueyi.common.core.constant.digitalman.MessageConstants;
import com.xueyi.common.core.web.result.AjaxResult;
import com.xueyi.common.core.web.result.R;
@@ -13,33 +15,47 @@ import com.xueyi.common.log.enums.BusinessType;
import com.xueyi.common.security.annotation.InnerAuth;
import com.xueyi.common.security.annotation.RequiresPermissions;
import com.xueyi.common.web.entity.controller.BaseController;
import com.xueyi.common.web.utils.DateUtils;
import com.xueyi.message.api.transfer.feign.RemoteTransferService;
import com.xueyi.system.api.device.domain.po.DmDeviceTenantMergePo;
import com.xueyi.system.api.digitalmans.domain.dto.DmDigitalmanWorktimeDto;
import com.xueyi.system.api.digitalmans.domain.dto.DmManDeviceDto;
import com.xueyi.system.api.digitalmans.domain.dto.DmModelDto;
import com.xueyi.system.api.digitalmans.domain.dto.DmSyncDigitalmanDto;
import com.xueyi.system.api.digitalmans.domain.po.DmDigitalmanExtPo;
import com.xueyi.system.api.digitalmans.domain.po.DmDigitalmanPo;
import com.xueyi.system.api.digitalmans.domain.po.DmDigitalmanPo;
import com.xueyi.system.api.digitalmans.domain.po.DmManDevicePo;
import com.xueyi.system.device.service.impl.DmDeviceTenantMergeServiceImpl;
import com.xueyi.system.digitalmans.domain.dto.DmDigitalmanDto;
import com.xueyi.system.digitalmans.domain.dto.DmDigitalmanExtDto;
import com.xueyi.system.digitalmans.domain.query.DmDigitalmanExtQuery;
import com.xueyi.system.digitalmans.domain.query.DmDigitalmanQuery;
import com.xueyi.system.digitalmans.mapper.DmManDeviceMapper;
import com.xueyi.system.digitalmans.service.IDmDigitalmanExtService;
import com.xueyi.system.digitalmans.service.IDmDigitalmanService;
import com.xueyi.system.digitalmans.service.IDmModelService;
import com.xueyi.system.digitalmans.service.IDmSkillService;
import com.xueyi.system.digitalmans.service.impl.DmManDeviceServiceImpl;
import com.xueyi.system.emcs.constant.EmcsUploadType;
import com.xueyi.system.emcs.domain.dto.DmDeviceLogFileDto;
import com.xueyi.system.emcs.domain.dto.DmExceptionLogDto;
import com.xueyi.system.emcs.domain.dto.RedisBaseDto;
import com.xueyi.system.emcs.mapper.DmDeviceLogFileMapper;
import com.xueyi.system.emcs.mapper.DmExceptionLogMapper;
import com.xueyi.system.emcs.service.BaseRedisListDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.Serializable;
import java.text.ParseException;
import java.util.List;

/**
@@ -51,6 +67,21 @@ import java.util.List;
@RequestMapping("/man")
public class DmDigitalmanController extends BaseController<DmDigitalmanQuery, DmDigitalmanDto, IDmDigitalmanService> {

private final static String HEART_MQTT_PREFIX = "digital-man";
private final static Integer REDIS_LIST_SIZE = 60;

@Autowired
private BaseRedisListDataCache baseRedisListDataCache;

@Autowired
private DmManDeviceMapper manDeviceMapper;

@Autowired
private DmExceptionLogMapper exceptionLogMapper;

@Autowired
private DmDeviceLogFileMapper deviceLogFileMapper;

private static final Logger log = LoggerFactory.getLogger(DmDigitalmanController.class);

@Autowired
@@ -224,4 +255,90 @@ public class DmDigitalmanController extends BaseController<DmDigitalmanQuery, Dm
/** 系统 - 数字人基础管理 - 删除 */
String DM_DIGITALMAN_DEL = "man:man:delete";
}


@InnerAuth
@GetMapping("/inner/mqtt/heartbeat/{message}")
public void heartbeat(@RequestParam(value = "message") String message) {
JSONObject heartBeatObj = JSONObject.parseObject(message);
String devId = heartBeatObj.getString("devId");
String timestamp = heartBeatObj.getString("timestamp");
String network = heartBeatObj.getString("wifi_rssi");
String cpu_max = heartBeatObj.getString("cpuinfo_max_freq");
String cpu_scaling = heartBeatObj.getString("scaling_cur_freq");
String cpu = Double.valueOf(cpu_scaling) / Double.valueOf(cpu_max) * 100 + "";
String memory_cur = heartBeatObj.getString("memory_total");
// String memoryTotal = heartBeatObj.getString("MemTotal").substring(0,heartBeatObj.getString("MemTotal").length() - 2);
String memoryTotal = heartBeatObj.getString("memory_total");
Double memoryFree = Double.valueOf(heartBeatObj.getString("memory_free"));
String memory = (Double.valueOf(memoryTotal) - memoryFree) / Double.valueOf(memoryTotal) * 100 + "";
String occurTime = "";

try {
occurTime = DateUtils.formatDate(DateUtils.parseLongToDate(timestamp),"HH:mm");
} catch (ParseException e) {
throw new RuntimeException(e);
}


if (StringUtils.isNotEmpty(devId)) {
if (StringUtils.isNotEmpty(network)){
baseRedisListDataCache.init (REDIS_LIST_SIZE).addToList(HEART_MQTT_PREFIX+":"+devId+":network", new RedisBaseDto(occurTime, network));
}
if (StringUtils.isNotEmpty(cpu)) {
baseRedisListDataCache.init(REDIS_LIST_SIZE).addToList(HEART_MQTT_PREFIX + ":" + devId + ":cpu", new RedisBaseDto(occurTime, cpu));
}
if (StringUtils.isNotEmpty(memory)) {
baseRedisListDataCache.init(REDIS_LIST_SIZE).addToList(HEART_MQTT_PREFIX + ":" + devId + ":memory", new RedisBaseDto(occurTime, memory));
}
}
System.err.println(heartBeatObj.toJSONString());
}

@InnerAuth
@GetMapping("/inner/mqtt/log-upload/{message}")
public void logUpload(@RequestParam(value = "message") String message) {
JSONObject heartBeatObj = JSONObject.parseObject(message);
String devId = heartBeatObj.getString("devId");
String timestamp = heartBeatObj.getString("timestamp");
String fileName = heartBeatObj.getString("file_name");
String path = heartBeatObj.getString("path");
String type = heartBeatObj.getString("type");
Integer level = heartBeatObj.getInteger("level");
Integer uploadType = heartBeatObj.getInteger("upload_type");


DmExceptionLogDto exceptionLogDto = new DmExceptionLogDto();
exceptionLogDto.setType(type);
exceptionLogDto.setLevel(level);
exceptionLogDto.setDevId(devId);

DmManDevicePo devicePo = manDeviceMapper.selectOne(Wrappers.<DmManDevicePo>lambdaQuery().eq(DmManDevicePo::getDeviceId, devId).last(SqlConstants.LIMIT_ONE));
try {
if (null!= devicePo) {
exceptionLogDto.setManCode(devicePo.getManCode());
exceptionLogDto.setTId(devicePo.getTId());

exceptionLogDto.setOccurTime(DateUtils.dateToLocalDateTime(DateUtils.parseLongToDate(timestamp)));
// 上传类型为崩溃上传时,插入异常日志
if (uploadType == EmcsUploadType.getTypeCrackUpload()) {
exceptionLogMapper.insert(exceptionLogDto);
}

//DmExceptionLogDto 持久化
DmDeviceLogFileDto deviceLogFileDto = new DmDeviceLogFileDto();
deviceLogFileDto.setManDeviceId(devicePo.getId());
deviceLogFileDto.setUrl(path);
deviceLogFileDto.setType(uploadType);
deviceLogFileDto.setName(fileName);

deviceLogFileMapper.insert(deviceLogFileDto);

} else {
return;
}
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}

Loading…
Откажи
Сачувај