blob: 6da76332bbeae0642a7c1aa1948799f36d427894 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.JmsHeaders;
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.MapMessage;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Session;
import jakarta.jms.StreamMessage;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
public class JMSPublisherConsumerIT {
/**
 * Sends an {@link ObjectMessage} whose payload is a String and verifies the body handed to
 * the consumer callback: it must equal the UTF-8 bytes of the simple class name of
 * {@link ObjectMessage}.
 */
@Test
public void testObjectMessage() {
    // Computed up front; the callback only compares.
    final byte[] expectedBody = ObjectMessage.class.getSimpleName().getBytes(StandardCharsets.UTF_8);

    final MessageCreator objectMessageCreator = session -> {
        final ObjectMessage objectMessage = session.createObjectMessage();
        objectMessage.setObject("stringAsObject");
        return objectMessage;
    };

    testMessage(
            "testObjectMessage",
            objectMessageCreator,
            response -> assertArrayEquals(expectedBody, response.getMessageBody()));
}
/**
 * Sends a {@link StreamMessage} containing one value of each written type and verifies that
 * the consumed body matches the bytes a {@link DataOutputStream} produces for the same
 * sequence of values (Strings and the String-valued object written with {@code writeUTF}).
 */
@Test
public void testStreamMessage() throws Exception {
    // Reference encoding of the values written to the StreamMessage below, in the same order.
    final ByteArrayOutputStream referenceBytes = new ByteArrayOutputStream();
    try (DataOutputStream reference = new DataOutputStream(referenceBytes)) {
        reference.writeBoolean(true);
        reference.writeByte(1);
        reference.write(new byte[] {2, 3, 4});
        reference.writeShort(32);
        reference.writeInt(64);
        reference.writeLong(128L);
        reference.writeFloat(1.25F);
        reference.writeDouble(100.867);
        reference.writeChar('c');
        reference.writeUTF("someString");
        reference.writeUTF("stringAsObject");
    }
    // Closing the DataOutputStream flushed everything into the in-memory buffer.
    final byte[] expectedBody = referenceBytes.toByteArray();

    final MessageCreator streamMessageCreator = session -> {
        final StreamMessage streamMessage = session.createStreamMessage();
        streamMessage.writeBoolean(true);
        streamMessage.writeByte((byte) 1);
        streamMessage.writeBytes(new byte[] {2, 3, 4});
        streamMessage.writeShort((short) 32);
        streamMessage.writeInt(64);
        streamMessage.writeLong(128L);
        streamMessage.writeFloat(1.25F);
        streamMessage.writeDouble(100.867);
        streamMessage.writeChar('c');
        streamMessage.writeString("someString");
        streamMessage.writeObject("stringAsObject");
        return streamMessage;
    };

    testMessage(
            "testStreamMessage",
            streamMessageCreator,
            response -> assertArrayEquals(expectedBody, response.getMessageBody()));
}
/**
 * Sends a {@link MapMessage} with one entry per value type and verifies, via the
 * {@code testMapMessage(String, MessageCreator, String)} helper, that the consumed body is a
 * JSON object holding the same keys and values.
 */
@Test
public void testMapMessage() {
    // Use a destination unique to this test; sharing "testObjectMessage" with the
    // ObjectMessage test would let the two tests see each other's messages.
    final String destinationName = "testMapMessage";

    MessageCreator messageCreator = session -> {
        MapMessage message = session.createMapMessage();
        message.setBoolean("boolean", true);
        message.setByte("byte", (byte) 1);
        message.setBytes("bytes", new byte[] {2, 3, 4});
        message.setShort("short", (short) 32);
        message.setInt("int", 64);
        message.setLong("long", 128L);
        message.setFloat("float", 1.25F);
        message.setDouble("double", 100.867);
        message.setChar("char", 'c');
        message.setString("string", "someString");
        message.setObject("object", "stringAsObject");
        return message;
    };

    // Compared after JSON parsing, so key order and whitespace are irrelevant.
    String expectedJson = "{" +
            "\"boolean\":true," +
            "\"byte\":1," +
            "\"bytes\":[2, 3, 4]," +
            "\"short\":32," +
            "\"int\":64," +
            "\"long\":128," +
            "\"float\":1.25," +
            "\"double\":100.867," +
            "\"char\":\"c\"," +
            "\"string\":\"someString\"," +
            "\"object\":\"stringAsObject\"" +
            "}";

    testMapMessage(destinationName, messageCreator, expectedJson);
}
/**
 * Sends the message produced by {@code messageCreator} to {@code destinationName}, consumes
 * it, and asserts that the consumed body, parsed as a JSON object, equals
 * {@code expectedJson} parsed the same way.
 *
 * @param destinationName destination the message is sent to and consumed from
 * @param messageCreator  creates the message to send
 * @param expectedJson    JSON object the consumed body is expected to be equal to
 * @throws RuntimeException if either JSON document cannot be parsed
 */
private void testMapMessage(String destinationName, MessageCreator messageCreator, String expectedJson) {
    final ObjectMapper objectMapper = new ObjectMapper();
    final TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {};

    Consumer<JMSResponse> responseChecker = response -> {
        try {
            Map<String, Object> actual = objectMapper.readValue(response.getMessageBody(), mapType);
            // Parse the expected document straight from the String so the comparison never
            // depends on the platform default charset.
            Map<String, Object> expected = objectMapper.readValue(expectedJson, mapType);
            assertEquals(expected, actual);
        } catch (IOException e) {
            throw new RuntimeException("Unable to parse JSON while verifying the MapMessage body", e);
        }
    };

    testMessage(destinationName, messageCreator, responseChecker);
}
/**
 * Shared send-and-consume round trip: sends one message to {@code destinationName}, consumes
 * a single message from it, passes the response to {@code responseChecker} and asserts that
 * the consumer callback was actually invoked. The connection factory is destroyed afterwards.
 *
 * @param destinationName destination to send to and consume from
 * @param messageCreator  creates the message to send
 * @param responseChecker assertions to run against the consumed response
 */
private void testMessage(String destinationName, MessageCreator messageCreator, Consumer<JMSResponse> responseChecker) {
    final JmsTemplate template = CommonTest.buildJmsTemplateForDestination(false);
    final AtomicBoolean consumed = new AtomicBoolean();
    try {
        template.send(destinationName, messageCreator);

        final CachingConnectionFactory connectionFactory = (CachingConnectionFactory) template.getConnectionFactory();
        final JMSConsumer jmsConsumer = new JMSConsumer(connectionFactory, template, mock(ComponentLog.class));
        jmsConsumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
            // Record the invocation first so a failing checker is distinguishable from no delivery.
            consumed.set(true);
            responseChecker.accept(response);
        });

        assertTrue(consumed.get());
    } finally {
        ((CachingConnectionFactory) template.getConnectionFactory()).destroy();
    }
}
/**
 * Publishes a byte array through {@code JMSPublisher} and verifies that it arrives as a
 * {@link BytesMessage} whose content decodes back to the original text.
 */
@Test
public void validateBytesConvertedToBytesMessageOnSend() throws Exception {
    final String destinationName = "validateBytesConvertedToBytesMessageOnSend";
    // Encode and decode with an explicit charset so the test does not depend on the
    // platform default.
    final byte[] payload = "hellomq".getBytes(StandardCharsets.UTF_8);
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
    try {
        JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
        publisher.publish(destinationName, payload);

        Message receivedMessage = jmsTemplate.receive(destinationName);
        assertTrue(receivedMessage instanceof BytesMessage);

        // Size the buffer from the payload instead of a hard-coded length and check how many
        // bytes were actually read.
        byte[] body = new byte[payload.length];
        int bytesRead = ((BytesMessage) receivedMessage).readBytes(body);
        assertEquals(payload.length, bytesRead);
        assertEquals("hellomq", new String(body, StandardCharsets.UTF_8));
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
/**
 * Publishes a message together with a map of attributes and verifies on the received message
 * that the plain attributes show up as JMS properties and that the {@link JmsHeaders}
 * reply-to, delivery-mode and priority attributes show up as the corresponding JMS headers.
 */
@Test
public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception {
    final String queueName = "validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes";
    final JmsTemplate template = CommonTest.buildJmsTemplateForDestination(false);
    try {
        final JMSPublisher jmsPublisher = new JMSPublisher((CachingConnectionFactory) template.getConnectionFactory(), template, mock(ComponentLog.class));

        final Map<String, String> attributes = new HashMap<>();
        // Attributes checked below as message properties.
        attributes.put("foo", "foo");
        attributes.put("hyphen-property", "value");
        attributes.put("fullstop.property", "value");
        // Attributes checked below as JMS headers.
        attributes.put(JmsHeaders.REPLY_TO, "myTopic");
        attributes.put(JmsHeaders.DELIVERY_MODE, "1");
        attributes.put(JmsHeaders.PRIORITY, "1");
        // Expiration is expected to be an integer; a non-integer value must not cause problems.
        attributes.put(JmsHeaders.EXPIRATION, "never");

        jmsPublisher.publish(queueName, "hellomq".getBytes(), attributes);

        final Message received = template.receive(queueName);
        assertTrue(received instanceof BytesMessage);

        assertEquals("foo", received.getStringProperty("foo"));
        assertTrue(received.propertyExists("hyphen-property"));
        assertTrue(received.propertyExists("fullstop.property"));

        assertTrue(received.getJMSReplyTo() instanceof Topic);
        assertEquals(1, received.getJMSDeliveryMode());
        assertEquals(1, received.getJMSPriority());
        assertEquals("myTopic", ((Topic) received.getJMSReplyTo()).getTopicName());
    } finally {
        ((CachingConnectionFactory) template.getConnectionFactory()).destroy();
    }
}
/**
 * Exercises how the {@link JmsHeaders#EXPIRATION} attribute is applied on publish (the test is
 * named after issue NIFI-6721). Four values are published in turn and the received message's
 * {@code JMSExpiration} is checked: {@code "never"} gives 0, a timestamp two minutes ahead
 * gives a positive value, {@code "-1"} gives a positive value, and {@code "0"} gives 0.
 */
@Test
public void validateNIFI6721() throws Exception {
    final String queueName = "validateNIFI6721";
    final JmsTemplate template = CommonTest.buildJmsTemplateForDestination(false);
    try {
        final ComponentLog log = mock(ComponentLog.class);
        final JMSPublisher jmsPublisher = new JMSPublisher((CachingConnectionFactory) template.getConnectionFactory(), template, log);
        final Map<String, String> attributes = new HashMap<>();
        Message received;

        // The value is expected to be a long; a non-long value must not cause problems.
        attributes.put(JmsHeaders.EXPIRATION, "never");
        jmsPublisher.publish(queueName, "hellomq-0".getBytes(), attributes);
        received = template.receive(queueName);
        assertEquals(0, received.getJMSExpiration());

        // Absolute timestamp two minutes from now.
        final long twoMinutesAhead = Instant.now().toEpochMilli() + TimeUnit.MINUTES.toMillis(2);
        attributes.put(JmsHeaders.EXPIRATION, Long.toString(twoMinutesAhead));
        jmsPublisher.publish(queueName, "hellomq-1".getBytes(), attributes);
        received = template.receive(queueName);
        // https://github.com/spring-projects/spring-framework/issues/24144 suggests we cannot
        // rely on the value being specifically what was set, and on a per-message level, so
        // only check that an expiration is set rather than checking for a specific value.
        assertTrue(received.getJMSExpiration() > 0);

        attributes.put(JmsHeaders.EXPIRATION, "-1");
        jmsPublisher.publish(queueName, "hellomq-3".getBytes(), attributes);
        received = template.receive(queueName);
        assertTrue(received.getJMSExpiration() > 0);

        attributes.put(JmsHeaders.EXPIRATION, "0");
        jmsPublisher.publish(queueName, "hellomq-2".getBytes(), attributes);
        received = template.receive(queueName);
        assertEquals(0, received.getJMSExpiration());
    } finally {
        ((CachingConnectionFactory) template.getConnectionFactory()).destroy();
    }
}
/**
 * Sends an empty {@link ObjectMessage} and consumes it with a no-op callback. The test makes
 * no assertions of its own: it passes as long as neither the send nor the consume throws.
 * <p>
 * NOTE(review): the method name suggests a failure was once expected for message types other
 * than TextMessage and BytesMessage, but nothing here expects an exception, and
 * {@code testObjectMessage} in this class expects an ObjectMessage to be consumed. Confirm
 * whether this test is still required.
 */
@Test
public void validateFailOnUnsupportedMessageType() {
    final String queueName = "validateFailOnUnsupportedMessageType";
    final JmsTemplate template = CommonTest.buildJmsTemplateForDestination(false);
    try {
        template.send(queueName, (Session session) -> session.createObjectMessage());

        final JMSConsumer jmsConsumer = new JMSConsumer((CachingConnectionFactory) template.getConnectionFactory(), template, mock(ComponentLog.class));
        jmsConsumer.consumeSingleMessage(queueName, null, false, false, null, null, "UTF-8", response -> {
            // noop
        });
    } finally {
        ((CachingConnectionFactory) template.getConnectionFactory()).destroy();
    }
}
/**
 * Sends a {@link TextMessage} carrying a String property, a boolean property and a reply-to
 * queue, then verifies that the consumed response exposes the text as its body, the reply-to
 * queue name under {@link JmsHeaders#REPLY_TO} in the headers, and both properties (the
 * boolean as the String {@code "false"}) in the properties.
 */
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
    final String destinationName = "validateConsumeWithCustomHeadersAndProperties";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
    try {
        jmsTemplate.send(destinationName, (Session session) -> {
            TextMessage message = session.createTextMessage("hello from the other side");
            message.setStringProperty("foo", "foo");
            message.setBooleanProperty("bar", false);
            message.setJMSReplyTo(session.createQueue("fooQueue"));
            return message;
        });

        JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
        final AtomicBoolean callbackInvoked = new AtomicBoolean();
        consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
            callbackInvoked.set(true);
            // Decode with UTF-8 explicitly, matching the "UTF-8" passed to the consumer
            // (presumably the charset used for text bodies), instead of the platform default.
            assertEquals("hello from the other side", new String(response.getMessageBody(), StandardCharsets.UTF_8));
            assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
            assertEquals("foo", response.getMessageProperties().get("foo"));
            assertEquals("false", response.getMessageProperties().get("bar"));
        });
        // Guards against the assertions above being silently skipped.
        assertTrue(callbackInvoked.get());
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
/**
 * Publishes two messages ("1" and "2") and then consumes four times, alternating between a
 * callback that throws (so acknowledge is never called for that delivery) and a callback
 * that acknowledges explicitly. The bodies asserted, in order, are "1", "2", "1", "1".
 * <p>
 * The retry loops have no exit condition other than a delivered message; the 10 second
 * timeout on this test bounds them.
 */
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void validateMessageRedeliveryWhenNotAcked() {
String destinationName = "validateMessageRedeliveryWhenNotAcked";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
publisher.publish(destinationName, "1".getBytes(StandardCharsets.UTF_8));
publisher.publish(destinationName, "2".getBytes(StandardCharsets.UTF_8));
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
// Step 1: expect "1"; the callback throws after the check, so acknowledge is never called.
try {
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack");
});
} catch (Exception e) {
// expected - presumably the callback's RuntimeException propagating out of consumeSingleMessage
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// Step 2: retry until a non-null response arrives, expect body "2" and acknowledge it.
// NOTE(review): "2" is expected here even though "1" was left unacknowledged above -
// confirm this matches the intended redelivery ordering.
while (!callbackInvoked.get()) {
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
// nothing delivered on this attempt; the enclosing loop retries
if (response == null) {
return;
}
callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody()));
acknowledge(response);
});
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// Step 3: expect "1" again and fail once more by throwing instead of acknowledging.
try {
while (!callbackInvoked.get()) {
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
if (response == null) {
return;
}
callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack");
});
}
} catch (Exception e) {
// ignore - the exception is intentional; callbackInvoked is asserted right below
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// Step 4: expect "1" once more and this time acknowledge it.
try {
while (!callbackInvoked.get()) {
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
if (response == null) {
return;
}
callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody()));
acknowledge(response);
});
}
} catch (Exception e) {
// ignore
// NOTE(review): exceptions are swallowed here and callbackInvoked is not asserted after
// this block, so a failure in this last step may go unnoticed.
}
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
/**
 * Acknowledges the given response, converting the checked {@link JMSException} into an
 * unchecked exception so the method can be called from within a consumer callback lambda.
 *
 * @param response the consumed response to acknowledge
 * @throws IllegalStateException if the acknowledgement fails; the original
 *         {@link JMSException} is attached as the cause
 */
private void acknowledge(JMSResponse response) {
    try {
        response.acknowledge();
    } catch (JMSException e) {
        // Keep the cause so the underlying JMS failure is visible in the test report.
        throw new IllegalStateException("Unable to acknowledge JMS message", e);
    }
}
/**
 * Sends three text messages (one without the {@code prop} property, one with
 * {@code prop = '1'} and one with {@code prop = '2'}) and verifies that consuming with the
 * selector {@code prop = '1'} delivers the message whose body is {@code "msg1"}.
 */
@Test
public void testMessageSelector() {
    String destinationName = "testMessageSelector";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
    String messageSelector = "prop = '1'";
    try {
        // No property at all: must not match the selector.
        jmsTemplate.send(destinationName, session -> session.createTextMessage("msg0"));
        // The only message matching the selector.
        jmsTemplate.send(destinationName, session -> {
            TextMessage message = session.createTextMessage("msg1");
            message.setStringProperty("prop", "1");
            return message;
        });
        // Property present but with a different value.
        jmsTemplate.send(destinationName, session -> {
            TextMessage message = session.createTextMessage("msg2");
            message.setStringProperty("prop", "2");
            return message;
        });

        JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
        AtomicBoolean callbackInvoked = new AtomicBoolean();
        consumer.consumeSingleMessage(destinationName, null, false, false, null, messageSelector, "UTF-8", response -> {
            callbackInvoked.set(true);
            // Decode with UTF-8 explicitly, matching the "UTF-8" passed to the consumer
            // (presumably the charset used for text bodies), instead of the platform default.
            assertEquals("msg1", new String(response.getMessageBody(), StandardCharsets.UTF_8));
        });
        assertTrue(callbackInvoked.get());
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
}