短信渠道的本地缓存,使用 Job 轮询,替代 MQ 广播

This commit is contained in:
YunaiV
2023-07-29 08:59:07 +08:00
parent 27e70e73a3
commit dfa09c787a
7 changed files with 62 additions and 83 deletions

View File

@@ -6,6 +6,9 @@ import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannelPageReqVO;
import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.time.LocalDateTime;
@Mapper
public interface SmsChannelMapper extends BaseMapperX<SmsChannelDO> {
@@ -18,4 +21,7 @@ public interface SmsChannelMapper extends BaseMapperX<SmsChannelDO> {
.orderByDesc(SmsChannelDO::getId));
}
@Select("SELECT COUNT(*) FROM system_sms_channel WHERE update_time > #{maxUpdateTime}")
Long selectCountByUpdateTimeGt(LocalDateTime maxTime);
}

View File

@@ -1,29 +0,0 @@
package cn.iocoder.yudao.module.system.mq.consumer.sms;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
import cn.iocoder.yudao.module.system.service.sms.SmsChannelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 针对 {@link SmsChannelRefreshMessage} 的消费者
*
* @author 芋道源码
*/
@Component
@Slf4j
public class SmsChannelRefreshConsumer extends AbstractChannelMessageListener<SmsChannelRefreshMessage> {
@Resource
private SmsChannelService smsChannelService;
@Override
public void onMessage(SmsChannelRefreshMessage message) {
log.info("[onMessage][收到 SmsChannel 刷新消息]");
smsChannelService.initLocalCache();
}
}

View File

@@ -1,21 +0,0 @@
package cn.iocoder.yudao.module.system.mq.message.sms;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* 短信渠道的数据刷新 Message
*
* @author 芋道源码
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class SmsChannelRefreshMessage extends AbstractChannelMessage {
@Override
public String getChannel() {
return "system.sms-channel.refresh";
}
}

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.system.mq.producer.sms;
import cn.iocoder.yudao.framework.common.core.KeyValue;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -23,14 +22,6 @@ public class SmsProducer {
@Resource
private RedisMQTemplate redisMQTemplate;
/**
* 发送 {@link SmsChannelRefreshMessage} 消息
*/
public void sendSmsChannelRefreshMessage() {
SmsChannelRefreshMessage message = new SmsChannelRefreshMessage();
redisMQTemplate.send(message);
}
/**
* 发送 {@link SmsSendMessage} 消息
*

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.system.service.sms;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.sms.core.client.SmsClientFactory;
import cn.iocoder.yudao.framework.sms.core.property.SmsChannelProperties;
@@ -9,15 +10,20 @@ import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannel
import cn.iocoder.yudao.module.system.convert.sms.SmsChannelConvert;
import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO;
import cn.iocoder.yudao.module.system.dal.mysql.sms.SmsChannelMapper;
import cn.iocoder.yudao.module.system.mq.producer.sms.SmsProducer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.getMaxValue;
import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNEL_HAS_CHILDREN;
import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNEL_NOT_EXISTS;
@@ -30,6 +36,12 @@ import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNE
@Slf4j
public class SmsChannelServiceImpl implements SmsChannelService {
/**
* 短信渠道列表的缓存
*/
@Getter
private volatile List<SmsChannelDO> channelCache = Collections.emptyList();
@Resource
private SmsClientFactory smsClientFactory;
@@ -39,9 +51,6 @@ public class SmsChannelServiceImpl implements SmsChannelService {
@Resource
private SmsTemplateService smsTemplateService;
@Resource
private SmsProducer smsProducer;
@Override
@PostConstruct
public void initLocalCache() {
@@ -52,6 +61,27 @@ public class SmsChannelServiceImpl implements SmsChannelService {
// 第二步:构建缓存:创建或更新短信 Client
List<SmsChannelProperties> propertiesList = SmsChannelConvert.INSTANCE.convertList02(channels);
propertiesList.forEach(properties -> smsClientFactory.createOrUpdateSmsClient(properties));
this.channelCache = channels;
}
/**
* 通过定时任务轮询,刷新缓存
*
* 目的:多节点部署时,通过轮询”通知“所有节点,进行刷新
*/
@Scheduled(initialDelay = 60, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
public void refreshLocalCache() {
// 情况一:如果缓存里没有数据,则直接刷新缓存
if (CollUtil.isEmpty(channelCache)) {
initLocalCache();
return;
}
// 情况二,如果缓存里数据,则通过 updateTime 判断是否有数据变更,有变更则刷新缓存
LocalDateTime maxTime = getMaxValue(channelCache, SmsChannelDO::getUpdateTime);
if (smsChannelMapper.selectCountByUpdateTimeGt(maxTime) > 0) {
initLocalCache();
}
}
@Override
@@ -59,9 +89,9 @@ public class SmsChannelServiceImpl implements SmsChannelService {
// 插入
SmsChannelDO smsChannel = SmsChannelConvert.INSTANCE.convert(createReqVO);
smsChannelMapper.insert(smsChannel);
// 发送刷新消息
smsProducer.sendSmsChannelRefreshMessage();
// 返回
// 刷新缓存
initLocalCache();
return smsChannel.getId();
}
@@ -72,8 +102,9 @@ public class SmsChannelServiceImpl implements SmsChannelService {
// 更新
SmsChannelDO updateObj = SmsChannelConvert.INSTANCE.convert(updateReqVO);
smsChannelMapper.updateById(updateObj);
// 发送刷新消息
smsProducer.sendSmsChannelRefreshMessage();
// 刷新缓存
initLocalCache();
}
@Override
@@ -86,8 +117,9 @@ public class SmsChannelServiceImpl implements SmsChannelService {
}
// 删除
smsChannelMapper.deleteById(id);
// 发送刷新消息
smsProducer.sendSmsChannelRefreshMessage();
// 刷新缓存
initLocalCache();
}
private void validateSmsChannelExists(Long id) {