diff --git a/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/listener/DirectiveMQListener.java b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/listener/DirectiveMQListener.java index 1a418ce..5de34a3 100644 --- a/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/listener/DirectiveMQListener.java +++ b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/listener/DirectiveMQListener.java @@ -2,9 +2,11 @@ package com.nu.mq.directive.listener; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.ListUtil; +import com.baomidou.dynamic.datasource.annotation.DSTransactional; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.nu.dto.*; +import com.nu.enums.MQStatus; import com.nu.modules.directivetag.body.entity.DirectiveBodyTag; import com.nu.modules.directivetag.body.entity.DirectiveBodyTagRelation; import com.nu.modules.directivetag.body.service.IDirectiveBodyTagService; @@ -24,6 +26,7 @@ import com.nu.modules.servicetype.service.IConfigServiceTypeService; import com.nu.modules.sysconfig.entity.SysConfig; import com.nu.modules.sysconfig.service.ISysConfigService; import com.nu.utils.FileDownloader; +import com.nu.utils.RabbitMQUtil; import com.nu.utils.SafetyUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.compress.utils.Lists; @@ -36,6 +39,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.io.File; import java.net.URLEncoder; @@ -70,23 +74,38 @@ public class DirectiveMQListener { private IConfigServiceCategoryService serviceCategoryService; @Autowired private IConfigServiceTypeService serviceTypeService; -// @Autowired -// private RabbitMQUtil rabbitMQUtil; + @Autowired + private RabbitMQUtil rabbitMQUtil; - @RabbitListener( - bindings = @QueueBinding( - value = @Queue(name = "#{directiveAsyncDQNP.getSyncDirectiveQueueName()}"), - exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), - key = "#{directiveAsyncDQNP.getSyncDirectiveKeyName()}" - ), - errorHandler = "directiveMQErrorHandler" - ) + @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "#{directiveAsyncDQNP.getSyncDirectiveQueueName()}"), exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), key = "#{directiveAsyncDQNP.getSyncDirectiveKeyName()}"), errorHandler = "directiveMQErrorHandler") + @Transactional(rollbackFor = {Exception.class}) public void handleAuditResult(DirectiveMQDto dto) { dto.setIzInc(true); dto.setIdStr(dto.getDirectiveList().stream().map(d -> d.getId()).collect(Collectors.joining(","))); - //增量处理 - handleIncremental(dto); - handleCreateMedia(dto); + try { + //增量处理数据 + handleIncremental(dto); + //数据处理出错的话就不能继续处理指令资源了 所以方法放到这个位置 + handleCreateMedia(dto); + } catch (Exception e) { + e.printStackTrace(); + //返回错误日志 + StatusMQDto statusMQDto = new StatusMQDto(); + statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode()); + statusMQDto.setMessage(e.getMessage()); + statusMQDto.setAsyncId(dto.getAsyncId()); + statusMQDto.setCode("data"); + rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto); + throw new RuntimeException("数据同步失败"); + } + //在这里返回数据成功和日志 + StatusMQDto statusMQDto = new StatusMQDto(); + statusMQDto.setStatus(MQStatus.SUCCESS.getCode()); + statusMQDto.setMessage("数据同步成功!"); + statusMQDto.setAsyncId(dto.getAsyncId()); + statusMQDto.setCode("data"); + rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto); + //资源日志在handleCreateMedia中有处理 } /** @@ -94,134 +113,150 @@ public class DirectiveMQListener { * * @param mqDto */ - @RabbitListener( - bindings = @QueueBinding( - value = @Queue(name = "#{directiveAsyncDQNP.getCreateMediaQueueName()}"), - exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), - key = "#{directiveAsyncDQNP.getCreateMediaKeyName()}" - ), - errorHandler = "directiveMQErrorHandler" - ) + @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "#{directiveAsyncDQNP.getCreateMediaQueueName()}"), exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), key = "#{directiveAsyncDQNP.getCreateMediaKeyName()}"), errorHandler = "directiveMQErrorHandler") public void handleCreateMedia(DirectiveMQDto mqDto) { - mqDto.getDirectiveList().stream().forEach(dto -> { - //查询现有服务指令 - QueryWrapper qw = new QueryWrapper<>(); - qw.eq("id", dto.getId()); - ConfigServiceDirective currentDirective = directiveService.getOne(qw); + try { + mqDto.getDirectiveList().stream().forEach(dto -> { + //查询现有服务指令 + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("id", dto.getId()); + ConfigServiceDirective currentDirective = directiveService.getOne(qw); - //更新服务指令媒体资源字段 - ConfigServiceDirective configServiceDirective = new ConfigServiceDirective(); - configServiceDirective.setId(dto.getId()); - configServiceDirective.setServiceContent(dto.getServiceContent());//服务指令说明 + //更新服务指令媒体资源字段 + ConfigServiceDirective configServiceDirective = new ConfigServiceDirective(); + configServiceDirective.setId(dto.getId()); + configServiceDirective.setServiceContent(dto.getServiceContent());//服务指令说明 - //拉取媒体资源至本地目录 - { - //接口协议域名+上下文访问路径 - String baseUrl = dto.getApi(); + //拉取媒体资源至本地目录 + { + //接口协议域名+上下文访问路径 + String baseUrl = dto.getApi(); - //处理服务指令图片 - if (!dto.getPreviewFileMd5().equals(currentDirective.getPreviewFileMd5())) { - String previewFile = dto.getPreviewFile(); - if (StringUtils.isNotBlank(previewFile)) { - String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(previewFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); - if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { - upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); - } - String filePath = previewFile.substring(0, previewFile.lastIndexOf("/")); - String fileName = previewFile.substring(previewFile.lastIndexOf("/") + 1); - if (filePath.startsWith("/") || filePath.startsWith("\\")) { - filePath = filePath.substring(1); - } - try { - FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); - configServiceDirective.setPreviewFile(dto.getPreviewFile());//服务指令图片 - configServiceDirective.setPreviewFileMd5(dto.getPreviewFileMd5()); - } catch (Exception e) { - e.printStackTrace(); - MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); - mediaAsyncErrorLog.setMediaid(previewFile); - mediaAsyncErrorLogService.save(mediaAsyncErrorLog); + //处理服务指令图片 + if (!dto.getPreviewFileMd5().equals(currentDirective.getPreviewFileMd5())) { + String previewFile = dto.getPreviewFile(); + if (StringUtils.isNotBlank(previewFile)) { + String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(previewFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); + if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { + upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); + } + String filePath = previewFile.substring(0, previewFile.lastIndexOf("/")); + String fileName = previewFile.substring(previewFile.lastIndexOf("/") + 1); + if (filePath.startsWith("/") || filePath.startsWith("\\")) { + filePath = filePath.substring(1); + } + try { + FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); + configServiceDirective.setPreviewFile(dto.getPreviewFile());//服务指令图片 + configServiceDirective.setPreviewFileMd5(dto.getPreviewFileMd5()); + } catch (Exception e) { + e.printStackTrace(); + MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); + mediaAsyncErrorLog.setMediaid(previewFile); + mediaAsyncErrorLogService.save(mediaAsyncErrorLog); + throw new RuntimeException("服务指令图片文件拉取错误,指令id" + currentDirective.getId()); + } } } - } - //处理即时指令图标 - if (!dto.getImmediateFileMd5().equals(currentDirective.getImmediateFileMd5())) { - String immediateFile = dto.getImmediateFile(); - if (StringUtils.isNotBlank(immediateFile)) { - String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(immediateFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); - if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { - upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); - } - String filePath = immediateFile.substring(0, immediateFile.lastIndexOf("/")); - String fileName = immediateFile.substring(immediateFile.lastIndexOf("/") + 1); - if (filePath.startsWith("/") || filePath.startsWith("\\")) { - filePath = filePath.substring(1); - } - try { - FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); - configServiceDirective.setImmediateFile(dto.getImmediateFile());//即时指令图标 - configServiceDirective.setImmediateFileMd5(dto.getImmediateFileMd5()); - } catch (Exception e) { - MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); - mediaAsyncErrorLog.setMediaid(immediateFile); - mediaAsyncErrorLogService.save(mediaAsyncErrorLog); - e.printStackTrace(); + //处理即时指令图标 + if (!dto.getImmediateFileMd5().equals(currentDirective.getImmediateFileMd5())) { + String immediateFile = dto.getImmediateFile(); + if (StringUtils.isNotBlank(immediateFile)) { + String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(immediateFile, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); + if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { + upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); + } + String filePath = immediateFile.substring(0, immediateFile.lastIndexOf("/")); + String fileName = immediateFile.substring(immediateFile.lastIndexOf("/") + 1); + if (filePath.startsWith("/") || filePath.startsWith("\\")) { + filePath = filePath.substring(1); + } + try { + FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); + configServiceDirective.setImmediateFile(dto.getImmediateFile());//即时指令图标 + configServiceDirective.setImmediateFileMd5(dto.getImmediateFileMd5()); + } catch (Exception e) { + MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); + mediaAsyncErrorLog.setMediaid(immediateFile); + mediaAsyncErrorLogService.save(mediaAsyncErrorLog); + e.printStackTrace(); + throw new RuntimeException("即时指令图标文件拉取错误,指令id" + currentDirective.getId()); + } } } - } - //处理指令音频文件 - if (!dto.getMp3FileMd5().equals(currentDirective.getMp3FileMd5())) { - String mp3File = dto.getMp3File(); - if (StringUtils.isNotBlank(mp3File)) { - String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp3File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); - if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { - upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); - } - String filePath = mp3File.substring(0, mp3File.lastIndexOf("/")); - String fileName = mp3File.substring(mp3File.lastIndexOf("/") + 1); - if (filePath.startsWith("/") || filePath.startsWith("\\")) { - filePath = filePath.substring(1); - } - try { - FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); - configServiceDirective.setMp3File(dto.getMp3File());//指令音频文件 - configServiceDirective.setMp3FileMd5(dto.getMp3FileMd5()); - } catch (Exception e) { - MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); - mediaAsyncErrorLog.setMediaid(mp3File); - mediaAsyncErrorLogService.save(mediaAsyncErrorLog); - e.printStackTrace(); + //处理指令音频文件 + if (!dto.getMp3FileMd5().equals(currentDirective.getMp3FileMd5())) { + String mp3File = dto.getMp3File(); + if (StringUtils.isNotBlank(mp3File)) { + String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp3File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); + if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { + upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); + } + String filePath = mp3File.substring(0, mp3File.lastIndexOf("/")); + String fileName = mp3File.substring(mp3File.lastIndexOf("/") + 1); + if (filePath.startsWith("/") || filePath.startsWith("\\")) { + filePath = filePath.substring(1); + } + try { + FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); + configServiceDirective.setMp3File(dto.getMp3File());//指令音频文件 + configServiceDirective.setMp3FileMd5(dto.getMp3FileMd5()); + } catch (Exception e) { + MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); + mediaAsyncErrorLog.setMediaid(mp3File); + mediaAsyncErrorLogService.save(mediaAsyncErrorLog); + e.printStackTrace(); + throw new RuntimeException("指令音频文件拉取错误,指令id" + currentDirective.getId()); + } } } - } - //处理指令视频文件 - if (!dto.getMp4FileMd5().equals(currentDirective.getMp4FileMd5())) { - String mp4File = dto.getMp4File(); - if (StringUtils.isNotBlank(mp4File)) { - String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp4File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); - if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { - upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); - } - String filePath = mp4File.substring(0, mp4File.lastIndexOf("/")); - String fileName = mp4File.substring(mp4File.lastIndexOf("/") + 1); - if (filePath.startsWith("/") || filePath.startsWith("\\")) { - filePath = filePath.substring(1); - } - try { - FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); - configServiceDirective.setMp4File(dto.getMp4File());//指令视频文件 - configServiceDirective.setMp4FileMd5(dto.getMp4FileMd5()); - } catch (Exception e) { - MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); - mediaAsyncErrorLog.setMediaid(mp4File); - mediaAsyncErrorLogService.save(mediaAsyncErrorLog); - e.printStackTrace(); + //处理指令视频文件 + if (!dto.getMp4FileMd5().equals(currentDirective.getMp4FileMd5())) { + String mp4File = dto.getMp4File(); + if (StringUtils.isNotBlank(mp4File)) { + String url = baseUrl + "/sys/common/open/static/" + URLEncoder.encode(mp4File, StandardCharsets.UTF_8).replace("%2F", "/") + "?name=" + SafetyUtil.getSecureKey(); + if (upLoadPath.endsWith("/") || upLoadPath.endsWith("\\")) { + upLoadPath = upLoadPath.substring(0, upLoadPath.length() - 1); + } + String filePath = mp4File.substring(0, mp4File.lastIndexOf("/")); + String fileName = mp4File.substring(mp4File.lastIndexOf("/") + 1); + if (filePath.startsWith("/") || filePath.startsWith("\\")) { + filePath = filePath.substring(1); + } + try { + FileDownloader.downloadFile(url, upLoadPath + File.separator + filePath, fileName); + configServiceDirective.setMp4File(dto.getMp4File());//指令视频文件 + configServiceDirective.setMp4FileMd5(dto.getMp4FileMd5()); + } catch (Exception e) { + MediaAsyncErrorLog mediaAsyncErrorLog = new MediaAsyncErrorLog(); + mediaAsyncErrorLog.setMediaid(mp4File); + mediaAsyncErrorLogService.save(mediaAsyncErrorLog); + e.printStackTrace(); + throw new RuntimeException("指令视频文件拉取错误,指令id" + currentDirective.getId()); + } } } + directiveService.updateById(configServiceDirective); } - directiveService.updateById(configServiceDirective); - } - }); + }); + //在这里返回数据成功和日志 + StatusMQDto statusMQDto = new StatusMQDto(); + statusMQDto.setStatus(MQStatus.SUCCESS.getCode()); + statusMQDto.setMessage("文件同步成功!"); + statusMQDto.setAsyncId(mqDto.getAsyncId()); + statusMQDto.setCode("file"); + rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto); + } catch (Exception e) { + e.printStackTrace(); + //返回错误日志 + StatusMQDto statusMQDto = new StatusMQDto(); + statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode()); + statusMQDto.setMessage(e.getMessage()); + statusMQDto.setAsyncId(mqDto.getAsyncId()); + statusMQDto.setCode("file"); + rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto); + } + } // @RabbitListener( // bindings = @QueueBinding( @@ -471,17 +506,12 @@ public class DirectiveMQListener { QueryWrapper dtw = new QueryWrapper<>(); dtw.select("id"); List tempList = directiveService.list(dtw); - Set existingIds = tempList.stream() - .map(ConfigServiceDirective::getId) - .map(String::valueOf) - .collect(Collectors.toSet()); + Set existingIds = tempList.stream().map(ConfigServiceDirective::getId).map(String::valueOf).collect(Collectors.toSet()); String idStr = dto.getIdStr(); List inputIds = Arrays.asList(idStr.split(",")); - List uniqueIds = inputIds.stream() - .filter(id -> !existingIds.contains(id)) - .collect(Collectors.toList()); + List uniqueIds = inputIds.stream().filter(id -> !existingIds.contains(id)).collect(Collectors.toList()); dto.setIdList(uniqueIds);