1、系统级别数据字典同步
2、调整业务系统mq监听的队列:根据机构动态生成队列名称、key名称并绑定交换机(解决队列名称写死,每次新增业务系统都需要修改问题)
This commit is contained in:
parent
67d83e35bf
commit
7bcd2fc403
|
@ -17,6 +17,11 @@
|
|||
<artifactId>nursing-unit-base-core</artifactId>
|
||||
<version>2.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
<artifactId>nu-system-local-api</artifactId>
|
||||
<version>2.0.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -32,43 +32,6 @@ public class RabbitMQConfig {
|
|||
return new DirectExchange("hldy.fwzl");
|
||||
}
|
||||
|
||||
// 队列
|
||||
@Bean
|
||||
public Queue nu001FwzlAsyncQueue() {
|
||||
return new Queue("nu001.fwzl.async", true);
|
||||
}
|
||||
@Bean
|
||||
public Queue nu001FwzlStatusQueue() {
|
||||
return new Queue("nu001.fwzl.status", true);
|
||||
}
|
||||
@Bean
|
||||
public Queue nu002FwzlAsyncQueue() {
|
||||
return new Queue("nu002.fwzl.async", true);
|
||||
}
|
||||
@Bean
|
||||
public Queue nu002FwzlStatusQueue() {
|
||||
return new Queue("nu002.fwzl.status", true);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding binding1(Queue nu001FwzlAsyncQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu001FwzlAsyncQueue).to(fwzlExchange).with("nu001.fwzl.async");
|
||||
}
|
||||
@Bean
|
||||
public Binding binding2(Queue nu001FwzlStatusQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu001FwzlStatusQueue).to(fwzlExchange).with("nu001.fwzl.status");
|
||||
}
|
||||
@Bean
|
||||
public Binding binding3(Queue nu002FwzlAsyncQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu002FwzlAsyncQueue).to(fwzlExchange).with("nu002.fwzl.async");
|
||||
}
|
||||
@Bean
|
||||
public Binding binding4(Queue nu002FwzlStatusQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu002FwzlStatusQueue).to(fwzlExchange).with("nu002.fwzl.status");
|
||||
}
|
||||
|
||||
|
||||
|
||||
//注册
|
||||
@Bean
|
||||
public DirectExchange registerExchange() {
|
||||
|
|
|
@ -16,4 +16,6 @@ public class StatusMQDto {
|
|||
|
||||
private String dictId;
|
||||
private String orgCode;
|
||||
private String orgName;
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.nu.modules.async.controller;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
|
@ -17,6 +18,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
|
||||
import org.jeecg.common.system.base.controller.JeecgController;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.servlet.ModelAndView;
|
||||
import io.swagger.annotations.Api;
|
||||
|
@ -35,6 +37,7 @@ import org.apache.shiro.authz.annotation.RequiresPermissions;
|
|||
@RequestMapping("/asyncmain/asyncMain")
|
||||
@Slf4j
|
||||
public class AsyncMainController extends JeecgController<AsyncMain, IAsyncMainService> {
|
||||
|
||||
@Autowired
|
||||
private IAsyncMainService asyncMainService;
|
||||
|
||||
|
@ -60,7 +63,7 @@ public class AsyncMainController extends JeecgController<AsyncMain, IAsyncMainSe
|
|||
IPage<AsyncMain> pageList = asyncMainService.page(page, queryWrapper);
|
||||
List<AsyncMain> records = pageList.getRecords();
|
||||
if (records != null && !records.isEmpty()) {
|
||||
records = asyncMainService.pageList(records);
|
||||
records = asyncMainService.pageList(records,asyncMain);
|
||||
pageList.setRecords(records);
|
||||
}
|
||||
return Result.OK(pageList);
|
||||
|
@ -168,4 +171,8 @@ public class AsyncMainController extends JeecgController<AsyncMain, IAsyncMainSe
|
|||
return super.importExcel(request, response, AsyncMain.class);
|
||||
}
|
||||
|
||||
@PostMapping("/listByType")
|
||||
public Result<Map<String,List<AsyncMain>>> listByType(@RequestBody AsyncMain am){
|
||||
return Result.OK(asyncMainService.listByType(am));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,10 @@ public class AsyncMain implements Serializable {
|
|||
@Excel(name = "机构编码", width = 15)
|
||||
@ApiModelProperty(value = "机构编码")
|
||||
private java.lang.String orgCode;
|
||||
/**机构名称*/
|
||||
@Excel(name = "机构名称", width = 15)
|
||||
@ApiModelProperty(value = "机构名称")
|
||||
private java.lang.String orgName;
|
||||
/**类型(同步的是什么类型的数据)*/
|
||||
@Excel(name = "类型(同步的是什么类型的数据)", width = 15)
|
||||
@ApiModelProperty(value = "类型(同步的是什么类型的数据)")
|
||||
|
@ -67,4 +71,6 @@ public class AsyncMain implements Serializable {
|
|||
|
||||
@TableField(exist = false)
|
||||
private List<AsyncStatus> asyncStatusList;
|
||||
@TableField(exist = false)
|
||||
private List<String> ids;
|
||||
}
|
||||
|
|
|
@ -14,5 +14,5 @@ import java.util.List;
|
|||
*/
|
||||
public interface AsyncMainMapper extends BaseMapper<AsyncMain> {
|
||||
|
||||
List<AsyncMain> pageList(@Param("pojo") List<AsyncMain> records);
|
||||
List<AsyncMain> pageList(@Param("records") List<AsyncMain> records,@Param("dto") AsyncMain dto);
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
<id property="id" column="id"/>
|
||||
<result property="primaryKey" column="primaryKey"/>
|
||||
<result property="orgCode" column="orgCode"/>
|
||||
<result property="orgName" column="orgName"/>
|
||||
<result property="type" column="type"/>
|
||||
<result property="descr" column="descr"/>
|
||||
<result property="createBy" column="createBy"/>
|
||||
|
@ -25,6 +26,7 @@
|
|||
f.id as id,
|
||||
f.primary_key as primaryKey,
|
||||
f.org_code as orgCode,
|
||||
f.org_name as orgName,
|
||||
f.type as type,
|
||||
f.descr as descr,
|
||||
f.create_by as createBy,
|
||||
|
@ -38,10 +40,16 @@
|
|||
from nu_async_main f
|
||||
left join nu_async_status z on f.id = z.pkid
|
||||
<where>
|
||||
f.id in
|
||||
<foreach collection="pojo" item="am" separator="," open="(" close=")">
|
||||
#{am.id}
|
||||
</foreach>
|
||||
<if test="records !=null and !records.isEmpty()">
|
||||
f.id in
|
||||
<foreach collection="records" item="am" separator="," open="(" close=")">
|
||||
#{am.id}
|
||||
</foreach>
|
||||
</if>
|
||||
<if test="dto.primaryKey != '' and dto.primaryKey != null">
|
||||
and f.primary_key = #{dto.primaryKey}
|
||||
and f.status = '500'
|
||||
</if>
|
||||
</where>
|
||||
order by f.create_time desc,z.update_time desc
|
||||
</select>
|
||||
|
|
|
@ -4,6 +4,7 @@ import com.nu.modules.async.entity.AsyncMain;
|
|||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Description: 数据同步主表
|
||||
|
@ -13,5 +14,7 @@ import java.util.List;
|
|||
*/
|
||||
public interface IAsyncMainService extends IService<AsyncMain> {
|
||||
|
||||
List<AsyncMain> pageList(List<AsyncMain> records);
|
||||
List<AsyncMain> pageList(List<AsyncMain> records, AsyncMain dto);
|
||||
|
||||
Map<String,List<AsyncMain>> listByType(AsyncMain am);
|
||||
}
|
||||
|
|
|
@ -1,25 +1,73 @@
|
|||
package com.nu.modules.async.service.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.nu.enums.MQStatus;
|
||||
import com.nu.modules.async.entity.AsyncMain;
|
||||
import com.nu.modules.async.mapper.AsyncMainMapper;
|
||||
import com.nu.modules.async.service.IAsyncMainService;
|
||||
import org.apache.commons.compress.utils.Lists;
|
||||
import org.jeecg.common.system.api.ISysBaseAPI;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Description: 数据同步主表
|
||||
* @Author: 张明远
|
||||
* @Date: 2025-04-23
|
||||
* @Date: 2025-04-23
|
||||
* @Version: V1.0
|
||||
*/
|
||||
@Service
|
||||
public class AsyncMainServiceImpl extends ServiceImpl<AsyncMainMapper, AsyncMain> implements IAsyncMainService {
|
||||
|
||||
@Autowired
|
||||
private ISysBaseAPI sysBaseAPI;
|
||||
|
||||
@Override
|
||||
public List<AsyncMain> pageList(List<AsyncMain> records) {
|
||||
return baseMapper.pageList(records);
|
||||
public List<AsyncMain> pageList(List<AsyncMain> records, AsyncMain dto) {
|
||||
return baseMapper.pageList(records, dto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<AsyncMain>> listByType(AsyncMain am) {
|
||||
Map<String, List<AsyncMain>> result = Maps.newHashMap();
|
||||
List<AsyncMain> errorList = Lists.newArrayList();
|
||||
List<AsyncMain> processingList = Lists.newArrayList();
|
||||
List<AsyncMain> successList = Lists.newArrayList();
|
||||
|
||||
List<JSONObject> depts = sysBaseAPI.queryOpeDept();
|
||||
|
||||
QueryWrapper<AsyncMain> asyncMainQueryWrapper = new QueryWrapper<>();
|
||||
asyncMainQueryWrapper.eq("primary_key", am.getPrimaryKey());
|
||||
asyncMainQueryWrapper.orderByAsc("org_code");
|
||||
List<AsyncMain> asyncMains = baseMapper.selectList(asyncMainQueryWrapper);
|
||||
if (asyncMains != null && !asyncMains.isEmpty()) {
|
||||
asyncMains.stream().forEach(a -> {
|
||||
if ((MQStatus.SUCCESS.getCode() + "").equals(a.getStatus())) {
|
||||
successList.add(a);
|
||||
} else {
|
||||
errorList.add(a);
|
||||
}
|
||||
depts.removeIf(dept -> a.getOrgCode().equals(dept.getString("code")));
|
||||
});
|
||||
}
|
||||
depts.forEach(dept -> {
|
||||
AsyncMain asyncMain = new AsyncMain();
|
||||
asyncMain.setOrgCode(dept.getString("code"));
|
||||
asyncMain.setOrgName(dept.getString("name"));
|
||||
asyncMain.setDescr("同步中/待同步");
|
||||
processingList.add(asyncMain);
|
||||
});
|
||||
|
||||
result.put("errorList", errorList);
|
||||
result.put("processingList", processingList);
|
||||
result.put("successList", successList);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ public class ConfigServiceDirectiveController extends JeecgController<ConfigServ
|
|||
@ApiOperation(value = "服务指令-分页列表查询", notes = "服务指令-分页列表查询")
|
||||
@GetMapping(value = "/list")
|
||||
@DS("#dataSourceCode")
|
||||
public Result<IPage<ConfigServiceDirective>> queryPageList(String dataSourceCode,ConfigServiceDirective configServiceDirective,
|
||||
public Result<IPage<ConfigServiceDirective>> queryPageList(String dataSourceCode, ConfigServiceDirective configServiceDirective,
|
||||
@RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo,
|
||||
@RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize,
|
||||
HttpServletRequest req) {
|
||||
|
@ -113,8 +113,9 @@ public class ConfigServiceDirectiveController extends JeecgController<ConfigServ
|
|||
*/
|
||||
@PostMapping("/async")
|
||||
public Result<String> async(@RequestBody DirectiveMQDto dto) {
|
||||
List<DictModel> dicts = sysBaseAPI.getDictItems("mq_org_queue");
|
||||
String queue = dicts.stream().filter(d -> d.getValue().equals(dto.getOrgCode())).findFirst().map(DictModel::getText).orElse(null);
|
||||
// List<DictModel> dicts = sysBaseAPI.getDictItems("mq_org_queue");
|
||||
// String queue = dicts.stream().filter(d -> d.getValue().equals(dto.getOrgCode())).findFirst().map(DictModel::getText).orElse(null);
|
||||
String queue = dto.getOrgCode() + ".fwzl.async";
|
||||
if (StringUtils.isNotBlank(queue)) {
|
||||
//先在数据同步表中插入数据
|
||||
AsyncMain asyncMain = new AsyncMain();
|
||||
|
@ -167,5 +168,4 @@ public class ConfigServiceDirectiveController extends JeecgController<ConfigServ
|
|||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -9,6 +9,10 @@ import com.nu.modules.async.service.IAsyncMainService;
|
|||
import com.nu.modules.async.service.IAsyncStatusService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.compress.utils.Lists;
|
||||
import org.springframework.amqp.core.ExchangeTypes;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
@ -24,16 +28,14 @@ public class DirectiveMQListener {
|
|||
@Autowired
|
||||
private IAsyncStatusService asyncStatusService;
|
||||
|
||||
@RabbitListener(queues = "nu001.fwzl.status", errorHandler = "directiveMQErrorHandler")
|
||||
public void handleNu001AsyncMessageStatus(StatusMQDto dto) {
|
||||
try {
|
||||
System.out.println("接收到了消息:" + dto.getStatus() + "消息体:" + dto.getMessage());
|
||||
} catch (Exception e) {
|
||||
System.out.println("异常了:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@RabbitListener(queues = "nu002.fwzl.status", errorHandler = "directiveMQErrorHandler")
|
||||
@RabbitListener(
|
||||
bindings = @QueueBinding(
|
||||
value = @Queue(name = "fwzl.async.result"),
|
||||
exchange = @Exchange(name = "hldy.fwzl", type = ExchangeTypes.DIRECT),
|
||||
key = "fwzl.async.result"
|
||||
),
|
||||
errorHandler = "directiveMQErrorHandler"
|
||||
)
|
||||
public void handleNu002AsyncMessageStatus(StatusMQDto dto) {
|
||||
LambdaQueryWrapper<AsyncStatus> qw = new LambdaQueryWrapper<>();
|
||||
qw.eq(AsyncStatus::getPkid, dto.getAsyncId());
|
||||
|
|
|
@ -544,4 +544,9 @@ public interface ISysBaseAPI extends CommonAPI {
|
|||
*/
|
||||
boolean dictTableWhiteListCheckByDict(String tableOrDictCode, String... fields);
|
||||
|
||||
/**
|
||||
* 查询各业务机构 编码、名称
|
||||
*
|
||||
*/
|
||||
List<JSONObject> queryOpeDept();
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -75,6 +75,7 @@ public class SysDictServiceImpl extends ServiceImpl<SysDictMapper, SysDict> impl
|
|||
private DictQueryBlackListHandler dictQueryBlackListHandler;
|
||||
@Autowired
|
||||
private RabbitMQUtil rabbitMQUtil;
|
||||
@Lazy
|
||||
@Autowired
|
||||
private IAsyncMainService asyncMainService;
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ public class DictMQListener {
|
|||
asyncMain.setPrimaryKey(dto.getDictId());
|
||||
asyncMain.setStatus(dto.getStatus()+"");
|
||||
asyncMain.setOrgCode(dto.getOrgCode());
|
||||
asyncMain.setOrgName(dto.getOrgName());
|
||||
asyncMain.setDescr(dto.getMessage());
|
||||
asyncMainService.save(asyncMain);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue