diff --git a/nursing-unit-common/pom.xml b/nursing-unit-common/pom.xml new file mode 100644 index 0000000..995b5fc --- /dev/null +++ b/nursing-unit-common/pom.xml @@ -0,0 +1,22 @@ + + + + com.nursingunit.boot + nursing-unit-parent + 2.0.0 + + 通用工具模块 + 4.0.0 + nursing-unit-common + + + + com.nursingunit.boot + nursing-unit-base-core + 2.0.0 + + + + diff --git a/nursing-unit-common/src/main/java/com/nu/config/RabbitMQConfig.java b/nursing-unit-common/src/main/java/com/nu/config/RabbitMQConfig.java new file mode 100644 index 0000000..2d30697 --- /dev/null +++ b/nursing-unit-common/src/main/java/com/nu/config/RabbitMQConfig.java @@ -0,0 +1,68 @@ +package com.nu.config; + +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.support.converter.Jackson2JavaTypeMapper; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitMQConfig { + + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + return new RabbitAdmin(connectionFactory); + } + + /** + * JSON消息转换器 + */ + @Bean + public Jackson2JsonMessageConverter jsonMessageConverter() { + Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); + converter.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); + return converter; + } + + // 交换器(以Topic为例) + @Bean + public DirectExchange fwzlExchange() { + return new DirectExchange("hldy.fwzl"); + } + + // 队列 + @Bean + public Queue nu001FwzlAsyncQueue() { + return new Queue("nu001.fwzl.async", true); + } + @Bean + public Queue nu001FwzlStatusQueue() { + return new Queue("nu001.fwzl.status", true); + } + @Bean + public Queue nu002FwzlAsyncQueue() { + return new Queue("nu002.fwzl.async", true); + } + @Bean + public Queue nu002FwzlStatusQueue() { + return new Queue("nu002.fwzl.status", true); + } + @Bean + public Binding binding1(Queue nu001FwzlAsyncQueue, DirectExchange fwzlExchange) { + return BindingBuilder.bind(nu001FwzlAsyncQueue).to(fwzlExchange).with("nu001.fwzl.async"); + } + @Bean + public Binding binding2(Queue nu001FwzlStatusQueue, DirectExchange fwzlExchange) { + return BindingBuilder.bind(nu001FwzlStatusQueue).to(fwzlExchange).with("nu001.fwzl.status"); + } + @Bean + public Binding binding3(Queue nu002FwzlAsyncQueue, DirectExchange fwzlExchange) { + return BindingBuilder.bind(nu002FwzlAsyncQueue).to(fwzlExchange).with("nu002.fwzl.async"); + } + @Bean + public Binding binding4(Queue nu002FwzlStatusQueue, DirectExchange fwzlExchange) { + return BindingBuilder.bind(nu002FwzlStatusQueue).to(fwzlExchange).with("nu002.fwzl.status"); + } +} diff --git a/nursing-unit-common/src/main/java/com/nu/dto/DirectiveMQDto.java b/nursing-unit-common/src/main/java/com/nu/dto/DirectiveMQDto.java new file mode 100644 index 0000000..42376ba --- /dev/null +++ b/nursing-unit-common/src/main/java/com/nu/dto/DirectiveMQDto.java @@ -0,0 +1,13 @@ +package com.nu.dto; + +import lombok.Data; + +import java.util.List; + +@Data +public class DirectiveMQDto { + private boolean izInc;//是否为增量 否则为全量 + private String orgCode;//机构编码 + private String idStr; + private List idList; +} diff --git a/nursing-unit-common/src/main/java/com/nu/dto/StatusMQDto.java b/nursing-unit-common/src/main/java/com/nu/dto/StatusMQDto.java new file mode 100644 index 0000000..c78a7c1 --- /dev/null +++ b/nursing-unit-common/src/main/java/com/nu/dto/StatusMQDto.java @@ -0,0 +1,9 @@ +package com.nu.dto; + +import lombok.Data; + +@Data +public class StatusMQDto { + int status; + String message; +} diff --git a/nursing-unit-common/src/main/java/com/nu/enums/MQStatus.java b/nursing-unit-common/src/main/java/com/nu/enums/MQStatus.java new file mode 100644 index 0000000..f322784 --- /dev/null +++ b/nursing-unit-common/src/main/java/com/nu/enums/MQStatus.java @@ -0,0 +1,53 @@ +package com.nu.enums; + +import lombok.Getter; + +/** + * MQ消息处理状态枚举 + */ +@Getter +public enum MQStatus { + + SUCCESS(200, "消息处理成功"), + INVALID_PARAM(400, "请求参数不合法"), + PROCESS_FAILED(500, "消息处理失败"), + MAX_RETRY_EXCEEDED(1001, "已达到最大重试次数"), + MESSAGE_FORMAT_ERROR(1002, "消息格式错误"), + TIMEOUT(1003, "处理超时"), + DUPLICATE_MESSAGE(1004, "消息重复消费"), + UNKNOWN_ERROR(9999, "未知错误"); + + private final int code; + private final String message; + + MQStatus(int code, String message) { + this.code = code; + this.message = message; + } + + /** + * 根据状态码查找枚举 + */ + public static MQStatus findByCode(int code) { + for (MQStatus status : values()) { + if (status.code == code) { + return status; + } + } + return UNKNOWN_ERROR; // 默认返回未知错误 + } + + /** + * 判断是否成功状态 + */ + public boolean isSuccess() { + return this == SUCCESS; + } + + /** + * 判断是否可重试错误 + */ + public boolean isRetryable() { + return this == PROCESS_FAILED || this == TIMEOUT; + } +} diff --git a/nursing-unit-common/src/main/java/com/nu/utils/RabbitMQUtil.java b/nursing-unit-common/src/main/java/com/nu/utils/RabbitMQUtil.java new file mode 100644 index 0000000..f33497f --- /dev/null +++ b/nursing-unit-common/src/main/java/com/nu/utils/RabbitMQUtil.java @@ -0,0 +1,157 @@ +package com.nu.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class RabbitMQUtil { + private static final Logger logger = LoggerFactory.getLogger(RabbitMQUtil.class); + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private ConnectionFactory connectionFactory; + + @Autowired + private RabbitAdmin rabbitAdmin; + + @Autowired + private Jackson2JsonMessageConverter jsonMessageConverter; + + // 初始化配置 + @PostConstruct + public void init() { + rabbitTemplate.setMessageConverter(jsonMessageConverter); + } + + // 基础消息操作 + + /** + * 发送消息到指定队列(使用默认直接交换机) + * + * @param queueName 目标队列名称 + * @param message 消息内容(自动JSON序列化) + */ + public void sendToQueue(String queueName, Object message) { + rabbitTemplate.convertAndSend("", queueName, message); + } + + /** + * 发送消息到指定交换机 + * + * @param exchange 交换机名称 + * @param routingKey 路由键 + * @param message 消息内容 + */ + public void sendToExchange(String exchange, String routingKey, Object message) { + rabbitTemplate.convertAndSend(exchange, routingKey, message); + } + + /** + * 从队列接收消息(自动确认) + * + * @return 返回消息对象,队列为空时返回null + */ + public Object receiveFromQueue(String queueName) { + return rabbitTemplate.receiveAndConvert(queueName); + } + + // 队列与交换机管理 + + /** + * 创建持久化队列 + * + * @param queueName 队列名称 + * @return 队列对象 + */ + public Queue createQueue(String queueName) { + try { + Queue queue = new Queue(queueName, true, false, false); + rabbitAdmin.declareQueue(queue); + return queue; + } catch (Exception e) { + logger.error("队列[{}]创建失败", queueName, e); + throw e; + } + } + + /** + * 创建带死信队列的队列 + * + * @param queueName 主队列名称 + * @param dlxExchange 死信交换机名称 + * @param dlxRoutingKey 死信路由键 + */ + public Queue createQueueWithDLX(String queueName, String dlxExchange, String dlxRoutingKey) { + try { + Map args = new HashMap<>(); + args.put("x-dead-letter-exchange", dlxExchange); + args.put("x-dead-letter-routing-key", dlxRoutingKey); + Queue queue = new Queue(queueName, true, false, false, args); + rabbitAdmin.declareQueue(queue); + return queue; + } catch (Exception e) { + logger.error("带死信的队列[{}]创建失败", queueName, e); + throw e; + } + } + + /** + * 绑定队列到主题交换机 + * + * @param queue 队列名称 + * @param exchange 交换机名称 + * @param routingKey 路由规则 + */ + public void bindToTopicExchange(String queue, String exchange, String routingKey) { + try { + Binding binding = BindingBuilder.bind(new Queue(queue)) + .to(new TopicExchange(exchange)).with(routingKey); + rabbitAdmin.declareBinding(binding); + } catch (Exception e) { + logger.error("队列[{}]绑定到交换机[{}]失败", queue, exchange, e); + throw e; + } + } + + // 监听器管理 + + /** + * 创建消息监听容器 + * + * @param queueName 监听的队列 + * @param listener 消息处理器 + * @param concurrency 并发消费者数量 + */ + public SimpleMessageListenerContainer createListener( + String queueName, + ChannelAwareMessageListener listener, + int concurrency) { + try { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.addQueues(new Queue(queueName)); + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + container.setMessageListener(listener); + container.setConcurrentConsumers(concurrency); + container.start(); + return container; + } catch (Exception e) { + logger.error("监听器[{}]创建失败", queueName, e); + throw e; + } + } +} diff --git a/nursing-unit-services/nu-services-biz/pom.xml b/nursing-unit-services/nu-services-biz/pom.xml index eb55e93..5ac98e6 100644 --- a/nursing-unit-services/nu-services-biz/pom.xml +++ b/nursing-unit-services/nu-services-biz/pom.xml @@ -37,6 +37,13 @@ pinyin4j 2.5.0 + + + + com.nursingunit.boot + nursing-unit-common + ${nursingunit.version} + diff --git a/nursing-unit-services/nu-services-biz/src/main/java/com/nu/modules/directivetag/controller/DirectiveTagController.java b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/modules/directivetag/controller/DirectiveTagController.java index a2ffdf6..93950ed 100644 --- a/nursing-unit-services/nu-services-biz/src/main/java/com/nu/modules/directivetag/controller/DirectiveTagController.java +++ b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/modules/directivetag/controller/DirectiveTagController.java @@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.nu.modules.directivetag.entity.DirectiveTag; import com.nu.modules.directivetag.service.IDirectiveTagService; +import com.rabbitmq.client.impl.AMQImpl; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; @@ -13,10 +14,14 @@ import org.jeecg.common.aspect.annotation.AutoLog; import org.jeecg.common.system.base.controller.JeecgController; import org.jeecg.common.system.query.QueryGenerator; import org.jeecg.common.system.query.QueryRuleEnum; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.ModelAndView; +import javax.servlet.DispatcherType; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Arrays; diff --git a/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/exceptionhandler/DirectiveMQExceptionHandler.java b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/exceptionhandler/DirectiveMQExceptionHandler.java new file mode 100644 index 0000000..549a827 --- /dev/null +++ b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/exceptionhandler/DirectiveMQExceptionHandler.java @@ -0,0 +1,35 @@ +package com.nu.mq.directive.exceptionhandler; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; +import org.springframework.stereotype.Component; + +@Slf4j +@Component("directiveMQErrorHandler") +public class DirectiveMQExceptionHandler implements RabbitListenerErrorHandler { + + @Override + public Object handleError(Message message, org.springframework.messaging.Message message1, ListenerExecutionFailedException e) { + log.error("MQ消息处理失败 | 消息体: {} | 异常原因: {}", new String(message.getBody()), e.getCause().getMessage()); + + // 根据异常类型选择处理策略 + if (isRetryable(e)) { + // 可重试异常:抛出异常触发重试 + throw e; + } else { + // 不可恢复异常:拒绝消息且不重新入队 + throw new AmqpRejectAndDontRequeueException("消息处理失败且禁止重试", e); + } + } + + /** + * 判断异常是否可重试 + */ + private boolean isRetryable(ListenerExecutionFailedException e) { + // 示例:网络异常、数据库临时锁超时可重试 + return e.getCause() instanceof RuntimeException; // 根据实际业务调整 + } +} diff --git a/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/listener/DirectiveMQListener.java b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/listener/DirectiveMQListener.java new file mode 100644 index 0000000..777e4ac --- /dev/null +++ b/nursing-unit-services/nu-services-biz/src/main/java/com/nu/mq/directive/listener/DirectiveMQListener.java @@ -0,0 +1,33 @@ +package com.nu.mq.directive.listener; + +import com.nu.dto.DirectiveMQDto; +import com.nu.dto.StatusMQDto; +import com.nu.enums.MQStatus; +import com.nu.utils.RabbitMQUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +public class DirectiveMQListener { + + @Autowired + private RabbitMQUtil rabbitMQUtil; + + @RabbitListener(queues = "nu001.fwzl.async", errorHandler = "directiveMQErrorHandler") + public void handleMessage(DirectiveMQDto msg) { + try { + + StatusMQDto statusMQDto = new StatusMQDto(); + statusMQDto.setStatus(MQStatus.SUCCESS.getCode()); + statusMQDto.setMessage("成了!"); + rabbitMQUtil.sendToExchange("hldy.fwzl", "nu001.fwzl.status", statusMQDto); + } catch (Exception e) { + System.out.println("异常了:" + e.getMessage()); + } + } +} diff --git a/nursing-unit-system/nu-system-start/pom.xml b/nursing-unit-system/nu-system-start/pom.xml index 7cdec20..00dd953 100644 --- a/nursing-unit-system/nu-system-start/pom.xml +++ b/nursing-unit-system/nu-system-start/pom.xml @@ -12,6 +12,12 @@ nu-system-start + + + com.nursingunit.boot + nursing-unit-common + ${nursingunit.version} + com.nursingunit.boot diff --git a/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml b/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml index 0085e5a..f67a39c 100644 --- a/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml +++ b/nursing-unit-system/nu-system-start/src/main/resources/application-dev.yml @@ -166,16 +166,15 @@ spring: slow-sql-millis: 5000 datasource: master: - url: jdbc:mysql://1.92.152.160:33061/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai -# url: jdbc:mysql://localhost:3306/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai - username: root - password: root + url: jdbc:mysql://192.168.2.199:3306/nursing_unit_001?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai + username: nu001 + password: nu001 driver-class-name: com.mysql.cj.jdbc.Driver # 多数据源配置-运维系统 multi-datasource1: - url: jdbc:mysql://1.92.152.160:33061/nursing_unit?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai - username: root - password: root + url: jdbc:mysql://192.168.2.199:3306/nursing_unit?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai + username: nu_sys + password: nu_sys driver-class-name: com.mysql.cj.jdbc.Driver #redis 配置 redis: @@ -183,6 +182,14 @@ spring: host: 127.0.0.1 port: 6379 password: + #rabbitmq 配置 + rabbitmq: + host: 192.168.2.199 + prot: 5672 + username: hldy + password: hldy + virtual-host: /hldy + #mybatis plus 设置 mybatis-plus: mapper-locations: classpath*:org/jeecg/**/xml/*Mapper.xml,classpath*:com/nu/**/xml/*Mapper.xml diff --git a/pom.xml b/pom.xml index 7efb21c..bfafb89 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,8 @@ + + nursing-unit-common nursing-unit-base-core @@ -161,6 +163,12 @@ + + + + org.springframework.boot + spring-boot-starter-amqp + @@ -187,6 +195,13 @@ ${seata.version} + + + com.nursingunit.boot + nursing-unit-common + ${nursingunit.version} + + com.nursingunit.boot