MQTT 协议设计原则
灵活的发布订阅和主题设计
发布订阅模式是传统 Client/Server 模式的一种解耦方案。发布者通过 Broker 与消费者之间通信,Broker 的作用是将收到的消息通过某种过滤规则,正确地发送给消费者。发布/订阅模式 相对于 客户端/服务器模式 的好处在于:
● 发布者和消费者之间不必预先知道对方的存在,比如不需要预先沟通对方的 IP Address 和 Port
● 发布者和消费者之间不必同时运行。因为 Broker 是一直运行的。
在 MQTT 协议里,上面提到的 过滤规则 是 Topic。比如:所有发布到 news 这个 Topic 的消息,都会被 Broker 转发给已经订阅了 news 的订阅者:
上图中订阅者预先订阅了 news,然后发布者向 Broker 发布了一条消息 "some msg" 并指定发布到 news 主题,Broker 通过 Topic 匹配,决定将这条消息转发给订阅者。
MQTT 的 Topic 有层级结构,并且支持通配符 + 和 #:
● + 是匹配单层的通配符。比如 news/+ 可以匹配 news/sports,news/+/basketball 可匹配到 news/sports/basketball。
● # 是一到多层的通配符。比如 news/# 可以匹配 news、 news/sports、news/sports/basketball 以及 news/sports/basketball/x 等等。
MQTT 的主题是不要预先创建的,发布者发送消息到某个主题、或者订阅者订阅某个主题的时候,Broker 就会自动创建这个主题。
带宽消耗最小化
MQTT 协议将协议本身占用的额外消耗最小化,消息头部最小只需要占用 2 个字节。
MQTT 的消息格式分三部分:
固定长度头部,2 个字节,所有消息类型里都有
可变长度头部,只有某些消息类型里有
Payload,只有某些消息类型里有
MQTT 的主要消息类型有:
● CONNECT / CONNACK
● PUBLISH / PUBACK
● SUBSCRIBE / SUBACK
● UNSUBSCRIBE / UNSUBACK
● PINGREQ / PINGRESP
● DISCONNECT
其中 PINGREQ / PINGRESP 和 DISCONNECT 报文是不需要可变头部的,也没有 Payload,也就是说它们的报文大小仅仅消耗 2 个字节。
在 CONNECT 报文的可变长度头部里,有个 Protocol Version 的字段。为了节省空间,只有一个字节。所以版本号不是按照字符串 "3.1.1" 存放的,而是使用数字 4 来表示 3.1.1 版本。
三个可选的QoS等级
为适应设备不同的网络环境,MQTT 设计了 3 个 QoS 等级,0, 1, 2:
● At most once (0)
● At least once (1)
● Exactly once (2)
QoS 0 是一种 "fire and forget" 的消息发送模式:Sender (可能是 Publisher 或者 Broker) 发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制。
QoS 1 包含了简单的重发机制,Sender 发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复。
QoS 2 设计了略微复杂的重发和重复消息发现机制,保证消息到达对方并且严格只到达一次。
会话保持
MQTT 没有假设设备或 Broker 使用了 TCP 的保活机制4,而是设计了协议层的保活机制:在 CONNECT 报文里可设置 Keepalive 字段,来设置保活心跳包 PINGREQ/PINGRESP 的发送时间间隔。当长时间无法收到设备的 PINGREQ 的时候,Broker 就会认为设备已经下线。
总的来说,Keepalive 有两个作用:
● 发现对端死亡或者网络中断
● 在长时间无消息交互的情况下,保持连接不被网络设备断开
对于那些想要在重新上线后,重新收到离线期间错过的消息的设备,MQTT 设计了持久化连接:在 CONNECT 报文里可设置 CleanSession 字段为 False,则 Broker 会为终端存储:
● 设备所有的订阅
● 还未被设备确认的 QoS1 和 QoS 消息
● 设备离线时错过的消息
在线状态感知
MQTT 设计了遗愿(Last Will) 消息,让 Broker 在发现设备异常下线的情况下,帮助设备发布一条遗愿消息到指定的主题。
实际上在某些 MQTT 服务器的实现里 (比如 EMQX),设备上线或下线的时候 Broker 会通过某些系统主题发布设备状态更新,更符合实际应用场景。
TCP保活功能
KeepAlive初衷
客户端和服务器需要了解什么时候终止进程或者与对方断开连接。
应用进程之间没有任何数据交换,但仍然需要通过连接保持一个最小的数据流。
keepAlive是由一个保活计时器实现的。当计时器被激发,连接一端 将发送一个保活探测(简称保活)报文,另一端接收报文的同时会发送一个ACK作为响应。
TCP协议中实现KeepAlive但是保活机制存在争议。许多人认为,如果需要,这一功能也不应在TCP协议中提供, 而应在应用程序中实现。另一种观点认为,如果许多应用程序中都需要这一功能,那么在 TCP协议中提供的话就可以使所有的实现都包含这一功能。
描述
保活功能在默认情况下是关闭的。TCP连接的任何一端都可以请求打开这一功能。保活功能可以被设置在连接的一端、两端,或者两端都没有。
有几个配置参数可以用来控制保活 功能的操作。如果在一段时间(称为保活时间, keepalive time)内连接处于非活动状态,开启保活功能的一端将向对方发送一个保活探测报文。
如果发送端没有收到响应报文,那么经过一个已经提前配置好的保活时间间隔(keepalive interval),将继续发送保活探测报文,直到发送探测报文的次数达到保活探测数(keepalive probe), 这时对方主机将被确认为不可到达,连接也将被中断。
保活探测报文为一个空报文段(或只包含1字节)。它的序列号等于对方主机发送的 ACK报文的最大序列号减1。因为这一序列号的数据段已经被成功接收,所以不会对到达的报文段造成影响,但探测报文返回的响应可以确定连接是否仍在工作。
探测及其响应报文都不包含任何新的有效数据(它是“垃圾”数据),当它们丢失时也不会进行重传。
TCP保活功能工作过程中,开启该功能的一端会发现对方处于以下四种状态之一:
对方主机仍在工作,并且可以到达。对方的TCP响应正常,并且请求端也知道对方 在正常工作。请求端将保活计时器重置(重新设定为保活时间值)。如果在计时器超时之前有应用程序通过该连接传输数据,那么计时器将再次被设定为保活时间值。
对方主机已经崩溃,包括已经关闭或者正在重新启动。这时对方的TCP将不会响应。 请求端不会接收到响应报文,并在经过保活时间间隔指定的时间后超时。超时前,请求端会 持续发送探测报文,一共发送保活探测数指定次数的探测报文,如果请求端没有收到任何探 测报文的响应,那么它将认为对方主机已经关闭,连接也将被断开。
客户主机崩溃并且已重启。在这种情况下,请求端会收到一个对其保活探测报文的响应,但这个响应是一个重置报文段,请求端将会断开连接。
对方主机仍在工作,但是由于某些原因不能到达请求端(例如网络无法传输,而且可 圃 能使用ICMP通知也可能不通知对方这一事实)。这种情况与状态2相同,因为TCP不能区 分状态2与状态4,结果都是没有收到探测报文的响应。
请求端不必担心对方主机正常关闭然后重启(不同于主机崩溃)的情况。当系统关机时, 所有的应用进程也会终止(即对方的进程),这会使对方的TCP发送一个FIN。请求端接收 到FIN后,会向请求端进程报告文件结束,并在检测到该状态后退出。
Retained & Last Will & Clean Session
背景导入
让我们来看一下这个场景:
你有一个温度传感器,它每三个小时向一个 Topic 发布当前的温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候能才能收到温度消息?
对的,它必须等到三个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。
怎么来解决这个问题呢?
这个时候就轮到 Retained 消息出场解决这个问题了。
Retained
Retained 消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。
Retain 消息有以下一些特点:
- 一个 Topic 只能有 1 条 Retained 消息,发布新的 Retained 消息将覆盖老的 Retained 消息;
- 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的 Retained 消息;
- 只有新的订阅者才会收到 Retained 消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到 Retained 消息;
- Retained 消息发送到订阅者时,消息的 Retain 标识仍然是 1,订阅者可以判断这个消息是否是 Retained 消息,以做相应的处理。
注意:Retained 消息和持久性会话没有任何关系,Retained 消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的。
如果你想删除一个 Retained 消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 Retained 消息就可以了。
那么开头我们提到的那个场景的解决方案就很简单了,温度传感器每 3 个小时发布当前的温度的 Retained 消息,那么无论新的订阅者什么时候进行订阅,它都能收到温度传感器上一次发布的数据。
我们需要在这里理清一些概念:
Client & Topic & Queue & Pub & Sub
1.Pub发送消息时,只能指定发送到哪一个Topic中
2.Queue是与Sub绑定的,一个Sub对应一个Queue,只有Sub才能创建Queue,Pub 和 Topic是不能创建Queue的
3.消息发送到Topic中后,通过binding,路由到对应的queue中,进行保存。这里的binding值,也是由Sub设置的
4.如果Clean Session设置成true,那么当sub下线后,对应的queue也会被自动删除掉;如果Clean Session设置成false,那么当sub下线后,对应的queue仍然保留着,消息也仍然会根据binding,自动路由到queue中,进行存储
5.注意,retained消息是与topic进行绑定的,每一个 Topic 可以存储一个retained消息,如果新上线一个订阅者,那么就新创建了一个queue,此时,如果topic会把retained消息,自动路由到新创建的queue中,这样新上线的订阅者就能感知到,该topic的最后一条retained消息了。
CleanSession设置如下:
@Bean("transientClientFactory")
public MqttPahoClientFactory transientClientFactory(CustomMqttProperties properties) {
final MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(properties.getUris());
// 如果设置为true则接收不到掉线的消息,改为false则可以收到掉线的消息,
options.setCleanSession(true);
options.setUserName(properties.getUserName());
options.setPassword(properties.getPassword().toCharArray());
final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(
options);
return factory;
}
LWT(最后遗嘱)
LWT 全称为 Last Will and Testament,也就是我们在连接到 Broker 时提到的遗嘱,包括遗嘱主题、遗嘱 QoS、遗嘱消息等。
顾名思义,当 Broker 检测到 Client 非正常地断开连接的时候,就会向遗嘱主题里面发布一条消息。
这里注意下,遗嘱消息,是broker往topic发送的,不是client自己发送的
遗嘱相关的设置是在建立连接的时候,在 CONNECT 数据包里面的 Variable header(可变头与) Payload(有效载荷) 中 指定的。
- Will Flag:是1否0使用 LWT
- Will Topic:遗嘱主题名,不可使用通配符(在 CONNECT报文中的 有效载荷 中 设置)
- Will Qos:发布遗嘱消息时使用的 QoS 等级,如果遗嘱标志(Will Flag)被设置为0,遗嘱QoS也必须设置为0(0x00)
- Will Retain:遗嘱消息的 Retain 标识
- Will Message:遗嘱消息内容(在 CONNECT报文中的 有效载荷 中 设置)
Broker 在以下情况下认为 Client 是非正常断开连接的:
- Broker 检测到底层的 I/O 异常;
- Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
- Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
- Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。
如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数。
根据遗嘱的属性和触发机制,我们不难看出,遗嘱常用于获取设备的连接状态。
注意,设置好遗嘱以后还不够(因为你只要订阅者一启动就会收到遗嘱消息,如果此时发布者已经在线,会导致不准确),
所以,还需要在设备成功连接MQTT的时候主动发个消息,发送的主题必须和遗嘱的主题相同,设置好消息的 retain 属性,让其自动纠正过来。