parent
48840cffd2
commit
b6fdc7d5d4
|
|
@ -211,6 +211,7 @@ public class CareDirectivePlanServiceImpl extends ServiceImpl<CareDirectivePlanM
|
|||
upData.setPositioning(careDirectiveEntity.getPositioning());
|
||||
upData.setPositioningLong(careDirectiveEntity.getPositioningLong());
|
||||
upData.setTagName(careDirectiveEntity.getTagName());
|
||||
upData.setStartTime(careDirectiveEntity.getStartTime());
|
||||
|
||||
UpdateWrapper<CareDirectivePlan> uw = new UpdateWrapper<>();
|
||||
uw.eq("nu_id", careDirectiveEntity.getNuId());
|
||||
|
|
|
|||
|
|
@ -83,9 +83,6 @@ public class InstructionTagController extends JeecgController<InstructionTag, II
|
|||
instructionTag.setId(instructionTag.getInstructionType());
|
||||
instructionTag.setSysOrgCode(deptInfo.getString("code"));
|
||||
instructionTagService.save(instructionTag);
|
||||
// InstructionTagMQDto instructionTagMQDto = new InstructionTagMQDto();
|
||||
// BeanUtils.copyProperties(instructionTag,instructionTagMQDto);
|
||||
// rabbitMQUtil.sendToExchange("hldy.instructionTag.add.fanout", "", instructionTagMQDto);
|
||||
return Result.OK("添加成功!");
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@ import cn.hutool.core.bean.BeanUtil;
|
|||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.nu.dto.DirectiveMQDto;
|
||||
import com.nu.dto.StatusMQDto;
|
||||
import com.nu.enums.MQStatus;
|
||||
import com.nu.modules.config.instructiontag.entity.InstructionTag;
|
||||
import com.nu.modules.config.instructiontag.service.IInstructionTagService;
|
||||
import com.nu.modules.config.servicecategory.entity.ConfigServiceCategory;
|
||||
|
|
@ -14,17 +12,10 @@ import com.nu.modules.config.servicedirective.entity.ConfigServiceDirective;
|
|||
import com.nu.modules.config.servicedirective.service.IConfigServiceDirectiveService;
|
||||
import com.nu.modules.config.servicetype.entity.ConfigServiceType;
|
||||
import com.nu.modules.config.servicetype.service.IConfigServiceTypeService;
|
||||
import com.nu.utils.RabbitMQUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.compress.utils.Lists;
|
||||
import org.springframework.amqp.core.ExchangeTypes;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
|
@ -43,126 +34,6 @@ public class DirectiveMQListener {
|
|||
private IConfigServiceCategoryService serviceCategoryService;
|
||||
@Autowired
|
||||
private IConfigServiceTypeService serviceTypeService;
|
||||
@Autowired
|
||||
private RabbitMQUtil rabbitMQUtil;
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "#{directiveAsyncDQNP.getSyncDirectiveQueueName()}"), exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), key = "#{directiveAsyncDQNP.getSyncDirectiveKeyName()}"), errorHandler = "directiveMQErrorHandler")
|
||||
@Transactional(rollbackFor = {Exception.class})
|
||||
public void handleAuditResult(DirectiveMQDto dto) {
|
||||
dto.setIzInc(true);
|
||||
dto.setIdStr(dto.getDirectiveList().stream().map(d -> d.getId()).collect(Collectors.joining(",")));
|
||||
try {
|
||||
//增量处理数据
|
||||
handleIncremental(dto);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
//返回错误日志
|
||||
StatusMQDto statusMQDto = new StatusMQDto();
|
||||
statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode());
|
||||
statusMQDto.setMessage(e.getMessage());
|
||||
statusMQDto.setAsyncId(dto.getAsyncId());
|
||||
statusMQDto.setCode("data");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
|
||||
//这里就不会再执行文件的逻辑 要把文件按同步失败处理
|
||||
StatusMQDto statusMQDtoFile = new StatusMQDto();
|
||||
statusMQDtoFile.setStatus(MQStatus.PROCESS_FAILED.getCode());
|
||||
statusMQDtoFile.setMessage("文件同步失败,不再处理资源");
|
||||
statusMQDtoFile.setAsyncId(dto.getAsyncId());
|
||||
statusMQDtoFile.setCode("file");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDtoFile);
|
||||
throw new RuntimeException("数据同步失败");
|
||||
}
|
||||
//在这里返回数据成功和日志
|
||||
StatusMQDto statusMQDto = new StatusMQDto();
|
||||
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
|
||||
statusMQDto.setMessage("数据同步成功!");
|
||||
statusMQDto.setAsyncId(dto.getAsyncId());
|
||||
statusMQDto.setCode("data");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
|
||||
//资源日志在handleCreateMedia中有处理
|
||||
}
|
||||
|
||||
/**
|
||||
* 拉取指令资源
|
||||
*
|
||||
* @param mqDto
|
||||
*/
|
||||
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "#{directiveAsyncDQNP.getCreateMediaQueueName()}"), exchange = @Exchange(name = "hldy.directive", type = ExchangeTypes.DIRECT), key = "#{directiveAsyncDQNP.getCreateMediaKeyName()}"), errorHandler = "directiveMQErrorHandler")
|
||||
public void handleCreateMedia(DirectiveMQDto mqDto) {
|
||||
try {
|
||||
mqDto.getDirectiveList().stream().forEach(dto -> {
|
||||
//查询现有服务指令
|
||||
QueryWrapper<ConfigServiceDirective> qw = new QueryWrapper<>();
|
||||
qw.eq("id", dto.getId());
|
||||
ConfigServiceDirective currentDirective = directiveService.getOne(qw);
|
||||
if (mqDto.isIzDirectiveMedia() && currentDirective == null) {
|
||||
//管理平台服务指令媒体资源管理功能发起的(本地未必会有对应指令,没有的话只保存物理文件)
|
||||
currentDirective = new ConfigServiceDirective();
|
||||
}
|
||||
|
||||
//更新服务指令媒体资源字段
|
||||
ConfigServiceDirective configServiceDirective = new ConfigServiceDirective();
|
||||
configServiceDirective.setId(dto.getId());
|
||||
configServiceDirective.setServiceContent(dto.getServiceContent());//服务指令说明
|
||||
|
||||
//拉取媒体资源至本地目录
|
||||
{
|
||||
configServiceDirective.setPreviewFile(dto.getPreviewFile());//服务指令图片大图
|
||||
configServiceDirective.setPreviewFileMd5(dto.getPreviewFileMd5());
|
||||
configServiceDirective.setPreviewFileSmall(dto.getPreviewFileSmall());//服务指令图片小图
|
||||
configServiceDirective.setPreviewFileSmallMd5(dto.getPreviewFileSmallMd5());
|
||||
configServiceDirective.setImmediateFile(dto.getImmediateFile());//即时指令图标
|
||||
configServiceDirective.setImmediateFileMd5(dto.getImmediateFileMd5());
|
||||
configServiceDirective.setImmediateFileFocus(dto.getImmediateFileFocus());//即时指令图标
|
||||
configServiceDirective.setImmediateFileFocusMd5(dto.getImmediateFileFocusMd5());
|
||||
configServiceDirective.setMp3File(dto.getMp3File());//指令音频文件
|
||||
configServiceDirective.setMp3FileMd5(dto.getMp3FileMd5());
|
||||
configServiceDirective.setMp4File(dto.getMp4File());//指令视频文件
|
||||
configServiceDirective.setMp4FileMd5(dto.getMp4FileMd5());
|
||||
directiveService.updateById(configServiceDirective);
|
||||
}
|
||||
});
|
||||
//在这里返回数据成功和日志
|
||||
StatusMQDto statusMQDto = new StatusMQDto();
|
||||
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
|
||||
statusMQDto.setMessage("文件同步成功!");
|
||||
statusMQDto.setAsyncId(mqDto.getAsyncId());
|
||||
statusMQDto.setCode("file");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
//返回错误日志
|
||||
StatusMQDto statusMQDto = new StatusMQDto();
|
||||
statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode());
|
||||
statusMQDto.setMessage(e.getMessage());
|
||||
statusMQDto.setAsyncId(mqDto.getAsyncId());
|
||||
statusMQDto.setCode("file");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理增量同步数据
|
||||
*
|
||||
* @param dto
|
||||
* @throws Exception
|
||||
*/
|
||||
private void handleIncremental(DirectiveMQDto dto) {
|
||||
//增量:传过来的是已勾选的全部数据,需将重复部分去除
|
||||
//先查出所有指令id 然后进行去重
|
||||
List<ConfigServiceDirective> tempList = directiveService.allData();
|
||||
Set<String> existingIds = tempList.stream().map(ConfigServiceDirective::getId).map(String::valueOf).collect(Collectors.toSet());
|
||||
|
||||
String idStr = dto.getIdStr();
|
||||
List<String> inputIds = Arrays.asList(idStr.split(","));
|
||||
|
||||
List<String> uniqueIds = inputIds.stream().filter(id -> !existingIds.contains(id)).collect(Collectors.toList());
|
||||
|
||||
dto.setIdList(uniqueIds);
|
||||
|
||||
handleData(dto);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理数据
|
||||
|
|
@ -317,22 +188,8 @@ public class DirectiveMQListener {
|
|||
directiveService.updateById(configServiceDirective);
|
||||
}
|
||||
});
|
||||
//在这里返回数据成功和日志
|
||||
StatusMQDto statusMQDto = new StatusMQDto();
|
||||
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
|
||||
statusMQDto.setMessage("文件同步成功!");
|
||||
statusMQDto.setAsyncId(mqDto.getAsyncId());
|
||||
statusMQDto.setCode("file");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
//返回错误日志
|
||||
StatusMQDto statusMQDto = new StatusMQDto();
|
||||
statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode());
|
||||
statusMQDto.setMessage(e.getMessage());
|
||||
statusMQDto.setAsyncId(mqDto.getAsyncId());
|
||||
statusMQDto.setCode("file");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,31 +40,4 @@ public class DynamicQueueNameProvider {
|
|||
return getAuditResultQueueName();
|
||||
}
|
||||
|
||||
public String getCreateMediaQueueName() {
|
||||
JSONObject deptInfo = sysBaseAPI.getDeptInfo();
|
||||
String orgCode = deptInfo.getString("code");
|
||||
if (StringUtils.isNotBlank(orgCode)) {
|
||||
return orgCode + ".directive.createmedia";
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
public String getCreateMediaKeyName() {
|
||||
return getCreateMediaQueueName();
|
||||
}
|
||||
|
||||
public String getSyncDirectiveQueueName() {
|
||||
JSONObject deptInfo = sysBaseAPI.getDeptInfo();
|
||||
String orgCode = deptInfo.getString("code");
|
||||
if (StringUtils.isNotBlank(orgCode)) {
|
||||
return orgCode + ".directive.syncDirective";
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
public String getSyncDirectiveKeyName() {
|
||||
return getSyncDirectiveQueueName();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,28 +0,0 @@
|
|||
package com.nu.mq.instructiontag.exceptionhandler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
|
||||
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component("instructionTagMQErrorHandler")
|
||||
public class InstructionTagMQExceptionHandler implements RabbitListenerErrorHandler {
|
||||
|
||||
@Override
|
||||
public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) {
|
||||
log.error("MQ消息处理失败 | 消息体: {} | 异常原因: {}", new String(message.getBody()), e.getCause().getMessage());
|
||||
|
||||
// 根据异常类型选择处理策略
|
||||
// if (isRetryable(e)) {
|
||||
// // 可重试异常:抛出异常触发重试
|
||||
// throw e;
|
||||
// } else {
|
||||
// 不可恢复异常:拒绝消息且不重新入队
|
||||
throw new AmqpRejectAndDontRequeueException("消息处理失败且禁止重试", e);
|
||||
// }
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
package com.nu.mq.instructiontag.listener;
|
||||
|
||||
import com.nu.dto.InstructionTagMQDto;
|
||||
import com.nu.modules.config.instructiontag.entity.InstructionTag;
|
||||
import com.nu.modules.config.instructiontag.service.IInstructionTagService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.ExchangeTypes;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class InstructionTagMQListener {
|
||||
|
||||
@Autowired
|
||||
private IInstructionTagService instructionTagService;
|
||||
|
||||
/**
|
||||
* 主指令库新增分类标签后通知所有业务平台进行新增处理(增量)
|
||||
*
|
||||
* @param dto
|
||||
*/
|
||||
@RabbitListener(
|
||||
bindings = @QueueBinding(
|
||||
value = @Queue(
|
||||
name = "#{T(java.util.UUID).randomUUID().toString()}",
|
||||
autoDelete = "true"
|
||||
),
|
||||
exchange = @Exchange(
|
||||
name = "hldy.instructionTag.add.fanout",
|
||||
type = ExchangeTypes.FANOUT
|
||||
)
|
||||
), errorHandler = "instructionTagMQErrorHandler"
|
||||
)
|
||||
public void addInstructionMq(InstructionTagMQDto dto) {
|
||||
InstructionTag r_ = instructionTagService.lambdaQuery().eq(InstructionTag::getId, dto.getId()).one();
|
||||
if (r_ == null) {
|
||||
InstructionTag instructionTag = new InstructionTag();
|
||||
BeanUtils.copyProperties(dto, instructionTag);
|
||||
instructionTagService.save(instructionTag);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue