
Version notes

Flink and Kafka version numbers have to be compatible with each other. The combination verified to work here:

  • Flink 1.17.1
  • kafka_2.12-3.3.1

Add the Kafka connector dependency

Upload flink-connector-kafka-1.17.1.jar to Flink's lib directory. (Note: unlike the fat flink-sql-connector-kafka jar, this thin connector jar does not bundle the Kafka client classes, which is why kafka-clients-3.3.1.jar has to be added separately later.)

Download the flink-connector-kafka connector jar:

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1
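
If you prefer fetching it from the command line, Maven Central's standard directory layout should serve the same artifact (URL assumed from the mvnrepository page above):

[hadoop@node2 ~]$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.1/flink-connector-kafka-1.17.1.jar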

Upload it to Flink's lib directory:

[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

Distribute flink-connector-kafka-1.17.1.jar to the other nodes:

xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar

Start the yarn-session

[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d

Start the Kafka cluster

[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start

Create the Kafka topic

List the existing topics:

[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list

If ws1 is not in the list, create it:

[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
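
Optionally confirm the topic's partition count and replication factor:

[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --topic ws1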

Regular Kafka table

'connector' = 'kafka'

Enter the Flink SQL client

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
(some log output omitted)
...
Flink SQL> 

Create a Kafka mapping table

CREATE TABLE t1(
  -- if the column name matches the metadata key, FROM 'xxxx' can be omitted; VIRTUAL marks the column read-only
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id int,
  ts bigint,
  vc int
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
  'properties.group.id' = 'test',
  -- options: 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- 'fixed' is Flink's partitioner: each parallel subtask writes to a single Kafka partition
  'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);
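
The commented line above lists the other startup modes. For reference, a hedged sketch of the two modes that need an extra property, per the Flink Kafka connector options (the timestamp and offset values here are placeholders):

-- start reading from an epoch-millisecond timestamp
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1718332800000',

-- or start from explicit per-partition offsets
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42'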

This mapping table can be used both to read data from Kafka and to write data to Kafka.

Insert data into the Kafka table

If the source table does not exist yet, create it first; if it already exists, there is no need to create it again.

CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='1',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='10',
  'fields.ts.kind'='sequence',
  'fields.ts.start'='1',
  'fields.ts.end'='1000000',
  'fields.vc.kind'='random',
  'fields.vc.min'='1',
  'fields.vc.max'='100'
);
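
Before wiring the generator to Kafka, you can sanity-check it with a direct query (each row carries a random id in [1,10], a sequential ts from 1 to 1000000, and a random vc in [1,100], at one row per second):

select * from source;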

Insert the source table's rows into t1:

insert into t1(id,ts,vc) select * from source;

If you hit the error:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

If the same error keeps appearing, copy kafka-clients-3.3.1.jar from Kafka's libs directory into Flink's lib directory. Note that the sql-client must be restarted afterwards, and the yarn-session must be restarted as well (important).

cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib
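
If the TaskManagers on other nodes read their own $FLINK_HOME/lib, the client jar presumably needs distributing as well, using the same xsync helper as before (skip this if your yarn-session ships the lib directory from node2 anyway):

xsync $FLINK_HOME/lib/kafka-clients-3.3.1.jar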

Check that the copy succeeded:

$ ls $FLINK_HOME/lib

Restart the sql-client and run the statements again; on success:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- if the column name matches the metadata key, FROM 'xxxx' can be omitted; VIRTUAL marks the column read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   id int, 
>   ts bigint, 
>   vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
>   'properties.group.id' = 'test',
>   -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- 'fixed' is Flink's partitioner: each parallel subtask writes to a single Kafka partition
>   'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
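
Independently of Flink, you can confirm that JSON records are landing in ws1 with Kafka's console consumer (a quick check against the same brokers; --max-messages just bounds the output):

[hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --topic ws1 --from-beginning --max-messages 5

Each record should look roughly like {"id":3,"ts":17,"vc":52} (illustrative values).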

Query the Kafka table

select * from t1;

If you hit the error:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

Restart the yarn-session and run the statements again; on success:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- if the column name matches the metadata key, FROM 'xxxx' can be omitted; VIRTUAL marks the column read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   id int, 
>   ts bigint, 
>   vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
>   'properties.group.id' = 'test',
>   -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- 'fixed' is Flink's partitioner: each parallel subtask writes to a single Kafka partition
>   'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a

Flink SQL> select * from t1;
2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.

Flink SQL> 


Upsert Kafka table

'connector' = 'upsert-kafka'

If the table is subject to update operations, the regular Kafka connector (an append-only sink) cannot handle them; in that case use the Upsert Kafka connector.

Create the upsert-kafka mapping table (a primary key must be defined)

CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

If the Kafka topic ws2 does not exist yet, it will be created automatically.

Insert into the upsert-kafka table

insert into t2 select id,sum(vc) sumVC from source group by id;

Query the upsert-kafka table

upsert-kafka cannot start reading from a chosen offset; it always reads the topic from the beginning. Only that way is the full update history of the data known, and the changes are displayed with changelog flags such as -U, +U and +I.
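
Since upsert-kafka stores the primary key as the Kafka record key (and deletions as null-value tombstones), the raw changelog can also be watched on the topic itself; a hedged sketch with the console consumer's print.key property:

[hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --topic ws2 --from-beginning --property print.key=true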

Set the display mode

SET sql-client.execution.result-mode=tableau;

Query the data in t2

select * from t2;

If no data appears, the reason is that the earlier source table already generated its sequence up to the end value (1000000) and then stopped producing rows.

Open the Flink Web UI, cancel all running jobs, then repeat the steps. On success:

Drop the tables

Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set

Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;

Create the tables

CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='1',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='10',
  'fields.ts.kind'='sequence',
  'fields.ts.start'='1',
  'fields.ts.end'='1000000',
  'fields.vc.kind'='random',
  'fields.vc.min'='1',
  'fields.vc.max'='100'
);

CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);
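
Note that re-creating the tables is not enough on its own: the aggregation job was cancelled above, so it has to be submitted again before querying t2, presumably by re-running the earlier insert:

insert into t2 select id,sum(vc) sumVC from source group by id;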

Set the display mode

SET sql-client.execution.result-mode=tableau;

Query the table

select * from t2;


Done. Enjoy it!
