普通的订阅、发布
普通redis订阅,是以用container做容器的。
添加依赖
<!-- pool 对象池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- redis 缓存操作 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
package com.iot.api.pubsub;
import com.iot.common.constant.PubSubConstants;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisSubscibeConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, @Qualifier("monitorIdListenerAdapter") MessageListenerAdapter adapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个MessageListener
container.addMessageListener(adapter, new PatternTopic(PubSubConstants.TOPIC_MONITOR_WS_ID));
return container;
}
@Bean("monitorIdListenerAdapter")
public MessageListenerAdapter monitorListenerAdapter(@Qualifier("monitorIdReceiveListener") MessageListener messageListener) {
// messageListenerAdapter 传入一个消息接受的处理器,利用反射的方式调用对应的处理方法
return new MessageListenerAdapter(messageListener, "onMessage");
}
}
自定义的MessageListener,如下:
package com.iot.api.pubsub;
import cn.hutool.core.collection.ConcurrentHashSet;
import com.alibaba.fastjson2.JSON;
import com.iot.common.constant.CacheConstants;
import com.iot.common.constant.PubSubConstants;
import com.iot.common.core.redis.RedisCache;
import com.iot.dto.pubsub.MonitorSubNotificationDTO;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component("monitorIdReceiveListener")
public class MonitorIdReceiveListener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(MonitorIdReceiveListener.class);
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedisCache redisCache;
private RedisSerializer<String> keySerializer;
private RedisSerializer valueSerializer;
private volatile Set<String> monitorSubscribeSet = new HashSet<>();
@Override
public void onMessage(Message message, byte[] pattern) {
if (Objects.isNull(keySerializer)) {
keySerializer = redisTemplate.getKeySerializer();
}
if (Objects.isNull(valueSerializer)) {
valueSerializer = redisTemplate.getValueSerializer();
}
String channelName = keySerializer.deserialize(message.getChannel());
if (channelName.equals(PubSubConstants.TOPIC_MONITOR_WS_ID)) {
MonitorSubNotificationDTO monitorSubNotificationDTO = (MonitorSubNotificationDTO) valueSerializer.deserialize(message.getBody());
LOGGER.info("channelName:{}, monitorSubNotificationDTO:{}", channelName, JSON.toJSONString(monitorSubNotificationDTO));
// 读取CacheConstants.LIVE_CURVE_SESSION_PREFIX + sessionId
Collection<String> keys = redisCache.keys(CacheConstants.LIVE_CURVE_SESSION_PREFIX.concat("*"));
monitorSubscribeSet.clear();
if (CollectionUtils.isNotEmpty(keys)) {
//
for (String key : keys) {
List<String> liveCurveMonitorIds = (List<String>) redisCache.getCacheObject(key);
monitorSubscribeSet.addAll(liveCurveMonitorIds);
}
}
}
LOGGER.info("monitorSubscribeSet:{}", JSON.toJSONString(monitorSubscribeSet));
}
public Set<String> getSubscribedMonitorIds() {
return monitorSubscribeSet;
}
}
动态发布/订阅
refer to :https://diuut.com/?p=874
普通redis订阅,是以用container做容器,配置类配置文件方式直接在spring init的时候进行加载,不能进行动态添加。在程序运行时修改不起作用。
如果只是当做固定的消息队列进行订阅发布,足够,但是如果需求是根据前台传入的字段,动态的订阅的话就无法满足了,想要实现就不能用 container 的方式进行订阅,但是可以利用Lettuce客户端进行订阅
实现如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.wp</groupId>
<artifactId>redis-pubsub-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redis-pubsub-demo</name>
<description>redis-pubsub-demo</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- pool 对象池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- redis 缓存操作 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml:
spring:
redis:
# 地址
host: 221.181.222.135
# 端口,默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
password: long123456
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
server:
port: 8087
核心类DynamicSubscribeService:
package com.wp.redispubsubdemo.subscribe;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
@Service
public class DynamicSubscribeService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private MyRedisPubSubListener redisPubSubListener;
private volatile RedisSerializer<String> serializer = RedisSerializer.string();
private StatefulRedisPubSubConnection<String, String> redisPubSubConnection;
@PostConstruct
public void initSub(){
RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory)connectionFactory;
RedisClient redisClient = (RedisClient)lettuceConnectionFactory.getNativeClient();
redisPubSubConnection = redisClient.connectPubSub();
redisPubSubConnection.addListener((RedisPubSubListener)redisPubSubListener);
}
public void subscribe(String channelName) {
// redisPubSubConnection.async().subscribe("monitor_228964512222830217");
redisPubSubConnection.async().subscribe(channelName);
}
}
监听的listener:
package com.wp.redispubsubdemo.subscribe;
import io.lettuce.core.pubsub.RedisPubSubListener;
import org.springframework.stereotype.Component;
@Component
public class MyRedisPubSubListener implements RedisPubSubListener<String,Object> {
@Override
public void message(String channel, Object message) {
System.out.println("-----------MyRedisPubSubListener message1------------------");
System.out.println("channel:"+channel+", message:"+message);
}
@Override
public void message(String pattern, String channel, Object message) {
System.out.println("-----------MyRedisPubSubListener message2------------------");
}
@Override
public void subscribed(String channel, long count) {
System.out.println("-----------MyRedisPubSubListener subscribed------------------");
}
@Override
public void psubscribed(String pattern, long count) {
System.out.println("-----------MyRedisPubSubListener psubscribed------------------");
}
@Override
public void unsubscribed(String channel, long count) {
System.out.println("-----------MyRedisPubSubListener unsubscribed------------------");
}
@Override
public void punsubscribed(String pattern, long count) {
System.out.println("-----------MyRedisPubSubListener punsubscribed------------------");
}
}
controller:
package com.wp.redispubsubdemo.controller;
import com.wp.redispubsubdemo.subscribe.DynamicSubscribeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/pub")
public class PubController {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DynamicSubscribeService dynamicSubscribeService;
@GetMapping(value = "/publishMessage")
public void publishMessage(String channel,String message){
// 发布者
redisTemplate.convertAndSend(channel,message);
}
@GetMapping(value = "/dynamicSub")
public void dynamicSub(String channelName){
// 发布者
dynamicSubscribeService.subscribe(channelName);
}
}
测试:
http://localhost:8087/pub/dynamicSub?channelName=topicz
http://localhost:8087/pub/dynamicSub?channelName=topicl
http://localhost:8087/pub/publishMessage?channel=topicz&message=zhangsan
http://localhost:8087/pub/publishMessage?channel=topicl&message=lisi