服务指令同步

This commit is contained in:
1378012178@qq.com 2025-04-25 16:18:36 +08:00
parent 55ef2c60f8
commit 652d890293
16 changed files with 266 additions and 36 deletions

View File

@ -53,7 +53,10 @@ public class NuBizAdvisoryInfoServiceImpl extends ServiceImpl<NuBizAdvisoryInfoM
public boolean updateById(NuBizAdvisoryInfo entity) {
baseMapper.updateById(entity);//插入业务数据
serverAdvisoryInfoService.updateById(entity);//插入运维数据
// serverAdvisoryInfoService.updateById(entity);//插入运维数据
NuBizAdvisoryInfoDto nuBizAdvisoryInfoDto = new NuBizAdvisoryInfoDto();
BeanUtils.copyProperties(entity,nuBizAdvisoryInfoDto);
rabbitMQUtil.sendToExchange("hldy.register","register.editData",nuBizAdvisoryInfoDto);
//审核通过并且是要成为员工的
if(StringUtils.equals("2", entity.getStatus()) && StringUtils.equals("2", entity.getAdvisoryType())){
//插入员工数据

View File

@ -37,30 +37,37 @@ public class RabbitMQConfig {
public Queue nu001FwzlAsyncQueue() {
return new Queue("nu001.fwzl.async", true);
}
@Bean
public Queue nu001FwzlStatusQueue() {
return new Queue("nu001.fwzl.status", true);
}
@Bean
public Queue nu002FwzlAsyncQueue() {
return new Queue("nu002.fwzl.async", true);
}
@Bean
public Queue nu002FwzlStatusQueue() {
return new Queue("nu002.fwzl.status", true);
}
@Bean
public Binding binding1(Queue nu001FwzlAsyncQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu001FwzlAsyncQueue).to(fwzlExchange).with("nu001.fwzl.async");
}
@Bean
public Binding binding2(Queue nu001FwzlStatusQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu001FwzlStatusQueue).to(fwzlExchange).with("nu001.fwzl.status");
}
@Bean
public Binding binding3(Queue nu002FwzlAsyncQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu002FwzlAsyncQueue).to(fwzlExchange).with("nu002.fwzl.async");
}
@Bean
public Binding binding4(Queue nu002FwzlStatusQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu002FwzlStatusQueue).to(fwzlExchange).with("nu002.fwzl.status");

View File

@ -10,4 +10,9 @@ public class DirectiveMQDto {
private String orgCode;//机构编码
private String idStr;
private List<String> idList;
//同步主表id
private String asyncId;
//同步子表code
private String code;
}

View File

@ -4,6 +4,13 @@ import lombok.Data;
@Data
public class StatusMQDto {
int status;
String message;
//同步状态 MQStatus枚举类
private int status;
//同步结果
private String message;
//同步表主表id
private String asyncId;
//同步表子表code
private String code;
}

View File

@ -0,0 +1,28 @@
package com.nu.modules.directivetag.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* @Description: 服务指令-指令标签中间表
* @Author: 张明远
* @Date: 2025-4-23 09:26:04
* @Version: V1.0
*/
@Data
@TableName("nu_directive_tag")
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
@ApiModel(value="nu_directive_tag对象", description="服务指令-指令标签中间表")
public class DirectiveTagRelation implements Serializable {
private static final long serialVersionUID = 1L;
private String directiveId;
private String tagId;
}

View File

@ -0,0 +1,18 @@
package com.nu.modules.directivetag.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.nu.modules.directivetag.entity.DirectiveTagRelation;
import java.util.List;
/**
* @Description: 指令标签
* @Author: 张明远
* @Date: 2025-03-17
* @Version: V1.0
*/
public interface DirectiveTagRelationMapper extends BaseMapper<DirectiveTagRelation> {
void removeAllRelation();
List<DirectiveTagRelation> selectAllRelation(List<String> ids);
}

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.nu.modules.directivetag.mapper.DirectiveTagRelationMapper">
<delete id="removeAllRelation">
delete from nu_directive_tag
</delete>
<select id="selectAllRelation" resultType="com.nu.modules.directivetag.entity.DirectiveTagRelation">
select * from nu_directive_tag
<where>
directive_id in
<foreach collection="ids" item="id" open="(" separator="," close=")">
#{id}
</foreach>
</where>
</select>
</mapper>

View File

@ -2,6 +2,9 @@ package com.nu.modules.directivetag.service;
import com.nu.modules.directivetag.entity.DirectiveTag;
import com.baomidou.mybatisplus.extension.service.IService;
import com.nu.modules.directivetag.entity.DirectiveTagRelation;
import java.util.List;
/**
* @Description: 指令标签
@ -17,4 +20,10 @@ public interface IDirectiveTagService extends IService<DirectiveTag> {
* @return
*/
boolean isUsed(String id);
public void removeAllRelation();
public List<DirectiveTagRelation> selectAllRelation(String dataSourceCode, List<String> ids);
void insertAllRelation(List<DirectiveTagRelation> relations);
}

View File

@ -1,16 +1,20 @@
package com.nu.modules.directivetag.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.nu.modules.directivetag.entity.DirectiveTag;
import com.nu.modules.directivetag.entity.DirectiveTagRelation;
import com.nu.modules.directivetag.mapper.DirectiveTagMapper;
import com.nu.modules.directivetag.mapper.DirectiveTagRelationMapper;
import com.nu.modules.directivetag.service.IDirectiveTagService;
import com.nu.modules.servicedirective.mapper.ConfigServiceDirectiveMapper;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import java.util.Arrays;
import java.util.List;
/**
* @Description: 指令标签
@ -23,6 +27,8 @@ public class DirectiveTagServiceImpl extends ServiceImpl<DirectiveTagMapper, Dir
@Autowired
private ConfigServiceDirectiveMapper serviceDirectiveMapper;
@Autowired
private DirectiveTagRelationMapper tagRelationMapper;
@Override
public boolean isUsed(String ids) {
@ -37,4 +43,22 @@ public class DirectiveTagServiceImpl extends ServiceImpl<DirectiveTagMapper, Dir
}
return result;
}
@Override
public void removeAllRelation() {
tagRelationMapper.removeAllRelation();
}
@Override
@DS("#dataSourceCode")
public List<DirectiveTagRelation> selectAllRelation(String dataSourceCode, List<String> ids) {
return tagRelationMapper.selectAllRelation(ids);
}
@Override
public void insertAllRelation(List<DirectiveTagRelation> idRelations) {
idRelations.forEach(ir -> {
tagRelationMapper.insert(ir);
});
}
}

View File

@ -5,6 +5,7 @@ import java.util.List;
import org.apache.ibatis.annotations.Param;
import com.nu.modules.servicedirective.entity.ConfigServiceDirective;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import retrofit2.http.DELETE;
/**
* @Description: 服务指令
@ -44,4 +45,7 @@ public interface ConfigServiceDirectiveMapper extends BaseMapper<ConfigServiceDi
* @return
*/
int queryCountByTagIds(@Param("tagIds") List<String> tagIds);
int removeAll();
}

View File

@ -105,4 +105,8 @@
(#{directive.id}, #{tagId})
</foreach>
</insert>
<delete id="removeAll">
delete from nu_config_service_directive
</delete>
</mapper>

View File

@ -31,4 +31,10 @@ public interface IConfigServiceDirectiveService extends IService<ConfigServiceDi
void removeTags(ConfigServiceDirective configServiceDirective);
List<ConfigServiceDirective> queryDirectiveIdByTagIds(String tags);
void removeAll();
List<ConfigServiceDirective> selectAllByIds(String dataSourceCode, List<String> idList);
void insertAllDirectives(List<ConfigServiceDirective> directives);
}

View File

@ -1,5 +1,9 @@
package com.nu.modules.servicedirective.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.nu.modules.directivetag.entity.DirectiveTag;
@ -22,8 +26,8 @@ public class ConfigServiceDirectiveServiceImpl extends ServiceImpl<ConfigService
@Override
public List<ConfigServiceDirective> pageList(ConfigServiceDirective configServiceDirective,IPage<ConfigServiceDirective> list_) {
if(list_.getRecords() == null || list_.getRecords().isEmpty()){
public List<ConfigServiceDirective> pageList(ConfigServiceDirective configServiceDirective, IPage<ConfigServiceDirective> list_) {
if (list_.getRecords() == null || list_.getRecords().isEmpty()) {
return list_.getRecords();
}
List<ConfigServiceDirective> list = baseMapper.pageList(configServiceDirective, list_.getRecords());
@ -165,6 +169,7 @@ public class ConfigServiceDirectiveServiceImpl extends ServiceImpl<ConfigService
/**
* 移除改服务指令下指令标签
*
* @param configServiceDirective
*/
@Override
@ -176,4 +181,25 @@ public class ConfigServiceDirectiveServiceImpl extends ServiceImpl<ConfigService
public List<ConfigServiceDirective> queryDirectiveIdByTagIds(String tags) {
return baseMapper.queryDirectiveIdByTagIds(tags);
}
@Override
public void removeAll() {
baseMapper.removeAll();
}
@Override
@DS("#dataSourceCode")
public List<ConfigServiceDirective> selectAllByIds(String dataSourceCode, List<String> idList) {
QueryWrapper<ConfigServiceDirective> qw = new QueryWrapper<>();
qw.in("id", idList);
return baseMapper.selectList(qw);
}
@Override
public void insertAllDirectives(List<ConfigServiceDirective> directives) {
directives.forEach(d -> {
baseMapper.insert(d);
});
}
}

View File

@ -16,20 +16,13 @@ public class DirectiveMQExceptionHandler implements RabbitListenerErrorHandler {
log.error("MQ消息处理失败 | 消息体: {} | 异常原因: {}", new String(message.getBody()), e.getCause().getMessage());
// 根据异常类型选择处理策略
if (isRetryable(e)) {
// 可重试异常抛出异常触发重试
throw e;
} else {
// 不可恢复异常拒绝消息且不重新入队
throw new AmqpRejectAndDontRequeueException("消息处理失败且禁止重试", e);
}
// if (isRetryable(e)) {
// // 可重试异常抛出异常触发重试
// throw e;
// } else {
// 不可恢复异常拒绝消息且不重新入队
throw new AmqpRejectAndDontRequeueException("消息处理失败且禁止重试", e);
// }
}
/**
* 判断异常是否可重试
*/
private boolean isRetryable(ListenerExecutionFailedException e) {
// 示例网络异常数据库临时锁超时可重试
return e.getCause() instanceof RuntimeException; // 根据实际业务调整
}
}

View File

@ -1,33 +1,108 @@
package com.nu.mq.directive.listener;
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.nu.dto.DirectiveMQDto;
import com.nu.dto.StatusMQDto;
import com.nu.enums.MQStatus;
import com.nu.modules.directivetag.entity.DirectiveTagRelation;
import com.nu.modules.directivetag.service.IDirectiveTagService;
import com.nu.modules.servicedirective.entity.ConfigServiceDirective;
import com.nu.modules.servicedirective.service.IConfigServiceDirectiveService;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
@Component
public class DirectiveMQListener {
@Autowired
private IConfigServiceDirectiveService directiveService;
@Autowired
private IDirectiveTagService tagService;
@Autowired
private RabbitMQUtil rabbitMQUtil;
@RabbitListener(queues = "nu001.fwzl.async", errorHandler = "directiveMQErrorHandler")
public void handleMessage(DirectiveMQDto msg) {
/**
* if 未到运营开始时间时 全量变更
* else 增量
* 备注指令相应的字典表有相应的实时更新处理 这里无需处理字典表 但是需要处理中间关系表
* <p>
* 具体逻辑说明
* 1未到运营开始时间时dto.isIzInc()为true时
* 1先将指令主表指令标签关系表数据全部删除
* 2将指令标签关系表数据增加进去
* 3将指令主表数据增加进去
* <p>
* 2已到运营开始时间dto.isIzInc()为false时
* 1)只传递过来需要增加的指令
* 2)先将指令标签关系表数据增加进去
* 3再将指令主表数据增加
*
* @param dto
*/
@RabbitListener(queues = "nu002.fwzl.async", errorHandler = "directiveMQErrorHandler")
@DSTransactional(rollbackFor = {Exception.class})
public void handleMessage(DirectiveMQDto dto) {
try {
if (!dto.isIzInc()) {
//全量移除所有服务指令主表指令标签中间表数据
directiveService.removeAll();
tagService.removeAllRelation();
dto.setIdList(Arrays.asList(dto.getIdStr().split(",")));
} else {
//增量传过来的是已勾选的全部数据需将重复部分去除
//先查出所有指令id 然后进行去重
QueryWrapper<ConfigServiceDirective> dtw = new QueryWrapper<>();
dtw.select("id");
List<ConfigServiceDirective> tempList = directiveService.list(dtw);
Set<String> existingIds = tempList.stream()
.map(ConfigServiceDirective::getId)
.map(String::valueOf)
.collect(Collectors.toSet());
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
statusMQDto.setMessage("成了!");
rabbitMQUtil.sendToExchange("hldy.fwzl", "nu001.fwzl.status", statusMQDto);
String idStr = dto.getIdStr();
List<String> inputIds = Arrays.asList(idStr.split(","));
List<String> uniqueIds = inputIds.stream()
.filter(id -> !existingIds.contains(id))
.collect(Collectors.toList());
dto.setIdList(uniqueIds);
}
//查询指令标签关系 并将指令标签关系表数据增加进自己的库表中
if (dto.getIdList() != null && !dto.getIdList().isEmpty()) {
List<DirectiveTagRelation> relations = tagService.selectAllRelation("nuro", dto.getIdList());
if (relations != null && !relations.isEmpty()) {
tagService.insertAllRelation(relations);
}
//查询服务指令并将服务指令新增进自己的数据库表中
List<ConfigServiceDirective> directives = directiveService.selectAllByIds("nuro", dto.getIdList());
if (directives != null && !directives.isEmpty()) {
directiveService.insertAllDirectives(directives);
}
}
} catch (Exception e) {
System.out.println("异常了:" + e.getMessage());
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode());
statusMQDto.setMessage(e.getMessage());
statusMQDto.setAsyncId(dto.getAsyncId());
statusMQDto.setCode("data");
rabbitMQUtil.sendToExchange("hldy.fwzl", "nu002.fwzl.status", statusMQDto);
throw new RuntimeException(e);
}
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
statusMQDto.setMessage("数据同步成功!");
statusMQDto.setAsyncId(dto.getAsyncId());
statusMQDto.setCode("data");
rabbitMQUtil.sendToExchange("hldy.fwzl", "nu002.fwzl.status", statusMQDto);
}
}

View File

@ -1,5 +1,5 @@
server:
port: 8081
port: 8082
tomcat:
max-swallow-size: -1
error:
@ -7,7 +7,7 @@ server:
include-stacktrace: ALWAYS
include-message: ALWAYS
servlet:
context-path: /nursing-unit_001
context-path: /nursing-unit_002
compression:
enabled: true
min-response-size: 1024
@ -21,7 +21,7 @@ management:
spring:
application:
name: nursing-unit-001
name: nursing-unit-002
# flyway配置
flyway:
# 是否启用flyway
@ -166,9 +166,9 @@ spring:
slow-sql-millis: 5000
datasource:
master:
url: jdbc:mysql://192.168.2.199:3306/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: nu001
password: nu001
url: jdbc:mysql://192.168.2.199:3306/nursing_unit_002?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: nu002
password: nu002
driver-class-name: com.mysql.cj.jdbc.Driver
# 多数据源配置-运维系统
multi-datasource1:
@ -176,6 +176,12 @@ spring:
username: nu_sys
password: nu_sys
driver-class-name: com.mysql.cj.jdbc.Driver
# 多数据源配置-运维系统
nuro:
url: jdbc:mysql://192.168.2.199:3306/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: nu_ro
password: nu_ro
driver-class-name: com.mysql.cj.jdbc.Driver
#redis 配置
redis:
database: 0
@ -189,7 +195,6 @@ spring:
username: hldy
password: hldy
virtual-host: /hldy
#mybatis plus 设置
mybatis-plus:
mapper-locations: classpath*:org/jeecg/**/xml/*Mapper.xml,classpath*:com/nu/**/xml/*Mapper.xml