1、系统级别数据字典同步

2、调整业务系统mq监听的队列:根据机构动态生成队列名称、key名称并绑定交换机(解决队列名称写死,每次新增业务系统都需要修改问题)
This commit is contained in:
1378012178@qq.com 2025-05-08 16:22:35 +08:00
parent 43c030e89a
commit b8749a6b71
10 changed files with 79 additions and 50 deletions

View File

@ -32,43 +32,6 @@ public class RabbitMQConfig {
return new DirectExchange("hldy.fwzl");
}
// 队列
@Bean
public Queue nu001FwzlAsyncQueue() {
return new Queue("nu001.fwzl.async", true);
}
@Bean
public Queue nu001FwzlStatusQueue() {
return new Queue("nu001.fwzl.status", true);
}
@Bean
public Queue nu002FwzlAsyncQueue() {
return new Queue("nu002.fwzl.async", true);
}
@Bean
public Queue nu002FwzlStatusQueue() {
return new Queue("nu002.fwzl.status", true);
}
@Bean
public Binding binding1(Queue nu001FwzlAsyncQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu001FwzlAsyncQueue).to(fwzlExchange).with("nu001.fwzl.async");
}
@Bean
public Binding binding2(Queue nu001FwzlStatusQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu001FwzlStatusQueue).to(fwzlExchange).with("nu001.fwzl.status");
}
@Bean
public Binding binding3(Queue nu002FwzlAsyncQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu002FwzlAsyncQueue).to(fwzlExchange).with("nu002.fwzl.async");
}
@Bean
public Binding binding4(Queue nu002FwzlStatusQueue, DirectExchange fwzlExchange) {
return BindingBuilder.bind(nu002FwzlStatusQueue).to(fwzlExchange).with("nu002.fwzl.status");
}
//注册
@Bean
public DirectExchange registerExchange() {

View File

@ -16,4 +16,5 @@ public class StatusMQDto {
private String dictId;
private String orgCode;
private String orgName;
}

View File

@ -44,6 +44,12 @@
<artifactId>nursing-unit-common</artifactId>
<version>${nursingunit.version}</version>
</dependency>
</dependencies>
<dependency>
<groupId>com.nursingunit.boot</groupId>
<artifactId>nu-system-local-api</artifactId>
<version>2.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -20,6 +20,10 @@ import com.nu.modules.servicetype.service.IConfigServiceTypeService;
import com.nu.utils.RabbitMQUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -65,7 +69,14 @@ public class DirectiveMQListener {
*
* @param dto
*/
// @RabbitListener(queues = "nu002.fwzl.async", errorHandler = "directiveMQErrorHandler")
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "#{directiveAsyncDQNP.getQueueName()}"),
exchange = @Exchange(name = "hldy.fwzl", type = ExchangeTypes.DIRECT),
key = "#{directiveAsyncDQNP.getKeyName()}"
),
errorHandler = "directiveMQErrorHandler"
)
@DSTransactional(rollbackFor = {Exception.class})
public void handleMessage(DirectiveMQDto dto) {
try {
@ -82,8 +93,7 @@ public class DirectiveMQListener {
statusMQDto.setMessage(e.getMessage());
statusMQDto.setAsyncId(dto.getAsyncId());
statusMQDto.setCode("data");
//TODO 返回的key应该是动态获取该机构编码 然后使用topic
rabbitMQUtil.sendToExchange("hldy.fwzl", "nu002.fwzl.status", statusMQDto);
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
throw new RuntimeException(e);
}
StatusMQDto statusMQDto = new StatusMQDto();
@ -91,7 +101,7 @@ public class DirectiveMQListener {
statusMQDto.setMessage("数据同步成功!");
statusMQDto.setAsyncId(dto.getAsyncId());
statusMQDto.setCode("data");
rabbitMQUtil.sendToExchange("hldy.fwzl", "nu002.fwzl.status", statusMQDto);
rabbitMQUtil.sendToExchange("hldy.fwzl", "fwzl.async.result", statusMQDto);
}

View File

@ -0,0 +1,26 @@
package com.nu.mq.directive.listener;
import org.apache.commons.lang.StringUtils;
import org.jeecg.common.system.api.ISysBaseAPI;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("directiveAsyncDQNP")
public class DynamicQueueNameProvider {
@Autowired
private ISysBaseAPI sysBaseAPI;
public String getQueueName() {
String orgCode = sysBaseAPI.getDeptCode();
if (StringUtils.isNotBlank(orgCode)) {
return orgCode + ".fwzl.async";
} else {
return "";
}
}
public String getKeyName() {
return getQueueName();
}
}

View File

@ -544,4 +544,9 @@ public interface ISysBaseAPI extends CommonAPI {
*/
boolean dictTableWhiteListCheckByDict(String tableOrDictCode, String... fields);
/**
* 获取本机构编码
* @return
*/
String getDeptCode();
}

View File

@ -1828,4 +1828,17 @@ public class SysBaseApiImpl implements ISysBaseAPI {
}
}
@Override
public String getDeptCode() {
String result = null;
QueryWrapper<SysDepart> qw = new QueryWrapper<>();
qw.eq("org_category","1");
qw.eq("del_flag","0");
List<SysDepart> list = sysDepartService.list(qw);
if(list!=null && !list.isEmpty()){
result = list.get(0).getOrgCode();
}
return result;
}
}

View File

@ -43,10 +43,11 @@ public class DictMQListener {
* 字典同步
* 如果 没有字典则创建 然后把每一项字典项新增进去
* 有字典项 则检查每一项字典项是否存在不存在则插入
* <p>
* 动态生成队列名称避免每次部署新系统都需要改动
*
* @param dto
*/
// @RabbitListener(queues = "sysdict.async", errorHandler = "dictMQErrorHandler")
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
@ -60,10 +61,12 @@ public class DictMQListener {
), errorHandler = "dictMQErrorHandler"
)
public void handleMessage(SysDictMQDto dto) {
String orgCode = "业务系统未查询到";
String orgName = "业务系统未查询到";
//查询此业务系统的机构编码名称
String orgCode = "业务系统中未设置";
String orgName = "业务系统中未设置";
QueryWrapper<SysDepart> sysDepartQueryWrapper = new QueryWrapper<>();
sysDepartQueryWrapper.eq("org_category", "1");
sysDepartQueryWrapper.eq("del_flag", "0");
List<SysDepart> list = departService.list(sysDepartQueryWrapper);
if (list != null && list.size() > 0) {
orgCode = list.get(0).getOrgCode();
@ -107,17 +110,19 @@ public class DictMQListener {
} catch (Exception e) {
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.PROCESS_FAILED.getCode());
statusMQDto.setMessage(orgName + " 同步失败:" + e.getMessage());
statusMQDto.setMessage(e.getMessage());
statusMQDto.setDictId(dto.getId());
statusMQDto.setOrgCode(orgCode);
statusMQDto.setOrgName(orgName);
rabbitMQUtil.sendToExchange("hldy.sysdict", "sysdict.async.result", statusMQDto);
throw new RuntimeException(e);
}
StatusMQDto statusMQDto = new StatusMQDto();
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
statusMQDto.setMessage(orgName + " 数据同步成功!");
statusMQDto.setMessage("数据同步成功!");
statusMQDto.setDictId(dto.getId());
statusMQDto.setOrgCode(orgCode);
statusMQDto.setOrgName(orgName);
rabbitMQUtil.sendToExchange("hldy.sysdict", "sysdict.async.result", statusMQDto);
}

View File

@ -1,5 +1,5 @@
server:
port: 8081
port: 8084
tomcat:
max-swallow-size: -1
error:

View File

@ -491,7 +491,7 @@
<id>dev</id>
<activation>
<!--默认激活配置-->
<activeByDefault>false</activeByDefault>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<!--当前环境-->
@ -543,7 +543,7 @@
<id>uat</id>
<activation>
<!--默认激活配置-->
<activeByDefault>true</activeByDefault>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<!--当前环境-->