JMS - DmitryGontarenko/usefultricks GitHub Wiki
MQ (message queue, очередь сообщений) - популярный способ поддержки взаимодействия между приложениями. В Java стандартом для подключения к серверу MQ с целью отправки и получения сообщения является JMS. Сервер MQ поддерживает список очередей и топиков (тем), для который приложения могут подключаться, отправлять и получать сообщения.
JMS (Java Message Service) - является стандартом обмена сообщениями между приложениями. Программное обеспечение, используемое для передачи сообщений между приложениями по стандарту JMS, формирует очереди сообщений. JMS поддерживает две модели обмена сообщениями: «точка-точка» и «публикация-подписка».
Коммуникация между компонентами, использующими JMS, асинхронна (процедура не дожидается ответа на своё сообщение) и независима от исполнения компонентов.
Queue (очередь) - очередь сообщений применяется для поддержки модели обмена сообщениями вида "точка-точка" (p2p). Когда генератор отправляет сообщения в очередь, сервер MQ сохраняет это сообщение в очереди и доставляет его одному и только одному потребителю при следующем его подключении. В этой модели, как только сообщение получено потребителем, оно уничтожается в соответствии с моделью обмена сообщениями p2p.
Topic (тема) - используется для поддержки модели обмена сообщения вида "публикация-подписка". На сообщение внутри темы может подписываться любое количество клиентов. Когда сообщение поступает в заданную тему, сервер MQ доставляет его всем клиентам, которые на него подписаны.
Для того, что бы работать с JMS, необходимо установить Message Broker.
Заходим на сайт ActiveMQ - https://activemq.apache.org/download.
Выбираем необходимую версию (на момент написания это ActiveMQ 5), скачиваем и распаковываем архив для Windows.
Перед запуском проверьте, какая версия Java у вас установлена, это можно сделать командой java -version
. Для работы необходима как минимум Java 8!
Запуск ActiveMQ производится командой - apache-activemq-5.15.10\bin activemq start
Для того что бы быстро зайти через консоль в папку bin, находясь в ней нужно выделить путь и набрать cmd
.
Так же можно установить ActiveMQ в качестве Службы Windows - для этого необходимо зайти в папку win 32/64 и запустить InstallService.bat apache-activemq-5.15.10\bin\win64 InstallService.bat
, для удаления UninstallService.bat соответственно. Желательно делать это так же через консоль, для того что бы видеть возможные исключения.
Заходим в Службы и находим ActiveMQ, через контекстное меню нажимаем Запустить.
Так же важно убедиться в том, что путь до запускаемого файла activemq
не содержит букв русского алфавита, иначе получите подобную ошибку при запуске - Illegal character in hostname at index 10: ws://Admin-ПК:61614
.
Все логи при запуске можно посмотреть в консоли, но удобнее это сделать через файл, он находится по по пути apache-activemq-5.15.10\data activemq.log
.
Если все запустилось - открываем браузер и проверяем http://localhost:8161/index.html. Переходим по одной из двух гиперссылок Manage ActiveMQ broker
на панель администрирования, данные для входа - admin, admin.
Если вы зашли на главную страницу ActiveMQ, но при переходе по ссылке Manage ActiveMQ broker
начинается бесконечная загрузка - отключите рекламные блокировщики или попробуйте другой браузер.
Теперь с ActiveMQ можно работать.
Официальная документация
Настраиваем конфигурацию в application.properties
:
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
Класс Application
:
@SpringBootApplication
@EnableJms
public class Application {
public static void main(String[] args) {
SpringApplication.run(JmsspringApplication.class, args);
}
}
Создаем класс Listener
:
@Component
public class Listener {
@JmsListener(destination = "inbound.queue")
@SendTo("outbound.queue")
public String receiveMessage(Message message) throws JMSException {
TextMessage textMessage = (TextMessage) message; // получит из очереди inbound.queue
System.out.println("Received message " + textMessage.getText());
String jsonMessage = textMessage.getText();
Map map = new Gson().fromJson(jsonMessage, Map.class);
return "Hello " + map.get("name"); // вернет в очередь outbound.queue
}
}
Класс Listener
отвечает за прослушивание сообщений из входящей очереди и их обработку.
Аннотация @JmsListener
помечает метод как цель для прослушивания JMS-сообщений в указанном месте назначения, в данном случае это - inbound.queue
.
Аннотация @SendTo
отвечает за отправку возвращаемого значения метода receiveMessage()
в пункт назначения, в данном случае это - outbound.queue
.
Метод receiveMessage()
получает сообщение в формате json из очереди inbound.queue, помещает его в коллекцию типа Map и возвращает в очередь outbound.queue сообщения типа "Hello + name".
Проверим работу этого метода, для этого зайдем на панель администрирования ActiveMQ, раздел Queues, выберем нужную нам очередь - inbound.queue и в столбце Operations нажмем Send To. Появится форма отправки сообщения, введем в Message body сообщение типа json:
{
"name": "John",
"age": 23
}
После того, как мы нажали Send, возвращаемся обратно в раздел Queues и выбираем очередь outbound.queue. Заходим в последнее полученное сообщение и в поле Message Details видим - Hello John.
Так же в консоли запущенного приложения можно увидеть:
Received message {
"name": "John",
"age": 23
}
Spring-boot по умолчанию описывает поведение очереди сообщений (p2p), но если мы хотим воспользоваться типом publish-subscribe - это необходимо будет сделать вручную:
@Configuration
public class JmsConfig {
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
connectionFactory.setPassword("admin");
connectionFactory.setUserName("admin");
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true);
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPubSubDomain(true);
factory.setConcurrency("1-1");
return factory;
}
}
Именно метод setPubSubDomain(boolean)
отвечает за "перевод" поведения публикации сообщений, если установлен параметр true
- работает метод publish-subscribe, если false
- point-to-point.
Все остальная конфигурация остается неизменной, единственное что можно поменять для лучшего восприятия - это название топика в классе Listener
с inbound.queue
на inbound.topic
.
Теперь на наш топик inbound.topic
может быть подписано сколько угодно подписчиков и все они будут получать новые опубликованные сообщения. К тому же, количество подписчиков в данный момент можно легко проверить, для этого в ActiveMQ на вкладке Topics находим свою созданную очередь inbound.topic и в столбце Operation нажимаем Subscribers, там должны отображаться все методы, которые подписаны на данный топик.
Реализуем класс слушателя SimpleListener
:
public class SimpleListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Message received: " + textMessage.getText());
}
}
Далее реализуем еще один класс, который будет отправлять сообщения (предварительно создав интерфейс):
@Component("messageSender")
public class MessageSenderImpl implements MessageSender {
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void sendMessage(String message) {
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
Теперь настроим контекст приложения application-context.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd">
<context:component-scan base-package="com.home.jmsspring.spring4"/>
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://localhost:61616" />
<bean id="simpleListener"
class="com.home.jmsspring.spring4.SimpleListener" />
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestinationName" value="simple.queue" />
</bean>
<jms:listener-container container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="simple.queue" ref="simpleListener" method="onMessage" />
</jms:listener-container>
</beans>
Дескриптор <jms:listener-container>
служит для объявления прослушивателя сообщения с указанием получателя (т.е. очереди, в данном случае - simple.queue
), ссылки на бин и метод.
Теперь в классе Application
попробуем отправим несколько сообщения в очередь:
public class Application {
public static void main(String[] args) {
GenericXmlApplicationContext context = new GenericXmlApplicationContext();
context.load("classpath:application-context.xml");
context.refresh();
MessageSender messageSender = context.getBean("messageSender", MessageSender.class);
for (int i = 0; i < 10; i++) {
messageSender.sendMessage("Test message: " + i);
}
}
}
В консоле запущенного приложения можно увидеть ряд сообщений - Message received: Test message: n
. Это значит что сообщения были успешно отправлены и "прочитаны" слушателем.
Если в spring-конфигурации у дескриптора <jms:listener>
изменить атрибут destination
на несуществующую очередь, все 10 сообщений, который мы генерируем с помощью цикла, останутся в очереди simple.queue
до момента получения их потребителем.