Spring rabbit - TuPengXiong/TuPengXiong.github.io GitHub Wiki

测试代码

package tpx.rabbitmq;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.amqp.rabbit.connection.ChannelListener;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.rabbitmq.client.Channel;

public class CachingConnectionFactoryDemo {

	public static void main(String[] args) throws InterruptedException {

		CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
				"192.168.0.12", 380);
		connectionFactory.setUsername("guest");
		connectionFactory.setPassword("guest");
		// 缓存模式
		connectionFactory.setCacheMode(CacheMode.CHANNEL);
		// 当前允许空闲的最大通道数目
		connectionFactory.setChannelCacheSize(50);
		// 当前允许空闲的最大连接数目
		connectionFactory.setConnectionCacheSize(50);
		connectionFactory.setVirtualHost("/");
		// 确认和返回消息可通过分别设置 CachingConnectionFactory 的 publisherConfirms 和
		// publisherReturns 属性为ture来完成.
		connectionFactory.setPublisherReturns(true);
		connectionFactory.setPublisherConfirms(true);
		// channel监听
		connectionFactory.addChannelListener(new ChannelListener() {
			public void onCreate(Channel channel, boolean transactional) {
				// TODO Auto-generated method stub
				/*
				 * System.out.println("channel Created:" +
				 * channel.getChannelNumber());
				 */
			}
		});
		// connection监听
		connectionFactory.addConnectionListener(new ConnectionListener() {
			public void onCreate(Connection connection) {
				// TODO Auto-generated method stub
				System.out.println("Connection Created:"
						+ connection.getLocalPort());
			}

			public void onClose(Connection connection) {
				// TODO Auto-generated method stub
				System.out.println("Connection onClose:"
						+ connection.getLocalPort());
			}
		});

		/**
		 * 申明exchange queue routingKey
		 **/
		RabbitAdmin admin = new RabbitAdmin(connectionFactory);
		Queue queue = new Queue("spring");
		admin.declareQueue(queue);
		TopicExchange exchange = new TopicExchange("amq.topic", true, false);
		admin.declareExchange(exchange);
		admin.declareBinding(BindingBuilder.bind(queue).to(exchange)
				.with("World"));

		/**
		 * 消息监听
		 */
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		// 开启事务
		container.setChannelTransacted(true);
		// 不会自动创建 --默认是自动创建
		// container.setAutoDeclare(false);
		// 设置当前的消费者
		container.setConcurrentConsumers(1);
		// 添加绑定的队列
		container.addQueueNames("	java", "mqtt-subscription-clientIdqos0",
				"public", "test");
		// 添加消息监听
		container.setMessageListener(new ExampleListener());
		// 开始监听
		container.start();

		// 断开链接重新获取链接
		RabbitTemplate rabbitTemplate = new RabbitTemplate(
				container.getConnectionFactory());

		RetryTemplate retryTemplate = new RetryTemplate();
		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
		// 重新实例化 延迟时间
		backOffPolicy.setInitialInterval(500);

		backOffPolicy.setMultiplier(10.0);
		// 做多延迟时间 10s
		backOffPolicy.setMaxInterval(10000);

		retryTemplate.setBackOffPolicy(backOffPolicy);
		rabbitTemplate.setRetryTemplate(retryTemplate);
		/**
		 * 只可以有一个 接受到消息回调
		 */
		rabbitTemplate.setReturnCallback(new ReturnCallback() {

			public void returnedMessage(Message message, int replyCode,
					String replyText, String exchange, String routingKey) {
				// TODO Auto-generated method stub
				System.out.println("setReturnCallback:"
						+ new String(message.getBody()));
			}
		});

		/**
		 * 只可以有一个 发布消息回调
		 */
		rabbitTemplate.setConfirmCallback(new ConfirmCallback() {

			public void confirm(CorrelationData correlationData, boolean ack,
					String cause) {
				// TODO Auto-generated method stub
				System.out.println("setConfirmCallback"
						+ correlationData.toString());
			}
		});

		int count = 3;
		// 发送信息
		for (int i = 0; i < count; i++) {
			String msg = "Hello World " + i;
			Object reply = rabbitTemplate.convertSendAndReceive("amq.topic",
					"World", msg.getBytes());

			// convertAndSend
			rabbitTemplate.convertAndSend("111");
		}

		int flag = 0;
		while (flag < count) {
			Object msg = rabbitTemplate.receiveAndConvert("spring");
			if (msg == null) {
				Thread.sleep(1000);
			} else {
				System.out.println("receiveAndConvert:"
						+ new String((byte[]) msg));
			}
			flag++;
		}
		// 关闭链接
		connectionFactory.destroy();

	}

	public static void SimpleRoutingConnectionFactory() {
		SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
		Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<Object, ConnectionFactory>();
		targetConnectionFactories.put("192.168.0.12",
				new CachingConnectionFactory("192.168.0.12", 5672));
		connectionFactory
				.setTargetConnectionFactories(targetConnectionFactories);

		final RabbitTemplate rabbitTemplate = new RabbitTemplate(
				connectionFactory);
		RetryTemplate retryTemplate = new RetryTemplate();
		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
		backOffPolicy.setInitialInterval(500);
		backOffPolicy.setMultiplier(10.0);
		backOffPolicy.setMaxInterval(10000);
		retryTemplate.setBackOffPolicy(backOffPolicy);
		rabbitTemplate.setRetryTemplate(retryTemplate);

	}

	static class ExampleListener extends MessageListenerAdapter {

		public void onMessage(Message message) {
			System.out.println("received: " + message);
		}
	}
}

code

package xyz.kingsilk.qh.service.amqp

import com.rabbitmq.client.AMQP.Queue.DeclareOk
import com.rabbitmq.client.Channel
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Exchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.retry.backoff.ExponentialBackOffPolicy
import org.springframework.retry.support.RetryTemplate

/**
 * Thin convenience wrapper around Spring AMQP's connection factory, template,
 * admin and listener container.
 *
 * On construction it declares every configured queue, binds it to the
 * configured topic exchange with the routing key
 * {@code routingKeyPrefix + queue + ".*"}, installs a retry template on the
 * {@link RabbitTemplate} and, when {@code enabled} is true, starts a listener
 * container on those queues.
 *
 * The helper methods are best-effort: failures are logged and reported through
 * the return value ({@code false} / {@code null}) instead of being thrown.
 */
class QhAmqpDevice {

    /** Logger used to record failures that are reported via return values. */
    private static final java.util.logging.Logger LOG = java.util.logging.Logger.getLogger(QhAmqpDevice.class.getName())

    CachingConnectionFactory connectionFactory
    RabbitTemplate rabbitTemplate
    RabbitAdmin rabbitAdmin
    /** Listener container; stays null until {@link #setMessageListener()} has run. */
    SimpleMessageListenerContainer container
    RetryTemplate retryTemplate
    /** Names of the queues declared and bound on construction. */
    String[] queues = ["status", "cmd"]
    /** Name of the topic exchange (property name kept as-is for compatibility). */
    String exchage = "amq.topic"
    String routingKeyPrefix = "dev."
    AmqpMessageListener amqpMessageListener
    /** Whether a listener container is started on construction. */
    boolean enabled = false

    /**
     * Connects using the default queues, exchange and routing-key prefix;
     * no listener container is started.
     *
     * @param connectionFactory connection factory used by the listener container
     * @param rabbitTemplate template used for publishing and receiving
     * @param rabbitAdmin admin used for all declarations
     */
    public QhAmqpDevice(CachingConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin) {
        this.connectionFactory = connectionFactory
        this.rabbitTemplate = rabbitTemplate
        this.rabbitAdmin = rabbitAdmin
        initialize()
    }

    /**
     * Connects using the default queues, exchange and routing-key prefix and
     * optionally starts a listener container.
     *
     * @param connectionFactory connection factory used by the listener container
     * @param rabbitTemplate template used for publishing and receiving
     * @param rabbitAdmin admin used for all declarations
     * @param enabled when true, a listener container is started on the queues
     * @param amqpMessageListener listener installed on the container
     */
    public QhAmqpDevice(CachingConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin, boolean enabled, amqpMessageListener) {
        this.connectionFactory = connectionFactory
        this.rabbitTemplate = rabbitTemplate
        this.rabbitAdmin = rabbitAdmin
        this.enabled = enabled
        this.amqpMessageListener = amqpMessageListener
        initialize()
    }

    /**
     * Connects using caller-supplied queues, exchange and routing-key prefix;
     * no listener container is started.
     *
     * @param connectionFactory connection factory used by the listener container
     * @param rabbitTemplate template used for publishing and receiving
     * @param rabbitAdmin admin used for all declarations
     * @param queues names of the queues to declare and bind
     * @param exchage name of the topic exchange
     * @param routingKeyPrefix prefix of the binding routing keys
     */
    public QhAmqpDevice(CachingConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin, String[] queues, String exchage, String routingKeyPrefix) {
        this.queues = queues
        this.exchage = exchage
        this.routingKeyPrefix = routingKeyPrefix
        this.connectionFactory = connectionFactory
        this.rabbitTemplate = rabbitTemplate
        this.rabbitAdmin = rabbitAdmin
        initialize()
    }

    /**
     * Connects using caller-supplied queues, exchange and routing-key prefix
     * and optionally starts a listener container.
     *
     * @param connectionFactory connection factory used by the listener container
     * @param rabbitTemplate template used for publishing and receiving
     * @param rabbitAdmin admin used for all declarations
     * @param queues names of the queues to declare and bind
     * @param exchage name of the topic exchange
     * @param routingKeyPrefix prefix of the binding routing keys
     * @param enabled when true, a listener container is started on the queues
     * @param amqpMessageListener listener installed on the container
     */
    public QhAmqpDevice(CachingConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin, String[] queues, String exchage, String routingKeyPrefix, boolean enabled, amqpMessageListener) {
        this.queues = queues
        this.exchage = exchage
        this.enabled = enabled
        this.routingKeyPrefix = routingKeyPrefix
        this.connectionFactory = connectionFactory
        this.rabbitTemplate = rabbitTemplate
        this.rabbitAdmin = rabbitAdmin
        this.amqpMessageListener = amqpMessageListener
        initialize()
    }

    /** Shared constructor tail: declare the topology, set up retry, optionally start listening. */
    private void initialize() {
        declareConfiguredBindings()
        setRetryTemplate()
        if (enabled) {
            setMessageListener()
        }
    }

    /** Declares the topic exchange and every configured queue, and binds each queue to the exchange. */
    private void declareConfiguredBindings() {
        if (!queues) {
            // Nothing to bind, so nothing is declared.
            return
        }
        TopicExchange topic = new TopicExchange(exchage)
        rabbitAdmin.declareExchange(topic)
        for (String name in queues) {
            Queue declared = new Queue(name)
            rabbitAdmin.declareQueue(declared)
            rabbitAdmin.declareBinding(BindingBuilder.bind(declared).to(topic).with(routingKeyPrefix + name + ".*"))
        }
    }

    /** Returns the listener container, failing with a clear message when it was never created. */
    private SimpleMessageListenerContainer requireContainer() {
        if (container == null) {
            throw new IllegalStateException("Listener container has not been created; construct the device with enabled=true or call setMessageListener() first")
        }
        return container
    }

    /** Records a failure that the calling method reports through its return value. */
    private static void logFailure(String action, Exception e) {
        LOG.log(java.util.logging.Level.WARNING, "AMQP operation failed: " + action, e)
    }

    /**
     * Creates the listener container, installs {@code amqpMessageListener},
     * subscribes it to every configured queue and starts it.
     */
    protected void setMessageListener() {
        container = new SimpleMessageListenerContainer()
        container.setConnectionFactory(connectionFactory)
        container.setMessageListener(amqpMessageListener)
        for (String name in queues) {
            container.addQueueNames(name)
        }
        container.start()
    }

    /**
     * Installs a retry template with exponential back-off on the
     * {@link RabbitTemplate}.
     */
    protected void setRetryTemplate() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy()
        // Delay before the first retry: 500 ms.
        backOffPolicy.setInitialInterval(500)
        // Factor the delay is multiplied by after each attempt (not a retry count).
        backOffPolicy.setMultiplier(10.0)
        // Upper bound for the delay: 10 s.
        backOffPolicy.setMaxInterval(10000)

        retryTemplate = new RetryTemplate()
        retryTemplate.setBackOffPolicy(backOffPolicy)
        rabbitTemplate.setRetryTemplate(retryTemplate)
    }

    /**
     * Adds queues to the listener container.
     *
     * @param queues queues to listen on
     * @throws IllegalStateException if the listener container was never created
     */
    public void containerAddQueues(Queue... queues) {
        requireContainer().addQueues(queues)
    }

    /**
     * Adds queues, given by name, to the listener container.
     *
     * @param names names of the queues to listen on
     * @throws IllegalStateException if the listener container was never created
     */
    public void containerAddQueueNames(String... names) {
        // addQueues(...) takes Queue objects; names must go through addQueueNames(...).
        requireContainer().addQueueNames(names)
    }

    /**
     * Publishes a message and waits for the reply.
     *
     * @param exchangeName exchange to publish to
     * @param routingKey routing key of the message
     * @param message payload, converted by the template's message converter
     * @return the converted reply, or null when none arrived or sending failed
     */
    public Object publish(String exchangeName, String routingKey, Object message) {
        try {
            return rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message)
        } catch (Exception e) {
            logFailure("publish to exchange '" + exchangeName + "' with routing key '" + routingKey + "'", e)
            return null
        }
    }

    /**
     * Declares an exchange.
     *
     * @param exchange exchange to declare
     * @return true on success, false when the declaration failed
     */
    public boolean declareExchange(Exchange exchange) {
        try {
            rabbitAdmin.declareExchange(exchange)
            return true
        } catch (Exception e) {
            logFailure("declare exchange", e)
            return false
        }
    }

    /**
     * Declares several exchanges on the given channel.
     *
     * NOTE(review): declareExchanges(Channel, Exchange...) does not look like
     * part of RabbitAdmin's public API - confirm it is reachable in the
     * Spring AMQP version in use.
     *
     * @param channel channel the declarations are issued on
     * @param exchange exchanges to declare
     * @return true on success, false when a declaration failed
     */
    public boolean declareExchaneges(Channel channel, Exchange... exchange) {
        try {
            rabbitAdmin.declareExchanges(channel, exchange)
            return true
        } catch (Exception e) {
            logFailure("declare exchanges", e)
            return false
        }
    }

    /**
     * Declares a queue by name.
     *
     * @param queueName name of the queue
     * @return the value returned by the admin (the queue name), or null on failure
     */
    public String declareQueue(String queueName) {
        try {
            return rabbitAdmin.declareQueue(new Queue(queueName))
        } catch (Exception e) {
            logFailure("declare queue '" + queueName + "'", e)
            return null
        }
    }

    /**
     * Declares a queue.
     *
     * @param queue queue to declare
     * @return the value returned by the admin (the queue name), or null on failure
     */
    public String declareQueue(Queue queue) {
        try {
            return rabbitAdmin.declareQueue(queue)
        } catch (Exception e) {
            logFailure("declare queue", e)
            return null
        }
    }

    /**
     * Declares several queues on the given channel.
     *
     * NOTE(review): declareQueues(Channel, Queue...) does not look like part
     * of RabbitAdmin's public API - confirm it is reachable in the Spring
     * AMQP version in use.
     *
     * @param channel channel the declarations are issued on
     * @param queue queues to declare
     * @return the broker's declare-ok results, or null on failure
     */
    public DeclareOk[] declareQueues(Channel channel, Queue... queue) {
        try {
            return rabbitAdmin.declareQueues(channel, queue)
        } catch (Exception e) {
            logFailure("declare queues", e)
            return null
        }
    }

    /**
     * Declares a binding.
     *
     * The parameter type is fully qualified because an unqualified
     * {@code Binding} resolves to {@code groovy.lang.Binding} in Groovy.
     *
     * @param binding binding to declare
     * @return true on success, false when the declaration failed
     */
    public boolean declareBinding(final org.springframework.amqp.core.Binding binding) {
        try {
            rabbitAdmin.declareBinding(binding)
            return true
        } catch (Exception e) {
            logFailure("declare binding", e)
            return false
        }
    }

    /**
     * Binds a queue to an exchange.
     *
     * NOTE(review): with(routingKey) is only offered for some exchange types
     * (e.g. topic/direct); dynamic dispatch fails for others - confirm callers.
     *
     * @param queue queue to bind
     * @param exchange exchange to bind to
     * @param routingKey routing key of the binding
     * @return true on success, false when the declaration failed
     */
    public boolean declareBinding(Queue queue, Exchange exchange, String routingKey) {
        try {
            rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey))
            return true
        } catch (Exception e) {
            logFailure("bind queue with routing key '" + routingKey + "'", e)
            return false
        }
    }

    /**
     * Stops the listener container (if one was started) and destroys the
     * connection factory.
     *
     * @return true when shutdown completed (or there was nothing to close),
     *         false when it failed
     */
    public boolean closeConnectionFactory() {
        try {
            // Stop consumers first; they use the factory destroyed below.
            if (container != null) {
                container.stop()
            }
            if (connectionFactory != null) {
                connectionFactory.destroy()
            }
            return true
        } catch (Exception e) {
            logFailure("close connection factory", e)
            return false
        }
    }

    /**
     * Polls a queue once for a message.
     *
     * @param queueName queue to receive from
     * @return the converted message, or null when the queue was empty or
     *         receiving failed
     */
    public Object subscribe(String queueName) {
        try {
            return rabbitTemplate.receiveAndConvert(queueName)
        } catch (Exception e) {
            logFailure("receive from queue '" + queueName + "'", e)
            return null
        }
    }


}