activemq演示 - wtdig/study GitHub Wiki

Activemq

一、第一部分

1 . 简介

ctiveMq是Java,RabbitMQ是Erlang,理论上,RabbitMQ的性能比ActiveMq更强,是非Java系统的首选,ActiveMq是Java的,整套系统如果本来就是Java的,配合的默契更佳。虽然目前项目是java的,所以使用ActiveMq,但是出于参考学习的态度,把RabbitMq一起搭建学习了更好。

RabbitMQ:基于AMQP协议(Advanced Message Queue Protocol)

ActiveMQ:基于STOMP协议

http://swingchen.javaeye.com/blog/173658 RabbitMQ/AMQP所描绘的美好前景(转)

rabbitMQ 是 AMQP 用 Erlang 实现的 MQ 。之前不是很理解,为什么要用 Erlang 来“又实现一个中间件”呢?这么做能发挥 Erlang 的优势么?机缘巧合,最近了解了一下 AMQP ,有了一点新的认识。

AMQP 主要是由金融领域的软件专家们贡献的创意,而联合了通讯和软件方面的力量,一起打造出来的规范。【Contributors: JPMorgan Chase Bank & Co., Cisco Systems, Inc., Credit Suisse, Envoy Technologies Inc., iMatix Corporation, IONA Technologies, Rabbit Technologies Ltd., Red Hat, Inc., TWIST Process Innovations Ltd, and 29West, Inc.】粗略的从概念上来讲 AMQP 首先满足的是金融系统的消息通讯业务需求。这是一个可以和 JMS 进行类比的消息中间件开放规范,所不同的是 AMQP 同时定义了消息中间件的语意层面和协议层面;另外一个不同是 AMQP 是语言中立的,而 JMS 仅和 Java 相关。AMQP 在“语意层面的定义”,这就意味着,它并不仅仅是象 JMS 或者其他的 MQ 一样,仅能按照预定义的方式工作,而是“可编程”的消息中间件。而“语言中立”则意味着只要遵循 AMQP 的协议,任何一种语言都可以开发消息组件乃至中间件本身。比如说这样的场景:“Java 写的消息端(新的前端)通过 Erlang 写的消息中间件(基础设施)与 C 写的另外一个消息端(遗留系统)进行消息交互”。AMQP 是一个开放标准,目前还在 0.9 版本。尚未成熟,但市场上已经出现了很多这个标准的实现产品。在 AMQP 所描绘的美好前景下,我们可以这么设想将来构建在成熟之后构建在 AMQP 之上的金融系统。前端程序员用他们熟悉的“工业语言”来构建系统中新的应用模块。后端程序员则继续用“老旧语言”在“遗产系统”上慢慢改进。当然,金融系统需要他们赖以沟通消息互相调用的“基础设施”必须坚若磐石。为业界提供“高并发,易扩容”的产品,这似乎正是 Erlang 的强项。之前听说“ Erlang 进入金融系统”,具体的事例,大概就是这件事了。

2 . 特性

3 . 两种使用方式

4 . 通讯协议

常用的通讯协议为:tcp和nio协议,一般为tcp,当需要大量的客户端链接borker时,使用nio可以提高效率

5 . 使用场景


二、第二部分

1、activemq的安装启动

1、下载activemq

 wget https://archive.apache.org/dist/activemq/5.14.3/apache-activemq-5.14.3-bin.tar.gz

 注意:不同版本的mq,要求的jdk不一样,5.14.3可以使用jdk1.7。如果要求的jdk不一样,会出现无法启动mq

2、解压

 tar -zxvf apache-activemq-5.14.3-bin.tar.gz

3、目录介绍

从它的目录来说,还是很简单的: 

bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录

4、启动、停止、查看指令

进入bin目录

./activemq start  启动mq

./activemq status 查看mq状态

./activemq stop 停止mq

5、启动后,可以访问默认的8161端口,查询相关消息 ,默认用户名admin 密码admin

2、activemq的配置文件

activemq.xml

1)安全配置,在broker里面添加安全插件,这里是消息发送者和消息接受者需要设置的密码,控制台的密码需要在jetty-realm.properties的文件中配置

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

	   <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="root" password="root" groups="admins,publishers,consumers"/>
                </users>
            </simpleAuthenticationPlugin>
        </plugins>
</broker>

3、activemq的理论知识点

1)api


a、点对点的消息模型: 每个消息只能有一个消费者。
消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息

b、发布订阅消息模型   每个消息可以有多个消费者
生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。

1) 接口描述:

ConnectionFactory:连接工厂,JMS创建连接的方式

Connection:JMS客户端与JMS Provider的连接(通过ConnectionFactory创建的)。

Destination:消息的目的地

Session:一个接收或发送消息的一次回话

MessageProducer:由session对象创建的用于发送消息的对象

MessageConsumer:由session对象创建的用来接收消息的对象

2)消息传输

一、消息协商器(Message Broker)

broke:消息的交换器,就是对消息进行管理的容器。ActiveMQ 可以创建多个 Broker,客户端与ActiveMQ交互,实际上都是与ActiveMQ中的Broker交互,Broker配置在${MQ_HOME}\conf\activemq.xml。
生产者----》broke---->>消费者

二、连接器(Connectors)
(一)、传输连接器 (transportConnectors)

transportConnectors 连接器:就是建立broker与消息生产者、消息消费者之间的交互。

传输连接器常用的协议:

  在Active MQ中常用的连接协议:tcp、udp、nio、ssl、http、https、vm。如果使用ssl协议需要配置证书,使用http或者https需要使用httpclient来发送接收消息。

(1)TCP默认的协议

tcp://hostname:port?key=value&key=value   后面的参数选填
使用TCP协议的好处

高效的:该协议连接使用了OpenWire协议,通过把消息转换成字节流,性能非常好
可用性:TCP是使用非常广泛的网络协议,基本上所有的平台都支持
TCP配置示例conf/activemq.xml   :

        <transportConnectors>
            <!--activemq 的默认连接 tcp-->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>
(2)、NIO
a、NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有 更多的client调用和服务端有更多的负载。
b、适合使用NIO协议的场景: 
  可能有大量的Client去链接到Broker上 一般情况下,大量的Client去链接Broker是被操作系统的线程数所限制的。因此, NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议 
  可能对于Broker有一个很迟钝的网络传输 NIO比TCP提供更好的性能
c、NIO连接的URI形式:nio://hostname:port?key=value   后面的参数选填

d、NIO配置实例 conf/activemq.xml:

         <transportConnectors>
            <!-- 设置一个NIO的连接-->
            <transportConnector name="nio" uri="nio://0.0.0.0:61617"/>
        </transportConnectors>
(3)、UDP

  a、UDP和TCP的区别

TCP是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被复 制和丢失的。UDP,另一方面,它是不会保证数据包的传递的
TCP也是一个稳定可靠的数据包传递协议,意味着数据在传递的过程中不会被丢失。这样 确保了在发送和接收之间能够可靠的传递。相反,UDP仅仅是一个链接协议,所以它没有可 靠性之说
TCP是被用在稳定可靠的场景中使用的;UDP通常用在快速数据传递和不 怕数据丢失的场景中,还有ActiveMQ通过防火墙时,只能用UDP
  b、UDP连接的URI形式:udp://hostname:port?key=value

       c、配置实例 conf/activemq.xml

        <transportConnectors>
            <transportConnector name="udp" uri="udp://localhost:61618"/>
        </transportConnectors>
(4)、SSL

底层是TCP协议,但是对传输数据进行了加密
a、适用场景:MQ暴露在外网,要求客户端与broker之间通讯

b、使用步骤:
  b-1、创建SSL协议:
  b-2、配置Broker SSL协议  conf/activemq.xml:

       <sslContext>
            <sslContext keyStore="F:/beifeng/apache-activemq-5.6.0/conf/mybroker.ks"
              keyStorePassword="test123" />
       </sslContext>
  b-3、配置客户端SSL协议:



c、SSL的连接的URI形式:ssl://hostname:port?key=value

d、配置实例 conf/activemq.xml

        <transportConnectors>
            <transportConnector name="ssl" uri="ssl://localhost:61619"/>
        </transportConnectors>
(5)HTTP、HTTPS

a、通过jetty容器来接收http协议的mq消息
b、用于只允许基本HTTP服务通过的网络环境
c、通过httpclient来发送/接收消息,需要添加额外的java包 Httpclient、Xstream、activemq-optional
d、URI:http://hostname:port?key=value

e、配置实例conf/activemq.xml:

     <transportConnectors>
            <transportConnector name="http" uri="http://localhost:8080"/>
        </transportConnectors>
配置 HTTPS则需要在jetty.xml中配置相关证书 HTTPS= HTTP+SSL

(二、)网络连接器(NetWorkConnectors) 

NetWorkConnectors:用于Broke与Broke之间的交互 ,主要是ActiveMq集群部署时。

3) 消息存储

参考资料

一、消息的存储方式
ActiveMQ支持JMS规范中的持久化消息与非持久化消息

持久化消息通常用于不管是否消费者在线,它们都会保证消息会被消费者消费。当消息被确认消费后,会从存储中删除
非持久化消息通常用于发送通知以及实时数据,通常要求性能优先,消息可靠性并不是必须的情况
MQ支持可插拔式的消息存储,如:内存、文件和关系数据库等方式
Queue消息模型在ActiveMQ的存储

  采用存储采用先进先出(FIFO),一个消息只能被一个消费者消费,当消息被确认消费之后才会被删除。

Topic消息模型(针对持久订阅)
  每个订阅者获取的消息实际是消息的一个副本,只有一个消息副本会被存储,MQ提供了一个指针来指向消息存储并且分发消息副本到订阅者,消息直到所有的持久化订阅者都被接收才能被删除。

持久化存储方式:

KahaDB消息存储
AMQ消息存储
JDBC消息存储
内存消息存储
二、KahaDB存储方式
  KahaDB是从ActiveMQ 5.4开始默认的持久化插件。KahaDb恢复时间远远小于其前身AMQ并且使用更少的数据文件,所以可以完全代替AMQ,kahaDB的持久化机制同样是基于日志文件,索引和缓存。

(一)、KahaDB主要特性:

日志形式存储消息;
消息索引以B-Tree结构存储,可以快速更新;
完全支持JMS事务;
支持多种恢复机制;
(二)、适用场景:

高吞吐量的应用程序
存储大数据量的消息
(三)、配置方式 conf/activemq.xml:

       <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

(四)、KahaDB存储原理:

    当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,如果消息及时被确认,就不需要写入到磁盘。写入到磁盘中的数据消息,在后续的消息活动中,如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。

  1、KahaDB内部结构

Data logs:消息日志包含了消息日志和一些命令
Cache:当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,如果消息及时被确认,这不需要写入到磁盘
Btree indexes(消息索引):用于引用消息日志(message id),它存储在内存中,这样能快速定位到。MQ会定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。
Redo log用于在非正常关机情况下维护索引完整性。

2、目录结构:

Db log files:用于存储消息(默认大小32M),当log日志满了,会创建一个新的,当log日志中的消息都被删除,该日志文件会被删除或者归档。
Archive directory:当datalog不在被kahadb需要会被归档(通过archiveDataLogs属性控制)。
Db.data:存放Btree indexs。
Db.redo:存放redo file,用于恢复Btree indexs。

三、AMQ消息存储
  写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。而且由于索引巨大,一旦Broker崩溃,重建索引的速度会非常慢。

特点:类似KahaDB,也包含了事务日志,每个destination都包含一个index文件,AMQ适用于高吞吐量的应用场景,但是不适合多个队列的场景。

 配置方式conf/activemq.xml:

       <!--AMQ    directory:数据存储路径 syncOnWrite:是否同步写入  maxFileLength:日志文件大小 -->
        <persistenceAdapter>
            <amqPersistenceAdapter
                directory="${activemq.data}/AMQdb"
                syncOnWrite="true"
                maxFileLength="10mb" />
        </persistenceAdapter>

1、AMQ内部结构:


Data logs:消息日志包含了消息日志
Cache:用于消息的快速检索
Reference store indexes:用于引用datalogs中的消息,通过message ID 关联

2、目录结构:

Lock:保证同一时间只有一个borker访问文件目录
temp-storag:用于存储非持久化消息(当不在被存储在内存中),如等待慢消费者处理消息
Kr-store:用于存储引用消息日志数据
journal directory:包含了消息文件、消息日志和消息控制信息
Archive:归档的数据日志

四、JDBC存储[参考资料](https://www.aliyun.com/jiaocheng/206340.html)
支持通过JDBC将消息存储到关系数据库,性能上不如文件存储,能通过关系型数据库查询到消息的信息。

MQ支持的数据库:Apache Derby、MYsql、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。

存储表结构:

A、ACTIVEMQ_MSGS:用于存储消息,Queue和Topic都存储在这个表中:

ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
B、ACTIVEMQ_ACKS:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录消费过的消息的ID。
C、ACTIVEMQ_LOCK(消息锁,保证同一时间只能有一个broker访问这些表结构):
        表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

 

配置方式:

1、配置数据源 conf/acticvemq.xml文件:

 <!-- 配置数据源-->
      <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="111111"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
      </bean>
2、配置broke中的persistenceAdapter :

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

 <!-- JDBC配置 -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"  createTablesOnStartup="false"/>
        </persistenceAdapter> 
ps:数据库activemq 需要手动创建。

实际使用配置案例:使用的mq是5.14.3版本

首先配置bean:

<!-- mysql database -->
     <bean id="mysqlDs" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
		<property name="url" value="jdbc:mysql://45.78.9.159:3306/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="root"/>
        <property name="poolPreparedStatements" value="true"/>
     </bean>

注意:activemq5.14.3使用的是org.apache.commons.dbcp2.BasicDataSource,另外需要在lib目录下传入mysql的驱动mysql-connector-java-5.1.38.jar,案例中数据库是linux中的服务器地址(使用前需要新建一个activemq的数据库,使用window上的数据库可能报错)

然后再配置broker,在broker标签中配置

<persistenceAdapter>
     <jdbcPersistenceAdapter dataSource="#mysqlDs" useDatabaseLock="false"/>
</persistenceAdapter> 

五、内存消息存储
内存消息存储,会将所有的持久化消息存储在内存中,必须注意JVM使用情况以及内存限制,适用于一些能快速消费的数据量不大的小消息,当MQ关闭或者宕机,未被消费的内存消息会被清空。

配置方式 设置 broker属性值  persistent="false":

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false">

4)安全性

一、认证
认证(Authentication):验证某个实体或者用户是否有权限访问受保护资源。

MQ提供两种插件用于权限认证:
(一)、Simple authentication plug-in:直接把相关的权限认证信息配置到XML文件中。

配置 conf/activemq.xml 的 broke元素添加插件:

        <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="admin" password="password" groups="admins,publishers,consumers"/>
                    <authenticationUser username="publisher" password="password"  groups="publishers,consumers"/>
                    <authenticationUser username="consumer" password="password" groups="consumers"/>
                    <authenticationUser username="guest" password="password"  groups="guests"/>
                </users>
            </simpleAuthenticationPlugin>
        </plugins>

代码中的认证方式两种:

1、在创建Connection的时候认证

//用户认证
Connection conn = connFactory.createConnection("admin","password");
2、也可以在创建ConnectionFactory工厂的时候认证

ConnectionFactory connFactory = new ActiveMQConnectionFactory("admin","password",url);
 

(二)、JAAS authentication plug-in:实现了JAAS API,提供了一个更强大的和可定制的权限方案。

配置方式:

1、在conf目录中创建 login.config 文件 用户 配置 PropertiesLoginModule:

activemq-domain {
    org.apache.activemq.jaas.PropertiesLoginModule required debug=true
    org.apache.activemq.jaas.properties.user="users.properties"
    org.apache.activemq.jaas.properties.group="groups.properties";
};
2、在conf目录中创建users.properties 文件用户配置用户:

# 创建四个用户
admin=password  
publisher=password 
consumer=password  
guest=password
3、在conf目录中创建groups.properties 文件用户配置用户组:

#创建四个组并分配用户
admins=admin
publishers=admin,publisher
consumers=admin,publisher,consumer
guests=guest
4、将该配置插入到activemq.xml中:

<!-- JAAS authentication plug-in -->
        <plugins>
            <jaasAuthenticationPlugin configuration="activemq-domain" />
        </plugins>
5、配置MQ的启动参数:

使用dos命令启动:

D:\tools\apache-activemq-5.6.0-bin\apache-activemq-5.6.0\bin\win64>activemq.bat -Djava.security.auth.login.config=D:/tools/apache-activemq-5.6.0-bin/apache-activemq-5.6.0/conf/login.config
6、在代码中的认证方式与Simple authentication plug-in 相同。

二、授权
基于认证的基础上,可以根据实际用户角色来授予相应的权限,如有些用户有队列写的权限,有些则只能读等等。
两种授权方式
(一)、目的地级别授权

JMS目的地的三种操作级别:
  Read :读取目的地消息权限
  Write:发送消息到目的地权限
  Admin:管理目的地的权限

配置方式  conf/activemq.xml :

<plugins>
    <jaasAuthenticationPlugin configuration="activemq-domain" />
    <authorizationPlugin>
        <map>
            <authorizationMap>
                <authorizationEntries>
                    <authorizationEntry topic="topic.ch09" read="consumers" write="publishers" admin="publishers" />
                </authorizationEntries>
            </authorizationMap>
        </map>
    </authorizationPlugin>
</plugins>

(二)、消息级别授权

授权特定的消息。

开发步骤:
1、实现消息授权插件,需要实现MessageAuthorizationPolicy接口

public class AuthorizationPolicy implements MessageAuthorizationPolicy {
    private static final Log LOG = LogFactory.
        getLog(AuthorizationPolicy.class);
    public boolean isAllowedToConsume(ConnectionContext context,
        Message message) {
        LOG.info(context.getConnection().getRemoteAddress());
        String remoteAddress = context.getConnection().getRemoteAddress();
        if (remoteAddress.startsWith("/127.0.0.1")) {
            LOG.info("Permission to consume granted");
            return true;
        } else {
        LOG.info("Permission to consume denied");
        return false;
    }
    }
}

2、把插件实现类打成JAR包,放入到activeMq 的 lib目录中

3、在activemq.xml中设置<messageAuthorizationPolicy>元素

<messageAuthorizationPolicy>
    <bean class="org.apache.activemq.book.ch6.AuthorizationPolicy" xmlns="http://www.springframework.org/schema/beans" />
</messageAuthorizationPolicy>
三、自定义安全插件
插件逻辑需要实现BrokerFilter类,并且通过BrokerPlugin实现类来安装,用于拦截,Broker级别的操作:

接入消费者和生产者
提交事务
添加和删除broker的连接
demo:基于IP地址,限制Broker连接。


package ch02.ptp;
import java.util.List;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;

public class IPAuthenticationBroker extends BrokerFilter {
    List<String> allowedIPAddresses;
    public IPAuthenticationBroker(Broker next, List<String>allowedIPAddresses) {
        super(next);
        this.allowedIPAddresses = allowedIPAddresses;
    }
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        String remoteAddress = context.getConnection().getRemoteAddress();
        if (!allowedIPAddresses.contains(remoteAddress)) {
        throw new SecurityException("Connecting from IP address "
            + remoteAddress+ " is not allowed" );
        }
        super.addConnection(context, info);
    }
}

安装插件:


package ch02.ptp;

import java.util.List;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;

public class IPAuthenticationPlugin implements BrokerPlugin {
    List<String> allowedIPAddresses;
    public Broker installPlugin(Broker broker) throws Exception {
        return new IPAuthenticationBroker(broker, allowedIPAddresses);
    }
    public List<String> getAllowedIPAddresses() {
        return allowedIPAddresses;
    }
    public void setAllowedIPAddresses(List<String> allowedIPAddresses) {
        this.allowedIPAddresses = allowedIPAddresses;
    }
}

ps:将这连个类打成jar包放到activemq的lib目录下

配置自定义插件:


<plugins>
    <bean xmlns="http://www.springframework.org/schema/beans" id="ipAuthenticationPlugin"
       class="org.apache.activemq.book.ch6.IPAuthenticationPlugin">
        <property name="allowedIPAddresses">
            <list>
              <value>127.0.0.1</value>
            </list>
        </property>
    </bean>
</plugins>

5)嵌入式broke

一、如何启动active MQ 服务

(一)、使用命令启动

   a、/usr/local/activemq-5.9.0/bin 目录下 ./activemq start   默认使用conf/activemq.xml 配置文件
   b、[root@localhost bin]# ./activemq start xbean:file:../conf/activemq-slave1.xml  使用指定的配置文件启动
(二)、代码启动broker

  在程序中可以通过编码的方式启动broker,如果要启动多个broker需要为每一个broker设置名字  broker.setName("brokerOne")

1、使用BrokerService 启动broker

    public static void main(String[] args) throws Exception {
        BrokerService broker=new BrokerService();
        broker.setUseJmx(true);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }
2、使用BrokerFactory启动broker

private static void brokerFactoryStart() throws Exception{
        String uri="properties:broker.properties";
        BrokerService broker=BrokerFactory.createBroker(new URI(uri));
        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }
broker.properties:
useJmx=true
persistent=false
brokerName=Cheese
3、使用spring

spring-activemq.xml:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    <bean id="jmsBroker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
      <property name="brokerName" value="myBroker"/>
      <property name="persistent" value="false"/>
      <property name="transportConnectorURIs">
          <list>
              <value>tcp://localhost:61616</value>
          </list>
      </property>
    </bean>
</beans>

private static void springStart() throws Exception{
        ApplicationContext context=new ClassPathXmlApplicationContext("spring-activemq.xml");
        BrokerService broker=(BrokerService) context.getBean("jmsBroker");
        broker.start();
    }

6)网络连接

网络连接模式(network connector)
针对海量消息所要求的横向扩展性和系统的高可用性,ActiveMQ提供了网络连接模式的集群功能。简单的说,就是通过把多个不同的broker实例连接在一起,作为一个整体对外提供服务,从而提高整体对外的消息服务能力。通过这种方式连接在一起的broker实例之间,可以共享队列和消费者列表,从而达到分布式队列的目的。

配置多broker示例:
一台服务器启动多个broker的案例
1、把整个conf文件夹复制一份,比如叫做ocnf2
2、修改activemq.xml文件
1)里面的brokerName不能跟原来的重复
2)数据存放的文件名称不能重复
   <kahaDB directory="${activemq.data}/kahadb2"/>
3)transportConnectors端口修改
<transportConnector name="openwire"uri="tcp://0.0.0.0:61716maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5772maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp"uri="stomp://0.0.0.0:61713maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt"uri="mqtt://0.0.0.0:1983maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="ws" uri="ws://0.0.0.0:61714?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
3、修改jetty.xml文件,修改端口
    <property name="port" value="8181"/>
4、到bin下面,复制一个activemq,比如activemq2
1)修改程序的id
ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq2.pid"
2)修改配置文件路径
 ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"

在61616的activemq.xml中加入静态网络链接配置:
<networkConnectors>
   <networkConnector name ="localnetwork" uri="static://(tcp://45.78.9.159:61616,tcp://45.78.9.159:61716)"/>
</networkConnectors>

启动服务 ./activemq start
        ./activemq2 start

静态网络连接,消息丢失问题:使用消息回流

一个很有意思的场景是,broker1和broker2通过networkConnector连接。一些个consumers连接到broker1,消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers。不幸的是转发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是有一部分他们还没有消费的消息被broker2已经分发到了broker1上去了。这些消息,就好像是消失了,除非有消费者重新连接到broker1上来消费。怎么办呢?
办法就是从5.6版本destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发。

在2个broker上都配置以下信息:
<policyEntry queue="TEST.>" >
      <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" enableAudit="false"/>
</policyEntry>

7)容错机制

failover://tcp://45.78.9.159:61616
failover连接失败后,随机连接另一个tcp

8) activemq集群的方案:

方案一:基于静态网络(或者动态网络)连接的方式,(如上所述的静态网络连接示例);

方案二:基于master、slave(主从方式)

1)Shared File System Master Slave:基于共享存储的Mater-Salve:多个broker实例使用一个存储文件,谁拿到文件锁就是master,其它处于待启动状态,如果master挂掉了,抢到文件锁的slave就会变成master;

2)JDBC Master Slave:基于JDBC的Master-Slave:使用同一个数据库,拿到Lock表的写锁的broker成为master(只要配置jdbc存储形式,多个broker连接同一个数据库,就是集群模式了);

3)Replicated LevelDB Store:基于ZooKeeper复制LevelDB存储的Master-Slave机制;

9)使用LevelDB Store进行activemq的集群案例:

(1)zookeeper集群方案(参考zookeeper集群文章):

机器ip              

192.168.2.51

192.168.2.52

192.168.2.53

(2)activemq集群方式(为了方便,在192.168.2.51机器上进行3个文件的配置,达到集群的目的):


1、新建activemqCluster目录,复制3个activemq完整文件,分别命名为node1、node2、node3;

2、分别修改node1、node2、node3的jetty.xml的端口:

   <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8161"/> 端口位置
   </bean>

   node1     8161

   node2     8162

   node3     8163

3、分别修改node1、node2、node3的activemq.xml文件

1)、broker的name(3个必须改成一致的)

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

  修改brokerName="activemq-cluster"

2)、持久化方案改成levelDb

  <persistenceAdapter>
                 <kahaDB directory="${activemq.data}/kahadb"/>
  </persistenceAdapter>


  将以上默认的配置修改成:


  node1:

  <persistenceAdapter>
	<replicatedLevelDB
	directory="${activemq.data}/leveldb"
	replicas="3"
	bind="tcp://0.0.0.0:62621"
	zkAddress="192.168.2.51:2181,192.168.2.52:2181,192.168.2.53:2181"
	hostname="bhz111"
	zkPath="/activemq/leveldb-stores"
	/>
  </persistenceAdapter>

  node2:

  <persistenceAdapter>
	<replicatedLevelDB
	directory="${activemq.data}/leveldb"
	replicas="3"
	bind="tcp://0.0.0.0:62622"
	zkAddress="192.168.2.51:2181,192.168.2.52:2181,192.168.2.53:2181"
	hostname="bhz111"
	zkPath="/activemq/leveldb-stores"
	/>
  </persistenceAdapter>

  node3:

  <persistenceAdapter>
	<replicatedLevelDB
	directory="${activemq.data}/leveldb"
	replicas="3"
	bind="tcp://0.0.0.0:62623"
	zkAddress="192.168.2.51:2181,192.168.2.52:2181,192.168.2.53:2181"
	hostname="bhz111"
	zkPath="/activemq/leveldb-stores"
	/>
  </persistenceAdapter>

  注:replicas="3" 3台集群

      bind="tcp://0.0.0.0:62621" 对应activemq对应启动的端口

      zkAddress="192.168.2.51:2181,192.168.2.52:2181,192.168.2.53:2181"  zookeeper集群的地址

      hostname="bhz111"  主机名,可以通过指令hostname查看,修改对应的名称

3、修改activemq的启动端口

   <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
   </transportConnectors>


   目前我们使用的是 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

   只修改该端口即可(端口与之前配置的levelDb对应):

   node1: <transportConnector name="openwire" uri="tcp://0.0.0.0:62621?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

   node2: <transportConnector name="openwire" uri="tcp://0.0.0.0:62622?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

   node3: <transportConnector name="openwire" uri="tcp://0.0.0.0:62623?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>


以上zookeeper集群和activemq集群搭建完成,启动顺序:

先启动zookeeper的3台机器,在分别启动activemq的3个node;

在使用 ./zkCli,进入zookeeper客户端,输入 cd /zookeeper ,发现除了zookeeper还有activemq,说明搭建完成;

在java代码段,使用:

"failover:(tcp://192.168.2.51:62621,tcp://192.168.2.51:62622,tcp://192.168.2.51:62623)?randomize=false"

4、activemq使用一:单机运行版

1) pom文件

<!--activemq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.3</version>
        </dependency>

2)java代码

消息发送:

package com.wtdig.product;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Producter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址ActiveMQConnection.DEFAULT_BROKER_URL
    private static final String BROKEN_URL = "failover://tcp://45.78.9.159:61616";

    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消息消费:
package com.wtdig.consumer;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Comsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQConnection.DEFAULT_BROKER_URL
    private static final String BROKEN_URL = "failover://tcp://45.78.9.159:61616";

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试消息发送:
package com.wtdig.MqTest;

import com.wtdig.product.Producter;

public class TestMq {
    public static void main(String[] args) {
        Producter producter = new Producter();
        producter.init();
        TestMq testMq = new TestMq();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable {
        Producter producter;

        public ProductorMq(Producter producter) {
            this.producter = producter;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试消息消费:

package com.wtdig.MqTest;

import com.wtdig.consumer.Comsumer;

public class TestConsumer {
    public static void main(String[] args){
        Comsumer comsumer = new Comsumer();
        comsumer.init();
        TestConsumer testConsumer = new TestConsumer();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Comsumer comsumer;
        public ConsumerMq(Comsumer comsumer){
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3、activemq使用二:与spring的整合

1)pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wtdig</groupId>
    <artifactId>springactivemq</artifactId>
    <version>1.0</version>
    <packaging>war</packaging>

    <name>springactivemq Maven Webapp</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <jsp.version>2.2</jsp.version>
        <jstl.version>1.2</jstl.version>
        <servlet.version>2.5</servlet.version>
        <!-- Spring -->
        <spring-framework.version>3.2.8.RELEASE</spring-framework.version>
        <!-- Logging -->
        <logback.version>1.0.13</logback.version>
        <slf4j.version>1.7.5</slf4j.version>
        <!-- Test -->
        <junit.version>4.11</junit.version>
    </properties>
    <dependencies>
        <!--spring-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <!--quartz定时任务-->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>1.8.5</version>
        </dependency>
        <!-- Spring MVC -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <!-- Other Web dependencies -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
            <version>${jstl.version}</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>${servlet.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet.jsp</groupId>
            <artifactId>jsp-api</artifactId>
            <version>${jsp.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spring and Transactions -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>jsr250-api</artifactId>
            <version>1.0</version>
        </dependency>
        <!--activemq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>springactivemq</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- 设置JDK版本 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

  1. web.xml文件
<!DOCTYPE web-app PUBLIC
        "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
        "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
    <display-name>Archetype Created Web Application</display-name>
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>
            classpath:spring-mq.xml,
            classpath:spring-job.xml
        </param-value>
    </context-param>
    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>utf-8</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
</web-app>

  1. activemq配置文件和定时器配置文件

spring-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- Activemq 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg value="admin"/>
        <constructor-arg value="admin"/>
        <constructor-arg value="failover:(tcp://45.78.9.159:61616)?timeout=2000"/>
    </bean>

    <!-- ConnectionFactory Definition -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="activeMQConnectionFactory"/>
    </bean>

    <!-- Default Destination Queue Definition -->
    <!-- 测试配置多个Destination -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="test.activemq.queue"/>
    </bean>

    <!-- JmsTemplate Definition -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="destination"/>
    </bean>

    <!-- Message Sender Definition -->
    <bean id="messageSender" class="com.wtdig.message.MessageSender">
        <constructor-arg index="0" ref="jmsTemplate"/>
        <constructor-arg index="1" ref="destination"/>
    </bean>

    <!-- 消息监听器,主要监听的目的地址 Message Receiver Definition -->
    <bean id="messageReceiver" class="com.wtdig.listen.MessageReceiver">
    </bean>
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="test.activemq.queue"/>
        <property name="messageListener" ref="messageReceiver"/>
    </bean>

</beans>

spring-job.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">

    <context:annotation-config/>
    <bean id="QuartzFactoryBean"
          class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="capacityDataPublisherJobTrigger"/>
            </list>
        </property>
    </bean>

    <bean id="capacityDataPublisherJob" class="com.wtdig.job.TestSenderService"
          init-method="run">
    </bean>

    <bean id="capacityDataPublisherJobTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
        <property name="jobDetail" ref="capacityDataPublisherJobDetail"/>
        <property name="cronExpression">
            <!--一分钟执行一次-->
            <value>0 0/1 * * * ?</value>
        </property>
    </bean>
    <bean id="capacityDataPublisherJobDetail"
          class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject" ref="capacityDataPublisherJob"/>
        <property name="targetMethod" value="run"/>
        <property name="concurrent" value="false"/>
    </bean>

</beans>

3)java代码

消息发送

package com.wtdig.message;

import org.springframework.jms.core.JmsTemplate;

import javax.jms.Destination;

/**
 * mq的消息发送
 */
public class MessageSender {

    private final JmsTemplate jmsTemplate;
    private final Destination destination;

    public MessageSender(final JmsTemplate jmsTemplate, final Destination destination) {
        this.jmsTemplate = jmsTemplate;
        this.destination = destination;
    }

    public void send(final String text) {
        try {
            jmsTemplate.setDefaultDestination(destination);
            jmsTemplate.convertAndSend(text);
            System.out.println("发送消息 : " + text+System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息的接受,接听器方式

package com.wtdig.listen;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * mq的消息接受消费
 */
public class MessageReceiver implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println("接收到消息: " + text+System.currentTimeMillis());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

定时器的启动执行

package com.wtdig.job;

import com.wtdig.message.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * 定时任务启动类
 */
public class TestSenderService {

    @Autowired
    private MessageSender messageSender;

    public void run() {
        messageSender.send("message");
    }

}

配置完成,直接启动服务即可,查看控制台打印的信息

⚠️ **GitHub.com Fallback** ⚠️