优化 mq starter 组件,增加 RedisMQTemplate 模板类

This commit is contained in:
YunaiV
2021-12-05 23:00:41 +08:00
parent ade55d89a4
commit d29b0beb9b
26 changed files with 137 additions and 75 deletions

View File

@ -1,12 +1,12 @@
package cn.iocoder.yudao.framework.mq.config;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@ -15,6 +15,7 @@ import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
@ -31,6 +32,13 @@ import java.util.List;
@Slf4j
public class YudaoMQAutoConfiguration {
@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate stringRedisTemplate) {
return new RedisMQTemplate(stringRedisTemplate);
}
// ========== 消费者相关 ==========
/**
* 创建 Redis Pub/Sub 广播消费的容器
*/
@ -71,7 +79,6 @@ public class YudaoMQAutoConfiguration {
// 第二步,注册监听器,消费对应的 Stream 主题
String consumerName = buildConsumerName();
// String consumerName = "110";
listeners.forEach(listener -> {
// 创建 listener 对应的消费者分组
try {

View File

@ -1,37 +1,39 @@
package cn.iocoder.yudao.framework.mq.core.util;
package cn.iocoder.yudao.framework.mq.core;
import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage;
import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
import lombok.AllArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
/**
* Redis 消息工具
* Redis MQ 操作模板
*
* @author 芋道源码
*/
public class RedisMessageUtils {
@AllArgsConstructor
public class RedisMQTemplate {
private final RedisTemplate<String, ?> redisTemplate;
/**
* 发送 Redis 消息基于 Redis pub/sub 实现
*
* @param redisTemplate Redis 操作模板
* @param message 消息
*/
public static <T extends ChannelMessage> void sendChannelMessage(RedisTemplate<?, ?> redisTemplate, T message) {
public <T extends AbstractChannelMessage> void send(T message) {
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
}
/**
* 发送 Redis 消息基于 Redis Stream 实现
*
* @param redisTemplate Redis 操作模板
* @param message 消息
* @return 消息记录的编号对象
*/
public static <T extends StreamMessage> RecordId sendStreamMessage(RedisTemplate<String, ?> redisTemplate, T message) {
public <T extends AbstractStreamMessage> RecordId send(T message) {
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(message.getStreamKey())); // 设置 stream key

View File

@ -0,0 +1,26 @@
package cn.iocoder.yudao.framework.mq.core.message;
import java.util.HashMap;
import java.util.Map;
/**
* Redis 消息抽象基类
*
* @author 芋道源码
*/
public abstract class AbstractRedisMessage {
/**
* 头
*/
private final Map<String, String> headers = new HashMap<>();
public String getHeader(String key) {
return headers.get(key);
}
public void addHeader(String key, String value) {
headers.put(key, value);
}
}

View File

@ -1,13 +1,14 @@
package cn.iocoder.yudao.framework.mq.core.pubsub;
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Channel Message 接口
* Redis Channel Message 抽象类
*
* @author 芋道源码
*/
public interface ChannelMessage {
public abstract class AbstractChannelMessage extends AbstractRedisMessage {
/**
* 获得 Redis Channel
@ -15,6 +16,6 @@ public interface ChannelMessage {
* @return Channel
*/
@JsonIgnore // 避免序列化
String getChannel();
public abstract String getChannel();
}

View File

@ -15,7 +15,7 @@ import java.lang.reflect.Type;
*
* @author 芋道源码
*/
public abstract class AbstractChannelMessageListener<T extends ChannelMessage> implements MessageListener {
public abstract class AbstractChannelMessageListener<T extends AbstractChannelMessage> implements MessageListener {
/**
* 消息类型

View File

@ -3,11 +3,11 @@ package cn.iocoder.yudao.framework.mq.core.stream;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Stream Message 接口
* Redis Stream Message 抽象类
*
* @author 芋道源码
*/
public interface StreamMessage {
public abstract class AbstractStreamMessage {
/**
* 获得 Redis Stream Key
@ -15,6 +15,6 @@ public interface StreamMessage {
* @return Channel
*/
@JsonIgnore // 避免序列化
String getStreamKey();
public abstract String getStreamKey();
}

View File

@ -19,7 +19,7 @@ import java.lang.reflect.Type;
*
* @author 芋道源码
*/
public abstract class AbstractStreamMessageListener<T extends StreamMessage>
public abstract class AbstractStreamMessageListener<T extends AbstractStreamMessage>
implements StreamListener<String, ObjectRecord<String, String>> {
/**
@ -39,7 +39,7 @@ public abstract class AbstractStreamMessageListener<T extends StreamMessage>
@Getter
private String group;
/**
*
* RedisTemplate
*/
@Setter
private RedisTemplate<String, ?> redisTemplate;