国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

iis網(wǎng)站壓縮百度搜索次數(shù)統(tǒng)計(jì)

iis網(wǎng)站壓縮,百度搜索次數(shù)統(tǒng)計(jì),仙居網(wǎng)站制作,藍(lán)潮網(wǎng)站建設(shè)redis優(yōu)化秒殺 1. 異步秒殺思路1.1 在redis存入庫(kù)存和訂單信息1.2 具體流程圖 2. 實(shí)現(xiàn)2.1 總結(jié) 3. Redis的消息隊(duì)列3.1 基于list實(shí)現(xiàn)消息隊(duì)列3.2 基于PubSub實(shí)現(xiàn)消息隊(duì)列3.3 基于stream實(shí)現(xiàn)消息隊(duì)列3.3.1 stream的單消費(fèi)模式3.3.2 stream的消費(fèi)者組模式 3.4 基于stream消息隊(duì)列…

redis優(yōu)化秒殺

  • 1. 異步秒殺思路
    • 1.1 在redis存入庫(kù)存和訂單信息
    • 1.2 具體流程圖
  • 2. 實(shí)現(xiàn)
    • 2.1 總結(jié)
  • 3. Redis的消息隊(duì)列
    • 3.1 基于list實(shí)現(xiàn)消息隊(duì)列
    • 3.2 基于PubSub實(shí)現(xiàn)消息隊(duì)列
    • 3.3 基于stream實(shí)現(xiàn)消息隊(duì)列
      • 3.3.1 stream的單消費(fèi)模式
      • 3.3.2 stream的消費(fèi)者組模式
    • 3.4 基于stream消息隊(duì)列實(shí)現(xiàn)異步秒殺

本文為學(xué)習(xí)redis時(shí)做的筆記,學(xué)習(xí)內(nèi)容來(lái)自 黑馬程序員Redis入門(mén)到實(shí)戰(zhàn)教程,該教程是循序漸進(jìn)的,所以不是一上來(lái)就講完最后的解決方案了,請(qǐng)耐心看完

1. 異步秒殺思路

image.png
這是我們?cè)镜拿霘⑺悸?#xff0c;其中的流程都要經(jīng)過(guò)mysql數(shù)據(jù)庫(kù),而mysql數(shù)據(jù)庫(kù)的并發(fā)性能不是很好,而且為了避免線程安全問(wèn)題,還加入了分布式鎖,所以整個(gè)流程的性能不好,現(xiàn)在我們要去優(yōu)化它。

我們可以把這整個(gè)流程比作一個(gè)餐館點(diǎn)菜的過(guò)程,前臺(tái)點(diǎn)菜并將菜品寫(xiě)在小票上,給顧客一份,后廚一份,后廚根據(jù)小票的內(nèi)容依次做菜

根據(jù)這個(gè)例子,我們的流程也可以分為兩個(gè)部分:

  1. 第一部分是判斷秒殺資格(判斷秒殺庫(kù)存和校驗(yàn)一人一單)
  2. 第二部分是減庫(kù)存創(chuàng)建訂單流程

兩個(gè)部分各自為一個(gè)線程,主線程判斷秒殺資格,如果用戶有資格,就開(kāi)啟一個(gè)獨(dú)立線程完成耗時(shí)較久的第二部分

image.png
同時(shí),我們也要優(yōu)化判斷秒殺資格的性能,將庫(kù)存和訂單存入redis中,如果判斷用戶有資格,先將優(yōu)惠券id,用戶id、訂單id保存在阻塞隊(duì)列中,并將訂單id返回給用戶,用戶可以通過(guò)這個(gè)訂單id完成支付操作,雖然此時(shí)還沒(méi)有創(chuàng)建訂單,但是在隊(duì)列中遲早會(huì)創(chuàng)建,之后開(kāi)啟獨(dú)立線程讀取隊(duì)列中的信息,完成下單。
現(xiàn)在的業(yè)務(wù)流程變成了在redis中判斷秒殺資格,保存信息在隊(duì)列中并返回訂單id,性能和吞吐量大大提高

1.1 在redis存入庫(kù)存和訂單信息

現(xiàn)在討論這兩個(gè)東西需要什么樣的結(jié)構(gòu)去存儲(chǔ)

image.png
因?yàn)閹?kù)存只是一個(gè)數(shù)值,我們使用redis中的string類型去存儲(chǔ),key是優(yōu)惠券的id,value是庫(kù)存

到時(shí)只需判斷庫(kù)存是否大于0,如果用戶有資格,庫(kù)存要減一,相當(dāng)于在redis中預(yù)減庫(kù)存

image.png
因?yàn)樾枰粋€(gè)優(yōu)惠券id(key)能存很多用戶id,而且用戶id不能重復(fù),所以訂單信息我們使用set結(jié)構(gòu)存儲(chǔ)

到時(shí)只需看value中是否有該用戶id來(lái)判斷該用戶是否下過(guò)單

1.2 具體流程圖

image.png
為了保證過(guò)程的原子性,需要用到lua腳本,通過(guò)執(zhí)行l(wèi)ua腳本后的結(jié)果來(lái)返回異常信息或者訂單id,這樣創(chuàng)建訂單的時(shí)效性就沒(méi)有那么強(qiáng)了,完全可以照著數(shù)據(jù)庫(kù)能承受的范圍去執(zhí)行寫(xiě)的操作,用戶只需要訂單id就能完成支付操作

2. 實(shí)現(xiàn)

需求:
image.png

  1. 新增秒殺優(yōu)惠券的同時(shí),將優(yōu)惠券信息保存在redis中

修改添加秒殺優(yōu)惠券的方法,將優(yōu)惠券信息保存在redis中

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存優(yōu)惠券
save(voucher);
// 保存秒殺信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存優(yōu)惠券信息到redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

添加一個(gè)秒殺優(yōu)惠券

{"title": "120元代金券","beginTime": "2023-11-01T01:11:11","actualValue": 12000,"shopId": 1,"subTitle": "周一至周五均可使用","payValue": 10000,"stock": 100,"endTime": "2024-11-01T01:11:11","type": 1,"rules": "全場(chǎng)通用\\n無(wú)需預(yù)約\\n可無(wú)限疊加\\不兌現(xiàn)、不找零\\n僅限堂食"
}

image.png

  1. 基于lua腳本,判斷秒殺庫(kù)存、一人一單、決定用戶是都搶購(gòu)成功

寫(xiě)lua腳本

  • 先寫(xiě)參數(shù)列表,再組合成key,最后寫(xiě)邏輯
-- 1. 參數(shù)列表
-- 1.1 優(yōu)惠券id
local voucherId = ARGV[1];
-- 1.2 用戶id
local userId = ARGV[2];-- 2. key
-- 2.1 庫(kù)存key
local stockKey = "seckill:stock" .. voucherId;
-- 2.2 訂單key
local orderKey = "seckill:order" .. voucherId;-- 3. 腳本業(yè)務(wù)
-- 判斷庫(kù)存是否充足
if (tonumber(redis.call('get',stockKey)) <= 0) then-- 庫(kù)存不足return 1
end
-- 判斷用戶是否下單
if (redis.call('sismember',orderKey,userId) == 1) then-- 用戶下過(guò)單return 2
end-- 扣減庫(kù)存
redis.call('incrby',stockKey,-1);
-- 下單,保存用戶id到set集合
redis.call('sadd',orderKey,userId);

重寫(xiě)秒殺邏輯

  • 調(diào)用lua腳本,根據(jù)返回的數(shù)字判斷,并返回訂單號(hào)
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;//初始化腳本
static {SECKILL_SCRIPT = new DefaultRedisScript();//讀取文件位置,classpath就是resourceSECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);
}@Override
public Result seckillVoucher(Long voucherId) {
// 獲取用戶id
Long userId = UserHolder.getUser().getId();
// 1. 調(diào)用lua腳本
//不需要傳key,所以傳個(gè)空集合
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString()
);
int intValue = result.intValue();//2. 判斷結(jié)果是否為0
if (intValue != 0) {return Result.fail(intValue == 1 ? "庫(kù)存不足" : "不要重復(fù)下單");
}
//3. 將優(yōu)惠券id,用戶id、訂單id保存在阻塞隊(duì)列中
//TODO 將優(yōu)惠券id,用戶id、訂單id保存在阻塞隊(duì)列中
long orderId = redisIdWorker.nextId("order");//4. 返回訂單id
return Result.ok(orderId);
}
  1. 如果搶購(gòu)成功,將優(yōu)惠券id和用戶id封裝后存入阻塞隊(duì)列
//阻塞隊(duì)列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

阻塞隊(duì)列BlockingQueue,當(dāng)一個(gè)線程嘗試從一個(gè)阻塞隊(duì)列中獲取元素,如果隊(duì)列中沒(méi)有元素,這個(gè)線程就會(huì)被阻塞,當(dāng)隊(duì)列中有元素時(shí)就會(huì)被喚醒并獲取元素

seckillVoucher中添加:

//3.2 放入阻塞隊(duì)列中
orderTasks.add(order);
  1. 開(kāi)啟線程任務(wù),不斷從阻塞隊(duì)列中獲取信息,實(shí)現(xiàn)異步下單功能
  • 建立線程池,當(dāng)這個(gè)類初始化完線程就去執(zhí)行VoucherOrderHandler類中的run方法,不斷獲取隊(duì)列中的訂單信息,去創(chuàng)建訂單

@PostConstruct 注解效果是當(dāng)前類初始化完就去執(zhí)行

//線程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//類加載完就執(zhí)行
@PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {//1.獲取隊(duì)列中的訂單信息VoucherOrder voucherOrder = orderTasks.take();//2.創(chuàng)建訂單handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("訂單異常:" + e);}}}
}
  • 執(zhí)行handleVoucherOrder方法,獲取鎖,最后執(zhí)行創(chuàng)建訂單方法

注1:在這個(gè)方法中不能使用threadlocal去獲取用戶信息,因?yàn)槭钱惒较聠?#xff0c;這是一個(gè)子線程,不是主線程,沒(méi)有用戶的信息,所以從訂單中獲取用戶id

注2:與上同理,在這個(gè)子線程中也無(wú)法獲取到代理對(duì)象,將代理對(duì)象設(shè)置為成員變量,再?gòu)闹骶€程中獲取到代理對(duì)象

private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
//創(chuàng)建鎖對(duì)象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
//獲取鎖方法
boolean lockFlag = redisLock.tryLock();
//判斷是否獲取成功
if (!lockFlag) {log.error("不要重復(fù)下單");return;
}//事務(wù)方法執(zhí)行起來(lái)可能會(huì)出現(xiàn)異常,但最后都要釋放鎖,所以try-catch起來(lái)
try {proxy.createVoucherOrder(voucherOrder);
} finally {redisLock.unlock();
}
}
  • 創(chuàng)建訂單邏輯
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//5.一人一單
Long userId = UserHolder.getUser().getId();
//查詢
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
//判斷訂單是否存在
if (count > 0) {log.error("用戶已經(jīng)購(gòu)買(mǎi)過(guò)一次");return;
}//6.扣減庫(kù)存
boolean success = seckillVoucherService.update().setSql("stock = stock-1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if (!success) {log.error("不要重復(fù)下單");return;
}
save(voucherOrder);
}

2.1 總結(jié)

整個(gè)流程是:

  • 主線程:

發(fā)送請(qǐng)求進(jìn)入seckillVoucher方法,先判斷用戶是否有秒殺的資格(通過(guò)lua腳本),然后創(chuàng)建訂單(將用戶id、優(yōu)惠券id、訂單id放進(jìn)訂單里),將訂單放入阻塞隊(duì)列

  • 子線程:

在類初始化的時(shí)候去執(zhí)行線程池,線程池的任務(wù)是是不斷地從隊(duì)列中獲取訂單信息,然后去創(chuàng)建訂單。
創(chuàng)建訂單先獲取鎖,再判斷一人一單,減庫(kù)存,最后執(zhí)行添加訂單方法

完整代碼:


/*** <p>* 服務(wù)實(shí)現(xiàn)類* </p>** @author 虎哥* @since 2021-12-22*/
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService seckillVoucherService;@Autowiredprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate RedissonClient redissonClient;private IVoucherOrderService proxy;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;//初始化腳本static {SECKILL_SCRIPT = new DefaultRedisScript();//讀取文件位置,classpath就是resourceSECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//阻塞隊(duì)列private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);//線程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//類加載完就執(zhí)行@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {//1.獲取隊(duì)列中的訂單信息VoucherOrder voucherOrder = orderTasks.take();//2.創(chuàng)建訂單handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("訂單異常:" + e);}}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();//創(chuàng)建鎖對(duì)象RLock redisLock = redissonClient.getLock("lock:order:" + userId);//獲取鎖方法boolean lockFlag = redisLock.tryLock();//判斷是否獲取成功if (!lockFlag) {log.error("不要重復(fù)下單");return;}//事務(wù)方法執(zhí)行起來(lái)可能會(huì)出現(xiàn)異常,但最后都要釋放鎖,所以try-catch起來(lái)try {proxy.createVoucherOrder(voucherOrder);} finally {redisLock.unlock();}}@Overridepublic Result seckillVoucher(Long voucherId) {// 獲取用戶idLong userId = UserHolder.getUser().getId();// 1. 調(diào)用lua腳本//不需要傳key,所以傳個(gè)空集合Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());int intValue = result.intValue();//2. 判斷結(jié)果是否為0if (intValue != 0) {return Result.fail(intValue == 1 ? "庫(kù)存不足" : "不要重復(fù)下單");}//3. 將優(yōu)惠券id,用戶id、訂單id保存在阻塞隊(duì)列中//3.1.創(chuàng)建訂單VoucherOrder order = new VoucherOrder();//3.1.1 訂單idlong orderId = redisIdWorker.nextId("order");order.setId(orderId);//3.1.2 用戶idorder.setUserId(userId);//3.1.3 優(yōu)惠券idorder.setVoucherId(voucherId);//3.2 放入阻塞隊(duì)列中orderTasks.add(order);//3.3 獲取當(dāng)前的代理對(duì)象(事物)proxy = (IVoucherOrderService) AopContext.currentProxy();//4. 返回訂單idreturn Result.ok(orderId);}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {//5.一人一單Long userId = voucherOrder.getUserId();//查詢Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//判斷訂單是否存在if (count > 0) {log.error("用戶已經(jīng)購(gòu)買(mǎi)過(guò)一次");return;}//6.扣減庫(kù)存boolean success = seckillVoucherService.update().setSql("stock = stock-1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {log.error("不要重復(fù)下單");return;}save(voucherOrder);}}

測(cè)試:
image.png

image.png
image.png

現(xiàn)在我們的阻塞隊(duì)列使用的是jvm的內(nèi)存,將來(lái)有無(wú)數(shù)的請(qǐng)求進(jìn)來(lái),內(nèi)存可能會(huì)滿,而且如果服務(wù)重啟或者宕機(jī),訂單信息就消失了,可能會(huì)出現(xiàn)一些問(wèn)題,接下來(lái)學(xué)習(xí)一下redis的消息隊(duì)列

3. Redis的消息隊(duì)列

消息隊(duì)列,字面意思就是存放消息的隊(duì)列,最簡(jiǎn)單的消息隊(duì)列包含三個(gè)角色

  • 生產(chǎn)者:發(fā)送消息到消息隊(duì)列
  • 消息隊(duì)列:存儲(chǔ)和管理消息,也稱為消息代理
  • 消費(fèi)者:從消息隊(duì)列獲取消息并處理消息

image.png

市面上有很多的消息隊(duì)列的產(chǎn)品,但是搭建他們也是需要成本的,既然我們已經(jīng)搭建起了redis集群,為了減少成本,可以使用redis提供的三種不同的方式

  • list結(jié)構(gòu):基于list結(jié)構(gòu)模擬消息隊(duì)列
  • PubSub:訂閱發(fā)布,基本的點(diǎn)對(duì)點(diǎn)消息模型
  • Stream:比較完善的消息隊(duì)列模型

3.1 基于list實(shí)現(xiàn)消息隊(duì)列

Redis的list數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。
隊(duì)列是入口和出口不在一邊,因此我們可以利用: LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合LPOP來(lái)實(shí)現(xiàn)。
不過(guò)要注意的是,當(dāng)隊(duì)列中沒(méi)有消息時(shí)RPOP或LPOP操作會(huì)返回null,并不像VM的阻塞隊(duì)列那樣會(huì)阻塞并等待消息。因此這里應(yīng)該使用BRPOP或者BLPOP來(lái)實(shí)現(xiàn)阻塞效果。
image.png

優(yōu)點(diǎn):

  • 利用Redis存儲(chǔ),不受限于JVM內(nèi)存上限
  • 基于Redis的持久化機(jī)制,數(shù)據(jù)安全性有保證
  • 可以滿足消息有序性

缺點(diǎn):

  • 無(wú)法避免消息丟失
  • 只支持單消費(fèi)者

3.2 基于PubSub實(shí)現(xiàn)消息隊(duì)列

Pubsub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel,生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息

  • SUBSCRIBE channel[channell : 訂閱一個(gè)或多個(gè)頻道
  • PUBLISH channel msg : 向一個(gè)頻道發(fā)送消息
  • PSUBSCRIBE pattern[pattern]: 訂與pattern格式匹配的所有頻道

image.png

優(yōu)點(diǎn):

  • 采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費(fèi)

缺點(diǎn):

  • 不支持?jǐn)?shù)據(jù)持久化
  • 無(wú)法避免消息丟失
  • 消息堆積有上限,超出時(shí)數(shù)據(jù)丟失

3.3 基于stream實(shí)現(xiàn)消息隊(duì)列

3.3.1 stream的單消費(fèi)模式

Stream是Redis5.0引入的一種新數(shù)據(jù)類型,可以實(shí)現(xiàn)一個(gè)功能非常完善的消息隊(duì)列

  1. 發(fā)送消息的命令:

image.png

  • 中間的內(nèi)容都是可選。紅框標(biāo)注的基本可以不用管;綠框用來(lái)設(shè)置消息隊(duì)列的最大消息數(shù)量;黃框用來(lái)設(shè)置消息的id,*代表redis自動(dòng)生成;藍(lán)框是隊(duì)列中的消息內(nèi)容

示例:
xadd users * name zhuyi love lvhan
返回id
image.png

  1. 查看隊(duì)列中消息數(shù)量的命令

xlen key
示例:
image.png

  1. 讀取隊(duì)列中消息的命令

image.png

  • [COUNT count] 是每次讀取消息的最大數(shù)量
  • [BLOCK milliseconds] 當(dāng)沒(méi)消息時(shí),是否阻塞及阻塞時(shí)長(zhǎng)
  • STREAMS key 要從哪個(gè)隊(duì)列開(kāi)始讀,key就是隊(duì)列名
  • ID 起始id,只返回大于該id的消息。0:代表從第一個(gè)消息開(kāi)始,$:代表從最新的消息開(kāi)始

示例:
從第一個(gè)消息開(kāi)始讀
image.png
因?yàn)橄⒁呀?jīng)讀過(guò),沒(méi)有最新的消息,所以讀不出來(lái)
image.png

特點(diǎn):

  • 消息可回溯
  • 一個(gè)消息可以被多個(gè)消費(fèi)者讀取
  • 可以阻塞讀取
  • 有消息漏讀的風(fēng)險(xiǎn)。使用$,在讀一條消息的時(shí)候,有超過(guò)一條以上的消息進(jìn)入隊(duì)列,只會(huì)讀取最后一條

3.3.2 stream的消費(fèi)者組模式

消費(fèi)者組,將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽(tīng)同一個(gè)隊(duì)列,有以下好處:

  • 消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)消費(fèi),從而加快消息處理的速度。如果想讓一個(gè)消息被多個(gè)消費(fèi)者消費(fèi),可以多加幾個(gè)組
  • 消息標(biāo)識(shí):消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)識(shí),記錄最后一個(gè)被處理的消息,哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)識(shí)之后讀取消息,確保每一個(gè)消息都會(huì)被消費(fèi)
  • 消息確認(rèn):消費(fèi)者獲取消息后,消息處于pending(待處理)狀態(tài),并存入一個(gè)pending-list。當(dāng)處理完成后需要通過(guò)XACK來(lái)確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從pending-list移除,保證所有的消息只要被獲取到了,就能至少被消費(fèi)一次
  1. 創(chuàng)建消費(fèi)者組:

image.png

  • key:隊(duì)列名稱
  • groupName:消費(fèi)者組名稱
  • ID:起始ID標(biāo)識(shí),$代表隊(duì)列中最新的消息,0則代表隊(duì)列中第一個(gè)消息
  • MKSTREAM:隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建隊(duì)列

其他常見(jiàn)命令:
image.png

示例:
image.png

  1. 從消費(fèi)者組讀取消息

image.png

  • group:消費(fèi)組名稱
  • consumer:消費(fèi)者名稱,如果消費(fèi)者不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)消費(fèi)者
  • count:本次查詢的最大數(shù)量
  • BLOCK millisecond:當(dāng)沒(méi)有消息時(shí)最長(zhǎng)等待時(shí)間
  • NOACK:無(wú)需手動(dòng)ACK,獲取到消息后自動(dòng)確認(rèn)
  • STREAMS key:指定隊(duì)列名稱
  • ID:獲取消息的起始id:1. “>”:從下一個(gè)未消費(fèi)的消息開(kāi)始。2. 其他:根據(jù)指定id從pending-list中獲取已消費(fèi)但未確認(rèn)的消息,例如0,是從pending-list中的第一個(gè)消息開(kāi)始

示例:
image.png

  1. 確認(rèn)消息

我們獲取到消息消費(fèi)后,一定要確認(rèn)它,把他從pending-list中移除
image.png

  • key:隊(duì)列名稱
  • group:消費(fèi)組名稱
  • ID:獲取消息的起始id:1. “>”:從下一個(gè)未消費(fèi)的消息開(kāi)始。2. 其他:根據(jù)指定id從pending-list中獲取已消費(fèi)但未確認(rèn)的消息,例如0,是從pending-list中的第一個(gè)消息開(kāi)始

示例:
image.png

消費(fèi)者監(jiān)聽(tīng)消息的基本思路:
image.png
特點(diǎn):

  • 消息可回溯
  • 可以多消費(fèi)者爭(zhēng)搶消息,加快消費(fèi)速度
  • 可以阻塞讀取
  • 沒(méi)有消息漏讀的風(fēng)險(xiǎn)
  • 有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次

如果你的公司業(yè)務(wù)比較龐大,對(duì)消息隊(duì)列要求比較嚴(yán)格,還是建議使用更專業(yè)的消息隊(duì)列,如rabbitmq等,但如果是中小型公司,對(duì)消息隊(duì)列需要沒(méi)那么大,redis的stream就已經(jīng)能滿足需求了

3.4 基于stream消息隊(duì)列實(shí)現(xiàn)異步秒殺

image.png

  1. 創(chuàng)建一個(gè)Stream類型的消息隊(duì)列,定義為stream.orders,這里直接在客戶端完成了
XGROUP CREATE stream.orders g1 0 mkstream
  1. 修改之前的秒殺下單lua腳本,在認(rèn)定有搶購(gòu)資格后,直接向stream.orders中添加消息,內(nèi)容包括voucherId、userId、orderId

修改lua腳本:
主要添加了一個(gè)訂單id的參數(shù),在業(yè)務(wù)的最后向隊(duì)列發(fā)送消息

-- 1. 參數(shù)列表
-- 1.1 優(yōu)惠券id
local voucherId = ARGV[1]
-- 1.2 用戶id
local userId = ARGV[2]
-- 1.3 訂單id
local orderId = ARGV[3]-- 2. key
-- 2.1 庫(kù)存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 訂單key
local orderKey = 'seckill:order:' .. voucherId-- 3. 腳本業(yè)務(wù)
-- 判斷庫(kù)存是否充足
if (tonumber(redis.call('get',stockKey)) <= 0) then-- 庫(kù)存不足return 1
end
-- 判斷用戶是否下單
if (redis.call('sismember',orderKey,userId) == 1) then-- 用戶下過(guò)單return 2
end-- 扣減庫(kù)存
redis.call('incrby',stockKey,-1);
-- 下單,保存用戶id到set集合
redis.call('sadd',orderKey,userId);
-- 發(fā)送消息
redis.call('xadd','stream.orders','*','voucherId',voucherId,'userId',userId,'id',orderId)
return 0

修改一下調(diào)用lua腳本的邏輯:
新添加一個(gè)訂單id的參數(shù)

// 1. 調(diào)用lua腳本
//不需要傳key,所以傳個(gè)空集合
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(),String.valueOf(orderId)
);
int intValue = result.intValue();
  1. 項(xiàng)目啟動(dòng)時(shí),開(kāi)啟一個(gè)線程任務(wù),嘗試獲取stream.orders中的消息,完成下單

在原有的線程任務(wù)邏輯上修改,從消息隊(duì)列中獲取訂單信息,判斷一下訂單信息是否為空,如果為空,說(shuō)明沒(méi)有消息,繼續(xù)下一次循環(huán),如果有,去解析數(shù)據(jù),拿到訂單,通過(guò)以前寫(xiě)過(guò)的createVoucherOrder()方法來(lái)創(chuàng)建訂單,最后一定要確認(rèn)消息,將消息從pending-list中移除。
如果在執(zhí)行時(shí)出現(xiàn)了錯(cuò)誤或者服務(wù)宕機(jī),通過(guò)handlePendingList()方法處理pending-list中已消費(fèi)但未確認(rèn)的訂單,這里如果出現(xiàn)異常,就不用再調(diào)用這個(gè)方法了

private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.獲取消息隊(duì)列中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說(shuō)明沒(méi)有消息,繼續(xù)下一次循環(huán)continue;}// 解析數(shù)據(jù)MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創(chuàng)建訂單handleVoucherOrder(voucherOrder);// 4.確認(rèn)消息 XACKstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.獲取pending-list中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說(shuō)明沒(méi)有異常消息,結(jié)束循環(huán)break;}// 解析數(shù)據(jù)MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創(chuàng)建訂單handleVoucherOrder(voucherOrder);// 4.確認(rèn)消息 XACKstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);}}}
}

XACK確認(rèn)消息:
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());

http://m.aloenet.com.cn/news/33048.html

相關(guān)文章:

  • 東營(yíng)建設(shè)信息網(wǎng)公示專業(yè)整站優(yōu)化
  • php模板網(wǎng)站營(yíng)銷(xiāo)推廣計(jì)劃怎么寫(xiě)
  • ps怎么網(wǎng)站首頁(yè)網(wǎng)店推廣運(yùn)營(yíng)
  • 一浪網(wǎng)站建設(shè)競(jìng)價(jià)網(wǎng)站
  • 網(wǎng)站優(yōu)化吧推廣發(fā)布任務(wù)平臺(tái)app下載
  • 做旅游網(wǎng)站包括哪些欄目成都百度業(yè)務(wù)員電話
  • 大數(shù)據(jù)開(kāi)發(fā)平臺(tái)seo診斷優(yōu)化方案
  • 芯片設(shè)計(jì)公司重慶 seo
  • 免備案空間哪家好寧波seo服務(wù)推廣
  • 如今做哪些網(wǎng)站致富手機(jī)營(yíng)銷(xiāo)推廣方案
  • 湖北廣域建設(shè)管理有限公司網(wǎng)站北京seo優(yōu)化哪家公司好
  • 廣西十佳旅游景區(qū)網(wǎng)站人多怎么優(yōu)化
  • 網(wǎng)站制作公司珠?;ヂ?lián)網(wǎng)輿情監(jiān)控系統(tǒng)
  • 3合1網(wǎng)站建設(shè)價(jià)格廣告投放平臺(tái)都有哪些
  • 建站公司服務(wù)費(fèi)包括哪些查詢網(wǎng)站備案信息
  • qq空間是用什么做的網(wǎng)站成都網(wǎng)絡(luò)推廣外包公司哪家好
  • 網(wǎng)站策劃的步驟百度信息流開(kāi)戶多少錢(qián)
  • 帝國(guó)cms做笑話網(wǎng)站宣傳渠道和宣傳方式有哪些
  • 湘潭關(guān)鍵詞優(yōu)化報(bào)價(jià)成都網(wǎng)站建設(shè)方案優(yōu)化
  • 廣州專業(yè)網(wǎng)站建設(shè)seo是啥意思
  • 徐州建設(shè)安全監(jiān)督網(wǎng)站搜索引擎seo
  • 做軟件的公司網(wǎng)站有哪些百度站長(zhǎng)工具
  • 江蘇省工程建設(shè)信息網(wǎng)連云港seo優(yōu)化公司
  • 做網(wǎng)站改變圖片位置百度一下你就知道官網(wǎng)網(wǎng)址
  • frontpage做的網(wǎng)站好不好關(guān)鍵詞在線聽(tīng)免費(fèi)
  • 空間租用網(wǎng)站模板情感營(yíng)銷(xiāo)的十大案例
  • 個(gè)人交互式網(wǎng)站備案鄭州百度推廣公司電話
  • c語(yǔ)言 做網(wǎng)站瀏覽器網(wǎng)站大全
  • 深圳網(wǎng)站建設(shè)微信開(kāi)發(fā)長(zhǎng)沙做搜索引擎的公司
  • 廣州做網(wǎng)站技術(shù)seo公司培訓(xùn)課程