mirror of
				https://gitee.com/hhyykk/ipms-sjy.git
				synced 2025-11-04 20:28:44 +08:00 
			
		
		
		
	websocket:重新封装 websocket 组件,支持 sender 广播
This commit is contained in:
		@@ -4,6 +4,9 @@ import lombok.Data;
 | 
			
		||||
import org.springframework.boot.context.properties.ConfigurationProperties;
 | 
			
		||||
import org.springframework.validation.annotation.Validated;
 | 
			
		||||
 | 
			
		||||
import javax.validation.constraints.NotEmpty;
 | 
			
		||||
import javax.validation.constraints.NotNull;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * WebSocket 配置项
 | 
			
		||||
 *
 | 
			
		||||
@@ -17,6 +20,15 @@ public class WebSocketProperties {
 | 
			
		||||
    /**
 | 
			
		||||
     * WebSocket 的连接路径
 | 
			
		||||
     */
 | 
			
		||||
    @NotEmpty(message = "WebSocket 的连接路径不能为空")
 | 
			
		||||
    private String path = "/ws";
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息发送器的类型
 | 
			
		||||
     *
 | 
			
		||||
     * 可选值:local、redis、rocketmq、kafka、rabbitmq
 | 
			
		||||
     */
 | 
			
		||||
    @NotNull(message = "WebSocket 的消息发送者不能为空")
 | 
			
		||||
    private String senderType = "local";
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,15 +1,31 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.config;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.local.LocalWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageConsumer;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl;
 | 
			
		||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
 | 
			
		||||
import org.springframework.amqp.core.TopicExchange;
 | 
			
		||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
 | 
			
		||||
import org.springframework.context.annotation.Bean;
 | 
			
		||||
import org.springframework.context.annotation.Configuration;
 | 
			
		||||
import org.springframework.kafka.core.KafkaTemplate;
 | 
			
		||||
import org.springframework.web.socket.WebSocketHandler;
 | 
			
		||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
 | 
			
		||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
 | 
			
		||||
@@ -25,7 +41,6 @@ import java.util.List;
 | 
			
		||||
@AutoConfiguration
 | 
			
		||||
@EnableWebSocket // 开启 websocket
 | 
			
		||||
@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket
 | 
			
		||||
 | 
			
		||||
@EnableConfigurationProperties(WebSocketProperties.class)
 | 
			
		||||
public class YudaoWebSocketAutoConfiguration {
 | 
			
		||||
 | 
			
		||||
@@ -60,4 +75,103 @@ public class YudaoWebSocketAutoConfiguration {
 | 
			
		||||
        return new WebSocketSessionManagerImpl();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // ==================== Sender 相关 ====================
 | 
			
		||||
 | 
			
		||||
    @Configuration
 | 
			
		||||
    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true)
 | 
			
		||||
    public class LocalWebSocketMessageSenderConfiguration {
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) {
 | 
			
		||||
            return new LocalWebSocketMessageSender(sessionManager);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Configuration
 | 
			
		||||
    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "redis", matchIfMissing = true)
 | 
			
		||||
    public class RedisWebSocketMessageSenderConfiguration {
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,
 | 
			
		||||
                                                                       RedisMQTemplate redisMQTemplate) {
 | 
			
		||||
            return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // TODO 芋艿:需要额外删除 YudaoRedisMQAutoConfiguration 的 RedisMessageListenerContainer Bean 上的 @ConditionalOnBean 注解。可能是 spring boot 的 bug!
 | 
			
		||||
        @Bean
 | 
			
		||||
        public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer(
 | 
			
		||||
                RedisWebSocketMessageSender redisWebSocketMessageSender) {
 | 
			
		||||
            return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Configuration
 | 
			
		||||
    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rocketmq", matchIfMissing = true)
 | 
			
		||||
    public class RocketMQWebSocketMessageSenderConfiguration {
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender(
 | 
			
		||||
                WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate,
 | 
			
		||||
                @Value("${yudao.websocket.sender-rocketmq.topic}") String topic) {
 | 
			
		||||
            return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer(
 | 
			
		||||
                RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) {
 | 
			
		||||
            return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Configuration
 | 
			
		||||
    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq", matchIfMissing = true)
 | 
			
		||||
    public class RabbitMQWebSocketMessageSenderConfiguration {
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender(
 | 
			
		||||
                WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate,
 | 
			
		||||
                TopicExchange websocketTopicExchange) {
 | 
			
		||||
            return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer(
 | 
			
		||||
                RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) {
 | 
			
		||||
            return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         * 创建 Topic Exchange
 | 
			
		||||
         */
 | 
			
		||||
        @Bean
 | 
			
		||||
        public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) {
 | 
			
		||||
            return new TopicExchange(exchange,
 | 
			
		||||
                    true,  // durable: 是否持久化
 | 
			
		||||
                    false);  // exclusive: 是否排它
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Configuration
 | 
			
		||||
    @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "kafka", matchIfMissing = true)
 | 
			
		||||
    public class KafkaWebSocketMessageSenderConfiguration {
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public KafkaWebSocketMessageSender kafkaWebSocketMessageSender(
 | 
			
		||||
                WebSocketSessionManager sessionManager, KafkaTemplate<Object, Object> kafkaTemplate,
 | 
			
		||||
                @Value("${yudao.websocket.sender-kafka.topic}") String topic) {
 | 
			
		||||
            return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Bean
 | 
			
		||||
        public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer(
 | 
			
		||||
                KafkaWebSocketMessageSender kafkaWebSocketMessageSender) {
 | 
			
		||||
            return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -3,8 +3,10 @@ package cn.iocoder.yudao.framework.websocket.core.handler;
 | 
			
		||||
import cn.hutool.core.util.StrUtil;
 | 
			
		||||
import cn.hutool.core.util.TypeUtil;
 | 
			
		||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
 | 
			
		||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.web.socket.TextMessage;
 | 
			
		||||
import org.springframework.web.socket.WebSocketHandler;
 | 
			
		||||
@@ -70,8 +72,9 @@ public class JsonWebSocketMessageHandler extends TextWebSocketHandler {
 | 
			
		||||
            }
 | 
			
		||||
            // 2.3 处理消息
 | 
			
		||||
            Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
 | 
			
		||||
            Object messageObj = JsonUtils.parseObject(jsonMessage.getMessage(), type);
 | 
			
		||||
            messageListener.onMessage(session, messageObj);
 | 
			
		||||
            Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
 | 
			
		||||
            Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
 | 
			
		||||
            TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
 | 
			
		||||
        } catch (Throwable ex) {
 | 
			
		||||
            log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -3,13 +3,15 @@ package cn.iocoder.yudao.framework.websocket.core.message;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * JSON 格式的 WebSocket 消息帧
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class JsonWebSocketMessage {
 | 
			
		||||
public class JsonWebSocketMessage implements Serializable {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息类型
 | 
			
		||||
@@ -22,6 +24,6 @@ public class JsonWebSocketMessage {
 | 
			
		||||
     *
 | 
			
		||||
     * 要求 JSON 对象
 | 
			
		||||
     */
 | 
			
		||||
    private String message;
 | 
			
		||||
    private String content;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,104 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender;
 | 
			
		||||
 | 
			
		||||
import cn.hutool.core.collection.CollUtil;
 | 
			
		||||
import cn.hutool.core.util.StrUtil;
 | 
			
		||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.web.socket.TextMessage;
 | 
			
		||||
import org.springframework.web.socket.WebSocketSession;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * WebSocketMessageSender 实现类
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender {
 | 
			
		||||
 | 
			
		||||
    private final WebSocketSessionManager sessionManager;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
 | 
			
		||||
        send(null, userType, userId, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, String messageType, String messageContent) {
 | 
			
		||||
        send(null, userType, null, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(String sessionId, String messageType, String messageContent) {
 | 
			
		||||
        send(sessionId, null, null, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送消息
 | 
			
		||||
     *
 | 
			
		||||
     * @param sessionId Session 编号
 | 
			
		||||
     * @param userType 用户类型
 | 
			
		||||
     * @param userId 用户编号
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) {
 | 
			
		||||
        // 1. 获得 Session 列表
 | 
			
		||||
        List<WebSocketSession> sessions = Collections.emptyList();
 | 
			
		||||
        if (StrUtil.isNotEmpty(sessionId)) {
 | 
			
		||||
            WebSocketSession session = sessionManager.getSession(sessionId);
 | 
			
		||||
            if (session != null) {
 | 
			
		||||
                sessions = Collections.singletonList(session);
 | 
			
		||||
            }
 | 
			
		||||
        } else if (userType != null && userId != null) {
 | 
			
		||||
            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType, userId);
 | 
			
		||||
        } else if (userType != null) {
 | 
			
		||||
            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType);
 | 
			
		||||
        }
 | 
			
		||||
        if (CollUtil.isEmpty(sessions)) {
 | 
			
		||||
            log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]",
 | 
			
		||||
                    sessionId, userType, userId, messageType, messageContent);
 | 
			
		||||
        }
 | 
			
		||||
        // 2. 执行发送
 | 
			
		||||
        doSend(sessions, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送消息的具体实现
 | 
			
		||||
     *
 | 
			
		||||
     * @param sessions Session 列表
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    public void doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent) {
 | 
			
		||||
        JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent);
 | 
			
		||||
        String payload = JsonUtils.toJsonString(message); // 关键,使用 JSON 序列化
 | 
			
		||||
        sessions.forEach(session -> {
 | 
			
		||||
            // 1. 各种校验,保证 Session 可以被发送
 | 
			
		||||
            if (session == null) {
 | 
			
		||||
                log.error("[doSend][session 为空, message({})]", message);
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            if (!session.isOpen()) {
 | 
			
		||||
                log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message);
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            // 2. 执行发送
 | 
			
		||||
            try {
 | 
			
		||||
                session.sendMessage(new TextMessage(payload));
 | 
			
		||||
                log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message);
 | 
			
		||||
            } catch (IOException ex) {
 | 
			
		||||
                log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,52 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * WebSocket 消息的发送器接口
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
public interface WebSocketMessageSender {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送消息给指定用户
 | 
			
		||||
     *
 | 
			
		||||
     * @param userType 用户类型
 | 
			
		||||
     * @param userId 用户编号
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容,JSON 格式
 | 
			
		||||
     */
 | 
			
		||||
    void send(Integer userType, Long userId, String messageType, String messageContent);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送消息给指定用户类型
 | 
			
		||||
     *
 | 
			
		||||
     * @param userType 用户类型
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容,JSON 格式
 | 
			
		||||
     */
 | 
			
		||||
    void send(Integer userType, String messageType, String messageContent);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送消息给指定 Session
 | 
			
		||||
     *
 | 
			
		||||
     * @param sessionId Session 编号
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容,JSON 格式
 | 
			
		||||
     */
 | 
			
		||||
    void send(String sessionId, String messageType, String messageContent);
 | 
			
		||||
 | 
			
		||||
    default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
 | 
			
		||||
        send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    default void sendObject(Integer userType, String messageType, Object messageContent) {
 | 
			
		||||
        send(userType, messageType, JsonUtils.toJsonString(messageContent));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    default void sendObject(String sessionId, String messageType, Object messageContent) {
 | 
			
		||||
        send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,35 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.kafka;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Kafka 广播 WebSocket 的消息
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class KafkaWebSocketMessage {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Session 编号
 | 
			
		||||
     */
 | 
			
		||||
    private String sessionId;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户类型
 | 
			
		||||
     */
 | 
			
		||||
    private Integer userType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户编号
 | 
			
		||||
     */
 | 
			
		||||
    private Long userId;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息类型
 | 
			
		||||
     */
 | 
			
		||||
    private String messageType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private String messageContent;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,28 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.kafka;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 | 
			
		||||
import org.springframework.kafka.annotation.KafkaListener;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * {@link KafkaWebSocketMessage} 广播消息的消费者,真正把消息发送出去
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class KafkaWebSocketMessageConsumer {
 | 
			
		||||
 | 
			
		||||
    private final KafkaWebSocketMessageSender rabbitMQWebSocketMessageSender;
 | 
			
		||||
 | 
			
		||||
    @RabbitHandler
 | 
			
		||||
    @KafkaListener(
 | 
			
		||||
            topics = "${yudao.websocket.sender-kafka.topic}",
 | 
			
		||||
            // 在 Group 上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Group 不同,以达到广播消费的目的
 | 
			
		||||
            groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}")
 | 
			
		||||
    public void onMessage(KafkaWebSocketMessage message) {
 | 
			
		||||
        rabbitMQWebSocketMessageSender.send(message.getSessionId(),
 | 
			
		||||
                message.getUserType(), message.getUserId(),
 | 
			
		||||
                message.getMessageType(), message.getMessageContent());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,67 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.kafka;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.kafka.core.KafkaTemplate;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 基于 Kafka 的 {@link WebSocketMessageSender} 实现类
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender {
 | 
			
		||||
 | 
			
		||||
    private final KafkaTemplate<Object, Object> kafkaTemplate;
 | 
			
		||||
 | 
			
		||||
    private final String topic;
 | 
			
		||||
 | 
			
		||||
    public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager,
 | 
			
		||||
                                       KafkaTemplate<Object, Object> kafkaTemplate,
 | 
			
		||||
                                       String topic) {
 | 
			
		||||
        super(sessionManager);
 | 
			
		||||
        this.kafkaTemplate = kafkaTemplate;
 | 
			
		||||
        this.topic = topic;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
 | 
			
		||||
        sendKafkaMessage(null, userId, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, String messageType, String messageContent) {
 | 
			
		||||
        sendKafkaMessage(null, null, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(String sessionId, String messageType, String messageContent) {
 | 
			
		||||
        sendKafkaMessage(sessionId, null, null, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 通过 Kafka 广播消息
 | 
			
		||||
     *
 | 
			
		||||
     * @param sessionId Session 编号
 | 
			
		||||
     * @param userId 用户编号
 | 
			
		||||
     * @param userType 用户类型
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private void sendKafkaMessage(String sessionId, Long userId, Integer userType,
 | 
			
		||||
                                  String messageType, String messageContent) {
 | 
			
		||||
        KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage()
 | 
			
		||||
                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
 | 
			
		||||
                .setMessageType(messageType).setMessageContent(messageContent);
 | 
			
		||||
        try {
 | 
			
		||||
            kafkaTemplate.send(topic, mqMessage).get();
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,20 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.local;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 本地的 {@link WebSocketMessageSender} 实现类
 | 
			
		||||
 *
 | 
			
		||||
 * 注意:仅仅适合单机场景!!!
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
public class LocalWebSocketMessageSender extends AbstractWebSocketMessageSender {
 | 
			
		||||
 | 
			
		||||
    public LocalWebSocketMessageSender(WebSocketSessionManager sessionManager) {
 | 
			
		||||
        super(sessionManager);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,37 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * RabbitMQ 广播 WebSocket 的消息
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class RabbitMQWebSocketMessage implements Serializable {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Session 编号
 | 
			
		||||
     */
 | 
			
		||||
    private String sessionId;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户类型
 | 
			
		||||
     */
 | 
			
		||||
    private Integer userType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户编号
 | 
			
		||||
     */
 | 
			
		||||
    private Long userId;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息类型
 | 
			
		||||
     */
 | 
			
		||||
    private String messageType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private String messageContent;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,39 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.amqp.core.ExchangeTypes;
 | 
			
		||||
import org.springframework.amqp.rabbit.annotation.*;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * {@link RabbitMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@RabbitListener(
 | 
			
		||||
        bindings = @QueueBinding(
 | 
			
		||||
                value = @Queue(
 | 
			
		||||
                        // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
 | 
			
		||||
                        name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}",
 | 
			
		||||
                        // Consumer 关闭时,该队列就可以被自动删除了
 | 
			
		||||
                        autoDelete = "true"
 | 
			
		||||
                ),
 | 
			
		||||
                exchange = @Exchange(
 | 
			
		||||
                        name = "${yudao.websocket.sender-rabbitmq.exchange}",
 | 
			
		||||
                        type = ExchangeTypes.TOPIC,
 | 
			
		||||
                        declare = "false"
 | 
			
		||||
                )
 | 
			
		||||
        )
 | 
			
		||||
)
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class RabbitMQWebSocketMessageConsumer {
 | 
			
		||||
 | 
			
		||||
    private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender;
 | 
			
		||||
 | 
			
		||||
    @RabbitHandler
 | 
			
		||||
    public void onMessage(RabbitMQWebSocketMessage message) {
 | 
			
		||||
        rabbitMQWebSocketMessageSender.send(message.getSessionId(),
 | 
			
		||||
                message.getUserType(), message.getUserId(),
 | 
			
		||||
                message.getMessageType(), message.getMessageContent());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,62 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.amqp.core.TopicExchange;
 | 
			
		||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 基于 RabbitMQ 的 {@link WebSocketMessageSender} 实现类
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
 | 
			
		||||
 | 
			
		||||
    private final RabbitTemplate rabbitTemplate;
 | 
			
		||||
 | 
			
		||||
    private final TopicExchange topicExchange;
 | 
			
		||||
 | 
			
		||||
    public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
 | 
			
		||||
                                          RabbitTemplate rabbitTemplate,
 | 
			
		||||
                                          TopicExchange topicExchange) {
 | 
			
		||||
        super(sessionManager);
 | 
			
		||||
        this.rabbitTemplate = rabbitTemplate;
 | 
			
		||||
        this.topicExchange = topicExchange;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
 | 
			
		||||
        sendRabbitMQMessage(null, userId, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, String messageType, String messageContent) {
 | 
			
		||||
        sendRabbitMQMessage(null, null, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(String sessionId, String messageType, String messageContent) {
 | 
			
		||||
        sendRabbitMQMessage(sessionId, null, null, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 通过 RabbitMQ 广播消息
 | 
			
		||||
     *
 | 
			
		||||
     * @param sessionId Session 编号
 | 
			
		||||
     * @param userId 用户编号
 | 
			
		||||
     * @param userType 用户类型
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType,
 | 
			
		||||
                                     String messageType, String messageContent) {
 | 
			
		||||
        RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage()
 | 
			
		||||
                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
 | 
			
		||||
                .setMessageType(messageType).setMessageContent(messageContent);
 | 
			
		||||
        rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,34 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.redis;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Redis 广播 WebSocket 的消息
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class RedisWebSocketMessage extends AbstractRedisChannelMessage {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Session 编号
 | 
			
		||||
     */
 | 
			
		||||
    private String sessionId;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户类型
 | 
			
		||||
     */
 | 
			
		||||
    private Integer userType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户编号
 | 
			
		||||
     */
 | 
			
		||||
    private Long userId;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息类型
 | 
			
		||||
     */
 | 
			
		||||
    private String messageType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private String messageContent;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,23 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.redis;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * {@link RedisWebSocketMessage} 广播消息的消费者,真正把消息发送出去
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener<RedisWebSocketMessage> {
 | 
			
		||||
 | 
			
		||||
    private final RedisWebSocketMessageSender redisWebSocketMessageSender;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMessage(RedisWebSocketMessage message) {
 | 
			
		||||
        redisWebSocketMessageSender.send(message.getSessionId(),
 | 
			
		||||
                message.getUserType(), message.getUserId(),
 | 
			
		||||
                message.getMessageType(), message.getMessageContent());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,57 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.redis;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 基于 Redis 的 {@link WebSocketMessageSender} 实现类
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender {
 | 
			
		||||
 | 
			
		||||
    private final RedisMQTemplate redisMQTemplate;
 | 
			
		||||
 | 
			
		||||
    public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager,
 | 
			
		||||
                                       RedisMQTemplate redisMQTemplate) {
 | 
			
		||||
        super(sessionManager);
 | 
			
		||||
        this.redisMQTemplate = redisMQTemplate;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
 | 
			
		||||
        sendRedisMessage(null, userId, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, String messageType, String messageContent) {
 | 
			
		||||
        sendRedisMessage(null, null, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(String sessionId, String messageType, String messageContent) {
 | 
			
		||||
        sendRedisMessage(sessionId, null, null, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 通过 Redis 广播消息
 | 
			
		||||
     *
 | 
			
		||||
     * @param sessionId Session 编号
 | 
			
		||||
     * @param userId 用户编号
 | 
			
		||||
     * @param userType 用户类型
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private void sendRedisMessage(String sessionId, Long userId, Integer userType,
 | 
			
		||||
                                  String messageType, String messageContent) {
 | 
			
		||||
        RedisWebSocketMessage mqMessage = new RedisWebSocketMessage()
 | 
			
		||||
                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
 | 
			
		||||
                .setMessageType(messageType).setMessageContent(messageContent);
 | 
			
		||||
        redisMQTemplate.send(mqMessage);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,35 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * RocketMQ 广播 WebSocket 的消息
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class RocketMQWebSocketMessage {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Session 编号
 | 
			
		||||
     */
 | 
			
		||||
    private String sessionId;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户类型
 | 
			
		||||
     */
 | 
			
		||||
    private Integer userType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 用户编号
 | 
			
		||||
     */
 | 
			
		||||
    private Long userId;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息类型
 | 
			
		||||
     */
 | 
			
		||||
    private String messageType;
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private String messageContent;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,30 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.apache.rocketmq.spring.annotation.MessageModel;
 | 
			
		||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 | 
			
		||||
import org.apache.rocketmq.spring.core.RocketMQListener;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * {@link RocketMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic
 | 
			
		||||
        topic = "${yudao.websocket.sender-rocketmq.topic}",
 | 
			
		||||
        consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}",
 | 
			
		||||
        messageModel = MessageModel.BROADCASTING // 设置为广播模式,保证每个实例都能收到消息
 | 
			
		||||
)
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class RocketMQWebSocketMessageConsumer implements RocketMQListener<RocketMQWebSocketMessage> {
 | 
			
		||||
 | 
			
		||||
    private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMessage(RocketMQWebSocketMessage message) {
 | 
			
		||||
        rocketMQWebSocketMessageSender.send(message.getSessionId(),
 | 
			
		||||
                message.getUserType(), message.getUserId(),
 | 
			
		||||
                message.getMessageType(), message.getMessageContent());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,61 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类
 | 
			
		||||
 *
 | 
			
		||||
 * @author 芋道源码
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
 | 
			
		||||
 | 
			
		||||
    private final RocketMQTemplate rocketMQTemplate;
 | 
			
		||||
 | 
			
		||||
    private final String topic;
 | 
			
		||||
 | 
			
		||||
    public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
 | 
			
		||||
                                          RocketMQTemplate rocketMQTemplate,
 | 
			
		||||
                                          String topic) {
 | 
			
		||||
        super(sessionManager);
 | 
			
		||||
        this.rocketMQTemplate = rocketMQTemplate;
 | 
			
		||||
        this.topic = topic;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
 | 
			
		||||
        sendRocketMQMessage(null, userId, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(Integer userType, String messageType, String messageContent) {
 | 
			
		||||
        sendRocketMQMessage(null, null, userType, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(String sessionId, String messageType, String messageContent) {
 | 
			
		||||
        sendRocketMQMessage(sessionId, null, null, messageType, messageContent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 通过 RocketMQ 广播消息
 | 
			
		||||
     *
 | 
			
		||||
     * @param sessionId Session 编号
 | 
			
		||||
     * @param userId 用户编号
 | 
			
		||||
     * @param userType 用户类型
 | 
			
		||||
     * @param messageType 消息类型
 | 
			
		||||
     * @param messageContent 消息内容
 | 
			
		||||
     */
 | 
			
		||||
    private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
 | 
			
		||||
                                     String messageType, String messageContent) {
 | 
			
		||||
        RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
 | 
			
		||||
                .setSessionId(sessionId).setUserId(userId).setUserType(userType)
 | 
			
		||||
                .setMessageType(messageType).setMessageContent(messageContent);
 | 
			
		||||
        rocketMQTemplate.syncSend(topic, mqMessage);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -2,10 +2,14 @@ package cn.iocoder.yudao.framework.websocket.core.session;
 | 
			
		||||
 | 
			
		||||
import cn.hutool.core.collection.CollUtil;
 | 
			
		||||
import cn.iocoder.yudao.framework.security.core.LoginUser;
 | 
			
		||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
 | 
			
		||||
import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
 | 
			
		||||
import org.springframework.web.socket.WebSocketSession;
 | 
			
		||||
 | 
			
		||||
import java.util.*;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.LinkedList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.concurrent.CopyOnWriteArrayList;
 | 
			
		||||
@@ -91,10 +95,18 @@ public class WebSocketSessionManagerImpl implements WebSocketSessionManager {
 | 
			
		||||
            return new ArrayList<>();
 | 
			
		||||
        }
 | 
			
		||||
        LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
 | 
			
		||||
        Long contextTenantId = TenantContextHolder.getTenantId();
 | 
			
		||||
        for (List<WebSocketSession> sessions : userSessionsMap.values()) {
 | 
			
		||||
            if (CollUtil.isNotEmpty(sessions)) {
 | 
			
		||||
            if (CollUtil.isEmpty(sessions)) {
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
            // 特殊:如果租户不匹配,则直接排除
 | 
			
		||||
            if (contextTenantId != null) {
 | 
			
		||||
                Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));
 | 
			
		||||
                if (!contextTenantId.equals(userTenantId)) {
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            result.addAll(sessions);
 | 
			
		||||
        }
 | 
			
		||||
        return result;
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,6 @@
 | 
			
		||||
package cn.iocoder.yudao.framework.websocket.core.util;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.security.core.LoginUser;
 | 
			
		||||
import org.springframework.lang.Nullable;
 | 
			
		||||
import org.springframework.web.socket.WebSocketSession;
 | 
			
		||||
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
@@ -39,7 +38,6 @@ public class WebSocketFrameworkUtils {
 | 
			
		||||
     *
 | 
			
		||||
     * @return 用户编号
 | 
			
		||||
     */
 | 
			
		||||
    @Nullable
 | 
			
		||||
    public static Long getLoginUserId(WebSocketSession session) {
 | 
			
		||||
        LoginUser loginUser = getLoginUser(session);
 | 
			
		||||
        return loginUser != null ? loginUser.getId() : null;
 | 
			
		||||
@@ -50,10 +48,20 @@ public class WebSocketFrameworkUtils {
 | 
			
		||||
     *
 | 
			
		||||
     * @return 用户编号
 | 
			
		||||
     */
 | 
			
		||||
    @Nullable
 | 
			
		||||
    public static Integer getLoginUserType(WebSocketSession session) {
 | 
			
		||||
        LoginUser loginUser = getLoginUser(session);
 | 
			
		||||
        return loginUser != null ? loginUser.getUserType() : null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 获得当前用户的租户编号
 | 
			
		||||
     *
 | 
			
		||||
     * @param session Session
 | 
			
		||||
     * @return 租户编号
 | 
			
		||||
     */
 | 
			
		||||
    public static Long getTenantId(WebSocketSession session) {
 | 
			
		||||
        LoginUser loginUser = getLoginUser(session);
 | 
			
		||||
        return loginUser != null ? loginUser.getTenantId() : null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user