Administrator
发布于 2023-02-06 / 93 阅读
0
0

Redis Template Best Pracetise

redis官网

https://redis.io/docs/reference/

redis命令

https://redis.io/commands/unlink/

工作中实际使用总结

设置多个Key/Value

使用redisTemplate的executePipelined 结合SessionCallback,来完成设置多个Key/Value

Map<String, Object> keyValueMap = new HashMap<>();

for (SignalRDTO signalRDTO : dataList) {
     keyValueMap.put(CacheConstants.VALUE_CHANGED_MONITOR + boxNo + CacheConstants.COLON_STR + signalRDTO.getId(), 1);
 }
 
redisCache.batchSetCacheObject(keyValueMap, CacheConstants.VALUE_CHANGED_MONITOR_EXPIRE, TimeUnit.MINUTES);

其中,batchSetCacheObject如下:

/**
     * 批量设置key 和 value
     * @param keyValueMap
     * @param timeout
     * @param timeUnit
     */
    public void batchSetCacheObject(Map<String,Object> keyValueMap, long timeout, TimeUnit timeUnit) {

        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                for (Map.Entry<String, Object> entry : keyValueMap.entrySet()) {
                    operations.opsForValue().set(entry.getKey(), entry.getValue(), timeout, timeUnit);
                }
                return null;
            }
        });
    }

分布式锁

获取锁使用setIfAbsent方法,外加自旋思想


释放锁,使用lua脚本

@Component
public class RedisDistributeLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisDistributeLock.class);

    private static final String LOCK_PREFIX = "distribute:lock:";

    /**
     * KEYS[1]指的就是lockKey,ARGV[1]指第一个参数lockKeyValue,这里用于确定当前锁是否是当前进程的锁
     *
     * if redis.call('get',KEYS[1])==ARGV[1]
     * then
     * 	return redis.call('del',KEYS[1])
     * else
     * 	return 0
     * end
     *
     */
    private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

    @Autowired
    public RedisTemplate redisTemplate;

    /**
     * 获取锁
     *
     * @param key
     * @param value
     * @param holdLockMaxTimeInSeconds 持有锁的最长时间,超过这个时间,自动释放锁  单位:秒
     * @param waitingLockTimeInSeconds 等待锁的时间  单位:秒
     */
    public boolean obtainLock(String key, String value, long holdLockMaxTimeInSeconds, long waitingLockTimeInSeconds) {
        // 自旋次数 自旋间隔:100ms
        long spinCount = waitingLockTimeInSeconds * (1000 / 100);
        long failCount = 0;
        while (failCount < spinCount) {
            Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, value, holdLockMaxTimeInSeconds, TimeUnit.SECONDS);
            if (lockSuccess) {
                return true;
            }
            //
            try {
                failCount++;
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                LOGGER.info("等待获取锁过程中,被中断,key:{},holdLockMaxTimeInSeconds:{},waitingLockTimeInSeconds:{}", key, holdLockMaxTimeInSeconds, waitingLockTimeInSeconds);
                return false;
            }
        }
        //自旋多次后,仍然没有获取到锁,直接退出
        return false;
    }

    /**
     * 释放锁
     *
     * @param key
     * @param value
     */
    public void releaseLock(String key, String value) {
        DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(RELEASE_LOCK_SCRIPT);
        defaultRedisScript.setResultType(Long.class);
        redisTemplate.execute(defaultRedisScript, Arrays.asList(LOCK_PREFIX + key), value);
    }
}

使用方式如下:

 String lockVal = UUID.randomUUID().toString();
        long curMs = DateTimeUtils.getMillSecondByLocalDateTime(LocalDateTime.now());
        try {
            boolean lockSuccess = distributeLock.obtainLock(LOCK_KEY, lockVal, 5, 15);
            if (!lockSuccess) {
                LOGGER.error("值稳定的监控点,拉取实时值,获取分布式锁失败");
                return null;
            }
            // 为了防止ip被屏蔽,这里,需要让程序被阻塞2s
            TimeUnit.SECONDS.sleep(2);
            CtUtil ctUtil = new CtUtil();
            curMs = DateTimeUtils.getMillSecondByLocalDateTime(LocalDateTime.now());
            monitorValueList = monitorImpl.getMonitorValue(getMonitorValueReqDTO);
            LOGGER.info("getMonitorValueCt:{}", ctUtil.cost());
        } catch (IOException | InterruptedException e) {
            LOGGER.error("查询监控点值,occur exception:{},monitorIds:{}", ExceptionUtils.getStackTrace(e), JSON.toJSONString(monitorIds));
        } finally {
            distributeLock.releaseLock(LOCK_KEY, lockVal);
        }

可以看到,虽然只是获取锁,但是传入了一个key 和 一个value,其中key就是分布式锁,而value的作用是什么呢?

value的作用是:当我们释放锁时,我们需要拿着这个value和 redis中这个key,对应的value进行比较,如果相同,才能删除这个key,释放锁;如果不同,那么就不能删除这个key。原因是:防止A进程释放了B进程的锁。

场景:A进程加锁,执行A的业务逻辑,花了10s,等到第6s时,其实这个key已经过期了,被自动删除了,所以B进程能够成功加锁,所以第6s时,此时的锁,应该属于B进程。

但是等到第10s时,如果A进程直接删除这个key,那么就表示,A进程释放了B进程的锁,那么这个在逻辑上是错误的。因为只有B进程,才能释放B进程的锁,否则,会做成数据混乱。

根据前缀,获取匹配的所有key

不能使用keys命令,应该使用scan命令


 Set<String> resultKeys = (Set<String>)redisTemplate.execute(new RedisCallback<Set<String>>() {
            @Override
            public Set<String> doInRedis(RedisConnection connection) throws DataAccessException {
                Set<String> keysTmp = new HashSet<>();
                Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder().match("1sys_dict:".concat("*")).count(10).build());
                while (cursor.hasNext()) {
                    String key = new String(cursor.next());
                    keysTmp.add(key);
                }
                return keysTmp;
            }
        });
        System.out.println(resultKeys);

评论