一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布
之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用RabbitMQ,看过的朋友都说写的比较详细,希望再总结一下目前比较流行的MQTT。所以接下来,就来介绍什么MQTT?它在IoT中有着怎样的作用?如何在项目中使用MQTT? 一、MQTT介绍1.1 什么是MQTT?
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
MQTT具有协议简洁、轻巧、可扩展性强、低开销、低带宽占用等优点,已经有PHP,JAVA,Python,C,C#,Go等多个语言版本,基本可以使用在任何平台上。在物联网、小型设备、移动应用等方面有较广泛的应用,特别适合用来当做物联网的通信协议。
1.2 MQTT特点
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。
MQTT协议是为硬件性能有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性: 1.使用发布/订阅消息模式,提供多对多的消息发布,解除应用程序耦合; 2.对负载内容屏蔽的消息传输; 3.使用TCP/IP 提供网络连接; 4.支持三种消息发布服务质量(QoS): QoS 0(最多一次):消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这个级别可用于如下情况,环境传感器数据,丢失一次数据无所谓,因为不久后还会有第二次发送。 QoS 1(至少一次):确保消息到达,但消息重复可能会发生。 QoS 2(只有一次):确保消息到达一次。这个级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 5.传输数据小,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;(用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。)
1.3 MQTT应用场景
MQTT作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有着广泛的应用。MQTT服务只负责消息的接收和传递,应用系统连接到MQTT服务器后,可以实现采集数据接收、解析、业务处理、存储入库、数据展示等功能。常见的应用场景主要有以下几个方面:
(1)消息推送: 如PC端的推送公告,比如安卓的推送服务,还有一些即时通信软件如微信、易信等也是采用的推送技术。
(2)智能点餐: 通过MQTT消息队列产品,消费者可在餐桌上扫码点餐,并与商家后端系统连接实现自助下单、支付。
(3)信息更新: 实现商场超市等场所的电子标签、公共场所的多媒体屏幕的显示更新管理。
(4)扫码出站: 最常见的停车场扫码缴费,自动起竿;地铁闸口扫码进出站。
二、MQTT的角色组成2.1 MQTT的客户端和服务端
2.1.1 服务端(Broker)
EMQX就是一个MQTT的Broker,emqx只是基于erlang语言开发的软件而已,其它的MQ还有ActiveMQ、RabbitMQ、HiveMQ等等。
EMQX服务端:https://www.emqx.io/zh/downloads?os=Windows
2.1.2 客户端(发布/订阅)
EMQX客户端:https://mqttx.app/zh
这个是用来测试验证的客户端,实际项目是通过代码来实现我们消息的生产者和消费者。
2.2 MQTT中的几个概念
相比RabbitMQ等消息队列,MQTT要相对简单一些,只有Broker、Topic、发布者、订阅者等几部分构成。接下来我们先简单整理下MQTT日常使用中最常见的几个概念: 1.Topic主题:MQTT消息的主要传播途径, 我们向主题发布消息, 订阅主题, 从主题中读取消息并进行.业务逻辑处理, 主题是消息的通道 2.生产者:MQTT消息的发送者, 他们向主题发送消息 3.消费者:MQTT消息的接收者, 他们订阅自己需要的主题, 并从中获取消息 4.broker服务:消息转发器, 消息是通过它来承载的, EMQX就是我们的broker, 在使用中我们不用关心它的具体实现
其实, MQTT的使用流程就是: 生产者给broker的某个topic发消息->broker通过topic进行消息的传递->订阅该主题的消费者拿到消息并进行相应的业务逻辑
三、EMQX的安装和使用
下面以Windows为例,演示Windows下如何安装和使用EXQX。
step 1: 下载EMQ安装包,配置EMQ环境
EMQX服务端:https://www.emqx.io/zh/downloads?os=Windows
step 2: 下载压缩包解压,cmd进入bin文件夹
step 3 :启动EMQX服务
在命令行输入:emqx start 启动服务,打开浏览器输入:http://localhost:18083/ 进入登录页面。默认用户名密码 admin/public 。登录成功后,会进入emqx的后台管理页面,如下图所示:
四、使用SpringBoot整合MQTT协议
前面介绍了MQTT协议以及如何安装和启动MQTT服务。接下来演示如何在SpringBoot项目中整合MQTT实现消息的订阅和发布。 4.1 创建工程
首先,创建spring-boot-starter-mqtt父工程,在父工程下分别创建消息的提供者spring-boot-starter-mqtt-provider 模块和消息的消费者spring-boot-starter-mqtt-consumer模块。
4.2 实现生产者
接下来,修改生产者模块spring-boot-starter-mqtt-provider 相关的代码,实现消息发布的功能模块。 4.2.1 导入依赖包
修改pom.xml 文件,添加MQTT相关依赖,具体示例代码如下所示: org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.integration spring-integration-mqtt 5.3.2.RELEASE org.projectlombok lombok 1.18.4 4.2.2 修改配置文件
修改application.yml配置文件,增加MQTT相关配置。示例代码如下所示: spring: application: name: provider #MQTT配置信息 mqtt: #MQTT服务地址,端口号默认11883,如果有多个,用逗号隔开 url: tcp://127.0.0.1:11883 #用户名 username: admin #密码 password: public #客户端id(不能重复) client: id: provider-id #MQTT默认的消息推送主题,实际可在调用接口是指定 default: topic: topic server: port: 80804.2.3 消息生产者客户端配置
创建MqttProviderConfig配置类,读取application.yml中的相关配置,并初始化创建MQTT的连接。示例代码如下所示: import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration @Slf4j public class MqttProviderConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init(){ connect(); } /** * 客户端连接服务端 */ public void connect(){ try{ //创建MQTT客户端对象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接服务器都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false); //设置回调 client.setCallback(new MqttProviderCallBack()); client.connect(options); } catch(MqttException e){ e.printStackTrace(); } } public void publish(int qos,boolean retained,String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); //主题的目的地,用于发布/订阅信息 MqttTopic mqttTopic = client.getTopic(topic); //提供一种机制来跟踪消息的传递进度 //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 MqttDeliveryToken token; try { //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } }
4.2.4 生产者客户端消息回调
创建MqttProviderCallBack类并继承MqttCallback,实现相关消息回调事件,示例代码如下图所示: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttConsumerCallBack implements MqttCallback{ /** * 客户端断开连接的回调 */ @Override public void connectionLost(Throwable throwable) { System.out.println("与服务器断开连接,可重连"); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主题 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println(String.format("接收消息成功")); } }4.2.5 创建Controller控制器实现消息发布功能
创建SendController控制器类,实现消息的发送功能,示例代码如下所示: @Controller public class SendController { @Autowired private MqttProviderConfig providerClient; @RequestMapping("/sendMessage") @ResponseBody public String sendMessage(int qos,boolean retained,String topic,String message){ try { providerClient.publish(qos, retained, topic, message); return "发送成功"; } catch (Exception e) { e.printStackTrace(); return "发送失败"; } } }4.3 实现消费者
前面完成了生成者消息发布的模块,接下来修改消费者模块spring-boot-starter-mqtt-consumer实现消息订阅、处理的功能。 4.3.1 导入依赖包
修改pom.xml 文件,添加MQTT相关依赖,具体示例代码如下所示: org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.integration spring-integration-mqtt 5.3.2.RELEASE org.projectlombok lombok 1.18.4 4.3.2 修改配置文件
修改application.yml配置文件,增加MQTT相关配置。示例代码如下所示: spring: application: name: consumer #MQTT配置信息 mqtt: #MQTT服务端地址,端口默认为11883,如果有多个,用逗号隔开 url: tcp://127.0.0.1:11883 #用户名 username: admin #密码 password: public #客户端id(不能重复) client: id: consumer-id #MQTT默认的消息推送主题,实际可在调用接口时指定 default: topic: topic server: port: 80854.3.3 消费者客户端配置
创建消费者客户端配置类MqttConsumerConfig,读取application.yml中的相关配置,并初始化创建MQTT的连接。示例代码如下所示: import javax.annotation.PostConstruct; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; @Configuration public class MqttConsumerConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init(){ connect(); } /** * 客户端连接服务端 */ public void connect(){ try { //创建MQTT客户端对象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接到服务端都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false); //设置回调 client.setCallback(new MqttConsumerCallBack()); client.connect(options); //订阅主题 //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 int[] qos = {1,1}; //主题 String[] topics = {"topic1","topic2"}; //订阅主题 client.subscribe(topics,qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 断开连接 */ public void disConnect(){ try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅主题 */ public void subscribe(String topic,int qos){ try { client.subscribe(topic,qos); } catch (MqttException e) { e.printStackTrace(); } } }4.3.4 消费者客户端消息回调
创建MqttConsumerCallBack类并继承MqttCallback,实现相关消息回调事件,示例代码如下图所示: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttConsumerCallBack implements MqttCallback{ /** * 客户端断开连接的回调 */ @Override public void connectionLost(Throwable throwable) { System.out.println("与服务器断开连接,可重连"); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主题 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println(String.format("接收消息成功")); } }4.3.5 创建Controller控制器,实现MQTT连接的建立和断开
接下来,创建Controller控制器MqttController,并实现MQTT连接的建立和断开等方法。示例代码如下所示: import com.weiz.mqtt.config.MqttConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class MqttController { @Autowired private MqttConsumerConfig client; @Value("${spring.mqtt.client.id}") private String clientId; @RequestMapping("/connect") @ResponseBody public String connect(){ client.connect(); return clientId + "连接到服务器"; } @RequestMapping("/disConnect") @ResponseBody public String disConnect(){ client.disConnect(); return clientId + "与服务器断开连接"; } }4.4 测试验证
首先,分别启动生产者spring-boot-starter-mqtt-provider 和消费者spring-boot-starter-mqtt-consumer两个项目,打开浏览器,输入地址http://localhost:18083/,在EMQX管理界面可以看到连接上来的两个客户端。如下图所示:
接下来,调用生产者的消息发布接口验证消息发布是否成功。使用Pomstman调用消息发送接口:http://localhost:8080/sendMessage ,如下图所示:
通过上图可以发现,生产者模块已经把消息发送成功。接下来查看消费者模块,验证消息是否处理成功。如下图所示:
通过日志输出可以发现,消费者已经成功接收到生产者发送的消息,说明我们成功实现在Spring Boot项目中整合MQTT实现了消息的发布和订阅的功能。
最后
以上就是如何在Spring Boot中使用MQTT的详细内容,更多关于在Spring Boot中MQTT的使用大家可以去自己研究学习。比如:如何利用qos机制保证数据不会丢失?消息的队列和排序?集群模式下的应用?等等。
央视的春晚王心凌董宇辉成亮点,比以往更接地气兜兜转转半个月,年味儿还是央视开的。近日,CCTV1和CCTV3联合播出了2023年网络春晚。虽然晚会被录播了,但还是有人翻盘。央视能播出这些细节,也证明了现在的人更自信了,媒体不
惠安百狮园露天石狮博物馆看遍中国各地名狮每条通往山上的道路两旁都有石狮把守泉州网1月18日讯(记者张素萍王柏峰文图)腊月里,惠安科山森林公园的百狮园仍绿意葱茏。沿着园中一条条山路步道摆放的石狮表情各异千姿百态。这里的10
中国古人发明的四个奇技淫巧的东西,有的放到现在仍算是高科技随着我国考古发现的古人遗留物品越来越多,网友们无不感叹古人的智慧之先进。我们现在用的一些物品跟古人的东西非常相似,似乎几百几千年过去,我们在有些方面压根没有进步,用的还是古代的科技
历史人物故事中国史上的两司马中国史学上的两司马司马迁,司马光司马迁生于汉武帝,公元前145年,陕西人。司马光生于北宋,公元1019年,是山西人。两人相差一千多年,但这两人当时都是著名的史学家。司马迁作史记,司
宗良从三方面科学看待2022年中国经济增速中国银行首席研究员宗良17日在京参加中新社举办的国是论坛2023年中国经济形势分析会。田雨昊摄中新网1月17日电题宗良从三方面科学看待2022年中国经济增速中新财经记者夏宾2022
资讯中汽协中国增程式电动汽车产业发展报告正式发布文懂车帝原创李德喆懂车帝原创行业懂车帝从中国汽车工业协会获悉,2023年1月16日,中国增程式电动汽车产业发展报告新书发布会暨增程式电动汽车产业发展研讨会在京举行。中国增程式电动汽
大话西游2曾经拥有一只变色黄金,就是网吧最靓的仔经历过早期版本的大话2玩家都应该知道,黄金兽冰雪魔泥石怪这三种召唤兽代替了当初猴精雷鸟人的地位,成为每个玩家里人手必备的宝宝。泥巴不用多说敏捷劣势逐渐被人们遗忘,但是敏功属性都还不
被评为江苏十大经典名菜,质地醇酥,滋味鲜香今天带给大家的这带菜有点化学成分在里面,根据相关的野史记载,在几百年前,在镇江的一条叫酒海街里有个开饭店的老板,有天贪小便宜,街边的猪肉佬杀猪太多,导致猪腿卖不出去。饭店老板看到后
马来西亚公开赛20击败日本组合,郑思维黄雅琼夺国羽赛季首冠北京时间1月15日下午,2023年马来西亚羽毛球公开赛进入了决赛的争夺,中国组合郑思维黄雅琼陈清晨贾一凡梁伟铿王昶在混双女双男双三个项目中争夺冠军。在率先进行的混双决赛中,郑思维黄
外国友人宁波乡村过大年1月16日,在浙江宁波海曙区龙观乡,20余名外国友人齐聚一堂,共同体验中国传统年俗活动,在欢声笑语和浓浓年味中,迎接农历春节的到来。年俗活动是新春佳节必不可少的重要活动。近年来,随
33岁杨颖同框江珊,赢了身材输了气质,小花和演员的差距一目了然女人越是上了年纪,其实会越来越有韵味,很多人觉得年纪的增长,会让自己感到特别焦虑烦恼,可实际上,当你真正踏入了50多岁的阶段,就会感到特别从容淡定,反倒是在年轻的时候,我们会有一种
谁知道crm营销管理系统是什么?好用吗?般来说,市场管理是企业管理客户的第一步,但是市场管理牵扯潜在客户管理,市场活动成本分析,市场活动追踪等等比较复杂的体系,做好营销的第一步,才能更好地管理客户,CRM系统的售前管理的
十大入门级红酒,你喝过哪几种?红酒流入中国已久,也慢慢被国人接受。但许多人的初次品酒体验并不是很好,原因可能是你没有选对入门的红酒。现在来为大家带来真正适合红酒品鉴初学者的红酒。1。澳大利亚黄尾红葡萄酒黄尾葡萄
零基础非师范生如何备战教师资格笔试?知己知彼,百战不殆。要想通过教师资格证笔试,那么首先就要先了解它。第一教师资格证考试内容。考小学资格证考两门。教育学知识和综合知识。考初中高中资格证则需要另外增加一门学科知识。教育
在医院太平间上班是什么感受?太平间又称停尸房,属于停留死人尸体的地方,太平间在古代称为义庄,也称为义冢,现代称为停尸房太平间,可以是一栋楼,也可能是一层楼或者一个房间,绝大多数情况下太平间都在负一楼的位置,这
你多大年纪生的二胎?32岁,大宝五岁的时候生的二胎,主要原因是大宝老是说想要个弟弟。我自己也觉得应该生俩个孩子,长大了也好有个伴。我是八零后,确切地说就是81年的,属鸡。我老二是33岁生的,今年七岁,
栾川景区免费的有哪些推荐?一切向钱看的时代貌似没有免费的景区让你去吧,不过,你可以办一张洛阳的旅游年票,鸡冠洞,龙峪湾,抱犊寨,天河大峡谷,蝴蝶谷都可以用年票,可以给你省一笔不小的费用,去一两个景点就赚回来
西宁周边景点怎么玩?西宁周边景点怎么玩?整个西宁市区周边的景点是非常多的,有的适合当天去当天回来,有的则适合住一天,那我就分别来说一下。先来说说适合玩一天或半天的景区,这些景区主要集中在湟中,湟源,大
丹东有哪些好的景区?丹东是全国最大的边境城市,她临江沿海,自然风光优美,人文景观独特,北有层峦叠障的青山为屏,南有一望无际的良田沃野与黄海相接。这座小城气候宜人,景观别致,山美水美,每年的3至10月份
如何进行伪原创创作,有什么工具吗?长期需要输入文章,可有时写作却找不到灵感,真的很尴尬,这个时候就可以找个伪原创工具来一键搞定了,今天就来说说,好多自媒体人们用的几个的写作软件,功能强大,适用于工作和学习的方方面面
大型超市招聘的水电工具体做哪些工作?题主你好!大型超市招聘的水电工具体工作如我简单给你阐述一下,一般水电工这块,单招水工的不多,但是单招电工的不少,因为超市水这方面的工作不是太多,关于电方面的超市涉及的可就太多了,或
如何快速成为电商运营高手?在淘宝运营店铺10余年,希望我的回答能够帮助到你。现在各个电商平台上的细节运营点,不太一样,但是大体运营思路都一样的。记住一个公式销售额店铺流量转化率客单价。题主就围绕着这个公式,