rabbitmq - TuPengXiong/TuPengXiong.github.io GitHub Wiki

rabbitmq

rabbitmq-流程

docker 安装

  • yum 更新
 sudo yum update
  • Add the yum repo
sudo tee /etc/yum.repos.d/docker.repo <<-'EOF'
[dockerrepo]
name=Docker Repository
baseurl=https://yum.dockerproject.org/repo/main/centos/7/
enabled=1
gpgcheck=1
gpgkey=https://yum.dockerproject.org/gpg
EOF
  • Install the Docker package
sudo yum install docker-engine
  • 启动 docker service.
sudo systemctl enable docker.service
  • 启动 docker
sudo systemctl start docker

docker rabbitmq mqtt 服务

# 下载 rabbitmq 镜像 端口映射 fork 运行
docker run -itd -p 8883:8883 -p 1883:1883 -p 5671:5671 -p 5672:5672 -p 15672:15672 -p 15675:15675 -p 4369:4369 -p 25672:25672 docker.io/rabbitmq
  • docker ps
1b7b52b7fb40        docker.io/rabbitmq   "docker-entrypoint.sh"   2 minutes ago       Up 2 minutes        0.0.0.0:1883->1883/tcp, 0.0.0.0:4369->4369/tcp, 0.0.0.0:5672->5672/tcp, 0.0.0.0:8883->8883/tcp, 0.0.0.0:25672->25672/tcp, 0.0.0.0:5617->5671/tcp  ... sad_kare
  • 加载rabbitmq的mqtt插件 1b7b52b7fb40 为容器id 默认端口号为 1883 ssl:8883
docker exec -it  1b7b52b7fb40 rabbitmq-plugins enable rabbitmq_mqtt
  • 加载rabbitmq的rabbitmq_management插件 1b7b52b7fb40 为容器id 默认端口号为 15672
docker exec -it  1b7b52b7fb40 rabbitmq-plugins enable rabbitmq_management
  • 加载 rabbitmq_web_mqtt 插件 js webSocket使用 默认端口 15675
  1. 由于rabbitmq_web_mqtt 插件 在rabbitmq中不存在,我们需要手动将其资源添加到 rabbitmq_server-version/plugins (depending on where it was installed) 的位置

  2. 在docker中进入相应的容器

docker exec -it  1b7b52b7fb40 /bin/bash
cd  /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/plugins

插件的下载地址:https://bintray.com/rabbitmq/community-plugins/rabbitmq_web_mqtt/v3.6.x#files

apt-get update

apt-get install wget

下载 插件 
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_web_mqtt-3.6.x-14dae543.ez
如果下载的文件名不是 rabbitmq_web_mqtt-3.6.x-14dae543.ez 将其改名为 rabbitmq_web_mqtt-3.6.x-14dae543.ez

下载完成之后 ctrl + c /  ctrl + d退出

保存刚才在容器中的操作 并将tag值设为 rabbitmq
docker commit 1b7b52b7fb40 rabbitmq

杀掉容器 进程 防止端口冲突
docker kill 1b7b52b7fb40

启动 rabbitmq(commit的tag值) 而不是 docker.io/rabbitmq
docker run -itd -p 8883:8883 -p 1883:1883 -p 5671:5671 -p 5672:5672 -p 15672:15672 -p 15675:15675 -p 4369:4369 -p 25672:25672 rabbitmq

找到启动的容器id
docker ps 
# 假设 容器id为 7a189affa24c 查看插件列表
docker exec -it 7a189affa24c rabbitmq-plugins list

# 假设 容器id为 7a189affa24 启用 rabbitmq_web_mqt插件
docker exec -it 7a189affa24c rabbitmq-plugins enable rabbitmq_web_mqtt

#访问 rabbitmq 管理 http:localhost:15672 用户名:密码 guest:guest

设置默认的用户名和密码 和默认的vhost

docker run -itd -p 8883:8883 -p 1883:1883 -p 5671:5671 -p 5672:5672 -p 15672:15672 -p 15675:15675 -p 4369:4369 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -e RABBITMQ_DEFAULT_VHOST=my_vhost       rabbitmq

java client 使用 eclipse.paho 测试

  • jar 包依赖
<dependency>
 <groupId>org.eclipse.paho</groupId>
 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
 <version>1.1.0</version>
</dependency>

eclipse.paho测试代码

package tpx.rabbitmq;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 发布/订阅
 * 
 * @author tpx
 *
 */
public class RabbitMQMqttPaho {

	// paho -> RabbotMQ Server
	public static void main(String[] args) {

		String topic = "World";
		String content = "Message from RabbitMQMqttPaho";
		int qos = 0;
		// String broker = "tcp://test13.kingsilk.xyz:1883";
		String broker = "tcp://test13.kingsilk.xyz:1883";
		String clientId = "tpx";
		MemoryPersistence persistence = new MemoryPersistence();

		try {
			MqttClient sampleClient = new MqttClient(broker, clientId,
					persistence);
			MqttConnectOptions connOpts = new MqttConnectOptions();
			connOpts.setCleanSession(true);
			System.out.println("Connecting to broker: " + broker);
			sampleClient.connect(connOpts);
			System.out.println("Connected");
			System.out.println("Publishing message: " + content);

			
			//订阅
			sampleClient.subscribe("World");

			sampleClient.setCallback(new MqttCallback() {
				public void messageArrived(String topic, MqttMessage message)
						throws Exception {
			
					System.out.println("messageArrived:"
							+ new String(message.getPayload()));
				}

				// FIXME: 这个的具体含义是?
				public void deliveryComplete(IMqttDeliveryToken token) {
					System.out.println("deliveryComplete:"
							+ token.getMessageId());
				}

				public void connectionLost(Throwable cause) {
					System.out.println("connectionLost:" + cause);
				}
			});
			
			
			// 发送100次
			for (int i = 0; i < 1; i++) {
				MqttMessage message = new MqttMessage((content + i).getBytes());
				message.setQos(qos);
				
				//发布消息
				sampleClient.publish(topic, message);
				System.out.println("Message published");
			}
			
			// sampleClient.disconnect();
			// System.out.println("Disconnected");
			// System.exit(0);
		} catch (MqttException me) {
			System.out.println("reason " + me.getReasonCode());
			System.out.println("msg " + me.getMessage());
			System.out.println("loc " + me.getLocalizedMessage());
			System.out.println("cause " + me.getCause());
			System.out.println("excep " + me);
			me.printStackTrace();
		}
	}
}

RabbitmqAMQP测试代码

加载maven依赖

<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.6.5</version>
		</dependency>

#client.java 通用

package tpx.rabbitmq;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 建立链接和创建CHANNAL
 * @author tpx
 *
 */
public class Client {

	private String userName;
	private String password;
	private String host;
	private String virtualHost;
	private int port;
	
	private Connection connection;
	private Channel channel;
	public static ConnectionFactory factory = new ConnectionFactory();

	public Client(String host, int port, String userName,
			String password, String virtualHost){
		this.host = host;
		this.password = password;
		this.userName = userName;
		this.port = port;
		this.virtualHost = virtualHost;
	}

	/**
	 * 获取与mq server的连接
	 * 
	 * @param host
	 * @param port
	 * @param userName
	 * @param password
	 * @param virtualHost
	 * @return
	 * @throws TimeoutException 
	 * @throws IOException 
	 */
	public Connection getConnection() throws IOException, TimeoutException {
		if (userName != null) {
			factory.setUsername(userName);
		}
		if (password != null) {
			factory.setPassword(password);
		}
		if (virtualHost != null) {
			factory.setVirtualHost(virtualHost);
		}
		factory.setHost(host);
		factory.setPort(port);
		factory.setRequestedHeartbeat(60); //设置心跳测试 单位s
		factory.setAutomaticRecoveryEnabled(true); //自动连接
		connection = factory.newConnection(); //获取链接
		return connection;
	}

	/**
	 * 获取与mq server的连接
	 * 
	 * @param uri
	 * @return
	 * @throws TimeoutException
	 * @throws IOException
	 * @throws URISyntaxException
	 * @throws NoSuchAlgorithmException
	 * @throws KeyManagementException
	 */
	public Connection getConnection(String uri) throws IOException,
			TimeoutException, KeyManagementException, NoSuchAlgorithmException,
			URISyntaxException {
		// factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
		factory.setUri(uri);
		connection = factory.newConnection();
		return connection;
	}

	/**
	 * 创建Channel
	 * @return
	 * @throws IOException
	 */
	public Channel createChannel() throws IOException {
		  channel = connection.createChannel();
		  return channel;
	}

	/**
	 * 关闭连接
	 */
	public void closeConnection() {		
		if (channel != null) {
			try {
				channel.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		if (connection != null) {
			try {
				connection.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	
	public static void main(String[] args) {
		Client client = new Client("192.168.0.13", 5672, null,
				null, null);
		try {
		Connection connection = client.getConnection();
		//	Connection connection = client.getConnection("amqps://guest:[email protected]:5671");
			System.out.println("connectioned");
			Channel channel = connection.createChannel(10086);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (TimeoutException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		//client.closeConnection();
	}
}

RabbitmqAMQP.java 文件 测试

package tpx.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.Queue.BindOk;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * AMQP 测试
 * 
 * @author tpx
 *
 */
public class RabbitmqAMQP {

	private String exchangeName;
	private String routingKey;
	private Channel channel;

	public RabbitmqAMQP(Channel channel, String exchangeName, String routingKey)
			throws IOException {
		this.channel = channel;
		this.exchangeName = exchangeName;
		this.routingKey = routingKey;
	}

	/**
	 * 发布信息
	 * 
	 * @param data
	 * @throws IOException
	 */
	public void publish(String data) throws IOException {
		byte[] messageBodyBytes = data.getBytes();
		channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
	}

	/**
	 * 创建Exchange
	 * 
	 * @return
	 * @throws IOException
	 */
	private BindOk createExchange(String queueName) throws IOException {
		//申明 exchange  类型是 topic  持久化
		channel.exchangeDeclare(exchangeName, "topic", true);
		/**
		 * 声明一个消息队列
		 * 
		 * @param queue
		 *            消息队列名字
		 * @param durable
		 *            持久化
		 * @param exclusive
		 *            排他队列
		 * @param autoDelete
		 *            自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
		 * @param arguments
		 *            other properties (construction arguments) for the queue
		 * @return a declaration-confirm method to indicate the queue was
		 *         successfully declared
		 * @throws java.io.IOException
		 *             if an error is encountered
		 */
		DeclareOk declareOk = channel.queueDeclare(queueName, false, false,
				false, null);
		declareOk.getQueue();
		// exchangeName 和 routingKey进行队列绑定
		BindOk bindOk = channel.queueBind(queueName, exchangeName, routingKey);
		return bindOk;
	}

	public static void main(String[] args) {

		Client client = new Client("192.168.0.13", 5672, "guest", "guest", "/");
		try {
			// 获取连接
			Connection connection = client.getConnection();
			// 创建 Channel
			final Channel channel = connection.createChannel();
			// exchangeName 为 amq.topic;routingKey 为World
			RabbitmqAMQP publish = new RabbitmqAMQP(channel, "amq.topic",
					"World");
			// 创建queue
			publish.createExchange("public");
			// 发送消息
			for (int i = 0; i < 1; i++) {
				publish.publish("发送消息--tpx" + i);
			}

			//消息监听
			channel.basicConsume("public", false, "test", new DefaultConsumer(
					channel) {
				public void handleDelivery(String consumerTag,
						Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {

					System.out.println("确认收到的消息:" + new String(body));
					channel.basicAck(envelope.getDeliveryTag(), false);// 确认收到的消息
				}
			});
			// publish.receive();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (TimeoutException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		// client.closeConnection();
	}
}

测试 rabbitmq_web_mqtt 使用 eclipse paho-mqtt/1.0.1/mqttws31.js

<!doctype html>
<html>
<head>
    <title>rabbitmq_web_mqtt test</title>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=0, minimum-scale=1.0, maximum-scale=1.0">
    <meta name="apple-mobile-web-app-capable" content="yes">
    <meta name="apple-mobile-web-app-status-bar-style" content="black">
    <meta name="format-detection" content="telephone=no">
    <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
</head>

<body layout ng-cloak>
<script>

    // Create a client instance
    client = new Paho.MQTT.Client("192.168.0.13", 15675, "/ws","clientId");

    // set callback handlers
    client.onConnectionLost = onConnectionLost;
    client.onMessageArrived = onMessageArrived;

    // connect the client
    client.connect({onSuccess:onConnect});


    // called when the client connects
    function onConnect() {
        // Once a connection has been made, make a subscription and send a message.
        console.log("onConnect");
        client.subscribe("World");
        message = new Paho.MQTT.Message("Hello");
        message.destinationName = "World";
        client.send(message);
    }

    // called when the client loses its connection
    function onConnectionLost(responseObject) {
        if (responseObject.errorCode !== 0) {
            console.log("onConnectionLost:"+responseObject.errorMessage);
        }
    }

    // called when a message arrives
    function onMessageArrived(message) {
        console.log("onMessageArrived:"+message.payloadString);
    }
</script>
</body>
</html>
  • docker的用户
sudo docker exec -it 1a90547000a8 rabbitmqctl -n  rabbit@1a90547000a8 list_users

ssl 连接 需要证书 请参照官网 http://www.rabbitmq.com/ssl.html

  1. 目录创建均在docker 目录的 /etc/rabbitmq/ 下面
root@f9ac9a9d630a:/etc/rabbitmq# ls
all_cacerts.pem  client  enabled_plugins  rabbitmq.config  server  testca
  1. 配置文件修改 /etc/rabbitmq/rabbitmq.config
%% Disable SSLv3.0 support, leaves TLSv1.0 enabled.
[
 {ssl, [{versions, ['tlsv1.2', 'tlsv1.1', tlsv1]}]},
 {rabbit, [
           {loopback_users, [ ]},
           {ssl_listeners, [5671]},
           {ssl_options, [{cacertfile,"/etc/rabbitmq/testca/cacert.pem"},
                          {certfile,  "/etc/rabbitmq/server/cert.pem"},
                          {keyfile,   "/etc/rabbitmq/server/key.pem"},
                          %%是否验证
			  {verify,     verify_peer},   
                          %%验证签名失败 拒绝
                          {fail_if_no_peer_cert, true}, 
                          {versions, ['tlsv1.2', 'tlsv1.1', tlsv1]} //
                         ]}
          ]},
 {rabbitmq_mqtt, [
                  {ssl_listeners,[8883]},
                  {tcp_listeners,[1883]},
		  {ssl_cert_login, true},
		  {default_user,     <<"guest">>},
                  {default_pass,     <<"guest">>},
                  {allow_anonymous,  true},
                  {vhost,            <<"/">>},
                  {exchange,         <<"amq.topic">>},
                  {subscription_ttl, 1800000}
          ]}
].

  1. 将docker的 cert.pem 保存到本地 并设置密码 为 MySecretPassword
复制 签名文件到 192.168.0.64:/home/tpx/
scp -r /etc/rabbitmq/ [email protected]:/home/tpx/

退出容器 ctrl+c ctrl+d 

这时需要保存镜像文件 
docker commit 容器id 镜像名字

杀死以前的进程 防止端口冲突
docker kill  容器id

启动
docker run -itd -p 8883:8883 -p 1883:1883 -p 5617:5671 -p 5672:5672 -p 15672:15672 -p 15675:15675 -p 4369:4369 -p 25672:25672 镜像名字

本机执行 并设置密码 为 MySecretPassword
keytool -import -alias server1 -file /home/tpx/server/cert.pem -keystore /home/tpx/rabbitstore

ssl连接测试

package tpx.rabbitmq;

import java.io.*;
import java.security.*;

import javax.net.ssl.*;

import com.rabbitmq.client.*;
/**
 * ssl 证书连接
 * @author tpx
 *
 */
public class RabbitmqCert {
	public static void main(String[] args) throws Exception {

		// 加载客户端证书
		char[] keyPassphrase = "MySecretPassword".toCharArray();
		KeyStore ks = KeyStore.getInstance("PKCS12");
		ks.load(new FileInputStream("/home/tpx/client/keycert.p12"),
				keyPassphrase);

		KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
		kmf.init(ks, keyPassphrase);

		// 加载store的key
		char[] trustPassphrase = "MySecretPassword".toCharArray();
		KeyStore tks = KeyStore.getInstance("JKS");
		tks.load(new FileInputStream("/home/tpx/rabbitstore"), trustPassphrase);

		TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
		tmf.init(tks);

		// ssl协议
		SSLContext c = SSLContext.getInstance("TLSv1.1");
		c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("test13.kingsilk.xyz");
		factory.setPort(5671);
		factory.useSslProtocol(c);
		/*
		 * factory.setUri("amqps://tpx:[email protected]:5671/test");
		 * factory.useSslProtocol(c);
		 */
		Connection conn = factory.newConnection();
		final Channel channel = conn.createChannel();

		channel.queueDeclare("public", false, false, false, null);
		channel.basicPublish("amq.topic", "World", null,
				"Hello, World".getBytes());
		
		channel.basicConsume("public", false, "test",
				new DefaultConsumer(channel) {
					public void handleDelivery(String consumerTag,
							Envelope envelope, AMQP.BasicProperties properties,
							byte[] body) throws IOException {

						System.out.println("确认收到的消息:"+new String(body));
						channel.basicAck(envelope.getDeliveryTag(), false);//确认收到的消息
					}
				});
		

		/*单条消息
		 * GetResponse chResponse = channel.basicGet("public", false);
		if (chResponse == null) {
			System.out.println("No message retrieved");
		} else {
			byte[] body = chResponse.getBody();
			System.out.println("Recieved: " + new String(body));
		}*/

		// channel.close();
		// conn.close();
	}
}

总结

  1. 经测试 js 和 amqp 和 paho 和 cert 四者的信息可以进行通信

  2. 其 js和paho 测试使用的 mqtt 默认使用的exchange 是 amq.topic

  3. amqp 的api完全可以代替 paho ,可以不使用 paho的 api

⚠️ **GitHub.com Fallback** ⚠️