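Background
When Flink consumes messages from Kafka, we often rely on FlinkKafkaConsumer to emit watermarks. This article walks through the source code to show what happens when FlinkKafkaConsumer.assignTimestampsAndWatermarks is configured with a periodic watermark generator.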
Watermark emission in FlinkKafkaConsumer
1. Start with the Fetcher class. When the Fetcher is created, it builds a periodic watermark emitter and starts it:
```java
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
    PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
            new PeriodicWatermarkEmitter<>(
                    checkpointLock,
                    subscribedPartitionStates,
                    watermarkOutputMultiplexer,
                    processingTimeProvider,
                    autoWatermarkInterval);
    periodicEmitter.start();
}
```
2. Next, PeriodicWatermarkEmitter registers a processing-time timer and registers it again each time it fires, so the callback runs periodically:
```java
public void start() {
    timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}

@Override
public void onProcessingTime(long timestamp) {
    synchronized (checkpointLock) {
        for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
            // each Kafka partition consumed by this operator subtask records
            // its own watermark here
            state.onPeriodicEmit();
        }
        // the subtask takes the minimum watermark across the Kafka partitions it
        // consumes and emits it as its own watermark; note that this happens at
        // the level of the individual operator subtask
        watermarkOutputMultiplexer.onPeriodicEmit();
    }

    // schedule the next watermark
    timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}
```
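The self-rescheduling pattern above can be tried out without Flink. The following is a minimal sketch, not Flink's implementation: `ManualTimerService` and `SelfReschedulingEmitter` are hypothetical names, and the manually advanced clock is an assumption made so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a processing-time service, driven by a manual clock. */
class ManualTimerService {
    /** Callback invoked when a registered timer becomes due. */
    interface Callback {
        void onProcessingTime(long timestamp);
    }

    private static final class Scheduled {
        final long time;
        final Callback callback;

        Scheduled(long time, Callback callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Scheduled> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    private long now;

    long getCurrentProcessingTime() {
        return now;
    }

    void registerTimer(long time, Callback callback) {
        timers.add(new Scheduled(time, callback));
    }

    /** Moves the clock forward and fires every due timer in time order. */
    void advanceTo(long target) {
        while (!timers.isEmpty() && timers.peek().time <= target) {
            Scheduled next = timers.poll();
            now = next.time;
            next.callback.onProcessingTime(next.time);
        }
        now = target;
    }
}

/** Becomes periodic only because each firing registers the next timer. */
class SelfReschedulingEmitter implements ManualTimerService.Callback {
    private final ManualTimerService timerService;
    private final long interval;
    final List<Long> firedAt = new ArrayList<>();

    SelfReschedulingEmitter(ManualTimerService timerService, long interval) {
        this.timerService = timerService;
        this.interval = interval;
    }

    void start() {
        timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
    }

    @Override
    public void onProcessingTime(long timestamp) {
        // a real emitter would emit watermarks here; this sketch only records the time
        firedAt.add(timestamp);
        // schedule the next firing
        timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
    }
}
```

With an interval of 200 and the clock advanced from 0 to 650, the emitter fires at 200, 400 and 600. Each timer fires only once, so if the re-registration at the end of `onProcessingTime` were removed, the emitter would fire a single time.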
3. The method behind state.onPeriodicEmit(), which records the watermark of each Kafka partition:
```java
@Override
public void onPeriodicEmit(WatermarkOutput output) {
    final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
    if (next != null) {
        output.emitWatermark(new Watermark(next.getTimestamp()));
    }
}
```
Here the WatermarkOutput is a DeferredOutput, and output.emitWatermark(new Watermark(next.getTimestamp())) is implemented as follows:

```java
public DeferredOutput(OutputState state) {
    this.state = state;
}

@Override
public void emitWatermark(Watermark watermark) {
    state.setWatermark(watermark.getTimestamp());
}
```
So the net effect at this point is only that the watermark is set on the corresponding state, that is, on one Kafka partition (note that a single operator subtask may consume several Kafka partitions):

```java
/**
 * Returns true if the watermark was advanced, that is if the new watermark is larger than
 * the previous one.
 *
 * <p>Setting a watermark will clear the idleness flag.
 */
public boolean setWatermark(long watermark) {
    this.idle = false;
    final boolean updated = watermark > this.watermark;
    // this also shows that the watermark never moves backwards, even if the
    // code emits a smaller value
    this.watermark = Math.max(watermark, this.watermark);
    return updated;
}
```
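To see the "never moves backwards" behavior in isolation, here is a small stand-alone sketch. `PartitionWatermarkState` is a hypothetical class that only mirrors the `setWatermark` logic quoted above. The `Long.MIN_VALUE` initial value and the `markIdle` helper are assumptions added for illustration.

```java
/** Hypothetical per-partition state that mirrors the setWatermark logic shown above. */
class PartitionWatermarkState {
    // assumption: start below every real timestamp so the first watermark always advances
    private long watermark = Long.MIN_VALUE;
    private boolean idle = false;

    /** Returns true only if the candidate is strictly larger than the stored watermark. */
    boolean setWatermark(long candidate) {
        idle = false; // any watermark clears the idleness flag
        boolean updated = candidate > watermark;
        watermark = Math.max(candidate, watermark); // a smaller candidate is ignored
        return updated;
    }

    /** Assumed helper for this sketch: flags the partition as idle. */
    void markIdle() {
        idle = true;
    }

    long getWatermark() {
        return watermark;
    }

    boolean isIdle() {
        return idle;
    }
}
```

Calling `setWatermark(100)` on a fresh instance returns `true`. A following `setWatermark(50)` returns `false` and leaves the stored watermark at 100. Setting the same value again also returns `false`, but it still clears the idle flag.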
4. The method with which the operator subtask combines the watermarks of all partitions it consumes:
```java
private void updateCombinedWatermark() {
    long minimumOverAllOutputs = Long.MAX_VALUE;

    boolean hasOutputs = false;
    boolean allIdle = true;
    for (OutputState outputState : watermarkOutputs) {
        if (!outputState.isIdle()) {
            minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
            allIdle = false;
        }
        hasOutputs = true;
    }

    // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    // at its initial Long.MAX_VALUE state and we must not emit that
    // A subtask that consumes no partition emits no watermark at all. Could this be
    // why the number of Kafka consumer subtasks should not exceed the number of
    // partitions of the topic?
    if (!hasOutputs) {
        return;
    }

    if (allIdle) {
        // every partition of this subtask is idle: mark the subtask idle so that
        // downstream operators can keep advancing
        underlyingOutput.markIdle();
    } else if (minimumOverAllOutputs > combinedWatermark) {
        combinedWatermark = minimumOverAllOutputs;
        underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
    }
}
```
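The combining rule (minimum over non-idle partitions, nothing emitted when there are no partitions, idle when all are idle) can be exercised with the following sketch. It is a simplified model rather than Flink's class. `CombinedWatermarkSketch`, its nested `Output`, and the `events` list that stands in for the downstream output are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of combining per-partition watermarks into one subtask watermark. */
class CombinedWatermarkSketch {
    /** Hypothetical per-partition output: a monotonic watermark plus an idle flag. */
    static final class Output {
        private long watermark = Long.MIN_VALUE;
        private boolean idle = false;

        void setWatermark(long candidate) {
            idle = false;
            watermark = Math.max(candidate, watermark);
        }

        void markIdle() {
            idle = true;
        }
    }

    private final List<Output> outputs = new ArrayList<>();
    private long combinedWatermark = Long.MIN_VALUE;

    /** Stand-in for the downstream output: records what would have been emitted. */
    final List<String> events = new ArrayList<>();

    Output registerOutput() {
        Output output = new Output();
        outputs.add(output);
        return output;
    }

    void updateCombinedWatermark() {
        long minimumOverAllOutputs = Long.MAX_VALUE;
        boolean allIdle = true;
        for (Output output : outputs) {
            if (!output.idle) {
                minimumOverAllOutputs = Math.min(minimumOverAllOutputs, output.watermark);
                allIdle = false;
            }
        }
        if (outputs.isEmpty()) {
            return; // no partitions assigned: nothing is emitted at all
        }
        if (allIdle) {
            events.add("idle");
        } else if (minimumOverAllOutputs > combinedWatermark) {
            combinedWatermark = minimumOverAllOutputs;
            events.add("watermark:" + minimumOverAllOutputs);
        }
    }
}
```

Take two partitions at watermarks 100 and 50: the subtask emits 50. Once the slower partition reaches 120 it emits 100, and once the partition at 100 goes idle it emits 120, because idle partitions no longer hold the minimum back. When both are idle, the subtask reports idle instead of a watermark.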