rocketmq安装使用 - wtdig/study GitHub Wiki
一、rocketmq安装
1、环境要求
jdk 1.8
maven
git
64位系统
git安装:
# 安装git
$ yum install git
# 验证git安装成功
$ git --version
# 这样git即可安装成功
maven安装:
#在/usr下新建maven文件夹
cd /usr
mkdir maven
cd maven
# 下载maven:http://maven.apache.org/download.cgi
wget http://mirrors.shu.edu.cn/apache/maven/maven-3/3.5.3/binaries/apache-maven-3.5.3-bin.tar.gz
# 解压
tar -zxvf apache-maven-3.5.3-bin.tar.gz
# 进入更目录修正环境变量
vim etc/profile
# 在尾部添加环境变量如下
# maven
export MAVEN_HMOE=/usr/maven/apache-maven-3.5.3
export PATH=$PATH:$MAVEN_HMOE/bin
source etc/profile
# 验证maven安装成功
mvn -version
没有生效的化,可以重启服务器
2、安装rocketmq
#在usr/local下新建rocketmq文件
cd /usr/local
mkdir rocketmq
cd rocketmq
#下载rocketmq安装包
wget http://mirrors.shu.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip
#解压rocketmq
unzip rocketmq-all-4.2.0-source-release.zip
#构建rocketmq
cd rocketmq-all-4.2.0/
mvn -Prelease-all -DskipTests clean install -U
3、启动rocketmq
cd distribution/target/apache-rocketmq
#启动Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
#启动borker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
注意:其中~的路径,如果是root用户,指定是root目录下,
启动Name Server、borker要先修改下配置文件,否则会报内层不足
修改distribution/target/apache-rocketmq/bin目录下的runserver.sh和runbroker.sh文件
runserver.sh:
由
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改为
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn126m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh:
由
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
改为
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn126m"
4、关闭rocketmq
切换至distribution/target/apache-rocketmq/bin目录
停止name server命令:
./mqshutdown namesrv
停止broker命令:
./mqshutdown broker
5、搭建rocketmq的控制台
1、下载控制台代码:https://github.com/apache/rocketmq-externals
2、进入rocketmq-externals\rocketmq-console\target\classes目录下,修改application.properties文件
namesrvAddr为安装rocketmq的ip地址
rocketmq.config.namesrvAddr=45.78.9.159:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=true
3、在window下,在rocketmq-console目录下,使用cmd命令,进入dos命令中,输入mvn clean package -Dmaven.test.skip=true,打包完成后会在rocketmq-externals\rocketmq-console\target目录下生成rocketmq-console-ng-1.0.0.jar
4、新建一个文件夹rocketmq-console,将rocketmq-console-ng-1.0.0.jar复制进入,新建一个start.bat文件,内容如下:
@echo off
java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=45.78.9.159:9876
@pause
5、双击start.bat,等待启动完成即可(rocketmq-console是使用spring-boot的,启动使用内嵌的jetty服务器)
6、打开浏览器,输入127.0.0.1:12581即可进入网页控制台
二、rocketmq使用
rocketmq双master模式集群搭建
总的规划master1(192.168.2.50服务器)\master2(192.168.2.51服务器)
1、修改hosts文件,2台服务器都修改
vim /etc/hosts
添加如下信息:
192.168.2.50 rocketmq-nameserver1
192.168.2.50 rocketmq-master1
192.168.2.51 rocketmq-nameserver2
192.168.2.51 rocketmq-master2
2、修改完成之后,重启网络,进行测试
service network restart
master1服务器上:
ping rocketmq-nameserver2
ping rocketmq-master2
master2服务器上:
ping rocketmq-nameserver1
ping rocketmq-master1
可以ping通即可
3、2台机器上安装rocketmq(参考rocketmq的安装)
注意:安装过程中,如果程序无法解析该host(couldn't resolve host 'mirrorlist.centos.org)
解决办法:vim /etc/resolv.conf,在后面添加:
nameserver 8.8.8.8
nameserver 8.8.4.4
然后 service network restart 重启网卡
4、配置2台机器的rocketmq配置文件
1)分别在2台机器的/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq目录下新建如下目录:
store文件夹
store下面新建commitlog\consumequeue\index3个文件夹
2)2台机器同样修改broker-a.properties、broker-b.properties文件
修改/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/conf/2m-noslave/broker-a.properties文件,如下:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
修改/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/conf/2m-noslave/broker-a.properties文件,如下:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
3)2台一样修改;
修改/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/bin/runbroker.sh
主要配置jvm参数:JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
修改/usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/bin/runserver.sh
主要配置jvm参数:JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
4)启动2台机器
先启动nameserver服务,在启动broker服务
cd /usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/bin
启动master1服务器的nameserver:
nohup sh mqnamesrv &
启动master2服务器的nameserver:
nohup sh mqnamesrv &
启动master1服务器的broker:
nohup sh mqbroker -c /usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1&
启动master1服务器的broker:
nohup sh mqbroker -c /usr/wt/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1&
使用jps查看2台机器启动状态:
1957 NamesrvStartup
1981 BrokerStartup
2815 Jps
关机时,先关闭broker,在关闭nameserver服务
5、启动控制台,查看集群效果
修改配置文件:
rocketmq.config.namesrvAddr=192.168.2.50:9876;192.168.2.51:9876
start.bat文件内容如下:
@echo off
java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=192.168.2.50:9876;192.168.2.51:9876
@pause
注意:如果控制台下,查看不到集群信息,重新按照顺序进行启动rocketmq
6、下载rocketmq的示例程序进行测试
git clone https://github.com/apache/rocketmq.git
下载代码,导入idea中,在example包下,有Producer、Consumer2个测试类
在生产者和消费者2个测试类上加上如下代码:
producer.setNamesrvAddr("192.168.2.51:9876;192.168.2.50:9876");
consumer.setNamesrvAddr("192.168.2.50:9876;192.168.2.51:9876");
在生产消息的类启动时,控制台可以观察到生产的消息自动复制均衡到broker-a\broker-b2台集群上了
rocketmq双master双slave模式集群搭建(50主节点、52从节点;51主节点、53从节点)
1、步骤和前面的双master模式搭建一样,需要修改的点如下:
在vim /etc/hosts
添加如下信息:
192.168.2.50 rocketmq-nameserver1
192.168.2.50 rocketmq-master1
192.168.2.51 rocketmq-nameserver2
192.168.2.51 rocketmq-master2
192.168.2.52 rocketmq-nameserver3
192.168.2.52 rocketmq-master1-slave
192.168.2.53 rocketmq-nameserver4
192.168.2.53 rocketmq-master2-slave
2、修改配置文件:
/usr/local/rocketmq/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/conf
下的2m-2s-sync文件夹是2主2从的同步双写配置文件
下的2m-2s-async文件夹是2主2从的异步复制配置文件
本次采用异步复制的配置文件:
修改2m-2s-async下的broker-a-s.properties文件(从服务器的配置文件)
brokerName=broker-a 名称与主服务器名称一致
#0 表示 Master,>0 表示 Slave
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
修改2m-2s-async下的broker-b-s.properties文件(从服务器的配置文件)
brokerName=broker-b 名称与主服务器名称一致
#0 表示 Master,>0 表示 Slave
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
1、roketmq术语
Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息。
Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费。
Push Consumer
Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
Pull Consumer
Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
Producer Group
一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
Consumer Group
一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
Broker
消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。
2、rocketmq的发送消息的三种方式
RocketMQ 以三种方式发送消息:可靠的同步,可靠的异步和单向传输
1)同步
producer.send(msg);
2)异步
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
3)单向传输
producer.sendOneway(msg);