Notes - nats-io/nats-jms-bridge GitHub Wiki
Notes
TODO List and ideas
- Create Spring Boot project with metrics
- Get basic JMS send to JMS receive to Nats send to Nats receive example working DONE
- Install ActiveMQ and configure it DONE
- Install IBM MQ and use it in JMS/Nats example
- Create base class that has headers and such for the message envelope
- Set up Spring Actuator for metrics and health checks
- Perhaps use DefaultMessageListenerContainer (this may not work well if we need to dynamically configure a lot of these)
- Only ack messages if successfully sent to Nats (and maybe also flushed) (REQ)
- Might not use DefaultMessageListenerContainer
- Next step get basic Request/Reply example working in JMS: 1, 2, 3, 4 DONE
- Next step get basic Request/Reply example working in Nats.io (simple Java example from site did not work) DONE
- Set up TLS for the Nats client and configure the Nats server for TLS
Nats.io
- Reqs
- Nats MQ bridge
- https://docs.nats.io/developing-with-nats/tutorials/reqreply
- https://mvnrepository.com/artifact/io.nats/jnats/2.6.6
- ActiveMQ
- JMS
- IBM MQ
- Nats Java Client
- IBM MQ JMS support
- Local Dashboard
- Artemis All
- Spring 3 JMS Docs
- DefaultMessageListenerContainer
- Spring JMS guide
- Nats Docker
JMS request / reply
The JMS API also enables you to create destinations (TemporaryQueue and TemporaryTopic objects) that last only for the duration of the connection in which they are created. You create these destinations dynamically using the Session.createTemporaryQueue and the Session.createTemporaryTopic methods. https://docs.oracle.com/cd/E19798-01/821-1841/bncgb/index.html
The only message consumers that can consume from a temporary destination are those created by the same connection that created the destination. Any message producer can send to the temporary destination. If you close the connection that a temporary destination belongs to, the destination is closed and its contents are lost. https://docs.oracle.com/cd/E19798-01/821-1841/bncgb/index.html
JMS Reply/Request example
try-nats/build.gradle
// Build script for the try-nats sample programs.
plugins {
    id 'java'
    id 'application'
}

application {
    // Class launched by the application plugin's `run` task.
    mainClassName = 'com.cloudurable.nats.SendJMS'
}

group 'com.cloudurable'
version '1.0-SNAPSHOT'

// The samples use `var`, so they need a modern JDK. Use the explicit enum
// constant rather than the decimal literal 1.11: numeric literals in the
// legacy "1.x" notation are fragile (1.10, for example, collapses to 1.1).
sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
}

dependencies {
    // `implementation` / `testImplementation` replace the deprecated
    // `compile` / `testCompile` configurations.
    testImplementation group: 'junit', name: 'junit', version: '4.12'
    implementation group: 'io.nats', name: 'jnats', version: '2.6.6'
    implementation group: 'org.apache.activemq', name: 'artemis-jms-client-all', version: '2.11.0'
}
try-nats/gradlew.bat
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
@rem Use the directory containing this script as APP_HOME and the script's
@rem base name as the application name.
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
@rem JAVA_HOME is not defined: try java.exe from the PATH and check that it runs.
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
@rem Strip double quotes from JAVA_HOME before building the java.exe path.
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto init
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:init
@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2
:win9xME_args_slurp
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
try-nats/settings.gradle
// Sets the name of the root project for this Gradle build.
rootProject.name = 'try-nats'
try-nats/gradle/wrapper/gradle-wrapper.properties
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
try-nats/src/main/resources/jndi.properties
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.ConnectionFactory=tcp://localhost:61616
queue.queue/testQueue=testQueue
try-nats/src/main/java/com/cloudurable/nats/SendJMS.java
package com.cloudurable.nats;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * Sends a text message to the JMS test queue once per second, forever.
 *
 * <p>The connection factory and the queue are both obtained through JNDI lookups.
 */
public class SendJMS {
    public static void main(String... args) throws NamingException, JMSException, InterruptedException {
        final var jndi = new InitialContext();
        final var connectionFactory = (ConnectionFactory) jndi.lookup("ConnectionFactory");
        final var destination = (Queue) jndi.lookup("dynamicQueues/testQueue");

        final var jmsConnection = connectionFactory.createConnection();
        final var jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final var sender = jmsSession.createProducer(destination);
        jmsConnection.start();

        // Endless send loop: pause one second, then send a fresh message.
        for (;;) {
            Thread.sleep(1000);
            sender.send(jmsSession.createTextMessage("This is an order no reply "));
        }
    }
}
try-nats/src/main/java/com/cloudurable/nats/GetJMS.java
package com.cloudurable.nats;
import io.nats.client.Nats;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Minimal JMS-to-NATS bridge: receives messages from the JMS test queue and
 * republishes each text payload on the NATS subject "test".
 *
 * <p>Messages that are not {@link TextMessage}s, or that carry no text, are
 * skipped so that a single unexpected message does not stop the bridge.
 */
public class GetJMS {
    public static void main(String... args) throws NamingException, JMSException, InterruptedException, IOException {
        var nc = Nats.connect("nats://localhost:4222");
        var context = new InitialContext();
        var factory = (ConnectionFactory) context.lookup("ConnectionFactory");
        var orderQueue = (Queue) context.lookup("dynamicQueues/testQueue");
        var connection = factory.createConnection();
        var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        var consumer = session.createConsumer(orderQueue);
        connection.start();
        while (true) {
            Thread.sleep(1000);
            Message received = consumer.receive();
            if (received == null) {
                // A blocking receive() yields null only when the consumer was closed.
                break;
            }
            if (!(received instanceof TextMessage)) {
                System.out.println("Skipping non-text message: " + received.getJMSMessageID());
                continue;
            }
            // Read the payload once and reuse it for logging and publishing.
            String text = ((TextMessage) received).getText();
            System.out.println("Got message: " + text);
            if (text != null) {
                nc.publish("test", text.getBytes(StandardCharsets.UTF_8));
            }
        }
        // Reached only when the consumer was closed: release both connections.
        connection.close();
        nc.close();
    }
}
try-nats/src/main/java/com/cloudurable/nats/SendMain.java
package com.cloudurable.nats;
import io.nats.client.Nats;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
/**
 * Publishes a single "All is Well" message on the NATS subject "updates",
 * flushes the connection and closes it.
 */
public class SendMain {
    public static void main(String... args) throws IOException, InterruptedException, TimeoutException {
        final byte[] payload = "All is Well".getBytes(StandardCharsets.UTF_8);

        final var natsConnection = Nats.connect("nats://localhost:4222");
        natsConnection.publish("updates", payload);
        // Flush before closing so the publish is pushed out to the server first.
        natsConnection.flush(Duration.ZERO);
        natsConnection.close();
    }
}
try-nats/src/main/java/com/cloudurable/nats/SubscribeMain.java
package com.cloudurable.nats;
import io.nats.client.Nats;
import io.nats.client.Options;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/**
 * Subscribes to the NATS subject "test" and prints every received payload as
 * UTF-8 text, pausing one second between reads. Runs until the process is killed.
 */
public class SubscribeMain {
    public static void main(String... args) throws IOException, InterruptedException {
        // Alternative connection setup with explicit options, kept for reference:
        //   var options = new Options.Builder().
        //       server("nats://0.0.0.0:4222").
        //       pingInterval(Duration.ofSeconds(20)). // Set Ping Interval
        //       maxPingsOut(5).                       // Set max pings in flight
        //       verbose().
        //       build();
        //   var nc = Nats.connect(options);
        //   nc.close();
        final var natsConnection = Nats.connect("nats://localhost:4222");
        final var subscription = natsConnection.subscribe("test");

        // Read loop: one message per second at most.
        for (;;) {
            Thread.sleep(1000);
            final byte[] payload = subscription.nextMessage(Duration.ZERO).getData();
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
        // The connection is never closed: the loop above does not terminate normally.
    }
}
try-nats/src/main/java/com/cloudurable/nats/nats/requestreply/RequestReply.java
package com.cloudurable.nats.nats.requestreply;
import io.nats.client.Nats;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * NATS responder: listens on the subject "test", prints each incoming message
 * and answers it with "got it " followed by the original payload.
 *
 * <p>Messages without a reply subject (plain publishes) are printed but not
 * answered, because there is nowhere to send a reply.
 */
public class RequestReply {
    public static void main(String... args) throws IOException, InterruptedException {
        var nc = Nats.connect("nats://localhost:4222");
        var dispatcher = nc.createDispatcher(msg -> {
            // Decode once; the text is used for both logging and the reply.
            var body = new String(msg.getData(), StandardCharsets.UTF_8);
            System.out.println(msg.getSubject() + "->" + body);

            var replyTo = msg.getReplyTo();
            if (replyTo == null) {
                // Not a request: publishing to a null subject would be rejected.
                return;
            }
            nc.publish(replyTo, ("got it " + body).getBytes(StandardCharsets.UTF_8));
        });
        dispatcher.subscribe("test");
    }
}
try-nats/src/main/java/com/cloudurable/nats/nats/requestreply/Request.java
package com.cloudurable.nats.nats.requestreply;
import io.nats.client.Nats;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/**
 * NATS requester: once per second sends "test" as a request on the subject
 * "test" and prints the reply payload.
 *
 * <p>If no reply arrives within the timeout, a notice is printed and the loop
 * carries on with the next request.
 */
public class Request {
    public static void main(String... args) throws IOException, InterruptedException {
        var nc = Nats.connect("nats://localhost:4222");
        var timeout = Duration.ofSeconds(100);
        while (true) {
            Thread.sleep(1000);
            var message = nc.request("test", "test".getBytes(StandardCharsets.UTF_8), timeout);
            if (message == null) {
                // request(...) yields null when the timeout elapses without a reply.
                System.out.println("No reply received within " + timeout.getSeconds() + " seconds");
                continue;
            }
            System.out.println(new String(message.getData(), StandardCharsets.UTF_8));
        }
    }
}
try-nats/src/main/java/com/cloudurable/nats/jms/requestreply/RequestJMS.java
package com.cloudurable.nats.jms.requestreply;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * JMS requester: once per second sends an order message to the test queue
 * with a JMSReplyTo destination, then waits up to one second for a reply and
 * prints it.
 *
 * <p>A single temporary reply queue and a single consumer are created up
 * front and reused for every request, instead of allocating a new pair per
 * iteration that is never released.
 */
public class RequestJMS {
    public static void main(String... args) throws NamingException, JMSException, InterruptedException {
        try {
            var context = new InitialContext();
            var factory = (ConnectionFactory) context.lookup("ConnectionFactory");
            var orderQueue = (Queue) context.lookup("dynamicQueues/testQueue");
            var connection = factory.createConnection();
            var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            var producer = session.createProducer(orderQueue);
            // One reply queue and one consumer for the lifetime of the connection.
            var responseQueue = session.createTemporaryQueue();
            var consumer = session.createConsumer(responseQueue);
            connection.start();
            while (true) {
                Thread.sleep(1000);
                var message = session.createTextMessage("This is an order");
                message.setJMSReplyTo(responseQueue);
                producer.send(message);
                // Replies are not correlated with requests: a reply that arrives
                // after this one-second wait is picked up by a later iteration.
                var receive = (TextMessage) consumer.receive(1000);
                if (receive != null) {
                    System.out.println(receive.getText());
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
try-nats/src/main/java/com/cloudurable/nats/jms/requestreply/RequestResponseJMS.java
package com.cloudurable.nats.jms.requestreply;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * JMS responder: receives order messages from the test queue and, when a
 * message carries a JMSReplyTo destination, sends a text response to it.
 *
 * <p>Replies go through one unidentified producer (created with a null
 * destination) that is reused for every reply, instead of creating a new
 * producer per request that is never closed.
 */
public class RequestResponseJMS {
    public static void main(String... args) throws NamingException, JMSException, InterruptedException, IOException {
        try {
            var context = new InitialContext();
            var factory = (ConnectionFactory) context.lookup("ConnectionFactory");
            var orderQueue = (Queue) context.lookup("dynamicQueues/testQueue");
            var connection = factory.createConnection();
            var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            var consumer = session.createConsumer(orderQueue);
            // Unidentified producer: the destination is supplied on each send.
            var replyProducer = session.createProducer(null);
            connection.start();
            while (true) {
                var receivedMessage = (TextMessage) consumer.receive();
                System.out.println("Got message: " + receivedMessage.getText());
                var destination = receivedMessage.getJMSReplyTo();
                if (destination != null) {
                    var message = session.createTextMessage("This is an response to an order");
                    replyProducer.send(destination, message);
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
Proposed Java Client Change
I have a question: instead of a CompletableFuture, I'd like a callback. I guess that is a feature request. So `request` would take three forms: it blocks, it returns a CompletableFuture, or you pass it a callback. Thoughts?
Request with a callback.
// Proposed overload (not an existing client API): the reply is delivered to a callback.
nc.request("test", "test".getBytes(StandardCharsets.UTF_8), msg -> {
    System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
});
Java Request/Reply Example
There was no Java equivalent of this Go example for reply.
src/main/java/com/cloudurable/nats/request/RequestReply.java
package com.cloudurable.nats.request;
import io.nats.client.Nats;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * NATS responder: listens on the subject "test", prints each incoming message
 * and answers it with "got it " followed by the original payload.
 *
 * <p>Messages without a reply subject (plain publishes) are printed but not
 * answered, because there is nowhere to send a reply.
 */
public class RequestReply {
    public static void main(String... args) throws IOException, InterruptedException {
        var nc = Nats.connect("nats://localhost:4222");
        var dispatcher = nc.createDispatcher(msg -> {
            // Decode once; the text is used for both logging and the reply.
            var body = new String(msg.getData(), StandardCharsets.UTF_8);
            System.out.println(msg.getSubject() + "->" + body);

            var replyTo = msg.getReplyTo();
            if (replyTo == null) {
                // Not a request: publishing to a null subject would be rejected.
                return;
            }
            nc.publish(replyTo, ("got it " + body).getBytes(StandardCharsets.UTF_8));
        });
        dispatcher.subscribe("test");
    }
}
src/main/java/com/cloudurable/nats/request/Request.java
There was no Java equivalent of this Go example for request.
package com.cloudurable.nats.request;
import io.nats.client.Nats;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/**
 * NATS requester: once per second sends "test" as a request on the subject
 * "test" and prints the reply payload.
 *
 * <p>If no reply arrives within the timeout, a notice is printed and the loop
 * carries on with the next request.
 */
public class Request {
    public static void main(String... args) throws IOException, InterruptedException {
        var nc = Nats.connect("nats://localhost:4222");
        var timeout = Duration.ofSeconds(100);
        while (true) {
            Thread.sleep(1000);
            var message = nc.request("test", "test".getBytes(StandardCharsets.UTF_8), timeout);
            if (message == null) {
                // request(...) yields null when the timeout elapses without a reply.
                System.out.println("No reply received within " + timeout.getSeconds() + " seconds");
                continue;
            }
            System.out.println(new String(message.getData(), StandardCharsets.UTF_8));
        }
    }
}
Running nats server
/usr/local/Cellar/nats-server-v2.1.4-darwin-amd64/nats-server
Setting up activemq
sudo mkdir -p /usr/local/activemq
sudo chown "$(whoami)" /usr/local/activemq
/usr/local/Cellar/apache-artemis-2.11.0/bin/artemis create /usr/local/activemq
Creating ActiveMQ Artemis instance at: /usr/local/activemq
--user: is a mandatory property!
Please provide the default username:
admin
--password: is mandatory with this configuration:
Please provide the default password:
--allow-anonymous | --require-login: is a mandatory property!
Allow anonymous access?, valid values are Y,N,True,False
y
Auto tuning journal ...
done! Your system can make 0.1 writes per millisecond, your journal-buffer-timeout will be 10120000
You can now start the broker by executing:
"/usr/local/activemq/bin/artemis" run
Or you can run the broker in the background using:
"/usr/local/activemq/bin/artemis-service" start
Protocols
2020-03-16 12:34:46,747 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
2020-03-16 12:34:46,755 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at 0.0.0.0:5445 for protocols [HORNETQ,STOMP]
2020-03-16 12:34:46,762 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at 0.0.0.0:5672 for protocols [AMQP]
2020-03-16 12:34:46,769 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at 0.0.0.0:1883 for protocols [MQTT]
2020-03-16 12:34:46,777 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at 0.0.0.0:61613 for protocols [STOMP]
JNDI Properties
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.ConnectionFactory=tcp://localhost:61616
queue.queue/testQueue=testQueue
src/main/resources/jndi.properties
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.ConnectionFactory=tcp://localhost:61616
queue.queue/testQueue=testQueue
src/main/java/com/cloudurable/nats/SendJMS.java
package com.cloudurable.nats;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * Sends the text message "This is an order" to the JMS test queue once per
 * second, forever.
 *
 * <p>Only a producer is created. An idle consumer on the same queue would
 * compete with the real receivers for the messages this program sends.
 */
public class SendJMS {
    public static void main(String... args) throws NamingException, JMSException, InterruptedException {
        var context = new InitialContext();
        var factory = (ConnectionFactory) context.lookup("ConnectionFactory");
        var orderQueue = (Queue) context.lookup("dynamicQueues/testQueue");
        var connection = factory.createConnection();
        var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Queue is already a Destination, so no cast is needed.
        var producer = session.createProducer(orderQueue);
        connection.start();
        while (true) {
            Thread.sleep(1000);
            TextMessage message = session.createTextMessage("This is an order");
            producer.send(message);
        }
    }
}
src/main/java/com/cloudurable/nats/GetJMS.java
package com.cloudurable.nats;
import io.nats.client.Nats;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Minimal JMS-to-NATS bridge: receives messages from the JMS test queue and
 * republishes each text payload on the NATS subject "test".
 *
 * <p>Messages that are not {@link TextMessage}s, or that carry no text, are
 * skipped so that a single unexpected message does not stop the bridge.
 */
public class GetJMS {
    public static void main(String... args) throws NamingException, JMSException, InterruptedException, IOException {
        var nc = Nats.connect("nats://localhost:4222");
        var context = new InitialContext();
        var factory = (ConnectionFactory) context.lookup("ConnectionFactory");
        var orderQueue = (Queue) context.lookup("dynamicQueues/testQueue");
        var connection = factory.createConnection();
        var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        var consumer = session.createConsumer(orderQueue);
        connection.start();
        while (true) {
            Thread.sleep(1000);
            Message received = consumer.receive();
            if (received == null) {
                // A blocking receive() yields null only when the consumer was closed.
                break;
            }
            if (!(received instanceof TextMessage)) {
                System.out.println("Skipping non-text message: " + received.getJMSMessageID());
                continue;
            }
            // Read the payload once and reuse it for logging and publishing.
            String text = ((TextMessage) received).getText();
            System.out.println("Got message: " + text);
            if (text != null) {
                nc.publish("test", text.getBytes(StandardCharsets.UTF_8));
            }
        }
        // Reached only when the consumer was closed: release both connections.
        connection.close();
        nc.close();
    }
}
src/main/java/com/cloudurable/nats/SendMain.java
package com.cloudurable.nats;
import io.nats.client.Nats;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
/**
 * One-shot NATS publisher: connects to the local server, publishes
 * "All is Well" on the subject "updates", flushes and disconnects.
 */
public class SendMain {
    public static void main(String... args) throws IOException, InterruptedException, TimeoutException {
        final var subject = "updates";
        final byte[] body = "All is Well".getBytes(StandardCharsets.UTF_8);

        final var client = Nats.connect("nats://localhost:4222");
        client.publish(subject, body);
        // Flush before closing so the publish is pushed out to the server first.
        client.flush(Duration.ZERO);
        client.close();
    }
}
src/main/java/com/cloudurable/nats/SubscribeMain.java
package com.cloudurable.nats;
import io.nats.client.Nats;
import io.nats.client.Options;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/**
 * Subscribes to the NATS subject "test" and prints every received payload as
 * UTF-8 text, pausing one second between reads. Runs until the process is killed.
 */
public class SubscribeMain {
    public static void main(String... args) throws IOException, InterruptedException {
        // Alternative connection setup with explicit options, kept for reference:
        //   var options = new Options.Builder().
        //       server("nats://0.0.0.0:4222").
        //       pingInterval(Duration.ofSeconds(20)). // Set Ping Interval
        //       maxPingsOut(5).                       // Set max pings in flight
        //       verbose().
        //       build();
        //   var nc = Nats.connect(options);
        //   nc.close();
        final var client = Nats.connect("nats://localhost:4222");
        final var testSubscription = client.subscribe("test");

        // Read loop: one message per second at most.
        for (;;) {
            Thread.sleep(1000);
            final var incoming = testSubscription.nextMessage(Duration.ZERO);
            final var text = new String(incoming.getData(), StandardCharsets.UTF_8);
            System.out.println(text);
        }
        // The connection is never closed: the loop above does not terminate normally.
    }
}