一个Redis实现的分布式锁

2021年11月23日 阅读数:3
这篇文章主要向大家介绍一个Redis实现的分布式锁,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。


 

 



import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class RedisLock implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class);
public static final String REDIS_LOCK = "RedisLock:";


private static final long DEFAULT_WAIT_LOCK_TIME_OUT = 60;//60s 有慢sql,超时时间设置长一点
private static final long DEFAULT_EXPIRE = 80;//80s 有慢sql,超时时间设置长一点
private String key;
private RedisTemplate redisTemplate;

public RedisLock(RedisTemplate redisTemplate,String key) {
this.redisTemplate = redisTemplate;
this.key = key;
}

/**
* 等待锁的时间,单位为s
*
* @param key
* @param timeout s
* @param seconds
*/
public boolean lock(String key, long timeout, TimeUnit seconds) {
String lockKey = generateLockKey(key);
long nanoWaitForLock = seconds.toNanos(timeout);
long start = System.nanoTime();

try {
while ((System.nanoTime() - start) < nanoWaitForLock) {
if (redisTemplate.getConnectionFactory().getConnection().setNX(lockKey.getBytes(), new byte[0])) {
redisTemplate.expire(lockKey, DEFAULT_EXPIRE, TimeUnit.SECONDS);//暂设置为80s过时,防止异常中断锁未释放
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("add RedisLock[{}].{}", key, Thread.currentThread());
}
return true;
}
TimeUnit.MILLISECONDS.sleep(1000 + new Random().nextInt(100));//加随机时间防止活锁
}
} catch (Exception e) {
LOGGER.error("{}", e.getMessage(), e);
unlock();
}
return false;
}

public void unlock() {
try {
String lockKey = generateLockKey(key);
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
connection.del(lockKey.getBytes());
connection.del(key.getBytes());
connection.close();
} catch (Exception e) {
LOGGER.error("{}", e.getMessage(), e);
}
}

private String generateLockKey(String key) {
return String.format(REDIS_LOCK + "%s", key);
}

public boolean lock() {
return lock(key, DEFAULT_WAIT_LOCK_TIME_OUT, TimeUnit.SECONDS);
}

@Override
public void close(){
try {
String lockKey = generateLockKey(key);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("release RedisLock[" + lockKey + "].");
}
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
connection.del(lockKey.getBytes());
connection.close();
} catch (Exception e) {
LOGGER.error("{}", e.getMessage(), e);
}
}
}


 

 

在高并发的使用场景下,如何让redis里的数据尽可能保持一致,能够采用分布式锁。以分布式锁的方式来保证对临界资源的互斥读写。html

   redis使用缓存做为分布式锁,性能很是强劲,在一些不错的硬件上,redis能够每秒执行10w次,内网延迟不超过1ms,足够知足绝大部分应用的锁定需求。java

   redis经常使用的分布式锁的实现方式:redis

1、setbit / getbitspring

   用索引号为0的第一个比特位来表示锁定状态,其中:0表示未得到锁,1表示已得到锁。sql

   优点:简单;缓存

   劣势:竞态条件(race condition),死锁。并发

   得到锁的过程至少须要两步:先getbit判断,后setbit上锁。因为不是原子操做,所以可能存在竞态条件;若是一个客户端使用setbit获取到锁,而后没来得及释放crash掉了,那么其余在等待的客户端将永远没法得到该锁,进而造成了死锁。因此这种形式不太适合实现分布式锁。dom

2、setnx / del / getset分布式

  redis官网有一篇文章专门谈论了实现分布式锁的话题。基本的原则是:采用setnx尝试获取锁并判断是否得到了锁,setnx设置的值是它想占用锁的时间(预估):ide

一个Redis实现的分布式锁_java

一个Redis实现的分布式锁_客户端_02

  • 如返回1,则该客户端得到锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后能够经过DEL lock.foo来释放该锁。
  • 如返回0,代表该锁已被其余客户端取得,这时咱们能够先返回或进行重试等对方完成或等待锁超时。

  经过del删除key来释放锁。某个想得到锁的客户端,先采用setnx尝试获取锁,若是获取失败了,那么会经过get命令来得到锁的过时时间以判断该锁的占用是否过时。若是跟当前时间对比,发现过时,那么先执行del,而后执行setnx获取锁。若是整个流程就这样,可能会产生死锁,请参考下面的执行序列:

一个Redis实现的分布式锁_客户端_03

   因此,在高并发的场景下,若是检测到锁过时,不能简单地进行del并尝试经过setnx得到锁。咱们能够经过getset命令来避免这个问题。来看看,若是存在一个用户user4,它经过调用getset命令如何避免这种状况的发生:

一个Redis实现的分布式锁_Redis  分布式锁_04

 getset设置的过时时间跟上面的setnx设置的相同:

一个Redis实现的分布式锁_Redis  分布式锁_05

一个Redis实现的分布式锁_Redis  分布式锁_06

   若是该命令返回的结果跟上一步经过get得到的过时时间一致,则说明这两步之间,没有新的客户端抢占了锁,则该客户端即得到锁。若是该命令返回的结果跟上一步经过get得到的过时时间不一致,则该锁可能已被其余客户端抢先得到,则本次获取锁失败。

   这种实现方式得益于getset命令的原子性,从而有效得避免了竞态条件。而且,经过将比对锁的过时时间做为获取锁逻辑的一部分,从而避免了死锁。

3、setnx / del / expire

   这是使用最多的实现方式:setnx的目的同上,用来实现尝试获取锁以及判断是否获取到锁的原子性,del删除key来释放锁,与上面不一样的是,使用redis自带的expire命令来防止死锁(可能出现某个客户端得到了锁,可是crash了,永不释放致使死锁)。这算是一种比较简单但粗暴的实现方式:由于,无论实际的状况如何,当你设置expire以后,它必定会在那个时间点删除key。如何当时某个客户端已得到了锁,正在执行临界区内的代码,但执行时间超过了expire的时间,将会致使另外一个正在竞争该锁的客户端也得到了该锁,这个问题下面还会谈到。

  咱们来看一下宿舍锁的简单实现很简单:

一个Redis实现的分布式锁_死锁_07

经过一个while(true),在当前线程上进行阻塞等待,并经过一个计数器进行自减操做,防止永久等待。 

http://www.cnblogs.com/moonandstar08/p/5682822.html

 

多节点的部署中,对锁的控制,参考:

​http://www.jeffkit.info/2011/07/1000/​

直接贴上代码实现,同上一篇文章同样,都是基于AOP

定义注解,标志切入点:



package com.ns.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {
/**
* redis的key
* @return
*/
String value();
/**
* 持锁时间,单位毫秒,默认一分钟
*/
long keepMills() default 60000;
/**
* 当获取失败时候动做
*/
LockFailAction action() default LockFailAction.GIVEUP;

public enum LockFailAction{
/**
* 放弃
*/
GIVEUP,
/**
* 继续
*/
CONTINUE;
}
/**
* 睡眠时间,设置GIVEUP忽略此项
* @return
*/
long sleepMills() default 1000;
}


 

切面实现:



package com.redis.aop;
import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.ns.annotation.RedisLock;
import com.ns.annotation.RedisLock.LockFailAction;
import com.ns.redis.dao.base.BaseRedisDao;
@Aspect
public class RedisLockAspect extends BaseRedisDao<String, Long>{
private static final Logger log = LoggerFactory.getLogger(RedisLockAspect.class);
//execution(* com.ns..*(*,..)) and @within(com.ns.annotation.RedisLock)

@Pointcut("execution(* com.ns..*(..)) && @annotation(com.ns.annotation.RedisLock)")
private void lockPoint(){}
@Around("lockPoint()")
public Object arround(ProceedingJoinPoint pjp) throws Throwable{
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Method method = methodSignature.getMethod();
RedisLock lockInfo = method.getAnnotation(RedisLock.class);
boolean lock = false;
Object obj = null;
while(!lock){
long timestamp = System.currentTimeMillis()+lockInfo.keepMills();
lock = setNX(lockInfo.value(), timestamp);
//获得锁,已过时而且成功设置后旧的时间戳依然是过时的,能够认为获取到了锁(成功设置防止锁竞争)
long now = System.currentTimeMillis();
if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){
//获得锁,执行方法,释放锁
log.info("获得锁...");
obj = pjp.proceed();
//不加这一行,对于只能执行一次的定时任务,时间差上不能保证另外一个必定正好放弃
if(lockInfo.action().equals(LockFailAction.CONTINUE)){
delete(lockInfo.value());
}
}else{
if(lockInfo.action().equals(LockFailAction.CONTINUE)){
log.info("稍后从新请求锁...");
Thread.currentThread().sleep(lockInfo.sleepMills());
}else{
log.info("放弃锁...");
break;
}
}
}
return obj;
}
public boolean setNX(String key,Long value){
return valueOperations.setIfAbsent(key, value);
}
public long getLock(String key){
return valueOperations.get(key);
}
public Long getSet(String key,Long value){
return valueOperations.getAndSet(key, value);
}
public void releaseLock(String key){
delete(key);
}
}


Python的一个实现



LOCK_TIMEOUT = 3
lock = 0
lock_timeout = 0
lock_key = 'lock.foo'

# 获取锁
while lock != 1:
now = int(time.time())
lock_timeout = now + LOCK_TIMEOUT + 1
lock = redis_client.setnx(lock_key, lock_timeout)
if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):
break
else:
time.sleep(0.001)

# 已得到锁
do_job()

# 释放锁
now = int(time.time())
if now < lock_timeout:
redis_client.delete(lock_key)

 

以上有些代码只符合我如今的项目场景,根据实际须要进行调整