Administrator
发布于 2023-05-05 / 77 阅读
0
0

MqttNote

Spring-Integration

Doc

refer to :https://docs.spring.io/spring-integration/docs/5.5.12/reference/html/

https://docs.spring.io/spring-integration/docs/5.5.12/reference/html/overview.html#spring-integration-introduction

pipes-and-filters architecture

messaging systems typically follow the similarly abstract “pipes-and-filters” model.

The “filters” represent any components capable of producing or consuming messages,

and the “pipes” transport the messages between filters so that the components themselves remain loosely-coupled

Message Channel

A message channel represents the “pipe” of a pipes-and-filters architecture. Producers send messages to a channel, and consumers receive messages from a channel. The message channel therefore decouples the messaging components and also provides a convenient point for interception and monitoring of messages.

image-20230418185402286

A message channel may follow either point-to-point or publish-subscribe semantics.

With a point-to-point channel, no more than one consumer can receive each message sent to the channel.

Publish-subscribe channels, on the other hand, attempt to broadcast each message to all subscribers on the channel.

Spring Integration supports both of these models.

Whereas “point-to-point” and "publish-subscribe" define the two options for how many consumers ultimately receive each message, there is another important consideration: Should the channel buffer messages? In Spring Integration, pollable channels are capable of buffering Messages within a queue.

The advantage of buffering is that it allows for throttling the inbound messages and thereby prevents overloading a consumer. However, as the name suggests, this also adds some complexity, since a consumer can only receive the messages from such a channel if a poller is configured.

Message Endpoint

what's the message endpoint?

any component that can be connected to a message channel in order to send or receive messages.

One of the primary goals of Spring Integration is to simplify the development of enterprise integration solutions through inversion of control. This means that you should not have to implement consumers and producers directly, and you should not even have to build messages and invoke send or receive operations on a message channel.

Instead, you should be able to focus on your specific domain model with an implementation based on plain objects. Then, by providing declarative configuration, you can “connect” your domain-specific code to the messaging infrastructure provided by Spring Integration.

The components responsible for these connections are message endpoints.

This does not mean that you should necessarily connect your existing application code directly.

Any real-world enterprise integration solution requires some amount of code focused upon integration concerns such as routing and transformation.

The important thing is to achieve separation of concerns between the integration logic and the business logic.

In other words, as with the Model-View-Controller (MVC) paradigm for web applications, the goal should be to provide a thin but dedicated layer that translates inbound requests into service layer invocations and then translates service layer return values into outbound replies.

A Message Endpoint represents the “filter” of a pipes-and-filters architecture.

As mentioned earlier, the endpoint’s primary role is to connect application code to the messaging framework and to do so in a non-invasive manner.

In other words, the application code should ideally have no awareness of the message objects or the message channels.

This is similar to the role of a controller in the MVC paradigm.

Just as a controller handles HTTP requests, the message endpoint handles messages. Just as controllers are mapped to URL patterns, message endpoints are mapped to message channels.

Message Filter

A message filter determines whether a message should be passed to an output channel at all.

Message Router

A message router is responsible for deciding what channel or channels (if any) should receive the message next.

image-20230418191621574

Splitter

A splitter is another type of message endpoint whose responsibility is to accept a message from its input channel, split that message into multiple messages, and send each of those to its output channel.

Service Activator

A Service Activator is a generic endpoint for connecting a service instance to the messaging system. The input message channel must be configured, and, if the service method to be invoked is capable of returning a value, an output message Channel may also be provided.

The service activator invokes an operation on some service object to process the request message, extracting the request message’s payload and converting

Whenever the service object’s method returns a value, that return value is likewise converted to a reply message if necessary.That reply message is sent to the output channel.

A request-reply service activator endpoint connects a target object’s method to input and output Message Channels.

image-20230418192720331

Channel Adapter

A channel adapter is an endpoint that connects a message channel to some other system or transport.

image-20230418193022116

image-20230418193115009

file, HTTP Request, JMS message, and others

channelAdapter是和协议绑定的,比如

  • AbstractMqttMessageDrivenChannelAdapter

  • WebSocketInboundChannelAdapter`

  • WebSocketOutboundChannelAdapter`

Endpoint Bean Names

Consuming endpoints (anything with an inputChannel) consist of two beans, the consumer and the message handler.

The consumer has a reference to the message handler and invokes it as messages arrive.

Consider the following example of an annotated POJO:

@Component
public class SomeComponent {

    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

Given the preceding example, the bean names are as follows:

  • Consumer: someComponent.someMethod.serviceActivator
  • Handler: someComponent.someMethod.serviceActivator.handler

Finding Class Names for Java and DSL Configuration

The AbstractEndpoint is widely used throughout the Spring Framework for different component implementations. Its main implementations are:

  • EventDrivenConsumer, used when we subscribe to a SubscribableChannel to listen for messages.

  • PollingConsumer, used when we poll for messages from a PollableChannel.

When you use messaging annotations or the Java DSL, you don’t need to worry about these components, because the Framework automatically produces them with appropriate annotations and BeanPostProcessor implementations.

When building components manually, you should use the ConsumerEndpointFactoryBean to help determine the target AbstractEndpoint consumer implementation to create, based on the provided inputChannel property.

MessageHandler

the first class citizen in the Framework - org.springframework.messaging.MessageHandler.

The goal of the implementation of this interface is to handle the message consumed by the endpoint from the channel.

All EIP components in Spring Integration are MessageHandler implementations (for example, AggregatingMessageHandler, MessageTransformingHandler, AbstractMessageSplitter, and others). The target protocol outbound adapters (FileWritingMessageHandler, HttpRequestExecutingMessageHandler, AbstractMqttMessageHandler, and others) are also MessageHandler implementations.

When you develop Spring Integration applications with Java configuration, you should look into the Spring Integration module to find an appropriate MessageHandler implementation to use for the @ServiceActivator configuration.

For example, to send an XMPP message (see XMPP Support) you should configure something like the following:

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

The MessageHandler implementations represent the outbound and processing part of the message flow.

ChannelAdapter

The inbound message flow side has its own components, which are divided into polling and listening behaviors.

The listening (message-driven) components are simple and typically require only one target class implementation to be ready to produce messages.

Listening components can be one-way MessageProducerSupport implementations, (such as AbstractMqttMessageDrivenChannelAdapter and ImapIdleChannelAdapter) or request-reply MessagingGatewaySupport implementations (such as AmqpInboundGateway and AbstractWebServiceInboundGateway).

These inbound endpoints consist of two components: the poller configuration, to initiate the polling task periodically, and a message source class to read data from the target protocol and produce a message for the downstream integration flow.

The first class for the poller configuration is a SourcePollingChannelAdapter. The Framework produces a bean for it, based on the @InboundChannelAdapter configuration or a Java DSL builder spec.

You can find all the required inbound and outbound classes for the target protocols in the particular Spring Integration module (in most cases, in the respective package). For example, the spring-integration-websocket adapters are:

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter: Implements MessageProducerSupport to listen for frames on the socket and produce message to the channel.
  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler: The one-way AbstractMessageHandler implementation to convert incoming messages to the appropriate frame and send over websocket.

POJO Method invocation

As discussed in Programming Considerations, we recommend using a POJO programming style, as the following example shows:

@ServiceActivator
public String myService(String payload) { ... }

In this case, the framework extracts a String payload, invokes your method, and wraps the result in a message to send to the next component in the flow (the original headers are copied to the new message).

You can also obtain header information in your POJO methods, as the following example shows:

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

You can also dereference properties on the message, as the following example shows:

@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }

FAQ

Service Activator 、 Channel Adapter、MessageHandler,这3者之间的关系是什么?分别负责什么?

ChannelAdapter是和协议绑定的,比如Mqttv5PahoMessageHandler


The service activator is essentially a generic endpoint for calling a method on some object with an input message (payload and headers).
Its internal logic is based on a MessageHandler which can be any possible implementation for a specific use-case,
for example DefaultMessageSplitter, AggregatingMessageHandler, SftpMessageHandler, JpaOutboundGateway etc.

也就是说,被@ServiceActivator注解修饰的方法,最终会被构造成一个MessageHandler类,然后在代码运行过程中,ServiceActivator会调用MessageHandler的handleMessage方法,将数据封装成Message类,提供给MessageHandler,
接下来,MessageHandler会将数据提供给另一端,比如Message System 或者 a service instance method


他们之间的关系如下:

image-20241111142920436

  1. ServiceActivator和 MessageHandler的关系?

ServiceActivator就是,从inbound message channel中消费消息,将这个消息转换成方法参数,调用MessageHandler的handleMessage方法,对消息内容进行处理。

而处理消息,又分为只消费消息的、消费一条就生产一条消息的,这2种。就类似于lambda表达式中cosumer和function这2个函数式接口。

如果是只消费消息的,就可以归类为MessageHandler了。定义如下;

package org.springframework.messaging;

/**
 * Simple contract for handling a {@link Message}.
 *
 * @author Mark Fisher
 * @author Iwein Fuld
 * @since 4.0
 * @see ReactiveMessageHandler
 */
@FunctionalInterface
public interface MessageHandler {

	/**
	 * Handle the given message.
	 * @param message the message to be handled
	 * @throws MessagingException if the handler failed to process the message
	 */
	void handleMessage(Message<?> message) throws MessagingException;

}

如果是消费一条就生产一条消息的,这一类的,可以同如下定义:

You can also obtain header information in your POJO methods, as the following example shows:

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

2.上面这些概念,整体的逻辑是什么样的?

image-20230419084506583

Message Gateway

什么叫message Gateway?用于什么场景?

message Gateway主要用于隐藏spring集成的逻辑。设想一下,假设我们想要发送一条消息,给rabbitmq,那么我们需要在代码中,显示的调用rabbitmq的sdk,才能将消息发送到rabbitmq中。

但是spring希望,我们不在代码中,显示的调用rabbitmq的sdk。因此抽象出来一层,让我们直接调用message Gateway的方法,然后spring自己在message Gateway的实现类代码中,自行调用rabbitmq的sdk,将这个消息发送到rabbitmq中。

怎么定义一个message Gateway呢?以及message Gateway是怎么工作的呢?

message Gateway就是定义一个接口,然后给这个接口,添加一个 @MessagingGateway注解即可。如下:

 @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

Spring Integration provides the GatewayProxyFactoryBean, which generates a proxy for any interface and internally invokes the gateway methods shown below. By using dependency injection, you can then expose the interface to your business methods.

@MessagingGateway 对接口使用,@Gateway 对接口类的方法使用(就算没有 @Gateway 也会对所有接口方法生成具体实现,需要使用 proxyDefaultMethods 指定为 true,此时使用的 channel 等配置为 @MessagingGateway 上的值)。

方法的参数可以使用 @Header,@Payload 参数注解指定。

适用 throws Throwable 来定义需要处理的异常。

@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}

@MessagingGateway有哪些可配置的属性?

default-reply-timeout: If a reply timeout is not specified, the calling thread waits indefinitely for a reply.

default-reply-channel: it listens for the reply.

Messaging Gateway 产生的消息将根据消息头中的 request-channel 发送到对应的 channel,并由 reply-channel 中获取响应。

如果没有指定 Channel,那么会默认创建一个匿名的 Channel,意味着由这个 Messaging Gateway 发出的消息无法前提创建或订阅,可以指定 default-request-channel 和 default-reply-channel 指定没有相应消息头时使用的 channel。

Gateway Timeouts

Gateways have two timeout properties: requestTimeout and replyTimeout. The request timeout applies only if the channel can block (for example, a bounded QueueChannel that is full). The replyTimeout value is how long the gateway waits for a reply or returns null. It defaults to infinity.

The timeouts can be set as defaults for all methods on the gateway (defaultRequestTimeout and defaultReplyTimeout) or on the MessagingGateway interface annotation. Individual methods can override these defaults (in <method/> child elements) or on the @Gateway annotation.

Starting with version 5.0, the timeouts can be defined as expressions, as the following example shows:

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

Mapping Method Arguments to a Message

Using the configuration techniques in the previous section allows control of how method arguments are mapped to message elements (payload and headers). When no explicit configuration is used, certain conventions are used to perform the mapping. In some cases, these conventions cannot determine which argument is the payload and which should be mapped to headers. Consider the following example:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

In the first case, the convention is to map the first argument to the payload (as long as it is not a Map) and the contents of the second argument become headers.

In the second case (or the first when the argument for parameter thing1 is a Map), the framework cannot determine which argument should be the payload. Consequently, mapping fails. This can generally be resolved using a payload-expression, a @Payload annotation, or a @Headers annotation.

Mapping Method Arguments

The following examples show how method arguments can be mapped to the message and shows some examples of invalid configuration:

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}

@MessagingGateway Annotation

Starting with version 4.0, gateway service interfaces can be marked with a @MessagingGateway annotation instead of requiring the definition of a <gateway /> xml element for configuration. The following pair of examples compares the two approaches for configuring the same gateway:

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}

The @Header annotation lets you add values that are interpreted as message headers, as the following example shows:

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

消息异常 和 errorChannel

对于消息处理过程中的异常,默认情况下会层层的向上传递,为了捕获相应的异常,可以在接口的方法上添加 throws 关键字定义需要捕获的异常。除此之外,还可以通过指定一个 errorChannel 将错误由指定的消息消费者处理。

MQTT

https://docs.spring.io/spring-integration/docs/5.5.12/reference/html/mqtt.html#mqtt

MQTT: Message Queueing Telemetry Transport (MQTT) protocol.

Messaging Gateway

refer to : https://docs.spring.io/spring-integration/reference/html/gateway.html

https://zhuanlan.zhihu.com/p/577226216

Outbound Channel Adapter

Configuring with Java Configuration

The following Spring Boot application show an example of how to configure the outbound adapter with Java configuration:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

上面的代码,如何理解呢?

业务代码中,调用MessagingGateway的sendToMqtt方法,将消息发送到mqttOutboundChannel,然后ServiceActivator从mqttOutboundChannel中获取到消息,再将这个消息,发送到testTopic中。

Inbound (Message-driven) Channel Adapter

Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

上面的代码,如何理解呢?

首先创建了一个MqttPahoMessageDrivenChannelAdapter,从rabbitmq中获取消息,获取到的消息,输出到mqttInputChannel中,然后ServiceActivator从mqttInputChannel中获取消息,将获取到消息作为方法参数,调用MessageHandler的handleMessage方法。我们的业务逻辑,就写在handleMessage中即可。

源码分析

参考: https://blog.csdn.net/xrq1995/article/details/127690595

mqtt-java-demo

默认情况下,只有消费端才能创建queue。因此,如果生成端先启动,而生成端不能自动创建队列,因此发送的消息不会被保存到queue中,而是会自动丢弃。

因此,Best Practise是,消费端先启动,自动创建队列,订阅指定的topic;然后生成端才能启动,发送消息。

在rabbitmq模型中,生成端指定的topic,其实就是routingKey。

如果rabbitmq宕机,如何保证消息不丢失

1.queue是持久化的

2.message是持久化的。

使用spring-integration-mqtt,默认情况下,queue是持久化的。但是message不是持久化的,如果希望将message设置为持久化的,那么需要做以下设置:

CleanSession需要设置为false

@Bean
    public MqttPahoClientFactory mqttPahoClientFactory(CustomMqttProperties customMqttProperties) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        // 如果设置为true则接收不到掉线的消息,改为false则可以收到掉线的消息,
        options.setCleanSession(false);
        options.setServerURIs(customMqttProperties.getUris());
        options.setUserName(customMqttProperties.getUserName());
        options.setPassword(customMqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

发送的Qos设置为1

@Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttPahoClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        messageHandler.setDefaultTopic("/testtopic");
        return messageHandler;
    }

评论