網(wǎng)站維護(hù)包括深圳企業(yè)網(wǎng)站制作公司
文章目錄
- 一、簡介
- 二、集成redission
- pom文件
- redission 配置文件
- application.yml文件
- 啟動類
- 三、JAVA 操作案例
- 字符串操作
- 哈希操作
- 列表操作
- 集合操作
- 有序集合操作
- 布隆過濾器操作
- 分布式鎖操作
- 四、源碼解析
一、簡介
Redisson 是一個在 Redis 的基礎(chǔ)上實現(xiàn)的 Java 駐內(nèi)存數(shù)據(jù)網(wǎng)格客戶端(In-Memory Data Grid)。它不僅提供了一系列的 redis 常用數(shù)據(jù)結(jié)構(gòu)命令服務(wù),還提供了許多分布式服務(wù),例如分布式鎖、分布式對象、分布式集合、分布式遠(yuǎn)程服務(wù)、分布式調(diào)度任務(wù)服務(wù)等等。
本文會介紹如何將redission集成到springboot和如何使用 Redisson 操作 Redis 中的字符串、哈希、列表、集合、有序集合,以及布隆過濾器和分布式鎖等功能,并解析源碼中其最大亮點的看門狗
機(jī)制。
二、集成redission
本章內(nèi)容使用的springboot版本為2.6.3
,redission-starter版本為3.17.7
pom文件
引入redission相關(guān)依賴
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.12.3</version>
</dependency><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId><version>2.12.3</version>
</dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.7</version>
</dependency>
redission 配置文件
將redission相關(guān)的配置抽到單獨(dú)的一個redission文件,放在application.yml的同級目錄
# 單節(jié)點配置
singleServerConfig:# 連接空閑超時,單位:毫秒idleConnectionTimeout: 10000# 連接超時,單位:毫秒connectTimeout: 10000# 命令等待超時,單位:毫秒timeout: 3000# 命令失敗重試次數(shù),如果嘗試達(dá)到 retryAttempts(命令失敗重試次數(shù)) 仍然不能將命令發(fā)送至某個指定的節(jié)點時,將拋出錯誤。# 如果嘗試在此限制之內(nèi)發(fā)送成功,則開始啟用 timeout(命令等待超時) 計時。retryAttempts: 3# 命令重試發(fā)送時間間隔,單位:毫秒retryInterval: 1500# 密碼,沒有設(shè)置密碼時,需要注釋掉,否則會報錯# password: redis.shbeta# 單個連接最大訂閱數(shù)量subscriptionsPerConnection: 5# 客戶端名稱clientName: "axin"# 節(jié)點地址address: "redis://192.168.102.111:6379"password: Cidneueopx# 發(fā)布和訂閱連接的最小空閑連接數(shù)subscriptionConnectionMinimumIdleSize: 1# 發(fā)布和訂閱連接池大小subscriptionConnectionPoolSize: 50# 最小空閑連接數(shù)connectionMinimumIdleSize: 32# 連接池大小connectionPoolSize: 64# 數(shù)據(jù)庫編號database: 1# DNS監(jiān)測時間間隔,單位:毫秒dnsMonitoringInterval: 5000
# 線程池數(shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
threads: 0
# Netty線程池數(shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
nettyThreads: 0
# 編碼
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 傳輸模式
transportMode : "NIO"# 配置看門狗的默認(rèn)超時時間為30s,這里改為 10s
lockWatchdogTimeout: 10000
application.yml文件
在該配置文件將redission配置文件引入
spring:redis:redisson:file: classpath:redisson.yml
啟動類
啟動類需要加上注解@ImportAutoConfiguration,如下所示
@SpringBootApplication
@ImportAutoConfiguration(RedissonAutoConfiguration.class)
public class EasyUserApplication {public static void main(String[] args) {SpringApplication.run(EasyUserApplication.class, args);}
}
三、JAVA 操作案例
字符串操作
Redisson 支持通過RBucket對象來操作字符串或?qū)ο?#xff08;對象需要實現(xiàn)序列化Serializable)數(shù)據(jù)結(jié)構(gòu),同時支持設(shè)置數(shù)據(jù)和有效期,例子如下
@Autowired
RedissonClient redissonClient;@GetMapping("/opString")
public void opString() {RBucket<String> strKey = redissonClient.getBucket("strKey");strKey.set("china");//表示10分鐘后刪除該鍵strKey.expire(Duration.ofMinutes(10));System.out.println(strKey.get());
}
哈希操作
通過獲取一個RMap 對象來進(jìn)行哈希數(shù)據(jù)結(jié)構(gòu)的操作,例子如下
@Autowired
RedissonClient redissonClient;@GetMapping("/opMap")
public void opMap() {//獲取一個map對象RMap<String, String> rMap = redissonClient.getMap("mapKey");rMap.put("wawa", "1212");//表示10分鐘后刪除該鍵rMap.expire(Duration.ofMinutes(10));System.out.println(rMap.get("wawa"));
}
列表操作
redission支持通過RList對象來操作列表數(shù)據(jù)結(jié)構(gòu),例子如下
@Autowired
RedissonClient redissonClient;@GetMapping("/opList")
public void opList() {RList<Integer> rList = redissonClient.getList("listKey");rList.add(100);rList.add(200);rList.add(300);//表示10分鐘后刪除該鍵rList.expire(Duration.ofMinutes(10));//讀取列表全部數(shù)據(jù),不刪除System.out.println(rList.readAll());
}
集合操作
Redisson支持通過RSet對象來操作集合,例子如下
@Autowired
RedissonClient redissonClient;@GetMapping("/opSet")
public void opSet() {RSet<Integer> rSet = redissonClient.getSet("setKey");rSet.add(100);rSet.add(200);rSet.add(300);//表示10分鐘后刪除該鍵rSet.expire(Duration.ofMinutes(10));System.out.println(rSet.readAll());log.info("========testSkyworking");
}
有序集合操作
Redisson 支持通過RSortedSet對象來操作有序集合數(shù)據(jù)結(jié)構(gòu),如果使用存儲對象,則實體對象必須先實現(xiàn)Comparable接口,并重寫比較邏輯,否則會報錯,例子如下
@Autowired
RedissonClient redissonClient;@GetMapping("/opSortSet")
public void opSortSet() {RSortedSet<Integer> rSortSet = redissonClient.getSortedSet("sortsetKey");rSortSet.add(300);rSortSet.add(200);rSortSet.add(100);System.out.println(rSortSet.readAll());
}
布隆過濾器操作
Redisson支持通過RBloomFilter對象來操作布隆過濾器,布隆過濾器可以用于檢索一個元素是否在一個集合中。它的優(yōu)點是空間效率和查詢時間都比一般的算法要好的多,缺點是有一定的誤識別率和刪除困難,例子如下
@Autowired
RedissonClient redissonClient;@GetMapping("/opBloomFilter")
public void opBloomFilter() {RBloomFilter rBloomFilter = redissonClient.getBloomFilter("BloomFilterKey");// 初始化預(yù)期插入的數(shù)據(jù)量為50000和期望誤差率為0.01rBloomFilter.tryInit(50000, 0.01);rBloomFilter.add(300);rBloomFilter.add(200);rBloomFilter.add(100);//表示100分鐘后刪除該鍵rBloomFilter.expire(Duration.ofMinutes(100));// 判斷是否存在System.out.println(rBloomFilter.contains(300));System.out.println(rBloomFilter.contains(700));
}
分布式鎖操作
其實很多時候,我們引入redission,最想使用的功能場景就是其分布式鎖,因為我們不需要去設(shè)置最大釋放鎖的時間,redission內(nèi)部有一個看門狗機(jī)制會主動去續(xù)期。
Redisson通過RLock對象來操作分布式鎖,例子如下
@Autowired
RedissonClient redissonClient;@GetMapping("/opLock")
public void opLock() {//獲取鎖對象實例final String lockKey = "mylock";RLock rLock = redissonClient.getLock(lockKey);try {//嘗試5秒內(nèi)獲取鎖 不設(shè)置釋放鎖的時間boolean res = rLock.tryLock(5L, TimeUnit.SECONDS);System.out.println("獲取鎖成功");if (res) {for (int i = 0; i < 10; i++) {System.out.println(i);Thread.sleep(1000L);}//成功獲得鎖,在這里處理業(yè)務(wù)System.out.println("處理業(yè)務(wù)");}} catch (Exception e) {System.out.println("獲取鎖失敗,失敗原因:" + e.getMessage());} finally {//無論如何, 最后都要解鎖rLock.unlock();}
}
四、源碼解析
分布式鎖的看門狗機(jī)制大致如下列流程圖所示
調(diào)用鏈路
org.redisson.RedissonLock#tryLock
=》org.redisson.RedissonLock#tryLockAsync
=》org.redisson.RedissonLock#tryAcquireAsync
我們來仔細(xì)看看org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture ttlRemainingFuture;//判斷鎖的持有時間是否由用戶自定義if (leaseTime > 0L) {ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//當(dāng)用戶沒有自定義鎖占有時間時,默認(rèn)傳入 internalLockLeaseTime //private long lockWatchdogTimeout = 30 * 1000; 默認(rèn)30秒ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {if (ttlRemaining == null) {if (leaseTime > 0L) {//如果用戶傳入占用時間直接轉(zhuǎn)換,把默認(rèn)值internalLockLeaseTime 更新為用戶自定義的占有時間this.internalLockLeaseTime = unit.toMillis(leaseTime);} else {//這里就是觸發(fā)看門狗機(jī)制的方法,只有當(dāng) leaseTime == -1時才會觸發(fā)看門狗機(jī)制this.scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper(f);
}
接著進(jìn)行scheduleExpirationRenewal
protected void scheduleExpirationRenewal(long threadId) {RedissonBaseLock.ExpirationEntry entry = new RedissonBaseLock.ExpirationEntry();//EXPIRATION_RENEWAL_MAP 是一個全局的靜態(tài)常量MapRedissonBaseLock.ExpirationEntry oldEntry = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry != null) {//oldEntry != null 表示該線程不是第一次觸發(fā)oldEntry.addThreadId(threadId);} else {//oldEntry == null 表示該線程是第一次觸發(fā)entry.addThreadId(threadId);try {//更新過期時間this.renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {this.cancelExpirationRenewal(threadId);}}}}
org.redisson.RedissonBaseLock#renewExpiration
中主要是更新時間的詳細(xì)邏輯
private void renewExpiration() {//獲取當(dāng)前線程的更新對象RedissonBaseLock.ExpirationEntry ee = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) {//創(chuàng)建了一個定時任務(wù)Timeout task = this.getServiceManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonBaseLock.ExpirationEntry ent = (RedissonBaseLock.ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {//異步更新過期時間 CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {//如果出現(xiàn)異常,從map中刪除,直接返回RedissonBaseLock.log.error("Can't update lock {} expiration", RedissonBaseLock.this.getRawName(), e);RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());} else {if (res) {//如果沒有報錯,就再次定時延期RedissonBaseLock.this.renewExpiration();} else {//否則取消定時RedissonBaseLock.this.cancelExpirationRenewal((Long)null);}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}
}