Group管理,查看GroupID下單個(gè)消費(fèi)端堆棧信息,期望只展示與該GroupID相關(guān)" />

国产三级日本三级日产三级66,五月天激情婷婷大综合,996久久国产精品线观看,久久精品人人做人人爽97

RocketMQ控制臺消費(fèi)者堆棧信息展示優(yōu)化分析-當(dāng)前播報(bào)

首頁 > 探索 > > 正文

日期:2023-03-28 10:14:01    來源:今日頭條    
背景介紹

專有云企業(yè)版v_3_12,消息隊(duì)列RocketMQ控制臺->Group管理,查看Group ID下單個(gè)消費(fèi)端堆棧信息,期望只展示與該Group ID相關(guān)的堆棧信息,在以下場景與期望不符。


(相關(guān)資料圖)

場景介紹

在同一個(gè)程序中創(chuàng)建兩個(gè)不同Group ID的消費(fèi)端實(shí)例,在控制臺中查看一個(gè)Group ID下單個(gè)消費(fèi)端堆棧信息,堆棧信息中包含了兩個(gè)Group ID消費(fèi)端的堆棧信息,給排查問題造成了困擾。

示例代碼pom
  com.aliyun.openservices  ons-client  1.8.8.3.Final
code
import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.aliyun.openservices.ons.api.batch.BatchMessageListener;import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;import com.aliyun.openservices.ons.api.bean.Subscription;import java.util.HashMap;import java.util.Map;import java.util.Properties;public class Main {    public static void main(String[] args){        String nameSrvAddr = "xxx";        String accessKey = "xxx";        String secretKey = "xxx";        String groupId1 = "Goup_ID_1";        String topic1 = "xxx_1";        String tag1 = "xxx_1";        BatchMessageListener batchMessageListener1 = (messages, context) -> Action.CommitMessage;        BatchConsumerBean batchConsumerBean1 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,                                                                 groupId1,topic1,tag1,batchMessageListener1);        batchConsumerBean1.start();        String groupId2 = "Goup_ID_2";        String topic2 = "xxx_2";        String tag2 = "xxx_2";        BatchMessageListener batchMessageListener2 = (messages, context) -> Action.CommitMessage;        BatchConsumerBean batchConsumerBean2 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,                                                                 groupId2,topic2,tag2,batchMessageListener2);        batchConsumerBean2.start();    }    private static BatchConsumerBean batchConsumerBean(String nameSrvAddr,String accessKey,String secretKey,String groupId,String topic,String tag,BatchMessageListener batchMessageListener){        BatchConsumerBean batchConsumerBean = new BatchConsumerBean();        Properties properties = new Properties();        properties.put(PropertyKeyConst.NAMESRV_ADDR,nameSrvAddr);        properties.put(PropertyKeyConst.AccessKey,accessKey);        properties.put(PropertyKeyConst.SecretKey,secretKey);        properties.put(PropertyKeyConst.GROUP_ID,groupId);        batchConsumerBean.setProperties(properties);        Subscription subscription = new Subscription();        subscription.setTopic(topic);        subscription.setExpression(tag);        Map subscriptionTable = new HashMap<>();        subscriptionTable.put(subscription,batchMessageListener);        batchConsumerBean.setSubscriptionTable(subscriptionTable);        return batchConsumerBean;    }}
分析過程

首先分析示例代碼中與BatchConsumerBean相關(guān)聯(lián)的對象,然后分析控制臺展示消費(fèi)端堆棧信息的流程,最后分析下不同版本的RocketMQ Client SDK對消費(fèi)端消費(fèi)線程命名方式的變化。

BatchConsumerBean

示例代碼中創(chuàng)建了兩個(gè)BatchConsumerBean實(shí)例,與BatchConsumerBean實(shí)例相關(guān)聯(lián)的對象如下:

與BatchConsumerBean關(guān)聯(lián)的對象

從上圖看,BatchConsumerBean實(shí)例是比較重的,所以上面的示例代碼可以優(yōu)化為只創(chuàng)建一個(gè)BatchConsumerBean實(shí)例,與該問題不太相關(guān),暫時(shí)忽略;上圖中與該問題直接相關(guān)的是ClientRemotingProcessor、MQClientInstance、DefaultMQPushConsumerImpl、ConsumerStatsManager,下面繼續(xù)分析。

堆棧信息展示流程

下面描述的是在瀏覽器請求一個(gè)Group ID單個(gè)消費(fèi)端堆棧信息的流程。

堆棧信息展示流程

瀏覽器請求控制臺應(yīng)用

當(dāng)在控制臺單機(jī)某個(gè)消費(fèi)端堆棧信息的時(shí)候,瀏覽器會向控制臺應(yīng)用發(fā)起http請求,主要請求參數(shù)是:GroupID,ClientId,其中每個(gè)MQClientInstance實(shí)例對應(yīng)一個(gè)ClientId。

控制臺應(yīng)用請求Broker

控制臺應(yīng)用收到瀏覽器請求后,主要進(jìn)行以下操作:

String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);List brokerDatas = topicRouteData.getBrokerDatas();if (brokerDatas != null) {    for (BrokerData brokerData : brokerDatas) {        String addr = brokerData.selectBrokerAddr();        if (addr != null) {            return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack,timeoutMillis * 3);        }    }}
根據(jù)%RETRY% + GroupIID查找對應(yīng)的TopicRouteData從TopicRouteData中選擇一個(gè)Broker的地址發(fā)送getConsumerRunningInfo請求Broker請求Consumer

Broker收到請求后,主要進(jìn)行以下操作:

ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);newRequest.setExtFields(request.getExtFields());newRequest.setBody(request.getBody());return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
AdminBrokerProcessor響應(yīng)查詢請求根據(jù)GroupID和ClientId找到對應(yīng)Consumer實(shí)例的channel socket通過channel socket發(fā)送請求到Consumer實(shí)例Consumer處理邏輯

Consumer收到請求后,主要進(jìn)行以下操作:

ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());if (requestHeader.isJstackEnable()) {  Map map = Thread.getAllStackTraces();  String jstack = UtilAll.jstack(map);  consumerRunningInfo.setJstack(jstack);}
通過MQClientInstance實(shí)例請求Consumer實(shí)例的consumerRunningInfo方法獲取Consumer運(yùn)行信息,如:pullRT、pullTPS、consumeRT、consumeOKTPS、consumeFailedTPS等信息獲取JVM所有線程棧信息將獲取到的ConsumerRunningInfo返回給Broker。

其中第2步【獲取JVM所有線程棧信息】就是我們需要查看的堆棧信息,目前控制臺主要展示了以ConsumeMessageThread__開頭的線程和RebalanceService線程,這塊期望只展示與該消費(fèi)端相關(guān)的ConsumeMessageThread__線程和Rebalance線程,不期望將不相關(guān)的消費(fèi)端線程也展示出來。

ConsumeMessageThread線程的命名

在當(dāng)前版本中處理業(yè)務(wù)的消費(fèi)者線程名的形式是:ConsumeMessageThread_數(shù)字,ConsumeMessageConcurrentlyService類中相關(guān)代碼如下:

//該線程池用于處理業(yè)務(wù)邏輯this.consumeExecutor = new ThreadPoolExecutor(  this.defaultMQPushConsumer.getConsumeThreadMin(),  this.defaultMQPushConsumer.getConsumeThreadMax(),  1000 * 60,  TimeUnit.MILLISECONDS,  this.consumeRequestQueue,  new ThreadFactoryImpl("ConsumeMessageThread_"));

新版本中線程的命名中增加了GroupId,相關(guān)代碼如下:

String consumeThreadPrefix = null;if (consumerGroup.length() > 100) {    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup, 0, 100).append("_").toString();} else {    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();}this.consumeExecutor = new ThreadPoolExecutor(    this.defaultMQPushConsumer.getConsumeThreadMin(),    this.defaultMQPushConsumer.getConsumeThreadMax(),    1000 * 60,    TimeUnit.MILLISECONDS,    this.consumeRequestQueue,    new ThreadFactoryImpl(consumeThreadPrefix));

線程名形式為:ConsumeMessageThread_GroupId__數(shù)字,從一定程度對以上問題進(jìn)行了優(yōu)化。

總結(jié)ONS SDK對RocketMQ Client進(jìn)行了封裝,更加方便業(yè)務(wù)的使用,Consumer對象比較重,需要根據(jù)業(yè)務(wù)采用合理的初始化方式ConsumerStatsManager記錄了消費(fèi)端的一些統(tǒng)計(jì)信息ConsumeMessageConcurrentlyService對消費(fèi)端線程命名進(jìn)行了優(yōu)化?

關(guān)鍵詞:

下一篇:諸多精進(jìn)行跡(六)
上一篇:最后一頁

科技

 
国产三级日本三级日产三级66,五月天激情婷婷大综合,996久久国产精品线观看,久久精品人人做人人爽97
    • <bdo id="qgeso"></bdo>
        • <strike id="qgeso"></strike>
        • <sup id="qgeso"></sup><center id="qgeso"></center>
        • <input id="qgeso"></input>
          主站蜘蛛池模板: 欧美一级视频在线观看| 亚洲社区在线观看| 国产精品啪视频| 国产一区视频在线播放| 91精品中国老女人| 亚洲国产另类久久精品| 日韩久久精品电影| 中文字幕在线观看亚洲| 欧美成人性色生活仑片| 狠狠躁天天躁日日躁欧美| 91精品国产精品| 国产精品一区二区三区在线播放| 国产精品永久免费| 日韩精品极品毛片系列视频| 丝袜美腿亚洲一区二区| 欧美另类精品xxxx孕妇| 性日韩欧美在线视频| 国产精品入口免费视频一| 亚洲电影免费在线观看| 综合av色偷偷网| 色综合天天综合网国产成人网 | 国产精品入口日韩视频大尺度| 成人激情视频在线播放| 国产一区二区三区精品久久久 | 久久精品国产一区二区电影| 欧美视频中文字幕在线| 国产99视频精品免视看7| 亚洲成av人影院在线观看| 在线日韩欧美视频| 久久久久久久影视| 亚洲一区精品电影| 蜜臀久久99精品久久久久久宅男| 91av视频导航| 日韩精品中文字幕久久臀| 欧美美女15p| 国产精品欧美在线| 日韩中文字幕网| 国产成人综合久久| 亚洲性生活视频在线观看| 欧美极品少妇与黑人| 亚洲高清福利视频|