Administrator
发布于 2024-11-12 / 7 阅读
0
0

MQTT 5

MQTT 5 Feature

https://www.rabbitmq.com/blog/2023/07/21/mqtt5

https://www.emqx.com/zh/blog/a-comprehensive-comparison-of-open-source-mqtt-brokers-in-2023

https://help.aliyun.com/zh/apsaramq-for-mqtt/mqtt-upgraded/developer-reference/mqtt-5-0-overview

特性说明
用户属性MQTT 5.0报文中引入了用户属性能力,允许在控制报文中添加额外的键值对,允许用户定义完整的UTF8字符串键值对。除了心跳报文,MQTT 5.0控制报文都支持设置用户属性,设备/应用程序可以通过用户属性传递额外的自定义信息。
主题别名使用蜂窝网络接入物联网平台的物联网设备,大部分对流量和功耗非常敏感。在这种场景下,更小的数据包传输可以降低电量和流量的消耗。主题别名可以将字符串主题替换为数字,从而降低消息的数据包长度。
会话过期在MQTT 5.0中,把cleanSession修改为cleanStart,还增加了会话过期间隔属性Session Expiry Interval,当cleanStart为false并且连接的时候指定了会话过期时间,那么在会话过期之前客户端重连的时候可以恢复其会话和所有相关状态,当cleanStart为true的时候,离线后会直接删除会话。
消息过期当消息过期间隔被设置并且消息已过期时,存储的消息将被删除,不会被推送。这在物联网弱网场景下很有用,例如,您不希望过期的指令下发到设备,造成设备行为异常。
订阅选项客户端订阅某个主题时,可以指定一些参数和选项(例如QoS),可以有效地管理和优化消息传递过程,满足不同应用场景的需求。
请求与响应模式MQTT 5.0 提供了一种新的请求/响应模式,以支持一对一的通信。在物联网设备的实际业务场景下,可使用请求与响应模式,解决需要回复的场景需求。
消息格式描述消息增加Payload格式说明,保证发布者和订阅者对数据格式的一致理解,帮助订阅者正确处理数据。
增强端云交互支持功能参数协商、增强错误码、服务端主动断开等特性,提高问题排查效率。

一些限制:

  • not support Shared Subscriptions
  • Retained messages stored and queried only node local

重要特性

会话过期

功能介绍

会话过期(Session Expiry Interval)是指在客户端和服务端(Broker)之间的会话结束后,会话状态数据被保留一段时间。会话状态包括未确认的QoS 1消息、客户端的订阅等。

会话过期时间可以在客户端连接到MQTT服务端的时候,在CONNECT消息的会话过期间隔(Session Expiry Interval)字段中指定。

在MQTT 5.0中,把cleanSession修改为cleanStart,还增加了会话过期间隔属性Session Expiry Interval,当cleanStar为false并且连接的时候指定了会话过期时间,那么在会话过期之前客户端重连的时候可以恢复其会话和所有相关状态,当cleanStart为true的时候,离线后会直接删除会话。

使用场景

断连重连场景:在网络不稳定或者移动环境中,例如车联网,客户端的连接可能频繁中断,通过设置较长的会话过期时间,确保连接重建后会话可以无缝恢复。

临时设备连接:对于那些短暂连接、发送少量数据后即断开连接的设备(如一次性数据上报设备或临时加入网络的传感器),可以设置较短的会话过期时间,这样可以有效释放服务器资源,避免无用会话占用存储。

安全与隐私保护:在对数据保密性和隐私有严格要求的应用中,可以设置合理的会话过期时间,确保在客户端长时间不活动后,其会话状态和相关数据被自动清除,降低数据泄露的风险。

MQTT 5.0的会话过期的功能,可以以根据不同的应用场景灵活调整会话过期时间,以达到最佳的系统性能、资源利用率和用户体验。

消息示例
例如,有一个应用场景,其中一个IoT设备需要经常断开并重新连接到MQTT服务端。为了在设备断线时不丢失其订阅和消息,这时可以使用MQTT 5.0的会话过期特性。

此处展示该特性在设备首次连接和后续重连接时的使用:

首次连接:

设备首次连接到MQTT服务端,发送一个CONNECT消息,并设置会话过期间隔为4个小时。这意味着即使设备断开连接,MQTT服务端将保留其会话状态(包括订阅和队列中的消息)最多4个小时。

CONNECT
ClientId: IoT_Device123
Session Expiry Interval: 14400 // 4小时,以秒为单位
Clean Start: 0 // 用于指示服务端继续使用现有的会话
// 其他需的CONNECT报文信息

在这4个小时内,如果设备因为网络问题导致连接断开,MQTT服务端会保持该设备的所有订阅和未确认的消息。

重连接:

CONNECT
ClientId: IoT_Device123
Session Expiry Interval: 14400 // 再次指定为4小时
Clean Start: 0 //告诉服务端使用上次保存的会话
// 其他需的CONNECT报文信息

当IoT设备恢复连接时,由于之前的会话并未过期,服务端将使用之前保存的会话状态,包括IoT设备的订阅信息,以及它因断线而未能收到的任何消息,这一切将在重连接后送达给设备。

超时情形:

如果IoT设备在超过4个小时而没有重连,那么MQTT服务端就会清除相关的会话状态。此时,如果IoT设备再次连接,它需要像首次连接一样重新建立新的会话,包括重新订阅所需的主题,并且先前队列中的消息则会丢失。

消息格式描述

为了实现MQTT payload中数据,可以是json字符串,也可以是二进制数据,也可以是十六进制数据

我们需要指定Payload Format Indicator 和 Content Type。

contentType可以是application/json或者application/octet-stream,或者hex等



Payload Format Indicator 是MQTT 5.0特有的特性,这是一个标志位,用于指示PUBLISH消息的负载内容是否是预定义的格式。

它使得接收者可以更容易地解释PUBLISH消息的负载。这个标志位可以设置为0或1,0表示未指定,1表示负载是一个UTF-8编码的字符数据。

客户端发送到服务端时,如果Payload Format Indicator为1,则服务端会校验格式是否为UTF-8,如果不是UTF-8则发送失败,在QoS为1时回复错误码为PAYLOAD_FORMAT_INVALID的PubAck报文。

image-20241112100012385

当我们发送json字符串类型的数据时,往往需要指定Payload Format Indicator为1,且contentType为application/json



Content Type 也是 MQTT 5.0 引入的一个全新属性,与 Payload Format Indicator 类似,它同样仅存在于 PUBLISH 报文和 CONNECT 报文的遗嘱消息中。

Content Type 的值是一个 UTF-8 编码的字符串,用来描述应用消息的内容,这可以帮助接收端了解如何解析应用消息的有效载荷。

这个字符串的具体内容完全由发送端和接收端决定,在消息的整个传输过程中,服务端不会使用这个属性来验证消息内容的格式是否正确,它只负责将这个属性原封不动地转发给订阅者。

所以只要接收端能够理解,你甚至可以用 “cocktail” 来描述 JSON 类型。但为了避免造成不必要的困扰,通常我们更推荐使用已知的 MIME 类型来描述消息内容,例如 application/json、application/xml 等等。

Content Type 在需要支持多种数据类型的场景中非常有用。比如当我们在聊天软件中向对方发送图片,图片可能有 png,gif,jpeg 等多种格式,如何向对端指示我们发送的二进制数据所对应的图片格式?

在 5.0 之前,我们可能会选择在主题中包含图片格式,比如 to/userA/image/png。但显然,随着支持的图片格式的增加,系统中的主题也会泛滥成灾。而在 5.0 中,我们只需要将 Content Type 属性设置为 image/png 即可。

image-20241112100338431


Payload Format Indicator 与 Content Type 必须一起使用吗?

Payload Format Indicator 和 Content Type 是否需要同时使用,主要取决于我们的应用场景。

对于订阅端来说,他可以根据 Content Type 属性的值来判断消息的内容应该是 UTF-8 字符串还是二进制数据,所以 Payload Format Indicator 属性的意义不大。

不过对于服务端来说,他并不了解 Content Type 的值的含义,所以如果我们希望服务端检查消息的有效载荷是否符合 UTF-8 编码规范,就必须借助 Payload Format Indicator 属性。

Receive Maximum

MQTT 5.0新增了Receive Maximum属性,它存在于CONNECT报文与CONNACK报文,表示客户端或服务端能够同时处理的QoS为1、QoS为2的PUBLISH报文最大数量,即对端可以使用的最大发送配额,最大值为65535,超过该数值则按照65535设置。

Receive Maximum属性提供了告诉对端发送QoS为1、QoS为2的PUBLISH的最大数量,对端发现未决PUBLISH个数等于Receive Maximum时,不能再发送QoS为1、QoS为2的PUBLISH消息了,以此来实现流量控制。

Spring-Integration-MQTT

image-20241111142920436

MQTT v5 Support

https://docs.spring.io/spring-integration/reference/mqtt.html#mqtt-v5

Starting with version 5.5.5, the spring-integration-mqtt module provides channel adapter implementations for the MQTT v5 protocol. The org.eclipse.paho:org.eclipse.paho.mqttv5.client is an optional dependency, so has to be included explicitly in the target project.

Since the MQTT v5 protocol supports extra arbitrary properties in an MQTT message, the MqttHeaderMapper implementation has been introduced to map to/from headers on publish and receive operations. By default, (via the * pattern) it maps all the received PUBLISH frame properties (including user properties). On the outbound side it maps this subset of headers for PUBLISH frame: contentType, mqtt_messageExpiryInterval, mqtt_responseTopic, mqtt_correlationData.

The outbound channel adapter for the MQTT v5 protocol is present as an Mqttv5PahoMessageHandler. It requires a clientId and MQTT broker URL or MqttConnectionOptions reference. It supports a MqttClientPersistence option, can be async and can emit MqttIntegrationEvent objects in that case (see asyncEvents option). If a request message payload is an org.eclipse.paho.mqttv5.common.MqttMessage, it is published as is via the internal IMqttAsyncClient. If the payload is byte[] it is used as is for the target MqttMessage payload to publish. If the payload is a String it is converted to byte[] to publish. The remaining use-cases are delegated to the provided MessageConverter which is a IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean from the application context. Note: the provided HeaderMapper<MqttProperties> is not used when the requested message payload is already an MqttMessage. The following Java DSL configuration sample demonstrates how to use this channel adapter in the integration flow:

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}

注意:

The org.springframework.integration.mqtt.support.MqttMessageConverter cannot be used with the Mqttv5PahoMessageHandler since its contract is aimed only for the MQTT v3 protocol.

If connection fails on start up or at runtime, the Mqttv5PahoMessageHandler tries to reconnect on the next message produced to this handler. If this manual reconnection fails, the connection is exception is thrown back to the caller. In this case the standard Spring Integration error handling procedure is applied, including request handler advices, e.g. retry or circuit breaker.

See more information in the Mqttv5PahoMessageHandler javadocs and its superclass.

The inbound channel adapter for the MQTT v5 protocol is present as an Mqttv5PahoMessageDrivenChannelAdapter. It requires a clientId and MQTT broker URL or MqttConnectionOptions reference, plus topics to which to subscribe and consume from. It supports a MqttClientPersistence option, which is in-memory by default. The expected payloadType (byte[] by default) can be configured, and it is propagated to the provided SmartMessageConverter for conversion from byte[] of the received MqttMessage. If the manualAck option is set, then an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header is added to the message to produce as an instance of SimpleAcknowledgment. The HeaderMapper<MqttProperties> is used to map PUBLISH frame properties (including user properties) into the target message headers. Standard MqttMessage properties, such as qos, id, dup, retained, plus received topic are always mapped to headers. See MqttHeaders for more information.

Starting with version 6.3, the Mqttv5PahoMessageDrivenChannelAdapter provides constructors based on the MqttSubscription for fine-grained configuration instead of plain topic names. When these subscriptions are provided, the qos option of the channel adapter cannot be used, since such a qos mode is a part of MqttSubscription API.

The following Java DSL configuration sample demonstrates how to use this channel adapter in the integration flow:

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

注意:The org.springframework.integration.mqtt.support.MqttMessageConverter cannot be used with the Mqttv5PahoMessageDrivenChannelAdapter since its contract is aimed only for the MQTT v3 protocol.

See more information in the Mqttv5PahoMessageDrivenChannelAdapter javadocs and its superclass.

注意:

It is recommended to have the MqttConnectionOptions#setAutomaticReconnect(boolean) set to true to let an internal IMqttAsyncClient instance to handle reconnects. Otherwise, only the manual restart of Mqttv5PahoMessageDrivenChannelAdapter can handle reconnects, e.g. via MqttConnectionFailedEvent handling on disconnection.

Shared MQTT Client Support

If a single MQTT ClientID is required for several integrations, multiple MQTT client instances cannot be used because MQTT brokers may have a limitation on a number of connections per ClientID (typically, a single connection is allowed). For having a single client reused for different channel adapters, a org.springframework.integration.mqtt.core.ClientManager component may be used and passed to any channel adapter needed. It will manage MQTT connection lifecycle and do automatic reconnects if needed. Also, a custom connection options and MqttClientPersistence may be provided to the client manager just as currently it can be done for channel adapter components.

Note that both MQTT v5 and v3 channel adapters are supported.

The following Java DSL configuration sample demonstrates how to use this client manager in the integration flow:

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}

示例

公共配置

package com.jshxhb.framework.mqtt.entity;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(value = "custom.mqtt")
public class CustomMqttProperties {

    /** mqtt Broker地址 */
    private String[] uris;
    /** 连接用户名 */
    private String userName;
    /** 连接密码 */
    private String password;
    /**
     * 客户端id
     */
    private String clientIdPrefix;

    private String companyId;

    private String defaultTopic;

    public String[] getUris() {
        return uris;
    }

    public void setUris(String[] uris) {
        this.uris = uris;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getClientIdPrefix() {
        return clientIdPrefix;
    }

    public void setClientIdPrefix(String clientIdPrefix) {
        this.clientIdPrefix = clientIdPrefix;
    }

    public String getCompanyId() {
        return companyId;
    }

    public void setCompanyId(String companyId) {
        this.companyId = companyId;
    }

    public String getDefaultTopic() {
        return defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }
}

package com.jshxhb.framework.mqtt.config;

import com.jshxhb.framework.mqtt.entity.CustomMqttProperties;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

@Component
public class Mqttv5ConnectionHelper {

    @Autowired
    private CustomMqttProperties properties;

    public MqttConnectionOptions getMqttConnectionOptions() {
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setServerURIs(properties.getUris());
        options.setUserName(properties.getUserName());
        options.setPassword(properties.getPassword().getBytes(StandardCharsets.UTF_8));
        options.setReceiveMaximum(20);
        options.setAutomaticReconnect(true);
        options.setCleanStart(false);
        options.setSessionExpiryInterval(TimeUnit.DAYS.toSeconds(5));
        return options;
    }
}

消费端

package com.jshxhb.iot2servercollect.config;

import com.jshxhb.framework.mqtt.config.MqttConfiguration;
import com.jshxhb.framework.mqtt.config.Mqttv5ConnectionHelper;
import com.jshxhb.framework.mqtt.entity.CustomMqttProperties;
import com.jshxhb.iot2servercollect.entity.MqttClassifySubscribe;
import com.jshxhb.iot2servercollect.service.MonitorDataService;
import com.jshxhb.iot2servercollect.threadpool.MqttThreadPoolService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
@AutoConfigureAfter(value = {MqttConfiguration.class,MqttSubscribeConfiguration.class})
public class FlexemMqttConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlexemMqttConfiguration.class);

    @Autowired
    @Qualifier("flexemMonitorDataServiceImpl")
    private MonitorDataService monitorDataService;

    // 这里需要提供一个线程池
    @Autowired
    private MqttThreadPoolService mqttThreadPoolService;

    @Autowired
    private Mqttv5ConnectionHelper mqttv5ConnectionHelper;

    private ThreadPoolExecutor executor;

    /**
     * in bound channel
     */
    @Bean("flexemInboundChannel")
    public MessageChannel flexemInboundChannel() {
        return new DirectChannel();
    }


    /**
     * in bound channel adapter
     * @param properties
     * @return
     */
    @Bean("flexemInboundChannelAdapter")
    public Mqttv5PahoMessageDrivenChannelAdapter inboundChannelAdapter(CustomMqttProperties properties,
                                                                     MqttClassifySubscribe mqttClassifySubscribe) {
        executor = mqttThreadPoolService.getExecutor();
        MqttConnectionOptions mqttConnectionOptions = mqttv5ConnectionHelper.getMqttConnectionOptions();

        Mqttv5PahoMessageDrivenChannelAdapter mqttv5ChannelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectionOptions,
                properties.getClientIdPrefix() + mqttClassifySubscribe.getMqttClientId() + "_flexem",
                properties.getDefaultTopic());

        //
        List<String> subscribeTopics = mqttClassifySubscribe.getSubscribeFlexemTopics();
        if (CollectionUtils.isNotEmpty(subscribeTopics)) {
            subscribeTopics.stream().forEach(topic -> mqttv5ChannelAdapter.addTopic(topic));
        }
        mqttv5ChannelAdapter.setCompletionTimeout(3000);
//        mqttv5ChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttv5ChannelAdapter.setQos(1);
        mqttv5ChannelAdapter.setOutputChannel(flexemInboundChannel());
        return mqttv5ChannelAdapter;
    }


    /**
     * consumer message handler
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "flexemInboundChannel")
    public MessageHandler flexemInboundMessageHandler() {
        return message -> {
            if (mqttThreadPoolService.checkForDelay()) {
                // 等待1s
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    LOGGER.error("checkForDelay occur exception:{}", ExceptionUtils.getStackTrace(e));
                }
                LOGGER.info("mqtt delay");
            }
            executor.submit(() -> {
                String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                String payload = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
                monitorDataService.handleRealTimeMonitorData(topic, payload);
            });
        };
    }
}

发送端

package com.jshxhb.iot2servercollect.config;

import com.jshxhb.framework.mqtt.config.MqttConfiguration;
import com.jshxhb.framework.mqtt.config.Mqttv5ConnectionHelper;
import com.jshxhb.framework.mqtt.entity.CustomMqttProperties;
import com.jshxhb.iot2servercollect.entity.MqttClassifySubscribe;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;

@Configuration
@AutoConfigureAfter(value = {MqttConfiguration.class,MqttSubscribeConfiguration.class})
public class MqttPubConfiguration {

    @Bean("mqttOutboundChannel")
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public Mqttv5PahoMessageHandler mqttOutbound(Mqttv5ConnectionHelper mqttv5ConnectionHelper, CustomMqttProperties properties,
                                               MqttClassifySubscribe mqttClassifySubscribe) {
        Mqttv5PahoMessageHandler mqttv5PahoMessageHandler = new Mqttv5PahoMessageHandler(mqttv5ConnectionHelper.getMqttConnectionOptions(),
                properties.getClientIdPrefix() + mqttClassifySubscribe.getMqttClientId() + "_pub");
        mqttv5PahoMessageHandler.setAsync(true);
        mqttv5PahoMessageHandler.setDefaultTopic(properties.getDefaultTopic());
        return mqttv5PahoMessageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String payload);

        void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic);
    }
}


评论