老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業(yè)提供網(wǎng)站建設(shè)、域名注冊、服務(wù)器等服務(wù)

如何進行RocketMQ事務(wù)消息實現(xiàn)-創(chuàng)新互聯(lián)

這篇文章將為大家詳細講解有關(guān)如何進行RocketMQ事務(wù)消息實現(xiàn),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

成都創(chuàng)新互聯(lián)專注于平遠企業(yè)網(wǎng)站建設(shè),自適應(yīng)網(wǎng)站建設(shè),成都商城網(wǎng)站開發(fā)。平遠網(wǎng)站建設(shè)公司,為平遠等地區(qū)提供建站服務(wù)。全流程按需求定制開發(fā),專業(yè)設(shè)計,全程項目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)

摘要:            事務(wù)消息提交或回滾的實現(xiàn)原理就是根據(jù)commitlogOffset找到消息,如果是提交動作,就恢復(fù)原消息的主題與隊列,再次存入commitlog文件進而轉(zhuǎn)到消息消費隊列,供消費者消費,然后將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務(wù)消息不同的是,提交事務(wù)消息會將消息恢復(fù)原主題與隊列,再次存儲在commitlog文件中。

若您對RocketMQ技術(shù)感興趣,請加入 RocketMQ技術(shù)交流群

小編將重點分析RocketMQ Broker如何處理事務(wù)消息提交、回滾命令,根據(jù)前面的介紹,其入口EndTransactionProcessor#processRequest:

OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {        // @1result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);    // @2
      if (result.getResponseCode() == ResponseCode.SUCCESS) {  // @3
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);    // @4
          if (res.getCode() == ResponseCode.SUCCESS) {
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());     // @5
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());    // @6
                RemotingCommand sendResult = sendFinalMessage(msgInner);                              // @7
                if (sendResult.getCode() == ResponseCode.SUCCESS) {             
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());    // @8
                }                return sendResult;
           }          return res;
     }
}

代碼@1:如果請求為提交事務(wù),進入事務(wù)消息提交處理流程。
代碼@2:提交消息,別被這名字誤導(dǎo)了,該方法主要是根據(jù)commitLogOffset從commitlog文件中查找消息返回OperationResult實例:

  • private MessageExt prepareMessage :消息對象。

  • private int responseCode:查找結(jié)果。

  • private String responseRemark :錯誤提示。

代碼@3:如果成功查找到消息,則繼續(xù)處理,否則返回給客戶端,消息未找到錯誤信息。

代碼@4:驗證消息必要字段。
驗證消息的生產(chǎn)組與請求信息中的生產(chǎn)者組是否一致。
驗證消息的隊列偏移量(queueOffset)與請求信息中的偏移量是否一致。
驗證消息的commitLogOffset與請求信息中的CommitLogOffset是否一致。

代碼@5:調(diào)用endMessageTransaction方法,該方法主要的目的就是恢復(fù)事務(wù)消息的真實的主題、隊列,并設(shè)置事務(wù)ID。

代碼@6:設(shè)置消息的相關(guān)屬性,這一步應(yīng)該直接在endMessageTransaction中實現(xiàn)就好,統(tǒng)一恢復(fù)原消息的數(shù)量,特別關(guān)注的是取消了事務(wù)相關(guān)的系統(tǒng)標(biāo)記。

代碼@7:發(fā)送最終消息,其實現(xiàn)原理非常簡單,調(diào)用MessageStore將消息存儲在commitlog文件中,此時的消息,會被轉(zhuǎn)發(fā)到原消息主題對應(yīng)的消費隊列,被消費者消費。

代碼@8:刪除預(yù)處理消息(prepare),其實是將消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經(jīng)被處理(提交或回滾)。

上述就是事務(wù)消息提交的流程,事務(wù)回滾類似,接下來大概分析一下事務(wù)消息回滾的流程。

EndTransactionProcessor#processRequest else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
       result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);    // @1
       if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);            if (res.getCode() == ResponseCode.SUCCESS) {                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());   // @2
            }           return res;
       }
}

代碼@1:回滾消息,其實內(nèi)部就是根據(jù)commitlogOffset查找消息。
代碼@2:將消息存儲在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表該消息已被處理,與提交事務(wù)消息不同的是,提交事務(wù)消息會將消息恢復(fù)原主題與隊列,再次存儲在commitlog文件中。

事務(wù)消息在Broker服務(wù)端的提交回滾流程就介紹到這了。其核心實現(xiàn)就是根據(jù)commitlogOffset找到消息,如果是提交動作,就恢復(fù)原消息的主題與隊列,再次存入commitlog文件進而轉(zhuǎn)到消息消費隊列,供消費者消費,然后將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務(wù)消息不同的是,提交事務(wù)消息會將消息恢復(fù)原主題與隊列,再次存儲在commitlog文件中。

關(guān)于如何進行RocketMQ事務(wù)消息實現(xiàn)就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


分享題目:如何進行RocketMQ事務(wù)消息實現(xiàn)-創(chuàng)新互聯(lián)
鏈接URL:http://www.xueling.net.cn/article/jspde.html

其他資訊

在線咨詢
服務(wù)熱線
服務(wù)熱線:028-86922220
TOP
主站蜘蛛池模板: 超碰97人人草 | 欧美一级特黄aaaaaaa色戒 | 日本少妇又色又爽又高潮看你 | 一区二区国产精品视频 | 欧美一区二区高清在线观看 | 国产精品一区二区高清在线 | 久久久久亚洲AV成人网址 | 亚洲国产精品成人综合色 | 日韩精品视频一区二区三区 | 人妻丝袜中文无码av影音先锋专区 | 亚色成人 | 国产学生系列一区二区三区 | 亚洲精品久久久久久久久久久 | 超鹏97国语 | 国产一区在线观看视频 | 国内丰满少妇猛烈精品播 | 日韩国产精 | 国产91色在线免费 | 三级黄色在线看 | 肉岳疯狂69式激情的高潮 | bt天堂新版中文在线地址 | 亚洲一区在线国产 | 国产精品免费视频二三区 | 看曰本女人大战黑人视频 | 亚洲一区二区自拍 | 初尝办公室人妻少妇 | 久久精品福利网站免费 | 在线观看日韩 | 日本成本人片无码免费视频网站 | 18毛片| 凹凸69堂国产成人精品 | 欧美日韩国产一区二区三区不卡 | 亚洲爆乳成AV人在线视水卜 | 国产精品成人永久在线 | 人人草人人澡 | 国产精品色区 | 亚洲欧美日韩综合久久久 | 亚洲成人字幕 | 国产精品一区hongkongdoll | 亚洲天堂福利视频 | 成人国产一区二区三区 |