diff --git a/yudao-admin-server/pom.xml b/yudao-admin-server/pom.xml
index 15fc83132..107b67e6b 100644
--- a/yudao-admin-server/pom.xml
+++ b/yudao-admin-server/pom.xml
@@ -65,6 +65,12 @@
yudao-spring-boot-starter-job
+
+
+ cn.iocoder.boot
+ yudao-spring-boot-starter-mq
+
+
cn.iocoder.boot
@@ -104,6 +110,10 @@
cn.smallbun.screw
screw-core
+
+ cn.iocoder.boot
+ yudao-spring-boot-starter-mq
+
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java
index 4ab2ba4c2..35d3f791b 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java
@@ -1,7 +1,7 @@
package cn.iocoder.yudao.adminserver.modules.infra.mq.consumer.config;
import cn.iocoder.yudao.framework.apollo.internals.DBConfigRepository;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.infra.mq.message.config.InfConfigRefreshMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java
index a4f6e0116..433048143 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.infra.mq.message.config;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java
index 44d52e646..39a8e76b3 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java
@@ -1,7 +1,7 @@
package cn.iocoder.yudao.adminserver.modules.infra.mq.producer.config;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
import cn.iocoder.yudao.adminserver.modules.infra.mq.message.config.InfConfigRefreshMessage;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java
index 6673826fd..81bed5083 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.dept;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.dept.SysDeptRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.dept.SysDeptService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java
index d7ff74d25..08f4b813e 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.dict;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.dict.SysDictDataRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.dict.SysDictDataService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java
index 38629873c..ff3b017a9 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.mail;
-import cn.iocoder.yudao.framework.redis.core.stream.AbstractStreamMessageListener;
+import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.mail.SysMailSendMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java
index 3d2541676..9048e3ccb 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.permission;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysMenuRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.permission.SysMenuService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java
index fc8270638..5cdaeef00 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.permission;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleMenuRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.permission.SysPermissionService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java
index 8bb2deec3..a2f4cc528 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.permission;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.permission.SysRoleService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java
index 21c187414..540ff17e7 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsChannelRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.sms.SysSmsChannelService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java
index 68fa3159f..947b23940 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms;
-import cn.iocoder.yudao.framework.redis.core.stream.AbstractStreamMessageListener;
+import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsSendMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.sms.SysSmsService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java
index b083243bb..1f4d92ffc 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsTemplateRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.service.sms.SysSmsTemplateService;
import lombok.extern.slf4j.Slf4j;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java
index 2aa5332a6..724547d68 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.dept;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java
index b4520fbe7..7b735deb9 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.dict;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java
index cfea76b27..bb9f62170 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.mail;
-import cn.iocoder.yudao.framework.redis.core.stream.StreamMessage;
+import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage;
import lombok.Data;
import javax.validation.constraints.NotNull;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java
index 73d611f4d..1fa2a3879 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java
index 03e321c6c..8b9f50c91 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java
index 9ae1d94ce..8d8d1e01a 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java
index 7379188f8..a37295615 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java
index f073640da..34ebf9101 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java
@@ -1,7 +1,7 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms;
import cn.iocoder.yudao.framework.common.core.KeyValue;
-import cn.iocoder.yudao.framework.redis.core.stream.StreamMessage;
+import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage;
import lombok.Data;
import javax.validation.constraints.NotNull;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java
index 9472f8bf7..c8bb00af2 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import lombok.Data;
/**
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java
index b4c974a26..948796e22 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.producer.dept;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.dept.SysDeptRefreshMessage;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java
index 5eb29ebff..ea0183722 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.producer.dict;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.dict.SysDictDataRefreshMessage;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java
index 4e1e6a659..6b3493469 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysMenuRefreshMessage;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java
index 310fefd87..d9a1bfcc2 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleMenuRefreshMessage;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java
index 09579e7e3..6888f27bc 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java
@@ -1,6 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleRefreshMessage;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java
index 53aa97321..81236287c 100644
--- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java
+++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java
@@ -1,7 +1,7 @@
package cn.iocoder.yudao.adminserver.modules.system.mq.producer.sms;
import cn.iocoder.yudao.framework.common.core.KeyValue;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsChannelRefreshMessage;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsSendMessage;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsTemplateRefreshMessage;
diff --git a/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java b/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java
index eea5c00a5..727b25750 100644
--- a/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java
+++ b/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java
@@ -6,7 +6,7 @@ import cn.iocoder.yudao.adminserver.modules.system.mq.consumer.mail.SysMailSendC
import cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms.SysSmsSendConsumer;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.mail.SysMailSendMessage;
import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsSendMessage;
-import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils;
+import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.Import;
diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml
index 7522fff9a..7fab31091 100644
--- a/yudao-dependencies/pom.xml
+++ b/yudao-dependencies/pom.xml
@@ -193,6 +193,13 @@
${revision}
+
+
+ cn.iocoder.boot
+ yudao-spring-boot-starter-mq
+ ${revision}
+
+
cn.iocoder.boot
diff --git a/yudao-framework/pom.xml b/yudao-framework/pom.xml
index 9363279f6..635dc7cf6 100644
--- a/yudao-framework/pom.xml
+++ b/yudao-framework/pom.xml
@@ -15,12 +15,16 @@
yudao-spring-boot-starter-redis
yudao-spring-boot-starter-web
yudao-spring-boot-starter-security
+
yudao-spring-boot-starter-monitor
yudao-spring-boot-starter-protection
yudao-spring-boot-starter-config
yudao-spring-boot-starter-job
+ yudao-spring-boot-starter-mq
+
yudao-spring-boot-starter-excel
yudao-spring-boot-starter-test
+
yudao-spring-boot-starter-biz-operatelog
yudao-spring-boot-starter-biz-dict
yudao-spring-boot-starter-biz-sms
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
new file mode 100644
index 000000000..caef6f420
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
@@ -0,0 +1,26 @@
+
+
+
+ cn.iocoder.boot
+ yudao-framework
+ ${revision}
+
+ 4.0.0
+ yudao-spring-boot-starter-mq
+ jar
+
+ ${artifactId}
+ 消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费
+ https://github.com/YunaiV/ruoyi-vue-pro
+
+
+
+
+ cn.iocoder.boot
+ yudao-spring-boot-starter-redis
+
+
+
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
new file mode 100644
index 000000000..e105dd8f3
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
@@ -0,0 +1,105 @@
+package cn.iocoder.yudao.framework.mq.config;
+
+import cn.hutool.system.SystemUtil;
+import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.stream.Consumer;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.ReadOffset;
+import org.springframework.data.redis.connection.stream.StreamOffset;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.stream.StreamMessageListenerContainer;
+
+import java.util.List;
+
+/**
+ * 消息队列配置类
+ *
+ * @author 芋道源码
+ */
+@Configuration
+@AutoConfigureAfter(YudaoMQAutoConfiguration.class)
+@Slf4j
+public class YudaoMQAutoConfiguration {
+
+ /**
+ * 创建 Redis Pub/Sub 广播消费的容器
+ */
+ @Bean
+ public RedisMessageListenerContainer redisMessageListenerContainer(
+ RedisConnectionFactory factory, List> listeners) {
+ // 创建 RedisMessageListenerContainer 对象
+ RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+ // 设置 RedisConnection 工厂。
+ container.setConnectionFactory(factory);
+ // 添加监听器
+ listeners.forEach(listener -> {
+ container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
+ log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
+ listener.getChannel(), listener.getClass().getName());
+ });
+ return container;
+ }
+
+ /**
+ * 创建 Redis Stream 集群消费的容器
+ *
+ * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
+ */
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public StreamMessageListenerContainer> redisStreamMessageListenerContainer(
+ RedisTemplate redisTemplate, List> listeners) {
+ // 第一步,创建 StreamMessageListenerContainer 容器
+ // 创建 options 配置
+ StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions =
+ StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
+ .batchSize(10) // 一次性最多拉取多少条消息
+ .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
+ .build();
+ // 创建 container 对象
+ StreamMessageListenerContainer> container = StreamMessageListenerContainer.create(
+ redisTemplate.getRequiredConnectionFactory(), containerOptions);
+
+ // 第二步,注册监听器,消费对应的 Stream 主题
+ String consumerName = buildConsumerName();
+// String consumerName = "110";
+ listeners.forEach(listener -> {
+ // 创建 listener 对应的消费者分组
+ try {
+ redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
+ } catch (Exception ignore) {}
+ // 设置 listener 对应的 redisTemplate
+ listener.setRedisTemplate(redisTemplate);
+ // 创建 Consumer 对象
+ Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
+ // 设置 Consumer 消费进度,以最小消费进度为准
+ StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
+ // 设置 Consumer 监听
+ StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest
+ .builder(streamOffset).consumer(consumer)
+ .autoAcknowledge(false) // 不自动 ack
+ .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
+ container.register(builder.build(), listener);
+ });
+ return container;
+ }
+
+ /**
+ * 构建消费者名字,使用本地 IP + 进程编号的方式。
+ * 参考自 RocketMQ clientId 的实现
+ *
+ * @return 消费者名字
+ */
+ private static String buildConsumerName() {
+ return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java
similarity index 97%
rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/AbstractChannelMessageListener.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java
index 8d3bf6fc1..9905a08ed 100644
--- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/AbstractChannelMessageListener.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java
@@ -1,4 +1,4 @@
-package cn.iocoder.yudao.framework.redis.core.pubsub;
+package cn.iocoder.yudao.framework.mq.core.pubsub;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/ChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java
similarity index 84%
rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/ChannelMessage.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java
index 60e5f494f..ff55f8b01 100644
--- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/ChannelMessage.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java
@@ -1,4 +1,4 @@
-package cn.iocoder.yudao.framework.redis.core.pubsub;
+package cn.iocoder.yudao.framework.mq.core.pubsub;
import com.fasterxml.jackson.annotation.JsonIgnore;
diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/AbstractStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java
similarity index 97%
rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/AbstractStreamMessageListener.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java
index 85d44f4b1..612b5a029 100644
--- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/AbstractStreamMessageListener.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java
@@ -1,4 +1,4 @@
-package cn.iocoder.yudao.framework.redis.core.stream;
+package cn.iocoder.yudao.framework.mq.core.stream;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/StreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java
similarity index 84%
rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/StreamMessage.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java
index ecb9c7409..30b38c62d 100644
--- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/StreamMessage.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java
@@ -1,4 +1,4 @@
-package cn.iocoder.yudao.framework.redis.core.stream;
+package cn.iocoder.yudao.framework.mq.core.stream;
import com.fasterxml.jackson.annotation.JsonIgnore;
diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/util/RedisMessageUtils.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java
similarity index 87%
rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/util/RedisMessageUtils.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java
index 0d8358699..57c925fa7 100644
--- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/util/RedisMessageUtils.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java
@@ -1,7 +1,7 @@
-package cn.iocoder.yudao.framework.redis.core.util;
+package cn.iocoder.yudao.framework.mq.core.util;
-import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
-import cn.iocoder.yudao.framework.redis.core.stream.StreamMessage;
+import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
+import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
new file mode 100644
index 000000000..48eaf2386
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * 消息队列,基于 Redis 提供:
+ * 1. 基于 Pub/Sub 实现广播消费
+ * 2. 基于 Stream 实现集群消费
+ */
+package cn.iocoder.yudao.framework.mq;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring.factories b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..d4ca5b91d
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+ cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration
diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java
index 4e9f0a002..8f4b5ad6c 100644
--- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java
+++ b/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java
@@ -1,23 +1,11 @@
package cn.iocoder.yudao.framework.redis.config;
-import cn.hutool.system.SystemUtil;
-import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
-import cn.iocoder.yudao.framework.redis.core.stream.AbstractStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
-import org.springframework.data.redis.connection.stream.Consumer;
-import org.springframework.data.redis.connection.stream.ObjectRecord;
-import org.springframework.data.redis.connection.stream.ReadOffset;
-import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.listener.ChannelTopic;
-import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
-import org.springframework.data.redis.stream.StreamMessageListenerContainer;
-
-import java.util.List;
/**
* Redis 配置类
@@ -44,76 +32,4 @@ public class YudaoRedisAutoConfiguration {
return template;
}
- /**
- * 创建 Redis Pub/Sub 广播消费的容器
- */
- @Bean
- public RedisMessageListenerContainer redisMessageListenerContainer(
- RedisConnectionFactory factory, List> listeners) {
- // 创建 RedisMessageListenerContainer 对象
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- // 设置 RedisConnection 工厂。
- container.setConnectionFactory(factory);
- // 添加监听器
- listeners.forEach(listener -> {
- container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
- log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
- listener.getChannel(), listener.getClass().getName());
- });
- return container;
- }
-
- /**
- * 创建 Redis Stream 集群消费的容器
- *
- * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
- */
- @Bean(initMethod = "start", destroyMethod = "stop")
- public StreamMessageListenerContainer> redisStreamMessageListenerContainer(
- RedisTemplate redisTemplate, List> listeners) {
- // 第一步,创建 StreamMessageListenerContainer 容器
- // 创建 options 配置
- StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions =
- StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
- .batchSize(10) // 一次性最多拉取多少条消息
- .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
- .build();
- // 创建 container 对象
- StreamMessageListenerContainer> container = StreamMessageListenerContainer.create(
- redisTemplate.getRequiredConnectionFactory(), containerOptions);
-
- // 第二步,注册监听器,消费对应的 Stream 主题
- String consumerName = buildConsumerName();
-// String consumerName = "110";
- listeners.forEach(listener -> {
- // 创建 listener 对应的消费者分组
- try {
- redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
- } catch (Exception ignore) {}
- // 设置 listener 对应的 redisTemplate
- listener.setRedisTemplate(redisTemplate);
- // 创建 Consumer 对象
- Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
- // 设置 Consumer 消费进度,以最小消费进度为准
- StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
- // 设置 Consumer 监听
- StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest
- .builder(streamOffset).consumer(consumer)
- .autoAcknowledge(false) // 不自动 ack
- .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
- container.register(builder.build(), listener);
- });
- return container;
- }
-
- /**
- * 构建消费者名字,使用本地 IP + 进程编号的方式。
- * 参考自 RocketMQ clientId 的实现
- *
- * @return 消费者名字
- */
- private static String buildConsumerName() {
- return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
- }
-
}