網(wǎng)站開發(fā)軟件系統(tǒng)教程seo推廣排名網(wǎng)站
媒資管理模塊 - 視頻處理
文章目錄
- 媒資管理模塊 - 視頻處理
- 一、視頻轉(zhuǎn)碼
- 1.1 視頻轉(zhuǎn)碼介紹
- 1.2 FFmpeg 基本使用
- 1.2.1 下載安裝配置
- 1.2.2 轉(zhuǎn)碼測試
- 1.3 工具類
- 1.3.1 VideoUtil
- 1.3.2 Mp4VideoUtil
- 1.3.3 測試工具類
- 二、分布式任務處理
- 2.1 分布式任務調(diào)度
- 2.2 XXL-JOB 配置執(zhí)行器 中間件
- 2.3 搭建XXL-JOB
- 2.3.1 調(diào)度中心
- 2.3.2 執(zhí)行器
- 2.3.3 執(zhí)行任務
- 2.4 XXL-JOB 高級配置參數(shù)
- 2.5 分片廣播
- 2.5.1 分片廣播事例
- 三、視頻處理
- 3.1 技術方案
- 3.1.1 作業(yè)分片方案
- 3.1.2 保證任務不重復執(zhí)行
- 3.1.3 視頻處理方案
一、視頻轉(zhuǎn)碼
1.1 視頻轉(zhuǎn)碼介紹
視頻轉(zhuǎn)碼是指的對視頻文件的編碼格式進行轉(zhuǎn)換
視頻上傳成功需要對視頻的格式進行轉(zhuǎn)碼處理,比如:avi轉(zhuǎn)成mp4
一般做文件存儲的服務都需要對文件進行處理,例如對視頻進行轉(zhuǎn)碼處理,可能由于文件量較大需要使用多線程等技術進行高效處理
文件格式:是指.mp4、.avi、.rmvb等 這些不同擴展名的視頻文件的文件格式
視頻文件的內(nèi)容主要包括視頻和音頻,其文件格式是按照一 定的編碼格式去編碼,并且按照該文件所規(guī)定的封裝格式將視頻、音頻、字幕等信息封裝在一起,播放器會根據(jù)它們的封裝格式去提取出編碼,然后由播放器解碼,最終播放音視頻
音視頻編碼格式:通過音視頻的壓縮技術,將視頻格式轉(zhuǎn)換成另一種視頻格式,通過視頻編碼實現(xiàn)流媒體的傳輸
目前最常用的編碼標準是視頻H.264,音頻AAC
比如:
一個.avi的視頻文件原來的編碼是a,通過編碼后編碼格式變?yōu)閎,
音頻原來為c,通過編碼后變?yōu)閐
1.2 FFmpeg 基本使用
1.2.1 下載安裝配置
我們Java程序員只需要調(diào)用流媒體程序員寫的工具類即可完成對視頻的操作,這個工具可能是c或c++寫的
流媒體程序員:專門做視頻處理類的東西
FFmpeg開源工具被許多開源項目采用,QQ影音、暴風影音、VLC等
下載鏈接:https://www.ffmpeg.org/download.html#build-windows
最終下載之后三個exe文件
查看是否安裝成功
ffmpeg -v
也可以把ffmpeg.exe文件配置在path環(huán)境變量中
現(xiàn)在我們就可以在任意一個位置執(zhí)行命令了
1.2.2 轉(zhuǎn)碼測試
將avi文件轉(zhuǎn)換成mp4文件
ffmpeg.exe -i avi測試視頻.avi 1.mp4
轉(zhuǎn)成mp3
ffmpeg -i xxx.avi xxx1.mp3
轉(zhuǎn)成gif
ffmpeg -i xxx.avi xxx1.gif
1.3 工具類
在xuecheng-plus-base工程添加此工具類
這份工具類其實就是流媒體程序員進行提供的
其實我們需要的是怎么調(diào)用ffmpeg.exe文件
1.3.1 VideoUtil
/*** 此文件作為視頻文件處理父類,提供:* 1、查看視頻時長* 2、校驗兩個視頻的時長是否相等**/
public class VideoUtil {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安裝位置public VideoUtil(String ffmpeg_path){this.ffmpeg_path = ffmpeg_path;}//檢查視頻時間是否一致public Boolean check_video_time(String source,String target) {String source_time = get_video_time(source);//取出時分秒source_time = source_time.substring(0,source_time.lastIndexOf("."));String target_time = get_video_time(target);//取出時分秒target_time = target_time.substring(0,target_time.lastIndexOf("."));if(source_time == null || target_time == null){return false;}if(source_time.equals(target_time)){return true;}return false;}//獲取視頻時間(時:分:秒:毫秒)public String get_video_time(String video_path) {/*ffmpeg -i lucene.mp4*/List<String> commend = new ArrayList<String>();commend.add(ffmpeg_path);commend.add("-i");commend.add(video_path);try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//將標準輸入流和錯誤輸入流合并,通過標準輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();String outstring = waitFor(p);System.out.println(outstring);int start = outstring.trim().indexOf("Duration: ");if(start>=0){int end = outstring.trim().indexOf(", start:");if(end>=0){String time = outstring.substring(start+10,end);if(time!=null && !time.equals("")){return time.trim();}}}} catch (Exception ex) {ex.printStackTrace();}return null;}public String waitFor(Process p) {InputStream in = null;InputStream error = null;String result = "error";int exitValue = -1;StringBuffer outputString = new StringBuffer();try {in = p.getInputStream();error = p.getErrorStream();boolean finished = false;int maxRetry = 600;//每次休眠1秒,最長執(zhí)行時間10分種int retry = 0;while (!finished) {if (retry > maxRetry) {return "error";}try {while (in.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}while (error.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}//進程未結束時調(diào)用exitValue將拋出異常exitValue = p.exitValue();finished = true;} catch (IllegalThreadStateException e) {Thread.currentThread().sleep(1000);//休眠1秒retry++;}}} catch (Exception e) {e.printStackTrace();} finally {if (in != null) {try {in.close();} catch (IOException e) {System.out.println(e.getMessage());}}}return outputString.toString();}public static void main(String[] args) throws IOException {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安裝位置VideoUtil videoUtil = new VideoUtil(ffmpeg_path);String video_time = videoUtil.get_video_time("E:\\ffmpeg_test\\1.avi");System.out.println(video_time);}
}
1.3.2 Mp4VideoUtil
public class Mp4VideoUtil extends VideoUtil {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安裝位置String video_path = "D:\\BaiduNetdiskDownload\\test1.avi";String mp4_name = "test1.mp4";String mp4folder_path = "D:/BaiduNetdiskDownload/Movies/test1/";public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){super(ffmpeg_path);this.ffmpeg_path = ffmpeg_path;this.video_path = video_path;this.mp4_name = mp4_name;this.mp4folder_path = mp4folder_path;}//清除已生成的mp4private void clear_mp4(String mp4_path){//刪除原來已經(jīng)生成的m3u8及ts文件File mp4File = new File(mp4_path);if(mp4File.exists() && mp4File.isFile()){mp4File.delete();}}/*** 視頻編碼,生成mp4文件* @return 成功返回success,失敗返回控制臺日志*/public String generateMp4(){//清除已生成的mp4
// clear_mp4(mp4folder_path+mp4_name);clear_mp4(mp4folder_path);/*ffmpeg.exe -i lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4*/List<String> commend = new ArrayList<String>();//commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe");commend.add(ffmpeg_path);commend.add("-i");
// commend.add("D:\\BaiduNetdiskDownload\\test1.avi");commend.add(video_path);commend.add("-c:v");commend.add("libx264");commend.add("-y");//覆蓋輸出文件commend.add("-s");commend.add("1280x720");commend.add("-pix_fmt");commend.add("yuv420p");commend.add("-b:a");commend.add("63k");commend.add("-b:v");commend.add("753k");commend.add("-r");commend.add("18");
// commend.add(mp4folder_path + mp4_name );commend.add(mp4folder_path );String outstring = null;try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//將標準輸入流和錯誤輸入流合并,通過標準輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();outstring = waitFor(p);} catch (Exception ex) {ex.printStackTrace();}
// Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name);Boolean check_video_time = this.check_video_time(video_path, mp4folder_path);if(!check_video_time){return outstring;}else{return "success";}}}
上面的代碼中大多數(shù)是參數(shù)封裝,真正調(diào)用FFmpeg的是下面幾行
ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//將標準輸入流和錯誤輸入流合并,通過標準輸入流程讀取信息builder.redirectErrorStream(true);
Process p = builder.start();
1.3.3 測試工具類
我們可以測試一下,比如打開一下“咪咕視頻”
public static void main(String[] args) throws IOException {ProcessBuilder builder = new ProcessBuilder();//啟動一下我本地的咪咕視頻(路徑中盡量不要含有中文)builder.command("D:\\soft\\MiguVideo\\MiGuApp.exe");//將標準輸入流和錯誤輸入流合并,通過標準輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();}
可以在此類中執(zhí)行main函數(shù)調(diào)用一下此工具類是否能完成視頻轉(zhuǎn)碼
public static void main(String[] args) throws IOException {//ffmpeg的路徑String ffmpeg_path = "";//ffmpeg的安裝位置//源avi視頻的路徑String video_path = "";//轉(zhuǎn)換后mp4文件的名稱String mp4_name = "";//轉(zhuǎn)換后mp4文件的路徑String mp4_path = "";//創(chuàng)建工具類對象Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);//開始視頻轉(zhuǎn)換,成功將返回successString s = videoUtil.generateMp4();System.out.println(s);}
二、分布式任務處理
2.1 分布式任務調(diào)度
什么是任務調(diào)度?
下面的場景就是一個調(diào)度方案
-
每隔24小時執(zhí)行數(shù)據(jù)備份任務。
-
12306網(wǎng)站會根據(jù)車次不同,設置幾個時間點分批次放票。
-
某財務系統(tǒng)需要在每天上午10點前結算前一天的賬單數(shù)據(jù),統(tǒng)計匯總。
-
商品成功發(fā)貨后,需要向客戶發(fā)送短信提醒。
任務調(diào)度:對任務的調(diào)度,它是指系統(tǒng)為了完成特定業(yè)務,基于給定時間點,給定時間間隔或者給定執(zhí)行次數(shù)自動執(zhí)行任務
我們可以將一個視頻的轉(zhuǎn)碼理解為一個任務的執(zhí)行,如果視頻的數(shù)量比較多,如何去高效處理一批任務呢?
- 多線程
多線程是充分利用單機的資源。
- 分布式加多線程
充分利用多臺計算機,每臺計算機使用多線程處理。每臺計算機都在同時運行指定任務處理
方案2可擴展性更強,并且是一種分布式任務調(diào)度的處理方案。
什么是分布式任務調(diào)度?
通常任務調(diào)度的程序是集成在應用中的,
比如:優(yōu)惠卷服務中包括了定時發(fā)放優(yōu)惠卷的的調(diào)度程序,
結算服務中包括了定期生成報表的任務調(diào)度程序
由于采用分布式架構,一個服務往往會部署多個冗余實例來運行我們的業(yè)務,在這種分布式系統(tǒng)環(huán)境下運行任務調(diào)度,我們稱之為分布式任務調(diào)度,如下圖:
分布式調(diào)度要實現(xiàn)的目標:
? 不管是任務調(diào)度程序集成在應用程序中,還是單獨構建的任務調(diào)度系統(tǒng),如果采用分布式調(diào)度任務的方式就相當于將任務調(diào)度程序分布式構建,這樣就可以具有分布式系統(tǒng)的特點,并且提高任務的調(diào)度處理能力:
1、并行任務調(diào)度
? 并行任務調(diào)度實現(xiàn)靠多線程,如果有大量任務需要調(diào)度,此時光靠多線程就會有瓶頸了,因為一臺計算機CPU的處理能力是有限的。
? 如果將任務調(diào)度程序分布式部署,每個結點還可以部署為集群,這樣就可以讓多臺計算機共同去完成任務調(diào)度,我們可以將任務分割為若干個分片,由不同的實例并行執(zhí)行,來提高任務調(diào)度的處理效率。
2、高可用
? 若某一個實例宕機,不影響其他實例來執(zhí)行任務。
3、彈性擴容
? 當集群中增加實例就可以提高并執(zhí)行任務的處理效率。
4、任務管理與監(jiān)測
? 對系統(tǒng)中存在的所有定時任務進行統(tǒng)一的管理及監(jiān)測。讓開發(fā)人員及運維人員能夠時刻了解任務執(zhí)行情況,從而做出快速的應急處理響應。
5、避免任務重復執(zhí)行
? 當任務調(diào)度以集群方式部署,同一個任務調(diào)度可能會執(zhí)行多次
比如在上面提到的電商系統(tǒng)中到點發(fā)優(yōu)惠券的例子,就會發(fā)放多次優(yōu)惠券,對公司造成很多損失,所以我們需要控制相同的任務在多個運行實例上只執(zhí)行一次
2.2 XXL-JOB 配置執(zhí)行器 中間件
我們只需要編寫任務的執(zhí)行邏輯即可,其他的部分都在中間件中
XXL-JOB是一個輕量級分布式任務調(diào)度平臺
其核心設計目標是開發(fā)迅速、學習簡單、輕量級、易擴展。
現(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。
官網(wǎng):https://www.xuxueli.com/xxl-job/
文檔:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B
XXL-JOB主要有調(diào)度中心、執(zhí)行器、任務:
調(diào)度中心:
? 負責管理調(diào)度信息,按照調(diào)度配置發(fā)出調(diào)度請求,自身不承擔業(yè)務代碼;
? 主要職責為執(zhí)行器管理、任務管理、監(jiān)控運維、日志管理等
調(diào)度中心其實就是一個管理者
任務執(zhí)行器:
? 負責接收調(diào)度請求并執(zhí)行任務邏輯;
? 只要職責是注冊服務、任務執(zhí)行服務(接收到任務后會放入線程池中的任務隊列)、執(zhí)行結果上報、日志服務等
任務執(zhí)行器相當于分布式部署,兩個執(zhí)行器相當于兩個人執(zhí)行
**任務:**負責執(zhí)行具體的業(yè)務處理。
執(zhí)行流程:
- 任務執(zhí)行器根據(jù)配置的調(diào)度中心的地址,自動注冊到調(diào)度中心
調(diào)度中心要知道自己下面有多少個任務執(zhí)行器
- 達到任務觸發(fā)條件,調(diào)度中心下發(fā)任務
調(diào)度中心會根據(jù)任務的調(diào)度策略來下發(fā)任務
- 執(zhí)行器基于線程池執(zhí)行任務,并把執(zhí)行結果放入內(nèi)存隊列中、把執(zhí)行日志寫入日志文件中
任務執(zhí)行器可能會執(zhí)行多個任務,所以要先將任務放入線程池中
- 執(zhí)行器消費內(nèi)存隊列中的執(zhí)行結果,主動上報給調(diào)度中心
任務執(zhí)行器將執(zhí)行結果異步上報給調(diào)度中心
也就是能夠在調(diào)度中心里面必須能夠拿到幾點幾分幾秒,哪個執(zhí)行器執(zhí)行任務是成功還是失敗的
- 當用戶在調(diào)度中心查看任務日志,調(diào)度中心請求任務執(zhí)行器,任務執(zhí)行器讀取任務日志文件并返回日志詳情
其實就是調(diào)度中心主動查詢?nèi)蝿請?zhí)行器執(zhí)行的任務是成功還是失敗
2.3 搭建XXL-JOB
調(diào)度中心負責給執(zhí)行器下發(fā)任務,執(zhí)行器負責執(zhí)行任務
2.3.1 調(diào)度中心
首先下載XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job
碼云:https://gitee.com/xuxueli0323/xxl-job
項目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
如果想本地運行的話,我們需要修改一些參數(shù)才可以運行
包結構
xxl-job-admin:調(diào)度中心
xxl-job-core:公共依賴
xxl-job-executor-samples:執(zhí)行器Sample示例(選擇合適的版本執(zhí)行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通過Springboot管理執(zhí)行器,推薦這種方式;
:xxl-job-executor-sample-frameless:無框架版本;
進入xxl-job
http://192.168.101.65:8088/xxl-job-admin/toLogin
賬號:admin
密碼:123456
2.3.2 執(zhí)行器
在調(diào)度中心創(chuàng)建一個執(zhí)行器
配置執(zhí)行器,執(zhí)行器負責與調(diào)度中心通信接收調(diào)度中心發(fā)起的任務調(diào)度請求
創(chuàng)建“執(zhí)行器管理”,如下圖所示
此時沒有一個java程序在執(zhí)行任務,知識創(chuàng)建了一個執(zhí)行器而已
因為我們要在media工程的media-service工程中使用xxl-job,所以在media-service的pom文件中增加下面這個坐標
我們的執(zhí)行器就是在media-service工程中編寫
我們現(xiàn)在的目的是讓執(zhí)行器注冊到調(diào)度中心,我們添加之后就注冊到調(diào)度中心了
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId>
</dependency>
在nacos下的media-service-dev.yaml下配置xxl-job
我們配置上坐標后還不能說是完整的注冊到調(diào)度中心了,我們還需要告訴它調(diào)度中心在哪里,所以就需要下面調(diào)度中心的配置了
注意配置中的appname這是執(zhí)行器的應用名
調(diào)度中心要給執(zhí)行器下發(fā)任務,那執(zhí)行器肯定得啟動一個服務
port是執(zhí)行器啟動的端口,如果本地啟動多個執(zhí)行器注意端口不能重復。執(zhí)行器啟動起來后,調(diào)度中心會調(diào)用它
xxl:job:admin: addresses: http://192.168.101.65:8088/xxl-job-adminexecutor:appname: testHandleraddress: ip: port: 9999logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30accessToken: default_token
將下面的配置復制到media-service工程
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 針對多網(wǎng)卡、容器內(nèi)部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 組件靈活定制注冊IP;** 1、引入依賴:* <dependency>* <groupId>org.springframework.cloud</groupId>* <artifactId>spring-cloud-commons</artifactId>* <version>${version}</version>* </dependency>** 2、配置文件,或者容器啟動變量* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'** 3、獲取IP* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}
最終結果如下圖所示
2.3.3 執(zhí)行任務
為什么要配置執(zhí)行器呢?
我們要讓執(zhí)行器執(zhí)行任務
那我們怎么告訴執(zhí)行器來執(zhí)行什么樣的任務呢?
如下圖所示的地方有個事例,拷貝到我們自己的工程中
第一步:定義任務類
/*** XxlJob開發(fā)示例(Bean模式)** 開發(fā)步驟:* 1、任務開發(fā):在Spring Bean實例中,開發(fā)Job方法;* 2、注解配置:為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應的是調(diào)度中心新建任務的JobHandler屬性的值。* 3、執(zhí)行日志:需要通過 "XxlJobHelper.log" 打印執(zhí)行日志;* 4、任務結果:默認任務結果為 "成功" 狀態(tài),不需要主動設置;如有訴求,比如設置任務結果為失敗,可以通過 "XxlJobHelper.handleFail/handleSuccess" 自主設置任務結果;** @author xuxueli 2019-12-11 21:52:51*//*** 任務類*/
@Component
public class SampleXxlJob {private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);/*** 執(zhí)行器拿到任務后就會執(zhí)行這個方法* 具體的任務方法*/@XxlJob("demoJobHandler") //任務名稱是demoJobHandlerpublic void demoJobHandler() throws Exception {System.out.println("處理視頻.....");//任務執(zhí)行邏輯...}/*** 執(zhí)行器拿到任務后就會執(zhí)行這個方法* 具體的任務方法*/@XxlJob("demoJobHandler2")public void demoJobHandler2() throws Exception {System.out.println("處理文檔.....");//任務執(zhí)行邏輯....}}
第二步:調(diào)度中心中注冊任務
調(diào)度類型:
- 固定速度:每隔多長時間進行調(diào)度
CRON:不僅可以配置每隔多長時間,還可以配置年月日時分秒
cron = “0/30 * * * * ?”
- 從第0秒開始,每間隔30秒執(zhí)行1次
- 秒 分 時 日 月 周
- 以秒為例
- *:每秒都執(zhí)行
- 1-3:從第1秒開始執(zhí)行,到第3秒結束執(zhí)行
- 0/3:從第0秒開始,每隔3秒執(zhí)行1次
- 1,2,3:在指定的第1、2、3秒執(zhí)行
- ?:不指定
- 日和周不能同時制定,指定其中之一,則另一個設置為?
30 10 1 * * ? 每天1點10分30秒觸發(fā)
0/30 * * * * ? 每30秒觸發(fā)一次
* 0/10 * * * ? 每10分鐘觸發(fā)一次
第三步:啟動任務
第四步:觀察控制臺
2.4 XXL-JOB 高級配置參數(shù)
XXL-JOB分布式調(diào)度平臺包括調(diào)度中心和執(zhí)行器,我們剛剛已經(jīng)在media-service工程中創(chuàng)建了一個執(zhí)行器,但是分布式任務調(diào)度要有多個執(zhí)行器來執(zhí)行任務,所以我們需要把執(zhí)行器至少部署兩個節(jié)點
怎么部署至少兩個節(jié)點呢?
將media-api工程運行兩個即可
怎么讓XXL-JOB調(diào)度多個集群(即多個執(zhí)行器)進行執(zhí)行任務呢?
其實就是剛剛任務管理中的這些配置
-
路由策略:
我們有一個調(diào)度中心,三個任務執(zhí)行器
當我們規(guī)定的任務調(diào)度時間到了后調(diào)度中心就會下發(fā)任務,但是現(xiàn)在面臨一個問題,這個任務分發(fā)給哪個任務執(zhí)行器?
這就需要我們配置路由策略了
第一個:每次都會下發(fā)給第一個任務執(zhí)行器
最后一個:每次都會下發(fā)給第最后一個任務執(zhí)行器
輪訓:每個人輪著來
一致性HASH:我們的任務有一個id,會求此id的hash值,并且此hash值一定會是執(zhí)行器中的其中一個
最不經(jīng)常使用:最不經(jīng)常執(zhí)行任務的執(zhí)行器
最近最久未使用:最近最不經(jīng)常執(zhí)行任務的執(zhí)行器
故障轉(zhuǎn)移:任務路由策略選擇"故障轉(zhuǎn)移"情況下,如果執(zhí)行器集群中某一臺機器故障,將會自動Failover切換到一臺正常的執(zhí)行器發(fā)送調(diào)度請求。
忙碌轉(zhuǎn)移:某個執(zhí)行器任務挺多或者正在忙,就會發(fā)送給其他執(zhí)行器
分片廣播:執(zhí)行器集群部署時,任務路由策略選擇"分片廣播"情況下,一次任務調(diào)度將會廣播觸發(fā)集群中所有執(zhí)行器執(zhí)行一次任務,可根據(jù)分片參數(shù)開發(fā)分片任務;
除了分片廣播之外,都是一個任務由一個執(zhí)行器進行執(zhí)行,不能將執(zhí)行能力發(fā)揮到最大
分片廣播可以實現(xiàn)將任務同時發(fā)送給多個任務執(zhí)行器
-
子任務ID
不經(jīng)常使用
當執(zhí)行完一個任務又想執(zhí)行第二個任務,此時第二個任務就是第一個任務的子任務
-
調(diào)度過期策略
到了改調(diào)度的時候不知道什么原因沒有調(diào)度
-
堵塞處理策略
當前執(zhí)行器在執(zhí)行任務,但是任務調(diào)度中心又讓此執(zhí)行器進行執(zhí)行任務2,此時任務2就被堵塞了
單擊串行:隊列的形式,任務進行排隊,執(zhí)行器按次序執(zhí)行任務
丟棄后續(xù)調(diào)度:執(zhí)行器正在執(zhí)行任務但是新派了任務,不會干新派的任務
覆蓋之前調(diào)度:執(zhí)行器正在執(zhí)行任務但是新派了任務,會把當前的活終止,去做新的活
-
任務超時時間
假如我們?nèi)蝿沼喌?0秒,但是15秒還沒有執(zhí)行完,那超時了就不執(zhí)行了
- 失敗重試次數(shù)
2.5 分片廣播
分片廣播:執(zhí)行器集群部署時,任務路由策略選擇"分片廣播"情況下,一次任務調(diào)度將會廣播觸發(fā)集群中所有執(zhí)行器執(zhí)行一次任務,可根據(jù)分片參數(shù)開發(fā)分片任務
那這樣處理同一批視頻的話,會不會重復處理同一個視頻?
不會
分片廣播在給執(zhí)行器分配任務的時候會給執(zhí)行器分發(fā)序號
調(diào)度中心去廣播的時候,會通知三個執(zhí)行器執(zhí)行任務
比如通知第0號執(zhí)行器:把第零部分執(zhí)行一下
比如通知第1號執(zhí)行器:把第1部分執(zhí)行一下
比如通知第2號執(zhí)行器:把第2部分執(zhí)行一下
…
此時各個執(zhí)行器就能執(zhí)行各個的任務
作業(yè)分片適用哪些場景呢?
? 分片任務場景:10個執(zhí)行器的集群來處理10w條數(shù)據(jù),每臺機器只需要處理1w條數(shù)據(jù),耗時降低10倍;
? 廣播任務場景:廣播執(zhí)行器同時運行shell腳本、廣播集群節(jié)點進行緩存更新等。
所以,廣播分片方式不僅可以充分發(fā)揮每個執(zhí)行器的能力,并且根據(jù)分片參數(shù)可以控制任務是否執(zhí)行,最終靈活控制了執(zhí)行器集群分布式處理任務。
2.5.1 分片廣播事例
如下所示的代碼有一個示例,我們可以看一下
可以把這個復制到我們的工程中
下面的代碼需要做的就是告訴執(zhí)行器編號,給每個執(zhí)行器進行編號
/*** 2、分片廣播任務*/@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception {// 分片參數(shù)int shardIndex = XxlJobHelper.getShardIndex();//執(zhí)行器的序號,從0開始int shardTotal = XxlJobHelper.getShardTotal();//執(zhí)行器總數(shù)//只要有了上面兩個參數(shù),我們就可以人為確定我們執(zhí)行器執(zhí)行哪一部分System.out.println("shardIndex:"+shardIndex);System.out.println("shardTotal:"+shardTotal);}
我們需要兩個執(zhí)行器,那只能啟動兩個MediaApplication項目
其中對于media-service-dev工程一定要在nacos配置本地優(yōu)先策略
spring:cloud:config:override-none: true
-Dserver.port=63051 配置一下啟動端口
-Dxxl.job.executor.port=9998 配置執(zhí)行器的端口
啟動兩個執(zhí)行器,如下圖所示:
然后查看一下調(diào)度中心,是否有兩個執(zhí)行器
要想讓執(zhí)行器執(zhí)行任務,還需要在調(diào)度中心的任務管理注冊任務
執(zhí)行任務
觀察控制臺情況
分別是0號執(zhí)行器和1號執(zhí)行器,總共兩個執(zhí)行器
三、視頻處理
3.1 技術方案
3.1.1 作業(yè)分片方案
此時如何保證多個執(zhí)行器不會查詢到重復的任務呢?
將待處理的文件進行編號
兩個執(zhí)行器實例那么分片總數(shù)為2,序號為0、1
從任務1開始,如下:
1 % 2 = 1 執(zhí)行器2執(zhí)行
2 % 2 = 0 執(zhí)行器1執(zhí)行
3 % 2 = 1 執(zhí)行器2執(zhí)行
以此類推.
3.1.2 保證任務不重復執(zhí)行
多個執(zhí)行器在并行執(zhí)行,怎么保證任務不重復執(zhí)行呢?
比如說0號執(zhí)行器有1號任務和3號任務,但是此時調(diào)度中心又來進行調(diào)度,那1號任務和3號任務可能又執(zhí)行了一遍
如果一個執(zhí)行器在處理一個視頻還沒有完成,此時調(diào)度中心又一次請求調(diào)度,為了不重復處理同一個視頻該怎么辦
- 配置調(diào)度過期策略
調(diào)度過期策略:調(diào)度中心錯過調(diào)度時間的補償處理策略,包括:忽略、立即補償觸發(fā)一次等;
- 忽略:調(diào)度過期后,忽略過期的任務,從當前時間開始重新計算下次觸發(fā)時間;
- 立即執(zhí)行一次:調(diào)度過期后,立即執(zhí)行一次,并從當前時間開始重新計算下次觸發(fā)時間;
這里我們選擇忽略,如果立即執(zhí)行一次就可能重復執(zhí)行相同的任務
-
配置阻塞處理策略
阻塞處理策略:調(diào)度過于密集執(zhí)行器來不及處理時的處理策略;
阻塞處理策略就是當前執(zhí)行器正在執(zhí)行任務還沒有結束時調(diào)度中心進行任務調(diào)度,此時該如何處理
單機串行(默認):調(diào)度請求進入單機執(zhí)行器后,調(diào)度請求進入FIFO隊列并以串行方式運行;
丟棄后續(xù)調(diào)度:調(diào)度請求進入單機執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務,本次請求將會被丟棄并標記為失敗;
覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務,將會終止運行中的調(diào)度任務并清空隊列,然后運行本地調(diào)度任務;
這里如果選擇覆蓋之前調(diào)度則可能重復執(zhí)行任務,這里選擇 丟棄后續(xù)調(diào)度來避免任務重復執(zhí)行
- 保證任務處理的冪等性
任務的冪等性是指:對于數(shù)據(jù)的操作不論多少次,操作的結果始終是一致的
某一個視頻不管是調(diào)度多少次,只會轉(zhuǎn)碼一次
某個視頻轉(zhuǎn)碼成功后再來調(diào)度就不會進行了,因為已經(jīng)轉(zhuǎn)碼成功了
什么是冪等性?它描述了一次和多次請求某一個資源對于資源本身應該具有同樣的結果
冪等性是為了解決重復提交問題,比如:惡意刷單,重復支付等
解決冪等性常用的方案
1)數(shù)據(jù)庫約束,比如:唯一索引,主鍵。
2)樂觀鎖,常用于數(shù)據(jù)庫,更新數(shù)據(jù)時根據(jù)樂觀鎖狀態(tài)去更新。
3)唯一序列號,操作傳遞一個唯一序列號,操作時判斷與該序列號相等則執(zhí)行。
基于以上分析,在執(zhí)行器接收調(diào)度請求去執(zhí)行視頻處理任務時要實現(xiàn)視頻處理的冪等性,要有辦法去判斷該視頻是否處理完成,如果正在處理中或處理完則不再處理。這里我們在數(shù)據(jù)庫視頻處理表中添加處理狀態(tài)字段,視頻處理完成更新狀態(tài)為完成,執(zhí)行視頻處理前判斷狀態(tài)是否完成,如果完成則不再處理。
3.1.3 視頻處理方案
邊梳理整個視頻上傳及處理的業(yè)務流程
上傳視頻成功向視頻處理待處理表添加記錄
我們上傳視頻的代碼已經(jīng)做了,但是沒有做向任務表插入一條待處理視頻的任務
視頻處理的詳細流程如下:
1、任務調(diào)度中心廣播作業(yè)分片。
2、執(zhí)行器收到廣播作業(yè)分片,從數(shù)據(jù)庫讀取待處理任務,讀取未處理及處理失敗的任務。
3、執(zhí)行器更新任務為處理中,根據(jù)任務內(nèi)容從MinIO下載要處理的文件。
4、執(zhí)行器啟動多線程去處理任務。
5、任務處理完成,上傳處理后的視頻到MinIO。
6、將更新任務處理結果,如果視頻處理完成除了更新任務處理結果以外還要將文件的訪問地址更新至任務處理表及文件表中,最后將任務完成記錄寫入歷史表。
行器存在運行的調(diào)度任務,本次請求將會被丟棄并標記為失敗;
覆蓋之前調(diào)度:調(diào)度請求進入單機執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運行的調(diào)度任務,將會終止運行中的調(diào)度任務并清空隊列,然后運行本地調(diào)度任務;
這里如果選擇覆蓋之前調(diào)度則可能重復執(zhí)行任務,這里選擇 丟棄后續(xù)調(diào)度來避免任務重復執(zhí)行
[外鏈圖片轉(zhuǎn)存中…(img-Bdh0beJE-1704806448805)]
- 保證任務處理的冪等性
任務的冪等性是指:對于數(shù)據(jù)的操作不論多少次,操作的結果始終是一致的
某一個視頻不管是調(diào)度多少次,只會轉(zhuǎn)碼一次
某個視頻轉(zhuǎn)碼成功后再來調(diào)度就不會進行了,因為已經(jīng)轉(zhuǎn)碼成功了
什么是冪等性?它描述了一次和多次請求某一個資源對于資源本身應該具有同樣的結果
冪等性是為了解決重復提交問題,比如:惡意刷單,重復支付等
解決冪等性常用的方案
1)數(shù)據(jù)庫約束,比如:唯一索引,主鍵。
2)樂觀鎖,常用于數(shù)據(jù)庫,更新數(shù)據(jù)時根據(jù)樂觀鎖狀態(tài)去更新。
3)唯一序列號,操作傳遞一個唯一序列號,操作時判斷與該序列號相等則執(zhí)行。
基于以上分析,在執(zhí)行器接收調(diào)度請求去執(zhí)行視頻處理任務時要實現(xiàn)視頻處理的冪等性,要有辦法去判斷該視頻是否處理完成,如果正在處理中或處理完則不再處理。這里我們在數(shù)據(jù)庫視頻處理表中添加處理狀態(tài)字段,視頻處理完成更新狀態(tài)為完成,執(zhí)行視頻處理前判斷狀態(tài)是否完成,如果完成則不再處理。