1.增加数据同步模块,封装rabbitmq消息解析

This commit is contained in:
cuizhibin 2025-04-23 13:27:03 +08:00
parent ef3ae576ba
commit 5ba3760b67
143 changed files with 165 additions and 247 deletions

5
.gitignore vendored
View File

@ -1,5 +1,8 @@
HELP.md
demo/target/
core/target/
data-bus/target/
sip/target/
web/target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

View File

@ -9,8 +9,8 @@
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>org.dite.znpt</groupId>
<artifactId>data-bus</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>

View File

@ -1,17 +1,19 @@
package com.dite.znpt.data.bus.config;
import cn.hutool.extra.spring.SpringUtil;
import com.dite.znpt.data.bus.config.enums.MQTypeEnum;
import com.dite.znpt.data.bus.entity.CommonMessage;
import com.dite.znpt.data.bus.enums.MQTypeEnum;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@ -20,9 +22,39 @@ public class MQConfig {
@Autowired
private ConnectionFactory connectionFactory;
@PostConstruct
public void registerListeners() {
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.setAutoStartup(true);
return admin;
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("data.topic.exchange");
}
@Bean
public Declarables declarables(TopicExchange exchange) {
List<Declarable> declarables = new ArrayList<>();
declarables.add(exchange);
for (MQTypeEnum typeEnum : MQTypeEnum.values()) {
String queueName = typeEnum.getType() + ".queue";
Queue queue = new Queue(queueName, true);
declarables.add(queue);
Binding binding = BindingBuilder.bind(queue)
.to(exchange)
.with(typeEnum.getType() + ".*");
declarables.add(binding);
}
return new Declarables(declarables);
}
@Bean
public List<SimpleMessageListenerContainer> registerListeners(Declarables declarables) {
List<SimpleMessageListenerContainer> messageListeners = new ArrayList<>();
for (MQTypeEnum typeEnum : MQTypeEnum.values()) {
String queueName = typeEnum.getType() + ".queue";
@ -33,35 +65,26 @@ public class MQConfig {
String body = new String(message.getBody());
log.debug("队列: " + queueName + " 收到消息: " + body);
try {
SpringUtil.getBean(typeEnum.getImpl()).handle(body);
ObjectMapper objectMapper = new ObjectMapper();
CommonMessage<?> msg = objectMapper.readValue(body,
objectMapper.getTypeFactory().constructParametricType(CommonMessage.class, typeEnum.getDto()));
MQHandle<?> handle = SpringUtil.getBean(typeEnum.getImpl());
handleMessageWithType(handle, msg);
} catch (Exception e) {
log.error("mq队列处理出错", e);
}
});
messageListeners.add(container);
// 启动监听
container.start();
}
log.info("mq监听启动完成");
return messageListeners;
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("data.topic.exchange");
@SuppressWarnings("unchecked")
private <T> void handleMessageWithType(MQHandle<T> handler, CommonMessage<?> message) {
handler.handle((CommonMessage<T>) message);
}
@Bean
public List<Queue> declareQueues() {
return Arrays.stream(MQTypeEnum.values())
.map(type -> new Queue(type + ".queue", true))
.toList();
}
@Bean
public List<Binding> declareBindings(TopicExchange topicExchange) {
return Arrays.stream(MQTypeEnum.values())
.map(type -> BindingBuilder
.bind(new Queue(type + ".queue"))
.to(topicExchange)
.with(type + ".*"))
.toList();
}
}

View File

@ -1,5 +1,6 @@
package com.dite.znpt.data.bus.config;
import com.dite.znpt.data.bus.entity.CommonMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
/**
@ -7,7 +8,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
* @date 2025/04/22 17:03
* @description mq处理接口
*/
public interface MQHandle {
public interface MQHandle<T> {
/**
* 功能描述mq处理类
@ -16,5 +17,5 @@ public interface MQHandle {
* @author cuizhibin
* @date 2025/04/22 17:03
**/
void handle(String msg) throws JsonProcessingException;
void handle(CommonMessage<T> msg);
}

View File

@ -1,9 +0,0 @@
package com.dite.znpt.data.bus.config.entity;
import lombok.Data;
@Data
public class CommonMessage<T> {
private String operation;
private T payload;
}

View File

@ -1,4 +0,0 @@
package com.dite.znpt.data.bus.config.entity.req;
public class ProjectDTO {
}

View File

@ -1,28 +0,0 @@
package com.dite.znpt.data.bus.config.enums;
import com.dite.znpt.data.bus.config.MQHandle;
import com.dite.znpt.data.bus.config.handles.ProjectHandleImpl;
import lombok.Getter;
/**
* @author cuizhibin
* @date 2025/04/22 17:05
* @description mq类型枚举
*/
@Getter
public enum MQTypeEnum {
PROJECT("project", "项目", ProjectHandleImpl.class),
CREW("crew", "机组", ProjectHandleImpl.class),
PARTS("parts", "部件", ProjectHandleImpl.class),
;
private final String type;
private final String desc;
private final Class<? extends MQHandle> impl;
MQTypeEnum(String type, String desc, Class<? extends MQHandle> impl) {
this.type = type;
this.desc = desc;
this.impl = impl;
}
}

View File

@ -1,31 +0,0 @@
package com.dite.znpt.data.bus.config.handles;
import cn.hutool.json.JSONUtil;
import com.dite.znpt.data.bus.config.MQHandle;
import com.dite.znpt.data.bus.config.entity.CommonMessage;
import com.dite.znpt.data.bus.config.entity.req.ProjectDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;
/**
* @author cuizhibin
* @date 2025/04/22 17:11
* @description 项目处理
*/
@Service
public class ProjectHandleImpl implements MQHandle {
/**
* 功能描述mq处理类
*
* @param msg 消息
* @author cuizhibin
* @date 2025/04/22 17:03
**/
@Override
public void handle(String msg) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
CommonMessage<ProjectDTO> projectMsg = objectMapper.readValue(msg,
objectMapper.getTypeFactory().constructParametricType(CommonMessage.class, ProjectDTO.class));
}
}

View File

@ -0,0 +1,10 @@
package com.dite.znpt.data.bus.entity;
import com.dite.znpt.data.bus.enums.MQOperationEnum;
import lombok.Data;
@Data
public class CommonMessage<T> {
private MQOperationEnum operation;
private T payload;
}

View File

@ -0,0 +1,4 @@
package com.dite.znpt.data.bus.entity.req;
public class ProjectDTO {
}

View File

@ -0,0 +1,33 @@
package com.dite.znpt.data.bus.enums;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import lombok.Getter;
@Getter
public enum MQOperationEnum {
ADD("add", "新增"),
UPDATE("update", "修改"),
DEL("del", "删除"),
;
private final String code;
private final String desc;
MQOperationEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
@JsonValue
public String getCode() {
return code;
}
@JsonCreator
public static MQOperationEnum fromCode(String code) {
for (MQOperationEnum s : values()) {
if (s.code.equals(code)) return s;
}
throw new IllegalArgumentException("Unknown code: " + code);
}
}

View File

@ -0,0 +1,31 @@
package com.dite.znpt.data.bus.enums;
import com.dite.znpt.data.bus.config.MQHandle;
import com.dite.znpt.data.bus.entity.req.ProjectDTO;
import com.dite.znpt.data.bus.handles.ProjectHandleImpl;
import lombok.Getter;
/**
* @author cuizhibin
* @date 2025/04/22 17:05
* @description mq类型枚举
*/
@Getter
public enum MQTypeEnum {
PROJECT("project", "项目", ProjectHandleImpl.class, ProjectDTO.class),
CREW("crew", "机组", ProjectHandleImpl.class, ProjectDTO.class),
PARTS("parts", "部件", ProjectHandleImpl.class, ProjectDTO.class),
;
private final String type;
private final String desc;
private final Class<? extends MQHandle<?>> impl;
private final Class<?> dto;
MQTypeEnum(String type, String desc, Class<? extends MQHandle<?>> impl, Class<?> dto) {
this.type = type;
this.desc = desc;
this.impl = impl;
this.dto = dto;
}
}

View File

@ -0,0 +1,26 @@
package com.dite.znpt.data.bus.handles;
import com.dite.znpt.data.bus.config.MQHandle;
import com.dite.znpt.data.bus.entity.CommonMessage;
import com.dite.znpt.data.bus.entity.req.ProjectDTO;
import org.springframework.stereotype.Service;
/**
* @author cuizhibin
* @date 2025/04/22 17:11
* @description 项目处理
*/
@Service
public class ProjectHandleImpl implements MQHandle<ProjectDTO> {
/**
* 功能描述mq处理类
*
* @param msg 消息
* @author cuizhibin
* @date 2025/04/22 17:03
**/
@Override
public void handle(CommonMessage<ProjectDTO> msg) {
}
}

Some files were not shown because too many files have changed in this diff Show More