三亞制作網站軍事新聞頭條最新消息
一、說明
時間屬性是大數據中的一個重要方面,像窗口(在 Table API 和 SQL )這種基于時間的操作,需要有時間信息。我們可以通過時間屬性來更加靈活高效地處理數據,下面我們通過處理時間和事件時間來探討一下Flink SQL 時間屬性。
二、處理時間
2.1、準備WaterSensor類,方便使用
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
2.2、DataStream 到 Table 轉換時定義
處理時間屬性可以在 schema 定義的時候用 .proctime 后綴來定義。時間屬性一定不能定義在一個已有字段上,所以它新增一個字段。
代碼段:
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 創(chuàng)建表的執(zhí)行環(huán)境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 聲明一個額外的字段來作為處理時間字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}
執(zhí)行結果:
2.3、創(chuàng)建數據文件sensor.txt 數據,方便使用
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
2.4、在創(chuàng)建表的 DDL 中定義
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}
運行結果:
三、事件時間
事件時間允許程序按照數據中包含的時間來處理,這樣可以在有亂序或者晚到的數據的情況下產生一致的處理結果。它可以保證從外部存儲讀取數據后產生可以復現(replayable)的結果。
除此之外,事件時間可以讓程序在流式和批式作業(yè)中使用同樣的語法。在流式程序中的事件時間屬性,在批式程序中就是一個正常的時間字段。
為了能夠處理亂序的事件,并且區(qū)分正常到達和晚到的事件,Flink 需要從事件中獲取事件時間并且產生 watermark(watermarks)。
3.1、DataStream 到 Table 轉換時定義
事件時間屬性可以用 .rowtime 后綴在定義 DataStream schema 的時候來定義。時間戳和 watermark 在這之前一定是在 DataStream 上已經定義好了。
在從 DataStream 到 Table 轉換時定義事件時間屬性有兩種方式。取決于用 .rowtime 后綴修飾的字段名字是否是已有字段,事件時間字段可以是:
1、在 schema 的結尾追加一個新的字段
2、替換一個已經存在的字段。
不管在哪種情況下,事件時間字段都表示 DataStream 中定義的事件的時間戳。
代碼:
援用上面WaterSensor類
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}
運行結果:
3.2、使用已有的字段作為時間屬性
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
3.3、在創(chuàng)建表的 DDL 中定義
事件時間屬性可以用 WATERMARK 語句在 CREATE TABLE DDL 中進行定義。WATERMARK 語句在一個已有字段上定義一個 watermark 生成表達式,同時標記這個已有字段為時間屬性字段.
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}
運行結果:
說明:
1.把一個現有的列定義為一個為表標記事件時間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個計算列。
2.嚴格遞增時間戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.遞增時間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
亂序時間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。