和女的做那個視頻網(wǎng)站應(yīng)用商店優(yōu)化
前言
? ? ? ? 學數(shù)倉的時候發(fā)現(xiàn) flume 落了一點,趕緊補齊。
1、Flume 事務(wù)
Source 在往 Channel 發(fā)送數(shù)據(jù)之前會開啟一個 Put 事務(wù):
- doPut:將批量數(shù)據(jù)寫入臨時緩沖區(qū) putList(當 source 中的數(shù)據(jù)達到 batchsize 或者 超過特定的時間就會發(fā)送數(shù)據(jù))
- doCommit:檢查 channel 內(nèi)存隊列是否足夠合并
- doRollback:如果 channel 內(nèi)存隊列空間不足沒救回滾數(shù)據(jù)
同樣 Sink 在從 Channel 主動拉取數(shù)據(jù)的時候也會開啟一個 Take 事務(wù):
- doTake:將數(shù)據(jù)讀取到臨時緩沖區(qū) takeList,并將數(shù)據(jù)發(fā)送到 HDFS
- doCommit:如果數(shù)據(jù)全部發(fā)送成功,就會清除臨時緩沖區(qū) taskList
- dooRollback:數(shù)據(jù)發(fā)送過程如果出現(xiàn)異常,rollback 將臨時緩沖區(qū)的數(shù)據(jù)歸還給 channel 內(nèi)存隊列
2、Flume Agent 內(nèi)部原理
注意:只有 source 和 channel 之間可以存在攔截器,channel 和 sink 之間不可以!??
- source 接收數(shù)據(jù),把數(shù)據(jù)封裝成 Event?
- 傳給 channel processor 也就是 channel 處理器
- 把事件傳給攔截器(interceptor),在攔截器這里可以對數(shù)據(jù)進行一些處理(我們在上一節(jié)中說過,當我們的路徑信息中包含時間的時候,需要從 Event Header 中讀取時間信息,如果沒有就需要我們指定從本地讀取 timestamp,所以這里我們就可以在攔截器這里給我們的 event 添加頭部信息);而且,攔截器可以設(shè)置多個
- 經(jīng)過攔截器處理的事件又返回給了 channel processor ,然后 channel processor 把事件傳給 channel 選擇器(channel selector 有兩種類型:Replicating 和 Multiplexing ,Replicating 會把source 發(fā)送來的 events 發(fā)往所有 channel,而 multiplexing 可以配置指定發(fā)往哪些 channel)
- 經(jīng)過 channel 選擇器處理后的事件仍然返回給 channel processor
- channel processor 會根據(jù) channel 選擇器的結(jié)果,發(fā)送給相應(yīng)的 channel(也就是這個時候才會真正的開啟 put 事務(wù),之前都是對 event 進行簡單的處理)
- SinkProcessor 負責協(xié)調(diào)拉取 channel 中的數(shù)據(jù),它有三種類型:DefaultSinkProcessor、LoadBalancingSinkpProcessor(負載均衡,也就是多個 Sink 輪詢的方式去讀取 channel 中的數(shù)據(jù))、FailoverSinkProcessor(故障轉(zhuǎn)移,每個 sink 有自己的優(yōu)先級,優(yōu)先級高的去讀取 channel 中的事件,只有當它掛掉的時候,才會輪到下一個優(yōu)先級的 sink 去讀)。其中 DefaultSinkProcessor 一個 channel 只能綁定一個 Sink,所以它也就沒有 sink 組的概念。
注意:一個 sink 只可以綁定一個 channel ,但是一個 channel 可以綁定多個 sink!
3、Flume 拓撲結(jié)構(gòu)
3.1、簡單串聯(lián)
官網(wǎng)這段話翻譯過來就是:為了將數(shù)據(jù)跨越多個代理或躍點進行傳輸,前一個代理的接收器(sink)和當前躍點的源(source)需要是avro類型,接收器指向源的主機名(或IP地址)和端口。
這種模式的缺點很好理解,就像串聯(lián)電路,一個節(jié)點壞了會影響整個系統(tǒng)。
3.2、復(fù)制和多路復(fù)用
從官網(wǎng)翻譯過來就是:上述示例顯示了一個名為“foo”的代理源將流程分散到三個不同的通道。這種分散可以是復(fù)制或多路復(fù)用。在復(fù)制流程的情況下,每個事件都會發(fā)送到這三個通道。對于多路復(fù)用的情況,當事件的屬性與預(yù)配置的值匹配時,事件將被發(fā)送到可用通道的子集。例如,如果事件屬性名為“txnType”設(shè)置為“customer”,則應(yīng)發(fā)送到channel1和channel3,如果為“vendor”,則應(yīng)發(fā)送到channel2,否則發(fā)送到channel3。映射可以在代理的配置文件中設(shè)置。
這種模式相比上面的串聯(lián)模式的優(yōu)點無非就是可以發(fā)送過多個目的地。
3.3、負載均衡和故障轉(zhuǎn)移
Flume 支持多個 Sink 邏輯上分到一個 Sink 組,sink 組配合不同的 SinkProcessor ,可以實現(xiàn)負載均衡和錯誤恢復(fù)的功能。
3.4、聚合
這種模式在實際開發(fā)中是經(jīng)常會用到的,日常web應(yīng)用通常分布在上百個服務(wù)器,大者甚至上千個、上萬個服務(wù)器。產(chǎn)生的日志,處理起來也非常麻煩。用flume的這種組合方式能很好的解決這一問題,每臺服務(wù)器部署一個flume采集日志,傳送到一個集中收集日志的 flume,再由此flume上傳到hdfs、hive、hbase等,進行日志分析。
4、Flume 企業(yè)開發(fā)實例
4.1、復(fù)制和多路復(fù)用
注意:多路復(fù)用必須配合攔截器使用,因為需要在 Event Header 中添加一些信息。
1)案例需求
2)需求分析
- 監(jiān)控文件變動我們可以考慮使用 taildir 或者 exec 這兩種 source
- flume-1 sink 需要使用 avro sink 才能傳輸?shù)较乱粋€ flume-2 和 flume-3 的 source
- flume-2 需要上傳數(shù)據(jù)到 HDFS 所以?sink 為 hdfs
- flume-3 需要把數(shù)據(jù)輸出到本地,所以 sink 為 file_roll sink(要保存到本地目錄,這個目錄就必須提前創(chuàng)建好,它不像 HDFS Sink 會自動幫我們創(chuàng)建)
我們需要實現(xiàn)三個 flume 作業(yè):
- flume-1 把監(jiān)聽到的新日志讀取到 flume-2 和 flume-3 的 source
- flume-2 把日志上傳到 hdfs
- flume-3 把日志寫到本地
3)需求實現(xiàn)
flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# 將數(shù)據(jù)流復(fù)制給所有 channel 默認就是 replicating 所以也可以不用配置
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink 端的 avro 是一個數(shù)據(jù)發(fā)送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
# 一個 sink 只可以指定一個 channel,但是一個 channel 可以指定多個 sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source 端的 avro 是一個數(shù)據(jù)接收服務(wù)
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時間滾動文件夾
a2.sinks.k1.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#設(shè)置每個文件的滾動大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與 Event 數(shù)量無關(guān)
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
4)測試
bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-dir.conf
bin/flume-ng agent -n a1 -c conf/ -f job/group1/flume-file-flumc.conf
bin/flume-ng agent -n a2 -c conf/ -f job/group1/flume-hdfs.conf
查看結(jié)果:
注意:寫入本地文件時,當一段時間沒有新的日志時,它仍然會創(chuàng)建一個新的文件,而不像 hdfs sink 即使達到了設(shè)置的間隔時間但是沒有新日志產(chǎn)生,那么它也不會創(chuàng)建一個新的文件。
這個需要注意的就是 hdfs 的端口不要寫錯,比如我的就不是 9870 而是 8020.
4.2、負載均衡和故障轉(zhuǎn)移
1)案例需求
2)需求分析
- 開啟一個端口 88888 來發(fā)送數(shù)據(jù)
- 使用 flume-1 監(jiān)聽該端口,并發(fā)送到 flume-2 和 flume-3 (需要 flume-1 的 sink 為 avro sink,flume-2 和 flume-3 的 source 為 avro source),flume-2 和 flume-3 發(fā)送日志到控制臺(flume-2 和 flume-3 的 sink 為 logger sink)
3)需求實現(xiàn)
flume-nc-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = logger# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-console2.conf?
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
?4)案例測試
bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-nc-flume.conf
關(guān)閉 flume-flume-console1.conf 作業(yè)?
?我們發(fā)現(xiàn),一開始我們開啟三個 flume 作業(yè),當向 netcat 輸入數(shù)據(jù)時,只有 flume-flume-console1.conf 作業(yè)的控制臺有日志輸出,這是因為它的優(yōu)先級更高,當把作業(yè)?flume-flume-console1.conf 關(guān)閉時,再次向端口 44444 發(fā)送數(shù)據(jù),發(fā)現(xiàn)?flume-flume-console2.conf 作業(yè)開始輸出。
如果要使用負載均衡,只需要替換上面 flume-nc-flume.conf 中:
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
替換為:
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.maxTimeOut = 30000
其中,backoff 代表退避,默認為 false,?如果當前 sink 沒有拉到數(shù)據(jù),那么接下來一段時間就不用這個 sink 。maxTimeOut 代表最大的退避時間,因為退避默認是指數(shù)增長的(比如一個 sink 第一次沒有拉到數(shù)據(jù),需要等 1 s,第二次還沒拉到,等 2s,第三次等 4s ...),默認最大值為 30 s。
4.3、聚合
1)案例需求
- hadoop102 上的 Flume-1 監(jiān)控文件/opt/module/group.log,
- hadoop103 上的 Flume-2 監(jiān)控某一個端口的數(shù)據(jù)流,
- Flume-1 與 Flume-2 將數(shù)據(jù)發(fā)給?hadoop104 上的 Flume-3,Flume-3 將最終數(shù)據(jù)打印到控制臺。
注意:主機只能在 hadoop104 上配,因為 avro source 在 hadoop104 上,客戶端(hadoop02 和 hadoop103 的 sink)可以遠程連接,但是服務(wù)端(hadoop104 的 source)只能綁定自己的端口號。
2)需求實現(xiàn)
flume-log-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
?flume-nc-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-log.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
3)測試
向 group.log 文件中追加文本:
注意:hadoop103 這里不能寫 nc localhost 44444 而要寫 nc hadoop103 44444! 否則報錯:Ncat: Connection refused.
5、自定義 Interceptor
前面我們的多路復(fù)用還沒有實現(xiàn),因為我們說多路復(fù)用必須配合攔截器來使用,因為我們必須知道每個 Channel 發(fā)往哪些 Sink,這需要攔截器往 Event Header 中寫一些內(nèi)容。
1)案例需求
2)需求分析
在實際的開發(fā)中,一臺服務(wù)器產(chǎn)生的日志類型可能有很多種,不同類型的日志可能需要發(fā)送到不同的分析系統(tǒng)。此時會用到 Flume 拓撲結(jié)構(gòu)中的 Multiplexing 結(jié)構(gòu),Multiplexing 的原理是,根據(jù) event 中 Header 的某個 key 的值,將不同的 event 發(fā)送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同類型的 event 的 Header 中的 key 賦予不同的值。
在該案例中,我們以端口數(shù)據(jù)模擬日志,以是否包含”lyh”模擬不同類型的日志,我們需要自定義 interceptor 區(qū)分數(shù)據(jù)中是否包含”lyh”,將其分別發(fā)往不同的分析系統(tǒng)(Channel)。
?3)需求實現(xiàn)
自定義攔截器
引入 flume 依賴
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
package com.lyh.gmall.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TypeInterceptor implements Interceptor {// 存放事件集合private List<Event> addHeaderEvents;@Overridepublic void initialize() {// 初始化存放事件的集合addHeaderEvents = new ArrayList<>();}// 單個事件攔截@Overridepublic Event intercept(Event event) {// 1. 獲取事件中的 header 信息Map<String, String> headers = event.getHeaders();// 2. 獲取事件中的 body 信息String body = new String(event.getBody());// 3. 根據(jù) body 中是否包含 'lyh' 來決定發(fā)往哪個 sinkif (body.contains("lyh"))headers.put("type","first");elseheaders.put("type","second");return event;}// 批量事件攔截@Overridepublic List<Event> intercept(List<Event> events) {// 1. 清空集合addHeaderEvents.clear();// 2. 遍歷 eventsfor (Event event : events) {// 3. 給每個事件添加頭信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TypeInterceptor();}@Overridepublic void configure(Context context) {}}
}
打包放到 flume 安裝目錄的 lib 目錄下:
?
flume 作業(yè)配置
hadoop102:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1 # 包含 'lyh'
a1.sources.r1.selector.mapping.second = c2 # 不包含 'lyh'# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
4)需求實現(xiàn)
#hadoop103
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume2.conf -Dflume.root.logger=INFO,console#hadoop104
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume3.conf -Dflume.root.logger=INFO,console#hadoop102
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume1.conf
nc localhost 44444
hadoop102:
hadoop103:
hadoop104:?
可以看到,從 hadoop102 發(fā)送的日志中,包含 "lyh" 的都被發(fā)往 hadoop103 的 4141 端口,其它日志則被發(fā)往 hadoop104 的 4242端口。
6、自定義 Source
自定義 source 用的還是比較少的,畢竟 flume 已經(jīng)提供了很多常用的了。
1)介紹
- getBackOffSleepIncrement() //backoff 步長,當從數(shù)據(jù)源拉取數(shù)據(jù)時,拉取不到數(shù)據(jù)的話它不會一直再去拉取,而是等待,之后每一次再=如果還拉取不到,就會比上一次多等待步長單位個時間。
- getMaxBackOffSleepInterval()? //backoff 最長時間,如果不設(shè)置最長等待時間,它最終會無限等待,所以需要指定。
- configure(Context context)? //初始化 context(讀取配置文件內(nèi)容)
- process()? //獲取數(shù)據(jù)封裝成 event 并寫入 channel,這個方法將被循環(huán)調(diào)用。
2)需求
3)分析
4)需求實現(xiàn)
代碼
package com.lyh.source;import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.HashMap;
import java.util.Map;public class MySource extends AbstractSource implements Configurable, PollableSource {// 定義配置文件將來要讀取的字段private Long delay;private String field;@Overridepublic Status process() throws EventDeliveryException {try {// 創(chuàng)建事件頭信息Map<String,String> headerMap = new HashMap<>();// 創(chuàng)建事件SimpleEvent event = new SimpleEvent();// 循環(huán)封裝事件for (int i = 0; i < 5; i++) {// 給事件設(shè)置頭信息event.setHeaders(headerMap);// 給事件設(shè)置內(nèi)容event.setBody((field + i).getBytes());// 將事件寫入 channelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}} catch (InterruptedException e) {e.printStackTrace();}return Status.READY;}// 步長@Overridepublic long getBackOffSleepIncrement() {return 0;}// 最大間隔時間@Overridepublic long getMaxBackOffSleepInterval() {return 0;}// 初始化配置信息@Overridepublic void configure(Context context) {delay = context.getLong("delay");field = context.getString("field","Hello");}
}
配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = com.lyh.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = lyh# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console
運行結(jié)果:?
7、自定義 Sink
1)介紹
- configure(Context context)//初始化 context(讀取配置文件內(nèi)容)
- process()//從 Channel 讀取獲取數(shù)據(jù)(event),這個方法將被循環(huán)調(diào)用。
2)需求分析

?3)需求實現(xiàn)
package com.lyh.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable{private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class);private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {// 聲明返回值狀態(tài)信息Status status;// 獲取當前 sink 綁定的 channelChannel channel = getChannel();// 獲取事務(wù)Transaction txn = channel.getTransaction();// 聲明事件Event event;// 開啟事務(wù)txn.begin();// 讀取 channel 中的事件、直到讀取事件結(jié)束循環(huán)while (true){event = channel.take();if (event!=null) break;}try {// 打印事件LOG.info(prefix + new String(event.getBody()) + suffix);// 事務(wù)提交txn.commit();status = Status.READY;}catch (Exception e){// 遇到異?;貪L事務(wù)txn.rollback();status = Status.BACKOFF;}finally {// 關(guān)閉事務(wù)txn.close();}return null;}// 初始化配置信息@Overridepublic void configure(Context context) {// 帶默認值prefix = context.getString("prefix","hello");// 不帶默認值suffix = context.getString("suffix");}
}
配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
a1.sinks.k1.prefix = lyh:
a1.sinks.k1.suffix = :lyh# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4)測試
bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console
運行結(jié)果:

總結(jié)
? ? ? ? 自此,flume 的學習基本也完了,這一篇雖然不多但也用了大概3天時間。相比較 kafka、flink,flume 這個框架還是非常簡單的,比如我們自己實現(xiàn)一些 source、sink,都是很簡單的,沒有太多復(fù)雜的理解的東西。
? ? ? ? 總之 flume 這個工具還是多看官網(wǎng)。