更新時間:2022-11-11 來源:黑馬程序員 瀏覽量:
相信很多同學都聽說過分布式鎖,但也僅僅停留在概念的理解上,這篇文章會從分布式鎖的應用場景講起,從實現(xiàn)的角度上深度剖析redis如何實現(xiàn)分布式鎖。
一、超賣問題
我們先來看超賣的概念:
當寶貝庫存接近0時,如果多個買家同時付款購買此寶貝,或者店鋪后臺在架數(shù)量大于倉庫實際數(shù)量,將會出現(xiàn)超賣現(xiàn)象。超賣現(xiàn)象本質上就是買到了比倉庫中數(shù)量更多的寶貝。
> 本文主要解決超賣問題的第一種,同時多人購買寶貝時,造成超賣。
測試代碼
那么超賣問題是如何產生的呢?我們準備一段代碼進行測試:
@Autowired private StringRedisTemplate stringRedisTemplate; /** * 第一種實現(xiàn),進程內就存在線程安全問題 * 可以只啟動一個進程測試 */ @RequestMapping("/deduct_stock1") public void deductStock1(){ String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設置庫存" + realStock); }else{ System.out.println("庫存不足"); } }
這段代碼中,使用redis先獲取庫存數(shù)量(當然實際場景中不會只保存一個全局庫存數(shù),應該根據(jù)每一個商品單元(sku)保存一份庫存數(shù))。
String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock);
接下來,判斷庫存數(shù)是否大于0:
- 如果大于0,將庫存數(shù)減一,通過set命令,寫回redis
>這里沒有使用redis的decrement命令,因為此命令在redis單線程模型下是線程安全的,而為了可以模擬線程不安全的情況將其拆成三步操作。
//設置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設置庫存" + realStock);
- 如果小于等于0,提示庫存不足
JMeter測試
通過JMeter進行并發(fā)測試,看下會不會出現(xiàn)超賣的問題:
1.啟動tomcat
這種情況下,只需要啟動一個tomcat就會出現(xiàn)超賣。我們先啟動一個tomcat在8080端口上。
2.下載JMeter
Apache JMeter是Apache組織開發(fā)的基于Java的壓力測試工具。
從官網上下載即可:
[https://jmeter.apache.org/download_jmeter.cgi](https://links.jianshu.com/go?to=https%3A%2F%2Fjmeter.apache.org%2Fdownload_jmeter.cgi)
下載完之后解壓,運行bin目錄下的jmeter.bat,顯示如下界面:
如果嫌字體太小,可以選擇放大:
3.配置JMeter
在Test Plan上點擊右鍵,創(chuàng)建`線程組(Thread Group)`
配置一下具體參數(shù):
- `Number of Threads` 同時并發(fā)線程數(shù)
- `Ramp-Up Period(in-seconds)` 代表隔多長時間執(zhí)行,0代表同時并發(fā)。假設線程數(shù)為100, 估計的點擊率為每秒10次, 那么估計的理想ramp-up period 就是 100/10 = 10 秒
- `Loop Count` 循環(huán)次數(shù)
> 這里給出500是為了直接測試并發(fā)500搶,看看能不能正好把500個貨物搶完。
添加Http請求:
添加請求URL:
添加聚合結果,用來顯示整體的運行情況:
到此為止JMeter的配置結束。
4.設置庫存量
啟動redis-server,使用redis-client連接:
把庫存數(shù)設置為500。
5.開始測試
點擊運行按鈕,啟動測試:
首先我們看到聚合報告里輸出的結果:
錯誤率0%,樣本數(shù)500,證明500個請求都已經執(zhí)行,但是發(fā)現(xiàn)控制臺輸出如下:
<img src="assets/12.png" alt="img" style="zoom:67%;" />
很顯然,一份商品都被賣了多次,這顯然是不合理的。
原因分析
現(xiàn)在我們只啟動了一個tomcat,在單jvm進程的情況下,tomcat會使用線程池接收請求:
而由于每個線程可能同時獲取到庫存量,所以庫存量在兩個線程中顯示的都是500,然后兩個線程就繼續(xù)進行扣減庫存操作,得出499寫回redis中,在這個過程中,顯然存在線程安全的問題。同一個商品被賣出了2份,超賣問題就出現(xiàn)了。
二、加鎖優(yōu)化
synchronized鎖
要保證單jvm中線程安全,最簡單直接的方式就是添加synchronized關鍵字,那么這樣行不行呢,我們來做一個測試:
/** * 第二種實現(xiàn),使用synchronized加鎖 * 可以只啟動一個進程測試 */ @RequestMapping("/deduct_stock2") public void deductStock2(){ synchronized (this){ String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設置庫存" + realStock); }else{ System.out.println("庫存不足"); } } }
在進行扣減庫存前,先通過synchronized關鍵字,對資源加鎖,這樣就只有一個線程能進入到扣減庫存的代碼塊中。來測試一下:
重置庫存
set stock 500
修改接口地址
測試
<img src="assets/15.png" alt="img" style="zoom:67%;" />
可以看到,庫存被扣減為0,并且沒有出現(xiàn)超賣的情況(設置了500庫存,并且500個人搶,正好搶完)。
但是這種方案顯然是不行的,在生產環(huán)境上如果部署多個tomcat實例,那么就會出現(xiàn)如下情況:
多個進程無法共享jvm內存中的鎖,所以會出現(xiàn)多把鎖,這種情況下也會出現(xiàn)超賣問題。
三、分布式鎖的實現(xiàn)
多Tomcat實例下的超賣演示
接下來我們演示一下如何在多個Tomcat情況下,演示超賣的問題:
1.啟動兩個tomcat服務
在IDEA中配置兩個spring boot的啟動項,使用vm參數(shù)指定不同的端口號
```undefined
-Dserver.port=8080
```
2.配置nginx
編寫~/nginx_redis/conf/nginx.conf如下:
user nginx; worker_processes 1; error_log /var/log/nginx/error.log warn; pid /var/run/nginx.pid; events { worker_connections 1024; } http { include /etc/nginx/mime.types; default_type application/octet-stream; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; upstream redislock{ server 192.168.226.1:8080 weight=1; server 192.168.226.1:8081 weight=1; } server { listen 80; server_name localhost; location /{ root html; proxy_pass http://redislock; } } access_log /var/log/nginx/access.log main; sendfile on; #tcp_nopush on; keepalive_timeout 65; #gzip on; include /etc/nginx/conf.d/*.conf; }
> 192.168.226.1這是我宿主機的IP
準備一個虛擬機(也可以使用windows下的nginx),使用docker啟動nginx:
docker pull nginx docker run -di -p 10085:80 --name nginx-redis-hc -v ~/nginx_redis/html:/usr/share/nginx/html -v ~/nginx_redis/conf/nginx.conf:/etc/nginx/nginx.conf -v ~/nginx_redis/logs:/var/log/nginx nginx
在宿主機下使用`虛擬機的IP地址:10085`訪問nginx,如果出現(xiàn)如下頁面就代表成功:
3.測試
修改接口地址為nginx:
運行查看兩個tomcat的控制臺:
- tomcat1
<img src="assets/21.png" alt="img" style="zoom:67%;" />
- tomcat2
<img src="assets/22.png" alt="img" style="zoom:67%;" />
沒有將庫存清空,證明存在超賣問題。
手動實現(xiàn)分布式鎖
使用redis手動實現(xiàn)分布式鎖,需要用到命令`setnx`。先來介紹一下setnx:
SETNX key value[]
> 可用版本: >= 1.0.0
>
> 時間復雜度: O(1)
只在鍵 `key` 不存在的情況下, 將鍵 `key` 的值設置為 `value` 。
若鍵 `key` 已經存在, 則 `SETNX` 命令不做任何動作。
`SETNX` 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
返回值
命令在設置成功時返回 `1` , 設置失敗時返回 `0` 。
代碼示例
redis> EXISTS job # job 不存在 # job 不存在 (integer) 0 redis> SETNX job "programmer" # job 設置成功 (integer) 1 redis> SETNX job "code-farmer" # 嘗試覆蓋 job ,失敗 (integer) 0 redis> GET job # 沒有被覆蓋
使用redis構建分布式鎖流程如下:
image.png
- 線程1申請鎖(`setnx`),拿到了鎖。
- 線程2申請鎖,由于線程1已經擁有了鎖,`setnx`返回0失敗,這一步用戶操作會失敗。
- 線程1執(zhí)行扣減庫存操作并釋放鎖。
- 線程2再次申請鎖,獲取到鎖并執(zhí)行扣減庫存,然后釋放鎖。
> 注意這里線程沒有拿到鎖,如果不嘗試while(true)重新獲取鎖,這個操作就直接失敗了。
代碼實現(xiàn)
/** * 第三種實現(xiàn),使用redis中的setIfAbsent(setnx命令)實現(xiàn)分布式鎖 */ @RequestMapping("/deduct_stock3") public void deductStock3(){ //在獲取到鎖的時候,給鎖分配一個id String opId = UUID.randomUUID().toString(); Boolean stockLock = stringRedisTemplate .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30); if(stockLock){ try{ String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設置庫存" + realStock); }else{ System.out.println("庫存不足"); } }catch(Exception e){ e.printStackTrace(); }finally { if(opId.equals(stringRedisTemplate .opsForValue().get("stockLock"))){ stringRedisTemplate.delete("stockLock"); } } } }
測試略過,這里有幾個知識點需要說明
setIfAbsent設置超時
如果setIfAbsent不設置超時時間,假設線程執(zhí)行業(yè)務代碼時間時死鎖或者其他原因導致長時間不釋放,那么會影響其他線程獲取到鎖,這個時候整體業(yè)務就會出現(xiàn)不可用。
Boolean stockLock = stringRedisTemplate .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);
設置超時時間為30秒,該時間一般大于業(yè)務執(zhí)行的最大時間。
每次獲取到鎖,設置唯一ID
考慮這樣的場景
- 線程1獲取鎖扣減庫存,但是由于操作不當,長時間卡住,這樣會觸發(fā)超時時間鎖被釋放。
- 線程2獲取到鎖,扣減庫存。
- 線程1的代碼拋出異常,執(zhí)行finally釋放鎖,但是釋放的是進程B的鎖。
解決方案就是在**加鎖前生成UUID**,釋放的時候校驗UUID是否正確,如果不正確,說明加鎖線程不是當前線程。
使用Redisson實現(xiàn)分布式鎖
setnx雖好,但是實現(xiàn)起來畢竟太過麻煩,一不小心就可能陷入并發(fā)編程的陷阱中,那么有沒有更加簡單的實現(xiàn)方式呢?答案就是`redisson`。
> Redisson是架設在[Redis](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.oschina.net%2Fp%2Fredis)基礎上的一個Java駐內存數(shù)據(jù)網格(In-Memory Data Grid)?!綶Redis官方推薦](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.redis.io%2Fclients)】
> Redisson在基于NIO的[Netty](https://links.jianshu.com/go?to=http%3A%2F%2Fnetty.io%2F)框架上,充分的利用了Redis鍵值數(shù)據(jù)庫提供的一系列優(yōu)勢,在Java實用工具包中常用接口的基礎上,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調單機多線程并發(fā)程序的工具包獲得了協(xié)調分布式多機多線程并發(fā)系統(tǒng)的能力,大大降低了設計和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時結合各富特色的分布式服務,更進一步簡化了分布式環(huán)境中程序相互之間的協(xié)作。
總而言之,`redisson`提供了一系列較為完善的工具類,其中就包含了分布式鎖。用`redisson`實現(xiàn)分布式鎖的流程極為簡單。
引入依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.14.0</version> </dependency>
創(chuàng)建Redisson實例
@Bean public RedissonClient redisson(){ // 1. Create config object Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); // config.useClusterServers() // // use "rediss://" for SSL connection // .addNodeAddress("redis://127.0.0.1:7181"); return Redisson.create(config); }
編寫分布式鎖代碼
@Autowired private RedissonClient redissonClient; /** * 第四種實現(xiàn),使用redisson實現(xiàn) */ @RequestMapping("/deduct_stock4") public void deductStock4(){ RLock lock = redissonClient.getLock("redisson:stockLock"); try{ //加鎖 lock.lock(); String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設置庫存" + realStock); }else{ System.out.println("庫存不足"); } }catch(Exception e){ e.printStackTrace(); }finally { lock.unlock(); } }
其中加鎖代碼基本與進程內加鎖一致,就不再詳細解讀,讀者自行實踐即可。
Redisson分布式鎖原理
`Redisson分布式鎖`的主要原理非常簡單,利用了lua腳本的原子性。
在分布式環(huán)境下產生并發(fā)問題的主要原因是三個操作并不是原子操作:
- 獲取庫存
- 扣減庫存
- 寫入庫存
那么如果我們把三個操作合并為一個操作,在默認單線程的Redis中運行,是不會產生并發(fā)問題的。源碼如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
這一段源碼中,`redisson`利用了lua腳本的原子性,校驗key是否存在,如果不存在就創(chuàng)建key并利用incrby加一操作(這步操作主要是為了實現(xiàn)可重入性)。`redisson`實現(xiàn)的分布式鎖具備如下特性:
- 鎖失效
- 鎖續(xù)租
> 執(zhí)行時間長的鎖快要到期時會自動續(xù)租
- 可重入
- 操作原子性
鎖續(xù)租原理
使用如下代碼進行測試鎖續(xù)租的情況
@Test void test() throws InterruptedException { RLock testlock1111 = redissonClient.getLock("testlock"); testlock1111.lock(); try{ Thread thread = new Thread(() -> { while(true){ Long testlock = redisTemplate.getExpire("testlock"); System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) + " ttl:" + testlock); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); thread.join(); }finally { if(testlock1111.isHeldByCurrentThread()){ testlock1111.unlock(); } } }
我們會發(fā)現(xiàn),每隔10秒會自動續(xù)租一次,保證鎖不被釋放。
<img src="assets/25.png" alt="image-20220721153251169" style="zoom:67%;" />
那么這種續(xù)租的行為是如何實現(xiàn)的呢?考慮這種情況:如果線程加鎖之后,進程宕機,線程無法執(zhí)行解鎖代碼,那么這個鎖就無法得到釋放(注意,不是加鎖線程不允許亂解鎖),為了避免這種情況的發(fā)生,鎖都會設置一個過期時間。比如使用**lock**無參命令會默認設置30秒的過期時間。那么30秒之后呢?如果線程還在工作,自動釋放依然會產生線程安全的問題。所以Redisson使用了watch dog看門狗機制來實現(xiàn)自動續(xù)租。
核心代碼及注釋:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; //lock()無參方法leaseTime為-1,所以進else分值 if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //通過lua腳本加鎖 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // 異步方法,等到加鎖成功會回調,第一次加鎖ttlRemaining為空,leaseTime為-1 if (ttlRemaining == null) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { //設置延時任務 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }
接下來分析scheduleExpirationRenewal的過程:
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //創(chuàng)建一個延遲任務 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //執(zhí)行l(wèi)ua腳本進行續(xù)租 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //執(zhí)行l(wèi)ua續(xù)租,鎖還在就續(xù)租,鎖不在返回false就取消續(xù)租的行為 if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //internalLockLeaseTime默認值30,所以每10秒會續(xù)租一次,續(xù)租到30秒 ee.setTimeout(task); }
其中,renewExpirationAsync執(zhí)行的lua腳本如下:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
判斷hash中是否存在鎖,如果存在就設置過期時間為30秒,返回1。如果不存在就返回0。
總結
本文介紹了超賣問題產生的原因:操作不具備原子性,同時提出了集中解決思路。
- `synchronized鎖`,無法保證多實例下的線程安全
- `setnx`手動實現(xiàn),坑很多、代碼較為復雜
- `redisson`實現(xiàn),能夠保證多實例下線程安全,代碼簡單可靠