鄭州建站的站長收錄平臺
Flink系列之:Top-N
- 一、TOP-N
- 二、無排名輸出優(yōu)化
一、TOP-N
- 適用于流、批
- Top-N 查詢可以根據(jù)指定列排序后獲得前 N 個最小或最大值。最小值和最大值集都被認(rèn)為是Top-N查詢。在需要從批表或流表中僅顯示 N 個底部或 N 個頂部記錄時,Top-N 查詢是非常有用的。并且該結(jié)果集還可用于進(jìn)一步分析。
- Flink 使用 OVER 窗口子句和過濾條件的組合來表達(dá)一個 Top-N 查詢。借助 OVER 窗口的 PARTITION BY 子句能力,Flink 也能支持分組 Top-N。例如:實(shí)時顯示每個分類下銷售額最高的五個產(chǎn)品。對于批處理和流處理模式的SQL,都支持 Top-N 查詢。
下面展示了 Top-N 的語法:
SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions]
參數(shù)說明:
- ROW_NUMBER():根據(jù)分區(qū)數(shù)據(jù)的排序,為每一行分配一個唯一且連續(xù)的序號,從 1 開始。目前,只支持 ROW_NUMBER 作為 OVER 窗口函數(shù)。未來會支持 RANK() 和 DENSE_RANK()。
- PARTITION BY col1[, col2…]:指定分區(qū)字段。每個分區(qū)都會有一個 Top-N 的結(jié)果。
- ORDER BY col1 [asc|desc][, col2 [asc|desc]…]: 指定排序列。 每個列的排序類型(ASC/DESC)可以不同。
- WHERE rownum <= N: Flink 需要 rownum <= N 才能識別此查詢是 Top-N 查詢。 N 表示將要保留 N 個最大或最小數(shù)據(jù)。
- [AND conditions]: 可以在 WHERE 子句中添加其他條件,但是這些其他條件和 rownum <= N 需要使用 AND 結(jié)合。
Top-N 查詢是結(jié)果更新的. Flink SQL會根據(jù)ORDER BY的字段對輸入的數(shù)據(jù)流進(jìn)行排序,所以如果前 N 條記錄發(fā)生了變化,那么變化后的記錄將作為回撤/更新記錄發(fā)送到下游。 建議使用一個支持更新的存儲作為 Top-N 查詢的結(jié)果表。此外,如果 Top-N 條記錄需要存儲在外部存儲中,結(jié)果表應(yīng)該與Top-N查詢的唯一鍵保持一致。
Top-N 查詢的唯一鍵是分區(qū)字段和 rownum 字段的組合。Top-N 查詢也可以獲取上游的唯一鍵。用下面的 job 舉例:比如 product_id 是 ShopSales 的唯一鍵,這時 Top-N 查詢的唯一鍵是[category, rownum] 和 [product_id]。
下面的示例展示了在流式表上指定 Top-N SQL 查詢。這也是上面提到的 ‘實(shí)時顯示每個分類下銷售額最高的五個產(chǎn)品’ 的示例。
CREATE TABLE ShopSales (product_id STRING,category STRING,product_name STRING,sales BIGINT
) WITH (...);SELECT *
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_numFROM ShopSales)
WHERE row_num <= 5
二、無排名輸出優(yōu)化
如上所述, rownum 將作為唯一鍵的一個字段寫入到結(jié)果表,這可能會導(dǎo)致大量數(shù)據(jù)寫入到結(jié)果表。例如,排名第九(比如 product-1001)的記錄更新為 1,排名 1 到 9 的所有記錄都會作為更新信息逐條寫入到結(jié)果表。如果結(jié)果表收到太多的數(shù)據(jù),它將會成為這個 SQL 任務(wù)的瓶頸。
優(yōu)化的方法是在 Top-N 查詢的外層 SELECT 子句中省略 rownum 字段。因?yàn)橥ǔ?Top-N 的數(shù)據(jù)量不大,消費(fèi)端就可以快速地排序。下面的示例中就沒有 rownum 字段,只需要發(fā)送變更數(shù)據(jù)(product-1001)到下游,這樣可以減少結(jié)果表很多 IO。
下面的示例展示了用這種方法怎樣去優(yōu)化上面的 Top-N:
CREATE TABLE ShopSales (product_id STRING,category STRING,product_name STRING,sales BIGINT
) WITH (...);-- omit row_num field from the output
SELECT product_id, category, product_name, sales
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_numFROM ShopSales)
WHERE row_num <= 5
Attention in Streaming Mode 為了上面的查詢輸出到外部存儲的正確性,外部存儲必須和 Top-N 查詢擁有相同的唯一鍵。在上面的示例中,如果 product_id 是查詢的唯一鍵,外部表應(yīng)該也把 product_id 作為唯一鍵。