指令同步日志

This commit is contained in:
1378012178@qq.com 2025-07-31 16:24:58 +08:00
parent be5cc5f120
commit 4c9def0952
1 changed files with 165 additions and 135 deletions

View File

@ -2,9 +2,11 @@ package com.nu.mq.directive.listener;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.ListUtil; import cn.hutool.core.collection.ListUtil;
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.nu.dto.*; import com.nu.dto.*;
import com.nu.enums.MQStatus;
import com.nu.modules.directivetag.body.entity.DirectiveBodyTag; import com.nu.modules.directivetag.body.entity.DirectiveBodyTag;
import com.nu.modules.directivetag.body.entity.DirectiveBodyTagRelation; import com.nu.modules.directivetag.body.entity.DirectiveBodyTagRelation;
import com.nu.modules.directivetag.body.service.IDirectiveBodyTagService; import com.nu.modules.directivetag.body.service.IDirectiveBodyTagService;
@ -24,6 +26,7 @@ import com.nu.modules.servicetype.service.IConfigServiceTypeService;
import com.nu.modules.sysconfig.entity.SysConfig; import com.nu.modules.sysconfig.entity.SysConfig;
import com.nu.modules.sysconfig.service.ISysConfigService; import com.nu.modules.sysconfig.service.ISysConfigService;
import com.nu.utils.FileDownloader; import com.nu.utils.FileDownloader;
import com.nu.utils.RabbitMQUtil;
import com.nu.utils.SafetyUtil; import com.nu.utils.SafetyUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists; import org.apache.commons.compress.utils.Lists;
@ -36,6 +39,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.File; import java.io.File;
import java.net.URLEncoder; import java.net.URLEncoder;
@ -70,23 +74,38 @@ public class DirectiveMQListener {
private IConfigServiceCategoryService serviceCategoryService; private IConfigServiceCategoryService serviceCategoryService;
@Autowired @Autowired
private IConfigServiceTypeService serviceTypeService; private IConfigServiceTypeService serviceTypeService;
// @Autowired @Autowired
// private RabbitMQUtil rabbitMQUtil; private RabbitMQUtil rabbitMQUtil;
@RabbitListener( @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "#{directiveAsyncDQNP.getSyncDirectiveQueueName()}"), exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), key = "#{directiveAsyncDQNP.getSyncDirectiveKeyName()}"), errorHandler = "directiveMQErrorHandler")
bindings = @QueueBinding( @Transactional(rollbackFor = {Exception.class})
value = @Queue(name = "#{directiveAsyncDQNP.getSyncDirectiveQueueName()}"),
exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT),
key = "#{directiveAsyncDQNP.getSyncDirectiveKeyName()}"
),
errorHandler = "directiveMQErrorHandler"
)
public void handleAuditResult(DirectiveMQDto dto) { public void handleAuditResult(DirectiveMQDto dto) {
dto.setIzInc(true); dto.setIzInc(true);
dto.setIdStr(dto.getDirectiveList().stream().map(d -> d.getId()).collect(Collectors.joining(","))); dto.setIdStr(dto.getDirectiveList().stream().map(d -> d.getId()).collect(Collectors.joining(",")));
//增量处理 try {
handleIncremental(dto); //增量处理数据
handleCreateMedia(dto); handleIncremental(dto);
//数据处理出错的话就不能继续处理指令资源了 所以方法放到这个位置
handleCreateMedia(dto);
} catch (Exception e) {
e.printStackTrace();
//返回错误日志
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode());
statusMQDto.setMessage(e.getMessage());
statusMQDto.setAsyncId(dto.getAsyncId());
statusMQDto.setCode("data");
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
throw new RuntimeException("数据同步失败");
}
//在这里返回数据成功和日志
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
statusMQDto.setMessage("数据同步成功!");
statusMQDto.setAsyncId(dto.getAsyncId());
statusMQDto.setCode("data");
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
//资源日志在handleCreateMedia中有处理
} }
/** /**
@ -94,134 +113,150 @@ public class DirectiveMQListener {
* *
* @param mqDto * @param mqDto
*/ */
@RabbitListener( @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "#{directiveAsyncDQNP.getCreateMediaQueueName()}"), exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), key = "#{directiveAsyncDQNP.getCreateMediaKeyName()}"), errorHandler = "directiveMQErrorHandler")
bindings = @QueueBinding(
value = @Queue(name = "#{directiveAsyncDQNP.getCreateMediaQueueName()}"),
exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT),
key = "#{directiveAsyncDQNP.getCreateMediaKeyName()}"
),
errorHandler = "directiveMQErrorHandler"
)
public void handleCreateMedia(DirectiveMQDto mqDto) { public void handleCreateMedia(DirectiveMQDto mqDto) {
mqDto.getDirectiveList().stream().forEach(dto -> { try {
//查询现有服务指令 mqDto.getDirectiveList().stream().forEach(dto -> {
QueryWrapper<ConfigServiceDirective> qw = new QueryWrapper<>(); //查询现有服务指令
qw.eq("id", dto.getId()); QueryWrapper<ConfigServiceDirective> qw = new QueryWrapper<>();
ConfigServiceDirective currentDirective = directiveService.getOne(qw); qw.eq("id", dto.getId());
ConfigServiceDirective currentDirective = directiveService.getOne(qw);
//更新服务指令媒体资源字段 //更新服务指令媒体资源字段
ConfigServiceDirective configServiceDirective = new ConfigServiceDirective(); ConfigServiceDirective configServiceDirective = new ConfigServiceDirective();
configServiceDirective.setId(dto.getId()); configServiceDirective.setId(dto.getId());
configServiceDirective.setServiceContent(dto.getServiceContent());//服务指令说明 configServiceDirective.setServiceContent(dto.getServiceContent());//服务指令说明
//拉取媒体资源至本地目录 //拉取媒体资源至本地目录
{ {
//接口协议域名+上下文访问路径 //接口协议域名+上下文访问路径
String baseUrl = dto.getApi(); String baseUrl = dto.getApi();
//处理服务指令图片 //处理服务指令图片
if (!dto.getPreviewFileMd5().equals(currentDirective.getPreviewFileMd5())) { if (!dto.getPreviewFileMd5().equals(currentDirective.getPreviewFileMd5())) {
String previewFile = dto.getPreviewFile(); String previewFile = dto.getPreviewFile();
if (StringUtils.isNotBlank(previewFile)) { if (StringUtils.isNotBlank(previewFile)) {
String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(previewFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(previewFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey();
if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) {
upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1);
} }
String filePath = previewFile.substring(0, previewFile.lastIndexOf("/")); String filePath = previewFile.substring(0, previewFile.lastIndexOf("/"));
String fileName = previewFile.substring(previewFile.lastIndexOf("/") + 1); String fileName = previewFile.substring(previewFile.lastIndexOf("/") + 1);
if (filePath.startsWith("/") || filePath.startsWith("\\")) { if (filePath.startsWith("/") || filePath.startsWith("\\")) {
filePath = filePath.substring(1); filePath = filePath.substring(1);
} }
try { try {
FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName);
configServiceDirective.setPreviewFile(dto.getPreviewFile());//服务指令图片 configServiceDirective.setPreviewFile(dto.getPreviewFile());//服务指令图片
configServiceDirective.setPreviewFileMd5(dto.getPreviewFileMd5()); configServiceDirective.setPreviewFileMd5(dto.getPreviewFileMd5());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog();
mediaAsyncErrorLog.setMediaid(previewFile); mediaAsyncErrorLog.setMediaid(previewFile);
mediaAsyncErrorLogService.save(mediaAsyncErrorLog); mediaAsyncErrorLogService.save(mediaAsyncErrorLog);
throw new RuntimeException("服务指令图片文件拉取错误,指令id" + currentDirective.getId());
}
} }
} }
} //处理即时指令图标
//处理即时指令图标 if (!dto.getImmediateFileMd5().equals(currentDirective.getImmediateFileMd5())) {
if (!dto.getImmediateFileMd5().equals(currentDirective.getImmediateFileMd5())) { String immediateFile = dto.getImmediateFile();
String immediateFile = dto.getImmediateFile(); if (StringUtils.isNotBlank(immediateFile)) {
if (StringUtils.isNotBlank(immediateFile)) { String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(immediateFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey();
String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(immediateFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) {
if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1);
upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); }
} String filePath = immediateFile.substring(0, immediateFile.lastIndexOf("/"));
String filePath = immediateFile.substring(0, immediateFile.lastIndexOf("/")); String fileName = immediateFile.substring(immediateFile.lastIndexOf("/") + 1);
String fileName = immediateFile.substring(immediateFile.lastIndexOf("/") + 1); if (filePath.startsWith("/") || filePath.startsWith("\\")) {
if (filePath.startsWith("/") || filePath.startsWith("\\")) { filePath = filePath.substring(1);
filePath = filePath.substring(1); }
} try {
try { FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName);
FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); configServiceDirective.setImmediateFile(dto.getImmediateFile());//即时指令图标
configServiceDirective.setImmediateFile(dto.getImmediateFile());//即时指令图标 configServiceDirective.setImmediateFileMd5(dto.getImmediateFileMd5());
configServiceDirective.setImmediateFileMd5(dto.getImmediateFileMd5()); } catch (Exception e) {
} catch (Exception e) { MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog();
MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); mediaAsyncErrorLog.setMediaid(immediateFile);
mediaAsyncErrorLog.setMediaid(immediateFile); mediaAsyncErrorLogService.save(mediaAsyncErrorLog);
mediaAsyncErrorLogService.save(mediaAsyncErrorLog); e.printStackTrace();
e.printStackTrace(); throw new RuntimeException("即时指令图标文件拉取错误,指令id" + currentDirective.getId());
}
} }
} }
} //处理指令音频文件
//处理指令音频文件 if (!dto.getMp3FileMd5().equals(currentDirective.getMp3FileMd5())) {
if (!dto.getMp3FileMd5().equals(currentDirective.getMp3FileMd5())) { String mp3File = dto.getMp3File();
String mp3File = dto.getMp3File(); if (StringUtils.isNotBlank(mp3File)) {
if (StringUtils.isNotBlank(mp3File)) { String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp3File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey();
String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp3File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) {
if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1);
upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); }
} String filePath = mp3File.substring(0, mp3File.lastIndexOf("/"));
String filePath = mp3File.substring(0, mp3File.lastIndexOf("/")); String fileName = mp3File.substring(mp3File.lastIndexOf("/") + 1);
String fileName = mp3File.substring(mp3File.lastIndexOf("/") + 1); if (filePath.startsWith("/") || filePath.startsWith("\\")) {
if (filePath.startsWith("/") || filePath.startsWith("\\")) { filePath = filePath.substring(1);
filePath = filePath.substring(1); }
} try {
try { FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName);
FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); configServiceDirective.setMp3File(dto.getMp3File());//指令音频文件
configServiceDirective.setMp3File(dto.getMp3File());//指令音频文件 configServiceDirective.setMp3FileMd5(dto.getMp3FileMd5());
configServiceDirective.setMp3FileMd5(dto.getMp3FileMd5()); } catch (Exception e) {
} catch (Exception e) { MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog();
MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); mediaAsyncErrorLog.setMediaid(mp3File);
mediaAsyncErrorLog.setMediaid(mp3File); mediaAsyncErrorLogService.save(mediaAsyncErrorLog);
mediaAsyncErrorLogService.save(mediaAsyncErrorLog); e.printStackTrace();
e.printStackTrace(); throw new RuntimeException("指令音频文件拉取错误,指令id" + currentDirective.getId());
}
} }
} }
} //处理指令视频文件
//处理指令视频文件 if (!dto.getMp4FileMd5().equals(currentDirective.getMp4FileMd5())) {
if (!dto.getMp4FileMd5().equals(currentDirective.getMp4FileMd5())) { String mp4File = dto.getMp4File();
String mp4File = dto.getMp4File(); if (StringUtils.isNotBlank(mp4File)) {
if (StringUtils.isNotBlank(mp4File)) { String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp4File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey();
String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp4File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) {
if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1);
upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); }
} String filePath = mp4File.substring(0, mp4File.lastIndexOf("/"));
String filePath = mp4File.substring(0, mp4File.lastIndexOf("/")); String fileName = mp4File.substring(mp4File.lastIndexOf("/") + 1);
String fileName = mp4File.substring(mp4File.lastIndexOf("/") + 1); if (filePath.startsWith("/") || filePath.startsWith("\\")) {
if (filePath.startsWith("/") || filePath.startsWith("\\")) { filePath = filePath.substring(1);
filePath = filePath.substring(1); }
} try {
try { FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName);
FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); configServiceDirective.setMp4File(dto.getMp4File());//指令视频文件
configServiceDirective.setMp4File(dto.getMp4File());//指令视频文件 configServiceDirective.setMp4FileMd5(dto.getMp4FileMd5());
configServiceDirective.setMp4FileMd5(dto.getMp4FileMd5()); } catch (Exception e) {
} catch (Exception e) { MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog();
MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); mediaAsyncErrorLog.setMediaid(mp4File);
mediaAsyncErrorLog.setMediaid(mp4File); mediaAsyncErrorLogService.save(mediaAsyncErrorLog);
mediaAsyncErrorLogService.save(mediaAsyncErrorLog); e.printStackTrace();
e.printStackTrace(); throw new RuntimeException("指令视频文件拉取错误,指令id" + currentDirective.getId());
}
} }
} }
directiveService.updateById(configServiceDirective);
} }
directiveService.updateById(configServiceDirective); });
} //在这里返回数据成功和日志
}); StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
statusMQDto.setMessage("文件同步成功!");
statusMQDto.setAsyncId(mqDto.getAsyncId());
statusMQDto.setCode("file");
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
} catch (Exception e) {
e.printStackTrace();
//返回错误日志
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode());
statusMQDto.setMessage(e.getMessage());
statusMQDto.setAsyncId(mqDto.getAsyncId());
statusMQDto.setCode("file");
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
}
} }
// @RabbitListener( // @RabbitListener(
// bindings = @QueueBinding( // bindings = @QueueBinding(
@ -471,17 +506,12 @@ public class DirectiveMQListener {
QueryWrapper<ConfigServiceDirective> dtw = new QueryWrapper<>(); QueryWrapper<ConfigServiceDirective> dtw = new QueryWrapper<>();
dtw.select("id"); dtw.select("id");
List<ConfigServiceDirective> tempList = directiveService.list(dtw); List<ConfigServiceDirective> tempList = directiveService.list(dtw);
Set<String> existingIds = tempList.stream() Set<String> existingIds = tempList.stream().map(ConfigServiceDirective::getId).map(String::valueOf).collect(Collectors.toSet());
.map(ConfigServiceDirective::getId)
.map(String::valueOf)
.collect(Collectors.toSet());
String idStr = dto.getIdStr(); String idStr = dto.getIdStr();
List<String> inputIds = Arrays.asList(idStr.split(",")); List<String> inputIds = Arrays.asList(idStr.split(","));
List<String> uniqueIds = inputIds.stream() List<String> uniqueIds = inputIds.stream().filter(id -> !existingIds.contains(id)).collect(Collectors.toList());
.filter(id -> !existingIds.contains(id))
.collect(Collectors.toList());
dto.setIdList(uniqueIds); dto.setIdList(uniqueIds);