李马克
李马克
发布于 2024-07-24 / 61 阅读
0

【Server】RocketMq集群——2m-2a-async

1. 配置三台主机的IP与主机名的映射

三台机器都使用 vim /etc/hosts 将上边的ip与主机名称放到[[hosts文件]]里边。

cat /etc/hosts 可以显示hosts文件里边的内容

开启mq1,mq2,mq3防火墙端口

sudo firewall-cmd --permanent --add-port=9876/tcp
sudo firewall-cmd --permanent --add-port=10911/tcp
sudo firewall-cmd --permanent --add-port=10909/tcp
sudo firewall-cmd --permanent --add-port=11009/tcp
sudo firewall-cmd --permanent --add-port=11011/tcp
sudo firewall-cmd --permanent --add-port=30909/tcp
sudo firewall-cmd --permanent --add-port=30911/tcp
sudo firewall-cmd --permanent --add-port=40911/tcp
sudo firewall-cmd --permanent --reload

配置RocketMQ启动文件和配置文件

2. 安装JDK 1.8和RocketMQ

直接使用yum或者其他安装工具安装jdk,保证安装javac和jps(jdk而不是只有jre)

yum install java-1.8.0-openjdk-devel

在三台机器 /opt/ 目录下下载rocketmq,可以使用wget

wget https://archive.apache.org/dist/rocketmq/4.9.8/rocketmq-all-4.9.8-bin-release.zip

unzip rocketmq-all-4.9.8-bin-release.zip 解压文件到 /opt 目录。可以修改目录的名称为 rocketmq

3. 配置JDK和RocketMQ环境变量

打开 /etc/profile 文件

添加如下内容(次部分内容只要正确export环境变量即可,具体写法不限)

ROCKETMQ_HOME=/opt/rocketmq
PATH= $PATH:$ROCKETMQ_HOME/bin
export PATH ROCKETMQ_HOME
export NAMESRV_ADDR='mq1:9876;mq2:9876;mq3:9876'

4. 集群部署模式介绍

1. 单 master 模式

只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用。

2. 多 master 模式

多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。

优点:所有模式中性能最高(一个Topic的可以分布在不同的master,进行横向拓展)在多主多从的架构体系下,无论使用客户端还是管理界面创建主题,一个主题都会创建多份队列在多主中(默认是4个的话,双主就会有8个队列,每台主4个队列,所以双主可以提高性能,一个Topic的分布在不同的master,方便进行横向拓展。

缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。

3. 多master 多 slave 异步复制模式

而从节点(Slave)就是复制主节点的数据,对于生产者完全感知不到,对于消费者正常情况下也感知不到。(只有当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。)

在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master节点可读可写,但是 slave只能读不能写,类似于 mysql 的主备模式。

优点: 一般情况下都是master消费,在 master 宕机或超过负载时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。

缺点:使用异步复制的同步方式有可能会有消息丢失的问题。(Master宕机后,生产者发送的消息没有消费完,同时到Slave节点的数据也没有同步完)

4. 多master多 slave主从同步复制+异步刷盘

优点:主从同步复制模式能保证数据不丢失。

缺点:发送单个消息响应时间会略长,性能相比异步复制低10%左右。

对数据要求较高的场景,主从同步复制方式,保存数据热备份,通过异步刷盘方式,保证rocketMQ高吞吐量。

5. Dlegder

在RocketMQ4.5版本之后推出了Dlegder模式,但是这种模式一直存在严重BUG,同时性能有可能有问题,包括升级到了4.8的版本后也一样(类似于Zookeeper的集群选举模式)

5. 刷盘模式介绍

1. SYNC_FLUSH(同步刷盘):

生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问题,但是有很大的磁盘IO开销,性能有一定影响。

2. ASYNC_FLUSH(异步刷盘):

生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行刷盘。这种异步的方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式

6. 开始搭建双主双从异步复制+异步刷盘

集群规划情况如下

机器名

nameServer部署

broker部署

mq1

nameServer

broker-a和broker-b-s

mq2

nameServer

broker-b和broker-a-s

mq3

nameServer

1. 配置RocketMQ启动文件和配置文件(机器内存不够修改此配置可以正常启动)

在三台机器修改runserver.sh和runbroker.sh的启动内存

vim /opt/rocketmq/bin/runserver.sh,把NameServer启动内存从默认的4G调整为512M。

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vim /opt/rocketmq/bin/runbroker.sh 把Broker的8G默认预设内存调整为如下:

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

第一个节点配

vim /opt/rocketmq/conf/2m-2s-async/broker-a.properties 然后把下边的RocketMQ启动文件内容配置进去。

#所属集群名字 ,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
brokerIP1=mq1
brokerIP2=mq1
#broker名字 ,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid ,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址 ,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时 ,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点 ,默认凌晨 4点
deleteWhen=04
#文件保留时间 ,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条 ,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly =120000
#redeleteHangedFileInterval =120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages =4
#flushConsumeQueueLeastPages =2
#flushCommitLogThoroughInterval =10000
#flushConsumeQueueThoroughInterval =60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable =false
#发消息线程池数量
#sendMessageThreadPoolNums =128
#拉消息线程池数量
#pullMessageThreadPoolNums =128

vim /opt/rocketmq/conf/2m-2s-async/broker-b-s.properties,把下边的内容配置进去。

#所属集群名字 ,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
brokerIP1=mq1
#broker名字 ,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid ,0就表示是Master,>0的都是表示 Slave
brokerId=100
#nameServer地址 ,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时 ,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点 ,默认凌晨 4点
deleteWhen=04
#文件保留时间 ,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条 ,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly =120000
#redeleteHangedFileInterval =120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages =4
#flushConsumeQueueLeastPages =2
#flushCommitLogThoroughInterval =10000
#flushConsumeQueueThoroughInterval =60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable =false
#发消息线程池数量
#sendMessageThreadPoolNums =128
#拉消息线程池数量
#pullMessageThreadPoolNums =128

第二个节点配置

vim /opt/rocketmq/conf/2m-2s-async/broker-b.properties,把下边的内容配置进去。

#所属集群名字 ,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
brokerIP1=mq2
brokerIP2=mq2
#broker名字 ,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid ,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址 ,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时 ,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点 ,默认凌晨 4点
deleteWhen=04
#文件保留时间 ,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条 ,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly =120000
#redeleteHangedFileInterval =120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages =4
#flushConsumeQueueLeastPages =2
#flushCommitLogThoroughInterval =10000
#flushConsumeQueueThoroughInterval =60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable =false
#发消息线程池数量
#sendMessageThreadPoolNums =128
#拉消息线程池数量
#pullMessageThreadPoolNums =128

vim /opt/rocketmq/conf/2m-2s-async/broker-a-s.properties,把下边的内容配置进去。

#所属集群名字 ,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#暴露外网的IP
brokerIP1=mq2
#broker名字 ,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid ,0就表示是Master,>0的都是表示 Slave
brokerId=100
#nameServer地址 ,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时 ,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点 ,默认凌晨 4点
deleteWhen=04
#文件保留时间 ,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条 ,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly =120000
#redeleteHangedFileInterval =120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages =4
#flushConsumeQueueLeastPages =2
#flushCommitLogThoroughInterval =10000
#flushConsumeQueueThoroughInterval =60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable =false
#发消息线程池数量
#sendMessageThreadPoolNums =128
#拉消息线程池数量
#pullMessageThreadPoolNums =128

这里对几个需要重点关注的属性,做下简单介绍:
brokerClusterName: 集群名。RocketMQ会将同一个局域网下所有brokerClusterName相同的服务自动组成一个集群,这个集群可以作为一个整体对外提供服务
brokerName: Broker服务名。同一个RocketMQ集群当中,brokerName相同的多个服务会有一套相同的数据副本。同一个RocketMQ集群中,是可以将消息分散存储到多个不同的brokerName服务上的。
brokerId: RocketMQ中对每个服务的唯一标识。RocketMQ对brokerId定义了一套简单的规则,master节点需要固定配置为0,负责响应客户端的请求。slave节点配置成其他任意数字,负责备份master上的消息。brokerRole: 服务的角色。这个属性有三个可选项:ASYNC_MASTER,SYNC_MASTER和SLAVE。其中,ASYNC_MASTER和SYNC_MASTER表示当前节点是master节点,目前暂时不用关心他们的区别。SLAVE则表示从节点。
namesrvAddr: nameserver服务的地址。nameserver服务默认占用9876端口。多个nameserver地址用;隔开

启动

先启动三个节点的nameserver,在三个节点分别执行

nohup sh mqnamesrv &

去/root/logs/rocketmqlogs目录下查看namesrv.log输出如下内容:

2023-12-14 15:10:57 INFO main - The Name Server boot success. serializeType=JSON

启动mq1和mq2节点的broker,先启动主节点再启动从节点。

启动broker-a的主从节点:

1. 执行 nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-a.properties & 启动mq1的broker-a节点
去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:
2023-12-14 15:25:58 INFO main - The broker[broker-a, mq1:10911] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876
2. 执行 nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-a-s.properties & 启动mq2的broker-a-s节点,去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:
2023-12-14 15:28:46 INFO main - The broker[broker-a, mq2:11011] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876

启动broker-b的主从节点:

1. 执行 nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-b.properties & 启动mq2的broker-b节点,去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:
2023-12-14 15:31:55 INFO main - The broker[broker-b, mq2:10911] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876
2. 执行 nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-b-s.properties & 启动mq1的broker-b-s节点,去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:
2023-12-14 15:34:07 INFO main - The broker[broker-b, mq1:11011] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876

两主两从启动需要新建以下文件目录:

/rocketmq/store
/rocketmq/store/commitlog
/rocketmq/store/consumequeue
/rocketmq/store/index
/rocketmq/store/checkpoint
/rocketmq/store/abort
/rocketmq/storeSlave
/rocketmq/storeSlave/commitlog
/rocketmq/storeSlave/consumequeue
/rocketmq/storeSlave/index
/rocketmq/storeSlave/checkpoint
/rocketmq/storeSlave/abort

在mq1,mq2,mq3分别执行jps看启动的节点:

[ root@localhost rocketmqlogs]# jps
2960 NamesrvStartup
3010 BrokerStartup
3239 Jps
3161 BrokerStartup
[ root@mq2 logs]# jps
3908 BrokerStartup
4036 Jps
3765 NamesrvStartup
3822 BrokerStartup
[ root@mq3 logs]# jps
3099 NamesrvStartup
3182 Jps

执行 mqadmin clusterlist 查看集群状态:

[ root@mq2 logs]# mqadmin clusterlist
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS (LOAD)       #OutTPS (LOAD) #PCWait (ms) #Hour #SPACE
rocketmq-cluster  broker-a                0     mq1:10911              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 50.07 0.1270
rocketmq-cluster  broker-a                100   mq2:11011              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 50.07 0.1603
rocketmq-cluster  broker-b                0     mq2:10911              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 49.97 0.1603
rocketmq-cluster  broker-b                100   mq1:11011              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 49.97 0.1270

进行启动后的测试

在第一个节点上使用示例代码 tools.sh org.apache.rocketmq.example.quickstart.Producer 使用发送消息。

消息发送成功会显示如下内容:

SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651356A03DC, offsetMsgId=C0A8968C00002A9F0000000000104D42, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1373]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651356B03DD, offsetMsgId=C0A8968900002A9F00000000000D5008, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=1124]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651356D03DE, offsetMsgId=C0A8968900002A9F00000000000D50CA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=1123]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357003DF, offsetMsgId=C0A8968900002A9F00000000000D518C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=2], queueOffset=1123]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357203E0, offsetMsgId=C0A8968900002A9F00000000000D524E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=3], queueOffset=1124]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357403E1, offsetMsgId=C0A8968C00002A9F0000000000104E04, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1374]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357603E2, offsetMsgId=C0A8968C00002A9F0000000000104EC6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1375]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357803E3, offsetMsgId=C0A8968C00002A9F0000000000104F88, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1374]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357903E4, offsetMsgId=C0A8968C00002A9F000000000010504A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1374]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357B03E5, offsetMsgId=C0A8968900002A9F00000000000D5310, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=1125]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357E03E6, offsetMsgId=C0A8968900002A9F00000000000D53D2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=1124]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651358003E7, offsetMsgId=C0A8968900002A9F00000000000D5494, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=2], queueOffset=1124]
15:42:07.273 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.140:11011] result: true
15:42:07.280 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.137:10911] result: true
15:42:07.280 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.140:10911] result: true
15:42:07.281 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.137:11011] result: true
15:42:07.281 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.138:9876] result: true

按下Ctrl+C按键,可以停止。

在第二个节点执行 tools.sh org.apache.rocketmq.example.quickstart.Consumer 显示下边类似的内容就是成功:

ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=194, queueOffset=1372, sysFlag=0, bornTimestamp=1702539727192, bornHost=/192.168.150.140:39784, storeTimestamp=1702539727192, storeHost=/192.168.150.140:10911, msgId=C0A8968C00002A9F0000000000104A3A, commitLogOffset=1067578, bodyCRC=1187919614, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1375, CONSUME_START_TIME=1702539742283, UNIQ_KEY=7F0000010CF91B6D35864651355803D4, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 48], transactionId='null'}]] 
ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1123, sysFlag=0, bornTimestamp=1702539727193, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727176, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D4D00, commitLogOffset=871680, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651355903D5, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]] 
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1121, sysFlag=0, bornTimestamp=1702539727161, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727144, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D46F0, commitLogOffset=870128, bodyCRC=673705983, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651353903C5, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 54, 53], transactionId='null'}]] 
ConsumeMessageThread_15 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1119, sysFlag=0, bornTimestamp=1702539727132, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727114, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D40E0, commitLogOffset=868576, bodyCRC=329761110, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651351C03B5, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 57], transactionId='null'}]] 
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1118, sysFlag=0, bornTimestamp=1702539727117, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727100, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D3DD8, commitLogOffset=867800, bodyCRC=494684516, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651350D03AD, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 49], transactionId='null'}]] 
ConsumeMessageThread_9 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1108, sysFlag=0, bornTimestamp=1702539726944, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726927, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D1F88, commitLogOffset=860040, bodyCRC=780681681, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742476, UNIQ_KEY=7F0000010CF91B6D358646513460035D, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 54, 49], transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1104, sysFlag=0, bornTimestamp=1702539726875, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726858, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D1368, commitLogOffset=856936, bodyCRC=1144982759, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742476, UNIQ_KEY=7F0000010CF91B6D35864651341B033D, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 50, 57], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1101, sysFlag=0, bornTimestamp=1702539726815, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726798, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D0A50, commitLogOffset=854608, bodyCRC=2143232590, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742476, UNIQ_KEY=7F0000010CF91B6D3586465133DF0325, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 48, 53], transactionId='null'}]] 
ConsumeMessageThread_18 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1090, sysFlag=0, bornTimestamp=1702539726540, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726523, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000CE8F8, commitLogOffset=846072, bodyCRC=66467102, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742475, UNIQ_KEY=7F0000010CF91B6D3586465132CC02CD, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 49, 55], transactionId='null'}]] 
ConsumeMessageThread_12 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1078, sysFlag=0, bornTimestamp=1702539726293, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726277, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000CC498, commitLogOffset=836760, bodyCRC=1081780703, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742475, UNIQ_KEY=7F0000010CF91B6D3586465131D5026D, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50, 49], transactionId='null'}]] 
ConsumeMessageThread_17 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=1, storeSize=194, queueOffset=1118, sysFlag=0, bornTimestamp=1702539727133, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727117, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D41A2, commitLogOffset=868770, bodyCRC=1935689907, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1125, CONSUME_START_TIME=1702539742472, UNIQ_KEY=7F0000010CF91B6D35864651351D03B6, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 53, 48], transactionId='null'}]] 

```