Compare commits
2 Commits
b9375fc375
...
1e8702673e
Author | SHA1 | Date |
---|---|---|
|
1e8702673e | |
|
dc14de4fcc |
|
@ -0,0 +1,22 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
<artifactId>nursing-unit-parent</artifactId>
|
||||
<version>2.0.0</version>
|
||||
</parent>
|
||||
<description>通用工具模块</description>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>nursing-unit-common</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
<artifactId>nursing-unit-base-core</artifactId>
|
||||
<version>2.0.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,68 @@
|
|||
package com.nu.config;
|
||||
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.support.converter.Jackson2JavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class RabbitMQConfig {
|
||||
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
|
||||
return new RabbitAdmin(connectionFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* JSON消息转换器
|
||||
*/
|
||||
@Bean
|
||||
public Jackson2JsonMessageConverter jsonMessageConverter() {
|
||||
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
|
||||
converter.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
|
||||
return converter;
|
||||
}
|
||||
|
||||
// 交换器(以Topic为例)
|
||||
@Bean
|
||||
public DirectExchange fwzlExchange() {
|
||||
return new DirectExchange("hldy.fwzl");
|
||||
}
|
||||
|
||||
// 队列
|
||||
@Bean
|
||||
public Queue nu001FwzlAsyncQueue() {
|
||||
return new Queue("nu001.fwzl.async", true);
|
||||
}
|
||||
@Bean
|
||||
public Queue nu001FwzlStatusQueue() {
|
||||
return new Queue("nu001.fwzl.status", true);
|
||||
}
|
||||
@Bean
|
||||
public Queue nu002FwzlAsyncQueue() {
|
||||
return new Queue("nu002.fwzl.async", true);
|
||||
}
|
||||
@Bean
|
||||
public Queue nu002FwzlStatusQueue() {
|
||||
return new Queue("nu002.fwzl.status", true);
|
||||
}
|
||||
@Bean
|
||||
public Binding binding1(Queue nu001FwzlAsyncQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu001FwzlAsyncQueue).to(fwzlExchange).with("nu001.fwzl.async");
|
||||
}
|
||||
@Bean
|
||||
public Binding binding2(Queue nu001FwzlStatusQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu001FwzlStatusQueue).to(fwzlExchange).with("nu001.fwzl.status");
|
||||
}
|
||||
@Bean
|
||||
public Binding binding3(Queue nu002FwzlAsyncQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu002FwzlAsyncQueue).to(fwzlExchange).with("nu002.fwzl.async");
|
||||
}
|
||||
@Bean
|
||||
public Binding binding4(Queue nu002FwzlStatusQueue, DirectExchange fwzlExchange) {
|
||||
return BindingBuilder.bind(nu002FwzlStatusQueue).to(fwzlExchange).with("nu002.fwzl.status");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package com.nu.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class DirectiveMQDto {
|
||||
private boolean izInc;//是否为增量 否则为全量
|
||||
private String orgCode;//机构编码
|
||||
private String idStr;
|
||||
private List<String> idList;
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package com.nu.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class StatusMQDto {
|
||||
int status;
|
||||
String message;
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package com.nu.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* MQ消息处理状态枚举
|
||||
*/
|
||||
@Getter
|
||||
public enum MQStatus {
|
||||
|
||||
SUCCESS(200, "消息处理成功"),
|
||||
INVALID_PARAM(400, "请求参数不合法"),
|
||||
PROCESS_FAILED(500, "消息处理失败"),
|
||||
MAX_RETRY_EXCEEDED(1001, "已达到最大重试次数"),
|
||||
MESSAGE_FORMAT_ERROR(1002, "消息格式错误"),
|
||||
TIMEOUT(1003, "处理超时"),
|
||||
DUPLICATE_MESSAGE(1004, "消息重复消费"),
|
||||
UNKNOWN_ERROR(9999, "未知错误");
|
||||
|
||||
private final int code;
|
||||
private final String message;
|
||||
|
||||
MQStatus(int code, String message) {
|
||||
this.code = code;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据状态码查找枚举
|
||||
*/
|
||||
public static MQStatus findByCode(int code) {
|
||||
for (MQStatus status : values()) {
|
||||
if (status.code == code) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
return UNKNOWN_ERROR; // 默认返回未知错误
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否成功状态
|
||||
*/
|
||||
public boolean isSuccess() {
|
||||
return this == SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否可重试错误
|
||||
*/
|
||||
public boolean isRetryable() {
|
||||
return this == PROCESS_FAILED || this == TIMEOUT;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
package com.nu.utils;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
public class RabbitMQUtil {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RabbitMQUtil.class);
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired
|
||||
private ConnectionFactory connectionFactory;
|
||||
|
||||
@Autowired
|
||||
private RabbitAdmin rabbitAdmin;
|
||||
|
||||
@Autowired
|
||||
private Jackson2JsonMessageConverter jsonMessageConverter;
|
||||
|
||||
// 初始化配置
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
rabbitTemplate.setMessageConverter(jsonMessageConverter);
|
||||
}
|
||||
|
||||
// 基础消息操作
|
||||
|
||||
/**
|
||||
* 发送消息到指定队列(使用默认直接交换机)
|
||||
*
|
||||
* @param queueName 目标队列名称
|
||||
* @param message 消息内容(自动JSON序列化)
|
||||
*/
|
||||
public void sendToQueue(String queueName, Object message) {
|
||||
rabbitTemplate.convertAndSend("", queueName, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息到指定交换机
|
||||
*
|
||||
* @param exchange 交换机名称
|
||||
* @param routingKey 路由键
|
||||
* @param message 消息内容
|
||||
*/
|
||||
public void sendToExchange(String exchange, String routingKey, Object message) {
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从队列接收消息(自动确认)
|
||||
*
|
||||
* @return 返回消息对象,队列为空时返回null
|
||||
*/
|
||||
public Object receiveFromQueue(String queueName) {
|
||||
return rabbitTemplate.receiveAndConvert(queueName);
|
||||
}
|
||||
|
||||
// 队列与交换机管理
|
||||
|
||||
/**
|
||||
* 创建持久化队列
|
||||
*
|
||||
* @param queueName 队列名称
|
||||
* @return 队列对象
|
||||
*/
|
||||
public Queue createQueue(String queueName) {
|
||||
try {
|
||||
Queue queue = new Queue(queueName, true, false, false);
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
return queue;
|
||||
} catch (Exception e) {
|
||||
logger.error("队列[{}]创建失败", queueName, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建带死信队列的队列
|
||||
*
|
||||
* @param queueName 主队列名称
|
||||
* @param dlxExchange 死信交换机名称
|
||||
* @param dlxRoutingKey 死信路由键
|
||||
*/
|
||||
public Queue createQueueWithDLX(String queueName, String dlxExchange, String dlxRoutingKey) {
|
||||
try {
|
||||
Map<String, Object> args = new HashMap<>();
|
||||
args.put("x-dead-letter-exchange", dlxExchange);
|
||||
args.put("x-dead-letter-routing-key", dlxRoutingKey);
|
||||
Queue queue = new Queue(queueName, true, false, false, args);
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
return queue;
|
||||
} catch (Exception e) {
|
||||
logger.error("带死信的队列[{}]创建失败", queueName, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定队列到主题交换机
|
||||
*
|
||||
* @param queue 队列名称
|
||||
* @param exchange 交换机名称
|
||||
* @param routingKey 路由规则
|
||||
*/
|
||||
public void bindToTopicExchange(String queue, String exchange, String routingKey) {
|
||||
try {
|
||||
Binding binding = BindingBuilder.bind(new Queue(queue))
|
||||
.to(new TopicExchange(exchange)).with(routingKey);
|
||||
rabbitAdmin.declareBinding(binding);
|
||||
} catch (Exception e) {
|
||||
logger.error("队列[{}]绑定到交换机[{}]失败", queue, exchange, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
// 监听器管理
|
||||
|
||||
/**
|
||||
* 创建消息监听容器
|
||||
*
|
||||
* @param queueName 监听的队列
|
||||
* @param listener 消息处理器
|
||||
* @param concurrency 并发消费者数量
|
||||
*/
|
||||
public SimpleMessageListenerContainer createListener(
|
||||
String queueName,
|
||||
ChannelAwareMessageListener listener,
|
||||
int concurrency) {
|
||||
try {
|
||||
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
|
||||
container.addQueues(new Queue(queueName));
|
||||
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
||||
container.setMessageListener(listener);
|
||||
container.setConcurrentConsumers(concurrency);
|
||||
container.start();
|
||||
return container;
|
||||
} catch (Exception e) {
|
||||
logger.error("监听器[{}]创建失败", queueName, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -37,6 +37,13 @@
|
|||
<artifactId>pinyin4j</artifactId>
|
||||
<version>2.5.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- COMMON 通用工具模块 -->
|
||||
<dependency>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
<artifactId>nursing-unit-common</artifactId>
|
||||
<version>${nursingunit.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
|||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.nu.modules.directivetag.entity.DirectiveTag;
|
||||
import com.nu.modules.directivetag.service.IDirectiveTagService;
|
||||
import com.rabbitmq.client.impl.AMQImpl;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -13,10 +14,14 @@ import org.jeecg.common.aspect.annotation.AutoLog;
|
|||
import org.jeecg.common.system.base.controller.JeecgController;
|
||||
import org.jeecg.common.system.query.QueryGenerator;
|
||||
import org.jeecg.common.system.query.QueryRuleEnum;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.servlet.ModelAndView;
|
||||
|
||||
import javax.servlet.DispatcherType;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.util.Arrays;
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package com.nu.mq.directive.exceptionhandler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
|
||||
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component("directiveMQErrorHandler")
|
||||
public class DirectiveMQExceptionHandler implements RabbitListenerErrorHandler {
|
||||
|
||||
@Override
|
||||
public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) {
|
||||
log.error("MQ消息处理失败 | 消息体: {} | 异常原因: {}", new String(message.getBody()), e.getCause().getMessage());
|
||||
|
||||
// 根据异常类型选择处理策略
|
||||
if (isRetryable(e)) {
|
||||
// 可重试异常:抛出异常触发重试
|
||||
throw e;
|
||||
} else {
|
||||
// 不可恢复异常:拒绝消息且不重新入队
|
||||
throw new AmqpRejectAndDontRequeueException("消息处理失败且禁止重试", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断异常是否可重试
|
||||
*/
|
||||
private boolean isRetryable(ListenerExecutionFailedException e) {
|
||||
// 示例:网络异常、数据库临时锁超时可重试
|
||||
return e.getCause() instanceof RuntimeException; // 根据实际业务调整
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.nu.mq.directive.listener;
|
||||
|
||||
import com.nu.dto.DirectiveMQDto;
|
||||
import com.nu.dto.StatusMQDto;
|
||||
import com.nu.enums.MQStatus;
|
||||
import com.nu.utils.RabbitMQUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DirectiveMQListener {
|
||||
|
||||
@Autowired
|
||||
private RabbitMQUtil rabbitMQUtil;
|
||||
|
||||
@RabbitListener(queues = "nu001.fwzl.async", errorHandler = "directiveMQErrorHandler")
|
||||
public void handleMessage(DirectiveMQDto msg) {
|
||||
try {
|
||||
|
||||
StatusMQDto statusMQDto = new StatusMQDto();
|
||||
statusMQDto.setStatus(MQStatus.SUCCESS.getCode());
|
||||
statusMQDto.setMessage("成了!");
|
||||
rabbitMQUtil.sendToExchange("hldy.fwzl", "nu001.fwzl.status", statusMQDto);
|
||||
} catch (Exception e) {
|
||||
System.out.println("异常了:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -12,6 +12,12 @@
|
|||
<artifactId>nu-system-start</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<!-- COMMON 通用工具模块 -->
|
||||
<dependency>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
<artifactId>nursing-unit-common</artifactId>
|
||||
<version>${nursingunit.version}</version>
|
||||
</dependency>
|
||||
<!-- SYSTEM 系统管理模块 -->
|
||||
<dependency>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
|
|
|
@ -166,16 +166,15 @@ spring:
|
|||
slow-sql-millis: 5000
|
||||
datasource:
|
||||
master:
|
||||
url: jdbc:mysql://1.92.152.160:33061/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
# url: jdbc:mysql://localhost:3306/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
username: root
|
||||
password: root
|
||||
url: jdbc:mysql://192.168.2.199:3306/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
username: nu001
|
||||
password: nu001
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
# 多数据源配置-运维系统
|
||||
multi-datasource1:
|
||||
url: jdbc:mysql://1.92.152.160:33061/nursing_unit?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
username: root
|
||||
password: root
|
||||
url: jdbc:mysql://192.168.2.199:3306/nursing_unit?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
username: nu_sys
|
||||
password: nu_sys
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
#redis 配置
|
||||
redis:
|
||||
|
@ -183,6 +182,14 @@ spring:
|
|||
host: 127.0.0.1
|
||||
port: 6379
|
||||
password:
|
||||
#rabbitmq 配置
|
||||
rabbitmq:
|
||||
host: 192.168.2.199
|
||||
prot: 5672
|
||||
username: hldy
|
||||
password: hldy
|
||||
virtual-host: /hldy
|
||||
|
||||
#mybatis plus 设置
|
||||
mybatis-plus:
|
||||
mapper-locations: classpath*:org/jeecg/**/xml/*Mapper.xml,classpath*:com/nu/**/xml/*Mapper.xml
|
||||
|
|
15
pom.xml
15
pom.xml
|
@ -68,6 +68,8 @@
|
|||
</properties>
|
||||
|
||||
<modules>
|
||||
<!-- COMMON 通用工具模块 -->
|
||||
<module>nursing-unit-common</module>
|
||||
<!-- 框架基础包模块 -->
|
||||
<module>nursing-unit-base-core</module>
|
||||
<!-- 框架demo功能模块 -->
|
||||
|
@ -161,6 +163,12 @@
|
|||
<!-- <groupId>com.alibaba.cloud</groupId>-->
|
||||
<!-- <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- RabbitMQ -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<dependencyManagement>
|
||||
|
@ -187,6 +195,13 @@
|
|||
<version>${seata.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- COMMON 通用工具模块 -->
|
||||
<dependency>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
<artifactId>nursing-unit-common</artifactId>
|
||||
<version>${nursingunit.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- system 模块-->
|
||||
<dependency>
|
||||
<groupId>com.nursingunit.boot</groupId>
|
||||
|
|
Loading…
Reference in New Issue