搭建一個20人的辦公網(wǎng)絡(luò)優(yōu)化是什么梗
kafka入門
一種分布式的、基于發(fā)布/訂閱的消息系統(tǒng),scala編寫,具備快速、可擴展、可持久化的特點。
基本概念
topic
主題
partition
分區(qū),一個topic下可以有多個partition,消息是分散到多個partition里存儲的,partition支持水平擴展。
一個partition內(nèi)的消息是有序的,partition間的消息則是無序的。
每個partition會有若干副本。
broker
一個kafka節(jié)點
consumer
消費者,從topic里取得消息。
每個consumer維護自己的offset。
consumer數(shù)量小于分區(qū)數(shù),會有一個消費者處理多個分區(qū);反之,會有空閑的消費者,造成浪費。
producer
生產(chǎn)者,負責(zé)將消息寫入topic
特點
基于硬盤的消息保存,避免在producer上累積消息或者消息丟失。
同一個消息可以由多個consumer消費。
可擴展性:隨著數(shù)據(jù)的增加,可擴展為數(shù)十臺,上百臺規(guī)模的大集群。擴展可以在集群正常運行的時候進行,對于整個系統(tǒng)的運作沒有影響;這也就意味著,對于很多臺broker 的集群,如果一臺broker 有故障,不影響為client 提供服務(wù).集群如果要同時容忍更多的故障的話, 可以配置更高的replication
factors。
高性能:上面的這些特性使得Apache Kafka 成為一個能夠在高負載的情況下表現(xiàn)出優(yōu)越性能的發(fā)布-訂閱消息系統(tǒng)。Producer, consumer 和broker 都能在大數(shù)據(jù)流的情況下輕松的擴展.
kafka的版本
比如kafka_2.10-0.10.2.0
這里,2.10指的是編譯kafka的scala版本,真正的kafka版本號是后面的:0.10.2.0
配置kafka
config/server.properties文件里的配置項說明:
broker.id
每個kafka 的broker 都需要有一個整型的唯一標識,這個標識通過broker.id 來設(shè)置。默認的情況下,這個數(shù)字是0,但是它可以設(shè)置成任何值。需要注意的
是,需要保證集群中這個id 是唯一的。這個值是可以任意填寫的,并且可以在必要的時候從broker 集群中刪除。比較好的做法是使用主機名相關(guān)的標識來做
為id,比如,你的主機名當中有數(shù)字相關(guān)的信息,如hosts1.example.com,host2.example.com,那么這個數(shù)字就可以用來作為broker.id 的值。
port
默認啟動kafka 時,監(jiān)聽的是TCP 的9092 端口,端口號可以被任意修改。如果端口號設(shè)置為小于1024,那么kafka 需要以root 身份啟動。但是并不推薦以root 身份啟動。
zookeeper.connect
這個參數(shù)指定了Zookeeper 所在的地址,它存儲了broker 的元信息。默認是運行在本機的2181 端口上,因此這個值被設(shè)置成
localhost:2181。這個值可以通過分號設(shè)置多個值,每個值的格式都是hostname:port/path
log.dirs
這個參數(shù)用于配置Kafka 保存數(shù)據(jù)的位置,Kafka 中所有的消息都會存在這個目錄下??梢酝ㄟ^逗號來指定多個目錄,kafka 會根據(jù)最少被使用的原則選擇目錄分配新的partition。注意kafka 在分配partition 的時候選擇的規(guī)則不是按照磁盤的空間大小來定的,而是分配的parition 的個數(shù)大小。
num.recovery.thread.per.data.dir
kafka 可以配置一個線程池,線程池的使用場景如下:
- 當正常啟動的時候,開啟每個parition 的文檔塊
- 當失敗后重啟時,檢查parition 的文檔塊
- 當關(guān)閉kafka 的時候,清除關(guān)閉文檔塊
默認,每個目錄只有一個線程。最好是設(shè)置多個線程數(shù),這樣在服務(wù)器啟動或者關(guān)閉的時候,都可以并行的進行操作。尤其是當非正常停機后,重啟時,如果有大量的分區(qū)數(shù),那么啟動broker 將會花費大量的時間。
【注意】這個參數(shù)是針對每個目錄的。比如,num.recovery.threads.per.data.dir 設(shè)置為8,如果有3個log.dirs 路徑,那么一共會有24 個線程。
num.partitions
這個參數(shù)用于配置新創(chuàng)建的topic 有多少個分區(qū),默認是1 個。注意partition 的個數(shù)只可以被增加,不能被減少。這就意味著如果想要減少主題的分區(qū)數(shù),那
么就需要重新創(chuàng)建topic。
在第一章中介紹過,kafka 通過分區(qū)來對topic 進行擴展,因此需要使用分區(qū)的個數(shù)來做負載均衡,如果新增了broker,那么就會引發(fā)重新負載分配。這并不意味著所有的主題的分區(qū)數(shù)都需要大于broker 的數(shù)量,因為kafka 是支持多個主題的,其他的主題會使用其余的broker。需要注意的是,如果消息的吞吐量很高,那么可以通過設(shè)置一個比較大的分區(qū)數(shù),來分攤壓力。
log.retention.ms
這個參數(shù)用于配置kafka 中消息保存的時間,也可以使用log.retention.hours,默認這個參數(shù)是168 個小時,即一周。另外,還支持log.retention.minutes 和log.retention.ms。這三個參數(shù)都會控制刪除過期數(shù)據(jù)的時間,推薦還是使用log.retention.ms。如果多個同時設(shè)置,那么會選擇最小的那個。
log.retention.bytes
這個參數(shù)也是用來配置消息過期的,它會應(yīng)用到每個分區(qū),比如,你有一個主題,有8 個分區(qū),并且設(shè)置了log.retention.bytes 為1G,那么這個主題總共可以保留8G 的數(shù)據(jù)。注意,所有的過期配置都會應(yīng)用到patition 粒度,而不是主題粒度。這也意味著,如果增加了主題的分區(qū)數(shù),那么主題所能保留的數(shù)據(jù)也就隨之增加了。
如果設(shè)置了log.retention.bytes 和log.retention.ms(或者其他過期時間的配置),只要滿足其中一個條件,消息就會被刪除。
log.segment.bytes
這個參數(shù)用來控制log 段文件的大小,而不是消息的大小。在kafka 中,所有的消息都會進入broker,然后以追加的方式追加到分區(qū)當前最新的segment 段文件中。一旦這個段文件到達log.segment.bytes 設(shè)置的大小,比如默認的1G,這個段文件就會被關(guān)閉,然后創(chuàng)建一個新的。一旦這個文件被關(guān)閉,就可以理解成這個文件已經(jīng)過期了。這個參數(shù)設(shè)置的越小,那么關(guān)閉文件創(chuàng)建文件的操作就會越頻繁,這樣也會造成大量的磁盤讀寫的開銷。
通過生產(chǎn)者發(fā)送過來的消息的情況可以判斷這個值的大小。比如,主題每天接收100M 的消息,并且log.segment.bytes 為默認設(shè)置,那么10 天后,這個段文件才會被填滿。由于段文件在沒有關(guān)閉的時候,是不能刪除的,log.retention.ms 又是默認的設(shè)置,那么這個消息將會在17 天后,才過期刪除。因為10 天后,段文件才關(guān)閉。再過7 天,這個文件才算真正過期,才能被清除。
message.max.bytes
這個參數(shù)用于限制生產(chǎn)者消息的大小,默認是1000000,也就是1M。生產(chǎn)者在發(fā)送消息給broker 的時候,如果出錯,會嘗試重發(fā);但是如果是因為大小的原因,那生產(chǎn)者是不會重發(fā)的。另外,broker上的消息可以進行壓縮,這個參數(shù)是指壓縮后的大小,這樣能多存儲很多消息。
需要注意的是,允許發(fā)送更大的消息會對性能有很大影響。更大的消息,就意味著broker 在處理網(wǎng)絡(luò)連接的時候需要更長的時間,它也會增加磁盤的寫操作壓力,影響IO 吞吐量。
啟動kafka
進入kafka/bin目錄。
啟動zk
nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &
測試zk是否啟動
telnet localhost 2181
輸入srvr,應(yīng)該會返回:
[2017-12-01 15:59:18,829] INFO Processing srvr command from /0:0:0:0:0:0:0:1:40194 (org.apache.zookeeper.server.NIOServerCnxn)
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0xdf
Mode: standalone
Node count: 127
[2017-12-01 15:59:18,833] INFO Closed socket connection for client /0:0:0:0:0:0:0:1:40194 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
Connection closed by foreign host.
不建議zookeeper運行在多于7個的節(jié)點上,因為集群性能會因一致性的特征而降低。
啟動kafka
nohup ./kafka-server-start.sh -daemon ../config/server.properties
以守護進程方式執(zhí)行。
topic管理命令
創(chuàng)建具有指定數(shù)量分區(qū)(或復(fù)制因子)的topic
./kafka-topics.sh --create --zookeeper xx.xx.xx.xx:2181 --topic test1 --replication-factor 1 --partitions 6
查看topic的元信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
查看所有topic
./kafka-topics.sh --zookeeper localhost:2181 --list
修改已有topic的分區(qū)
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test1 --partitions 4./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.ms=5000./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.messages=20000
刪除topic
./kafka-topics.sh --delete --zookeeper xx.xx.xx.xx:2181 --topic test1
但刪除topic并不是真的刪除,只是打個標記,要真正刪除,需要同時修改server.properties:
delete.topic.enable=true
并重啟kafka才能生效。
向topic發(fā)布消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
輸入:
msg1
msg2
按^D停止發(fā)送。
若kafka的server.properties配置了host.name,則localhost必須改成host.name的值,例如:
./kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic test1
查看topic里的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
輸出:
msg1
msg2
按^C停止接收。