java api rabbitmq - TuPengXiong/TuPengXiong.github.io GitHub Wiki

建立连接和创建 Channel

package tpx.rabbitmq;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Opens a connection to a RabbitMQ broker and creates channels on it.
 *
 * <p>All instances configure the single shared static {@link #factory}, so
 * settings applied through one instance remain in effect on the factory for
 * later instances.
 *
 * @author tpx
 */
public class Client {

	private String userName;
	private String password;
	private String host;
	private String virtualHost;
	private int port;

	/** Most recently opened connection, or {@code null} if none is held. */
	private Connection connection;
	/** Most recently created channel, or {@code null} if none is held. */
	private Channel channel;
	/** Factory shared by every instance; each {@code getConnection} call reconfigures it. */
	public static ConnectionFactory factory = new ConnectionFactory();

	/**
	 * Stores the broker coordinates; no network activity happens here.
	 *
	 * @param host        broker host name
	 * @param port        broker port
	 * @param userName    user name, or {@code null} to keep the factory's current value
	 * @param password    password, or {@code null} to keep the factory's current value
	 * @param virtualHost virtual host, or {@code null} to keep the factory's current value
	 */
	public Client(String host, int port, String userName,
			String password, String virtualHost) {
		this.host = host;
		this.port = port;
		this.userName = userName;
		this.password = password;
		this.virtualHost = virtualHost;
	}

	/**
	 * Opens a connection to the broker using the values given to the constructor.
	 *
	 * <p>Credentials and virtual host are only pushed to the shared factory when
	 * they are non-null. A 60 second heartbeat is requested and automatic
	 * recovery is enabled.
	 *
	 * @return the newly opened connection, which this client also remembers
	 * @throws IOException      if the connection cannot be established
	 * @throws TimeoutException if the connection attempt times out
	 */
	public Connection getConnection() throws IOException, TimeoutException {
		if (userName != null) {
			factory.setUsername(userName);
		}
		if (password != null) {
			factory.setPassword(password);
		}
		if (virtualHost != null) {
			factory.setVirtualHost(virtualHost);
		}
		factory.setHost(host);
		factory.setPort(port);
		factory.setRequestedHeartbeat(60); // heartbeat interval, in seconds
		factory.setAutomaticRecoveryEnabled(true); // reconnect automatically
		connection = factory.newConnection();
		return connection;
	}

	/**
	 * Opens a connection to the broker described by an AMQP URI, for example
	 * {@code amqp://userName:password@hostName:portNumber/virtualHost}.
	 *
	 * @param uri the AMQP URI to connect to
	 * @return the newly opened connection, which this client also remembers
	 * @throws IOException              if the connection cannot be established
	 * @throws TimeoutException         if the connection attempt times out
	 * @throws KeyManagementException   propagated from {@code ConnectionFactory.setUri}
	 * @throws NoSuchAlgorithmException propagated from {@code ConnectionFactory.setUri}
	 * @throws URISyntaxException       if {@code uri} is not a valid URI
	 */
	public Connection getConnection(String uri) throws IOException,
			TimeoutException, KeyManagementException, NoSuchAlgorithmException,
			URISyntaxException {
		factory.setUri(uri);
		connection = factory.newConnection();
		return connection;
	}

	/**
	 * Creates a channel on the connection previously opened by this client.
	 *
	 * @return the new channel, which this client also remembers
	 * @throws IOException           if the channel cannot be created
	 * @throws IllegalStateException if no connection has been opened yet
	 */
	public Channel createChannel() throws IOException {
		if (connection == null) {
			// Fail with an explicit message instead of a bare NullPointerException.
			throw new IllegalStateException(
					"No connection: call getConnection() before createChannel()");
		}
		channel = connection.createChannel();
		return channel;
	}

	/**
	 * Closes the remembered channel and connection, if any.
	 *
	 * <p>This is best-effort: I/O failures are printed and not rethrown. Handles
	 * that are already closed are skipped, and the remembered references are
	 * cleared so that calling this method again does nothing.
	 */
	public void closeConnection() {
		Channel heldChannel = channel;
		Connection heldConnection = connection;
		channel = null;
		connection = null;

		if (heldChannel != null && heldChannel.isOpen()) {
			try {
				heldChannel.close();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
		}

		if (heldConnection != null && heldConnection.isOpen()) {
			try {
				heldConnection.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Smoke test: connects, creates one channel through the client, then closes
	 * everything.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		Client client = new Client("tpxsoft.wicp.net", 5672, null,
				null, null);
		try {
			client.getConnection();
			// Go through the client so the channel is tracked and closed below.
			client.createChannel();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		} finally {
			client.closeConnection();
		}
	}
}


发布信息

package tpx.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.Queue.BindOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Publishes text messages to an exchange with a fixed routing key.
 *
 * @author tpx
 */
public class Publish {

	private String exchangeName;
	private String routingKey;
	private Channel channel;
	/** Queue name returned by the broker in {@link #createExchange(String)}. */
	private String queueName;

	/**
	 * @param channel      channel used for all broker operations
	 * @param exchangeName exchange to declare and publish to
	 * @param routingKey   routing key used for binding and publishing
	 */
	public Publish(Channel channel, String exchangeName, String routingKey) {
		this.channel = channel;
		this.exchangeName = exchangeName;
		this.routingKey = routingKey;
	}

	/**
	 * Publishes one message to the configured exchange and routing key.
	 *
	 * <p>The text is encoded as UTF-8 so the bytes on the wire do not depend on
	 * the platform default charset.
	 *
	 * @param data message text
	 * @throws IOException if publishing fails
	 */
	public void publish(String data) throws IOException {
		byte[] messageBodyBytes = data.getBytes("UTF-8");
		channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
	}

	/**
	 * Declares the exchange and the queue, then binds them with the routing key.
	 *
	 * @param queueName name of the queue to declare
	 * @return the broker's confirmation of the binding
	 * @throws IOException if any declaration or the binding fails
	 */
	private BindOk createExchange(String queueName) throws IOException {
		// Exchange of type "direct", declared durable.
		channel.exchangeDeclare(exchangeName, "direct", true);
		// queueDeclare(queue, durable, exclusive, autoDelete, arguments):
		// declared non-durable, non-exclusive, not auto-deleted, no extra arguments.
		this.queueName = channel.queueDeclare(queueName, false, false, false, null).getQueue();
		// Bind with the name the broker reported rather than the requested one.
		return channel.queueBind(this.queueName, exchangeName, routingKey);
	}

	// Disabled example: registering return, shutdown and confirm listeners.
	/*public void resp() throws IOException {
		System.out.println("监听:");
		channel.addReturnListener(new ReturnListener() {

			public void handleReturn(int replyCode, String replyText,
					String exchange, String routingKey,
					BasicProperties properties, byte[] body) throws IOException {
				// TODO Auto-generated method stub
				System.out.println(replyCode);
				System.out.println(exchange);
				System.out.println(routingKey);
				System.out.println(properties);
				System.out.println(new String(body));
			}
		});
		channel.addShutdownListener(new ShutdownListener() {

			public void shutdownCompleted(ShutdownSignalException cause) {
				// TODO Auto-generated method stub
				System.out.println("addShutdownListener:");
				System.out.println(cause);
			}
		});

		channel.addConfirmListener(new ConfirmListener() {

			public void handleNack(long deliveryTag, boolean multiple)
					throws IOException {
				// TODO Auto-generated method stub
				System.out.println("handleNack:" + deliveryTag);
			}

			public void handleAck(long deliveryTag, boolean multiple)
					throws IOException {
				// TODO Auto-generated method stub
				System.out.println("handleAck:" + deliveryTag);
			}
		});
	}*/

	/**
	 * Demo: declares exchange1/queueName1, publishes one message, then closes
	 * the connection.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {

		Client client = new Client("tpxsoft.wicp.net", 5672, null, null, null);
		try {
			Connection connection = client.getConnection();
			Channel channel = connection.createChannel();
			Publish publish = new Publish(channel, "exchange1", "routingKey1");
			publish.createExchange("queueName1");
			for (int i = 0; i < 1; i++) {
				publish.publish("我是来自Publish的数据 "+i +"消息队列=="+publish.queueName +"|exchangeName=="+publish.exchangeName);
			}
			//publish.receive();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		} finally {
			// Release the connection even when a runtime exception escapes.
			client.closeConnection();
		}
	}
}

接收信息

package tpx.rabbitmq;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.Queue.BindOk;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * 接受信息
 * @author tpx
 *
 */
public class Receive {

	private String exchangeName;
	private String routingKey;
	private Channel channel;
	private String queueName;

	public Receive(Channel channel, String exchangeName, String routingKey) {
		this.channel = channel;
		this.exchangeName = exchangeName;
		this.routingKey = routingKey;
	}

	/**
	 * 创建Exchange
	 * 
	 * @return
	 * @throws IOException
	 */
	private void createExchange(String queueName) throws IOException {
		channel.exchangeDeclare(exchangeName, "direct", true);
		this.queueName = channel.queueDeclare(queueName, false, false, false,null).getQueue();
		//BindOk bindOk = channel.queueBind(queueName, exchangeName, routingKey);
		//return bindOk;
	}

	/**
	 * 接受信息
	 * @throws IOException
	 */
	public void receive() throws IOException {
		System.out.println("等待消息:消息队列=="+queueName+"|exchangeName=="+exchangeName+"|route:"+routingKey);
		boolean autoAck = false;
		channel.basicConsume(queueName, autoAck, "myConsumerTag",
				new DefaultConsumer(channel) {
					public void handleDelivery(String consumerTag,
							Envelope envelope, AMQP.BasicProperties properties,
							byte[] body) throws IOException {
						String routingKey = envelope.getRoutingKey();
						String contentType = properties.getContentType();
						long deliveryTag = envelope.getDeliveryTag();
						System.out.println("确认收到的消息:"+new String(body));
						/*System.out.println(this.getClass().getName()+"==routingKey:"+routingKey);
						System.out.println(this.getClass().getName()+"==contentType:"+contentType);
						System.out.println(this.getClass().getName()+"==deliveryTag:"+deliveryTag);*/
						channel.basicAck(deliveryTag, false);//确认收到的消息
					}
				});
	}

	public static void main(String[] args) {

		Client client = new Client("tpxsoft.wicp.net", 5672, null, null, null);
		try {
			Connection connection = client.getConnection();
			Channel channel = connection.createChannel();
			Receive Receive = new Receive(channel, "exchange1", "routingKey1");
			Receive.createExchange("queueName1");
			Receive.receive();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (TimeoutException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		// client.closeConnection();
	}
}