Administrator
发布于 2023-03-17 / 43 阅读
0
0

Redis发布订阅

普通的订阅、发布

普通的 Redis 订阅是以 RedisMessageListenerContainer(下文简称 container)作为容器来实现的。

添加依赖


<!-- commons-pool2 object pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <!-- Redis cache operations (Spring Boot Redis starter) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
package com.iot.api.pubsub;

import com.iot.common.constant.PubSubConstants;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Spring configuration for the container-based Redis subscription.
 *
 * <p>Declares a {@link MessageListenerAdapter} around the
 * {@code monitorIdReceiveListener} bean and registers that adapter with a
 * {@link RedisMessageListenerContainer} on the topic
 * {@code PubSubConstants.TOPIC_MONITOR_WS_ID}.
 */
@Configuration
public class RedisSubscibeConfig {

    /**
     * Wraps the monitor-id listener in a {@link MessageListenerAdapter}.
     *
     * @param messageListener the bean qualified as {@code monitorIdReceiveListener}
     * @return an adapter whose default listener method name is {@code "onMessage"}
     */
    @Bean("monitorIdListenerAdapter")
    public MessageListenerAdapter monitorListenerAdapter(@Qualifier("monitorIdReceiveListener") MessageListener messageListener) {
        // Name of the handler method the adapter is told to invoke on the delegate.
        final String handlerMethodName = "onMessage";
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(messageListener, handlerMethodName);
        return listenerAdapter;
    }

    /**
     * Builds the listener container and subscribes the adapter to the monitor topic.
     *
     * @param connectionFactory the Redis connection factory used by the container
     * @param adapter           the bean qualified as {@code monitorIdListenerAdapter}
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, @Qualifier("monitorIdListenerAdapter") MessageListenerAdapter adapter) {
        PatternTopic monitorTopic = new PatternTopic(PubSubConstants.TOPIC_MONITOR_WS_ID);

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Further listener/topic pairs can be registered here in the same way.
        listenerContainer.addMessageListener(adapter, monitorTopic);
        return listenerContainer;
    }

}

自定义的MessageListener,如下:

package com.iot.api.pubsub;

import cn.hutool.core.collection.ConcurrentHashSet;
import com.alibaba.fastjson2.JSON;
import com.iot.common.constant.CacheConstants;
import com.iot.common.constant.PubSubConstants;
import com.iot.common.core.redis.RedisCache;
import com.iot.dto.pubsub.MonitorSubNotificationDTO;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component("monitorIdReceiveListener")
public class MonitorIdReceiveListener implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MonitorIdReceiveListener.class);

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private RedisCache redisCache;

    private RedisSerializer<String> keySerializer;

    private RedisSerializer valueSerializer;

    private volatile Set<String> monitorSubscribeSet = new HashSet<>();

    @Override
    public void onMessage(Message message, byte[] pattern) {
        if (Objects.isNull(keySerializer)) {
            keySerializer = redisTemplate.getKeySerializer();
        }
        if (Objects.isNull(valueSerializer)) {
            valueSerializer = redisTemplate.getValueSerializer();
        }

        String channelName = keySerializer.deserialize(message.getChannel());
        if (channelName.equals(PubSubConstants.TOPIC_MONITOR_WS_ID)) {
            MonitorSubNotificationDTO monitorSubNotificationDTO = (MonitorSubNotificationDTO) valueSerializer.deserialize(message.getBody());
            LOGGER.info("channelName:{}, monitorSubNotificationDTO:{}", channelName, JSON.toJSONString(monitorSubNotificationDTO));

            // 读取CacheConstants.LIVE_CURVE_SESSION_PREFIX + sessionId
            Collection<String> keys = redisCache.keys(CacheConstants.LIVE_CURVE_SESSION_PREFIX.concat("*"));
            monitorSubscribeSet.clear();
            if (CollectionUtils.isNotEmpty(keys)) {
                //
                for (String key : keys) {
                    List<String> liveCurveMonitorIds = (List<String>) redisCache.getCacheObject(key);
                    monitorSubscribeSet.addAll(liveCurveMonitorIds);
                }
            }
        }
        LOGGER.info("monitorSubscribeSet:{}", JSON.toJSONString(monitorSubscribeSet));
    }

    public Set<String> getSubscribedMonitorIds() {
        return monitorSubscribeSet;
    }

}

动态发布/订阅

参考:https://diuut.com/?p=874

普通的 Redis 订阅以 container 作为容器,通过配置类在 Spring 初始化时一次性加载,因此无法动态添加订阅;程序运行期间再修改订阅也不会生效。

如果只是把它当作固定的消息队列来做订阅/发布,这种方式已经足够;但如果需要根据前端传入的字段动态订阅,container 的方式就无法满足了。此时可以改用 Lettuce 客户端直接进行订阅。

实现如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.14</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wp</groupId>
    <artifactId>redis-pubsub-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-pubsub-demo</name>
    <description>redis-pubsub-demo</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- commons-pool2 object pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <!-- Redis cache operations (Spring Boot Redis starter) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


application.yml:

# NOTE(review): the Redis host and password below are hard-coded in this sample;
# confirm they are not real credentials and prefer externalized configuration.
spring:
  redis:
    # Host address
    host: 221.181.222.135
    # Port (6379 is the default)
    port: 6379
    # Database index
    database: 0
    # Password
    password: long123456
    # Connection timeout
    timeout: 10s
    lettuce:
      pool:
        # Minimum number of idle connections in the pool
        min-idle: 0
        # Maximum number of idle connections in the pool
        max-idle: 8
        # Maximum number of connections the pool may hand out
        max-active: 8
        # Maximum time to block waiting for a connection (a negative value means no limit)
        max-wait: -1ms

server:
  port: 8087

核心类DynamicSubscribeService:

package com.wp.redispubsubdemo.subscribe;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;

/**
 * Subscribes to Redis channels at runtime through a dedicated Lettuce pub/sub
 * connection, instead of a listener container configured at startup.
 *
 * <p>The connection is opened once in {@link #initSub()} from the native Lettuce
 * client behind the application's {@link RedisTemplate}, and is closed again in
 * {@link #closeSub()} when the bean is destroyed.
 */
@Service
public class DynamicSubscribeService {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private MyRedisPubSubListener redisPubSubListener;

    private volatile RedisSerializer<String> serializer = RedisSerializer.string();

    /** Dedicated pub/sub connection; opened in {@link #initSub()}, closed in {@link #closeSub()}. */
    private StatefulRedisPubSubConnection<String, String> redisPubSubConnection;

    /**
     * Opens the pub/sub connection and attaches the listener.
     *
     * @throws IllegalStateException if the configured connection factory is not a
     *         {@link LettuceConnectionFactory}, or its native client is not a
     *         standalone {@link RedisClient}
     */
    @PostConstruct
    public void initSub(){
        RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
        if (!(connectionFactory instanceof LettuceConnectionFactory)) {
            throw new IllegalStateException(
                    "Dynamic subscription requires a LettuceConnectionFactory but found: " + connectionFactory);
        }
        AbstractRedisClient nativeClient = ((LettuceConnectionFactory) connectionFactory).getNativeClient();
        if (!(nativeClient instanceof RedisClient)) {
            throw new IllegalStateException(
                    "Dynamic subscription requires a standalone Lettuce RedisClient but found: " + nativeClient);
        }
        redisPubSubConnection = ((RedisClient) nativeClient).connectPubSub();
        // Raw cast: the listener is declared as RedisPubSubListener<String, Object> while the
        // connection is typed <String, String>; the generic parameters do not line up.
        redisPubSubConnection.addListener((RedisPubSubListener)redisPubSubListener);
    }

    /**
     * Closes the pub/sub connection when the bean is destroyed so that it is not
     * leaked on shutdown. Does nothing if the connection was never opened.
     */
    @PreDestroy
    public void closeSub() {
        if (redisPubSubConnection != null) {
            redisPubSubConnection.close();
        }
    }

    /**
     * Subscribes the shared pub/sub connection to the given channel. The subscribe
     * command is issued asynchronously and this method does not wait for it.
     *
     * @param channelName the channel to subscribe to; must not be null or blank
     * @throws IllegalArgumentException if {@code channelName} is null or blank
     */
    public void subscribe(String channelName) {
        // Reject a missing channel up front rather than handing a meaningless value to Redis.
        if (channelName == null || channelName.trim().isEmpty()) {
            throw new IllegalArgumentException("channelName must not be null or blank");
        }
        redisPubSubConnection.async().subscribe(channelName);
    }

}

监听的listener:

package com.wp.redispubsubdemo.subscribe;

import io.lettuce.core.pubsub.RedisPubSubListener;
import org.springframework.stereotype.Component;

/**
 * Demo {@link RedisPubSubListener} that writes a marker line to standard output
 * for every pub/sub callback it receives.
 */
@Component
public class MyRedisPubSubListener implements RedisPubSubListener<String,Object> {

    /** Writes one line to standard output. */
    private static void print(String line) {
        System.out.println(line);
    }

    /** Called for a message on a directly subscribed channel; prints the channel and message. */
    @Override
    public void message(String channel, Object message) {
        print("-----------MyRedisPubSubListener message1------------------");
        StringBuilder detail = new StringBuilder("channel:");
        detail.append(channel).append(", message:").append(message);
        print(detail.toString());
    }

    /** Called for a message that arrived through a pattern subscription. */
    @Override
    public void message(String pattern, String channel, Object message) {
        print("-----------MyRedisPubSubListener message2------------------");
    }

    /** Called when a channel subscription is confirmed. */
    @Override
    public void subscribed(String channel, long count) {
        print("-----------MyRedisPubSubListener subscribed------------------");
    }

    /** Called when a pattern subscription is confirmed. */
    @Override
    public void psubscribed(String pattern, long count) {
        print("-----------MyRedisPubSubListener psubscribed------------------");
    }

    /** Called when a channel subscription is removed. */
    @Override
    public void unsubscribed(String channel, long count) {
        print("-----------MyRedisPubSubListener unsubscribed------------------");
    }

    /** Called when a pattern subscription is removed. */
    @Override
    public void punsubscribed(String pattern, long count) {
        print("-----------MyRedisPubSubListener punsubscribed------------------");
    }
}

controller:


package com.wp.redispubsubdemo.controller;

import com.wp.redispubsubdemo.subscribe.DynamicSubscribeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoints under {@code /pub} for exercising the demo: one publishes a
 * message to a Redis channel, the other subscribes to a channel at runtime.
 */
@RestController
@RequestMapping("/pub")
public class PubController {

    @Autowired
    private DynamicSubscribeService dynamicSubscribeService;

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Subscriber side: asks {@link DynamicSubscribeService} to subscribe to a channel.
     *
     * @param channelName the channel to subscribe to
     */
    @GetMapping("/dynamicSub")
    public void dynamicSub(String channelName){
        dynamicSubscribeService.subscribe(channelName);
    }

    /**
     * Publisher side: sends {@code message} to {@code channel} through the
     * {@link RedisTemplate}.
     *
     * @param channel the channel to publish to
     * @param message the payload to publish
     */
    @GetMapping("/publishMessage")
    public void publishMessage(String channel,String message){
        redisTemplate.convertAndSend(channel, message);
    }

}

测试:

http://localhost:8087/pub/dynamicSub?channelName=topicz
http://localhost:8087/pub/dynamicSub?channelName=topicl


http://localhost:8087/pub/publishMessage?channel=topicz&message=zhangsan

http://localhost:8087/pub/publishMessage?channel=topicl&message=lisi

评论