# DistributedLock
**Repository Path**: paultest/distributed-lock
## Basic Information
- **Project Name**: DistributedLock
- **Description**: 分布式锁的几种实现方式
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 6
- **Forks**: 8
- **Created**: 2021-03-03
- **Last Updated**: 2025-08-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
可参考博客:[三种分布式锁的实现](https://blog.csdn.net/jiandanokok/article/details/114296755)
## 0x00 概述
随着互联网技术的不断发展,用户量的不断增加,越来越多的业务场景需要用到分布式系统。
分布式系统有一个著名的理论[CAP](https://baike.baidu.com/item/CAP%E5%8E%9F%E5%88%99/5712863?fr=aladdin),指在一个分布式系统中,**最多只能同时满足下面三项中的两项**:
* 一致性(Consistency):在[分布式系统](https://baike.baidu.com/item/分布式系统/4905336)中的所有数据备份,在同一时刻是否同样的值(等同于所有节点访问同一份最新的数据副本)
* 可用性(Availability):保证每个请求不管成功或者失败都有响应
* 分区容错性(Partition tolerance):系统中任意信息的丢失或失败不会影响系统的继续运作
所以在设计系统时,往往需要权衡,在CAP中作选择,要么AP,要么CP、要么AC。
当然,这个理论也并不一定完美,不同系统对CAP的要求级别不一样,选择需要考虑方方面面。
而在分布式系统中访问共享资源就需要一种互斥机制,来防止彼此之间的互相干扰,以保证一致性,这个时候就需要使用分布式锁。
分布式锁:
当在分布式模型下,数据只有一份(或有限制),此时需要利用锁技术来控制某一时刻修改数据的进程数。这种锁即为分布式锁。
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁应该具备哪些条件:
* 互斥性:在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
* 高可用的获取锁与释放锁;
* 高性能的获取锁与释放锁;
* 可重入性:具备可重入特性,具备锁失效机制,防止死锁,即就算一个客户端持有锁的期间崩溃而没有主动释放锁,也需要保证后续其他客户端能够加锁成功
* 非阻塞:具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
分布式锁的业务场景:
* 互联网秒杀(商品库存)
* 抢优惠券
## 0x02 实现方式
分布式锁主要有几种实现方式:
* 基于数据库实现
* 基于Zookeeper实现
* 基于Redis实现
* 其他
* Chubby:谷歌公司实现的粗粒度分布式锁服务,底层使用了Paxos一致性算法
* Tair:淘宝的分布式Key/Value存储系统,主要是使用Tair的put()方法,原理和Redis类似
* Memcached:利用Memcached的add命令,此命令是原子性操作,只有在`key`不存在的情况下才能add成功,也就意味着加锁成功
如图:

## 0x03 分布式锁:基于数据库
#### 1. 实现思想
主要有两种方式:
* 悲观锁
* 乐观锁
##### A. 悲观锁(排他锁)
利用`select … where xx=yy for update`排他锁
注意:这里需要注意的是`where xx=yy`,xx字段**必须要走索引,否则会锁表**。有些情况下,比如表不大,mysql优化器会不走这个索引,导致锁表问题。
核心思想:以「悲观的心态」操作资源,无法获得锁成功,就一直阻塞着等待。
注意:该方式有很多缺陷,**一般不建议使用**。
实现:
创建一张资源锁表:
```sql
CREATE TABLE `resource_lock` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主键',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的资源名',
`owner` varchar(64) NOT NULL DEFAULT '' COMMENT '锁拥有者',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的资源';
```
注意:resource_name 锁资源名称必须有唯一索引
使用事务查询更新:
```java
@Transaction
public void lock(String name) {
ResourceLock rlock = exeSql("select * from resource_lock where resource_name = name for update");
if (rlock == null) {
exeSql("insert into resource_lock(reosurce_name,owner,count) values (name, 'ip',0)");
}
}
```
使用 `for update` 锁定的资源。如果执行成功,会立即返回,执行插入数据库,后续再执行一些其他业务逻辑,直到事务提交,执行结束;如果执行失败,就会一直阻塞着。
可以在数据库客户端工具上测试出来这个效果,当在一个终端执行了 for update,不提交事务。在另外的终端上执行相同条件的 for update,会一直卡着
虽然也能实现分布式锁的效果,但是会存在性能瓶颈。
**优点:**
简单易用,好理解,保障数据强一致性。
**缺点**:
1)在 RR 事务级别,select 的 for update 操作是基于`间隙锁(gap lock)` 实现的,是一种悲观锁的实现方式,所以存在`阻塞问题`。
2)高并发情况下,大量请求进来,会导致大部分请求进行排队,影响数据库稳定性,也会`耗费`服务的CPU等`资源`。
当获得锁的客户端等待时间过长时,会提示:
```
[40001][1205] Lock wait timeout exceeded; try restarting transaction
```
高并发情况下,也会造成占用过多的应用线程,导致业务无法正常响应。
3)如果优先获得锁的线程因为某些原因,一直没有释放掉锁,可能会导致`死锁`的发生。
4)锁的长时间不释放,会一直占用数据库连接,可能会将`数据库连接池撑爆`,影响其他服务。
5)MySql数据库会做查询优化,即便使用了索引,优化时发现全表扫效率更高,则可能会将行锁升级为表锁,此时可能就更悲剧了。
6)不支持可重入特性,并且超时等待时间是全局的,不能随便改动。
##### B. 乐观锁
所谓乐观锁与悲观锁最大区别在于**基于CAS思想**,表中添加一个时间戳或者是版本号的字段来实现,`update xx set version=new_version where xx=yy and version=Old_version`,通过增加递增的版本号字段实现乐观锁。
不具有互斥性,不会产生锁等待而消耗资源,操作过程中认为不存在并发冲突,只有update version失败后才能觉察到。
抢购、秒杀就是用了这种实现以防止超卖。
如下图:

实现:
创建一张资源锁表:
```sql
CREATE TABLE `resource` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主键',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '资源名',
`share` varchar(64) NOT NULL DEFAULT '' COMMENT '状态',
`version` int(4) NOT NULL DEFAULT '' COMMENT '版本号',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='资源';
```
为表添加一个字段,版本号或者时间戳都可以。通过版本号或者时间戳,来保证多线程同时间操作共享资源的有序性和正确性。
伪代码实现:
```java
Resrouce resource = exeSql("select * from resource where resource_name = xxx");
boolean succ = exeSql("update resource set version= 'newVersion' ... where resource_name = xxx and version = 'oldVersion'");
if (!succ) {
// 发起重试
}
```
实际代码中可以写个while循环不断重试,版本号不一致,更新失败,重新获取新的版本号,直到更新成功。
#### 2. 优缺点
优点:
* 实现简单,复杂度低
* 保障数据一致性
缺点:
* 性能低,并且有锁表的风险
* 可靠性差
* 非阻塞操作失败后,需要轮询,占用CPU资源
* 长时间不commit或者是长时间轮询,可能会占用较多的连接资源
## 0x04 分布式锁:基于Zookeeper
#### 1. 实现思想
ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。
基于ZooKeeper实现分布式锁的步骤如下:
1. 创建一个目录mylock;
2. 线程A想获取锁就在mylock目录下创建临时顺序节点;
3. 获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
4. 线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
5. 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
整个过程如图:

业界推荐直接使用Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。
使用方式很简单:
```java
InterProcessMutex interProcessMutex = new InterProcessMutex(client,"/anyLock");
interProcessMutex.acquire();
interProcessMutex.release();
```
其实现分布式锁的核心源码如下:
```java
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try {
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
// 获取当前所有节点排序后的集合
List children = getSortedChildren();
// 获取当前节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判断当前节点是否是最小的节点
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
// 获取到锁
haveTheLock = true;
} else {
// 没获取到锁,对当前节点的上一个节点注册一个监听器
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this){
Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
if ( stat != null ){
if ( millisToWait != null ){
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ){
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}else{
wait();
}
}
}
// else it may have been deleted (i.e. lock released). Try to acquire again
}
}
}
catch ( Exception e ) {
doDelete = true;
throw e;
} finally{
if ( doDelete ){
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
```
其实 Curator 实现分布式锁的底层原理和上面分析的是差不多的。如图详细描述其原理:

另外,可基于Zookeeper自身的特性和原生Zookeeper API自行实现分布式锁。
#### 2. 优缺点
优点:
* 可靠性非常高
* 性能较好
* CAP模型属于CP,基于ZAB一致性算法实现
缺点:
* 性能并不如Redis(主要原因是在写操作,即获取锁释放锁都需要在Leader上执行,然后同步到follower)
* 实现复杂度高
## 0x05 分布式锁:基于Redis
#### 1. 实现思想
主要是基于命令:`SETNX key value`
命令官方文档:[https://redis.io/commands/setnx](https://redis.io/commands/setnx)
用法可参考:[Redis命令参考](http://redisdoc.com/string/setnx.html)
如图:

实现思想的具体步骤:
1. 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
2. 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
3. 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
具体的分布式锁的实现可参考后面内容
#### 2. 优缺点
优点:
* 性能非常高
* 可靠性较高
* CAP模型属于AP
缺点:
* 复杂度较高
* 无一致性算法,可靠性并不如Zookeeper
* 锁删除失败 过期时间不好控制
* 非阻塞,获取失败后,需要轮询不断尝试获取锁,比较消耗性能,占用cpu资源
## 0x06 分布式锁对比
* 从理解的难易程度角度(从低到高):数据库 > 缓存 > Zookeeper
* 从实现的复杂性角度(从低到高):Zookeeper >= 缓存 > 数据库
* 从性能角度(从高到低):缓存 > Zookeeper >= 数据库
* 从可靠性角度(从高到低):Zookeeper > 缓存 > 数据库
## 0x07 Redis分布式锁实现
下面以减库存接口为例子,访问接口的时候自动减商品的库存
### 一、方案一
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations valueOperations = redisTemplate.opsForValue();
//获取redis中的库存
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
return "success";
}
}
```
表示:
* 先从Redis中读取stock的值,表示商品的库存
* 判断商品库存是否大于0,如果大于0,则库存减1,然后再保存到Redis里面去,否则就报错
#### 1. 改进
方案一这种简单的从Redis读取、判断值再减1保存到Redis的操作,很容易在**并发**场景下出问题:
* 商品超卖
比如:
假设商品的库存有50个,有3个用户同时访问该接口,先是同时读取Redis中商品的库存值,即都是读取到了50,即同时执行到了这一行:
```java
int stock = Integer.valueOf(valueOperations.get("stock"));
```
然后减1,即到了这一行:
```java
int newStock = stock - 1;
```
此时3个用户的realStock都是49,然后3个用户都去设置stock为49,那么就会产生库存明明被3个用户抢了,理论上是应该减去3的,结果库存数只减去了1导致商品超卖。
这种问题的产生原因是因为读取库存、减库存、保存到Redis这几步并不是**原子操作**
那么可以使用**加并发锁**`synchronized`来解决:
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations valueOperations = redisTemplate.opsForValue();
synchronized (this) {
//获取redis中的库存
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
}
return "success";
}
}
```
*注意:在Java中关键字**synchronized**可以保证在同一时刻,只有一个线程可以执行某个方法或某个代码块。*
#### 2. 再改进
以上的代码在单体模式下并没太大问题,但是在分布式或集群架构环境下存在问题,比如架构如下:

在分布式或集群架构下,**synchronized**只能保证当前的主机在同一时刻只能有一个线程执行减库存操作,但如图同时有多个请求过来访问的时候,**不同主机在同一时刻依然是可以访问减库存接口的**,这就导致问题1(商品超卖)在集群架构下依然存在。
*注意:可以使用**JMeter**来模拟出高并发场景下访问Nginx来测试触发上面的问题*
**解决方法**
使用如下的**分布式锁**进行解决
***注意:方案一并不能称之为分布式锁的***
### 二、方案二
分布式锁的简单实现如图:

代码实现如下:
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
//加锁: setnx
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
//释放锁
redisTemplate.delete(lockKey);
return "success";
}
}
```
其实就是对每一个商品加一把锁,代码里面是product_001
* 使用setnx对商品进行加锁
* 如成功说明加锁成功,如失败说明有其他请求抢占了该商品的锁,则当前请求失败退出
* 加锁成功之后进行扣减库存操作
* 删除商品锁
#### 1. 改进1
上面的方式是有可能会造成死锁的,比如说加锁成功之后,扣减库存的逻辑可能抛异常了,即并不会执行到释放锁的逻辑,那么该商品锁是一直没有释放,会成为死锁的,其他请求完全无法扣减该商品的
使用`try...catch...finally`的方式可以解决抛异常的问题,如下:
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
try {
//加锁: setnx
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
//释放锁
redisTemplate.delete(lockKey);
}
return "success";
}
}
```
把释放锁的逻辑放到`finally`里面去,即不管`try`里面的逻辑最终是成功还是失败都会执行释放锁的逻辑
#### 2. 改进2
那么上面的方式是不是能够解决死锁的问题呢?
其实不然,除了抛异常之外,比如程序崩溃、服务器宕机、服务器重启、请求超时被终止、发布、人为kill等都有可能导致释放锁的逻辑没有执行,比如对商品加分布式锁成功之后,在扣减库存的时候服务器正在执行重启,会导致没有执行释放锁。
可以通过对锁设置超时时间来防止死锁的发生,使用Redis的`expire`命令可以对key进行设置超时时间,如图:

代码实现如下:
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
try {
//加锁: setnx
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
//expire增加超时时间
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
//释放锁
redisTemplate.delete(lockKey);
}
return "success";
}
}
```
加锁成功之后,把锁的超时时间设置为10秒,即10秒之后自动会释放锁,避免死锁的发生。
#### 3. 改进3
但是上面的方式同样会产生死锁问题,**加锁和对锁设置超时时间并不是原子操作**,在加锁成功之后,即将执行设置超时时间的时候系统发生崩溃,同样还是会导致死锁。
如图:

对此,有两种做法:
* lua脚本
* set原生命令(Redis 2.6.12版本及以上)
一般是推荐使用set命令,Redis官方在2.6.12版本对set命令增加了NX、EX、PX等参数,即可以将上面的加锁和设置时间放到一条命令上执行,通过set命令即可:
命令官方文档:[https://redis.io/commands/set](https://redis.io/commands/set)
用法可参考:[Redis命令参考](http://redisdoc.com/string/set.html)
如图:

`SET key value NX` 等同于 `SETNX key value`命令,并且可以使用EX参数来设置过期时间
**注意**:其实目前在Redis 2.6.12版本之后,所说的setnx命令,并非单单指Redis的`SETNX key value`命令,一般是代指Redis中对**set命令加上nx参数**进行使用,一般不会直接使用`SETNX key value`命令了
**注意**:Redis2.6.12之前的版本,只能通过**lua脚本**来保证原子性了。
如图:

代码实现如下:
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
try {
//加锁: setnx 和 expire增加超时时间
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
//释放锁
redisTemplate.delete(lockKey);
}
return "success";
}
}
```
#### 4. 改进4
以上的方式其实还是存在着问题,**在高并发场景下会存在问题**,**超时时间设置不合理**导致的问题
大概的流程图可参考: 
流程:
* 进程A加锁之后,**扣减库存的时间超过设置的超时时间**,这里设置的锁是10秒
* 在第10秒的时候由于时间到期了所以进程A设置的锁被Redis释放了(T5)
* 刚好进程B请求进来了,加锁成功(T6)
* 进程A操作完成(扣减库存)之后,**把进程B设置的锁给释放了**
* 刚好进程C请求进来了,加锁成功
* 进程B操作完成之后,也把进程C设置的锁给释放了
* 以此类推...
解决方法也很简单:
* 加锁的时候,把值设置为唯一值,比如说UUID这种随机数
* 释放锁的时候,获取锁的值判断value是不是当前进程设置的唯一值,如果是再去删除
如图:

代码如下:
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
String clientId = UUID.randomUUID().toString();
try {
//加锁: setnx 和 expire增加超时时间
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
if (clientId.equals(valueOperations.get(lockKey))) {
//释放锁
redisTemplate.delete(lockKey);
}
}
return "success";
}
}
```
#### 5. 改进5
上面的方式其实存在一个明显的问题,就是在finally代码块中,释放锁的时候,**get和del并非原子操作**,存在进程安全问题。
那么删除锁的正确姿势是使用**lua脚本**,通过redis的**eval/evalsha**命令来运行:
```lua
-- lua删除锁:
-- KEYS和ARGV分别是以集合方式传入的参数,对应上文的Test和uuid。
-- 如果对应的value等于传入的uuid。
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 执行删除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
```
通俗一点的说,即lua脚本能够保证原子性,在lua脚本里执行是一个命令(eval/evalsha)去执行的,一条命令没有执行完,其他客户端是看不到的。
到此,基本上**Redis的分布式锁的实现思想**如下:
* 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
* 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
* 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
#### 6. 改进6
虽然通过上面的方式解决了会删除其他进程的锁的问题,但是超时时间的设置依然是没有解决的,设置成多少依然是个比较棘手的问题,设置少了容易导致业务没有执行完锁就被释放了,而设置过大万一服务出现异常无法正常释放锁会导致出现异常锁的时间也很长。
怎么解决这个问题呢?
目前大公司的一个方案是这样子的:
* 在加锁成功之后,启动一个守护线程
* 守护线程每隔1/3的锁的超时时间就去延迟锁的超时时间,比如说锁设置为30秒,那就是每隔10秒就去延长锁的超时时间,重新设置为30秒
* 业务代码执行完成,关闭守护线程
在实际操作中,需要注意几点:
* 只续对的:和释放锁一样,需要判断锁的对象有没有发生变化,否则会造成无论谁加锁,守护线程都会重新设置锁的超时时间
* 不能动不动就续:守护线程要在合理的时间再去设置锁的超时时间,否则会造成资源的浪费
* 及时销毁:如果加锁的线程/进程已经处理完业务了,那么守护进程应该被销毁,否则会造成资源的浪费
### 三、方案三
上面的方案还得考虑Redis的部署问题。
众所周知,Redis有3种部署方式:
* 单机模式
* Master-Slave + Sentinel(哨兵)选举模式
* Redis Cluster(集群)模式
使用 Redis 做分布式锁的缺点在于:如果采用单机部署模式,会存在单点问题,只要 Redis 故障了。加锁就不行了。
采用 Master-Slave 模式/集群模式,如下:
```
线程1加了锁去执行业务了
刚好Redis的 master 发生故障挂掉了,此时还没有将数据同步到 slave 上
集群会选举一个新的 master 出来,但是新的 master 上并没有这个锁
线程2可以在新选举产生的 master 上去加锁,然后处理业务
```
这样的话,就导致了两个线程同时持有了锁,锁就不再具有安全性。
针对这个问题,有两个解决方案:
* RedLock
* **Zookeeper**【推荐】
#### 1. RedLock
基于以上的考虑,Redis的作者提出了一个RedLock的算法。
这个算法的意思大概是这样的:假设 Redis 的部署模式是 Redis Cluster,总共有 5 个 Master 节点。
通过以下步骤获取一把锁:
- 获取当前时间戳,单位是毫秒。
- 轮流尝试在每个 Master 节点上创建锁,过期时间设置较短,一般就几十毫秒。
- 尝试在大多数节点上建立一个锁,比如 5 个节点就要求是 3 个节点(n / 2 +1)。
- 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了。
- 要是锁建立失败了,那么就依次删除这个锁。
- 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁。
如图:

但是这样的这种算法还是**颇具争议**的,可能还会存在不少的问题,无法保证加锁的过程一定正确,**不太推荐**。
更多关于RedLock的资料可参考:
* [Redis官网的Redlock](https://redis.io/topics/distlock)
* [基于Redis的分布式锁到底安全吗(上)](http://zhangtielei.com/posts/blog-redlock-reasoning.html)
* [How to do distributed locking](https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html)
**注意**:除了RedLock之外目前并没有有效解决Redis主从切换导致锁失效的方法。在这种情况下(一致性要求非常高的情况下)一般是不会使用Redis,而推荐使用**Zookeeper**。
### 四、Redisson
目前业界对于Redis的分布式锁有了现成的实现方案了,比较出名的是Redisson开源框架。
Redisson 是 Redis 的 **Java** 实现的客户端,其 API 提供了比较全面的 Redis 命令的支持。
Redission 通过 Netty 支持非阻塞 I/O。
Redisson 封装了锁的实现,让我们像操作我们的本地 Lock一样来使用,除此之外还有对集合、对象、常用缓存框架等做了友好的封装,易于使用。
除此之外,Redisson还实现了分布式锁的自动续期机制、锁的互斥自等待机制、锁的可重入加锁于释放锁的机制,可以说Redisson对分布式锁的实现是实现了一整套机制的。
Redisson 可以便捷的支持多种Redis部署架构:
* 单机模式
* Master-Slave + Sentinel(哨兵)选举模式
* Redis Cluster(集群)模式
引入Redission之后,使用上非常简单,RedissonClient客户端提供了众多的接口实现,支持**可重入锁、公平锁、读写锁、锁超时、RedLock**等都提供了完整实现。
使用如下:
A. 引入maven
```java
org.redisson
redisson-spring-boot-starter
3.13.4
```
B. 增加配置文件
```java
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
//单机版
//config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);
//集群版
config.useClusterServers()
.addNodeAddress("redis://192.168.1.1:8001")
.addNodeAddress("redis://192.168.1.1:8002")
.addNodeAddress("redis://192.168.1.2:8001")
.addNodeAddress("redis://192.168.1.2:8002")
.addNodeAddress("redis://192.168.1.3:8001")
.addNodeAddress("redis://192.168.1.3:8002");
return (Redisson) Redisson.create(config);
}
}
```
C. 分布式锁的实现
```java
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private Redisson redisson;
public String deduceStock() {
String lockKey = "lockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
//加锁(超时默认30s), 实现锁续命的功能(后台启动一个timer, 默认每10s检测一次是否持有锁)
redissonLock.lock();
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
//解锁
redissonLock.unlock();
}
return "success";
}
}
```
实现的原理如下:

RedissonLock的使用介绍
```java
// 锁默认有效时间30秒,每10秒去检查并重新设置超时时间
void lock();
// 超过锁有效时间 leaseTime,就会释放锁
void lock(long leaseTime, TimeUnit unit);
// 尝试获取锁;成功则返回true,失败则返回false
boolean tryLock();
// 不会去启动定时任务;在 time 时间内还没有获取到锁,则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 不会去启动定时任务;当 waitTime 的时间到了,还没有获取到锁则返回false;若获取到锁了,锁的有效时间设置为 leaseTime
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
```
也就是说,用法非常简单,但是内部上实现了方案二里面的所有细节:
- 为了兼容老的Redis版本,Redisson 所有指令都**通过 Lua 脚本**执行,Redis 支持 Lua 脚本原子性执行。
- Redisson 设置的Key 的默认过期时间为 30s,如果某个客户端持有一个锁超过了 30s 怎么办?Redisson 中有一个 Watchdog 的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔 10s 帮你把 Key 的超时时间设为 30s。
- 如果获取锁失败,Redsson会通过while循环一直尝试获取锁(可自定义等待时间,超时后返回失败)
这样的话,就算一直持有锁也不会出现 Key 过期了,其他线程获取到锁的问题了。
另外,Redssion还提供了对Redlock算法的支持,用法也很简单:
```java
RedissonClient redisson = Redisson.create(config);
RLock lock1 = redisson.getFairLock("lock1");
RLock lock2 = redisson.getFairLock("lock2");
RLock lock3 = redisson.getFairLock("lock3");
RedissonRedLock multiLock = new RedissonRedLock(lock1, lock2, lock3);
multiLock.lock();
multiLock.unlock();
```
Redisson里面关于加锁/获取锁的Lua脚本流程图如下:

释放锁的Lua脚本流程图如下:

强烈建议大家看一下Redisson里面关于分布式锁的源码,更多关于Redisson的资料可参考:
* [Redisson官网](https://redisson.org/)
* [Redisson官方中文文档](https://www.bookstack.cn/read/redisson-doc-cn/overview.md)
* [Github](https://github.com/redisson/redisson/)
* [Redisson分布式锁实战与Watch Dog机制解读](https://www.cnblogs.com/keeya/p/14332131.html)
* [Redisson分布式的原理](https://www.toutiao.com/i6907120405061042692)
* [一文掌握Redisson分布式锁的原理](https://www.toutiao.com/i6907120405061042692)
注意:**Redison并不能有效的解决Redis的主从切换问题的**,**目前推荐使用Zookeeper分布式锁来解决。**
### 五、分段锁
怎么在高并发的场景去实现一个高性能的分布式锁呢?
电商网站在大促的时候并发量很大:
(1)若抢购不是同一个商品,则可以增加Redis集群的cluster来实现,因为不是同一个商品,所以通过计算 key 的hash会落到不同的 cluster上;
(2)若抢购的是同一个商品,则计算key的hash值会落同一个cluster上,所以加机器也是没有用的。
针对第二个问题,可以使用库存分段锁的方式去实现。
**分段锁**
假如产品1有200个库存,可以将这200个库存分为10个段存储(每段20个),每段存储到一个cluster上;将key使用hash计算,使这些key最后落在不同的cluster上。
每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。
具体可以参照 ConcurrentHashMap 的源码去实现,它使用的就是分段锁。
高性能分布式锁具体可参考链接:[每秒上千订单场景下的分布式锁高并发优化实践!【石杉的架构笔记】](https://mp.weixin.qq.com/s/RLeujAj5rwZGNYMD0uLbrg)
原理如图:

## 0x05 总结
总结:
* 追求数据可靠性/强一致性:使用Zookeeper
* 追求性能:选择Redis,推荐Redisson
* Redis分布式锁目前最大问题在于:主从模式下/集群模式下,master节点宕机,异步同步数据导致锁丢失问题
* Redis的RedLock算法具有很大争议性,一般不推荐使用
## 0x06 附录
### Python代码实现
注意:没有实现看门狗的逻辑,需要自己实现
```python
import redis
import uuid
import time
class LockService:
"""
基于Redis实现的分布式锁
"""
host = 'localhost'
port = 6379
password = ''
db = 1
def __init__(self, conn=None):
"""
如果不传连接池的话,默认读取配置的Redis作为连接池
:param conn:
"""
self.conn = conn if conn else self.get_redis_client()
def get_redis_client(self):
"""
获取Redis连接
:return:
"""
return redis.Redis(
host=self.host,
port=self.port,
password=self.password,
db=self.db
)
def acquire_lock(self, lock_name, acquire_timeout=10, expire_time=30):
"""
加锁/获取锁
如果不存在lock_name,则加锁,并且设置过期时间,避免死锁
如果存在lock_name,则刷新过期时间
:param lock_name: 锁的名称
:param acquire_timeout: 加锁/获取锁的超时时间,默认10秒
:param expire_time: 锁的超时时间,默认30秒
:return:
"""
lockname = f'lock:{lock_name}'
value = str(uuid.uuid4())
end_time = time.time() + acquire_timeout
while time.time() < end_time:
# 如果不存在这个锁则加锁并设置过期时间,避免死锁
if self.conn.set(lockname, value, ex=expire_time, nx=True):
return value
time.sleep(0.1)
return False
def release_lock(self, lock_name, value):
"""
释放锁
:param lock_name: 锁的名称
:param value: 锁的值
:return:
"""
unlock_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
lockname = f'lock:{lock_name}'
unlock = self.conn.register_script(unlock_script)
result = unlock(keys=[lockname], args=[value])
if result:
return True
else:
return False
```
## 参考
* *[120分钟搞懂Redis分布式锁的实现方式](https://www.bilibili.com/video/av66833923)*
* *[分布式锁的三种实现方式](https://www.cnblogs.com/barrywxx/p/11644803.html)*
* *[Redis之分布式锁的使用](https://www.cnblogs.com/yufeng218/p/13733205.html)*
* *[石杉的架构笔记:分布式锁的实现](https://mp.weixin.qq.com/s?__biz=MzU0OTk3ODQ3Ng==&mid=2247486884&idx=1&sn=ecd79b960efe4638e5da5db0f6fe9def&chksm=fba6e5a7ccd16cb10ef3cb6e96df79d5c152b597763d9726ff6b855b2680d31f9c9ecf1325e6&mpshare=1&scene=1&srcid=&sharer_sharetime=1578449410632&sharer_shareid=457a571080dcfbb66a2b355b8b090c0a#rd)*
* *[石杉的架构笔记:每秒上千订单场景下的分布式锁高并发优化实践](https://mp.weixin.qq.com/s/RLeujAj5rwZGNYMD0uLbrg)*
* *[面试不懂分布式锁?那得多吃亏](https://blog.csdn.net/qq_42046105/article/details/95557159)*
* *[Redisson官网](https://redisson.org/)*
* *[Redisson官方中文文档](https://www.bookstack.cn/read/redisson-doc-cn/overview.md)*
* *[Redisson的Github](https://github.com/redisson/redisson/)*
* *[Redisson分布式锁实战与Watch Dog机制解读](https://www.cnblogs.com/keeya/p/14332131.html)*
* *[Redisson分布式的原理](https://www.toutiao.com/i6907120405061042692)*
* [一文掌握Redisson分布式锁的原理](https://www.toutiao.com/i6907120405061042692)
* *[Redis官网的Redlock](https://redis.io/topics/distlock)*
* *[基于Redis的分布式锁到底安全吗(上)](http://zhangtielei.com/posts/blog-redlock-reasoning.html)*
* *[Distributed locks with Redis](https://link.zhihu.com/?target=https%3A//redis.io/topics/distlock)*
* *[Maritin博客:How to do distributed locking](https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html)*