Skip to content

RocketMQ快速实战 🚀

RocketMQ 是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。

目前RocketMQ在阿里云上有一个购买即可用的商业版本,商业版本集成了阿里内部一些更深层次的功能及运维定制。我们这里学习的是Apache的开源版本。开源版本相对于阿里云上的商业版本,功能上略有缺失,但是大体上功能是一样的。当前最新的版本是4.7.1。我们就用这个4.7.1版本来进行学习。

1、下载RocketMQ 4.7.1版本 🚀

RocketMQ运行版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

RocketMQ源码版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip

这两个版本我们都下载下来。

2、快速安装RocketMQ 🚀

RocketMQ的安装非常简单,就是上传解压就可以了。

然后我们准备一台CentOS7的Linux机器,快速把RocketMQ给运行起来。我使用的Linux版本如下:

shell
[oper@worker1 jdk1.8]$ uname -a
Linux worker1 3.10.0-1127.el7.x86_64 #1 SMP Tue Mar 31 23:36:51 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

我们需要创建一个操作用户用来运行自己的程序,与root用户区分开。使用root用户创建一个oper用户,并给他创建一个工作目录。

shell
[root@worker1 ~]# useradd oper
[root@worker1 ~]# passwd oper 
设置用户密码
[root@worker1 ~]# mkdir /app
[root@worker1 ~]# chown oper:oper /app

运行RocketMQ需要先安装JDK。我们采用目前最稳定的JDK1.8版本。CentOS可以采用课件资料中的jdk-8u171-linux-x64.tar.gz,也可以自行去Oracle官网上下载。然后用FTP上传到oper用户的工作目录下。由oper用户解压到/app/jdk1.8目录下。

shell
[oper@worker1 tools]$ tar -zxvf jdk-8u171-linux-x64.tar.gz
[oper@worker1 tools]$ mv jdk1.8.0_171/ /app/jdk1.8

配置环境变量。使用 vi ~/.bash_profile编辑文件,在下面加入以下内容:

shell
export JAVA_HOME=/app/jdk1.8/
PATH=$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH

编辑完成后,执行 source ~/.bash_profile让环境变量生效。输入java -version能查看到以下内容表明JDK安装成功了。

shell
[oper@worker1 ~]$ java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

然后我们把下载的rocketmq-all-4.7.1-bin-release.zip在本地完成解压,并上传到/app/rocketmq目录。完成后,把rocketmq的bin目录也配置到环境变量当中。 vi ~/.bash_profile,加入以下内容,并执行source ~/.bash_profile让环境变量生效:

shell
export JAVA_HOME=/app/jdk1.8/
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH

这样RocketMQ就安装完成了,我们把他运行起来。

这个ROCKETMQ_HOME的环境变量是必须要单独配置的,如果不配置的话,启动NameSever和Broker都会报错。

这个环境变量的作用是用来加载$ROCKETMQ_HOME/conf下的除broker.conf以外的几个配置文件。所以实际情况中,可以不按这个配置,但是一定要能找到配置文件。

3、 快速运行RocketMQ 🚀

运行之前,我们需要对RocketMQ的组件结构有个大致的了解。

RocketMQ组件

RocketMQ由以下这几个组件组成

  • NameServer : 提供轻量级的Broker路由服务。
  • Broker:实际处理消息存储、转发等服务的核心组件。
  • Producer:消息生产者集群。通常是业务系统中的一个功能模块。
  • Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。

所以我们要启动RocketMQ服务,需要先启动NameServer。

3.1 启动NameServer 🚀

启动 NameServer 非常简单, 在 $ROCKETMQ_HOME/bin 目录下有个 mqnamesrv。直接执行这个脚本就可以启动 RocketMQ 的 NameServer 服务。

但是要注意,RocketMQ 默认预设的 JVM 内存是 4G,这是RocketMQ给我们的最佳配置。但是通常我们用虚拟机的话都是不够 4G 内存的,所以需要调整下 JVM内存大小。修改的方式是直接修改 runserver.sh。 用 vi runserver.sh 编辑这个脚本,在脚本中找到这一行调整内存大小为 512M

shell
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -
XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

然后我们用静默启动的方式启动NameServer服务:

shell
nohup bin/mqnamesrv &

启动完成后,在 nohup.out 看到这一条关键日志就是启动成功了。并且使用jps指令可以看到有一个NamesrvStartup进程。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS
collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and
will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

3.2 启动Broker 🚀

启动Broker的脚本是 runbroker.sh 。Broker 的默认预设内存是 8G ,启动前,如果内存不够,同样需要调整下JVM内存。vi runbroker.sh,找到这一行,进行内存调整:

shell
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

然后我们需要找到 $ROCKETMQ_HOME/conf/broker.confvi 指令进行编辑,在最下面加入一个配置:

shell
autoCreateTopicEnable=true

然后也以静默启动的方式启动 runbroker.sh

shell
nohup ./mqbroker &

启动完成后,同样是检查 nohup.out 日志,有这一条关键日志就标识启动成功了。 并且 jps 指令可以看到一个 BrokerStartup 进程。

The broker[worker1, 192.168.232.128:10911] boot success. serializeType=JSON

在观察runserver.sh和runbroker.sh时,我们还可以查看到其他的JVM执行参数,这些参数都可以进行定制。

例如我们观察到一个比较有意思的地方,nameServer 使用的是 CMS 垃圾回收器,而Broker使用的是 G1 垃圾回收器。 关于垃圾回收器的知识你还记得吗?

3.3 命令行快速验证 🚀

在RocketMQ的安装包中,提供了一个 tools.sh 工具可以用来在命令行快速验证RocketMQ服务。我们在worker2上进入RocketMQ的安装目录:

首先需要配置一个环境变量NAMESRV_ADDR指向我们启动的NameServer服务。

shell
export NAMESRV_ADDR='localhost:9876'

然后启动消息生产者发送消息:默认会发1000条消息

shell
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

我们可以看到发送消息的日志:

.....
SendResult [sendStatus=SEND_OK, msgId=C0A8E88007AC3764951D891CE9A003E7, offsetMsgId=C0A8E88000002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=worker1, queueId=1], queueOffset=249]
14:59:33.418 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:59:33.423 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.232.128:10911] result: true

这日志中,上面部分就是我们发送的消息的内容。后面两句标识消息生产者正常关闭。

然后启动消息消费者接收消息:

shell
bin/tools.sh  org.apache.rocketmq.example.quickstart.Consumer

启动后,可以看到消费到的消息。

......
ConsumeMessageThread_19 Receive New Messages: [MessageExt [brokerName=worker1, queueId=2, storeSize=203, queueOffset=53, sysFlag=0, bornTimestamp=1606460371999, bornHost=/192.168.232.128:43436, storeTimestamp=1606460372000, storeHost=/192.168.232.128:10911, msgId=C0A8E88000002A9F000000000000A7AE, commitLogOffset=42926, bodyCRC=1968636794, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1606460450150, UNIQ_KEY=C0A8E88007AC3764951D891CE41F00D4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 49, 50], transactionId='null'}]]

日志中MessageExt后的整个内容就是一条完整的RocketMQ消息。我们要对这个消息的结构有个大概的了解,后面会对这个消息进行深入的理解。

其中比较关键的属性有:brokerName,queueId,msgId,topic,cluster,tags,body,transactionId。先找下这些属性在哪里。

而这个Consume指令并不会结束,他会继续挂起,等待消费其他的消息。我们可以使用CTRL+C停止该进程。

3.4 关闭RocketMQ服务 🚀

要关闭RocketMQ服务可以通过mqshutdown脚本直接关闭

Shell
# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker