From ef3ae576ba1474b48e4f8850e1bc465da88a6780 Mon Sep 17 00:00:00 2001 From: cuizhibin Date: Tue, 22 Apr 2025 17:34:24 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=96=B0=E5=A2=9Emq=E7=9B=B8=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/pom.xml | 5 +- data-bus/pom.xml | 32 +++++++++ .../dite/znpt/data/bus/config/MQConfig.java | 67 +++++++++++++++++++ .../dite/znpt/data/bus/config/MQHandle.java | 20 ++++++ .../data/bus/config/entity/CommonMessage.java | 9 +++ .../bus/config/entity/req/ProjectDTO.java | 4 ++ .../data/bus/config/enums/MQTypeEnum.java | 28 ++++++++ .../bus/config/handles/ProjectHandleImpl.java | 31 +++++++++ pom.xml | 5 +- sip/pom.xml | 1 - 10 files changed, 196 insertions(+), 6 deletions(-) create mode 100644 data-bus/pom.xml create mode 100644 data-bus/src/main/java/com/dite/znpt/data/bus/config/MQConfig.java create mode 100644 data-bus/src/main/java/com/dite/znpt/data/bus/config/MQHandle.java create mode 100644 data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/CommonMessage.java create mode 100644 data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/req/ProjectDTO.java create mode 100644 data-bus/src/main/java/com/dite/znpt/data/bus/config/enums/MQTypeEnum.java create mode 100644 data-bus/src/main/java/com/dite/znpt/data/bus/config/handles/ProjectHandleImpl.java diff --git a/core/pom.xml b/core/pom.xml index 0442ebb..93340d9 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -35,7 +35,6 @@ org.projectlombok lombok - true org.springframework.boot @@ -143,8 +142,8 @@ maven-compiler-plugin 3.1 - 8 - 8 + 17 + 17 diff --git a/data-bus/pom.xml b/data-bus/pom.xml new file mode 100644 index 0000000..d296946 --- /dev/null +++ b/data-bus/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + com.dite.znpt + parent + 1.0.0-SNAPSHOT + + + org.dite.znpt + data-bus + + + 17 + 17 + UTF-8 + + + + + com.dite.znpt + core + 1.0.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-amqp + + + \ No newline at end of file diff --git a/data-bus/src/main/java/com/dite/znpt/data/bus/config/MQConfig.java b/data-bus/src/main/java/com/dite/znpt/data/bus/config/MQConfig.java new file mode 100644 index 0000000..a1abbe2 --- /dev/null +++ b/data-bus/src/main/java/com/dite/znpt/data/bus/config/MQConfig.java @@ -0,0 +1,67 @@ +package com.dite.znpt.data.bus.config; + +import cn.hutool.extra.spring.SpringUtil; +import com.dite.znpt.data.bus.config.enums.MQTypeEnum; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.List; + +@Slf4j +@Component +public class MQConfig { + @Autowired + private ConnectionFactory connectionFactory; + + @PostConstruct + public void registerListeners() { + + for (MQTypeEnum typeEnum : MQTypeEnum.values()) { + String queueName = typeEnum.getType() + ".queue"; + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.setQueueNames(queueName); + container.setMessageListener(message -> { + String body = new String(message.getBody()); + log.debug("队列: " + queueName + " 收到消息: " + body); + try { + SpringUtil.getBean(typeEnum.getImpl()).handle(body); + } catch (Exception e) { + log.error("mq队列处理出错", e); + } + }); + // 启动监听 + container.start(); + } + } + + @Bean + public TopicExchange topicExchange() { + return new TopicExchange("data.topic.exchange"); + } + + @Bean + public List declareQueues() { + return Arrays.stream(MQTypeEnum.values()) + .map(type -> new Queue(type + ".queue", true)) + .toList(); + } + + @Bean + public List declareBindings(TopicExchange topicExchange) { + return Arrays.stream(MQTypeEnum.values()) + .map(type -> BindingBuilder + .bind(new Queue(type + ".queue")) + .to(topicExchange) + .with(type + ".*")) + .toList(); + } +} diff --git a/data-bus/src/main/java/com/dite/znpt/data/bus/config/MQHandle.java b/data-bus/src/main/java/com/dite/znpt/data/bus/config/MQHandle.java new file mode 100644 index 0000000..21e7af3 --- /dev/null +++ b/data-bus/src/main/java/com/dite/znpt/data/bus/config/MQHandle.java @@ -0,0 +1,20 @@ +package com.dite.znpt.data.bus.config; + +import com.fasterxml.jackson.core.JsonProcessingException; + +/** + * @author cuizhibin + * @date 2025/04/22 17:03 + * @description mq处理接口 + */ +public interface MQHandle { + + /** + * 功能描述:mq处理类 + * + * @param msg 消息 + * @author cuizhibin + * @date 2025/04/22 17:03 + **/ + void handle(String msg) throws JsonProcessingException; +} diff --git a/data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/CommonMessage.java b/data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/CommonMessage.java new file mode 100644 index 0000000..91fb0f2 --- /dev/null +++ b/data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/CommonMessage.java @@ -0,0 +1,9 @@ +package com.dite.znpt.data.bus.config.entity; + +import lombok.Data; + +@Data +public class CommonMessage { + private String operation; + private T payload; +} \ No newline at end of file diff --git a/data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/req/ProjectDTO.java b/data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/req/ProjectDTO.java new file mode 100644 index 0000000..198efeb --- /dev/null +++ b/data-bus/src/main/java/com/dite/znpt/data/bus/config/entity/req/ProjectDTO.java @@ -0,0 +1,4 @@ +package com.dite.znpt.data.bus.config.entity.req; + +public class ProjectDTO { +} diff --git a/data-bus/src/main/java/com/dite/znpt/data/bus/config/enums/MQTypeEnum.java b/data-bus/src/main/java/com/dite/znpt/data/bus/config/enums/MQTypeEnum.java new file mode 100644 index 0000000..b83bdf2 --- /dev/null +++ b/data-bus/src/main/java/com/dite/znpt/data/bus/config/enums/MQTypeEnum.java @@ -0,0 +1,28 @@ +package com.dite.znpt.data.bus.config.enums; + +import com.dite.znpt.data.bus.config.MQHandle; +import com.dite.znpt.data.bus.config.handles.ProjectHandleImpl; +import lombok.Getter; + +/** + * @author cuizhibin + * @date 2025/04/22 17:05 + * @description mq类型枚举 + */ +@Getter +public enum MQTypeEnum { + PROJECT("project", "项目", ProjectHandleImpl.class), + CREW("crew", "机组", ProjectHandleImpl.class), + PARTS("parts", "部件", ProjectHandleImpl.class), + ; + + private final String type; + private final String desc; + private final Class impl; + + MQTypeEnum(String type, String desc, Class impl) { + this.type = type; + this.desc = desc; + this.impl = impl; + } +} diff --git a/data-bus/src/main/java/com/dite/znpt/data/bus/config/handles/ProjectHandleImpl.java b/data-bus/src/main/java/com/dite/znpt/data/bus/config/handles/ProjectHandleImpl.java new file mode 100644 index 0000000..9891788 --- /dev/null +++ b/data-bus/src/main/java/com/dite/znpt/data/bus/config/handles/ProjectHandleImpl.java @@ -0,0 +1,31 @@ +package com.dite.znpt.data.bus.config.handles; + +import cn.hutool.json.JSONUtil; +import com.dite.znpt.data.bus.config.MQHandle; +import com.dite.znpt.data.bus.config.entity.CommonMessage; +import com.dite.znpt.data.bus.config.entity.req.ProjectDTO; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.stereotype.Service; + +/** + * @author cuizhibin + * @date 2025/04/22 17:11 + * @description 项目处理 + */ +@Service +public class ProjectHandleImpl implements MQHandle { + /** + * 功能描述:mq处理类 + * + * @param msg 消息 + * @author cuizhibin + * @date 2025/04/22 17:03 + **/ + @Override + public void handle(String msg) throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + CommonMessage projectMsg = objectMapper.readValue(msg, + objectMapper.getTypeFactory().constructParametricType(CommonMessage.class, ProjectDTO.class)); + } +} diff --git a/pom.xml b/pom.xml index 2eb257b..e1ec815 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,7 @@ core sip web + data-bus pom @@ -43,8 +44,8 @@ maven-compiler-plugin 3.1 - 8 - 8 + 17 + 17 diff --git a/sip/pom.xml b/sip/pom.xml index 5072c7a..7e62790 100644 --- a/sip/pom.xml +++ b/sip/pom.xml @@ -27,7 +27,6 @@ org.projectlombok lombok - true