Messaging - fidransky/kiv-pia-labs GitHub Wiki
TOTD:
- learn about messaging and application events
- pass message from application to message broker and back
- finish streaming WS API implementation
Generally, messaging is a programming technique that lets us add indirection to our source code. Instead of making service A call service B directly, we can make service A produce messages and service B consume them. The part sitting in between, which actually handles message delivery, is called a message broker (aka message queue, MQ).
This is the publish-subscribe messaging model in practice. Effectively, there may be many modules producing messages and many modules consuming them, which is not always desirable. Most message brokers provide two message delivery strategies:
- topic - each produced message is delivered to all subscribed consumers (possibly multiple times)
- queue - each produced message is delivered to one subscribed consumer (possibly multiple times)
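To make the difference between the two strategies concrete, here is a minimal in-memory sketch in plain Java. It is not a real broker and uses no broker API: TopicSketch and QueueSketch are hypothetical names, the round-robin choice of consumer is only one possible policy, and redelivery (the "possibly multiple times" part) is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a topic: every subscriber receives every message.
class TopicSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

// Hypothetical stand-in for a queue: each message goes to exactly one subscriber.
class QueueSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        if (subscribers.isEmpty()) {
            return; // this sketch drops the message; a real broker would typically keep it
        }
        // Round-robin selection is an assumption made for this sketch.
        subscribers.get(next % subscribers.size()).accept(message);
        next++;
    }
}

class DeliveryDemo {
    public static void main(String[] args) {
        List<String> a = new ArrayList<>();
        List<String> b = new ArrayList<>();

        TopicSketch topic = new TopicSketch();
        topic.subscribe(a::add);
        topic.subscribe(b::add);
        topic.publish("damage reported");
        System.out.println("topic: a=" + a + " b=" + b);

        a.clear();
        b.clear();
        QueueSketch queue = new QueueSketch();
        queue.subscribe(a::add);
        queue.subscribe(b::add);
        queue.publish("first");
        queue.publish("second");
        System.out.println("queue: a=" + a + " b=" + b);
    }
}
```

With a topic, both subscribers see the same message; with a queue, the two messages are split between them.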
The strategy determines the kind of destination that messages are published to and consumed from. In addition to consuming messages sent to a pre-determined destination, message brokers also allow consumers to specify destinations with wildcards. See the Artemis documentation for examples.
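As an illustration of wildcard destinations, the sketch below mimics the matching rules of Artemis's default wildcard syntax as described in its documentation: `.` separates words, `*` matches exactly one word, and `#` matches any sequence of zero or more words. It only demonstrates the rules and is not how the broker implements them; WildcardSketch is a hypothetical name.

```java
// Hypothetical matcher illustrating the wildcard rules; not broker code.
class WildcardSketch {
    static boolean matches(String pattern, String address) {
        return matches(pattern.split("\\."), 0, address.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] a, int ai) {
        if (pi == p.length) {
            return ai == a.length; // pattern used up: address must be used up too
        }
        if (p[pi].equals("#")) {
            // '#' may consume zero or more words; try every possible split point
            for (int skip = ai; skip <= a.length; skip++) {
                if (matches(p, pi + 1, a, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (ai == a.length) {
            return false; // pattern still expects a word, address has none left
        }
        if (p[pi].equals("*") || p[pi].equals(a[ai])) {
            return matches(p, pi + 1, a, ai + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("kiv.pia.labs.damage.*", "kiv.pia.labs.damage.42"));
        System.out.println(matches("kiv.pia.#", "kiv.pia.labs.damage.42"));
        System.out.println(matches("kiv.*.damage", "kiv.pia.labs.damage"));
    }
}
```

A consumer subscribed with a pattern such as kiv.pia.labs.damage.* would therefore receive messages for every individual damage ID under that prefix.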
Message brokers use protocols to communicate with their clients (publishers, subscribers), and there are many of them available (MQTT, STOMP, OpenWire, ...). Fortunately, message brokers usually support several of them and allow us to mix and match the protocols used by all parties: use MQTT for publishers, STOMP for subscribers, and leave it up to the message broker to handle the differences.
Similarly to messaging in inter-service communication, most frameworks come with some support for application events, allowing us to enjoy the same benefits within a single service (= application).
In Spring, we use the ApplicationEventPublisher interface to publish events. On the other end, we use the @EventListener annotation to listen for events.
While this technique surely helps to reduce code coupling, it doesn't scale. Since the events are only delivered within the given application instance, they never reach past it. If there are multiple replicas of the same service, they can't access each other's events. You still need an external message broker for that.
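The limitation can be seen in a small plain-Java analogy. This is not Spring's implementation or API: LocalEventBus is a hypothetical class that dispatches events to listeners by event type, loosely resembling how listeners are selected by their parameter type. Each bus instance stands for one replica of the service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-process event bus; one instance represents one application replica.
class LocalEventBus {
    private record Registration<T>(Class<T> type, Consumer<T> listener) {
        void deliverIfMatches(Object event) {
            if (type.isInstance(event)) {
                listener.accept(type.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new ArrayList<>();

    // Registers a listener that only receives events of the given type.
    <T> void listen(Class<T> type, Consumer<T> listener) {
        registrations.add(new Registration<>(type, listener));
    }

    // Delivers the event synchronously to matching listeners of THIS instance only.
    void publish(Object event) {
        for (Registration<?> registration : registrations) {
            registration.deliverIfMatches(event);
        }
    }
}

class ReplicaDemo {
    public static void main(String[] args) {
        LocalEventBus replicaA = new LocalEventBus();
        LocalEventBus replicaB = new LocalEventBus();
        replicaA.listen(String.class, e -> System.out.println("replica A got: " + e));
        replicaB.listen(String.class, e -> System.out.println("replica B got: " + e));

        // Only replica A's listener runs; replica B never sees the event.
        replicaA.publish("damage reported");
    }
}
```

Nothing connects the two instances, which is exactly the gap an external message broker fills.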
Create a new DamageReportedEvent class as a wrapper of a new Damage.
Use DI to autowire ApplicationEventPublisher to DamageService.
Extend the DamageService.create method to use ApplicationEventPublisher to publish the DamageReportedEvent when damage is successfully created.
Create a new @Service class with a single method accepting DamageReportedEvent as its only parameter. The names of the class and the method are not important.
Make the method log the damage's impaired user.
Make the method listen for DamageReportedEvent application events using the @EventListener annotation.
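As a rough sketch of the first step above, the event can be a plain immutable wrapper. Since Spring Framework 4.2, published events may be arbitrary objects and need not extend ApplicationEvent, so a Java record should be enough. The Damage record below is a simplified, hypothetical stand-in with assumed fields (id, impairedUserName, description); use the project's real Damage class instead.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical, simplified stand-in for the project's Damage domain class.
record Damage(UUID id, String impairedUserName, String description) {}

// Event wrapping the newly created damage; immutable and null-safe.
record DamageReportedEvent(Damage damage) {
    DamageReportedEvent {
        Objects.requireNonNull(damage, "damage must not be null");
    }
}

class DamageEventDemo {
    public static void main(String[] args) {
        Damage damage = new Damage(UUID.randomUUID(), "alice", "scratched door");
        DamageReportedEvent event = new DamageReportedEvent(damage);
        // A listener would typically read the wrapped damage like this:
        System.out.println("Damage reported for " + event.damage().impairedUserName());
    }
}
```

Keeping the event immutable means the publisher and every listener see the same data, regardless of the order in which listeners run.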
Start ActiveMQ Artemis message broker using Docker: https://activemq.apache.org/components/artemis/documentation/latest/docker.html#official-images
Note the port mappings:
- port 8161 serves broker management UI
- port 61616 allows JMS connections
Go to http://localhost:8161 and explore the broker management UI.
Add the org.springframework.boot:spring-boot-starter-artemis dependency to the pia-labs-core module.
Configure JMS connection using Spring Boot properties:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://localhost:61616
spring.artemis.user=artemis
spring.artemis.password=artemis
Extend your MessageListener class to autowire the JmsTemplate bean.
Use the JmsTemplate.convertAndSend method to forward the reported damage to the message broker with the destination set to kiv.pia.labs.damage.{damageId} (replace {damageId} with the actual value).
Now, JmsTemplate fails to send the message because it cannot be serialized. To fix it, make all the classes in the sent object graph implement Serializable.
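The serialization requirement can be checked without a broker. The sketch below uses plain Java serialization, which is what an object payload ends up going through when no other message converter is configured (an assumption about the default setup). ReportedDamage, PlainDamage, and SerializationCheck are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical payload that CAN be serialized.
record ReportedDamage(String id, String description) implements Serializable {}

// Hypothetical payload that CANNOT be serialized (no Serializable marker).
class PlainDamage {
    final String id;

    PlainDamage(String id) {
        this.id = id;
    }
}

class SerializationCheck {
    // Writes the value to bytes and reads it back; fails if any object
    // in the graph does not implement Serializable.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(
                "Not serializable: " + value.getClass().getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new ReportedDamage("42", "flat tyre")));
        try {
            roundTrip(new PlainDamage("42"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same rule applies transitively: if the payload references other domain objects, those classes need to be Serializable as well.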
With the message broker running in Docker, open its management UI at http://localhost:8161.
Start your app as usual and report some damage - either via UI or WS API.
Go to http://localhost:8161/console/artemis/artemisQueues and check that some queues were created and there are some messages enqueued.
Warning
This step is not updated for the 2024/2025 course.
Add the org.springframework.boot:spring-boot-starter-websocket dependency to the bikesharing-graphql-api module.
Add a messageSink property to the Room domain class. This is a reactive data type serving as both message sink and message emitter:
private final Sinks.Many<Message> messageSink = Sinks.many().multicast().onBackpressureBuffer();
Add a new streamMessages method to the Room domain class returning the messageSink.
Add a new streamRoomMessages method to the RoomService class. Implement it similarly to the getRoomMessages method but call the newly created Room.streamMessages method instead.
Update the streamRoomMessages method (annotated with @SubscriptionMapping) in GraphQLController to use the newly implemented RoomService.streamRoomMessages method.
Update the streamRoomMessages method in MessageController to use the newly implemented RoomService.streamRoomMessages method.
With both the message broker and the application running, open GraphiQL at http://localhost:8080/pia-labs/spring/graphiql.
Run the following GraphQL subscription with the roomId variable set to an ID of your choice:
subscription streamRoomMessages($roomId: ID!) {
  streamRoomMessages(roomId: $roomId) {
    id
    text
  }
}
Examples:
- commit 3b43695 - client-side UI streaming messages from message broker using STOMP via WebSocket
- commit 19fad94 - server-side UI streaming messages from message broker using STOMP via WebSocket
- commit 68750d9 - client-side UI streaming messages from REST WS API using SSE (server-sent events)
- https://mailpit.axllent.org/docs/install/docker/
- https://activemq.apache.org/components/artemis/documentation/latest/docker.html#official-images
- https://stomp-js.github.io/guide/stompjs/using-stompjs-v5.html
- https://activemq.apache.org/components/artemis/documentation/latest/wildcard-syntax.html#wildcard-syntax
- https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/jms/annotation/JmsListener.html