計(jì)算機(jī)網(wǎng)站開(kāi)發(fā)職業(yè)定位app開(kāi)發(fā)制作
一、Redisson 分布式鎖源碼解析
Redisson
是架設(shè)在Redis
基礎(chǔ)上的一個(gè)Java
駐內(nèi)存數(shù)據(jù)網(wǎng)格。在基于NIO
的Netty
框架上,充分的利用了Redis
鍵值數(shù)據(jù)庫(kù)提供的一系列優(yōu)勢(shì),在Java
實(shí)用工具包中常用接口的基礎(chǔ)上,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調(diào)單機(jī)多線程并發(fā)程序的工具包獲得了協(xié)調(diào)分布式多機(jī)多線程并發(fā)系統(tǒng)的能力,大大降低了設(shè)計(jì)和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時(shí)結(jié)合各富特色的分布式服務(wù),更進(jìn)一步簡(jiǎn)化了分布式環(huán)境中程序相互之間的協(xié)作。
其中比較具體特色的就是 Redisson
對(duì)分布式鎖的支持,不僅簡(jiǎn)化了分布式鎖的應(yīng)用過(guò)程還支持 Fair Lock、MultiLock、RedLock、ReadWriteLock
等鎖的實(shí)現(xiàn)。本文就 Redisson
分布式鎖的加鎖和解鎖過(guò)程的源碼進(jìn)行大致的解析。
下面是Redisson
源碼地址:
https://github.com/redisson/redisson
如果對(duì) Redisson
的使用還不了解的小伙伴可以先看下下面這篇文章:
https://xiaobichao.blog.csdn.net/article/details/112726748
Redisson
中的分布式鎖在使用起來(lái)非常簡(jiǎn)便,例如:
public class TestLock {@ResourceRedissonClient redissonClient;@Testpublic void test() {RLock lock = null;try {// 獲取可重入鎖lock = redissonClient.getLock("redislock");// 獲取鎖,如果獲取不到會(huì)等待lock.lock();Thread.sleep(30000000);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {if (lock != null) {// 釋放鎖lock.unlock();}}}@Testpublic void test1() {RLock lock = null;try {// 獲取可重入鎖lock = redissonClient.getLock("redislock");// 嘗試獲取鎖,返回獲取鎖的狀態(tài)Boolean isLock = lock.tryLock();Thread.sleep(30000000);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {if (lock != null) {// 釋放鎖lock.unlock();}}}
}
下面分別從 lock
、tryLock
、unlock
、三個(gè)地方進(jìn)行源碼的解析。
二、lock 獲取鎖和看門狗機(jī)制
先看下 redissonClient.getLock
方法,它默認(rèn)創(chuàng)建了一個(gè) RedissonLock
對(duì)象,并將鎖的key
傳遞進(jìn)來(lái):
而 RedissonLock
對(duì)象又繼承至RedissonBaseLock
類:
因此我們下面大多的源碼分析都基于這兩個(gè)類進(jìn)行。
首先進(jìn)到 RedissonLock
類下的 lock()
方法中:
這里主要又調(diào)用了 lock(long leaseTime, TimeUnit unit, boolean interruptibly)
方法,注意如果沒(méi)有指定過(guò)期時(shí)間默認(rèn)為 -1
,下面看到 lock(long leaseTime, TimeUnit unit, boolean interruptibly)
方法中:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 當(dāng)前線程IDlong threadId = Thread.currentThread().getId();// 嘗試獲取鎖,如果已經(jīng)有鎖的話返回鎖的剩余時(shí)間Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// 獲取鎖成功if (ttl == null) {return;}// 如果獲取鎖失敗,訂閱當(dāng)前線程,以便后續(xù)獲取鎖時(shí)得到通知。CompletableFuture<RedissonLockEntry> future = subscribe(threadId);//設(shè)置超時(shí)處理,當(dāng)訂閱的future完成時(shí),觸發(fā)超時(shí)處理。pubSub.timeout(future);//定義一個(gè)RedissonLockEntry對(duì)象,用于表示當(dāng)前線程在分布式鎖中的狀態(tài)。RedissonLockEntry entry;if (interruptibly) {// 可中斷entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}try {// 循環(huán)嘗試獲取鎖while (true) {// 嘗試獲取鎖ttl = tryAcquire(-1, leaseTime, unit, threadId);// 獲取鎖成功if (ttl == null) {break;}// 如果已經(jīng)存在鎖的過(guò)期時(shí)間大于等于0,需要等待通知if (ttl >= 0) {try {// 通過(guò)Semaphore 的 tryAcquire方法等待指定時(shí)間entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else { //如果剩余時(shí)間小于0,就一直等待。if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}}}} finally {// 無(wú)論加鎖成功或失敗,都取消訂閱unsubscribe(entry, threadId);}
}
代碼中加了注釋,這里我總結(jié)下,首先調(diào)用 tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId)
方法嘗試獲取鎖,如果鎖存在的話則返回過(guò)期時(shí)間,為 null
的話表示獲取鎖成功。如果獲取鎖失敗,則將自己加入到訂閱中,然后開(kāi)啟一個(gè)死循環(huán),在循環(huán)中再次嘗試獲取鎖,如果還是沒(méi)有獲取到的話則使用 Semaphore
的 tryAcquire
方法阻塞當(dāng)前線程,如果其他線程釋放了鎖,則這里繼續(xù)循環(huán)再次嘗試獲取鎖。
下面主要看下 tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId)
嘗試獲取鎖的邏輯,看到該方法下:
tryAcquire
方法又調(diào)用了 tryAcquireAsync0
方法,然后又主要調(diào)用了 tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
方法,下面主要看到這個(gè)方法下:
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {//如果指定了鎖持有時(shí)間,則根據(jù)指定的時(shí)間設(shè)置 key 的過(guò)期時(shí)間ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 沒(méi)指定,默認(rèn)鎖持有 30sttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 執(zhí)行 lua 操作CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// 如果加鎖成功if (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 沒(méi)指定的話// 啟動(dòng)看門狗,延長(zhǎng)鎖持有時(shí)間scheduleExpirationRenewal(threadId);}}// 返回鎖的過(guò)期時(shí)間return ttlRemaining;});return new CompletableFutureWrapper<>(f);
}
這里其中 tryLockInnerAsync
方法主要是指定了 Lua
腳本,主要注意的是如果沒(méi)有指定了鎖的過(guò)期則默認(rèn)為 30s
的時(shí)間,然后在 Lua
腳本執(zhí)行后,同樣的判斷,如果獲取到鎖的話并且沒(méi)有指定鎖的過(guò)期時(shí)間則開(kāi)啟看門狗機(jī)制,為鎖延長(zhǎng)時(shí)間續(xù)命的操作。
這里先看下核心操作 tryLockInnerAsync
方法中 Lua
腳本:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// lua 腳本return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, command,// 如果鎖不存在,或者哈希表中鎖對(duì)應(yīng)的線程ID存在的話"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +// 對(duì)hash中的內(nèi)容值 +1"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +// 設(shè)置過(guò)期時(shí)間"redis.call('pexpire', KEYS[1], ARGV[1]); " +//表示腳本執(zhí)行成功,且不需要返回特定的值。"return nil; " +"end; " +// 如果if條件不滿足,返回剩余過(guò)期時(shí)間(以毫秒為單位)"return redis.call('pttl', KEYS[1]);",// 對(duì)應(yīng)這 lua 腳本中的參數(shù),第一個(gè)參數(shù)就是 KEYS[1],以此類推Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
這里主要利于 Lua
的原子性將整個(gè)判斷操作過(guò)程給原子化了,其中這里鎖的結(jié)構(gòu)是以 hash
的形式存放的,key
為鎖的名稱,hash
中的key
為線程ID
(UUID
+線程ID
的形式),因?yàn)榉植际角闆r下線程ID
也有可能重復(fù),value
為數(shù)字表示鎖重入的次數(shù), lua
腳本如果執(zhí)行加鎖邏輯成功則返回 null
,否則返回鎖的過(guò)期時(shí)間,也就對(duì)應(yīng)前面獲取鎖的時(shí)候判斷的依據(jù)。
下面回到上面的 tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
方法中,在 ttlRemainingFuture.thenApply
中如果獲取鎖成功,并且沒(méi)有指定鎖的過(guò)期時(shí)間則會(huì)開(kāi)啟看門狗機(jī)制為鎖進(jìn)行續(xù)命操作,主要調(diào)用的是 scheduleExpirationRenewal(long threadId)
方法,下面看到該方法下的邏輯:
protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();// 加入看門狗記錄中ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);// 如果已經(jīng)存在if (oldEntry != null) {// 重新指定線程IDoldEntry.addThreadId(threadId);} else { // 如果不存在的話就開(kāi)啟看門狗entry.addThreadId(threadId);try {// 啟動(dòng)看門狗renewExpiration();} finally {// 如果線程已經(jīng)終止,則關(guān)閉看門狗if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}
}
主要的邏輯在 renewExpiration()
方法下,繼續(xù)看到該方法中:
private void renewExpiration() {// 獲取當(dāng)前信息ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 執(zhí)行計(jì)時(shí)任務(wù)Timeout task = getServiceManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {//再次獲取信息ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}// 獲取線程IDLong threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 延長(zhǎng)鎖的過(guò)期時(shí)間CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) { //如果有異常刪除該任務(wù)log.error("Can't update lock {} expiration", getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) { // 如果執(zhí)行成功// 遞歸繼續(xù)執(zhí)行renewExpiration();} else { // 執(zhí)行失敗// 關(guān)閉看門狗cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
這里主要通過(guò)遞歸延時(shí)任務(wù)的方式實(shí)現(xiàn)循環(huán)執(zhí)行的效果,其中延時(shí)的時(shí)間為 internalLockLeaseTime
的三分之一,也就是默認(rèn) 10s
觸發(fā)一次,在任務(wù)中主要通過(guò) renewExpirationAsync(long threadId)
方法,對(duì)鎖進(jìn)行了延時(shí)續(xù)命操作,看到該方法中:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {// lua 腳本return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 如果鎖和線程ID存在"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +// 重置過(guò)期時(shí)間"redis.call('pexpire', KEYS[1], ARGV[1]); " +// 成功返回 1"return 1; " +"end; " +// 失敗返回 0"return 0;",// lua 腳本中對(duì)應(yīng)的參數(shù)Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}
這里還是依靠 Lua
腳本的方式,如果鎖存在的話就重置過(guò)期時(shí)間,達(dá)到續(xù)命的效果。
三、tryLock 獲取鎖
tryLock
和lock
是兩種獲取分布式鎖的方法,它們的主要區(qū)別在于獲取鎖的方式和阻塞行為。tryLock
默認(rèn)是一種非阻塞的獲取鎖的方法,也可以通過(guò)設(shè)置 waitTime
變成阻塞的。而lock
默認(rèn)就是一種阻塞的獲取鎖的方法。
他們倆的最終處理邏輯都是一樣的,只不過(guò)默認(rèn)的 tryLock
沒(méi)有訂閱阻塞的操作。
下面看下默認(rèn)的 tryLock
的操作 ,進(jìn)到 RedissonLock
下的 tryLock()
中:
再進(jìn)入 tryLockAsync()
方法中:
這里調(diào)用了 tryLockAsync
方法,并將當(dāng)前線程的ID
傳遞了進(jìn)來(lái),繼續(xù)看到 tryLockAsync
方法中:
在看到 tryAcquireOnceAsync
方法中,注意這里的等待時(shí)間和上面 lock()
默認(rèn)一樣,是 -1
:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {CompletionStage<Boolean> acquiredFuture;if (leaseTime > 0) {//如果指定了鎖持有時(shí)間,則根據(jù)指定的時(shí)間設(shè)置 key 的過(guò)期時(shí)間acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {// 沒(méi)指定,默認(rèn)鎖持有 30sacquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}// 執(zhí)行 lua 操作acquiredFuture = handleNoSync(threadId, acquiredFuture);CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {// 如果加鎖成功if (acquired) {// 如果指定了鎖持有時(shí)間if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else { // 沒(méi)指定的話,// 看門狗,延長(zhǎng)鎖持有時(shí)間scheduleExpirationRenewal(threadId);}}// 返回獲取鎖的狀態(tài)return acquired;});return new CompletableFutureWrapper<>(f);
}
這里的邏輯相比于前面 lock()
的邏輯就差不多了,只不過(guò)缺少了訂閱和阻塞等待重試的操作,再下面的操作和lock()
的邏輯是一致的。
四、unlock 解鎖和關(guān)閉看門狗
解鎖的邏輯看到 RedissonBaseLock
下的 unlock()
方法中:
繼續(xù)看到 unlockAsync
方法中:
主要邏輯在 unlockAsync0
方法中:
private RFuture<Void> unlockAsync0(long threadId) {// 解鎖CompletionStage<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {// 關(guān)閉看門狗cancelExpirationRenewal(threadId);if (e != null) { // 如果執(zhí)行有異常if (e instanceof CompletionException) {throw (CompletionException) e;}throw new CompletionException(e);}if (opStatus == null) { // 如果結(jié)果為空的話,表示鎖不存在IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);
}
主要做了兩件事,解鎖和關(guān)閉看門狗,先看下 unlockInnerAsync(long threadId)
方法解鎖的過(guò)程:
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {String id = getServiceManager().generateId();MasterSlaveServersConfig config = getServiceManager().getConfig();int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);// 解鎖RFuture<Boolean> r = unlockInnerAsync(threadId, id, timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {CommandAsyncExecutor ce = commandExecutor;if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}// 釋放鎖的結(jié)果return v;});return new CompletableFutureWrapper<>(ff);
}
這里的重點(diǎn)主要關(guān)注 unlockInnerAsync
方法,通過(guò)使用 Lua 腳本進(jìn)行解鎖的操作:
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {// lua 腳本return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 從Redis中獲取鎖的狀態(tài)。"local val = redis.call('get', KEYS[3]); " +//如果不是false"if val ~= false then " +//將其轉(zhuǎn)換為數(shù)字并返回,也就是 true 返回 1"return tonumber(val);" +"end; " +// 如果哈希表鎖中不存在線程ID,表示鎖已經(jīng)被釋放,返回nil。"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//對(duì)鎖中的線程ID的值減1,并將結(jié)果存儲(chǔ)在 counter 變量中。這是一個(gè)計(jì)數(shù)器的操作。"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +//如果計(jì)數(shù)器值大于0,表示鎖仍然被持有。"if (counter > 0) then " +// 更新哈希表鎖的過(guò)期時(shí)間。"redis.call('pexpire', KEYS[1], ARGV[2]); " +// 設(shè)置鍵鎖的狀態(tài)值為0,并設(shè)置過(guò)期時(shí)間,表示鎖仍然被持有。"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +//返回0,表示鎖仍然被持有"return 0; " +"else " + //如果計(jì)數(shù)器值不大于0,表示鎖即將被釋放。//刪除鎖"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +// 設(shè)置鍵鎖的狀態(tài)值為1,并設(shè)置過(guò)期時(shí)間,表示鎖已經(jīng)被釋放。"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +//返回1,表示鎖已經(jīng)被釋放"return 1; " +"end; ",Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}
需要注意的是,在 Lua
腳本中,如果鎖還存在的話,就對(duì) hash
中的 value
減一,如果此時(shí) value
結(jié)果還大于 0
的話,則表示這是重入鎖的場(chǎng)景,此時(shí)不能直接刪除鎖,而是對(duì)重入的次數(shù)進(jìn)行減一,并且要重置過(guò)期時(shí)間。
下面再回到 unlockAsync0(long threadId)
方法中,釋放鎖通過(guò) Lua
腳本實(shí)現(xiàn)了,下面看下 cancelExpirationRenewal(Long threadId)
關(guān)閉看門狗的操作:
protected void cancelExpirationRenewal(Long threadId) {// 從記錄中獲取信息ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {// 移除線程IDtask.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {// 關(guān)閉計(jì)時(shí)任務(wù)Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}// 從緩存記錄中刪除EXPIRATION_RENEWAL_MAP.remove(getEntryName());}
}
這里就比較好理解了,停止計(jì)時(shí)任務(wù),從緩存記錄中移除。