Copying headers from JMS MQ to Nats and copying headers from Nats to JMS MQ Bridge Message Wire Protocol - nats-io/nats-jms-bridge GitHub Wiki
Copying headers from JMS MQ to Nats and copying headers from Nats to JMS MQ Bridge Message Wire Protocol
We defined a binary protocol to encode headers into a Nats message, which allows us to pass headers from JMS through Nats and back. The header copy is bidirectional.
- #38 JMS Bridge header copy test
- #34 Got rid of String Message
- Started #4 and created VMs, but it is not done yet; made JNDI configurable, and the URLs are now env vars
- #35 Defined Message Protocol for encoding headers
- #36 Started and did some tasks to facilitate this (in progress)
- #37 Did this to support #4 and #36
To see headers being copied from Nats to JMS see NatsToJmsBridgeWithHeadersTest.
To see headers being copied from JMS to Nats see JmsToNatsBridgeWithHeadersTest.
Total commits: 14
Total ctimes: 185
Total files: 30
Total loc: 1839
| Author | loc | coms | fils | distribution |
|:------------------|------:|-------:|-------:|:-----------------|
| Richard Hightower | 1839 | 14 | 30 | 100.0/ 100/100.0 |
Total commits: 31
Total ctimes: 869
Total files: 102
Total loc: 5253
| Author | loc | coms | fils | distribution |
|:------------------|------:|-------:|-------:|:----------------|
| Richard Hightower | 3621 | 26 | 64 | 68.9/83.9/62.7 |
| Rick Hightower | 1632 | 5 | 38 | 31.1/16.1/37.3 |
- 286% more code than before. Most of that is copying headers and implementing the wire protocol.
- We are at 90% code coverage.
- There are full integration tests that send headers from Nats, use those headers behind JMS/MQ and then send them back.
- All integration tests are mirrored, e.g., full integration tests that send headers from JMS, use those headers behind Nats and then send them back.
- Request reply bridge is fully implemented
- Queue to queue bridge is fully implemented
- Passing JMS/MQ headers is fully implemented
<AB marker [byte]><CD marker [byte]><MAJOR_VERSION [byte]><MINOR_VERSION [byte]><AB marker [byte]><CD marker [byte]>
<HEADER_LEN [int]><HEADER_HASH [int]><HEADER_JSON_OBJECT_BYTES [bytes]>
<BODY_LEN [int]><BODY_HASH [int]><OPAQUE_BODY_BYTES [bytes]>
/**
 * Constants and helpers for the bridge message wire protocol: the version
 * numbers, the two marker bytes, the reserved header keys, and the hash
 * function used for the header and body sections.
 */
public class Protocol {

    /** Major version number of the message format. */
    public static final int MESSAGE_VERSION_MAJOR = 1;

    /** Minor version number of the message format. */
    public static final int MESSAGE_VERSION_MINOR = 0;

    /**
     * First marker value. Declared as {@code int} but initialised from a
     * {@code byte} cast, so it holds the sign-extended value (-85) and
     * compares equal to a {@code byte} 0xab read from a buffer.
     */
    public static final int MARKER_AB = (byte) 0xab;

    /** Second marker value; sign-extended the same way as {@link #MARKER_AB} (-51). */
    public static final int MARKER_CD = (byte) 0xcd;

    // Reserved header keys, all sharing the "BRIDGE_" prefix.
    public static final String HEADER_KEY_DELIVERY_TIME = "BRIDGE_deliveryTime";
    public static final String HEADER_KEY_TIMESTAMP = "BRIDGE_timestamp";
    public static final String HEADER_KEY_MODE = "BRIDGE_mode";
    public static final String HEADER_KEY_EXPIRATION_TIME = "BRIDGE_expirationTime";
    public static final String HEADER_KEY_TYPE = "BRIDGE_type";
    public static final String HEADER_KEY_PRIORITY = "BRIDGE_priority";
    public static final String HEADER_KEY_REDELIVERED = "BRIDGE_redelivered";

    /**
     * Computes a 31-based polynomial hash over the given bytes, treating each
     * byte as an unsigned value (0-255). Overflow wraps as usual for
     * {@code int} arithmetic.
     *
     * @param value bytes to hash; must not be {@code null}
     * @return the hash, or 0 for an empty array
     */
    public static int createHashCode(byte[] value) {
        int hash = 0;
        for (final byte current : value) {
            // Mask to avoid sign extension of bytes >= 0x80.
            hash = 31 * hash + (current & 0xFF);
        }
        return hash;
    }
}package io.nats.bridge.messages;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
/**
 * A bridge message. Every accessor has a default implementation, so an
 * implementation only overrides what it actually carries.
 */
public interface Message {

    /** Value returned by {@link #type()} when no type has been supplied. */
    String NO_TYPE = "NO_TYPE";

    /** @return the message timestamp; {@code -1} by default */
    default long timestamp() {
        return -1L;
    }

    /**
     * TTL plus timestamp.
     *
     * @return the expiration time; {@code -1} by default
     */
    default long expirationTime() {
        return -1L;
    }

    /**
     * Delivery time is not instant.
     *
     * @return the delivery time; {@code -1} by default
     */
    default long deliveryTime() {
        return -1L;
    }

    /** @return the delivery mode; {@code -1} by default */
    default int deliveryMode() {
        return -1;
    }

    /** @return the message type; {@link #NO_TYPE} by default */
    default String type() {
        return NO_TYPE;
    }

    /** @return the redelivered flag; {@code false} by default */
    default boolean redelivered() {
        return false;
    }

    /** @return the priority; {@code -1} by default */
    default int priority() {
        return -1;
    }

    /** @return the correlation ID; the empty string by default */
    default String correlationID() {
        return "";
    }

    /** @return the message headers; an empty map by default */
    default Map<String, Object> headers() {
        return Collections.emptyMap();
    }

    /**
     * Replies to this message. Does nothing by default.
     *
     * @param reply the reply message
     */
    default void reply(Message reply) {
    }

    /** @return the body bytes; a new empty array by default */
    default byte[] getBodyBytes() {
        return new byte[0];
    }

    /** @return {@link #getBodyBytes()} decoded as UTF-8 */
    default String bodyAsString() {
        final byte[] body = getBodyBytes();
        return new String(body, StandardCharsets.UTF_8);
    }

    /** @return the bytes of the whole message; by default the same as {@link #getBodyBytes()} */
    default byte[] getMessageBytes() {
        return getBodyBytes();
    }
}package io.nats.bridge.messages;
...
public class BaseMessageWithHeaders implements BytesMessage {
...
/**
 * Serialises this message, headers included, into the bridge wire format:
 * a 6-byte prefix (AB, CD, major version, minor version, AB, CD), then the
 * header section (int length, int hash, JSON bytes), then the body section
 * (int length, int hash, body bytes).
 *
 * <p>The JSON header object contains the user headers plus a
 * {@code BRIDGE_*} entry for each built-in field that is set.
 *
 * <p>A {@code null} body is written as a zero-length body section (length 0,
 * hash 0) so that the body section always starts with two ints.
 *
 * @return the encoded message
 * @throws MessageException if the headers cannot be serialised or the
 *                          stream cannot be written
 */
public byte[] getMessageAsBytes() {
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // try-with-resources closes the stream without a throwing finally block
    // that could mask the original failure.
    try (DataOutputStream streamOut = new DataOutputStream(baos)) {
        // Prefix used by the reader to recognise a bridge-encoded message.
        streamOut.writeByte(Protocol.MARKER_AB);
        streamOut.writeByte(Protocol.MARKER_CD);
        streamOut.writeByte(Protocol.MESSAGE_VERSION_MAJOR);
        streamOut.writeByte(Protocol.MESSAGE_VERSION_MINOR);
        streamOut.writeByte(Protocol.MARKER_AB);
        streamOut.writeByte(Protocol.MARKER_CD);

        // User headers first, then the built-in fields that carry a value.
        final HashMap<String, Object> outputHeaders = new HashMap<>(9 + (headers != null ? headers.size() : 0));
        if (headers != null) {
            outputHeaders.putAll(headers);
        }
        if (deliveryTime > 0) {
            outputHeaders.put(HEADER_KEY_DELIVERY_TIME, this.deliveryTime());
        }
        if (mode != -1) {
            outputHeaders.put(HEADER_KEY_MODE, this.deliveryMode());
        }
        if (expirationTime > 0) {
            outputHeaders.put(HEADER_KEY_EXPIRATION_TIME, this.expirationTime());
        }
        if (timestamp > 0) {
            outputHeaders.put(HEADER_KEY_TIMESTAMP, this.timestamp());
        }
        if (type != null) {
            outputHeaders.put(HEADER_KEY_TYPE, this.type());
        }
        if (priority != -1) {
            outputHeaders.put(HEADER_KEY_PRIORITY, this.priority());
        }
        if (redelivered) {
            outputHeaders.put(HEADER_KEY_REDELIVERED, this.redelivered());
        }

        // Header section: length, hash, JSON bytes.
        final byte[] headerBytes = mapper.writeValueAsBytes(outputHeaders);
        streamOut.writeInt(headerBytes.length);
        streamOut.writeInt(Protocol.createHashCode(headerBytes));
        streamOut.write(headerBytes);

        // Body section: length, hash, bytes.
        if (bodyBytes != null) {
            streamOut.writeInt(bodyBytes.length);
            streamOut.writeInt(Protocol.createHashCode(bodyBytes));
            streamOut.write(bodyBytes);
        } else {
            // Length and hash are ints; write(0) would emit a single byte
            // each and leave the section too short to read back.
            streamOut.writeInt(0);
            streamOut.writeInt(0);
        }
    } catch (Exception e) {
        throw new MessageException("Can't write out message", e);
    }
    return baos.toByteArray();
}
...
}public class MessageBuilder {
/**
 * Builds a message from raw bytes.
 *
 * <p>If the buffer starts with the bridge protocol prefix (AB, CD, major
 * version, minor version, AB, CD) it is decoded as a header section followed
 * by a body section, each laid out as int length, int hash, bytes. Reserved
 * {@code BRIDGE_*} header entries are moved out of the header map into the
 * corresponding builder fields; the remaining entries become the message
 * headers. Any other buffer is used unchanged as the message body.
 *
 * @param buffer the raw message bytes; must not be {@code null}
 * @return the built message
 * @throws MessageBuilderException if a section length is invalid, a section
 *                                 hash does not match, or the buffer cannot
 *                                 be read or parsed
 */
public Message buildFromBytes(byte[] buffer) {
    if (!hasProtocolPrefix(buffer)) {
        // Not bridge-encoded: the whole buffer is the body.
        withBody(buffer);
        return build();
    }

    final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(buffer));
    try {
        dataInputStream.skipBytes(6);

        //Read the header
        final byte[] jsonByteBuffer = readSection(dataInputStream, "JSON Hash did not match for headers");
        @SuppressWarnings("unchecked") // the header section is a JSON object with string keys
        final Map<String, Object> header = mapper.readValue(jsonByteBuffer, Map.class);

        //Read the body
        final byte[] bodyBuffer = readSection(dataInputStream, "Body Hash did not match ");

        /* read headers */
        // Numeric values are converted through Number because the boxed type
        // chosen by the JSON mapper may differ from the target type (for
        // example Integer for a small long); a direct (long) cast would then
        // throw ClassCastException.
        final Object timestamp = header.remove(HEADER_KEY_TIMESTAMP);
        if (timestamp != null) {
            withTimestamp(((Number) timestamp).longValue());
        }
        final Object expirationTime = header.remove(HEADER_KEY_EXPIRATION_TIME);
        if (expirationTime != null) {
            withExpirationTime(((Number) expirationTime).longValue());
        }
        final Object deliveryTime = header.remove(HEADER_KEY_DELIVERY_TIME);
        if (deliveryTime != null) {
            withDeliveryTime(((Number) deliveryTime).longValue());
        }
        final Object mode = header.remove(HEADER_KEY_MODE);
        if (mode != null) {
            withDeliveryMode(((Number) mode).intValue());
        }
        final Object type = header.remove(HEADER_KEY_TYPE);
        if (type != null) {
            withType((String) type);
        }
        final Object redelivered = header.remove(HEADER_KEY_REDELIVERED);
        if (redelivered != null) {
            withRedelivered((Boolean) redelivered);
        }
        final Object priority = header.remove(HEADER_KEY_PRIORITY);
        if (priority != null) {
            withPriority(((Number) priority).intValue());
        }

        withHeaders(header);
        withBody(bodyBuffer);
        return build();
    } catch (final IOException ex) {
        throw new MessageBuilderException("Unable to create message", ex);
    }
}

/** Returns true when the buffer starts with the 6-byte bridge protocol prefix. */
private static boolean hasProtocolPrefix(final byte[] buffer) {
    return buffer.length > 5
            && buffer[0] == MARKER_AB
            && buffer[1] == MARKER_CD
            && buffer[2] == Protocol.MESSAGE_VERSION_MAJOR
            && buffer[3] == Protocol.MESSAGE_VERSION_MINOR
            && buffer[4] == MARKER_AB
            && buffer[5] == MARKER_CD;
}

/** Reads one section (int length, int hash, bytes) and verifies its length and hash. */
private static byte[] readSection(final DataInputStream in, final String hashMismatchMessage) throws IOException {
    final int length = in.readInt();
    final int expectedHash = in.readInt();
    // Reject corrupt length prefixes before allocating.
    if (length < 0 || length > in.available()) {
        throw new MessageBuilderException("Invalid section length " + length);
    }
    final byte[] bytes = new byte[length];
    // readFully either fills the array or throws; read() may return short.
    in.readFully(bytes);
    if (Protocol.createHashCode(bytes) != expectedHash) {
        throw new MessageBuilderException(hashMismatchMessage);
    }
    return bytes;
}package io.nats.bridge.jms.support;
import io.nats.bridge.messages.BaseMessageWithHeaders;
import io.nats.bridge.messages.Message;
import io.nats.bridge.util.FunctionWithException;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.Map;
import java.util.Set;
/**
 * Converts a bridge {@link Message} into a JMS {@link BytesMessage} created
 * from the supplied session. When the bridge message is a
 * {@link BaseMessageWithHeaders}, its headers and built-in fields are copied
 * onto the JMS message as well.
 */
public class ConvertBridgeMessageToJmsMessageWithHeaders implements FunctionWithException<Message, javax.jms.Message> {

    private final Session session;

    /**
     * @param session the JMS session used to create outgoing messages
     */
    public ConvertBridgeMessageToJmsMessageWithHeaders(final Session session) {
        this.session = session;
    }

    /**
     * Creates a JMS bytes message carrying the body of the given bridge
     * message, plus its headers when it is a {@link BaseMessageWithHeaders}.
     *
     * @param message the bridge message to convert
     * @return the new JMS message
     * @throws Exception if the JMS provider rejects any operation
     */
    @Override
    public javax.jms.Message apply(final Message message) throws Exception {
        final BytesMessage bytesMessage = session.createBytesMessage();
        if (message instanceof BaseMessageWithHeaders) {
            copyHeaders(message, bytesMessage);
        }
        bytesMessage.writeBytes(message.getBodyBytes());
        return bytesMessage;
    }

    /**
     * Copies the headers as JMS object properties and the built-in fields
     * onto the matching JMS header setters. Fields still at their default
     * ({@code -1}, {@link Message#NO_TYPE}, or a blank correlation ID) are
     * skipped; the redelivered flag is always set.
     */
    private void copyHeaders(Message message, BytesMessage bytesMessage) throws JMSException {
        for (final Map.Entry<String, Object> header : message.headers().entrySet()) {
            bytesMessage.setObjectProperty(header.getKey(), header.getValue());
        }

        if (message.timestamp() != -1) {
            bytesMessage.setJMSTimestamp(message.timestamp());
        }
        if (message.expirationTime() != -1) {
            bytesMessage.setJMSExpiration(message.expirationTime());
        }
        if (message.deliveryTime() != -1) {
            bytesMessage.setJMSDeliveryTime(message.deliveryTime());
        }
        if (message.deliveryMode() != -1) {
            bytesMessage.setJMSDeliveryMode(message.deliveryMode());
        }
        if (!Message.NO_TYPE.equals(message.type())) {
            bytesMessage.setJMSType(message.type());
        }
        bytesMessage.setJMSRedelivered(message.redelivered());
        if (message.priority() != -1) {
            bytesMessage.setJMSPriority(message.priority());
        }

        // Use the bridge message's correlation ID. Reading it back from the
        // freshly created JMS message would just re-set whatever was there.
        final String correlationID = message.correlationID();
        if (correlationID != null && !correlationID.trim().isEmpty()) {
            bytesMessage.setJMSCorrelationID(correlationID);
        }
    }
}package io.nats.bridge.jms.support;
import io.nats.bridge.TimeSource;
import io.nats.bridge.jms.JMSMessageBusException;
import io.nats.bridge.messages.Message;
import io.nats.bridge.messages.MessageBuilder;
import io.nats.bridge.util.ExceptionHandler;
import io.nats.bridge.util.FunctionWithException;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.TextMessage;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Queue;
/**
 * Converts a JMS message into a bridge {@link Message}, copying the JMS
 * properties and built-in JMS header fields into the message builder. When
 * the JMS message has a reply-to destination, the resulting bridge message
 * gets a reply handler that queues a {@link JMSReply}.
 */
public class ConvertJmsMessageToBridgeMessageWithHeaders implements FunctionWithException<javax.jms.Message, Message> {

    private final ExceptionHandler tryHandler;
    private final TimeSource timeSource;
    private final Queue<JMSReply> jmsReplyQueue;

    /**
     * @param tryHandler    wraps the reply enqueue so failures are rethrown
     * @param timeSource    supplies the time recorded when a message is converted
     * @param jmsReplyQueue queue that receives pending replies
     */
    public ConvertJmsMessageToBridgeMessageWithHeaders(final ExceptionHandler tryHandler, final TimeSource timeSource,
                                                       final Queue<JMSReply> jmsReplyQueue) {
        this.tryHandler = tryHandler;
        this.timeSource = timeSource;
        this.jmsReplyQueue = jmsReplyQueue;
    }

    /** Adds a pending reply to the reply queue. */
    private void enqueueReply(final long sentTime, final Message reply, final String correlationID, final Destination jmsReplyTo) {
        final JMSReply pendingReply = new JMSReply(sentTime, reply, correlationID, jmsReplyTo);
        jmsReplyQueue.add(pendingReply);
    }

    /**
     * Extracts the payload of a bytes or text JMS message; text is encoded
     * as UTF-8. Any other message kind is rejected.
     */
    private byte[] readBytesFromJMSMessage(final javax.jms.Message jmsMessage) throws Exception {
        if (jmsMessage instanceof BytesMessage) {
            final BytesMessage source = (BytesMessage) jmsMessage;
            final int bodyLength = (int) source.getBodyLength();
            final byte[] payload = new byte[bodyLength];
            source.readBytes(payload);
            return payload;
        }
        if (jmsMessage instanceof TextMessage) {
            final String text = ((TextMessage) jmsMessage).getText();
            return text.getBytes(StandardCharsets.UTF_8);
        }
        throw new JMSMessageBusException("Unable to read bytes from message " + jmsMessage.getClass().getName());
    }

    /**
     * Builds a bridge message from the given JMS message.
     *
     * @param jmsMessage the incoming JMS message
     * @return the bridge message
     * @throws Exception if the JMS message cannot be read
     */
    @Override
    public Message apply(final javax.jms.Message jmsMessage) throws Exception {
        final Destination replyDestination = jmsMessage.getJMSReplyTo();
        final long receivedAt = timeSource.getTime();

        final byte[] payload = readBytesFromJMSMessage(jmsMessage);
        final MessageBuilder builder = MessageBuilder.builder().withBody(payload);
        copyHeaders(builder, jmsMessage);

        if (replyDestination != null) {
            // The correlation ID is read when the reply is sent, not when
            // the message is converted.
            builder.withReplyHandler(reply -> tryHandler.tryWithRethrow(
                    () -> enqueueReply(receivedAt, reply, jmsMessage.getJMSCorrelationID(), replyDestination),
                    e -> {
                        throw new JMSMessageBusException("Unable to send to JMS reply", e);
                    }));
        }
        return builder.build();
    }

    /**
     * Copies every property whose name does not start with "JMS" as a
     * header, then copies the built-in JMS header fields.
     */
    private void copyHeaders(final MessageBuilder builder, final javax.jms.Message jmsMessage) throws Exception {
        final Enumeration<String> names = (Enumeration<String>) jmsMessage.getPropertyNames();
        while (names.hasMoreElements()) {
            final String name = names.nextElement();
            //Will make excluding or including configurable with reasonable defaults
            if (name.startsWith("JMS")) {
                continue;
            }
            builder.withHeader(name, jmsMessage.getObjectProperty(name));
        }

        builder.withRedelivered(jmsMessage.getJMSRedelivered());
        builder.withDeliveryTime(jmsMessage.getJMSDeliveryTime());
        builder.withDeliveryMode(jmsMessage.getJMSDeliveryMode());
        builder.withType(jmsMessage.getJMSType());
        builder.withPriority(jmsMessage.getJMSPriority());
        builder.withExpirationTime(jmsMessage.getJMSExpiration());
        builder.withTimestamp(jmsMessage.getJMSTimestamp());
    }
}