物联智能电表代码调整

This commit is contained in:
曹磊 2026-03-12 17:24:41 +08:00
parent a5b97d9524
commit 2a997f68ce
21 changed files with 501 additions and 576 deletions

View File

@ -12,10 +12,8 @@ import lombok.Data;
public class IotElectricityMeterMQDto{
/**主键*/
private Integer id;
/**采集器*/
private String cid;
/**表号*/
private String address;
private String sn;
/**电表值*/
private String eleValue;
/**阀门状态*/

View File

@ -59,6 +59,13 @@
<scope>compile</scope>
</dependency>
<!-- MQTT 客户端依赖 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
</project>

View File

@ -319,4 +319,5 @@ public class CameraInfo implements Serializable {
private String ftpUploadpath; //回放视频转FTP上传路径
/**维修状态*/
private String maintainStatus;
private String remarks;//备注
}

View File

@ -71,18 +71,6 @@ public class TqDeviceInfoServiceImpl extends ServiceImpl<TqDeviceInfoMapper, TqD
}else{
waterMeterMapper.update(waterMeter);
}
}else{
ElectricityMeter electricityMeter = new ElectricityMeter();
electricityMeter.setCid(cid);
electricityMeter.setAddress(address);
electricityMeter.setRelayState(relayState);
electricityMeter.setRemark(remark);
ElectricityMeter entity = electricityMeterMapper.getElectricityMeter(electricityMeter);
if(entity == null){
electricityMeterMapper.insert(electricityMeter);
}else{
electricityMeterMapper.update(electricityMeter);
}
}
}
}

View File

@ -1,87 +0,0 @@
package com.nu.modules.tq.electricity.api;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.nu.modules.tq.electricity.entity.ElectricityMeter;
import com.nu.modules.tq.utils.SignTool;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.system.base.controller.JeecgController;
import com.nu.modules.tq.electricity.service.IElectricityMeterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.*;
@Slf4j
@RestController
@RequestMapping("/iot/tq/api/electricityMeter")
public class ElectricityMeterApiController extends JeecgController<ElectricityMeter, IElectricityMeterService> {
@Autowired
private IElectricityMeterService service;
/**
* 电表清零回调
* @param response_content
* @param timestamp
* @param sign
* @return
*/
@PostMapping("/eleResetNotify")
@ApiOperation("电表清零回调")
public String eleResetNotify(String response_content, String timestamp, String sign) {
log.info("eleResetNotify:response_content:{}",response_content);
log.info("eleResetNotify:timestamp:{}",timestamp);
log.info("eleResetNotify:sign:{}",sign);
//-----------加入业务逻辑-----------
String result = service.eleResetNotify(response_content, timestamp, sign);
return result;
}
/**
* 电表开关闸回调
* @param response_content
* @param timestamp
* @param sign
* @return
*/
@PostMapping("/eleControlNotify")
@ApiOperation("电表开关闸回调")
public String eleControlNotify(String response_content, String timestamp, String sign) {
log.info("eleControlNotify:response_content:{}",response_content);
log.info("eleControlNotify:timestamp:{}",timestamp);
log.info("eleControlNotify:sign:{}",sign);
//-----------加入业务逻辑-----------
String result = service.eleControlNotify(response_content, timestamp, sign);
return result;
}
/**
* 电表抄表回调
* @param response_content
* @param timestamp
* @param sign
* @return
*/
@PostMapping("/eleReadNotify")
@ApiOperation("电表抄表回调")
public String eleReadNotify(String response_content, String timestamp, String sign) {
log.info("eleReadNotify:response_content:{}",response_content);
log.info("eleReadNotify:timestamp:{}",timestamp);
log.info("eleReadNotify:sign:{}",sign);
//-----------加入业务逻辑-----------
String result = service.eleReadNotify(response_content, timestamp, sign);
//--------------------------------
return result;
}
}

View File

@ -19,22 +19,22 @@ import java.io.Serializable;
* <p>功能描述:功能描述
*/
@Data
@TableName("nu_iot_tq_electricity_meter")
@TableName("nu_iot_ds_electricity_meter")
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
@ApiModel(value="nu_iot_tq_electricity_meter对象", description="物联电表")
@ApiModel(value="nu_iot_ds_electricity_meter对象", description="物联电表")
public class ElectricityMeter implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
@ApiModelProperty(value = "主键")
private Integer id;
@Excel(name = "采集器号", width = 15)
@ApiModelProperty(value = "采集器号,4G、NB设备采集器号与表号一致")
private String cid;
@ApiModelProperty(value = "表号")
@TableField(exist = false)
private String address;
@Excel(name = "表号", width = 15)
@ApiModelProperty(value = "表号(表地址)")
private String address;
private String sn;
@Excel(name = "操作类型", width = 15)
@ApiModelProperty(value = "操作类型")
@TableField(exist = false)

View File

@ -5,16 +5,13 @@
<select id="findPage" parameterType="com.nu.modules.tq.electricity.entity.ElectricityMeter" resultType="com.nu.modules.tq.electricity.entity.ElectricityMeter">
select
a.id,
a.cid,
a.address,
a.sn,
a.sn as address,
a.ele_value as eleValue,
a.relay_state as relayState,
a.read_time as readTime,
a.remark,
b.online,
b.csq,
b.disconnect_time as disconnectTime,
b.connect_time as connectTime,
( case when timestampdiff(minute, str_to_date(read_time, '%y-%m-%d %h:%i:%s'), now()) > 120 then 'false' else 'true' end ) as online,
a.nu_id as nuId,
c.nu_name as nuName,
a.depart_id as departId,
@ -23,12 +20,11 @@
a.old_server_url ,
a.sync_type,
a.maintain_status
from nu_iot_tq_electricity_meter a
left join nu_iot_tq_collector b on a.cid = b.cid
from nu_iot_ds_electricity_meter a
left join nu_base_info c on a.nu_id = c.nu_id
<where>
<if test="params.address != null and params.address != ''">
AND a.address like concat('%',#{params.address},'%')
<if test="params.sn != null and params.sn != ''">
AND a.sn like concat('%',#{params.sn},'%')
</if>
<if test="params.nuId != null and params.nuId != ''">
AND a.nu_id = #{params.nuId}
@ -40,16 +36,7 @@
AND a.relay_state = #{params.relayState}
</if>
<if test="params.online != null and params.online != ''">
AND b.online = #{params.online}
</if>
<if test="params.csq != null and params.csq != '' and params.csq == 1">
AND b.csq &gt; 20
</if>
<if test="params.csq != null and params.csq != '' and params.csq == 2">
AND b.csq &gt;= 10 AND b.csq &lt;=20
</if>
<if test="params.csq != null and params.csq != '' and params.csq == 3">
AND b.csq &lt; 10
AND ( case when timestampdiff(minute, str_to_date(read_time, '%y-%m-%d %h:%i:%s'), now()) > 120 then 'false' else 'true' end ) = #{params.online}
</if>
</where>
</select>
@ -57,72 +44,68 @@
<select id="findAllList" parameterType="com.nu.modules.tq.electricity.entity.ElectricityMeter" resultType="com.nu.modules.tq.electricity.entity.ElectricityMeter">
select
a.id,
a.cid,
a.address,
a.sn,
a.sn as address,
a.ele_value as eleValue,
a.relay_state as relayState,
a.read_time as readTime,
a.remark,
b.online,
b.csq,
b.disconnect_time as disconnectTime,
b.connect_time as connectTime,
( case when timestampdiff(minute, str_to_date(read_time, '%y-%m-%d %h:%i:%s'), now()) > 120 then 'false' else 'true' end ) as online,
a.nu_id as nuId,
a.nu_name as nuName,
a.depart_id as departId,
a.depart_name as departName,
a.depart_server_url as departServerUrl,
a.old_server_url ,
a.sync_type
from nu_iot_tq_electricity_meter a
a.sync_type,
a.maintain_status
from nu_iot_ds_electricity_meter a
</select>
<select id="getElectricityMeter" parameterType="com.nu.modules.tq.electricity.entity.ElectricityMeter" resultType="com.nu.modules.tq.electricity.entity.ElectricityMeter">
select
id,
cid,
address,
sn,
sn as address,
ele_value as eleValue,
relay_state as relayState,
read_time as readTime,
( case when timestampdiff(minute, str_to_date(read_time, '%y-%m-%d %h:%i:%s'), now()) > 120 then 'false' else 'true' end ) as online,
remark,
nu_id as nuId,
nu_name as nuName,
depart_id as departId,
depart_name as departName,
depart_server_url as departServerUrl
from nu_iot_tq_electricity_meter
where cid = #{cid}
and address = #{address}
from nu_iot_ds_electricity_meter
where sn = #{sn}
</select>
<update id="update" parameterType="com.nu.modules.tq.electricity.entity.ElectricityMeter">
update nu_iot_tq_electricity_meter
update nu_iot_ds_electricity_meter
set
ele_value = #{eleValue},
relay_state = #{relayState},
remark = #{remark},
update_time = now()
where cid = #{cid}
and address = #{address}
where sn = #{sn}
</update>
<update id="updateValue" parameterType="com.nu.modules.tq.electricity.entity.ElectricityMeter">
update nu_iot_tq_electricity_meter
update nu_iot_ds_electricity_meter
set
ele_value = #{eleValue},
read_time = #{readTime},
update_time = now()
where cid = #{cid}
and address = #{address}
where sn = #{sn}
</update>
<update id="updateRelayState" parameterType="com.nu.modules.tq.electricity.entity.ElectricityMeter">
update nu_iot_tq_electricity_meter
update nu_iot_ds_electricity_meter
set
relay_state = #{relayState},
update_time = now()
where cid = #{cid}
and address = #{address}
where sn = #{sn}
</update>

View File

@ -17,12 +17,9 @@ public interface IElectricityMeterService extends IService<ElectricityMeter> {
Result<String> eleControl(ElectricityMeter electricityMeter);
Result<String> eleRead(ElectricityMeter electricityMeter);
String eleResetNotify(String response_content, String timestamp, String sign);
String eleControlNotify(String response_content, String timestamp, String sign);
String eleReadNotify(String response_content, String timestamp, String sign);
void editHldy(ElectricityMeter electricityMeter);
void baoxiu(ElectricityMeter electricityMeter);
void processReceivedMessage(String payload);
}

View File

@ -16,9 +16,7 @@ import com.nu.modules.syncLog.entity.SyncBizLog;
import com.nu.modules.syncLog.service.ISyncBizLogService;
import com.nu.modules.tq.common.entity.TqApiLog;
import com.nu.modules.tq.common.service.ITqApiLogService;
import com.nu.modules.tq.utils.HttpTool;
import com.nu.modules.tq.utils.SignTool;
import com.nu.modules.tq.utils.TqApi;
import com.nu.modules.tq.utils.*;
import com.nu.modules.tq.water.entity.WaterMeter;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
@ -59,6 +57,13 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
@Autowired
private ElectricityMeterServiceImpl syncImpl;
@Autowired
@Lazy
private MqttMessageHandler mqttMessageHandler;
@Autowired
private MqttPublisherParams mqttPublisherParams;
@Override
public IPage<ElectricityMeter> findPage(Page<ElectricityMeter> page, ElectricityMeter electricityMeter) {
return baseMapper.findPage(page, electricityMeter);
@ -79,87 +84,17 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
baseMapper.updateRelayState(electricityMeter);
}
/**
* 清零
*/
@Override
public Result<String> eleReset(ElectricityMeter electricityMeter) {
ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter);
if (entity == null) {
return Result.error("请先同步电表设备信息后再进行清零");
}
String readTimeStr = entity.getReadTime();
if (readTimeStr == null || readTimeStr.equals("")) {
return Result.error("为避免结算误差,清零操作时,需要近一小时的抄表数据。");
}
Date readTime = DateUtil.parse(readTimeStr, "yyyy-MM-dd HH:mm:ss");
Calendar c = Calendar.getInstance();
c.add(Calendar.HOUR_OF_DAY, -1);
int comInt = DateUtil.compare(readTime, c.getTime());
if (comInt < 0) {
return Result.error("为避免结算误差,清零操作时,需要近一小时的抄表数据。");
}
List<Map<String, Object>> req = new ArrayList<>();
Map<String, Object> params = new HashMap<>();
params.put("paymentmode", "noprepay");
Map<String, Object> item = new HashMap<>();
item.put("opr_id", HttpTool.generateOperateId());
item.put("time_out", 0);
item.put("must_online", true);
item.put("retry_times", 1);
item.put("cid", electricityMeter.getCid());
item.put("address", electricityMeter.getAddress());
item.put("params", params);
req.add(item);
String responseStr = tqApi.eleReset(req);
JSONObject jsonObject = new JSONObject(responseStr);
String httpStatus = jsonObject.getStr("status");
if (httpStatus.equals("SUCCESS")) {
String response_content = jsonObject.getStr("response_content");
JSONArray jsonArray = new JSONArray(response_content);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject json = (JSONObject) jsonArray.get(i);
String status = json.getStr("status");//状态
String oprId = json.getStr("opr_id");//操作ID
if (status.equals("SUCCESS")) {
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(9);
tqApiLog.setRequestValue(entity.getEleValue());
tqApiLog.setRequestTime(DateUtil.now());
tqApiLog.setRequestStatus("清零中");
logService.insert(tqApiLog);
} else {
String errorMsg = json.getStr("error_msg");
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(9);
tqApiLog.setRequestValue(entity.getEleValue());
tqApiLog.setRequestTime(DateUtil.now());
tqApiLog.setRequestStatus("清零失败");
tqApiLog.setRequestRemark(errorMsg);
logService.insert(tqApiLog);
return Result.error(errorMsg);
}
}
} else {
String errorMsg = jsonObject.getStr("error_msg");
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(9);
tqApiLog.setRequestValue(entity.getEleValue());
tqApiLog.setRequestTime(DateUtil.now());
tqApiLog.setRequestStatus("清零失败");
tqApiLog.setRequestRemark(errorMsg);
logService.insert(tqApiLog);
return Result.error(errorMsg);
String deviceId = electricityMeter.getSn();
MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getResetParams(deviceId);
try {
mqttMessageHandler.publishDownlinkMessage(deviceId,message);
} catch (Exception e) {
e.printStackTrace();
}
return Result.OK("清零中请1分钟后刷新页面");
}
@ -189,82 +124,32 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
Integer type = electricityMeter.getType();
ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter);
if (entity == null) {
if (type.equals(10)) {
if (type.equals("10")) {
return Result.error("请先同步电表设备信息后再进行拉闸");
} else {
return Result.error("请先同步电表设备信息后再进行合闸");
}
}
List<Map<String, Object>> req = new ArrayList<>();
Map<String, Object> item = new HashMap<>();
item.put("opr_id", HttpTool.generateOperateId());
item.put("time_out", 0);
item.put("must_online", true);
item.put("retry_times", 1);
item.put("cid", electricityMeter.getCid());
item.put("address", electricityMeter.getAddress());
item.put("type", type);
req.add(item);
String responseStr = tqApi.eleControl(req);
JSONObject jsonObject = new JSONObject(responseStr);
String httpStatus = jsonObject.getStr("status");
if (httpStatus.equals("SUCCESS")) {
String response_content = jsonObject.getStr("response_content");
JSONArray jsonArray = new JSONArray(response_content);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject json = (JSONObject) jsonArray.get(i);
String status = json.getStr("status");//状态
String oprId = json.getStr("opr_id");//操作ID
if (status.equals("SUCCESS")) {
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(electricityMeter.getType());
tqApiLog.setRequestTime(DateUtil.now());
if (type.equals(10)) {
tqApiLog.setRequestStatus("拉闸中");
} else {
tqApiLog.setRequestStatus("合闸中");
}
logService.insert(tqApiLog);
} else {
String errorMsg = json.getStr("error_msg");
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(electricityMeter.getType());
tqApiLog.setRequestTime(DateUtil.now());
if (type.equals(10)) {
tqApiLog.setRequestStatus("拉闸失败");
} else {
tqApiLog.setRequestStatus("合闸失败");
}
tqApiLog.setRequestRemark(errorMsg);
logService.insert(tqApiLog);
return Result.error(errorMsg);
}
if (type.equals("10")) {
String deviceId = electricityMeter.getSn();
MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getCutOffParams(deviceId);
try {
mqttMessageHandler.publishDownlinkMessage(deviceId,message);
} catch (Exception e) {
e.printStackTrace();
}
} else {
String errorMsg = jsonObject.getStr("error_msg");
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(electricityMeter.getType());
tqApiLog.setRequestTime(DateUtil.now());
if (type.equals(10)) {
tqApiLog.setRequestStatus("拉闸失败");
} else {
tqApiLog.setRequestStatus("合闸失败");
}else{
String deviceId = electricityMeter.getSn();
MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getConnectedParams(deviceId);
try {
mqttMessageHandler.publishDownlinkMessage(deviceId,message);
} catch (Exception e) {
e.printStackTrace();
}
tqApiLog.setRequestRemark(errorMsg);
logService.insert(tqApiLog);
return Result.error(errorMsg);
}
if (type.equals(10)) {
if(type.equals("10")){
return Result.OK("拉闸中请1分钟后刷新页面");
} else {
}else{
return Result.OK("合闸中请1分钟后刷新页面");
}
}
@ -305,67 +190,12 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
*/
@Override
public Result<String> eleRead(ElectricityMeter electricityMeter) {
ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter);
if (entity == null) {
return Result.error("请先同步电表设备信息后再进行抄表");
}
List<Map<String, Object>> req = new ArrayList<>();
Map<String, Object> item = new HashMap<>();
item.put("opr_id", HttpTool.generateOperateId());
item.put("time_out", 0);
item.put("must_online", true);
item.put("retry_times", 1);
item.put("cid", electricityMeter.getCid());
item.put("address", electricityMeter.getAddress());
item.put("type", 3);
req.add(item);
String responseStr = tqApi.eleRead(req);
JSONObject jsonObject = new JSONObject(responseStr);
String httpStatus = jsonObject.getStr("status");
if (httpStatus.equals("SUCCESS")) {
String response_content = jsonObject.getStr("response_content");
JSONArray jsonArray = new JSONArray(response_content);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject json = (JSONObject) jsonArray.get(i);
String status = json.getStr("status");//状态
String oprId = json.getStr("opr_id");//操作ID
if (status.equals("SUCCESS")) {
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(3);
tqApiLog.setRequestValue(entity.getEleValue());
tqApiLog.setRequestTime(DateUtil.now());
tqApiLog.setRequestStatus("抄表中");
logService.insert(tqApiLog);
} else {
String errorMsg = json.getStr("error_msg");
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(3);
tqApiLog.setRequestValue(entity.getEleValue());
tqApiLog.setRequestTime(DateUtil.now());
tqApiLog.setRequestStatus("抄表失败");
tqApiLog.setRequestRemark(errorMsg);
logService.insert(tqApiLog);
return Result.error(errorMsg);
}
}
} else {
String errorMsg = jsonObject.getStr("error_msg");
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setCid(electricityMeter.getCid());
tqApiLog.setAddress(electricityMeter.getAddress());
tqApiLog.setType(3);
tqApiLog.setRequestValue(entity.getEleValue());
tqApiLog.setRequestTime(DateUtil.now());
tqApiLog.setRequestStatus("抄表失败");
tqApiLog.setRequestRemark(errorMsg);
logService.insert(tqApiLog);
return Result.error(errorMsg);
String deviceId = electricityMeter.getSn();
MqttMessageHandler.DownlinkMessage message = mqttPublisherParams.getReadingParams(deviceId);
try {
mqttMessageHandler.publishDownlinkMessage(deviceId,message);
} catch (Exception e) {
e.printStackTrace();
}
return Result.OK("抄表中请1分钟后刷新页面");
}
@ -377,145 +207,7 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
return syncImpl.eleRead(electricityMeter);
}
/**
* 清零回调通知
*/
@Override
public String eleResetNotify(String response_content, String timestamp, String sign) {
// 验签
if (!SignTool.checkSign(response_content, timestamp, sign, tqApi.getTqConfig().getRandomCode())) {
log.info("eleResetNotify:sign check failed");
return "sign check failed";
}
try {
JSONArray jsonArray = new JSONArray(response_content);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = (JSONObject) jsonArray.get(i);
String status = jsonObject.getStr("status");//返回状态
String oprId = jsonObject.getStr("opr_id");//操作ID
String resolveTime = jsonObject.getStr("resolve_time");//反馈时间
String data = jsonObject.getStr("data");//反馈备注
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setResolveValue("0");
tqApiLog.setResolveTime(resolveTime);
if (status.equals("SUCCESS")) {
tqApiLog.setResolveStatus("清零成功");
} else {
tqApiLog.setResolveStatus("清零失败");
}
tqApiLog.setResolveRemark(data);
TqApiLog logEntity = logService.getApiLog(tqApiLog);
if (logEntity != null) {
tqApiLog.setId(logEntity.getId());
logService.update(tqApiLog);
}
if (status.equals("SUCCESS")) {
if (logEntity != null) {
String cid = logEntity.getCid();
String address = logEntity.getAddress();
ElectricityMeter em = new ElectricityMeter();
em.setCid(cid);
em.setAddress(address);
em.setEleValue("0");
em.setReadTime(DateUtils.formatDateTime());
baseMapper.updateValue(em);
syncCleanMq(em);
}
}
}
} catch (Exception e) {
return "FAIL";
}
return "SUCCESS";
}
/**
* 清零同步到业务系统
*
* @param electricityMeter
*/
private void syncCleanMq(ElectricityMeter electricityMeter) {
ElectricityMeter entity = baseMapper.getElectricityMeter(electricityMeter);
if (entity != null) {
IotElectricityMeterMQDto iem = new IotElectricityMeterMQDto();
BeanUtils.copyProperties(entity, iem);
String json = JSON.toJSONString(entity);
String logId = UuidUtils.getUUID();
SyncBizLog log = new SyncBizLog();
log.setLogId(logId);
log.setOrgCode(entity.getDepartServerUrl());
log.setOrgName(entity.getDepartName());
log.setContent(json);
log.setSyncType("更新");
log.setStatus("同步中");
log.setServerType("电表");
bizLogService.addLog(log);
iem.setLogId(logId);
rabbitMQUtil.sendToExchange("hldy.iotDeviceValues", "hldy.iotElectricity.eleValue.async", iem);
}
}
/**
* 开关闸回调通知
*/
@Override
public String eleControlNotify(String response_content, String timestamp, String sign) {
// 验签
if (!SignTool.checkSign(response_content, timestamp, sign, tqApi.getTqConfig().getRandomCode())) {
log.info("eleResetNotify:sign check failed");
return "sign check failed";
}
try {
JSONArray jsonArray = new JSONArray(response_content);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = (JSONObject) jsonArray.get(i);
String status = jsonObject.getStr("status");//返回状态
String oprId = jsonObject.getStr("opr_id");//操作ID
String resolveTime = jsonObject.getStr("resolve_time");//反馈时间
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setResolveTime(resolveTime);
TqApiLog logEntity = logService.getApiLog(tqApiLog);
if (logEntity != null) {
Integer type = logEntity.getType();
String typeStr = "";
if (type.equals(10)) {
typeStr = "拉闸";
} else {
typeStr = "合闸";
}
if (status.equals("SUCCESS")) {
tqApiLog.setResolveStatus(typeStr + "成功");
} else {
tqApiLog.setResolveStatus(typeStr + "失败");
}
tqApiLog.setId(logEntity.getId());
logService.update(tqApiLog);
}
if (status.equals("SUCCESS")) {
if (logEntity != null) {
String cid = logEntity.getCid();
String address = logEntity.getAddress();
Integer type = logEntity.getType();
ElectricityMeter em = new ElectricityMeter();
em.setCid(cid);
em.setAddress(address);
if (type.equals(10)) {
em.setRelayState("0");
} else {
em.setRelayState("1");
}
baseMapper.updateRelayState(em);
syncControlMq(em);
}
}
}
} catch (Exception e) {
return "FAIL";
}
return "SUCCESS";
}
/**
* 开关闸同步到业务系统
@ -543,74 +235,6 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
}
}
/**
* 抄表回调通知
*/
@Override
public String eleReadNotify(String response_content, String timestamp, String sign) {
// 验签
if (!SignTool.checkSign(response_content, timestamp, sign, tqApi.getTqConfig().getRandomCode())) {
log.info("eleResetNotify:sign check failed");
return "sign check failed";
}
try {
JSONArray jsonArray = new JSONArray(response_content);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = (JSONObject) jsonArray.get(i);
String status = jsonObject.getStr("status");//返回状态
String oprId = jsonObject.getStr("opr_id");//操作ID
String resolveTime = jsonObject.getStr("resolve_time");//反馈时间
if (status.equals("SUCCESS")) {
JSONArray datas = jsonObject.getJSONArray("data");//返回数据
for (int j = 0; j < datas.size(); j++) {
JSONObject data = (JSONObject) datas.get(j);
Integer type = data.getInt("type");//类型
String resolveValue = data.getJSONArray("value").get(0).toString();//
String dsp = data.getStr("dsp");//描述
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setType(type);
tqApiLog.setResolveValue(resolveValue);
tqApiLog.setResolveTime(resolveTime);
if (status.equals("SUCCESS")) {
tqApiLog.setResolveStatus("抄表成功");
} else {
tqApiLog.setResolveStatus("抄表失败");
}
tqApiLog.setResolveRemark(dsp);
TqApiLog logEntity = logService.getApiLog(tqApiLog);
if (logEntity != null) {
String cid = logEntity.getCid();
String address = logEntity.getAddress();
ElectricityMeter em = new ElectricityMeter();
em.setCid(cid);
em.setAddress(address);
em.setEleValue(resolveValue);
em.setReadTime(resolveTime);
baseMapper.updateValue(em);
tqApiLog.setId(logEntity.getId());
logService.update(tqApiLog);
syncValueMq(em);
}
}
} else {
TqApiLog tqApiLog = new TqApiLog();
tqApiLog.setOprId(oprId);
tqApiLog.setResolveTime(resolveTime);
tqApiLog.setResolveStatus("抄表失败");
TqApiLog logEntity = logService.getApiLog(tqApiLog);
if (logEntity != null) {
tqApiLog.setId(logEntity.getId());
logService.update(tqApiLog);
}
}
}
} catch (Exception e) {
return "FAIL";
}
return "SUCCESS";
}
/**
* 抄表同步到业务系统
*
@ -676,7 +300,7 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
public boolean syncElectricitySaveOrUpdate(String dataSourceCode, ElectricityMeter electricityMeter) {
try {
QueryWrapper<ElectricityMeter> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("cid", electricityMeter.getCid());
queryWrapper.eq("sn", electricityMeter.getSn());
ElectricityMeter oldParam = baseMapper.selectOne(queryWrapper);//查询数据库中该表号数据原始数据
if (oldParam == null) {
baseMapper.insert(electricityMeter);
@ -689,5 +313,93 @@ public class ElectricityMeterServiceImpl extends ServiceImpl<ElectricityMeterMap
return true;
}
/**
* mqtt收到消息并处理
*/
@Override
public void processReceivedMessage(String message){
JSONObject jsonObject = new JSONObject(message);
JSONObject header = (JSONObject)jsonObject.get("header");
JSONObject payload = (JSONObject)jsonObject.get("payload");
String name = header.getStr("name");
String sn = header.getStr("ID");
switch (name){
case "901":
// 抄表
String EgT901 = payload.getStr("EgT");//总有功电能
String RelyF901 = payload.getStr("RelyF");//表状态
String eleValue901 = "0";
String relayState901 = "0";
if(RelyF901.equals("ON")){
relayState901 = "1";
}
if(EgT901.toLowerCase().indexOf("kwh")>0){
eleValue901 = EgT901.substring(0,EgT901.toLowerCase().indexOf("kwh"));
}
ElectricityMeter entity901 = new ElectricityMeter();
entity901.setSn(sn);
entity901.setEleValue(eleValue901);
entity901.setRelayState(relayState901);
entity901.setReadTime(DateUtil.now());
baseMapper.update(entity901);
break;
case "902":
// 拉闸合闸
String result902 = payload.getStr("result");//拉合闸执行结果
if(result902.equals("SUCCESS")){
// //拉合闸成功直接抄表获取表状态
ElectricityMeter dme902 = new ElectricityMeter();
dme902.setSn(sn);
// eleRead(dme902);
ElectricityMeter entity902 = baseMapper.getElectricityMeter(dme902);
if(entity902!=null){
if(entity902.getRelayState().equals("0")){
entity902.setRelayState("1");
}else{
entity902.setRelayState("0");
}
baseMapper.updateRelayState(entity902);
}
}
break;
case "904":
// 参数配置包括清零
String mtype904 = payload.getStr("mtype");//mtype200 TCP信息206 表复位重启208 表清零
String dcode904 = payload.getStr("dcode");
if(dcode904.equals("SUCCESS")){
if(mtype904.equals("208")){
ElectricityMeter entity904 = new ElectricityMeter();
entity904.setSn(sn);
entity904.setEleValue("0");
entity904.setReadTime(DateUtil.now());
baseMapper.updateValue(entity904);
}
}
break;
case "601":
// 定时主动上报表计数据
String EgT601 = payload.getStr("EgT");//总有功电能
String RelyF601 = payload.getStr("RelyF");//表状态
String eleValue601 = "0";
String relayState601 = "0";
if(RelyF601.equals("ON")){
relayState601 = "1";
}
if(EgT601.toLowerCase().indexOf("kwh")>0){
eleValue601 = EgT601.substring(0,EgT601.toLowerCase().indexOf("kwh"));
}
ElectricityMeter entity601 = new ElectricityMeter();
entity601.setSn(sn);
entity601.setEleValue(eleValue601);
entity601.setRelayState(relayState601);
entity601.setReadTime(DateUtil.now());
baseMapper.update(entity601);
break;
case "701":
// 定时主动上报事件数据
break;
}
}
}

View File

@ -0,0 +1,69 @@
package com.nu.modules.tq.utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
/**
* 配置 MQTT 连接
*/
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.clean-session}")
private boolean cleanSession;
@Value("${mqtt.connection-timeout}")
private int connectionTimeout;
@Value("${mqtt.keep-alive-interval}")
private int keepAliveInterval;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(cleanSession);
options.setConnectionTimeout(connectionTimeout);
options.setKeepAliveInterval(keepAliveInterval);
options.setAutomaticReconnect(true);
if (username != null && !username.isEmpty()) {
options.setUserName(username);
}
if (password != null && !password.isEmpty()) {
options.setPassword(password.toCharArray());
}
return options;
}
@Bean
public MqttClient mqttClient() throws MqttException {
String mqttClientId = clientId + UUID.randomUUID();
MqttClient client = new MqttClient(brokerUrl, mqttClientId, new MemoryPersistence());
client.connect(mqttConnectOptions());
return client;
}
}

View File

@ -0,0 +1,105 @@
package com.nu.modules.tq.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nu.modules.tq.electricity.entity.ElectricityMeter;
import com.nu.modules.tq.electricity.service.IElectricityMeterService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class MqttMessageHandler implements CommandLineRunner {
@Autowired
private MqttClient mqttClient;
@Autowired
@Lazy
private IElectricityMeterService electricityMeterService;
@Autowired
private ObjectMapper objectMapper;
/**
* 发送下行消息发布1
* @param deviceId 设备ID
* @param message 消息内容
*/
public void publishDownlinkMessage(String deviceId, DownlinkMessage message) throws Exception {
String topic = "BY/dn/" + deviceId;
String payload = objectMapper.writeValueAsString(message);
MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
mqttMessage.setQos(0);
mqttMessage.setRetained(false);
log.info("发布 - 主题: {}, 消息: {}", topic, payload);
mqttClient.publish(topic, mqttMessage);
log.info("发布成功 - 主题: {}, 消息: {}", topic, payload);
}
/**
* 订阅上行消息接收2
* @param deviceId 设备ID
*/
public void subscribeUplinkMessage(String deviceId) throws MqttException {
String topic = "BY/up/" + deviceId;
mqttClient.subscribe(topic, 0, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
log.info("收到上行消息, 主题: {}, 内容: {}", topic, payload);
// 解析消息
electricityMeterService.processReceivedMessage(payload);
}
});
log.info("已订阅上行主题: {}", topic);
}
/**
* 取消订阅上行消息
* @param deviceId 设备ID
*/
public void unsubscribeUplinkMessage(String deviceId) throws MqttException {
String topic = "BY/up/" + deviceId;
log.info("取消已订阅上行主题: {}", topic);
mqttClient.unsubscribe(topic);
log.info("取消已订阅上行主题成功: {}", topic);
}
@Override
public void run(String... args) throws Exception {
// 启动时订阅示例
List<ElectricityMeter> list = electricityMeterService.findAllList();
for(int i=0;i<list.size();i++){
ElectricityMeter dem = list.get(i);
subscribeUplinkMessage(dem.getSn());
}
log.info("MQTT客户端启动完成");
}
// 下行消息实体类
@Data
public static class DownlinkMessage {
private Header header;
private Map<String, Object> payload = new HashMap<>();
@Data
public static class Header {
private String name;
private String ID;
}
}
}

View File

@ -0,0 +1,84 @@
package com.nu.modules.tq.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 发布命令参数
*/
@Component
@Slf4j
public class MqttPublisherParams {
/**
* 获取抄表参数
* @return
*/
public MqttMessageHandler.DownlinkMessage getReadingParams(String deviceId){
MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage();
MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header();
header.setName("801"); //801:读取表计数据
header.setID(deviceId);
message.setHeader(header);
Map<String, Object> payload = new HashMap<>();
payload.put("mtype", "101"); //"101"对应上数据中的总有功电量
message.setPayload(payload);
return message;
}
/**
* 拉闸
* @return
*/
public MqttMessageHandler.DownlinkMessage getCutOffParams(String deviceId){
MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage();
MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header();
header.setName("802"); //802:拉合闸命令
header.setID(deviceId);
message.setHeader(header);
Map<String, Object> payload = new HashMap<>();
payload.put("mtype", "OFF"); //mtype信息类型ON---合闸OFF---拉闸KEEPON--保电KEEPOFF--保电取消
payload.put("passwd", "02123456"); //"101"对应上数据中的总有功电量
message.setPayload(payload);
return message;
}
/**
* 合闸
* @return
*/
public MqttMessageHandler.DownlinkMessage getConnectedParams(String deviceId){
MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage();
MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header();
header.setName("802"); //802:拉合闸命令
header.setID(deviceId);
message.setHeader(header);
Map<String, Object> payload = new HashMap<>();
payload.put("mtype", "ON"); //mtype信息类型ON---合闸OFF---拉闸KEEPON--保电KEEPOFF--保电取消
payload.put("passwd", "02123456"); //"101"对应上数据中的总有功电量
message.setPayload(payload);
return message;
}
/**
* 清零
* @return
*/
public MqttMessageHandler.DownlinkMessage getResetParams(String deviceId){
MqttMessageHandler.DownlinkMessage message = new MqttMessageHandler.DownlinkMessage();
MqttMessageHandler.DownlinkMessage.Header header = new MqttMessageHandler.DownlinkMessage.Header();
header.setName("804"); //804:参数配置
header.setID(deviceId);
message.setHeader(header);
Map<String, Object> payload = new HashMap<>();
payload.put("mtype", "208");
payload.put("Code", "A5A5A5A5");
payload.put("passwd", "02123456");
message.setPayload(payload);
return message;
}
}

View File

@ -115,7 +115,7 @@ public class IotSyncBizMQListener {
statusMQDto.setAsyncId(iotCameraInfoMQDto.getId().toString());
statusMQDto.setMessage("成功");
statusMQDto.setPrimaryKey(iotCameraInfoMQDto.getLogId());
statusMQDto.setNote("摄像头");
statusMQDto.setNote("TPLINK");
QueryWrapper<CameraInfo> cameraQw = new QueryWrapper<>();
cameraQw.eq("device_index",iotCameraInfoMQDto.getDeviceIndex());
CameraInfo entity = cameraInfoService.getOne(cameraQw);
@ -138,7 +138,7 @@ public class IotSyncBizMQListener {
statusMQDto.setAsyncId(iotCameraInfoMQDto.getId().toString());
statusMQDto.setMessage("失败");
statusMQDto.setPrimaryKey(iotCameraInfoMQDto.getLogId());
statusMQDto.setNote("摄像头");
statusMQDto.setNote("TPLINK");
statusList.add(statusMQDto);
e.printStackTrace();
}
@ -187,7 +187,7 @@ public class IotSyncBizMQListener {
statusMQDto.setPrimaryKey(iotElectricityMeterMQDto.getLogId());
statusMQDto.setNote("电表");
QueryWrapper<ElectricityMeter> electricityQw = new QueryWrapper<>();
electricityQw.eq("cid",iotElectricityMeterMQDto.getCid());
electricityQw.eq("sn",iotElectricityMeterMQDto.getSn());
ElectricityMeter entity = electricityMeterService.getOne(electricityQw);
if(entity!=null){
if(iotElectricityMeterMQDto.getMaintainStatus().equals(entity.getMaintainStatus())){

View File

@ -51,8 +51,7 @@ public class IotSyncTqMQListener {
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("电表");
QueryWrapper<ElectricityMeter> emQw = new QueryWrapper<>();
emQw.eq("cid",dto.getCid());
emQw.eq("address",dto.getAddress());
emQw.eq("sn",dto.getSn());
ElectricityMeter entity = electricityMeterService.getOne(emQw);
if(entity!=null){
entity.setEleValue(dto.getEleValue());
@ -90,8 +89,7 @@ public class IotSyncTqMQListener {
statusMQDto.setPrimaryKey(dto.getLogId());
statusMQDto.setNote("电表");
QueryWrapper<ElectricityMeter> emQw = new QueryWrapper<>();
emQw.eq("cid",dto.getCid());
emQw.eq("address",dto.getAddress());
emQw.eq("sn",dto.getSn());
ElectricityMeter entity = electricityMeterService.getOne(emQw);
if(entity!=null){
entity.setRelayState(dto.getRelayState());

View File

@ -409,3 +409,13 @@ nu:
#试验机构信息
master:
code: 101
# MQTT 配置
mqtt:
broker-url: tcp://121.36.88.64:11001
client-id: mqtt-dev-nursing-unit-002
username: admin
password: admin@123..
clean-session: true
connection-timeout: 30
keep-alive-interval: 60

View File

@ -409,3 +409,13 @@ nu:
#试验机构信息
master:
code: 101
# MQTT 配置
mqtt:
broker-url: tcp://121.36.88.64:11001
client-id: mqtt-dev-nursing-unit-003
username: admin
password: admin@123..
clean-session: true
connection-timeout: 30
keep-alive-interval: 60

View File

@ -412,3 +412,13 @@ nu:
#试验机构信息
master:
code: 101
# MQTT 配置
mqtt:
broker-url: tcp://121.36.88.64:11001
client-id: mqtt-dev-nursing-unit-001
username: admin
password: admin@123..
clean-session: true
connection-timeout: 30
keep-alive-interval: 60

View File

@ -409,3 +409,13 @@ nu:
#试验机构信息
master:
code: 101
# MQTT 配置
mqtt:
broker-url: tcp://121.36.88.64:11001
client-id: mqtt-nursing-unit-001
username: admin
password: admin@123..
clean-session: true
connection-timeout: 30
keep-alive-interval: 60

View File

@ -409,3 +409,13 @@ nu:
#试验机构信息
master:
code: 101
# MQTT 配置
mqtt:
broker-url: tcp://121.36.88.64:11001
client-id: mqtt-nursing-unit-002
username: admin
password: admin@123..
clean-session: true
connection-timeout: 30
keep-alive-interval: 60

View File

@ -409,3 +409,13 @@ nu:
#试验机构信息
master:
code: 101
# MQTT 配置
mqtt:
broker-url: tcp://121.36.88.64:11001
client-id: mqtt-nursing-unit-003
username: admin
password: admin@123..
clean-session: true
connection-timeout: 30
keep-alive-interval: 60

View File

@ -409,3 +409,13 @@ nu:
#试验机构信息
master:
code: 101
# MQTT 配置
mqtt:
broker-url: tcp://121.36.88.64:11001
client-id: mqtt-nursing-unit-004
username: admin
password: admin@123..
clean-session: true
connection-timeout: 30
keep-alive-interval: 60