江門住房城鄉(xiāng)建設廳網(wǎng)站列舉常見的網(wǎng)絡營銷工具
前面分析完Broker啟動會啟動RemotingServer服務同時會注冊Processor處理器,接著分析Producer進行消息的發(fā)送,當Producer發(fā)送完消息后就得到Broker去接收Producer發(fā)送的消息了。
Producer發(fā)送給Broker消息時候,發(fā)送的請求code為SEND_MESSAGE(這里在上一章節(jié)有過分析),根據(jù)消息發(fā)送過來的Code,這時會調(diào)用NettyRemotingAbstract的processRequestCommand方法,該方法里面會根據(jù)消息傳輸?shù)腃ode來取出對應的Processor,進入Processor系列類的SendMessageProcessor的asyncProcessRequest方法(前面這一部分之前都有過分析,接下來我們一起看看后面的操作,正好也將之前的知識串在一起更有利于理解和記憶)
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回隊列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息頭SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 構(gòu)建上下文,并調(diào)用處理前鉤子函數(shù)mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判斷批量消息還是單條消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}
首先解析消息頭構(gòu)建上下文,處理消息發(fā)送前鉤子函數(shù),最后異步處理消息請求,如果是批量消息調(diào)用asyncSendBatchMessage方法,如果是單條消息調(diào)用asyncSendMessage方法。
處理單條消息 private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 準備響應命令對象final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 時間msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 遠程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主機msgInner.setStoreHost(this.getStoreHost());// 重試次數(shù)msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事務消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事務消息的狀態(tài)(后面再分析)putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存儲putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成結(jié)果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
構(gòu)建MessageExtBrokerInner對象,設置相關(guān)屬性執(zhí)行asyncPutMessage方法存儲消息并將結(jié)果返回客戶端。
創(chuàng)建響應,驗證以及自動創(chuàng)建topic
// 準備響應,驗證以及自動創(chuàng)建topic
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 準備響應final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 設置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 獲取broker處理請求服務的起始時間final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 驗證topic以及自動創(chuàng)建邏輯super.msgCheck(ctx, requestHeader, response);return response;
}
this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用來判斷是否支持自動創(chuàng)建topic,根據(jù)權(quán)限來判斷如果是不支持自動創(chuàng)建就將權(quán)限設置為可讀可寫不可繼承,后面我們?nèi)ヅ袛嗍欠窨梢匀ダ^承,如果能繼承就說明支持自動創(chuàng)建,這是就會new一個TopicConfig,這樣就通過autoCreateTopicEnable自動來控制是否能夠自動創(chuàng)建topic,同時也會調(diào)用registerBrokerAll方法注冊到Broker路由信息里面,當然官方建議我們還是不要開啟這個配置因為它沒有做到壓力的分攤。
存盤 asyncPutMessage方法
根據(jù)topic查詢對應的路由信息即broker。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延遲消息轉(zhuǎn)到系統(tǒng)Topic(后面在分析)if (msg.getDelayTimeLevel() > 0) {// ...省略}}// 發(fā)送消息地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存儲消息地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 寫入CommitLog文件前加鎖,保證文件操作并發(fā)安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 獲取最后一個mapperFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者滿了就創(chuàng)建一個if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 實際寫入CommitLog,在后面追加result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示當前文件存放不下,只保存了一部分case END_OF_FILE:unlockMappedFile = mappedFile;// 創(chuàng)建一個新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 繼續(xù)追加進去result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 鎖的時間elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盤申請CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 提交主從復制申請CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}
首先它會去處理延時消息這里我不做過細的分析,后面針對各種消息在來具體分析,接著就將消息進行編碼然后加鎖并寫入消息以獲取最后文件進行追加的方式來將消息內(nèi)存文件里面,最后進行刷盤以及通知主從同步的操作。