Spring-Integration
Doc
refer to :https://docs.spring.io/spring-integration/docs/5.5.12/reference/html/
pipes-and-filters architecture
messaging systems typically follow the similarly abstract “pipes-and-filters” model.
The “filters” represent any components capable of producing or consuming messages,
and the “pipes” transport the messages between filters so that the components themselves remain loosely-coupled
Message Channel
A message channel represents the “pipe” of a pipes-and-filters architecture. Producers send messages to a channel, and consumers receive messages from a channel. The message channel therefore decouples the messaging components and also provides a convenient point for interception and monitoring of messages.
A message channel may follow either point-to-point or publish-subscribe semantics.
With a point-to-point channel, no more than one consumer can receive each message sent to the channel.
Publish-subscribe channels, on the other hand, attempt to broadcast each message to all subscribers on the channel.
Spring Integration supports both of these models.
Whereas “point-to-point” and "publish-subscribe" define the two options for how many consumers ultimately receive each message, there is another important consideration: Should the channel buffer messages? In Spring Integration, pollable channels are capable of buffering Messages within a queue.
The advantage of buffering is that it allows for throttling the inbound messages and thereby prevents overloading a consumer. However, as the name suggests, this also adds some complexity, since a consumer can only receive the messages from such a channel if a poller is configured.
Message Endpoint
what's the message endpoint?
any component that can be connected to a message channel in order to send or receive messages.
One of the primary goals of Spring Integration is to simplify the development of enterprise integration solutions through inversion of control. This means that you should not have to implement consumers and producers directly, and you should not even have to build messages and invoke send or receive operations on a message channel.
Instead, you should be able to focus on your specific domain model with an implementation based on plain objects. Then, by providing declarative configuration, you can “connect” your domain-specific code to the messaging infrastructure provided by Spring Integration.
The components responsible for these connections are message endpoints.
This does not mean that you should necessarily connect your existing application code directly.
Any real-world enterprise integration solution requires some amount of code focused upon integration concerns such as routing and transformation.
The important thing is to achieve separation of concerns between the integration logic and the business logic.
In other words, as with the Model-View-Controller (MVC) paradigm for web applications, the goal should be to provide a thin but dedicated layer that translates inbound requests into service layer invocations and then translates service layer return values into outbound replies.
A Message Endpoint represents the “filter” of a pipes-and-filters architecture.
As mentioned earlier, the endpoint’s primary role is to connect application code to the messaging framework and to do so in a non-invasive manner.
In other words, the application code should ideally have no awareness of the message objects or the message channels.
This is similar to the role of a controller in the MVC paradigm.
Just as a controller handles HTTP requests, the message endpoint handles messages. Just as controllers are mapped to URL patterns, message endpoints are mapped to message channels.
Message Filter
A message filter determines whether a message should be passed to an output channel at all.
Message Router
A message router is responsible for deciding what channel or channels (if any) should receive the message next.
Splitter
A splitter is another type of message endpoint whose responsibility is to accept a message from its input channel, split that message into multiple messages, and send each of those to its output channel.
Service Activator
A Service Activator is a generic endpoint for connecting a service instance to the messaging system. The input message channel must be configured, and, if the service method to be invoked is capable of returning a value, an output message Channel may also be provided.
The service activator invokes an operation on some service object to process the request message, extracting the request message’s payload and converting
Whenever the service object’s method returns a value, that return value is likewise converted to a reply message if necessary.That reply message is sent to the output channel.
A request-reply service activator endpoint connects a target object’s method to input and output Message Channels.
Channel Adapter
A channel adapter is an endpoint that connects a message channel to some other system or transport.
file, HTTP Request, JMS message, and others
channelAdapter是和协议绑定的,比如
-
AbstractMqttMessageDrivenChannelAdapter
-
WebSocketInboundChannelAdapter`
-
WebSocketOutboundChannelAdapter`
Endpoint Bean Names
Consuming endpoints (anything with an inputChannel
) consist of two beans, the consumer and the message handler.
The consumer has a reference to the message handler and invokes it as messages arrive.
Consider the following example of an annotated POJO:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
Given the preceding example, the bean names are as follows:
- Consumer:
someComponent.someMethod.serviceActivator
- Handler:
someComponent.someMethod.serviceActivator.handler
Finding Class Names for Java and DSL Configuration
The AbstractEndpoint
is widely used throughout the Spring Framework for different component implementations. Its main implementations are:
-
EventDrivenConsumer
, used when we subscribe to aSubscribableChannel
to listen for messages. -
PollingConsumer
, used when we poll for messages from aPollableChannel
.
When you use messaging annotations or the Java DSL, you don’t need to worry about these components, because the Framework automatically produces them with appropriate annotations and BeanPostProcessor
implementations.
When building components manually, you should use the ConsumerEndpointFactoryBean
to help determine the target AbstractEndpoint
consumer implementation to create, based on the provided inputChannel
property.
MessageHandler
the first class citizen in the Framework - org.springframework.messaging.MessageHandler
.
The goal of the implementation of this interface is to handle the message consumed by the endpoint from the channel.
All EIP components in Spring Integration are MessageHandler
implementations (for example, AggregatingMessageHandler
, MessageTransformingHandler
, AbstractMessageSplitter
, and others). The target protocol outbound adapters (FileWritingMessageHandler
, HttpRequestExecutingMessageHandler
, AbstractMqttMessageHandler
, and others) are also MessageHandler
implementations.
When you develop Spring Integration applications with Java configuration, you should look into the Spring Integration module to find an appropriate MessageHandler
implementation to use for the @ServiceActivator
configuration.
For example, to send an XMPP message (see XMPP Support) you should configure something like the following:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
xmppHeaderMapper.setRequestHeaderNames("*");
handler.setHeaderMapper(xmppHeaderMapper);
return handler;
}
The MessageHandler
implementations represent the outbound and processing part of the message flow.
ChannelAdapter
The inbound message flow side has its own components, which are divided into polling and listening behaviors.
The listening (message-driven) components are simple and typically require only one target class implementation to be ready to produce messages.
Listening components can be one-way MessageProducerSupport
implementations, (such as AbstractMqttMessageDrivenChannelAdapter
and ImapIdleChannelAdapter
) or request-reply MessagingGatewaySupport
implementations (such as AmqpInboundGateway
and AbstractWebServiceInboundGateway
).
These inbound endpoints consist of two components: the poller configuration, to initiate the polling task periodically, and a message source class to read data from the target protocol and produce a message for the downstream integration flow.
The first class for the poller configuration is a SourcePollingChannelAdapter
. The Framework produces a bean for it, based on the @InboundChannelAdapter
configuration or a Java DSL builder spec.
You can find all the required inbound and outbound classes for the target protocols in the particular Spring Integration module (in most cases, in the respective package). For example, the spring-integration-websocket
adapters are:
o.s.i.websocket.inbound.WebSocketInboundChannelAdapter
: ImplementsMessageProducerSupport
to listen for frames on the socket and produce message to the channel.o.s.i.websocket.outbound.WebSocketOutboundMessageHandler
: The one-wayAbstractMessageHandler
implementation to convert incoming messages to the appropriate frame and send over websocket.
POJO Method invocation
As discussed in Programming Considerations, we recommend using a POJO programming style, as the following example shows:
@ServiceActivator
public String myService(String payload) { ... }
In this case, the framework extracts a String
payload, invokes your method, and wraps the result in a message to send to the next component in the flow (the original headers are copied to the new message).
You can also obtain header information in your POJO methods, as the following example shows:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
You can also dereference properties on the message, as the following example shows:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
FAQ
Service Activator 、 Channel Adapter、MessageHandler,这3者之间的关系是什么?分别负责什么?
ChannelAdapter是和协议绑定的,比如Mqttv5PahoMessageHandler
The service activator is essentially a generic endpoint for calling a method on some object with an input message (payload and headers).
Its internal logic is based on a MessageHandler which can be any possible implementation for a specific use-case,
for example DefaultMessageSplitter, AggregatingMessageHandler, SftpMessageHandler, JpaOutboundGateway etc.
也就是说,被@ServiceActivator注解修饰的方法,最终会被构造成一个MessageHandler类,然后在代码运行过程中,ServiceActivator会调用MessageHandler的handleMessage方法,将数据封装成Message类,提供给MessageHandler,
接下来,MessageHandler会将数据提供给另一端,比如Message System 或者 a service instance method
他们之间的关系如下:
- ServiceActivator和 MessageHandler的关系?
ServiceActivator就是,从inbound message channel中消费消息,将这个消息转换成方法参数,调用MessageHandler的handleMessage方法,对消息内容进行处理。
而处理消息,又分为只消费消息的、消费一条就生产一条消息的,这2种。就类似于lambda表达式中cosumer和function这2个函数式接口。
如果是只消费消息的,就可以归类为MessageHandler了。定义如下;
package org.springframework.messaging;
/**
* Simple contract for handling a {@link Message}.
*
* @author Mark Fisher
* @author Iwein Fuld
* @since 4.0
* @see ReactiveMessageHandler
*/
@FunctionalInterface
public interface MessageHandler {
/**
* Handle the given message.
* @param message the message to be handled
* @throws MessagingException if the handler failed to process the message
*/
void handleMessage(Message<?> message) throws MessagingException;
}
如果是消费一条就生产一条消息的,这一类的,可以同如下定义:
You can also obtain header information in your POJO methods, as the following example shows:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
2.上面这些概念,整体的逻辑是什么样的?
Message Gateway
什么叫message Gateway?用于什么场景?
message Gateway主要用于隐藏spring集成的逻辑。设想一下,假设我们想要发送一条消息,给rabbitmq,那么我们需要在代码中,显示的调用rabbitmq的sdk,才能将消息发送到rabbitmq中。
但是spring希望,我们不在代码中,显示的调用rabbitmq的sdk。因此抽象出来一层,让我们直接调用message Gateway的方法,然后spring自己在message Gateway的实现类代码中,自行调用rabbitmq的sdk,将这个消息发送到rabbitmq中。
怎么定义一个message Gateway呢?以及message Gateway是怎么工作的呢?
message Gateway就是定义一个接口,然后给这个接口,添加一个 @MessagingGateway
注解即可。如下:
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
Spring Integration provides the GatewayProxyFactoryBean
, which generates a proxy for any interface and internally invokes the gateway methods shown below. By using dependency injection, you can then expose the interface to your business methods.
@MessagingGateway 对接口使用,@Gateway 对接口类的方法使用(就算没有 @Gateway 也会对所有接口方法生成具体实现,需要使用 proxyDefaultMethods 指定为 true,此时使用的 channel 等配置为 @MessagingGateway 上的值)。
方法的参数可以使用 @Header,@Payload 参数注解指定。
适用 throws Throwable 来定义需要处理的异常。
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
@MessagingGateway
有哪些可配置的属性?
default-reply-timeout
: If a reply timeout is not specified, the calling thread waits indefinitely for a reply.
default-reply-channel
: it listens for the reply.
Messaging Gateway 产生的消息将根据消息头中的 request-channel 发送到对应的 channel,并由 reply-channel 中获取响应。
如果没有指定 Channel,那么会默认创建一个匿名的 Channel,意味着由这个 Messaging Gateway 发出的消息无法前提创建或订阅,可以指定 default-request-channel 和 default-reply-channel 指定没有相应消息头时使用的 channel。
Gateway Timeouts
Gateways have two timeout properties: requestTimeout
and replyTimeout
. The request timeout applies only if the channel can block (for example, a bounded QueueChannel
that is full). The replyTimeout
value is how long the gateway waits for a reply or returns null
. It defaults to infinity.
The timeouts can be set as defaults for all methods on the gateway (defaultRequestTimeout
and defaultReplyTimeout
) or on the MessagingGateway
interface annotation. Individual methods can override these defaults (in <method/>
child elements) or on the @Gateway
annotation.
Starting with version 5.0, the timeouts can be defined as expressions, as the following example shows:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
Mapping Method Arguments to a Message
Using the configuration techniques in the previous section allows control of how method arguments are mapped to message elements (payload and headers). When no explicit configuration is used, certain conventions are used to perform the mapping. In some cases, these conventions cannot determine which argument is the payload and which should be mapped to headers. Consider the following example:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
In the first case, the convention is to map the first argument to the payload (as long as it is not a Map
) and the contents of the second argument become headers.
In the second case (or the first when the argument for parameter thing1
is a Map
), the framework cannot determine which argument should be the payload. Consequently, mapping fails. This can generally be resolved using a payload-expression
, a @Payload
annotation, or a @Headers
annotation.
Mapping Method Arguments
The following examples show how method arguments can be mapped to the message and shows some examples of invalid configuration:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
@MessagingGateway
Annotation
Starting with version 4.0, gateway service interfaces can be marked with a @MessagingGateway
annotation instead of requiring the definition of a <gateway />
xml element for configuration. The following pair of examples compares the two approaches for configuring the same gateway:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
The @Header
annotation lets you add values that are interpreted as message headers, as the following example shows:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
消息异常 和 errorChannel
对于消息处理过程中的异常,默认情况下会层层的向上传递,为了捕获相应的异常,可以在接口的方法上添加 throws 关键字定义需要捕获的异常。除此之外,还可以通过指定一个 errorChannel 将错误由指定的消息消费者处理。
MQTT
https://docs.spring.io/spring-integration/docs/5.5.12/reference/html/mqtt.html#mqtt
MQTT: Message Queueing Telemetry Transport (MQTT) protocol.
Messaging Gateway
refer to : https://docs.spring.io/spring-integration/reference/html/gateway.html
https://zhuanlan.zhihu.com/p/577226216
Outbound Channel Adapter
Configuring with Java Configuration
The following Spring Boot application show an example of how to configure the outbound adapter with Java configuration:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
上面的代码,如何理解呢?
业务代码中,调用MessagingGateway的sendToMqtt方法,将消息发送到mqttOutboundChannel,然后ServiceActivator从mqttOutboundChannel中获取到消息,再将这个消息,发送到testTopic中。
Inbound (Message-driven) Channel Adapter
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
上面的代码,如何理解呢?
首先创建了一个MqttPahoMessageDrivenChannelAdapter,从rabbitmq中获取消息,获取到的消息,输出到mqttInputChannel中,然后ServiceActivator从mqttInputChannel中获取消息,将获取到消息作为方法参数,调用MessageHandler的handleMessage方法。我们的业务逻辑,就写在handleMessage中即可。
源码分析
参考: https://blog.csdn.net/xrq1995/article/details/127690595
mqtt-java-demo
默认情况下,只有消费端才能创建queue。因此,如果生成端先启动,而生成端不能自动创建队列,因此发送的消息不会被保存到queue中,而是会自动丢弃。
因此,Best Practise是,消费端先启动,自动创建队列,订阅指定的topic;然后生成端才能启动,发送消息。
在rabbitmq模型中,生成端指定的topic,其实就是routingKey。
如果rabbitmq宕机,如何保证消息不丢失
1.queue是持久化的
2.message是持久化的。
使用spring-integration-mqtt,默认情况下,queue是持久化的。但是message不是持久化的,如果希望将message设置为持久化的,那么需要做以下设置:
CleanSession需要设置为false
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(CustomMqttProperties customMqttProperties) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// 如果设置为true则接收不到掉线的消息,改为false则可以收到掉线的消息,
options.setCleanSession(false);
options.setServerURIs(customMqttProperties.getUris());
options.setUserName(customMqttProperties.getUserName());
options.setPassword(customMqttProperties.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
发送的Qos设置为1
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory mqttPahoClientFactory) {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttPahoClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(1);
messageHandler.setDefaultTopic("/testtopic");
return messageHandler;
}