国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

    當(dāng)前位置: 首頁 > news >正文

    駐馬店高端網(wǎng)站建設(shè)免費(fèi)大數(shù)據(jù)查詢平臺(tái)

    駐馬店高端網(wǎng)站建設(shè),免費(fèi)大數(shù)據(jù)查詢平臺(tái),淘寶網(wǎng)站做淘寶客,網(wǎng)站建設(shè) 百科摘要 很多時(shí)候flink消費(fèi)上游kafka的數(shù)據(jù)是有重復(fù)的,因此有時(shí)候我們想數(shù)據(jù)在落盤之前進(jìn)行去重,這在實(shí)際開發(fā)中具有廣泛的應(yīng)用場景,此處不說詳細(xì)代碼,只粘貼相應(yīng)的flinksql 代碼 --********************************************…

    摘要

    很多時(shí)候flink消費(fèi)上游kafka的數(shù)據(jù)是有重復(fù)的,因此有時(shí)候我們想數(shù)據(jù)在落盤之前進(jìn)行去重,這在實(shí)際開發(fā)中具有廣泛的應(yīng)用場景,此處不說詳細(xì)代碼,只粘貼相應(yīng)的flinksql

    代碼

    --********************************************************************--
    -- 創(chuàng)建臨時(shí)表(只在當(dāng)前sessoin生效的表稱為臨時(shí)表) DDL
    CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record攜帶的源數(shù)據(jù)時(shí)間戳,參考官網(wǎng)kafka connectorproctime as PROCTIME() -- 獲取數(shù)據(jù)處理時(shí)間,這是flink內(nèi)置支持的關(guān)鍵字
    ) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka掃描數(shù)據(jù)模式,參考官網(wǎng)kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是將kafka數(shù)據(jù)格式化為string
    );-- 創(chuàng)建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
    )
    WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
    );
    -- 去重視圖, 這是關(guān)鍵(json_value是flink的內(nèi)置函數(shù),data_hash是數(shù)據(jù)本身的primary key)
    -- 下述語句含義是:根據(jù)data_hash字段分組,按照處理時(shí)間排序,取出最新的一條數(shù)據(jù),其他的重復(fù)數(shù)據(jù)將被拋棄
    CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目標(biāo)表
    insert into ADB
    select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
    from quchong;
    http://m.aloenet.com.cn/news/36171.html

    相關(guān)文章:

  1. wordpress怎么發(fā)布公告廣州谷歌seo
  2. 網(wǎng)站建設(shè)合同書公司企業(yè)員工培訓(xùn)
  3. 做網(wǎng)站的結(jié)論網(wǎng)站的宣傳推廣方式
  4. 域名解析映射到網(wǎng)站空間怎么做seo發(fā)外鏈的網(wǎng)站
  5. 給我做網(wǎng)站的人老是給我留點(diǎn)尾巴百度成都總部
  6. 公眾號(hào)購物做網(wǎng)站還是小程序推廣產(chǎn)品的方法
  7. 做ae動(dòng)圖的網(wǎng)站網(wǎng)店推廣的渠道有哪些
  8. 自己做的網(wǎng)站被篡改怎么辦愛站網(wǎng)站排名查詢工具
  9. 電商網(wǎng)站建設(shè)開發(fā)的語言有哪些競價(jià)sem托管
  10. 慈溪做無痛同濟(jì)&網(wǎng)站體驗(yàn)營銷是什么
  11. 焦作網(wǎng)站設(shè)計(jì)網(wǎng)絡(luò)推廣外包搜索手機(jī)蛙軟件
  12. 微信支持什么wordpress360優(yōu)化大師官方網(wǎng)站
  13. 通用wap網(wǎng)站生成系統(tǒng)中小企業(yè)網(wǎng)站制作
  14. 青島網(wǎng)站建設(shè)公司正太原百度搜索排名優(yōu)化
  15. 溫州建設(shè)誠信網(wǎng)站seo學(xué)習(xí)論壇
  16. 學(xué)校網(wǎng)站模版網(wǎng)絡(luò)推廣優(yōu)化方案
  17. 山東省作風(fēng)建設(shè)網(wǎng)站湖南網(wǎng)站排名
  18. 做動(dòng)效的網(wǎng)站免費(fèi)外鏈發(fā)布
  19. 網(wǎng)站的后臺(tái)登錄注冊(cè)怎么做賬戶競價(jià)托管公司
  20. asia域名發(fā)布網(wǎng)站谷歌瀏覽器安卓下載
  21. 長治制作網(wǎng)站傳統(tǒng)營銷和網(wǎng)絡(luò)營銷的區(qū)別
  22. ui設(shè)計(jì)與網(wǎng)站建設(shè)網(wǎng)絡(luò)推廣渠道都有哪些
  23. 網(wǎng)站一個(gè)人可以做嗎找培訓(xùn)機(jī)構(gòu)的網(wǎng)站
  24. 網(wǎng)站建設(shè)技術(shù)分類aso具體優(yōu)化
  25. 做中介開什么網(wǎng)站如何搜索網(wǎng)頁關(guān)鍵詞
  26. 湖北網(wǎng)站建設(shè)企業(yè)百度sem推廣具體做什么
  27. 富平做網(wǎng)站中國職業(yè)培訓(xùn)在線平臺(tái)
  28. 網(wǎng)站設(shè)計(jì)創(chuàng)意2023年6月疫情情況
  29. 做ppt好用的網(wǎng)站廣告代發(fā)平臺(tái)
  30. wen前端網(wǎng)站開發(fā)日記外鏈的作用