redis官网
https://redis.io/docs/reference/
redis命令
https://redis.io/commands/unlink/
工作中实际使用总结
设置多个Key/Value
使用redisTemplate的executePipelined 结合SessionCallback,来完成设置多个Key/Value
Map<String, Object> keyValueMap = new HashMap<>();
for (SignalRDTO signalRDTO : dataList) {
keyValueMap.put(CacheConstants.VALUE_CHANGED_MONITOR + boxNo + CacheConstants.COLON_STR + signalRDTO.getId(), 1);
}
redisCache.batchSetCacheObject(keyValueMap, CacheConstants.VALUE_CHANGED_MONITOR_EXPIRE, TimeUnit.MINUTES);
其中,batchSetCacheObject如下:
/**
* 批量设置key 和 value
* @param keyValueMap
* @param timeout
* @param timeUnit
*/
public void batchSetCacheObject(Map<String,Object> keyValueMap, long timeout, TimeUnit timeUnit) {
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
for (Map.Entry<String, Object> entry : keyValueMap.entrySet()) {
operations.opsForValue().set(entry.getKey(), entry.getValue(), timeout, timeUnit);
}
return null;
}
});
}
分布式锁
获取锁使用setIfAbsent方法,外加自旋思想
释放锁,使用lua脚本
@Component
public class RedisDistributeLock {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisDistributeLock.class);
private static final String LOCK_PREFIX = "distribute:lock:";
/**
* KEYS[1]指的就是lockKey,ARGV[1]指第一个参数lockKeyValue,这里用于确定当前锁是否是当前进程的锁
*
* if redis.call('get',KEYS[1])==ARGV[1]
* then
* return redis.call('del',KEYS[1])
* else
* return 0
* end
*
*/
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
@Autowired
public RedisTemplate redisTemplate;
/**
* 获取锁
*
* @param key
* @param value
* @param holdLockMaxTimeInSeconds 持有锁的最长时间,超过这个时间,自动释放锁 单位:秒
* @param waitingLockTimeInSeconds 等待锁的时间 单位:秒
*/
public boolean obtainLock(String key, String value, long holdLockMaxTimeInSeconds, long waitingLockTimeInSeconds) {
// 自旋次数 自旋间隔:100ms
long spinCount = waitingLockTimeInSeconds * (1000 / 100);
long failCount = 0;
while (failCount < spinCount) {
Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, value, holdLockMaxTimeInSeconds, TimeUnit.SECONDS);
if (lockSuccess) {
return true;
}
//
try {
failCount++;
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
LOGGER.info("等待获取锁过程中,被中断,key:{},holdLockMaxTimeInSeconds:{},waitingLockTimeInSeconds:{}", key, holdLockMaxTimeInSeconds, waitingLockTimeInSeconds);
return false;
}
}
//自旋多次后,仍然没有获取到锁,直接退出
return false;
}
/**
* 释放锁
*
* @param key
* @param value
*/
public void releaseLock(String key, String value) {
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(RELEASE_LOCK_SCRIPT);
defaultRedisScript.setResultType(Long.class);
redisTemplate.execute(defaultRedisScript, Arrays.asList(LOCK_PREFIX + key), value);
}
}
使用方式如下:
String lockVal = UUID.randomUUID().toString();
long curMs = DateTimeUtils.getMillSecondByLocalDateTime(LocalDateTime.now());
try {
boolean lockSuccess = distributeLock.obtainLock(LOCK_KEY, lockVal, 5, 15);
if (!lockSuccess) {
LOGGER.error("值稳定的监控点,拉取实时值,获取分布式锁失败");
return null;
}
// 为了防止ip被屏蔽,这里,需要让程序被阻塞2s
TimeUnit.SECONDS.sleep(2);
CtUtil ctUtil = new CtUtil();
curMs = DateTimeUtils.getMillSecondByLocalDateTime(LocalDateTime.now());
monitorValueList = monitorImpl.getMonitorValue(getMonitorValueReqDTO);
LOGGER.info("getMonitorValueCt:{}", ctUtil.cost());
} catch (IOException | InterruptedException e) {
LOGGER.error("查询监控点值,occur exception:{},monitorIds:{}", ExceptionUtils.getStackTrace(e), JSON.toJSONString(monitorIds));
} finally {
distributeLock.releaseLock(LOCK_KEY, lockVal);
}
可以看到,虽然只是获取锁,但是传入了一个key 和 一个value,其中key就是分布式锁,而value的作用是什么呢?
value的作用是:当我们释放锁时,我们需要拿着这个value和 redis中这个key,对应的value进行比较,如果相同,才能删除这个key,释放锁;如果不同,那么就不能删除这个key。原因是:防止A进程释放了B进程的锁。
场景:A进程加锁,执行A的业务逻辑,花了10s,等到第6s时,其实这个key已经过期了,被自动删除了,所以B进程能够成功加锁,所以第6s时,此时的锁,应该属于B进程。
但是等到第10s时,如果A进程直接删除这个key,那么就表示,A进程释放了B进程的锁,那么这个在逻辑上是错误的。因为只有B进程,才能释放B进程的锁,否则,会做成数据混乱。
根据前缀,获取匹配的所有key
不能使用keys命令,应该使用scan命令
Set<String> resultKeys = (Set<String>)redisTemplate.execute(new RedisCallback<Set<String>>() {
@Override
public Set<String> doInRedis(RedisConnection connection) throws DataAccessException {
Set<String> keysTmp = new HashSet<>();
Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder().match("1sys_dict:".concat("*")).count(10).build());
while (cursor.hasNext()) {
String key = new String(cursor.next());
keysTmp.add(key);
}
return keysTmp;
}
});
System.out.println(resultKeys);