From 2a997f68ce7335d6bb54aeb81c6430fc666c2759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E7=A3=8A?= <45566618@qq.com> Date: Thu, 12 Mar 2026 17:24:41 +0800 Subject: [PATCH] =?UTF-8?q?=E7=89=A9=E8=81=94=E6=99=BA=E8=83=BD=E7=94=B5?= =?UTF-8?q?=E8=A1=A8=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/nu/dto/IotElectricityMeterMQDto.java | 4 +- nursing-unit-iot/nu-iot-biz/pom.xml | 7 + .../tplink/camera/entity/CameraInfo.java | 1 + .../service/impl/TqDeviceInfoServiceImpl.java | 12 - .../api/ElectricityMeterApiController.java | 87 --- .../electricity/entity/ElectricityMeter.java | 12 +- .../mapper/xml/ElectricityMeterMapper.xml | 67 +-- .../service/IElectricityMeterService.java | 7 +- .../impl/ElectricityMeterServiceImpl.java | 540 ++++-------------- .../com/nu/modules/tq/utils/MqttConfig.java | 69 +++ .../modules/tq/utils/MqttMessageHandler.java | 105 ++++ .../modules/tq/utils/MqttPublisherParams.java | 84 +++ .../device/listener/IotSyncBizMQListener.java | 6 +- .../mq/tq/listenter/IotSyncTqMQListener.java | 6 +- .../main/resources/application-dev-nu002.yml | 10 + .../main/resources/application-dev-nu003.yml | 10 + .../src/main/resources/application-dev.yml | 10 + .../src/main/resources/application-uat.yml | 10 + .../src/main/resources/application-uat102.yml | 10 + .../src/main/resources/application-uat103.yml | 10 + .../src/main/resources/application-uat104.yml | 10 + 21 files changed, 501 insertions(+), 576 deletions(-) delete mode 100644 nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/api/ElectricityMeterApiController.java create mode 100644 nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttConfig.java create mode 100644 nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttMessageHandler.java create mode 100644 nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttPublisherParams.java diff --git a/nursing-unit-common/src/main/java/com/nu/dto/IotElectricityMeterMQDto.java b/nursing-unit-common/src/main/java/com/nu/dto/IotElectricityMeterMQDto.java index d3410c4f..4a05209a 100644 --- a/nursing-unit-common/src/main/java/com/nu/dto/IotElectricityMeterMQDto.java +++ b/nursing-unit-common/src/main/java/com/nu/dto/IotElectricityMeterMQDto.java @@ -12,10 +12,8 @@ import lombok.Data; public class IotElectricityMeterMQDto{ /**主键*/ private Integer id; - /**采集器*/ - private String cid; /**表号*/ - private String address; + private String sn; /**电表值*/ private String eleValue; /**阀门状态*/ diff --git a/nursing-unit-iot/nu-iot-biz/pom.xml b/nursing-unit-iot/nu-iot-biz/pom.xml index fffc6d0b..9897f7b6 100644 --- a/nursing-unit-iot/nu-iot-biz/pom.xml +++ b/nursing-unit-iot/nu-iot-biz/pom.xml @@ -59,6 +59,13 @@ compile + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tplink/camera/entity/CameraInfo.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tplink/camera/entity/CameraInfo.java index 4af1f07b..b96bc03d 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tplink/camera/entity/CameraInfo.java +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tplink/camera/entity/CameraInfo.java @@ -319,4 +319,5 @@ public class CameraInfo implements Serializable { private String ftpUploadpath; //回放视频转FTP上传路径 /**维修状态*/ private String maintainStatus; + private String remarks;//备注 } diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/common/service/impl/TqDeviceInfoServiceImpl.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/common/service/impl/TqDeviceInfoServiceImpl.java index 62008522..a9b68ac9 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/common/service/impl/TqDeviceInfoServiceImpl.java +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/common/service/impl/TqDeviceInfoServiceImpl.java @@ -71,18 +71,6 @@ public class TqDeviceInfoServiceImpl extends ServiceImpl { - - @Autowired - private IElectricityMeterService service; - - /** - * 电表清零回调 - * @param response_content - * @param timestamp - * @param sign - * @return - */ - @PostMapping("/eleResetNotify") - @ApiOperation("电表清零回调") - public String eleResetNotify(String response_content, String timestamp, String sign) { - log.info("eleResetNotify:response_content:{}",response_content); - log.info("eleResetNotify:timestamp:{}",timestamp); - log.info("eleResetNotify:sign:{}",sign); - //-----------加入业务逻辑----------- - String result = service.eleResetNotify(response_content, timestamp, sign); - return result; - } - - /** - * 电表开关闸回调 - * @param response_content - * @param timestamp - * @param sign - * @return - */ - @PostMapping("/eleControlNotify") - @ApiOperation("电表开关闸回调") - public String eleControlNotify(String response_content, String timestamp, String sign) { - log.info("eleControlNotify:response_content:{}",response_content); - log.info("eleControlNotify:timestamp:{}",timestamp); - log.info("eleControlNotify:sign:{}",sign); - //-----------加入业务逻辑----------- - String result = service.eleControlNotify(response_content, timestamp, sign); - return result; - } - - /** - * 电表抄表回调 - * @param response_content - * @param timestamp - * @param sign - * @return - */ - @PostMapping("/eleReadNotify") - @ApiOperation("电表抄表回调") - public String eleReadNotify(String response_content, String timestamp, String sign) { - log.info("eleReadNotify:response_content:{}",response_content); - log.info("eleReadNotify:timestamp:{}",timestamp); - log.info("eleReadNotify:sign:{}",sign); - //-----------加入业务逻辑----------- - String result = service.eleReadNotify(response_content, timestamp, sign); - //-------------------------------- - return result; - } - -} diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/entity/ElectricityMeter.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/entity/ElectricityMeter.java index f3772370..f69afa04 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/entity/ElectricityMeter.java +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/entity/ElectricityMeter.java @@ -19,22 +19,22 @@ import java.io.Serializable; *

功能描述:功能描述 */ @Data -@TableName("nu_iot_tq_electricity_meter") +@TableName("nu_iot_ds_electricity_meter") @Accessors(chain = true) @EqualsAndHashCode(callSuper = false) -@ApiModel(value="nu_iot_tq_electricity_meter对象", description="物联电表") +@ApiModel(value="nu_iot_ds_electricity_meter对象", description="物联电表") public class ElectricityMeter implements Serializable { private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) @ApiModelProperty(value = "主键") private Integer id; - @Excel(name = "采集器号", width = 15) - @ApiModelProperty(value = "采集器号,4G、NB设备采集器号与表号一致") - private String cid; + @ApiModelProperty(value = "表号") + @TableField(exist = false) + private String address; @Excel(name = "表号", width = 15) @ApiModelProperty(value = "表号(表地址)") - private String address; + private String sn; @Excel(name = "操作类型", width = 15) @ApiModelProperty(value = "操作类型") @TableField(exist = false) diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/mapper/xml/ElectricityMeterMapper.xml b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/mapper/xml/ElectricityMeterMapper.xml index a80a89c1..ed6d543b 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/mapper/xml/ElectricityMeterMapper.xml +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/mapper/xml/ElectricityMeterMapper.xml @@ -5,16 +5,13 @@ @@ -57,72 +44,68 @@ - update nu_iot_tq_electricity_meter + update nu_iot_ds_electricity_meter set + ele_value = #{eleValue}, relay_state = #{relayState}, remark = #{remark}, update_time = now() - where cid = #{cid} - and address = #{address} + where sn = #{sn} - update nu_iot_tq_electricity_meter + update nu_iot_ds_electricity_meter set ele_value = #{eleValue}, read_time = #{readTime}, update_time = now() - where cid = #{cid} - and address = #{address} + where sn = #{sn} - update nu_iot_tq_electricity_meter + update nu_iot_ds_electricity_meter set relay_state = #{relayState}, update_time = now() - where cid = #{cid} - and address = #{address} + where sn = #{sn} diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/IElectricityMeterService.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/IElectricityMeterService.java index 493562d7..290299b4 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/IElectricityMeterService.java +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/IElectricityMeterService.java @@ -17,12 +17,9 @@ public interface IElectricityMeterService extends IService { Result eleControl(ElectricityMeter electricityMeter); Result eleRead(ElectricityMeter electricityMeter); - String eleResetNotify(String response_content, String timestamp, String sign); - String eleControlNotify(String response_content, String timestamp, String sign); - String eleReadNotify(String response_content, String timestamp, String sign); - - void editHldy(ElectricityMeter electricityMeter); void baoxiu(ElectricityMeter electricityMeter); + + void processReceivedMessage(String payload); } diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/impl/ElectricityMeterServiceImpl.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/impl/ElectricityMeterServiceImpl.java index 5105f3f3..bac9f9a5 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/impl/ElectricityMeterServiceImpl.java +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/electricity/service/impl/ElectricityMeterServiceImpl.java @@ -16,9 +16,7 @@ import com.nu.modules.syncLog.entity.SyncBizLog; import com.nu.modules.syncLog.service.ISyncBizLogService; import com.nu.modules.tq.common.entity.TqApiLog; import com.nu.modules.tq.common.service.ITqApiLogService; -import com.nu.modules.tq.utils.HttpTool; -import com.nu.modules.tq.utils.SignTool; -import com.nu.modules.tq.utils.TqApi; +import com.nu.modules.tq.utils.*; import com.nu.modules.tq.water.entity.WaterMeter; import com.nu.utils.RabbitMQUtil; import lombok.extern.slf4j.Slf4j; @@ -59,6 +57,13 @@ public class ElectricityMeterServiceImpl extends ServiceImpl findPage(Page page, ElectricityMeter electricityMeter) { return baseMapper.findPage(page, electricityMeter); @@ -79,87 +84,17 @@ public class ElectricityMeterServiceImpl extends ServiceImpl eleReset(ElectricityMeter electricityMeter) { - ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter); - if (entity == null) { - return Result.error("请先同步电表设备信息后再进行清零"); - } - String readTimeStr = entity.getReadTime(); - if (readTimeStr == null || readTimeStr.equals("")) { - return Result.error("为避免结算误差,清零操作时,需要近一小时的抄表数据。"); - } - Date readTime = DateUtil.parse(readTimeStr, "yyyy-MM-dd HH:mm:ss"); - Calendar c = Calendar.getInstance(); - c.add(Calendar.HOUR_OF_DAY, -1); - int comInt = DateUtil.compare(readTime, c.getTime()); - if (comInt < 0) { - return Result.error("为避免结算误差,清零操作时,需要近一小时的抄表数据。"); - } - List> req = new ArrayList<>(); - Map params = new HashMap<>(); - params.put("paymentmode", "noprepay"); - Map item = new HashMap<>(); - item.put("opr_id", HttpTool.generateOperateId()); - item.put("time_out", 0); - item.put("must_online", true); - item.put("retry_times", 1); - item.put("cid", electricityMeter.getCid()); - item.put("address", electricityMeter.getAddress()); - item.put("params", params); - req.add(item); - String responseStr = tqApi.eleReset(req); - JSONObject jsonObject = new JSONObject(responseStr); - String httpStatus = jsonObject.getStr("status"); - if (httpStatus.equals("SUCCESS")) { - String response_content = jsonObject.getStr("response_content"); - JSONArray jsonArray = new JSONArray(response_content); - for (int i = 0; i < jsonArray.size(); i++) { - JSONObject json = (JSONObject) jsonArray.get(i); - String status = json.getStr("status");//状态 - String oprId = json.getStr("opr_id");//操作ID - if (status.equals("SUCCESS")) { - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setOprId(oprId); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(9); - tqApiLog.setRequestValue(entity.getEleValue()); - tqApiLog.setRequestTime(DateUtil.now()); - tqApiLog.setRequestStatus("清零中"); - logService.insert(tqApiLog); - } else { - String errorMsg = json.getStr("error_msg"); - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setOprId(oprId); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(9); - tqApiLog.setRequestValue(entity.getEleValue()); - tqApiLog.setRequestTime(DateUtil.now()); - tqApiLog.setRequestStatus("清零失败"); - tqApiLog.setRequestRemark(errorMsg); - logService.insert(tqApiLog); - return Result.error(errorMsg); - } - } - } else { - String errorMsg = jsonObject.getStr("error_msg"); - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(9); - tqApiLog.setRequestValue(entity.getEleValue()); - tqApiLog.setRequestTime(DateUtil.now()); - tqApiLog.setRequestStatus("清零失败"); - tqApiLog.setRequestRemark(errorMsg); - logService.insert(tqApiLog); - return Result.error(errorMsg); + String deviceId = electricityMeter.getSn(); + MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getResetParams(deviceId); + try { + mqttMessageHandler.publishDownlinkMessage(deviceId,message); + } catch (Exception e) { + e.printStackTrace(); } return Result.OK("清零中,请1分钟后刷新页面"); } @@ -189,82 +124,32 @@ public class ElectricityMeterServiceImpl extends ServiceImpl> req = new ArrayList<>(); - Map item = new HashMap<>(); - item.put("opr_id", HttpTool.generateOperateId()); - item.put("time_out", 0); - item.put("must_online", true); - item.put("retry_times", 1); - item.put("cid", electricityMeter.getCid()); - item.put("address", electricityMeter.getAddress()); - item.put("type", type); - req.add(item); - String responseStr = tqApi.eleControl(req); - JSONObject jsonObject = new JSONObject(responseStr); - String httpStatus = jsonObject.getStr("status"); - if (httpStatus.equals("SUCCESS")) { - String response_content = jsonObject.getStr("response_content"); - JSONArray jsonArray = new JSONArray(response_content); - for (int i = 0; i < jsonArray.size(); i++) { - JSONObject json = (JSONObject) jsonArray.get(i); - String status = json.getStr("status");//状态 - String oprId = json.getStr("opr_id");//操作ID - if (status.equals("SUCCESS")) { - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setOprId(oprId); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(electricityMeter.getType()); - tqApiLog.setRequestTime(DateUtil.now()); - if (type.equals(10)) { - tqApiLog.setRequestStatus("拉闸中"); - } else { - tqApiLog.setRequestStatus("合闸中"); - } - logService.insert(tqApiLog); - } else { - String errorMsg = json.getStr("error_msg"); - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setOprId(oprId); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(electricityMeter.getType()); - tqApiLog.setRequestTime(DateUtil.now()); - if (type.equals(10)) { - tqApiLog.setRequestStatus("拉闸失败"); - } else { - tqApiLog.setRequestStatus("合闸失败"); - } - tqApiLog.setRequestRemark(errorMsg); - logService.insert(tqApiLog); - return Result.error(errorMsg); - } + if (type.equals("10")) { + String deviceId = electricityMeter.getSn(); + MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getCutOffParams(deviceId); + try { + mqttMessageHandler.publishDownlinkMessage(deviceId,message); + } catch (Exception e) { + e.printStackTrace(); } - } else { - String errorMsg = jsonObject.getStr("error_msg"); - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(electricityMeter.getType()); - tqApiLog.setRequestTime(DateUtil.now()); - if (type.equals(10)) { - tqApiLog.setRequestStatus("拉闸失败"); - } else { - tqApiLog.setRequestStatus("合闸失败"); + }else{ + String deviceId = electricityMeter.getSn(); + MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getConnectedParams(deviceId); + try { + mqttMessageHandler.publishDownlinkMessage(deviceId,message); + } catch (Exception e) { + e.printStackTrace(); } - tqApiLog.setRequestRemark(errorMsg); - logService.insert(tqApiLog); - return Result.error(errorMsg); } - if (type.equals(10)) { + if(type.equals("10")){ return Result.OK("拉闸中,请1分钟后刷新页面"); - } else { + }else{ return Result.OK("合闸中,请1分钟后刷新页面"); } } @@ -305,67 +190,12 @@ public class ElectricityMeterServiceImpl extends ServiceImpl eleRead(ElectricityMeter electricityMeter) { - ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter); - if (entity == null) { - return Result.error("请先同步电表设备信息后再进行抄表"); - } - List> req = new ArrayList<>(); - Map item = new HashMap<>(); - item.put("opr_id", HttpTool.generateOperateId()); - item.put("time_out", 0); - item.put("must_online", true); - item.put("retry_times", 1); - item.put("cid", electricityMeter.getCid()); - item.put("address", electricityMeter.getAddress()); - item.put("type", 3); - req.add(item); - String responseStr = tqApi.eleRead(req); - JSONObject jsonObject = new JSONObject(responseStr); - String httpStatus = jsonObject.getStr("status"); - if (httpStatus.equals("SUCCESS")) { - String response_content = jsonObject.getStr("response_content"); - JSONArray jsonArray = new JSONArray(response_content); - for (int i = 0; i < jsonArray.size(); i++) { - JSONObject json = (JSONObject) jsonArray.get(i); - String status = json.getStr("status");//状态 - String oprId = json.getStr("opr_id");//操作ID - if (status.equals("SUCCESS")) { - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setOprId(oprId); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(3); - tqApiLog.setRequestValue(entity.getEleValue()); - tqApiLog.setRequestTime(DateUtil.now()); - tqApiLog.setRequestStatus("抄表中"); - logService.insert(tqApiLog); - } else { - String errorMsg = json.getStr("error_msg"); - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setOprId(oprId); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(3); - tqApiLog.setRequestValue(entity.getEleValue()); - tqApiLog.setRequestTime(DateUtil.now()); - tqApiLog.setRequestStatus("抄表失败"); - tqApiLog.setRequestRemark(errorMsg); - logService.insert(tqApiLog); - return Result.error(errorMsg); - } - } - } else { - String errorMsg = jsonObject.getStr("error_msg"); - TqApiLog tqApiLog = new TqApiLog(); - tqApiLog.setCid(electricityMeter.getCid()); - tqApiLog.setAddress(electricityMeter.getAddress()); - tqApiLog.setType(3); - tqApiLog.setRequestValue(entity.getEleValue()); - tqApiLog.setRequestTime(DateUtil.now()); - tqApiLog.setRequestStatus("抄表失败"); - tqApiLog.setRequestRemark(errorMsg); - logService.insert(tqApiLog); - return Result.error(errorMsg); + String deviceId = electricityMeter.getSn(); + MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getReadingParams(deviceId); + try { + mqttMessageHandler.publishDownlinkMessage(deviceId,message); + } catch (Exception e) { + e.printStackTrace(); } return Result.OK("抄表中,请1分钟后刷新页面"); } @@ -377,145 +207,7 @@ public class ElectricityMeterServiceImpl extends ServiceImpl queryWrapper = new QueryWrapper<>(); - queryWrapper.eq("cid", electricityMeter.getCid()); + queryWrapper.eq("sn", electricityMeter.getSn()); ElectricityMeter oldParam = baseMapper.selectOne(queryWrapper);//查询数据库中该表号数据原始数据 if (oldParam == null) { baseMapper.insert(electricityMeter); @@ -689,5 +313,93 @@ public class ElectricityMeterServiceImpl extends ServiceImpl0){ + eleValue901 = EgT901.substring(0,EgT901.toLowerCase().indexOf("kwh")); + } + ElectricityMeter entity901 = new ElectricityMeter(); + entity901.setSn(sn); + entity901.setEleValue(eleValue901); + entity901.setRelayState(relayState901); + entity901.setReadTime(DateUtil.now()); + baseMapper.update(entity901); + break; + case "902": + // 拉闸合闸 + String result902 = payload.getStr("result");//拉合闸执行结果 + if(result902.equals("SUCCESS")){ +// //拉合闸成功,直接抄表,获取表状态 + ElectricityMeter dme902 = new ElectricityMeter(); + dme902.setSn(sn); +// eleRead(dme902); + ElectricityMeter entity902 = baseMapper.getElectricityMeter(dme902); + if(entity902!=null){ + if(entity902.getRelayState().equals("0")){ + entity902.setRelayState("1"); + }else{ + entity902.setRelayState("0"); + } + baseMapper.updateRelayState(entity902); + } + } + break; + case "904": + // 参数配置,包括清零 + String mtype904 = payload.getStr("mtype");//mtype:200 TCP信息;206 表复位重启;208 表清零 + String dcode904 = payload.getStr("dcode"); + if(dcode904.equals("SUCCESS")){ + if(mtype904.equals("208")){ + ElectricityMeter entity904 = new ElectricityMeter(); + entity904.setSn(sn); + entity904.setEleValue("0"); + entity904.setReadTime(DateUtil.now()); + baseMapper.updateValue(entity904); + } + } + break; + case "601": + // 定时主动上报表计数据 + String EgT601 = payload.getStr("EgT");//总有功电能 + String RelyF601 = payload.getStr("RelyF");//表状态 + String eleValue601 = "0"; + String relayState601 = "0"; + if(RelyF601.equals("ON")){ + relayState601 = "1"; + } + if(EgT601.toLowerCase().indexOf("kwh")>0){ + eleValue601 = EgT601.substring(0,EgT601.toLowerCase().indexOf("kwh")); + } + ElectricityMeter entity601 = new ElectricityMeter(); + entity601.setSn(sn); + entity601.setEleValue(eleValue601); + entity601.setRelayState(relayState601); + entity601.setReadTime(DateUtil.now()); + baseMapper.update(entity601); + break; + case "701": + // 定时主动上报事件数据 + break; + } + } + } diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttConfig.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttConfig.java new file mode 100644 index 00000000..968a28a6 --- /dev/null +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttConfig.java @@ -0,0 +1,69 @@ +package com.nu.modules.tq.utils; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.UUID; + +/** + * 配置 MQTT 连接 + */ +@Slf4j +@Configuration +@ConfigurationProperties(prefix = "mqtt") +public class MqttConfig { + + @Value("${mqtt.broker-url}") + private String brokerUrl; + + @Value("${mqtt.client-id}") + private String clientId; + + @Value("${mqtt.username}") + private String username; + + @Value("${mqtt.password}") + private String password; + + @Value("${mqtt.clean-session}") + private boolean cleanSession; + + @Value("${mqtt.connection-timeout}") + private int connectionTimeout; + + @Value("${mqtt.keep-alive-interval}") + private int keepAliveInterval; + + @Bean + public MqttConnectOptions mqttConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(cleanSession); + options.setConnectionTimeout(connectionTimeout); + options.setKeepAliveInterval(keepAliveInterval); + options.setAutomaticReconnect(true); + + if (username != null && !username.isEmpty()) { + options.setUserName(username); + } + if (password != null && !password.isEmpty()) { + options.setPassword(password.toCharArray()); + } + + return options; + } + + @Bean + public MqttClient mqttClient() throws MqttException { + String mqttClientId = clientId + UUID.randomUUID(); + MqttClient client = new MqttClient(brokerUrl, mqttClientId, new MemoryPersistence()); + client.connect(mqttConnectOptions()); + return client; + } +} \ No newline at end of file diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttMessageHandler.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttMessageHandler.java new file mode 100644 index 00000000..a38d1984 --- /dev/null +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttMessageHandler.java @@ -0,0 +1,105 @@ +package com.nu.modules.tq.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.nu.modules.tq.electricity.entity.ElectricityMeter; +import com.nu.modules.tq.electricity.service.IElectricityMeterService; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j +@Component +public class MqttMessageHandler implements CommandLineRunner { + + @Autowired + private MqttClient mqttClient; + + @Autowired + @Lazy + private IElectricityMeterService electricityMeterService; + + @Autowired + private ObjectMapper objectMapper; + + /** + * 发送下行消息(发布1) + * @param deviceId 设备ID + * @param message 消息内容 + */ + public void publishDownlinkMessage(String deviceId, DownlinkMessage message) throws Exception { + String topic = "BY/dn/" + deviceId; + String payload = objectMapper.writeValueAsString(message); + + MqttMessage mqttMessage = new MqttMessage(payload.getBytes()); + mqttMessage.setQos(0); + mqttMessage.setRetained(false); + log.info("发布 - 主题: {}, 消息: {}", topic, payload); + mqttClient.publish(topic, mqttMessage); + log.info("发布成功 - 主题: {}, 消息: {}", topic, payload); + } + + /** + * 订阅上行消息(接收2) + * @param deviceId 设备ID + */ + public void subscribeUplinkMessage(String deviceId) throws MqttException { + String topic = "BY/up/" + deviceId; + mqttClient.subscribe(topic, 0, new IMqttMessageListener() { + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String payload = new String(message.getPayload()); + log.info("收到上行消息, 主题: {}, 内容: {}", topic, payload); + // 解析消息 + electricityMeterService.processReceivedMessage(payload); + } + }); + log.info("已订阅上行主题: {}", topic); + } + + /** + * 取消订阅上行消息 + * @param deviceId 设备ID + */ + public void unsubscribeUplinkMessage(String deviceId) throws MqttException { + String topic = "BY/up/" + deviceId; + log.info("取消已订阅上行主题: {}", topic); + mqttClient.unsubscribe(topic); + log.info("取消已订阅上行主题成功: {}", topic); + } + + @Override + public void run(String... args) throws Exception { + // 启动时订阅示例 + List list = electricityMeterService.findAllList(); + for(int i=0;i payload = new HashMap<>(); + + @Data + public static class Header { + private String name; + private String ID; + } + } + +} diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttPublisherParams.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttPublisherParams.java new file mode 100644 index 00000000..68a31998 --- /dev/null +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/modules/tq/utils/MqttPublisherParams.java @@ -0,0 +1,84 @@ +package com.nu.modules.tq.utils; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +/** + * 发布命令参数 + */ +@Component +@Slf4j +public class MqttPublisherParams { + + /** + * 获取抄表参数 + * @return + */ + public MqttMessageHandler.DownlinkMessage getReadingParams(String deviceId){ + MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage(); + MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header(); + header.setName("801"); //801:表⽰读取表计数据 + header.setID(deviceId); + message.setHeader(header); + Map payload = new HashMap<>(); + payload.put("mtype", "101"); //"101"对应上⾏数据中的总有功电量 + message.setPayload(payload); + return message; + } + + /** + * 拉闸 + * @return + */ + public MqttMessageHandler.DownlinkMessage getCutOffParams(String deviceId){ + MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage(); + MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header(); + header.setName("802"); //802:表⽰拉合闸命令 + header.setID(deviceId); + message.setHeader(header); + Map payload = new HashMap<>(); + payload.put("mtype", "OFF"); //mtype:表⽰信息类型;ON---合闸;OFF---拉闸;KEEPON--保电;KEEPOFF--保电取消 + payload.put("passwd", "02123456"); //"101"对应上⾏数据中的总有功电量 + message.setPayload(payload); + return message; + } + + /** + * 合闸 + * @return + */ + public MqttMessageHandler.DownlinkMessage getConnectedParams(String deviceId){ + MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage(); + MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header(); + header.setName("802"); //802:表⽰拉合闸命令 + header.setID(deviceId); + message.setHeader(header); + Map payload = new HashMap<>(); + payload.put("mtype", "ON"); //mtype:表⽰信息类型;ON---合闸;OFF---拉闸;KEEPON--保电;KEEPOFF--保电取消 + payload.put("passwd", "02123456"); //"101"对应上⾏数据中的总有功电量 + message.setPayload(payload); + return message; + } + + /** + * 清零 + * @return + */ + public MqttMessageHandler.DownlinkMessage getResetParams(String deviceId){ + MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage(); + MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header(); + header.setName("804"); //804:参数配置 + header.setID(deviceId); + message.setHeader(header); + Map payload = new HashMap<>(); + payload.put("mtype", "208"); + payload.put("Code", "A5A5A5A5"); + payload.put("passwd", "02123456"); + message.setPayload(payload); + return message; + } + +} diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/device/listener/IotSyncBizMQListener.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/device/listener/IotSyncBizMQListener.java index b39635b9..be5e0e7a 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/device/listener/IotSyncBizMQListener.java +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/device/listener/IotSyncBizMQListener.java @@ -115,7 +115,7 @@ public class IotSyncBizMQListener { statusMQDto.setAsyncId(iotCameraInfoMQDto.getId().toString()); statusMQDto.setMessage("成功"); statusMQDto.setPrimaryKey(iotCameraInfoMQDto.getLogId()); - statusMQDto.setNote("摄像头"); + statusMQDto.setNote("TPLINK"); QueryWrapper cameraQw = new QueryWrapper<>(); cameraQw.eq("device_index",iotCameraInfoMQDto.getDeviceIndex()); CameraInfo entity = cameraInfoService.getOne(cameraQw); @@ -138,7 +138,7 @@ public class IotSyncBizMQListener { statusMQDto.setAsyncId(iotCameraInfoMQDto.getId().toString()); statusMQDto.setMessage("失败"); statusMQDto.setPrimaryKey(iotCameraInfoMQDto.getLogId()); - statusMQDto.setNote("摄像头"); + statusMQDto.setNote("TPLINK"); statusList.add(statusMQDto); e.printStackTrace(); } @@ -187,7 +187,7 @@ public class IotSyncBizMQListener { statusMQDto.setPrimaryKey(iotElectricityMeterMQDto.getLogId()); statusMQDto.setNote("电表"); QueryWrapper electricityQw = new QueryWrapper<>(); - electricityQw.eq("cid",iotElectricityMeterMQDto.getCid()); + electricityQw.eq("sn",iotElectricityMeterMQDto.getSn()); ElectricityMeter entity = electricityMeterService.getOne(electricityQw); if(entity!=null){ if(iotElectricityMeterMQDto.getMaintainStatus().equals(entity.getMaintainStatus())){ diff --git a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/tq/listenter/IotSyncTqMQListener.java b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/tq/listenter/IotSyncTqMQListener.java index 04447851..51018c0a 100644 --- a/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/tq/listenter/IotSyncTqMQListener.java +++ b/nursing-unit-iot/nu-iot-biz/src/main/java/com/nu/mq/tq/listenter/IotSyncTqMQListener.java @@ -51,8 +51,7 @@ public class IotSyncTqMQListener { statusMQDto.setPrimaryKey(dto.getLogId()); statusMQDto.setNote("电表"); QueryWrapper emQw = new QueryWrapper<>(); - emQw.eq("cid",dto.getCid()); - emQw.eq("address",dto.getAddress()); + emQw.eq("sn",dto.getSn()); ElectricityMeter entity = electricityMeterService.getOne(emQw); if(entity!=null){ entity.setEleValue(dto.getEleValue()); @@ -90,8 +89,7 @@ public class IotSyncTqMQListener { statusMQDto.setPrimaryKey(dto.getLogId()); statusMQDto.setNote("电表"); QueryWrapper emQw = new QueryWrapper<>(); - emQw.eq("cid",dto.getCid()); - emQw.eq("address",dto.getAddress()); + emQw.eq("sn",dto.getSn()); ElectricityMeter entity = electricityMeterService.getOne(emQw); if(entity!=null){ entity.setRelayState(dto.getRelayState()); diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu002.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu002.yml index 8cf397f1..de048d9e 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu002.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu002.yml @@ -409,3 +409,13 @@ nu: #试验机构信息 master: code: 101 + +# MQTT 配置 +mqtt: + broker-url: tcp://121.36.88.64:11001 + client-id: mqtt-dev-nursing-unit-002 + username: admin + password: admin@123.. + clean-session: true + connection-timeout: 30 + keep-alive-interval: 60 \ No newline at end of file diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu003.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu003.yml index 1a324b6f..9dd55678 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu003.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-dev-nu003.yml @@ -409,3 +409,13 @@ nu: #试验机构信息 master: code: 101 + +# MQTT 配置 +mqtt: + broker-url: tcp://121.36.88.64:11001 + client-id: mqtt-dev-nursing-unit-003 + username: admin + password: admin@123.. + clean-session: true + connection-timeout: 30 + keep-alive-interval: 60 \ No newline at end of file diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml index f6fb2565..8fe64cb9 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml @@ -412,3 +412,13 @@ nu: #试验机构信息 master: code: 101 + +# MQTT 配置 +mqtt: + broker-url: tcp://121.36.88.64:11001 + client-id: mqtt-dev-nursing-unit-001 + username: admin + password: admin@123.. + clean-session: true + connection-timeout: 30 + keep-alive-interval: 60 \ No newline at end of file diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-uat.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-uat.yml index e88327d5..07b61334 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-uat.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-uat.yml @@ -409,3 +409,13 @@ nu: #试验机构信息 master: code: 101 + +# MQTT 配置 +mqtt: + broker-url: tcp://121.36.88.64:11001 + client-id: mqtt-nursing-unit-001 + username: admin + password: admin@123.. + clean-session: true + connection-timeout: 30 + keep-alive-interval: 60 diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-uat102.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-uat102.yml index ddb84c1e..f272a532 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-uat102.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-uat102.yml @@ -409,3 +409,13 @@ nu: #试验机构信息 master: code: 101 + +# MQTT 配置 +mqtt: + broker-url: tcp://121.36.88.64:11001 + client-id: mqtt-nursing-unit-002 + username: admin + password: admin@123.. + clean-session: true + connection-timeout: 30 + keep-alive-interval: 60 \ No newline at end of file diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-uat103.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-uat103.yml index 9ca156d9..e0653839 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-uat103.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-uat103.yml @@ -409,3 +409,13 @@ nu: #试验机构信息 master: code: 101 + +# MQTT 配置 +mqtt: + broker-url: tcp://121.36.88.64:11001 + client-id: mqtt-nursing-unit-003 + username: admin + password: admin@123.. + clean-session: true + connection-timeout: 30 + keep-alive-interval: 60 \ No newline at end of file diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-uat104.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-uat104.yml index 1a68a6ca..336dc8a8 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-uat104.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-uat104.yml @@ -409,3 +409,13 @@ nu: #试验机构信息 master: code: 101 + +# MQTT 配置 +mqtt: + broker-url: tcp://121.36.88.64:11001 + client-id: mqtt-nursing-unit-004 + username: admin + password: admin@123.. + clean-session: true + connection-timeout: 30 + keep-alive-interval: 60 \ No newline at end of file