西安旅游攻略2天自由行攻略seo每日一貼
Flink on k8s部署日志詳解及與Yarn部署時的日志生成模式對比
最近需要將flink由原先部署到Yarn集群切換到kubernetes集群,在切換之后需要熟悉flink on k8s的運行模式。在使用過程中針對日志模塊發(fā)現(xiàn),在k8s的容器中,flink的系統(tǒng)日志只有jobmanager.log/taskmanager.log 兩個,而當時在使用Yarn集群部署時,flink的日志會有多個,比如:jobmanager.log、jobmanager.err和jobmanager.out,TaskManager同理。
因此,有同事就提出為什么在k8s中部署時,只有.log一個文件,能不能類似Yarn部署時那樣對日志文件進行區(qū)分。只是從容器日志來看的話,在一開始不夠了解k8s的情況下,會覺得日志收集的不夠準確。
因此針對上面的這個問題,就歸我進行研究和解決了。網(wǎng)上的相關資料也比較少,因此,在本次對上面這個問題整體了解分析之后,進行一次學習記錄。有遇到相關類似問題的,也可以參考這個思路。
一、認為需要修改log4j配置即可
拿到這個問題的第一步首先想到的是,既然要對日志的類別進行區(qū)分,則可以修改log4j的配置,將INFO類別和ERROR類別分別寫入不同的日志文件即可。于是,先對flink路徑下的conf/log4j-console.properties進行修改(flink on k8s部署時,使用的log4j配置文件是flink-console.properties文件,而不是log4j.properties)。
這里我們留下一個小疑問:為什么部署到k8s中時,使用的是log4j-console.properties,而不是部署到Yarn時的log4j.properties?有什么區(qū)別?
修改后的log4j-console.properties示例如下所示:
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
rootLogger.appenderRef.errorLogFile.ref = errorLogFile# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = true
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
appender.rolling.filter.threshold.type = LevelMatchFilter
appender.rolling.filter.threshold.level = INFO
appender.rolling.filter.threshold.onMatch = ACCEPT
appender.rolling.filter.threshold.onMisMatch = DENYappender.errorFile.name = errorLogFile
appender.errorFile.type = RollingFile
appender.errorFile.append = true
appender.errorFile.fileName = ${sys:log.file}.err
appender.errorFile.filePattern = ${sys:log.file}.err.%i
appender.errorFile.layout.type = PatternLayout
appender.errorFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.errorFile.policies.type = Policies
appender.errorFile.policies.size.type = SizeBasedTriggeringPolicy
appender.errorFile.policies.size.size = 100MB
appender.errorFile.policies.startup.type = OnStartupTriggeringPolicy
appender.errorFile.strategy.type = DefaultRolloverStrategy
appender.errorFile.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
appender.errorFile.filter.threshold.type = ThresholdFilter
appender.errorFile.filter.threshold.level = ERROR
appender.errorFile.filter.threshold.onMatch = ACCEPT
appender.errorFile.filter.threshold.onMisMatch = DENY# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
這里相比原始文件的修改,主要集中在以下兩個方面:
- 增加RollingFileAppender的threshold參數(shù)。因為我最初希望.log日志就只顯示INFO日志,而不顯示其他類型日志。但是在log4j官網(wǎng)中介紹的threshold.level參數(shù),其過濾的是低于設置類型的日志。比如當threshold.level=INFO時,過濾的是低于INFO類型的日志,比如DEBUG,而高于的比如ERROR類型的日志還是會保留。在查找一番資料后,發(fā)現(xiàn)了threshold.type = LevelMatchFilter的配置,這種配置可以使得當前appender只保留所設置的日志類型的日志,從而實現(xiàn)了只保留INFO日志的需求。
- 增加了errorLogFile的appender。配置同上,使得當前appender只保留ERROR類型的日志數(shù)據(jù)。
觀察上面的log4j-console.properties配置可以發(fā)現(xiàn),在設置文件名時,使用了一個系統(tǒng)變量${sys:log.file},這個系統(tǒng)變量使用過flink配置的應該都比較熟悉,指定本地flink日志的默認路徑,比如/opt/log/jobmanager.log。
經(jīng)過測試后,使用上面的log4j配置能夠?qū)崿F(xiàn)我最初的想法,即將INFO日志和ERROR日志區(qū)分開,寫入不同的文件。但是經(jīng)過與Yarn部署時的文件對比發(fā)現(xiàn),實際上并不能滿足原始需求。
因為在Yarn中,.log日志中也會存在ERROR日志類型的數(shù)據(jù),似乎并不是利用log4j配置進行分開。而且我查看log4j.properties配置,也沒有發(fā)現(xiàn)類似這種區(qū)分日志類型的配置。同時在Yarn中,.err日志輸出的是任務異常信息,比如e.printStackTrace(),.out日志輸出的是類似System.out.println中的數(shù)據(jù)。而log4j的配置實際上單純的只是針對flink執(zhí)行時的系統(tǒng)日志進行配置處理,似乎跟上面的場景還不是一樣的。
因此,就要去尋找新的思路,在摸索之后,決定從根據(jù)這個log.file的系統(tǒng)變量,從flink的源碼入手
二、Flink源碼分析-Yarn
在本地git clone好flink的源碼后,切換到flink1.12版本分支,進行全局搜索"log.file",在flink-runtime模塊下發(fā)現(xiàn)了BootstrapTools
類,在該類下,有一個getTaskManagerShellCommand
的方法,在方法中,有一處代碼非常有用,如下所示:
startCommandValues.put("redirects","1> "+ logDirectory+ "/taskmanager.out "+ "2> "+ logDirectory+ "/taskmanager.err");
可以看到,這里不就是我們最初想要生成的.out和.err文件嗎!!。那么這里的redirects表示什么意思呢?
觀察后源碼知道,flink設置了一個啟動命令行的template模塊,有一個redirects的占位符,因此上面實際上就是后續(xù)將重定向命令替換redirects占位符。
接下來看一下這個方法在哪里被調(diào)用了,發(fā)現(xiàn)除了在BootstrarpToolsTest
測試類中被調(diào)用外,只在flink-yarn項目下src/main/java/org/apache/flink/yarn/Utils.java
類中被使用,如下所示:
String launchCommand =BootstrapTools.getTaskManagerShellCommand(flinkConfig,tmParams,".",ApplicationConstants.LOG_DIR_EXPANSION_VAR,hasLogback,hasLog4j,hasKrb5,taskManagerMainClass,taskManagerDynamicProperties);if (log.isDebugEnabled()) {log.debug("Starting TaskManagers with command: " + launchCommand);} else {log.info("Starting TaskManagers");}
因此,當部署到Yarn集群上上時,在構建TaskManager的啟動命令時,會使用上述的方法。同時,上面的代碼發(fā)現(xiàn),當滿足log.isDebugEnabled()條件時,可以打印出這個啟動命令。如何能滿足這個條件呢?實際上,log.isDebugEnabled()
就是表示當前l(fā)og4j的配置是允許打印DEBUG類型日志的,因此,我們?nèi)サ絝link的conf/log4j.properties下,修改rootLogger.level = INFO
=> rootLogger.level = DEBUG
,然后再重新運行任務,即可在日志中看到這個啟動命令:
可以看到,在啟動命令的最后位置,有上面代碼中的重定向命令,這個重定向命令將標準輸出和標準錯誤分別重定向到了.out和.err文件。
至此,我們就成功定位了在Yarn中為什么能夠生成.err和.out日志的原因了。實際上就是由于這樣的一條重定向語句,將flink任務執(zhí)行時的標準輸出和標準錯誤分別重定向到了.out和.err文件。這也解釋了為什么在Yarn部署時,.err日志里顯示的是異常信息,比如e.printStackTrace(),.out文件輸出的是包括System.out的日志數(shù)據(jù)
弄明白了Yarn的日志生成機制后,我們接下來去看一下k8s是怎么實現(xiàn)的?
三、Flink源碼分析-Kubernetes
那么在k8s部署時,是否也有這樣的重定向語句呢?為了一探究竟,仍然是分析flink 1.12版本的源碼。在flink-kubernetes項目下,有一個src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
類,在該類下,存在一個getCommonStartCommand
方法,該方法類似于上面getTaskManagerShellCommand
方法,也是用來構造啟動命令參數(shù)的,但是在該方法下,我發(fā)現(xiàn)就不存在這樣的一條重定向語句:
startCommandValues.put("logging",getLogging(logDirectory + "/" + logFileName + ".log",configDirectory,hasLogback,hasLog4j));
只有這樣的一個寫入.log文件的啟動命令配置。同時遺憾的是,在k8s部署時,也沒有類似上面Yarn那樣可以在DEBUG日志類型下,打印出啟動命令的語句。但是,我們?nèi)匀荒茏龀鲆粋€初步的結論:
flink部署在k8s上時,沒有.out和.err文件,就是由于源碼中在啟動TaskManager/JobManager的啟動命令參數(shù)中,沒有將標準輸出和標準錯誤進行重定向到指定的.out和.err文件導致的。而生成的.log文件,就是在log4j-console.properties中配置的RollingFile滾動的系統(tǒng)日志。
同時我發(fā)現(xiàn),在flink1.11版本時,上面的方法中還保留著跟Yarn一樣的重定向語句,只是從1.12版本之后,就去掉了該重定向語句,是什么原因呢?
至此,我們找出了flink部署到k8s中時,只有一個.log文件的根源。接下來,為了解決最初的原始問題,需要向方案去解決。
四、設計解決方案
首先想到的解決方案,肯定就是將Yarn那里的重定向源碼復制一份到上面的k8s代碼中,然后重新打包Flink再進行部署。但這種方案嘗試之后發(fā)現(xiàn),在用maven打包flink時會出現(xiàn)很多異常,比如包找不到。而且flink有180多個pom要打包,時間應該會花費非常長,在本次需求對flink源碼改動要求不是很大的情況下,感覺這種調(diào)試會花費太多不必要的時間。遂舍棄改方案。
另一個方案,就是想辦法在外層,能不能在將flink打包成鏡像的時候,在它原先源碼中定義的啟動命令參數(shù)后,再手動添加上重定向命令。為此,觀察pod的yaml可以發(fā)現(xiàn),容器啟動的參數(shù)有args下,啟動命令時執(zhí)行/docker-entrypoint.sh腳本
有了這些信息后,就找到docker-entrypoint.sh的啟動腳本,打開后進行分析,通過日志可以知道,腳本執(zhí)行的過程中,會進入到下面的這個分支下:
其中args參數(shù)就是上面容器中的args參數(shù),可以看到原先這個分支的最后一行是去執(zhí)行exec $(drop_privs_cmd) bash -c "${args[@]}"
。因此,我們就可以在這里,手動添加上標準輸出和標準錯誤的重定向到指定文件,也相當于實現(xiàn)了在啟動參數(shù)中加入重定向語句。
這里我們還需要借助args參數(shù)中的-Dlog.file中顯示的是jobmanager還是taskmanager來決定重定向?qū)懭氲奈募莏obmanager.err還是taskmanager.err。為此使用sed命令,先獲取到args中的-Dlog.file內(nèi)容(即上面的參數(shù)logFilePath),然后從logFilePath中,獲取到jobmanager/taskmanager的文件名(即logFileName參數(shù))。
然后,我們添加上重定向命令:
exec $(drop_privs_cmd) bash -c "${args[@]} 1> /opt/flink/log/${logFileName}.out 2> /opt/flink/log/${logFileName}.err
至此,我們就成功在外層flink打包成鏡像時,手動在啟動命令參數(shù)后添加了重定向命令,模擬了Yarn執(zhí)行時的命令,來生成.err和.out文件。接下來,就是打包成鏡像,然后在k8s中進行測試了。經(jīng)過測試,我們發(fā)現(xiàn),在/opt/log/路徑下,真的生成了.out、.err和.log三個文件!!!
同時經(jīng)過測試可以發(fā)現(xiàn),.err、.out和.log文件分別對應了標準錯誤、標準輸出和系統(tǒng)文件三部分內(nèi)容。實現(xiàn)了跟部署在Yarn上時一樣的場景,解決了我們文章最初提出的問題!!!
然而。。卻出現(xiàn)了問題。
五、問題以及對k8s日志的理解
在完成上述測試之后,當我再次點擊pod,或者使用kubectl logs命令來查看日志時發(fā)現(xiàn),日志里竟然只有啟動腳本的一些日志,而flink執(zhí)行的系統(tǒng)日志都沒有了!!
沒辦法,只能再去分析原因了。在kubernetes的官網(wǎng)中,在日志架構章節(jié)中,赫然寫著如下一段話:
容器運行時對寫入到容器化應用程序的
stdout
和stderr
流的所有輸出進行處理和轉(zhuǎn)發(fā)。 不同的容器運行時以不同的方式實現(xiàn)這一點;不過它們與 kubelet 的集成都被標準化為 CRI 日志格式。默認情況下,如果容器重新啟動,kubelet 會保留一個終止的容器及其日志。 如果一個 Pod 被逐出節(jié)點,所對應的所有容器及其日志也會被逐出。
kubelet 通過 Kubernetes API 的特殊功能將日志提供給客戶端訪問。 訪問這個日志的常用方法是運行
kubectl logs
。
雖然我現(xiàn)在對k8s的理解也不夠,但看上面這段話讓我意識到,容器的日志收集或許也是通過監(jiān)聽stdout和stderr來生成的。。。 而由于我上面使用重定向命令,將標準輸出和標準錯誤都重定向到了指定的文件中,導致stdout和stderr無法監(jiān)聽到日志數(shù)據(jù),所以容器內(nèi)的日志就獲取不到了。
或者說,利用上面將標準輸出和標準錯誤重定向?qū)懭胫付ㄖ付ㄎ募姆绞?#xff0c;是相當于將原先容器里的日志,分別根據(jù)日志類型映射到了.err、.out和.log日志文件下來展示。
那這樣分析下來,我發(fā)現(xiàn),flink之所以在1.12版本之后將重定向命令從源碼中去掉,可能為的就是利用k8s的日志聚合,將stdout和stderr都寫入容器日志中,方便后續(xù)對容器日志的監(jiān)控和分析等操作。
嗯。。此時,感覺上面最開始的分析都白費了,因為本身容器的日志實際上就已經(jīng)包含了所有日志數(shù)據(jù)了,根本不用再做.out和.err的區(qū)分了
這里插一句,還記得文章在第一部分提出的問題嗎?這里,大家再思考另一個問題,就是講到這里,我們知道容器會對stdout和stderr流進行處理和轉(zhuǎn)發(fā)。stderr包含flink任務執(zhí)行時的異常信息,stdout包含任務執(zhí)行時的標準輸出信息,那么flink執(zhí)行時的系統(tǒng)日志比如INFO、ERROR日志數(shù)據(jù),容器時從哪里獲取到的呢?log4j中配置的RollingFile類型的appender可不屬于標準輸出。
那么這個問題的答案,也就是flink提交到k8s部署時,為什么使用的是log4j-console.properties配置的原因了。
因為在log4j-console.properties中,會有一個ConsoleAppender的配置,將flink的系統(tǒng)日志打印到CONSOLE(System.out),所以相當于將系統(tǒng)日志打印到了標準輸出,然后容器再通過監(jiān)聽stdout從而獲取到系統(tǒng)日志。
而部署到Yarn時,使用的log4j.properties的配置中,就可以看到并沒有ConsoleAppender的配置,所以它的系統(tǒng)日志全部打印到了.log文件中。
解決了這個問題,再說回之前的分析。我們上面添加的重定向操作,相當于是模仿著Yarn上部署的方式,將原先容器里的日志,分別根據(jù)日志類型映射到了.err、.out和.log日志文件下來展示。但是此時容器中的日志卻丟失了,可能會對后續(xù)我們最容器上的日志采集和分析有影響。
那有沒有什么解決方案呢?
雙寫。嘗試在將標準輸出和標準錯誤重定向到指定文件時,同時重定向到stdout和stderr。為此,我們進行了測試,也就是docker-entrypoint.sh中的下面這行代碼:
exec $(drop_privs_cmd) bash -c "${args[@]} 1> >(tee /opt/flink/log/${logFileName}.out >/dev/stdout) 2> >(tee /opt/flink/log/${logFileName}.err >/dev/stderr)"
命令中的1> >(tee /opt/flink/log/${logFileName}.out >/dev/stdout)
表示將標準輸出重定向到一個匿名的管道中,并將管道中的內(nèi)容通過tee
命令同時輸出到文件/opt/flink/log/${logFileName}.out
和標準輸出設備中。
經(jīng)過測試,可以實現(xiàn)上面的功能,即既有.out和.err文件,同時,容器日志也恢復最初的狀態(tài)。
但是需要說明一點的是,由于log4j-console.properties配置把系統(tǒng)日志也作為標準輸出的一部分,因此生成的.out文件中實際上包含了任務中System.out的輸出和系統(tǒng)文件兩部分內(nèi)容。而.err文件則只包含了標準錯誤的日志內(nèi)容。
至此,實現(xiàn)的日志效果是:
- 容器日志:包含系統(tǒng)日志、標準輸出、標準錯誤
- .out日志:包含系統(tǒng)日志、標準輸出
- .err日志:包含標準錯誤
以上就是本次,針對最初提出的k8s日志問題,進行的一次深入探究和思考。在研究過程中,對log4j的日志配置也有了更深入的理解,由于一開始對容器和k8s技術的不了解,導致最后似乎實現(xiàn)的結果不理想,但技術不就是不斷探究的過程嗎!
關于上面的問題,如果有遇到類似的也歡迎找我探討,感謝閱讀!