真人做爰網(wǎng)站怎么提交網(wǎng)址讓百度收錄
? Flink SQL支持對動態(tài)表進行復(fù)雜而靈活的連接操作。 為了處理不同的場景,需要多種查詢語義,因此有幾種不同類型的 Join。默認情況下,joins 的順序是沒有優(yōu)化的。表的 join 順序是在 FROM
從句指定的??梢酝ㄟ^把更新頻率最低的表放在第一個、頻率最高的放在最后這種方式來微調(diào) join 查詢的性能。需要確保表的順序不會產(chǎn)生笛卡爾積,因為不支持這樣的操作并且會導(dǎo)致查詢失敗。
? Flink Join根據(jù)輸入源形式不同可以分為雙流Join、維表Join和其他Join多種形式,下面根據(jù)大類分別介紹各自特點。
一 雙流JOIN
? 在正式進入FlinkSQL Join場景研究之前,首先我們先介紹一下在FlinkSQL場景下常見的Kafka數(shù)據(jù)流分類。截止到Flink1.18為止,目前常見的Kafka數(shù)據(jù)流包括不含鍵更新的普通Kafka數(shù)據(jù)流(即Kafka SQL Connector數(shù)據(jù)流)和包含鍵更新的Kafka數(shù)據(jù)流(即Upsert-Kafka SQL Connector數(shù)據(jù)流)兩種。
1 Regular Join
? Regular join 是最通用的 join 類型。在這種 join 下,join 兩側(cè)表的任何新記錄或變更都是可見的,并會影響整個 join 的結(jié)果。對于流式查詢,regular join 的語法是最靈活的,允許任何類型的更新(插入、更新、刪除)輸入表。 然而,這種操作具有重要的操作意義:Flink 需要將 Join 輸入的兩邊數(shù)據(jù)永遠保持在狀態(tài)中。 因此,計算查詢結(jié)果所需的狀態(tài)可能會無限增長,這取決于所有輸入表的輸入數(shù)據(jù)量。你可以提供一個合適的狀態(tài) time-to-live (TTL) 配置來防止狀態(tài)過大。注意:這樣做可能會影響查詢的正確性。
? 左右兩邊流數(shù)據(jù)都能驅(qū)動join,左側(cè)流新加入數(shù)據(jù)會和右側(cè)流狀態(tài)中所有匹配記錄join上;同理,右側(cè)流新增數(shù)據(jù)會和左側(cè)流所有匹配記錄join上,外連接不會等待,即使Join不上也會即及時輸出,待對側(cè)數(shù)據(jù)到來通過回撤修復(fù)數(shù)據(jù)。
-
Inner Join
根據(jù) join 限制條件返回一個簡單的笛卡爾積。目前只支持 equi-joins,即:至少有一個等值條件。不支持任意的 cross join 和 theta join。
select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 join tbl_order_product t2 on t1.order_id = t2.order_id ;
Inner join不會產(chǎn)生回撤流,source端可以是Kafka SQL Connector也可以試Upsert-kafka SQL Connector,也可以是混合模式,sink端理論均可以是Kafka Connector,但如果輸入端有重復(fù)輸入,輸出端可以設(shè)置成Upsert-Kafka SQL Connector接收數(shù)據(jù)。Upsert-Kafka SQL Connector注意設(shè)置主鍵。
-
outer join
返回所有符合條件的笛卡爾積(即:所有通過 join 條件連接的行),加上所有外表沒有匹配到的行。Flink 支持 LEFT、RIGHT 和 FULL outer joins。目前只支持 equi-joins,即:至少有一個等值條件。不支持任意的 cross join 和 theta join。
select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 left join tbl_order_product t2 on t1.order_id = t2.order_id ;select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 right join tbl_order_product t2 on t1.order_id = t2.order_id ;select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 full join tbl_order_product t2 on t1.order_id = t2.order_id ;
Outer Join會產(chǎn)生回撤流,source端可以是Kafka SQL Connector也可以是Upsert-kafka SQL Connector,也可以是混合模式,sink端理僅支持設(shè)置成Upsert-Kafka SQL Connector接收數(shù)據(jù)。Upsert-Kafka SQL Connector注意設(shè)置主鍵。
-
Regular Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join a => a|u
-
u join u => a|u
-
a join u => a|u
-
a left join a => u
-
u left join u => u
-
a left join u => u
-
2 Interval Join
? 返回一個符合 join 條件和時間限制的簡單笛卡爾積。Interval join 需要至少一個 equi-join 條件和一個 join 兩邊都包含的時間限定 join 條件。范圍判斷可以定義成就像一個條件(<, <=, >=, >),也可以是一個 BETWEEN 條件,或者兩邊表的一個相同類型(即:處理時間 或 事件時間)的時間屬性 的等式判斷。
? 下面列舉了一些有效的 interval join 時間條件:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
? 對于流式查詢,對比 regular join,interval join 只支持有時間屬性的Append-Only表。 由于時間屬性是遞增的,Flink 從狀態(tài)中移除舊值也不會影響結(jié)果的正確性,即interval join會根據(jù)間隔自動維護狀態(tài)大小,不丟棄狀態(tài)也不會讓狀態(tài)無限增長。
-
Inner join
select * from tbl_order t1 join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;
? 輸入源只支持Kafka SQL Connector,不支持任何一方回撤流,這也可以理解,因為Interval Join是有時間屬性參與Join的。輸出數(shù)據(jù)可以是Kafka SQL Connector也可以試Upsert-kafka SQL Connector。Upsert-kafka SQL Connector要注意鍵設(shè)計。
-
Outer join
select * from tbl_order t1 left join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;select * from tbl_order t1 right join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;select * from tbl_order t1 full join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;
? 輸入端僅至此Kafka SQL Connector,不支持任何一方回撤流,這也可以理解,因為Interval Join是有時間屬性參與Outer Join的。輸出數(shù)據(jù)可以是Kafka SQL Connector也可以試Upsert-kafka SQL Connector。Upsert-kafka SQL Connector要注意鍵設(shè)計。
-
注意點
-
測試要配置并行度為1,否則右表關(guān)聯(lián)不上數(shù)據(jù)因為水位線識別不到會而不超時輸出;
executionEnvironment.setParallelism(1);
-
left join右表關(guān)聯(lián)不上輸出條件
- 右表關(guān)聯(lián)數(shù)據(jù)出現(xiàn)觸發(fā)輸出
- 超時觸發(fā)器輸出關(guān)聯(lián)不上數(shù)據(jù)
-
-
Interval Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join a => a|u
-
a left join a => a|u
-
3 Temporal Join(Snapshot Join)
? 時態(tài)表(Temporal table)是一個隨時間變化的表:在 Flink 中被稱為動態(tài)表。時態(tài)表中的行與一個或多個時間段相關(guān)聯(lián),所有 Flink 中的表都是時態(tài)的(Temporal)。 時態(tài)表包含一個或多個版本的表快照,它可以是一個變化的歷史表,跟蹤變化(例如,數(shù)據(jù)庫變化日志,包含所有快照)或一個變化的維度表,也可以是一個將變更物化的維表(例如,存放最終快照的數(shù)據(jù)表)。
-
Inner join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點:
- 左右兩邊事件時間屬性,標識兩側(cè)流join場景,如果處理時間請參考Lookup join;
- 只支持event-time,如果是processing-time那么就變成join最新版本數(shù)據(jù),同Lookup Join;
- 左表支持append流和upsert流;
- 右表只支持upsert流;
- 輸出可以是append流或者upsert流;
- 左表觸發(fā)計算,右表更新不觸發(fā)計算;
- 設(shè)置超時時間:
tableEnvironment.getConfig().set("table.exec.source.idle-timeout","3s");
;
-
Left join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 left join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點:
- 左右兩邊事件時間屬性,標識兩側(cè)流join場景,如果處理時間請參考Lookup join;
- 只支持event-time,如果是processing-time那么就變成join最新版本數(shù)據(jù),同Lookup Join;
- 左表支持append流和upsert流;
- 右表只支持upsert流;
- 輸出可以是append流或者upsert流;
- 左表觸發(fā)計算,右表更新不觸發(fā)計算;
- 設(shè)置超時時間:
tableEnvironment.getConfig().set("table.exec.source.idle-timeout","3s");
;
-
Snapshot Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join u => a|u
-
u join u => u
-
a left join u => a|u
-
u left join u => u
-
4 Window Join
? 窗口關(guān)聯(lián)就是增加時間維度到關(guān)聯(lián)條件中。在此過程中,窗口關(guān)聯(lián)將兩個流中在同一窗口且符合 join 條件的元素 join 起來。窗口關(guān)聯(lián)的語義和DataStream window join相同。
? 在流式查詢中,與其他連續(xù)表上的關(guān)聯(lián)不同,窗口關(guān)聯(lián)不產(chǎn)生中間結(jié)果,只在窗口結(jié)束產(chǎn)生一個最終的結(jié)果。另外,窗口關(guān)聯(lián)會清除不需要的中間狀態(tài)。
? 通常,窗口關(guān)聯(lián)和窗口表值函數(shù)一起使用。而且,窗口關(guān)聯(lián)可以在其他基于窗口表值函數(shù)的操作后使用,例如窗口聚合,窗口 Top-N和窗口關(guān)聯(lián)。
? 目前,窗口關(guān)聯(lián)需要在 join on 條件中包含兩個輸入表的 window_start
等值條件和 window_end
等值條件。
? 窗口關(guān)聯(lián)支持 INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN。
-
語法
select ... from l [left|right|full outer] join r -- l and r are relations applied windowing TVF on l.window_start = r.window_start and l.window_end = r.window_end and ...
-
注意
-
當前版本窗口Join必須同時指定window_start和window_end等值條件
-
窗口Join不支持源是upsert流的情況
-
-
限制
- Join 子句的限制
? 目前,窗口關(guān)聯(lián)需要在 join on 條件中包含兩個輸入表的
window_start
等值條件和window_end
等值條件。未來,如果是滾動或滑動窗口,只需要在 join on 條件中包含窗口開始相等即可。- 輸入的窗口表值函數(shù)的限制
? 目前,關(guān)聯(lián)的左右兩邊必須使用相同的窗口表值函數(shù)。這個規(guī)則在未來可以擴展,比如:滾動和滑動窗口在窗口大小相同的情況下 join。
- 窗口表值函數(shù)之后直接使用窗口關(guān)聯(lián)的限制
? 目前窗口關(guān)聯(lián)支持作用在滾動(TUMBLE)、滑動(HOP)和累積(CUMULATE)窗口表值函數(shù)之上,但是還不支持會話窗口(SESSION)。
-
Snapshot Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join a => a|u
-
a left join a => a|u
-
二 維表JOIN
5 Lookup Join(processing-time temporal join)
? lookup join 通常用于使用從外部系統(tǒng)查詢的數(shù)據(jù)來豐富表。join 要求一個表具有處理時間屬性,另一個表由查找源連接器(lookup source connnector)支持。通常使用基于處理時間的流表與外部版本表(例如 mysql、hbase)的最新版本相關(guān)聯(lián)(即processing-time temporal join 常常用在使用外部系統(tǒng)來豐富流的數(shù)據(jù))。
? 通過定義一個處理時間屬性,這個 join 總是返回最新的值??梢詫?build side 中被查找的表想象成一個存儲所有記錄簡單的 HashMap<K,V>
。 這種 join 的強大之處在于,當無法在 Flink 中將表具體化為動態(tài)表時,它允許 Flink 直接針對外部系統(tǒng)工作。
? Join操作由流端觸發(fā),當新增一個流數(shù)據(jù),會查詢外部DB映射,獲取數(shù)據(jù)補全后發(fā)出結(jié)果數(shù)據(jù)。
-
inner join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點:
- Lookup join只支持inner join和left join;
- 源必須聲明處理時間,即row_time as proctime(),如果源聲明為事件時間,那么要走Snapshot join方式;
- 源支持kafka和upsert-kafka連接器
- 輸出支持kafka和upsert-kafka連接器
- 查詢外部表注意使用異步IO/Cache特性優(yōu)化外表查詢性能
-
Left join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 left join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點:
- Lookup join只支持inner join和left join;
- 源必須聲明處理時間,即row_time as proctime(),如果源聲明為事件時間,那么要走Snapshot join方式;
- 源支持kafka和upsert-kafka連接器
- 輸出支持kafka和upsert-kafka連接器
- 查詢外部表注意使用異步IO/Cache特性優(yōu)化外表查詢性能
-
Lookup Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,s代表外表靜態(tài)表):
-
a join s => a|u
-
u join s => a|u
-
a left join s => a|u
-
u left join s => a|u
-
三 其他JOIN
6 Array Expansion
? 對于輸入的包含數(shù)組列的單行數(shù)據(jù),返回給定數(shù)組中每個元素的新行,拆分后的數(shù)據(jù)除解析數(shù)組元素外,其他元素與原始行數(shù)據(jù)一致。
selectorder_id,order_tag,tag
from tbl_order_source cross join unnest(order_tag) as t(tag)
;
特征:
- 輸入數(shù)據(jù)可以是Append或者Upsert
- 輸出數(shù)據(jù)可以是Append或者Upsert
7 Table Function
? 將表與表函數(shù)的結(jié)果聯(lián)接。左側(cè)(外部)表的每一行都與表函數(shù)的相應(yīng)調(diào)用產(chǎn)生的所有行相連接。用戶自定義表函數(shù)必須在使用前注冊。
? 對于是inner join,如果表函數(shù)調(diào)用返回一個空結(jié)果,那么左表的這行數(shù)據(jù)將不會輸出。對于left join,如果表函數(shù)調(diào)用返回了一個空結(jié)果,則保留相應(yīng)的行,并用空值填充未關(guān)聯(lián)到的結(jié)果。當前,針對 lateral table 的 left outer join 需要 ON 子句中有一個固定的 TRUE 連接條件。
select order_id,order_tag,tag
from tbl_order_source
left join lateral table(table_func(order_tag)) t(tag) on true
;
特征:
- 輸入數(shù)據(jù)可以是Append或者Upsert
- 輸出數(shù)據(jù)可以是Append或者Upsert