Version notes
Flink and Kafka versions have to be compatible with each other. The combination that worked here:
- Flink 1.17.1
- kafka_2.12-3.3.1
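Add the Kafka connector dependency
The Kafka connector jar has to be placed in Flink's lib directory.
Download the connector jar
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1
Note: the jar used in the commands below is flink-connector-kafka-1.17.1.jar, which does not bundle the Kafka client classes. That is why kafka-clients-3.3.1.jar has to be added later on. The flink-sql-connector-kafka-1.17.1.jar uber jar bundles them.
Upload it to Flink's lib directory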
[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib
Distribute flink-connector-kafka-1.17.1.jar to the other nodes
xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
Start yarn-session
[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d
Start the Kafka cluster
[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start
Create the Kafka topic
List the existing topics
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
If ws1 is not in the list, create it
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
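The "list, then create if missing" step above can be scripted. The following is a minimal sketch, not part of the original walkthrough: topic_exists is a hypothetical helper name, and the commented usage assumes the same brokers and topic as above.

```shell
#!/usr/bin/env bash
# topic_exists NAME
# Reads a topic list (one name per line) on stdin and succeeds only if
# NAME matches a whole line exactly (so "ws10" does not count as "ws1").
topic_exists() {
  grep -Fxq -- "$1"
}

# Usage sketch (commented out because it needs a running Kafka cluster):
# BROKERS=node2:9092,node3:9092,node4:9092
# if ! kafka-topics.sh --bootstrap-server "$BROKERS" --list | topic_exists ws1; then
#   kafka-topics.sh --bootstrap-server "$BROKERS" --create \
#     --replication-factor 1 --partitions 1 --topic ws1
# fi
```

Matching the whole line with -x avoids false positives from topics whose names merely contain ws1.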
Regular Kafka table
'connector' = 'kafka'
Enter the Flink SQL client
[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
... (some log output omitted) ...
Flink SQL>
Create a table mapped to the Kafka topic
CREATE TABLE t1(
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  -- FROM 'xxxx' can be omitted when the column name equals the metadata key; VIRTUAL marks the column as read-only
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id int,
  ts bigint,
  vc int )
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
  'properties.group.id' = 'test',
  -- possible values: 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- fixed is the partitioner implemented by Flink: each parallel subtask writes to exactly one Kafka partition
  'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);
This table can be used both to read data from Kafka and to write data to Kafka.
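With 'format' = 'json', each Kafka message value is expected to be a JSON object holding the physical columns id, ts and vc (the metadata columns come from the Kafka record itself, not from the payload). The sketch below builds such a record by hand; make_record is a hypothetical helper, and the commented commands assume the standard Kafka console tools are on the PATH.

```shell
#!/usr/bin/env bash
# make_record ID TS VC
# Prints one JSON object with the three payload columns of table t1.
make_record() {
  printf '{"id":%d,"ts":%d,"vc":%d}\n' "$1" "$2" "$3"
}

# Usage sketch (commented out because it needs a running Kafka cluster):
# write one hand-made record to the topic
# make_record 1 1 50 | kafka-console-producer.sh --bootstrap-server node2:9092 --topic ws1
# inspect the raw messages that Flink wrote or will read
# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic ws1 --from-beginning
```

Looking at the raw topic this way is a quick check that the table definition and the message layout agree.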
Insert data into the Kafka table
If the source table does not exist yet, create it first. If it already exists, skip this step.
CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='1',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='10',
  'fields.ts.kind'='sequence',
  'fields.ts.start'='1',
  'fields.ts.end'='1000000',
  'fields.vc.kind'='random',
  'fields.vc.min'='1',
  'fields.vc.max'='100'
);
Insert the rows of the source table into t1
insert into t1(id,ts,vc) select * from source;
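If the statement fails with
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
and the same error persists after a retry, copy kafka-clients-3.3.1.jar from Kafka's libs directory into Flink's lib directory. Afterwards restart sql-client and also restart yarn-session (important), so that the new jar is picked up.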
cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib
Check that the copy succeeded
$ ls $FLINK_HOME/lib
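Instead of scanning the ls output by eye, the check can be automated. This is a minimal sketch under the assumption that the two jars named in this walkthrough are the ones required; missing_jars is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# missing_jars LIB_DIR JAR...
# Prints the name of every JAR that is not a regular file in LIB_DIR.
# Prints nothing when all of them are present.
missing_jars() {
  local lib_dir=$1
  shift
  local jar
  for jar in "$@"; do
    [ -f "$lib_dir/$jar" ] || printf '%s\n' "$jar"
  done
}

# Usage sketch:
# missing_jars "$FLINK_HOME/lib" flink-connector-kafka-1.17.1.jar kafka-clients-3.3.1.jar
```

An empty result means both jars are in place and the session can be restarted.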
Restart sql-client and repeat the steps. A successful run looks like this:
Flink SQL> CREATE TABLE t1(
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- FROM 'xxxx' can be omitted when the column name equals the metadata key; VIRTUAL marks the column as read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int,
> ts bigint ,
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed is the partitioner implemented by Flink: each parallel subtask writes to exactly one Kafka partition
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE source (
>     id INT,
>     ts BIGINT,
>     vc INT
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second'='1',
>     'fields.id.kind'='random',
>     'fields.id.min'='1',
>     'fields.id.max'='10',
>     'fields.ts.kind'='sequence',
>     'fields.ts.start'='1',
>     'fields.ts.end'='1000000',
>     'fields.vc.kind'='random',
>     'fields.vc.min'='1',
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
Query the Kafka table
select * from t1;
If the query fails with
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
restart the yarn session and repeat the steps. A successful run looks like this:
Flink SQL> CREATE TABLE t1(
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- FROM 'xxxx' can be omitted when the column name equals the metadata key; VIRTUAL marks the column as read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int,
> ts bigint ,
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed is the partitioner implemented by Flink: each parallel subtask writes to exactly one Kafka partition
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE source (
>     id INT,
>     ts BIGINT,
>     vc INT
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second'='1',
>     'fields.id.kind'='random',
>     'fields.id.min'='1',
>     'fields.id.max'='10',
>     'fields.ts.kind'='sequence',
>     'fields.ts.start'='1',
>     'fields.ts.end'='1000000',
>     'fields.vc.kind'='random',
>     'fields.vc.min'='1',
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a

Flink SQL> select * from t1;
2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.

Flink SQL>
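upsert-kafka table
'connector' = 'upsert-kafka'
If the table receives update operations, the regular kafka connector is not sufficient. Use the Upsert Kafka connector in that case.
Create a table mapped to upsert-kafka (a primary key must be defined)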
CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);
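If Kafka has no topic named ws2, it is created automatically (assuming the broker setting auto.create.topics.enable is left at its default of true).
Insert into the upsert-kafka table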
insert into t2 select id,sum(vc) sumVC from source group by id;
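Query the upsert-kafka table
An upsert-kafka table cannot start reading from a specified offset. It always reads from the beginning of the topic, because only then is the complete update history of the data known. The changes are displayed with markers such as -U, +U and +I.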
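To make the -U, +U and +I markers concrete, here is a small simulation, independent of Flink, of the changelog that a running sum per id produces. It is only an illustration under simplified assumptions: changelog is a hypothetical function, and its input is plain "id vc" pairs rather than Kafka messages.

```shell
#!/usr/bin/env bash
# changelog
# Reads "id vc" pairs on stdin and prints the changes to sum(vc) per id:
#   +I  first result for an id (insert)
#   -U  the old sum being retracted (update before)
#   +U  the new sum replacing it (update after)
changelog() {
  awk '{
    id = $1; vc = $2
    if (id in sum) {
      printf "-U %s %d\n", id, sum[id]
      sum[id] += vc
      printf "+U %s %d\n", id, sum[id]
    } else {
      sum[id] = vc
      printf "+I %s %d\n", id, sum[id]
    }
  }'
}

# Usage sketch:
# printf '1 10\n2 5\n1 7\n' | changelog
```

For the three input rows in the usage sketch, id 1 is first inserted with sum 10 and later updated from 10 to 17, while id 2 is only inserted.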
Set the result display mode
SET sql-client.execution.result-mode=tableau;
Query the data in table t2
select * from t2;
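If no data is output, the reason is that the earlier source table has already generated rows up to its end value (1000000) and no longer produces data.
Open the Flink Web UI, cancel all running jobs, and repeat the steps. A successful run goes as follows:
Drop the tables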
Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set

Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;
Create the tables
CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='1',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='10',
  'fields.ts.kind'='sequence',
  'fields.ts.start'='1',
  'fields.ts.end'='1000000',
  'fields.vc.kind'='random',
  'fields.vc.min'='1',
  'fields.vc.max'='100'
);
CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);
Set the result display mode
SET sql-client.execution.result-mode=tableau;
Query the table
select * from t2;
Done! Enjoy it!