电表、水表、温湿度计抄表、拉合闸时,将数值和状态同步到业务系统

This commit is contained in:
曹磊 2025-08-21 09:45:09 +08:00
parent 79630bba85
commit bddd08eacb
12 changed files with 590 additions and 7 deletions

View File

@ -20,6 +20,8 @@ public class IotElectricityMeterMQDto{
private String eleValue;
/**阀门状态*/
private String relayState;
/**上次查表时间*/
private String readTime;
/**描述*/
private String remark;
/**区域编码*/

View File

@ -22,6 +22,8 @@ public class IotWaterMeterMQDto{
private String relayState;
/**水表电池状态*/
private String batteryState;
/**上次查表时间*/
private String readTime;
/**描述*/
private String remark;
/**区域编码*/

View File

@ -12,6 +12,8 @@ import java.util.Map;
public interface IElectricityMeterService extends IService<ElectricityMeter> {
IPage<ElectricityMeter> findPage(Page<ElectricityMeter> page, ElectricityMeter electricityMeter);
List<ElectricityMeter> findAllList();
void updateValue(ElectricityMeter electricityMeter);
void updateRelayState(ElectricityMeter electricityMeter);
Result<String> eleReset(ElectricityMeter electricityMeter);
Result<String> eleControl(ElectricityMeter electricityMeter);
Result<String> eleRead(ElectricityMeter electricityMeter);

View File

@ -3,12 +3,17 @@ package com.nu.modules.tq.electricity.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.nu.dto.IotElectricityMeterMQDto;
import com.nu.dto.IotHumidDeviceMQDto;
import com.nu.modules.syncLog.entity.SyncBizLog;
import com.nu.modules.syncLog.entity.SyncLog;
import com.nu.modules.syncLog.service.ISyncBizLogService;
import com.nu.modules.syncLog.service.ISyncLogService;
import com.nu.modules.tq.common.entity.TqApiLog;
import com.nu.modules.tq.common.entity.TqDeviceInfo;
@ -18,12 +23,15 @@ import com.nu.modules.tq.utils.HttpTool;
import com.nu.modules.tq.utils.SignTool;
import com.nu.modules.tq.utils.TqApi;
import com.nu.modules.yiweilian.humid.entity.HumidDevice;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import com.nu.modules.tq.electricity.entity.ElectricityMeter;
import com.nu.modules.tq.electricity.mapper.ElectricityMeterMapper;
import com.nu.modules.tq.electricity.service.IElectricityMeterService;
import me.zhyd.oauth.utils.UuidUtils;
import org.apache.commons.lang.StringUtils;
import org.jeecg.common.api.vo.Result;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -35,13 +43,19 @@ import java.util.*;
public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMapper, ElectricityMeter> implements IElectricityMeterService {
@Autowired
TqApi tqApi;
private TqApi tqApi;
@Autowired
ITqApiLogService logService;
private ITqApiLogService logService;
@Autowired
TqDeviceInfoMapper tqDeviceInfoMapper;
private ISyncBizLogService bizLogService;
@Autowired
private RabbitMQUtil rabbitMQUtil;
@Autowired
private TqDeviceInfoMapper tqDeviceInfoMapper;
@Autowired
public ISyncLogService nuIotTqElectricitySyncLogService;
@ -60,6 +74,16 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
return baseMapper.findAllList();
}
@Override
public void updateValue(ElectricityMeter electricityMeter){
baseMapper.updateValue(electricityMeter);
}
@Override
public void updateRelayState(ElectricityMeter electricityMeter){
baseMapper.updateRelayState(electricityMeter);
}
/**
* 清零
*/
@ -343,6 +367,7 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
em.setAddress(address);
em.setEleValue("0");
baseMapper.updateValue(em);
syncCleanMq(em);
}
}
}
@ -352,6 +377,31 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
return "SUCCESS";
}
/**
* 清零同步到业务系统
* @param electricityMeter
*/
private void syncCleanMq(ElectricityMeter electricityMeter){
ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter);
if(entity!=null) {
IotElectricityMeterMQDto iem = new IotElectricityMeterMQDto();
BeanUtils.copyProperties(entity, iem);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("电表");
bizLogService.addLog(log);
iem.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", entity.getDepartServerUrl() + ".iotElectricity.eleValue.async", iem);
}
}
/**
* 开关闸回调通知
*/
@ -403,6 +453,7 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
em.setRelayState("1");
}
baseMapper.updateRelayState(em);
syncControlMq(em);
}
}
}
@ -412,6 +463,32 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
return "SUCCESS";
}
/**
* 开关闸同步到业务系统
* @param electricityMeter
*/
private void syncControlMq(ElectricityMeter electricityMeter){
ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter);
if(entity!=null) {
IotElectricityMeterMQDto iem = new IotElectricityMeterMQDto();
BeanUtils.copyProperties(entity, iem);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("电表");
bizLogService.addLog(log);
iem.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", entity.getDepartServerUrl() + ".iotElectricity.eleControl.async", iem);
}
}
/**
* 抄表回调通知
*/
@ -459,6 +536,7 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
baseMapper.updateValue(em);
tqApiLog.setId(logEntity.getId());
logService.update(tqApiLog);
syncValueMq(em);
}
}
}else{
@ -479,6 +557,31 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
return "SUCCESS";
}
/**
* 抄表同步到业务系统
* @param electricityMeter
*/
private void syncValueMq(ElectricityMeter electricityMeter){
ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter);
if(entity!=null) {
IotElectricityMeterMQDto iem = new IotElectricityMeterMQDto();
BeanUtils.copyProperties(entity, iem);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("电表");
bizLogService.addLog(log);
iem.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", entity.getDepartServerUrl() + ".ywIotElectricity.eleValue.async", iem);
}
}
@Override
public void syncElectricity(String dataSourceCode, ElectricityMeter electricityMeter) {

View File

@ -12,6 +12,8 @@ import java.util.Map;
public interface IWaterMeterService extends IService<WaterMeter> {
IPage<WaterMeter> findPage(Page<WaterMeter> page, WaterMeter waterMeter);
List<WaterMeter> findAllList();
void updateValue(WaterMeter waterMeter);
void updateRelayState(WaterMeter waterMeter);
Result<String> waterReset(WaterMeter waterMeter);
Result<String> waterControl(WaterMeter waterMeter);
Result<String> waterRead(WaterMeter waterMeter);

View File

@ -3,12 +3,17 @@ package com.nu.modules.tq.water.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.nu.dto.IotElectricityMeterMQDto;
import com.nu.dto.IotWaterMeterMQDto;
import com.nu.modules.syncLog.entity.SyncBizLog;
import com.nu.modules.syncLog.entity.SyncLog;
import com.nu.modules.syncLog.service.ISyncBizLogService;
import com.nu.modules.syncLog.service.ISyncLogService;
import com.nu.modules.tplink.camera.entity.CameraInfo;
import com.nu.modules.tq.common.entity.TqApiLog;
@ -23,9 +28,12 @@ import com.nu.modules.tq.utils.TqApi;
import com.nu.modules.tq.water.entity.WaterMeter;
import com.nu.modules.tq.water.mapper.WaterMeterMapper;
import com.nu.modules.tq.water.service.IWaterMeterService;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import me.zhyd.oauth.utils.UuidUtils;
import org.apache.commons.lang.StringUtils;
import org.jeecg.common.api.vo.Result;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -37,13 +45,19 @@ import java.util.*;
public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMeter> implements IWaterMeterService {
@Autowired
TqApi tqApi;
private TqApi tqApi;
@Autowired
ITqApiLogService logService;
private ITqApiLogService logService;
@Autowired
TqDeviceInfoMapper tqDeviceInfoMapper;
private ISyncBizLogService bizLogService;
@Autowired
private RabbitMQUtil rabbitMQUtil;
@Autowired
private TqDeviceInfoMapper tqDeviceInfoMapper;
@Autowired
public ISyncLogService nuIotTqElectricitySyncLogService;
@ -62,6 +76,16 @@ public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMe
return baseMapper.findAllList();
}
@Override
public void updateValue(WaterMeter updateValue){
baseMapper.updateValue(updateValue);
}
@Override
public void updateRelayState(WaterMeter updateValue){
baseMapper.updateRelayState(updateValue);
}
/**
* 清零
*/
@ -346,6 +370,7 @@ public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMe
wm.setAddress(address);
wm.setWaterValue("0");
baseMapper.updateValue(wm);
syncCleanMq(wm);
}
}
}
@ -355,6 +380,31 @@ public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMe
return "SUCCESS";
}
/**
* 清零同步到业务系统
* @param waterMeter
*/
private void syncCleanMq(WaterMeter waterMeter){
WaterMeter entity = baseMapper.getWaterMeter(waterMeter);
if(entity!=null) {
IotWaterMeterMQDto iwm = new IotWaterMeterMQDto();
BeanUtils.copyProperties(entity, iwm);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("水表");
bizLogService.addLog(log);
iwm.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", entity.getDepartServerUrl() + ".iotWater.waterValue.async", iwm);
}
}
/**
* 开关阀回调通知
*/
@ -406,6 +456,7 @@ public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMe
wm.setRelayState("0");
}
baseMapper.updateRelayState(wm);
syncControlMq(wm);
}
}
}
@ -415,6 +466,31 @@ public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMe
return "SUCCESS";
}
/**
* 开关阀同步到业务系统
* @param waterMeter
*/
private void syncControlMq(WaterMeter waterMeter){
WaterMeter entity = baseMapper.getWaterMeter(waterMeter);
if(entity!=null) {
IotWaterMeterMQDto iwm = new IotWaterMeterMQDto();
BeanUtils.copyProperties(entity, iwm);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("水表");
bizLogService.addLog(log);
iwm.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", entity.getDepartServerUrl() + ".iotWater.waterControl.async", iwm);
}
}
/**
* 抄表回调通知
*/
@ -471,6 +547,7 @@ public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMe
baseMapper.updateValue(wm);
tqApiLog.setId(logEntity.getId());
logService.update(tqApiLog);
syncValueMq(wm);
}
}
}else{
@ -491,7 +568,30 @@ public class WaterMeterServiceImpl extends ServiceImpl<WaterMeterMapper, WaterMe
return "SUCCESS";
}
/**
* 抄表同步到业务系统
* @param waterMeter
*/
private void syncValueMq(WaterMeter waterMeter){
WaterMeter entity = baseMapper.getWaterMeter(waterMeter);
if(entity!=null) {
IotWaterMeterMQDto iwm = new IotWaterMeterMQDto();
BeanUtils.copyProperties(entity, iwm);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("水表");
bizLogService.addLog(log);
iwm.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", entity.getDepartServerUrl() + ".ywIotWater.waterValue.async", iwm);
}
}
@Override
public void syncElectricity(String dataSourceCode, WaterMeter waterMeter) {

View File

@ -15,6 +15,7 @@ public interface IHumidDeviceService extends IService<HumidDevice> {
String updateDeviceParameters(HumidDevice humidDevice,String type);
String updateDeviceRealTime(HumidDevice humidDevice);
Result<String> updateDevice(HumidDevice humidDevice);
void updateValue(HumidDevice humidDevice);
Result<String> delFlagDevice(HumidDevice humidDevice);
Result getDeviceParameters(HumidDevice humidDevice);
IPage<HumidDevice> findLogPage(Page<HumidDevice> page, HumidDevice humidDevice);

View File

@ -3,23 +3,33 @@ package com.nu.modules.yiweilian.humid.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.nu.dto.IotHumidDeviceMQDto;
import com.nu.dto.IotTumsConfigMQDto;
import com.nu.modules.syncLog.entity.SyncBizLog;
import com.nu.modules.syncLog.entity.SyncConfigLog;
import com.nu.modules.syncLog.entity.SyncLog;
import com.nu.modules.syncLog.service.ISyncBizLogService;
import com.nu.modules.syncLog.service.ISyncLogService;
import com.nu.modules.tplink.common.entity.TumsConfig;
import com.nu.modules.tq.water.entity.WaterMeter;
import com.nu.modules.yiweilian.humid.entity.HumidDevice;
import com.nu.modules.yiweilian.humid.mapper.HumidDeviceMapper;
import com.nu.modules.yiweilian.humid.service.IHumidDeviceService;
import com.nu.modules.yiweilian.utils.YiweilianApi;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import me.zhyd.oauth.utils.UuidUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.shiro.SecurityUtils;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.vo.LoginUser;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -33,6 +43,11 @@ public class HumidDeviceServiceImpl extends ServiceImpl<HumidDeviceMapper, Humid
@Autowired
YiweilianApi yiweilianApi;
@Autowired
private ISyncBizLogService logService;
@Autowired
private RabbitMQUtil rabbitMQUtil;
@Autowired
public ISyncLogService nuIotTqElectricitySyncLogService;
@ -258,6 +273,7 @@ public class HumidDeviceServiceImpl extends ServiceImpl<HumidDeviceMapper, Humid
dh.setOptBy(sysUser.getUsername());
}
baseMapper.insertLog(dh);
syncStatusMq(dh);
}
}else{
errorMsg += humidDevice.getSn()+"温湿度设备丢失,请联系管理员";
@ -275,6 +291,7 @@ public class HumidDeviceServiceImpl extends ServiceImpl<HumidDeviceMapper, Humid
}
baseMapper.insertLog(dh);
baseMapper.updateValue(dh);
syncStatusMq(dh);
}
if((page+1)*limit<count){
Map<String, Object> params = getParmas(page+1,limit,humidDevice);
@ -286,6 +303,31 @@ public class HumidDeviceServiceImpl extends ServiceImpl<HumidDeviceMapper, Humid
return errorMsg;
}
/**
* 同步到业务系统
* @param humidDevice
*/
private void syncStatusMq(HumidDevice humidDevice){
HumidDevice entity = baseMapper.getHumidInfo(humidDevice);
if(entity!=null){
IotHumidDeviceMQDto ihd = new IotHumidDeviceMQDto();
BeanUtils.copyProperties(entity, ihd);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("温湿度计");
logService.addLog(log);
ihd.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", entity.getDepartServerUrl() + ".ywIotHumid.status.async", ihd);
}
}
private Map<String, Object> getUpdateParmas(HumidDevice humidDevice){
Map<String, Object> params = new HashMap<>();
params.put("sn", humidDevice.getSn());
@ -329,6 +371,15 @@ public class HumidDeviceServiceImpl extends ServiceImpl<HumidDeviceMapper, Humid
}
}
/**
* 更新温湿度值
* @param humidDevice
*/
@Override
public void updateValue(HumidDevice humidDevice){
baseMapper.updateValue(humidDevice);
}
/**
* 启用/停用设备
*/

View File

@ -0,0 +1,36 @@
package com.nu.mq.device.listener;
import com.nu.dto.StatusMQDto;
import com.nu.modules.syncLog.entity.SyncBizLog;
import com.nu.modules.syncLog.service.ISyncBizLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class IotDeviceValuesMQListener {
@Autowired
private ISyncBizLogService logService;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "ywIotDeviceValues.result.async", durable = "true"),
exchange = @Exchange(name = "hldy.iotDeviceValues", type = ExchangeTypes.DIRECT),
key = "ywIotDeviceValues.result.async"
),
errorHandler = "iotDeviceMQErrorHandler"
)
public void handleMessage(StatusMQDto dto) {
SyncBizLog log = new SyncBizLog();
log.setLogId(dto.getPrimaryKey());
log.setStatus(dto.getMessage());
logService.updateLog(log);
}
}

View File

@ -0,0 +1,107 @@
package com.nu.mq.tq.listener;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.nu.dto.IotElectricityMeterMQDto;
import com.nu.dto.IotHumidDeviceMQDto;
import com.nu.dto.StatusMQDto;
import com.nu.modules.tq.electricity.entity.ElectricityMeter;
import com.nu.modules.tq.electricity.service.IElectricityMeterService;
import com.nu.modules.yiweilian.humid.entity.HumidDevice;
import com.nu.modules.yiweilian.humid.service.IHumidDeviceService;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class IotSyncElectricityMQListener {
@Autowired
private RabbitMQUtil rabbitMQUtil;
@Autowired
private IElectricityMeterService electricityMeterService;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "hldy.iotElectricity.eleValue.async"),
exchange = @Exchange(name = "hldy.iotDeviceValues", type = ExchangeTypes.DIRECT),
key = "hldy.iotElectricity.eleValue.async"
),
errorHandler = "iotDeviceMQErrorHandler"
)
public void handleEleValue_unify(IotElectricityMeterMQDto dto) {
saveSyncEleValue(dto);
}
private void saveSyncEleValue(IotElectricityMeterMQDto dto) {
StatusMQDto statusMQDto = new StatusMQDto();
try {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("成功");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("电表");
QueryWrapper<ElectricityMeter> emQw = new QueryWrapper<>();
emQw.eq("cid",dto.getCid());
emQw.eq("address",dto.getAddress());
ElectricityMeter entity = electricityMeterService.getOne(emQw);
if(entity!=null){
entity.setEleValue(dto.getEleValue());
entity.setReadTime(dto.getReadTime());
electricityMeterService.updateValue(entity);
}
}
catch (Exception e) {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("失败");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("电表");
e.printStackTrace();
}
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", dto.getDepartServerUrl() +".bizIotDeviceValues.result.async", statusMQDto);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "hldy.iotElectricity.eleControl.async"),
exchange = @Exchange(name = "hldy.iotDeviceValues", type = ExchangeTypes.DIRECT),
key = "hldy.iotElectricity.eleControl.async"
),
errorHandler = "iotDeviceMQErrorHandler"
)
public void handleEleControl_unify(IotElectricityMeterMQDto dto) {
saveSyncEleControl(dto);
}
private void saveSyncEleControl(IotElectricityMeterMQDto dto) {
StatusMQDto statusMQDto = new StatusMQDto();
try {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("成功");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("电表");
QueryWrapper<ElectricityMeter> emQw = new QueryWrapper<>();
emQw.eq("cid",dto.getCid());
emQw.eq("address",dto.getAddress());
ElectricityMeter entity = electricityMeterService.getOne(emQw);
if(entity!=null){
entity.setRelayState(dto.getRelayState());
electricityMeterService.updateRelayState(entity);
}
}
catch (Exception e) {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("失败");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("电表");
e.printStackTrace();
}
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", dto.getDepartServerUrl() +".bizIotDeviceValues.result.async", statusMQDto);
}
}

View File

@ -0,0 +1,109 @@
package com.nu.mq.tq.listener;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.nu.dto.IotElectricityMeterMQDto;
import com.nu.dto.IotHumidDeviceMQDto;
import com.nu.dto.IotWaterMeterMQDto;
import com.nu.dto.StatusMQDto;
import com.nu.modules.tq.electricity.entity.ElectricityMeter;
import com.nu.modules.tq.water.entity.WaterMeter;
import com.nu.modules.tq.water.service.IWaterMeterService;
import com.nu.modules.yiweilian.humid.entity.HumidDevice;
import com.nu.modules.yiweilian.humid.service.IHumidDeviceService;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class IotSyncWaterMQListener {
@Autowired
private RabbitMQUtil rabbitMQUtil;
@Autowired
private IWaterMeterService waterMeterService;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "hldy.iotWater.waterValue.async"),
exchange = @Exchange(name = "hldy.iotDeviceValues", type = ExchangeTypes.DIRECT),
key = "hldy.iotWater.waterValue.async"
),
errorHandler = "iotDeviceMQErrorHandler"
)
public void handleWaterValue_unify(IotWaterMeterMQDto dto) {
saveSyncWaterValue(dto);
}
private void saveSyncWaterValue(IotWaterMeterMQDto dto) {
StatusMQDto statusMQDto = new StatusMQDto();
try {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("成功");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("水表");
QueryWrapper<WaterMeter> emQw = new QueryWrapper<>();
emQw.eq("cid",dto.getCid());
emQw.eq("address",dto.getAddress());
WaterMeter entity = waterMeterService.getOne(emQw);
if(entity!=null){
entity.setWaterValue(dto.getWaterValue());
entity.setReadTime(dto.getReadTime());
waterMeterService.updateValue(entity);
}
}
catch (Exception e) {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("失败");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("水表");
e.printStackTrace();
}
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", dto.getDepartServerUrl() +".bizIotDeviceValues.result.async", statusMQDto);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "hldy.iotWater.waterControl.async"),
exchange = @Exchange(name = "hldy.iotDeviceValues", type = ExchangeTypes.DIRECT),
key = "hldy.iotWater.waterControl.async"
),
errorHandler = "iotDeviceMQErrorHandler"
)
public void handleWaterControl_unify(IotElectricityMeterMQDto dto) {
saveSyncWaterControl(dto);
}
private void saveSyncWaterControl(IotElectricityMeterMQDto dto) {
StatusMQDto statusMQDto = new StatusMQDto();
try {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("成功");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("水表");
QueryWrapper<WaterMeter> emQw = new QueryWrapper<>();
emQw.eq("cid",dto.getCid());
emQw.eq("address",dto.getAddress());
WaterMeter entity = waterMeterService.getOne(emQw);
if(entity!=null){
entity.setRelayState(dto.getRelayState());
waterMeterService.updateRelayState(entity);
}
}
catch (Exception e) {
statusMQDto.setAsyncId(dto.getId().toString());
statusMQDto.setMessage("失败");
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("水表");
e.printStackTrace();
}
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", dto.getDepartServerUrl() +".bizIotDeviceValues.result.async", statusMQDto);
}
}

View File

@ -0,0 +1,68 @@
package com.nu.mq.yiweilian.listener;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.nu.dto.IotHumidDeviceMQDto;
import com.nu.dto.StatusMQDto;
import com.nu.modules.yiweilian.humid.entity.HumidDevice;
import com.nu.modules.yiweilian.humid.service.IHumidDeviceService;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class IotSyncHumidMQListener {
@Autowired
private RabbitMQUtil rabbitMQUtil;
@Autowired
private IHumidDeviceService humidDeviceService;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "hldy.iotHumid.status.async"),
exchange = @Exchange(name = "hldy.iotDeviceValues", type = ExchangeTypes.DIRECT),
key = "hldy.iotHumid.status.async"
),
errorHandler = "iotDeviceMQErrorHandler"
)
public void handleMessage_unify(IotHumidDeviceMQDto dto) {
saveSyncInfo(dto);
}
private void saveSyncInfo(IotHumidDeviceMQDto iotHumidDeviceMQDto) {
StatusMQDto statusMQDto = new StatusMQDto();
try {
statusMQDto.setAsyncId(iotHumidDeviceMQDto.getId().toString());
statusMQDto.setMessage("成功");
statusMQDto.setPrimaryKey(iotHumidDeviceMQDto.getLogId());
statusMQDto.setNote("温湿度计");
QueryWrapper<HumidDevice> humidQw = new QueryWrapper<>();
humidQw.eq("sn",iotHumidDeviceMQDto.getSn());
HumidDevice entity = humidDeviceService.getOne(humidQw);
if(entity!=null){
entity.setStatus(iotHumidDeviceMQDto.getStatus());
entity.setElectricity(iotHumidDeviceMQDto.getElectricity());
entity.setTemperature(iotHumidDeviceMQDto.getTemperature());
entity.setHumidity(iotHumidDeviceMQDto.getHumidity());
entity.setReportingTime(iotHumidDeviceMQDto.getReportingTime());
humidDeviceService.updateValue(entity);
}
}
catch (Exception e) {
statusMQDto.setAsyncId(iotHumidDeviceMQDto.getId().toString());
statusMQDto.setMessage("失败");
statusMQDto.setPrimaryKey(iotHumidDeviceMQDto.getLogId());
statusMQDto.setNote("温湿度计");
e.printStackTrace();
}
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", iotHumidDeviceMQDto.getDepartServerUrl() +".bizIotDeviceValues.result.async", statusMQDto);
}
}