国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

三亞制作網站軍事新聞頭條最新消息

三亞制作網站,軍事新聞頭條最新消息,企業(yè)網站wap源碼,中原鄭州網站建設一、說明 時間屬性是大數據中的一個重要方面,像窗口(在 Table API 和 SQL )這種基于時間的操作,需要有時間信息。我們可以通過時間屬性來更加靈活高效地處理數據,下面我們通過處理時間和事件時間來探討一下Flink SQL …

一、說明

時間屬性是大數據中的一個重要方面,像窗口(在 Table API 和 SQL )這種基于時間的操作,需要有時間信息。我們可以通過時間屬性來更加靈活高效地處理數據,下面我們通過處理時間和事件時間來探討一下Flink SQL 時間屬性。

二、處理時間

2.1、準備WaterSensor類,方便使用

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

2.2、DataStream 到 Table 轉換時定義

處理時間屬性可以在 schema 定義的時候用 .proctime 后綴來定義。時間屬性一定不能定義在一個已有字段上,所以它新增一個字段。
代碼段:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 創(chuàng)建表的執(zhí)行環(huán)境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 聲明一個額外的字段來作為處理時間字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}

執(zhí)行結果:
在這里插入圖片描述

2.3、創(chuàng)建數據文件sensor.txt 數據,方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在創(chuàng)建表的 DDL 中定義

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}

運行結果:
在這里插入圖片描述

三、事件時間

事件時間允許程序按照數據中包含的時間來處理,這樣可以在有亂序或者晚到的數據的情況下產生一致的處理結果。它可以保證從外部存儲讀取數據后產生可以復現(replayable)的結果。
除此之外,事件時間可以讓程序在流式和批式作業(yè)中使用同樣的語法。在流式程序中的事件時間屬性,在批式程序中就是一個正常的時間字段。
為了能夠處理亂序的事件,并且區(qū)分正常到達和晚到的事件,Flink 需要從事件中獲取事件時間并且產生 watermark(watermarks)。

3.1、DataStream 到 Table 轉換時定義

事件時間屬性可以用 .rowtime 后綴在定義 DataStream schema 的時候來定義。時間戳和 watermark 在這之前一定是在 DataStream 上已經定義好了。
在從 DataStream 到 Table 轉換時定義事件時間屬性有兩種方式。取決于用 .rowtime 后綴修飾的字段名字是否是已有字段,事件時間字段可以是:
1、在 schema 的結尾追加一個新的字段
2、替換一個已經存在的字段。
不管在哪種情況下,事件時間字段都表示 DataStream 中定義的事件的時間戳。
代碼:
援用上面WaterSensor類

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}

運行結果:
在這里插入圖片描述

3.2、使用已有的字段作為時間屬性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在創(chuàng)建表的 DDL 中定義

事件時間屬性可以用 WATERMARK 語句在 CREATE TABLE DDL 中進行定義。WATERMARK 語句在一個已有字段上定義一個 watermark 生成表達式,同時標記這個已有字段為時間屬性字段.

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}

運行結果:
在這里插入圖片描述
說明:
1.把一個現有的列定義為一個為表標記事件時間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個計算列。
2.嚴格遞增時間戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.遞增時間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
亂序時間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

http://m.aloenet.com.cn/news/29168.html

相關文章:

  • 北票網站建設黃金網站軟件免費
  • 做娛樂性手機網站推廣資源網
  • 直播間 網站建設app關鍵詞推廣
  • 淘寶客如何做網站今日新聞頭條新聞
  • 聚財三個字公司名字哪個網站學seo是免費的
  • 新網站快速提高排名互換鏈接的方法
  • 如何做網站對比網站優(yōu)化排名網站
  • 網站建設實戰(zhàn)視頻教程桂林網站設計制作
  • 報考建設八大員官方網站seo推廣軟件哪個好
  • wordpress 文章列表只顯示標題外貿seo
  • 哪個網站做任務能賺錢我要看今日頭條
  • 廈門廣告公司有哪些aso優(yōu)化的主要內容為
  • wordpress修改谷歌外貿seo公司
  • 做美團網這種網站賺錢嗎亞馬遜關鍵詞搜索器
  • 英語網站建設如何制作一個公司網站
  • 上海網站建設平臺站霸網絡seo學習網站
  • 做網站有一個火箭回頂部網站優(yōu)化關鍵詞公司
  • 做cpa的博客網站類型博客網
  • 優(yōu)惠券推廣網站怎么做seo怎么搞
  • nas網站怎么做網站時事新聞最新2022
  • 順德樂從有做阿里巴巴的網站嗎sem競價專員是干什么的
  • 做網站視頻圖片加載不出來企業(yè)網站模板下載
  • 情感視頻素材網站劉連康seo培訓哪家強
  • 網站建設和網站推廣可以同一家做嗎網站優(yōu)化排名金蘋果系統(tǒng)
  • 企業(yè)網站建設英文超級外鏈
  • 手機網站哪家好西安百度推廣優(yōu)化
  • 網站開發(fā) 英文文章百度收錄快的發(fā)帖平臺
  • 福州網頁鄭州seo排名優(yōu)化公司
  • 汕頭網站建設制作公司衡陽seo快速排名
  • 分類信息網站成都搭建如何搭建一個網站平臺