Android基于MQTT实现消息通知
一、写在前面
在对接项目中IoT时,发现目前有对MQTT做了接入,这里记录一下,官方的资料比较详细,这里主要从实现细节出发;对具体的需求以及配套的技术方案进行整理,以供参考。一、IoT与MQTT
提到IoT(Internet of Things)、IIoT(Industrial IoT )不得不说MQTT,其被广泛的应用在物联网以及工业物联网之中,是一种消息传递协议。不同于我们所认识的平时常见的一些智能设备,如手机、电脑、平板等;这些设备一般都有着很好的计算能力,所依赖的网络环境很优质。但是一般的硬件设备性能较差,网络环境不稳定,而MQTT则是专门针对于硬件性能,网络状态不稳定场景下而生的。有着天然的优势。二、什么是MQTT
MQTT是用于物联网的最常用的消息传递协议(IoT)。MQTT代表MQ遥测运输。该协议是一组规则,它定义了IoT设备如何通过Internet发布和订阅数据。用于IoT和工业IoT(IIT)设备(例如嵌入式设备,传感器,工业PLC等)之间的消息传递和数据交换。协议是事件驱动的,并使用发布/订阅(PUB / SUB)模式连接设备。发布者和接收器(订阅者)通过主题通信,并彼此分离。它们之间的连接由MQTT代理处理。MQTT代理过滤所有传入消息并将其正确分发给订阅者。三、与传统Http的区别MQTT以数据为中心,底层基于TCP链接,直接操作轻量级的二进制数据,并且数据包很小(可以小到一个字符,两个字节)。划重点,由于这个特性,其对于网络环境状态要求没有HTTP那么高,这也是为什么广泛应用于IoT设备的原因之一。MQTT基于发布/订阅模式,区别于HTTP的请求/回调模式,这就决定了一个同一个设备即可以是客户端(Client)同时也可以是服务端(Server),回想发布订阅模式,消息的发布可以是1toN(N>=0),而HTTP则是1to1。MQTT的发布/订阅架构决定了其无法基于UDP(面向无链接),而HTPP底层可以是基于TCP或者UDP。消息体量的区别,MQTT数据包很小,而HTTP数据量一般较大。四、MQTT构成部分1.Publish&Subscribe
MQTT对于发布订阅做了自身的解耦处理,主要是从三个维度出发,1.空间解耦:发布者和订阅者不需要相互了解(例如,没有IP地址和端口的交换)。2.时间解耦:发布者和订阅者不需要同时运行。3.同步解耦:在发布或接收期间,两个组件的操作不需要中断。详细信息2.Client、Broker
详细信息3.Topics&Best Practices
主要需要注意Topics的匹配规则,分为单项通配符,与多项通配符。单项以+连接:this/is/+/single,其中仅仅+部分可以被替换为单个路径(以/分割)。多项通配符仅支持在尾端支持:this/is/multi/#,并且是多级的。
详细信息4.Keep Alive
保活时效,包括其他的字段,官方文档都给出了很详细的解释,认真了解一项技术实现,官方的文档还是最好的选择文档。这里主要基本认识MQTT是个什么东西,具体的实现细节与规范也不是一两句话可以说的清楚的,且可能存在误导的风险。MQTT五、MQTT实际项目中的使用1.实现什么需求?
以实际的项目为例,现需要实现的功能有:服务端下发消息通知到IoT设备,消息以Type区分,不同的消息需对应不同的处理措施。根据消息的不同,有语音播放、叫号、本地数据更新、服务端配置下发。
功能相对很简单,总结就是服务端推送消息,设备根据消息做出响应。2.具体实现方案
导入依赖implementation "org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0" implementation "org.eclipse.paho:org.eclipse.paho.android.service:1.1.1"
主要分为几个类:a.主体请求Client,b.数据返回的回调dataCallback,c.链接状态回调connectCallback,d.具体消息处理策略IHandler。方案主要就包括这几个大类,逐步实现各个细节。3.数据接口回调IDataCallbackinterface IDataCallback { fun connectionError(cause: Throwable?) fun dataMessage(dataMessage: String) }4.连接状态回调IConnectionCallbackinterface IConnectionCallback { fun connectSuccess() fun connectFail(reason: Throwable?) }5.CMqttClient链接类
在实现之前,列举几个关键的参数,参数配置在MqttConnectOptions中val options = MqttConnectOptions() //默认为true,表示非持久订阅,无论服务端或者客户端重启,不会保持状态,重启后指定消息也无法送达 //设置为false,表示持久订阅,服务端与客户端重启或重链,指定消息可以送达 options.isCleanSession = false //链接的用户名(账号名) options.userName = user //链接的用户密码(账户密码) options.password = password.toCharArray() //链接超时值s,默认30s,为0时,等待网络状态,即成功或失败 options.connectionTimeout = connectTimeout //默认60s,检测服务端是否可用,为0时则禁止客户端保活,保活间隔内,没有消息的情况下客户端会通过ping来检测链接是否保持 options.keepAliveInterval = keepAliveInterval //是否开启自动重连接,初始尝试重连是等待1s,失败情况下,延迟加倍,直到2分钟。 options.isAutomaticReconnect = true
关于自动重新连接有三个必要条件,cleanSession需要设置为false,isAutomaticReconnect需要设置为true,并且初始已经连接过。划重点,这里就要求,MQTT虽然可以自动重试连接当时必须有这三个前提,那么首次由于网络等其他原因,这里的重试机制是需要我们自身去实现的,也就是需要保证首次能够连接到服务端。源码以及注释://Reconnect Only appropriate if cleanSession is false and we were connected. Declare as synchronized to avoid multiple calls to this method to send connect multiple times synchronized void reconnect() { //.... } //注释说的很明确三个条件,1.cleanSession需设置为false,2.isAutomaticReconnect需设置为true,并且是之前是已经连接过了。class CMqttClient { //正在连接服务器 private var connecting = false private var mqttAndroidClient: MqttAndroidClient? = null //连接到服务器 fun connectToServer( context: Context, host: String, clientId: String, accountName: String, accountPsw: String, connectTimeout: Int = 20, keepAliveTime: Int = 60, connectionCallback: IConnectionCallback? = null, dataCallback: IDataCallback? = null ) { if(null == mqttAndroidClient) { mqttAndroidClient = MqttAndroidClient(context, host, clientId) mqttAndroidClient?.setCallback(object: MqttCallback { override fun connectionLost(cause: Throwable?) { dataCallback?.connectionError(cause) } override fun messageArrived(topic: String?, message: MqttMessage?) { message?.let { val payLoad = String(it.payload) Log.d(xxxxxxxxx) dataCallback?.dataMessage(payLoad) } } override fun deliveryComplete(token: IMqttDeliveryToken?) { //do something } }) } connecting = true val options = MqttConnectOptions() options.isCleanSession = false options.userName = user options.password = password.toCharArray() options.connectionTimeout = connectTimeout options.keepAliveInterval = keepAliveInterval options.isAutomaticReconnect = true mqttAndroidClient?.connect(options, null, object: IMqttActionListener{ override fun onSuccess(asyncActionToken: IMqttToken?) { connecting = false connectionCallback?.connectSuccess() } override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { connecting = false connectionCallback?.connectFail(exception) } }) } //是否已经建立链接 fun hasConnected(): Boolean { return try { mqttAndroidClient?.isConnected == true } catch { false } } //断开与服务端连接 fun disConnectFromServer() { if(mqttAndroidClient?.isConnected == true) { mqttAndroidClient?.disconnect() } connecting = false } //释放资源 fun relaseClient() { mqttAndroidClient?.close() mqttAndroidClient = null connecting = false } //订阅消息 fun subscribe(topics: Array, qos: IntArray, timeOut: Long = 2000): Boolean { return mqttAndroidClient?.let { try{ val mqttToken = it.subscribe(topics, qos) mqttToken.waitForCompletion(timeout) mqttToken.isComplete true } catch { Log.d(xxxx) false } }?: false } //取消订阅 fun unSubscribe(): Boolean { //省略 } }
需要注意的是这里的ClientId,是唯一性的,像IoT设备以设备deviceId作为ClientId,如果换成用户userId,当在多设备登录的情况下,那么重试等其他一些机制会影响预期结果,给排查问题带来一定的难度。5.消息处理接口IHandlerinterface IHandler { fun handlerMessage(message: String) }
消息体中会包含不同的type,根据不同的type实现不同的处理器,当然为了灵活还要借助注解机制。6.注解模版@Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.SOURCE) annotation class MQTTHandler { val groupName: String val type: String }
通过反射的方式加载对应的IHandler实现类,核心代码interface IHandlerProvide { fun provideHandler(handlerType: String): IHandler? fun release() } class XXXHandlerProvider: IHandlerProvide { override fun provideHandler(handlerType: String): IHandler? { //find the imp IHandler } override fun release() { //release the source } } override fun provideHandler(handlerType: String): IHandler? { val handlerClass = Maneger.instance.findIHandlersByGroup(GroupName)?.get(handlerType) }
使用时,直接加上注解:@MQTTHandler(groupName = groupxxxx, type = typeNamexxxx) class TestHandler: IHandler { override fun handlerMessage(message: String) { //do something what you want } }
整个流程的主要部分已经给出,核心是通过不同的消息type查找出对应的处理器;当然这部分主要是由注解完成的,对于处理器的查找则是通过反射的方式来进行匹配的。六、文档
MQTT官网
东方新能科技直流变桨系统交流化改造产品成功批量应用5月28日,由东方新能科技自主研发的直流变桨系统交流化改造产品在云南某运行近10年的风电场成功批量应用,改造后变桨系统故障率降低90以上,标志着该变桨系统技改产品研发取得成功,并成
当大家都快忘记魅蓝时,魅族又准备重启这个系列了?众所周知,各大手机品牌都喜欢设立几个自己的独立子品牌,比如小米与红米(Redmi),vivo与iQOO,OPPO与realme,华为和荣耀(现在彻底分离了)等等,而在数年前国内智能
复合材料赋能新基建,我国研发取得新进步2018年12月29日,南京长江大桥恢复通车。此次维修运用了新的材料和新的技术,在保留大桥原有韵味的基础上,让大桥从内到外,焕然一新,在一些关键部位,先进复合材料玄武岩纤维已嵌入体
HUAWEI的一步!为什么可以震惊世界?HUAWEI的宣布成功,这意味着,第五代通信技术网络的到来,一代代的通信技术的更迭,其里面的科技企业也起到了至关重要的影响。占据全球的订单份额此前,HUAWEI的总裁任正非对HUA
物联网卡大家了解么?一起看看需要注意的3个方面吧大家知道物联网卡是什么么?它是目前一种使用广泛的数据传输方式,是移动电信联通三大运营商联合推出的,并且具备传输稳定且资费优惠的优势,获得了众多企业的认同,那么物联网卡能使用多久呢?
华为芯片之战的关键在哪?11nm成了难以逾越的鸿沟说起全球芯片企业中最出名的,当属中国的台积电,它是全球芯片代工企业中最大的巨头,它包揽了全球50的订单,像华为中兴的芯片全部是台积电生产的,这足以看出台积电的实力。华为被美国压制要
孔令贤为什么会离开华为?华为总裁任正非的道歉说起华为,相信大家对华为这个品牌并不陌生,华为自身是一个技术公司,一直密切关注新技术的发展,华为公司自己的理念也是特别强大的,在华为公司创立之初,便把培训员工放在了第一位,在看到华
高考结束想放飞自我,却不想被行李拖累!咋办?8号下午,高考生们的最后一科终于结束,不管是在进入考场之前的紧张还是对考试过程中的一些失误的懊恼,这些都已经成为了过去。许多考生都会在高考结束之后试图撒欢一波,但是在撒欢之前,还要
中国如何从一无所有到全球领先?我国的5G基站数量在全球领先现在我们国家的5G基站数量在全球已经是处于领先位置了,截止到现在已经拥有35万座。华为5G技术让中国在国际上扬眉吐气如果西方企业愿意买断华为的5G技术,
中国为什么开通快捷通道?对于这场新冠肺炎疫情,我国采取了强势有效的疫情防控举措,所以我国也成了全球率先复工复产的国家之一,而作为全球重要的市场以及亚洲金融中心,愈来愈多的国家希望能够跟我国达成快捷通道,以
隐私与安全对比,小米创双高,投入高和安全等级步步高手机已经成为了我们生活的一部分,在用手机娱乐的时候,我们的信息隐私可能正在被盗取,不知道你有没有关注过隐私数据安全数据的泄露这些问题,它给我们带来的风险是非常巨大的。小米安全与隐私