Administrator
发布于 2024-11-12 / 4 阅读
0
0

MQ13Use

Consumers

Consumer Prefetch

Overview

Consumer prefetch is an extension to the channel prefetch mechanism.

AMQP 0-9-1 specifies the basic.qos method to make it possible to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch count"). Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues, the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over the limit. This is slow on a single machine, and very slow when consuming across a cluster.

Furthermore for many uses it is simply more natural to specify a prefetch count that applies to each consumer.

Therefore RabbitMQ redefines the meaning of the global flag in the basic.qos method:

globalMeaning of prefetch_count in AMQP 0-9-1Meaning of prefetch_count in RabbitMQ
falseshared across all consumers on the channelapplied separately to each new consumer on the channel
trueshared across all consumers on the connectionshared across all consumers on the channel

Note that the default value for the global flag is false in most APIs.

Single Consumer

The following basic example in Java will receive a maximum of 10 unacknowledged messages at once:

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);

A value of 0 is treated as infinite, allowing any number of unacknowledged messages.

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer
channel.basicConsume("my-queue", false, consumer);

Independent Consumers

This example starts two consumers on the same channel, each of which will independently receive a maximum of 10 unacknowledged messages at once:

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

Multiple Consumers Sharing the Limit

The AMQP 0-9-1 specification does not explain what happens if you invoke basic.qos multiple times with different global values. RabbitMQ interprets this as meaning that the two prefetch limits should be enforced independently of each other; consumers will only receive new messages when neither limit on unacknowledged messages has been reached.

For example:

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

These two consumers will only ever have 15 unacknowledged messages between them, with a maximum of 10 messages for each consumer. This will be slower than the above examples, due to the additional overhead of coordinating between the channel and the queues to enforce the global limit.

Configurable Default Prefetch

RabbitMQ can use a default prefetch that will be applied if the consumer doesn't specify one. The value can be configured as rabbit.default_consumer_prefetch in the advanced configuration file:

%% advanced.config file
[
 {rabbit, [
       {default_consumer_prefetch, {false,250}}
     ]
 }
].

Queues

What is a Queue?

A queue in RabbitMQ is an ordered collection of messages. Messages are enqueued and dequeued (delivered to consumers) in a (FIFO ("first in, first out") manner.

To define a queue in generic terms, it is a sequential data structure with two primary operations: an item can be enqueued (added) at the tail and dequeued (consumed) from the head.

Queues play a major role in the messaging technology space. Many messaging protocols and tools assume that publishers and consumers communicate using a queue-like storage mechanism.

Many features in a messaging system are related to queues. Some RabbitMQ queue features such as priorities and requeueing by consumers can affect the ordering as observed by consumers.

The information in this topic includes an overview of queues in RabbitMQ and also links out to other topics so you can learn more about using queues in RabbitMQ.

This information primarily covers queues in the context of the AMQP 0-9-1 protocol, however, much of the content is applicable to other supported protocols.

Some protocols (for example: STOMP and MQTT) are based around the idea of topics. For these protocols, queues act as a data accumulation buffer for consumers. However, it is still important to understand the role queues play because many features still operate at the queue level, even for those protocols.

Streams is an alternative messaging data structure available in RabbitMQ. Streams provide different features from queues.

The information about RabbitMQ queues covered in this topic includes:

For topics related to consumers, see the Consumers guide. Classic queues, quorum queues and streams also have dedicated guides.

Queue Names

Queues have names so that applications can reference them.

Applications may pick queue names or ask the broker to generate a name for them. Queue names may be up to 255 bytes of UTF-8 characters.

Queue names starting with "amq." are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this rule will result in a channel-level exception with reply code 403 (ACCESS_REFUSED).

Queue Properties

Queues have properties that define how they behave. There is a set of mandatory properties and a map of optional ones:

  • Name
  • Durable (the queue will survive a broker restart)
  • Exclusive (used by only one connection and the queue will be deleted when that connection closes)
  • Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
  • Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)

Note that not all property combination make sense in practice. For example, auto-delete and exclusive queues should be server-named. Such queues are supposed to be used for client-specific or connection (session)-specific data.

When auto-delete or exclusive queues use well-known (static) names, in case of client disconnection and immediate reconnection there will be a natural race condition between RabbitMQ nodes that will delete such queues and recovering clients that will try to re-declare them. This can result in client-side connection recovery failure or exceptions, and create unnecessary confusion or affect application availability.

Declaration and Property Equivalence

Before a queue can be used it has to be declared. Declaring a queue will cause it to be created if it does not already exist. The declaration will have no effect if the queue does already exist and its attributes are the same as those in the declaration. When the existing queue attributes are not the same as those in the declaration a channel-level exception with code 406 (PRECONDITION_FAILED) will be raised.

Specifically for the queue type property, the property equivalence checks can be relaxed or configured to use a default.

See the Virtual Hosts guide to learn more.

Message Ordering in RabbitMQ

Queues in RabbitMQ are ordered collections of messages. Messages are enqueued and dequeued (delivered to consumers) in the FIFO manner.

FIFO ordering is not guaranteed for priority and sharded queues.

Ordering also can be affected by the presence of multiple competing consumers, consumer priorities, message redeliveries. This applies to redeliveries of any kind: automatic after channel closure and negative consumer acknowledgements.

Applications can assume messages published on a single channel will be enqueued in publishing order in all the queues they get routed to. When publishing happens on multiple connections or channels, their sequences of messages will be routed concurrently and interleaved.

Consuming applications can assume that initial deliveries (those where the redelivered property is set to false) to a single consumer are performed in the same FIFO order as they were enqueued. For repeated deliveries (the redelivered property is set to true), original ordering can be affected by the timing of consumer acknowledgements and redeliveries, and thus not guaranteed.

In case of multiple consumers, messages will be dequeued for delivery in the FIFO order but actual delivery will happen to multiple consumers. If all of the consumers have equal priorities, they will be picked on a round-robin basis. Only consumers on channels that have not exceeded their prefetch value (the number of outstanding unacknowledged deliveries) will be considered.

Durability

Queues can be durable or transient. Metadata of a durable queue is stored on disk, while metadata of a transient queue is stored in memory when possible. The same distinction is made for messages at publishing time in some protocols, e.g. AMQP 0-9-1 and MQTT.

In environments and use cases where durability is important, applications must use durable queues and make sure that publishers mark published messages as persisted.

Transient queues will be deleted on node boot. They therefore will not survive a node restart, by design. Messages in transient queues will also be discarded.

Durable queues will be recovered on node boot, including messages in them published as persistent. Messages published as transient will be discarded during recovery, even if they were stored in durable queues.

How to Choose

In most other cases, durable queues are the recommended option. For replicated queues, the only reasonable option is to use durable queues.

Throughput and latency of a queue is not affected by whether a queue is durable or not in most cases. Only environments with very high queue or binding churn — that is, where queues are deleted and re-declared hundreds or more times a second — will see latency improvements for some operations, namely on bindings. The choice between durable and transient queues therefore comes down to the semantics of the use case.

Temporary queues can be a reasonable choice for workloads with transient clients, for example, temporary WebSocket connections in user interfaces, mobile applications and devices that are expected to go offline or use switch identities. Such clients usually have inherently transient state that should be replaced when the client reconnects.

Some queue types do not support transient queues. Quorum queues must be durable due to the assumptions and requirements of the underlying replication protocol, for example.

Temporary Queues

With some workloads queues are supposed to be short lived. While clients can delete the queues they declare before disconnection, this is not always convenient. On top of that, client connections can fail, potentially leaving unused resources (queues) behind.

There are three ways to make queue deleted automatically:

  • Exclusive queues (covered below)
  • TTLs (also covered below)
  • Auto-delete queues

An auto-delete queue will be deleted when its last consumer is cancelled (e.g. using the basic.cancel in AMQP 0-9-1) or gone (closed channel or connection, or lost TCP connection with the server).

If a queue never had any consumers, for instance, when all consumption happens using the basic.get method (the "pull" API), it won't be automatically deleted. For such cases, use exclusive queues or queue TTL.

Replicated and Distributed Queues

Quorum queues is replicated, data safety and consistency-oriented queue type. Classic queues historically supported replication but this feature was removed for RabbitMQ 4.x.

Any client connection can use any queue, whether it is replicated or not, regardless of the node the queue replica is hosted on or the node the client is connected to. RabbitMQ will route the operations to the appropriate node transparently for clients.

For example, in a cluster with nodes A, B and C, a client connected to node A can consume from a queue Q hosted on B, while a client connected to node C can publish in a way that routes messages to queue Q.

Client libraries or applications may choose to connect to the node that hosts the current leader replica of a specific queue for improved data locality.

This general rule applies to all messaging data types supported by RabbitMQ except for one. Streams are an exception to this rule, and require clients, regardless of the protocol they use, to connect to a node that hosts a replica (a leader of rollower) of the target stream. Consequently, RabbitMQ Stream protocol clients will connect to multiple nodes in parallel.

Queues can also be federated across loosely coupled nodes or clusters.

Note that intra-cluster replication and federation are orthogonal features and should not be considered direct alternatives.

Streams is another replicated data structure supported by RabbitMQ, with a different set of supported operations and features.

Non-Replicated Queues and Client Operations

Any client connection can use any queue, including non-replicated (single replica) queues, regardless of the node the queue replica is hosted on or the node the client is connected to. RabbitMQ will route the operations to the appropriate node transparently for clients.

For example, in a cluster with nodes A, B and C, a client connected to node A can consume from a queue Q hosted on B, while a client connected to node C can publish in a way that routes messages to queue Q.

Client libraries or applications may choose to connect to the node that hosts the current leader replica of a specific queue for improved data locality.

This general rule applies to all messaging data types supported by RabbitMQ except for one. Streams are an exception to this rule, and require clients, regardless of the protocol they use, to connect to a node that hosts a replica (a leader of rollower) of the target stream. Consequently, RabbitMQ Stream protocol clients will connect to multiple nodes in parallel.

Consumers and Acknowledgements

Messages can be consumed by registering a consumer (subscription), which means RabbitMQ will push messages to the client, or fetched individually for protocols that support this (e.g. the basic.get AMQP 0-9-1 method), similarly to HTTP GET.

Delivered messages can be acknowledged by consumer explicitly or automatically as soon as a delivery is written to connection socket.

Automatic acknowledgement mode generally will provide higher throughput rate and uses less network bandwidth. However, it offers the least number of guarantees when it comes to failures. As a rule of thumb, consider using manual acknowledgement mode first.

Prefetch and Consumer Overload

Automatic acknowledgement mode can also overwhelm consumers which cannot process messages as quickly as they are delivered. This can result in permanently growing memory usage and/or OS swapping for the consumer process.

Manual acknowledgement mode provides a way to set a limit on the number of outstanding (unconfirmed) deliveries: channel QoS (prefetch).

Consumers using higher (several thousands or more) prefetch levels can experience the same overload problem as consumers using automatic acknowledgements.

High number of unacknowledged messages will lead to higher memory usage by the broker.

Management Plugin

Overview

The RabbitMQ management plugin provides an HTTP-based API for management and monitoring of RabbitMQ nodes and clusters, along with a browser-based UI and a command line tool, rabbitmqadmin.

It periodically collects and aggregates data about many aspects of the system. Those metrics are exposed to human operators in the UI. The API it provides can be used by monitoring systems, however, Prometheus is the recommended option for long term storage, alerting, visualisation, chart analysis and so on.

The plugin also provides tools for analysing memory usage of the node, and other features related to monitoring, metrics, user, permission, and topology management. Previously it also provided definition export and import functionality. Those are now core RabbitMQ features and do not require or rely on this plugin.

This guide covers:

The plugin also provides extension points that other plugins, such as rabbitmq-top or rabbitmq-shovel-management, use to extend the UI.

Getting Started

The management plugin is included in the RabbitMQ distribution. Like any other plugin, it must be enabled before it can be used. That's done using rabbitmq-plugins:

rabbitmq-plugins enable rabbitmq_management

Node restart is not required after plugin activation.

During automated deployments, the plugin can be enabled via enabled plugin file.

Usage

Management UI Access

The management UI can be accessed using a Web browser at http://*node-hostname*:15672/.

For example, for a node running on a machine with the hostname of warp10.local, it can be accessed by users with sufficient privileges at either http://warp10.local:15672/ or http://localhost:15672/ (provided that localhost resolves correctly).

Note that the UI and HTTP API port — typically 15672 — does not support AMQP 0-9-1, AMQP 1.0, STOMP or MQTT connections. Separate ports should be used by those clients.

Users must be granted permissions for management UI access.

Notable Features

The management UI is implemented as a single page application which relies on the HTTP API. Some of the features include:

  • Declare, list and delete exchanges, queues, bindings, users, virtual hosts and user permissions.
  • Monitor queue length, message rates (globally and per queue, exchange or channel), resource usage of queue, node GC activity, data rates of client connections, and more.
  • Monitor node resource use: sockets and file descriptors, memory usage breakdown, available disk space and bandwidth usage on inter-node communication links.
  • Manage users (provided administrative permissions of the current user).
  • Manage policies and runtime parameters (provided sufficient permissions of the current user).
  • Export schema (vhosts, users, permissions, queues, exchanges, bindings, parameters, policies) and import it on node start. This can be used for recovery purposes or setup automation of new nodes and clusters.
  • Force close client connections, purge queues.
  • Send and receive messages (useful in development environments and for troubleshooting).

The UI application supports recent versions of Google Chrome, Safari, Firefox, and Microsoft Edge browsers.

Management UI Access in Clusters

Any cluster node with rabbitmq_management plugin enabled can be used for management UI access or data collection by monitoring tools. It will reach out to other nodes and collect their stats, then aggregate and return a response to the client.

To access management UI the user has to authenticate and have certain permissions (be authorised). This is covered in the following section.

Access and Permissions

The management UI requires authentication and authorisation, much like RabbitMQ requires it from connecting clients. In addition to successful authentication, management UI access is controlled by user tags. The tags are managed using rabbitmqctl. Newly created users do not have any tags set on them by default.

See Deployment Guidelines for general recommendations on user and credential management.

TagCapabilities
(None)No access to the management plugin
managementAnything the user could do via messaging protocols plus:List virtual hosts to which they can log in via AMQPView all queues, exchanges and bindings in "their" virtual hostsView and close their own channels and connectionsView "global" statistics covering all their virtual hosts, including activity by other users within them
policymakerEverything "management" can plus:View, create and delete policies and parameters for virtual hosts to which they can log in via AMQP
monitoringEverything "management" can plus:List all virtual hosts, including ones they could not access using messaging protocolsView other users's connections and channelsView node-level data such as memory use and clusteringView truly global statistics for all virtual hosts
administratorEverything "policymaker" and "monitoring" can plus:Create and delete virtual hostsView, create and delete usersView, create and delete permissionsClose other users's connections

Note that since "administrator" does everything "monitoring" does, and "monitoring" does everything "management" does, each user often needs a maximum of one tag.

Normal RabbitMQ permissions to resources still apply to monitors and administrators; just because a user is a monitor or administrator does not grant them full access to exchanges, queues and bindings through the management plugin or other means.

All users can only list objects within the virtual hosts they have any permissions for.

MQTT Plugin

Overview

RabbitMQ supports MQTT versions 3.1, 3.1.1, and 5.0 via a plugin that ships in the core distribution.

This guide covers the following topics:

Enabling the Plugin

The MQTT plugin is included in the RabbitMQ distribution. Before clients can successfully connect, it must be enabled on all cluster nodes using rabbitmq-plugins:

rabbitmq-plugins enable rabbitmq_mqtt

Supported MQTT features

RabbitMQ supports most MQTT 5.0 features including the following:

The MQTT 5.0 blog post provides a complete list of supported MQTT 5.0 features including their usage and implementation details.

MQTT clients can interoperate with other protocols. For example, MQTT publishers can send messages to AMQP 0.9.1 or AMQP 1.0 consumers if these consumers consume from a queue that is bound to the MQTT topic exchange (configured via mqtt.exchange and defaulting to amq.topic). Likewise an AMQP 0.9.1, AMQP 1.0, or STOMP publisher can send messages to an MQTT subscriber if the publisher publishes to the MQTT topic exchange.

Limitations

The following MQTT features are unsupported:

The following MQTT features are supported with limitations:

  • Retained messages are stored and queried only node local. See Retained Messages and Stores.
  • Overlapping subscriptions with different QoS levels can result in duplicate messages being delivered. Applications need to account for this. For example when the same MQTT client creates a QoS 0 subscription for topic filter /sports/football/epl/# and a QoS 1 subscription for topic filter /sports/football/#, it will be delivered duplicate messages.

How the MQTT plugin works

RabbitMQ MQTT plugin targets MQTT 3.1, 3.1.1, and 5.0 supporting a broad range of MQTT clients. It also makes it possible for MQTT clients to interoperate with AMQP 0-9-1, AMQP 1.0, and STOMP clients. There is also support for multi-tenancy.

Mapping MQTT to the AMQP 0.9.1 model

RabbitMQ core implements the AMQP 0.9.1 protocol. The plugin builds on top of the AMQP 0.9.1 entities: exchanges, queues, and bindings. Messages published to MQTT topics are routed by an AMQP 0.9.1 topic exchange. MQTT subscribers consume from queues bound to the topic exchange.

The MQTT plugin creates a dedicated queue per MQTT subscriber. To be more precise, there could be 0, 1, or 2 queues per MQTT session:

  • There are 0 queues for an MQTT session if the MQTT client never sends a SUBSCRIBE packet. The MQTT client is only publishing messages.
  • There is 1 queue for an MQTT session if the MQTT client creates one or multiple subscriptions with the same QoS level.
  • There are 2 queues for an MQTT session if the MQTT client creates subscriptions with both QoS levels: QoS 0 and QoS 1.

When listing queues you will observe the queue naming pattern mqtt-subscription-<MQTT client ID>qos[0|1] where <MQTT client ID> is the MQTT client identifier and [0|1] is either 0 for a QoS 0 subscription or 1 for a QoS 1 subscription. Having a separate queue per MQTT subscriber allows every MQTT subscriber to receive its own copy of the application message.

The plugin creates queues transparently for MQTT subscribing clients. The MQTT specification does not define the concept of queues and MQTT clients are not aware that these queues exist. A queue is an implementation detail of how RabbitMQ implements the MQTT protocol.

Queue Types

An MQTT client can publish a message to any queue type. For that to work, a classic queue, quorum queue, or stream must be bound to the topic exchange with a binding key matching the topic of the PUBLISH packet.

The MQTT plugin creates a classic queue, quorum queue, or MQTT QoS 0 queue per MQTT subscriber. By default, the plugin creates a classic queue.

The plugin can be configured to create quorum queues (instead of classic queues) for subscribers whose MQTT session lasts longer than their MQTT network connection. This is explained in section Quorum Queues.

If feature flag rabbit_mqtt_qos0_queue is enabled, the plugin creates an MQTT QoS 0 queue for QoS 0 subscribers whose MQTT session last as long as their MQTT network connection. This is explained in section MQTT QoS 0 queue type.

Queue Properties and Arguments

Since RabbitMQ 3.12 all queues created by the MQTT plugin

  • are durable, i.e. queue metadata is stored on disk.
  • are exclusive if the MQTT session lasts as long as the MQTT network connection. In that case, RabbitMQ will delete all state for the MQTT client - including its queue - when the network connection (and session) ends. Only the subscribing MQTT client can consume from its queue.
  • are not auto-delete. For example, if an MQTT client subscribes to a topic and subsequently unsubscribes, the queue will not be deleted. However, the queue will be deleted when the MQTT session ends.
  • have a Queue TTL set (queue argument x-expires) if the MQTT session expires eventually (i.e. session expiry is not disabled by the RabbitMQ operator, see below) and outlasts the MQTT network connection. The Queue TTL (in milliseconds) is determined by the minimum of the Session Expiry Interval (in seconds) requested by the MQTT client in the CONNECT packet and the server side configured mqtt.max_session_expiry_interval_seconds.

The default value for mqtt.max_session_expiry_interval_seconds is 86400 (1 day). A RabbitMQ operator can force all MQTT sessions to end as soon as their network connections end by setting this parameter to 0.

A RabbitMQ operator can allow MQTT sessions to last forever by setting this parameter to infinity. This carries a risk: short-lived clients that don't use clean sessions can leave queues and messages behind, which will consume resources and require manual cleanup.

RabbitMQ deletes any state for the MQTT client when the MQTT session ends. This state includes the client's queue(s) including QoS 0 and QoS 1 messages and the queue bindings (i.e. the client's subscriptions).

Topic level separator and wildcards

The MQTT protocol specification defines slash ("/") as topic level separator whereas AMQP 0-9-1 defines a dot (".") as topic level separator. This plugin translates patterns under the hood to bridge the two.

For example, MQTT topic cities/london becomes AMQP 0.9.1 topic cities.london and vice versa. Therefore, when an MQTT client publishes a message with topic cities/london, if an AMQP 0.9.1 client wants to receive that message, it should create a binding from its queue to the topic exchange with binding key cities.london.

Vice versa, when an AMQP 0.9.1 client publishes a message with topic cities.london, if an MQTT client wants to receive that message, it should create an MQTT subscription with topic filter cities/london.

This has one important limitation: MQTT topics that have dots in them won't work as expected and are to be avoided, the same goes for AMQP 0-9-1 routing and binding keys that contains slashes.

Furthermore, MQTT defines the plus sign ("+") as single-level wildcard whereas AMQP 0.9.1 defines the asterisk sign ("*") to match a single word:

MQTTAMQP 0.9.1Description
/ (slash). (dot)topic level separator
+ (plus)* (asterisk)single-level wildcard (match a single word)
# (hash)# (hash)multi-level wildcard (match zero or more words)

Users and Authentication

MQTT clients will be able to connect provided that they have a set of credentials for an existing user with the appropriate permissions.

For an MQTT connection to succeed, it must successfully authenticate and the user must have the appropriate permissions to the virtual host used by the plugin (see below).

MQTT clients can (and usually do) specify a set of credentials when they connect. The credentials can be a username and password pair, or a x.509 certificate (see below).

The plugin supports anonymous authentication but its use is highly discouraged and it is a subject to certain limitations (listed below) enforced for a reasonable level of security by default.

Users and their permissions can be managed using rabbitmqctl, management UI or HTTP API.

For example, the following commands create a new user for MQTT connections with full access to the default virtual host used by this plugin:

# username and password are both "mqtt-test"
rabbitmqctl add_user mqtt-test mqtt-test
rabbitmqctl set_permissions -p "/" mqtt-test ".*" ".*" ".*"
rabbitmqctl set_user_tags mqtt-test management

Note that colons may not appear in usernames.

Plugin Configuration

Here is a sample configuration that demonstrates a number of MQTT plugin settings:

mqtt.listeners.tcp.default = 1883
## Default MQTT with TLS port is 8883
# mqtt.listeners.ssl.default = 8883

# anonymous connections, if allowed, will use the default
# credentials specified here
mqtt.allow_anonymous  = true
mqtt.default_user     = guest
mqtt.default_pass     = guest

mqtt.vhost            = /
mqtt.exchange         = amq.topic
mqtt.prefetch         = 10
# 24 hours by default
mqtt.max_session_expiry_interval_seconds = 86400

Virtual Hosts

RabbitMQ is a multi-tenant system at the core and every connection belongs to a virtual host. Some messaging protocols have the concept of vhosts, others don't. MQTT falls into the latter category. Therefore the MQTT plugin needs to provide a way to map connections to vhosts.

The vhost option controls which RabbitMQ vhost the adapter connects to by default. The vhost configuration is only consulted if no vhost is provided during connection establishment. There are several (optional) ways to specify the vhost the client will connect to.

Port to Virtual Host Mapping

First way is mapping MQTT plugin (TCP or TLS) listener ports to vhosts. The mapping is specified thanks to the mqtt_port_to_vhost_mapping global runtime parameter. Let's take the following plugin configuration:

mqtt.listeners.tcp.1 = 1883
mqtt.listeners.tcp.2 = 1884

mqtt.listeners.ssl.1 = 8883
mqtt.listeners.ssl.2 = 8884

# (other TLS settings are omitted for brevity)

mqtt.vhost            = /

Note the plugin listens on ports 1883, 1884, 8883, and 8884. Imagine you want clients connecting to ports 1883 and 8883 to connect to the vhost1 virtual host, and clients connecting to ports 1884 and 8884 to connect to the vhost2 virtual host. A port-to-vhost mapping can be created by setting the mqtt_port_to_vhost_mapping global parameter with rabbitmqctl:

  • bash
  • PowerShell
  • HTTP API
rabbitmqctl set_global_parameter mqtt_port_to_vhost_mapping \
    '{"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}'

If there's no mapping for a given port (because the port cannot be found in the mqtt_port_to_vhost_mapping global parameter JSON document or if the global parameter isn't set at all), the plugin will try to extract the virtual host from the username (see below) and will ultimately use the vhost plugin config option.

The broker queries the mqtt_port_to_vhost_mapping global parameter value at connection time. If the value changes, connected clients are not notified or disconnected. They need to reconnect to switch to a new virtual host.

Virtual Host as Part of Username

Another and more specific way to specify a vhost while connecting is to prepend the vhost to the username and to separate with a colon.

For example, connecting with

/:guest

is equivalent to the default vhost and username, while

mqtt-vhost:mqtt-username

means connecting to the vhost mqtt-host with username mqtt-username.

Specifying the virtual host in the username takes precedence over the port-to-vhost mapping specified with the mqtt_port_to_vhost_mapping global parameter.

Flow Control

The prefetch option controls the maximum number of unacknowledged PUBLISH packets with QoS=1 that will be delivered. This option is interpreted in the same way as consumer prefetch.

An MQTT 5.0 client can define a lower number by setting Receive Maximum in the CONNECT packet.

Custom Exchanges

The exchange option determines which exchange messages from MQTT clients are published to. The exchange must be created before clients publish any messages. The exchange is expected to be a topic exchange.

The default topic exchange amq.topic is pre-declared: It therefore exists when RabbitMQ is started.

Retained Messages and Stores

The plugin supports retained messages with the limitations described in this section. The message store implementation is pluggable and the plugin ships with two implementation out of the box:

  • ETS-based (in memory), implemented in module rabbit_mqtt_retained_msg_store_ets
  • DETS-based (on disk), implemented in module rabbit_mqtt_retained_msg_store_dets

Both implementations have limitations and trade-offs. With the first one, the maximum number of messages that can be retained is limited by RAM. With the second one, there is a limit of 2 GB per vhost. Both are node-local: Retained messages are neither replicated to nor queried from remote cluster nodes.

An example that works is the following: An MQTT Client publishes a retained message to node A with topic topic/1. Thereafter another client subscribes with topic filter topic/1 on node A. The new subscriber will receive the retained message.

However, if a client publishes a retained message on node A and another client subsequently subscribes on node B, that subscribing client will not receive any retained message stored on node A.

Furthermore, if the topic filter contains wildcards (the multi-level wildcard character “#” or the single-level wildcard character “+”), no retained messages are sent.

To configure the store, use the mqtt.retained_message_store configuration key:

## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store (in milliseconds)
mqtt.retained_message_store_dets_sync_interval = 2000

The value must be a module that implements the store:

  • rabbit_mqtt_retained_msg_store_ets for RAM-based
  • rabbit_mqtt_retained_msg_store_dets for disk-based (This is the default value.)

These implementations are suitable for development but sometimes won't be for production needs. The MQTT specification does not define consistency or replication requirements for retained message stores, therefore RabbitMQ allows for custom ones to meet the consistency and availability needs of a particular environment. For example, stores based on Riak and Cassandra would be suitable for most production environments as those data stores provide tunable consistency.

Message stores must implement the rabbit_mqtt_retained_msg_store behaviour.


评论