国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

坪山住房和建設局網(wǎng)站推銷

坪山住房和建設局網(wǎng)站,推銷,男女做受網(wǎng)站,成都網(wǎng)站建設制作設計Flink在早期版本有一個split算子用來做數(shù)據(jù)分流使用的,但是在flink-1.12開始這個API就已經(jīng)被刪除了,在1.12版本以后我們是通過process算子來做數(shù)據(jù)分流的,這里就介紹一下如何使用prodess進行數(shù)據(jù)分流. 代碼 import org.apache.flink.api.common.typeinfo.TypeInformation; im…

Flink在早期版本有一個split算子用來做數(shù)據(jù)分流使用的,但是在flink-1.12開始這個API就已經(jīng)被刪除了,在1.12版本以后我們是通過process算子來做數(shù)據(jù)分流的,這里就介紹一下如何使用prodess進行數(shù)據(jù)分流.

  • 代碼
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 測流輸出**/
public class FlinkSideOutput {public static void main(String[] args) throws Exception {// 構(gòu)建流環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置并行度env.setParallelism(3);// 這里使用的是自定義數(shù)據(jù)源為了方便測試,具體數(shù)據(jù)源根據(jù)自己的實際情況進行更換DataStreamSource<CustomizeBean> customizeSourceStream = env.addSource(new CustomizeSource());/*** 需求* 1. 將性別為M且愛好為'羽毛球運動愛好者'分到一個流* 2. 將性別為W且愛好為'籃球運動愛好者'或'釣魚愛好者'分到一個流* 3. 其他保留到主流**/SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性別String hobbit = value.getHobbit(); // 愛好if (gender.equals("M") && hobbit.equals("羽毛球運動愛好者")) {// 將性別為M且愛好為'羽毛球運動愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if (gender.equals("W") && (hobbit.equals("籃球運動愛好者") || hobbit.equals("釣魚愛好者"))) {// 將性別為W且愛好為'籃球運動愛好者'或'釣魚愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("W-籃球/釣魚", TypeInformation.of(CustomizeBean.class)), value);} else {// 將剩下的數(shù)據(jù)保留在主流中out.collect(value);}}});// 獲取'M-羽毛球'分流數(shù)據(jù),這里也要加上類型聲明DataStream<CustomizeBean> mSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)));// 打印'M-羽毛球'結(jié)果mSideOutput.print("M-羽毛球");// 獲取'W-籃球/釣魚'分流數(shù)據(jù),這里也要加上類型聲明DataStream<CustomizeBean> wSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("W-籃球/釣魚", TypeInformation.of(CustomizeBean.class)));// 打印結(jié)果wSideOutput.print("W-籃球/釣魚");// 主流數(shù)據(jù)打印結(jié)果processedStream.print("主數(shù)據(jù)流");env.execute("Side Output");}
}
  • 結(jié)果數(shù)據(jù)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-641, age=44, gender=W, hobbit=非遺文化愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-17, age=62, gender=M, hobbit=書法愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-429, age=25, gender=W, hobbit=非遺文化愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-218, age=33, gender=M, hobbit=旅游愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-826, age=39, gender=M, hobbit=籃球運動愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-190, age=31, gender=M, hobbit=旅游愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-266, age=32, gender=W, hobbit=網(wǎng)吧戰(zhàn)神)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-106, age=70, gender=M, hobbit=書法愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-911, age=50, gender=M, hobbit=網(wǎng)吧戰(zhàn)神)
M-羽毛球:2> CustomizeBean(name=AAA-925, age=65, gender=M, hobbit=羽毛球運動愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-20, age=59, gender=M, hobbit=書法愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-409, age=79, gender=W, hobbit=天文知識愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-865, age=58, gender=W, hobbit=天文知識愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-898, age=33, gender=M, hobbit=天文知識愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-85, age=38, gender=W, hobbit=非遺文化愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-883, age=51, gender=M, hobbit=美食愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-243, age=37, gender=M, hobbit=釣魚愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-430, age=28, gender=W, hobbit=旅游愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-127, age=65, gender=W, hobbit=網(wǎng)吧戰(zhàn)神)
W-籃球/釣魚:3> CustomizeBean(name=AAA-986, age=52, gender=W, hobbit=釣魚愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-840, age=50, gender=W, hobbit=旅游愛好者)
M-羽毛球:2> CustomizeBean(name=AAA-196, age=34, gender=M, hobbit=羽毛球運動愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-142, age=46, gender=W, hobbit=乒乓球運動愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-985, age=78, gender=W, hobbit=美食愛好者)
W-籃球/釣魚:2> CustomizeBean(name=AAA-490, age=50, gender=W, hobbit=釣魚愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-295, age=77, gender=M, hobbit=籃球運動愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-754, age=50, gender=M, hobbit=天文知識愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-249, age=35, gender=W, hobbit=羽毛球運動愛好者)
W-籃球/釣魚:3> CustomizeBean(name=AAA-908, age=27, gender=W, hobbit=釣魚愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-674, age=73, gender=M, hobbit=非遺文化愛好者)

通過結(jié)果內(nèi)容可以看到數(shù)據(jù)完全按照我們分流的邏輯進行輸出的,如果想在主數(shù)據(jù)流中講所有數(shù)據(jù)保留下來,Collector<Object> out單獨拎出來即可,也就是不加到判斷邏輯中,代碼如下,這里就只展示部分代碼了

SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性別String hobbit = value.getHobbit(); // 愛好// 將所有數(shù)據(jù)保留在主流中out.collect(value);// 開始進行分流處理if (gender.equals("M") && hobbit.equals("羽毛球運動愛好者")) {// 將性別為M且愛好為'羽毛球運動愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if ((gender.equals("W") && (hobbit.equals("籃球運動愛好者")) || (gender.equals("W") && hobbit.equals("釣魚愛好者")))) {// 將性別為W且愛好為'籃球運動愛好者'或'釣魚愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("W-籃球/釣魚", TypeInformation.of(CustomizeBean.class)), value);}}});

所有的內(nèi)容到這里就結(jié)束了.

http://m.aloenet.com.cn/news/35731.html

相關文章:

  • AAP網(wǎng)站開發(fā)需要多少錢微指數(shù)
  • 西寧做網(wǎng)站的好公司北京seo優(yōu)化技術(shù)
  • 站群網(wǎng)絡促銷的方法有哪些
  • 成都網(wǎng)站建設前幾公司關鍵詞搜索愛站網(wǎng)
  • 自己做soho需要做網(wǎng)站嗎云盤網(wǎng)頁版登錄
  • 網(wǎng)絡推廣主要工作內(nèi)容網(wǎng)站關鍵詞排名優(yōu)化軟件
  • 網(wǎng)站測試問題提交模板百度搜索引擎地址
  • 金融做市場廣告掛哪些網(wǎng)站seo百度網(wǎng)站排名研究中心關鍵詞首頁優(yōu)化
  • dedecms做網(wǎng)站有多快網(wǎng)絡營銷推廣專員
  • 太原推廣型網(wǎng)站制作汕頭seo快速排名
  • 房山網(wǎng)站建設網(wǎng)絡seo哈爾濱
  • 政府網(wǎng)站建設分析專注于seo顧問
  • 如何進行網(wǎng)站維護seo云優(yōu)化如何
  • 動態(tài)網(wǎng)站設計與開發(fā)心得體會貴陽關鍵詞優(yōu)化平臺
  • 齊諾網(wǎng)站建設成都私人做網(wǎng)站建設
  • 好域名做網(wǎng)站微信視頻號怎么推廣引流
  • 先做它個天貓網(wǎng)站百度搜索關鍵詞
  • 做網(wǎng)站的流程分析-圖靈吧百度指數(shù)查詢app
  • wordpress blod關鍵詞是網(wǎng)站seo的核心工作
  • 旅游網(wǎng)站建設的目的及功能定位優(yōu)幫云首頁推薦
  • 博客網(wǎng)站的建設手機百度網(wǎng)盤下載慢怎么解決
  • 自己做網(wǎng)站可以隨便起名字嗎友情鏈接站長平臺
  • 什么行業(yè)做網(wǎng)站百度指數(shù)數(shù)據(jù)
  • 建設銀行官方網(wǎng)站地址新品牌推廣策略
  • 橋頭鎮(zhèn)網(wǎng)站仿做電商網(wǎng)頁
  • 大城網(wǎng)站制作新手怎么做網(wǎng)頁
  • 微信推送怎么做購物網(wǎng)站360搜索引擎網(wǎng)址
  • 成都捕魚網(wǎng)站建設昆明seo培訓
  • 服務器網(wǎng)站綁定域名網(wǎng)站建設最新中央人事任免
  • 個人做網(wǎng)站賺錢太原做網(wǎng)站的