新增:iot 模块 集成 emqx 接收 mqtt 接收消息

This commit is contained in:
安浩浩
2024-08-06 22:17:29 +08:00
parent 9eeaa11520
commit b47176c96e
21 changed files with 550 additions and 68 deletions

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>yudao-module-iot-biz</artifactId>
<name>${project.artifactId}</name>
<description>
物联网 模块,主要实现 产品管理、设备管理、协议管理等功能。
</description>
<dependencies> <!-- 5. 新增依赖,这里引入的都是比较常用的业务组件、技术组件 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-security</artifactId>
</dependency>
<!-- DB 相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-mybatis</artifactId>
</dependency>
<!-- Test 测试相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-test</artifactId>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,6 @@
/**
* 提供 RESTful API 给前端:
* 1. admin 包:提供给管理后台 yudao-ui-admin 前端项目
* 2. app 包:提供给用户 APP yudao-ui-app 前端项目,它的 Controller 和 VO 都要添加 App 前缀,用于和管理后台进行区分
*/
package cn.iocoder.yudao.module.iot.controller;

View File

@ -0,0 +1,53 @@
package cn.iocoder.yudao.module.iot.emq.callback;
import cn.iocoder.yudao.module.iot.emq.client.EmqxClient;
import cn.iocoder.yudao.module.iot.emq.service.EmqxService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* 用于处理MQTT连接的回调如连接断开、消息到达、消息发布完成、连接完成等事件。
*
* @author ahh
*/
@Slf4j
@Component
public class EmqxCallback implements MqttCallbackExtended {
@Lazy
@Resource
private EmqxService emqxService;
@Lazy
@Resource
private EmqxClient emqxClient;
@Override
public void connectionLost(Throwable throwable) {
log.info("MQTT 连接断开", throwable);
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
emqxService.subscribeCallback(topic, mqttMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("消息发送成功: {}", iMqttDeliveryToken.getMessageId());
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 已连接到服务器: {}", serverURI);
emqxService.subscribe(emqxClient.getMqttClient());
}
}

View File

@ -0,0 +1,86 @@
package cn.iocoder.yudao.module.iot.emq.client;
import cn.iocoder.yudao.module.iot.emq.callback.EmqxCallback;
import cn.iocoder.yudao.module.iot.emq.config.MqttConfig;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
* MQTT客户端类负责建立与MQTT服务器的连接提供发布消息和订阅主题的功能
*
* @author ahh
*/
@Slf4j
@Data
@Component
public class EmqxClient {
@Resource
private EmqxCallback emqxCallback;
@Resource
private MqttConfig mqttConfig;
private MqttClient mqttClient;
public void connect() {
if (mqttClient == null) {
createMqttClient();
}
try {
mqttClient.connect(createMqttOptions());
log.info("MQTT客户端连接成功");
} catch (MqttException e) {
log.error("MQTT客户端连接失败", e);
}
}
private void createMqttClient() {
try {
mqttClient = new MqttClient(mqttConfig.getHostUrl(), "yudao-" + mqttConfig.getClientId(), new MemoryPersistence());
mqttClient.setCallback(emqxCallback);
} catch (MqttException e) {
log.error("创建MQTT客户端失败", e);
}
}
private MqttConnectOptions createMqttOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
options.setConnectionTimeout(mqttConfig.getTimeout());
options.setKeepAliveInterval(mqttConfig.getKeepalive());
options.setCleanSession(mqttConfig.isClearSession());
return options;
}
public void publish(String topic, String message) {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
connect();
}
mqttClient.publish(topic, new MqttMessage(message.getBytes()));
log.info("消息已发布到主题: {}", topic);
} catch (MqttException e) {
log.error("消息发布失败", e);
}
}
public void subscribe(String topic) {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
connect();
}
mqttClient.subscribe(topic);
log.info("订阅了主题: {}", topic);
} catch (MqttException e) {
log.error("订阅主题失败", e);
}
}
}

View File

@ -0,0 +1,66 @@
package cn.iocoder.yudao.module.iot.emq.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 配置类用于读取MQTT连接的配置信息如用户名、密码、连接地址等
*
* @author ahh
*/
@Data
@Component
@ConfigurationProperties("iot.emq")
public class MqttConfig {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户Id
*/
private String clientId;
/**
* 默认连接话题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 保持连接数
*/
private int keepalive;
/**
* 是否清除session
*/
private boolean clearSession;
/**
* 是否共享订阅
*/
private boolean isShared;
/**
* 分组共享订阅
*/
private boolean isSharedGroup;
}

View File

@ -0,0 +1,34 @@
package cn.iocoder.yudao.module.iot.emq.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
/**
* 用于处理MQTT消息的具体业务逻辑如订阅回调
*
* @author ahh
*/
@Slf4j
@Service
public class EmqxService {
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload()));
// 根据不同的主题,处理不同的业务逻辑
if (topic.contains("/property/post")) {
// 设备上报数据
}
}
public void subscribe(MqttClient client) {
try {
// 订阅默认主题,可以根据需要修改
client.subscribe("$share/yudao/+/+/#", 1);
log.info("订阅默认主题成功");
} catch (Exception e) {
log.error("订阅默认主题失败", e);
}
}
}

View File

@ -0,0 +1,24 @@
package cn.iocoder.yudao.module.iot.emq.start;
import cn.iocoder.yudao.module.iot.emq.client.EmqxClient;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* 用于在应用启动时自动连接MQTT服务器
*
* @author ahh
*/
@Component
public class EmqxStart implements ApplicationRunner {
@Resource
private EmqxClient emqxClient;
@Override
public void run(ApplicationArguments applicationArguments) {
emqxClient.connect();
}
}

View File

@ -0,0 +1,6 @@
/**
* 属于 iot 模块的 framework 封装
*
* @author 芋道源码
*/
package cn.iocoder.yudao.module.iot.framework;

View File

@ -0,0 +1,24 @@
package cn.iocoder.yudao.module.iot.framework.web.config;
import cn.iocoder.yudao.framework.swagger.config.YudaoSwaggerAutoConfiguration;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* iot 模块的 web 组件的 Configuration
*
* @author 芋道源码
*/
@Configuration(proxyBeanMethods = false)
public class IotWebConfiguration {
/**
* iot 模块的 API 分组
*/
@Bean
public GroupedOpenApi iotGroupedOpenApi() {
return YudaoSwaggerAutoConfiguration.buildGroupedOpenApi("iot");
}
}

View File

@ -0,0 +1,4 @@
/**
* iot 模块的 web 拓展封装
*/
package cn.iocoder.yudao.module.iot.framework.web;