mirror of
https://gitee.com/hhyykk/ipms-sjy.git
synced 2025-07-14 11:05:06 +08:00
项目结构调整 x 16 : 将 monitor、sms、dict 等组件拆分出去
This commit is contained in:
@ -0,0 +1,119 @@
|
||||
package cn.iocoder.yudao.framework.redis.config;
|
||||
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener;
|
||||
import cn.iocoder.yudao.framework.redis.core.stream.AbstractStreamMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.serializer.RedisSerializer;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis 配置类
|
||||
*/
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class RedisConfig {
|
||||
|
||||
/**
|
||||
* 创建 RedisTemplate Bean,使用 JSON 序列化方式
|
||||
*/
|
||||
@Bean
|
||||
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
|
||||
// 创建 RedisTemplate 对象
|
||||
RedisTemplate<String, Object> template = new RedisTemplate<>();
|
||||
// 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。
|
||||
template.setConnectionFactory(factory);
|
||||
// 使用 String 序列化方式,序列化 KEY 。
|
||||
template.setKeySerializer(RedisSerializer.string());
|
||||
template.setHashKeySerializer(RedisSerializer.string());
|
||||
// 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
|
||||
template.setValueSerializer(RedisSerializer.json());
|
||||
template.setHashValueSerializer(RedisSerializer.json());
|
||||
return template;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Pub/Sub 广播消费的容器
|
||||
*/
|
||||
@Bean
|
||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
||||
RedisConnectionFactory factory, List<AbstractChannelMessageListener<?>> listeners) {
|
||||
// 创建 RedisMessageListenerContainer 对象
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
// 设置 RedisConnection 工厂。
|
||||
container.setConnectionFactory(factory);
|
||||
// 添加监听器
|
||||
listeners.forEach(listener -> {
|
||||
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
|
||||
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
|
||||
listener.getChannel(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 集群消费的容器
|
||||
*
|
||||
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
||||
RedisTemplate<String, Object> redisTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
||||
// 创建 options 配置
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
||||
.batchSize(10) // 一次性最多拉取多少条消息
|
||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
||||
.build();
|
||||
// 创建 container 对象
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(
|
||||
redisTemplate.getRequiredConnectionFactory(), containerOptions);
|
||||
|
||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||
String consumerName = buildConsumerName();
|
||||
// String consumerName = "110";
|
||||
listeners.forEach(listener -> {
|
||||
// 创建 listener 对应的消费者分组
|
||||
try {
|
||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
||||
} catch (Exception ignore) {}
|
||||
// 设置 listener 对应的 redisTemplate
|
||||
listener.setRedisTemplate(redisTemplate);
|
||||
// 创建 Consumer 对象
|
||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
||||
// 设置 Consumer 监听
|
||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
||||
.builder(streamOffset).consumer(consumer)
|
||||
.autoAcknowledge(false) // 不自动 ack
|
||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
||||
container.register(builder.build(), listener);
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
||||
* 参考自 RocketMQ clientId 的实现
|
||||
*
|
||||
* @return 消费者名字
|
||||
*/
|
||||
private static String buildConsumerName() {
|
||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
package cn.iocoder.yudao.framework.redis.core;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* Redis Key 定义类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class RedisKeyDefine {
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum KeyTypeEnum {
|
||||
|
||||
STRING("String"),
|
||||
LIST("List"),
|
||||
HASH("Hash"),
|
||||
SET("Set"),
|
||||
ZSET("Sorted Set"),
|
||||
STREAM("Stream"),
|
||||
PUBSUB("Pub/Sub");
|
||||
|
||||
/**
|
||||
* 类型
|
||||
*/
|
||||
@JsonValue
|
||||
private final String type;
|
||||
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TimeoutTypeEnum {
|
||||
|
||||
FOREVER(1), // 永不超时
|
||||
DYNAMIC(2), // 动态超时
|
||||
FIXED(3); // 固定超时
|
||||
|
||||
/**
|
||||
* 类型
|
||||
*/
|
||||
@JsonValue
|
||||
private final Integer type;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Key 模板
|
||||
*/
|
||||
private final String keyTemplate;
|
||||
/**
|
||||
* Key 类型的枚举
|
||||
*/
|
||||
private final KeyTypeEnum keyType;
|
||||
/**
|
||||
* Value 类型
|
||||
*
|
||||
* 如果是使用分布式锁,设置为 {@link java.util.concurrent.locks.Lock} 类型
|
||||
*/
|
||||
private final Class<?> valueType;
|
||||
/**
|
||||
* 超时类型
|
||||
*/
|
||||
private final TimeoutTypeEnum timeoutType;
|
||||
/**
|
||||
* 过期时间
|
||||
*/
|
||||
private final Duration timeout;
|
||||
/**
|
||||
* 备注
|
||||
*/
|
||||
private final String memo;
|
||||
|
||||
private RedisKeyDefine(String memo, String keyTemplate, KeyTypeEnum keyType, Class<?> valueType,
|
||||
TimeoutTypeEnum timeoutType, Duration timeout) {
|
||||
this.memo = memo;
|
||||
this.keyTemplate = keyTemplate;
|
||||
this.keyType = keyType;
|
||||
this.valueType = valueType;
|
||||
this.timeout = timeout;
|
||||
this.timeoutType = timeoutType;
|
||||
// 添加注册表
|
||||
RedisKeyRegistry.add(this);
|
||||
}
|
||||
|
||||
public RedisKeyDefine(String memo, String keyTemplate, KeyTypeEnum keyType, Class<?> valueType, Duration timeout) {
|
||||
this(memo, keyTemplate, keyType, valueType, TimeoutTypeEnum.FIXED, timeout);
|
||||
}
|
||||
|
||||
public RedisKeyDefine(String memo, String keyTemplate, KeyTypeEnum keyType, Class<?> valueType, TimeoutTypeEnum timeoutType) {
|
||||
this(memo, keyTemplate, keyType, valueType, timeoutType, Duration.ZERO);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package cn.iocoder.yudao.framework.redis.core;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* {@link RedisKeyDefine} 注册表
|
||||
*/
|
||||
public class RedisKeyRegistry {
|
||||
|
||||
/**
|
||||
* Redis RedisKeyDefine 数组
|
||||
*/
|
||||
private static final List<RedisKeyDefine> defines = new ArrayList<>();
|
||||
|
||||
public static void add(RedisKeyDefine define) {
|
||||
defines.add(define);
|
||||
}
|
||||
|
||||
public static List<RedisKeyDefine> list() {
|
||||
return defines;
|
||||
}
|
||||
|
||||
public static int size() {
|
||||
return defines.size();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
package cn.iocoder.yudao.framework.redis.core.pubsub;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
/**
|
||||
* Redis Pub/Sub 监听器抽象类,用于实现广播消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public abstract class AbstractChannelMessageListener<T extends ChannelMessage> implements MessageListener {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
private final String channel;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractChannelMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.channel = messageType.newInstance().getChannel();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获得 Sub 订阅的 Redis Channel 通道
|
||||
*
|
||||
* @return channel
|
||||
*/
|
||||
public final String getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onMessage(Message message, byte[] bytes) {
|
||||
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
|
||||
this.onMessage(messageObj);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package cn.iocoder.yudao.framework.redis.core.pubsub;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Channel Message 接口
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface ChannelMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Channel
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
String getChannel();
|
||||
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
package cn.iocoder.yudao.framework.redis.core.stream;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
/**
|
||||
* Redis Stream 监听器抽象类,用于实现集群消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public abstract class AbstractStreamMessageListener<T extends StreamMessage>
|
||||
implements StreamListener<String, ObjectRecord<String, String>> {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
@Getter
|
||||
private final String streamKey;
|
||||
|
||||
/**
|
||||
* Redis 消费者分组,默认使用 spring.application.name 名字
|
||||
*/
|
||||
@Value("${spring.application.name}")
|
||||
@Getter
|
||||
private String group;
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Setter
|
||||
private RedisTemplate<String, ?> redisTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractStreamMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.streamKey = messageType.newInstance().getStreamKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, String> message) {
|
||||
// 消费消息
|
||||
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
|
||||
this.onMessage(messageObj);
|
||||
// ack 消息消费完成
|
||||
redisTemplate.opsForStream().acknowledge(group, message);
|
||||
// TODO 芋艿:需要额外考虑以下几个点:
|
||||
// 1. 处理异常的情况
|
||||
// 2. 发送日志;以及事务的结合
|
||||
// 3. 消费日志;以及通用的幂等性
|
||||
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package cn.iocoder.yudao.framework.redis.core.stream;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Stream Message 接口
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface StreamMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Stream Key
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
String getStreamKey();
|
||||
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package cn.iocoder.yudao.framework.redis.core.util;
|
||||
|
||||
import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage;
|
||||
import cn.iocoder.yudao.framework.redis.core.stream.StreamMessage;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
/**
|
||||
* Redis 消息工具类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class RedisMessageUtils {
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
||||
*
|
||||
* @param redisTemplate Redis 操作模板
|
||||
* @param message 消息
|
||||
*/
|
||||
public static <T extends ChannelMessage> void sendChannelMessage(RedisTemplate<?, ?> redisTemplate, T message) {
|
||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
||||
*
|
||||
* @param redisTemplate Redis 操作模板
|
||||
* @param message 消息
|
||||
* @return 消息记录的编号对象
|
||||
*/
|
||||
public static <T extends StreamMessage> RecordId sendStreamMessage(RedisTemplate<String, ?> redisTemplate, T message) {
|
||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,4 @@
|
||||
/**
|
||||
* 采用 Spring Data Redis 操作 Redis,底层使用 Redisson 作为客户端
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.redis;
|
Reference in New Issue
Block a user