blob: a9f3c3f004ef0bf0a0ed743b75a8575e44df06f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.jms;
import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.util.Callback;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests of {@link JmsIO}. */
@RunWith(JUnit4.class)
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
public class JmsIOTest {
private static final String BROKER_URL = "vm://localhost";
private static final String USERNAME = "test_user";
private static final String PASSWORD = "test_password";
private static final String QUEUE = "test_queue";
private static final String TOPIC = "test_topic";
private BrokerService broker;
private ConnectionFactory connectionFactory;
private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Before
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker.addConnector(BROKER_URL);
broker.setBrokerName("localhost");
broker.setPopulateJMSXUserID(true);
broker.setUseAuthenticatedPrincipalForJMSXUserID(true);
// enable authentication
List<AuthenticationUser> users = new ArrayList<>();
// username and password to use to connect to the broker.
// This user has users privilege (able to browse, consume, produce, list destinations)
users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users);
BrokerPlugin[] plugins = new BrokerPlugin[] {plugin};
broker.setPlugins(plugins);
broker.start();
// create JMS connection factory
connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
connectionFactoryWithSyncAcksAndWithoutPrefetch =
new ActiveMQConnectionFactory(
BROKER_URL + "?jms.prefetchPolicy.all=0&jms.sendAcksAsync=false");
}
@After
public void stopBroker() throws Exception {
broker.stop();
}
private void runPipelineExpectingJmsConnectException(String innerMessage) {
try {
pipeline.run();
fail();
} catch (Exception e) {
assertThat(Throwables.getRootCause(e).getMessage(), containsString(innerMessage));
}
}
@Test
public void testAuthenticationRequired() {
pipeline.apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue(QUEUE));
runPipelineExpectingJmsConnectException("User name [null] or password is invalid.");
}
@Test
public void testAuthenticationWithBadPassword() {
pipeline.apply(
JmsIO.read()
.withConnectionFactory(connectionFactory)
.withQueue(QUEUE)
.withUsername(USERNAME)
.withPassword("BAD"));
runPipelineExpectingJmsConnectException("User name [" + USERNAME + "] or password is invalid.");
}
@Test
public void testReadMessages() throws Exception {
// produce message
Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
TextMessage message = session.createTextMessage("This Is A Test");
producer.send(message);
producer.send(message);
producer.send(message);
producer.send(message);
producer.send(message);
producer.send(message);
producer.close();
session.close();
connection.close();
// read from the queue
PCollection<JmsRecord> output =
pipeline.apply(
JmsIO.read()
.withConnectionFactory(connectionFactory)
.withQueue(QUEUE)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withMaxNumRecords(5));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(5L);
pipeline.run();
connection = connectionFactory.createConnection(USERNAME, PASSWORD);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
Message msg = consumer.receiveNoWait();
assertNull(msg);
}
@Test
public void testReadBytesMessages() throws Exception {
// produce message
Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
BytesMessage message = session.createBytesMessage();
message.writeBytes("This Is A Test".getBytes(StandardCharsets.UTF_8));
producer.send(message);
producer.close();
session.close();
connection.close();
// read from the queue
PCollection<String> output =
pipeline.apply(
JmsIO.<String>readMessage()
.withConnectionFactory(connectionFactory)
.withQueue(QUEUE)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withMaxNumRecords(1)
.withCoder(SerializableCoder.of(String.class))
.withMessageMapper(new BytesMessageToStringMessageMapper()));
PAssert.thatSingleton(output.apply("Count", Count.<String>globally())).isEqualTo(1L);
pipeline.run();
connection = connectionFactory.createConnection(USERNAME, PASSWORD);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
Message msg = consumer.receiveNoWait();
assertNull(msg);
}
@Test
public void testWriteMessage() throws Exception {
ArrayList<String> data = new ArrayList<>();
for (int i = 0; i < 100; i++) {
data.add("Message " + i);
}
pipeline
.apply(Create.of(data))
.apply(
JmsIO.write()
.withConnectionFactory(connectionFactory)
.withQueue(QUEUE)
.withUsername(USERNAME)
.withPassword(PASSWORD));
pipeline.run();
Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
int count = 0;
while (consumer.receive(1000) != null) {
count++;
}
assertEquals(100, count);
}
@Test
public void testSplitForQueue() throws Exception {
JmsIO.Read read = JmsIO.read().withQueue(QUEUE);
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
int desiredNumSplits = 5;
JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read);
List<JmsIO.UnboundedJmsSource> splits = initialSource.split(desiredNumSplits, pipelineOptions);
// in the case of a queue, we have concurrent consumers by default, so the initial number
// splits is equal to the desired number of splits
assertEquals(desiredNumSplits, splits.size());
}
@Test
public void testSplitForTopic() throws Exception {
JmsIO.Read read = JmsIO.read().withTopic(TOPIC);
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
int desiredNumSplits = 5;
JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read);
List<JmsIO.UnboundedJmsSource> splits = initialSource.split(desiredNumSplits, pipelineOptions);
// in the case of a topic, we can have only a unique subscriber on the topic per pipeline
// else it means we can have duplicate messages (all subscribers on the topic receive every
// message).
// So, whatever the desizedNumSplits is, the actual number of splits should be 1.
assertEquals(1, splits.size());
}
@Test
public void testCheckpointMark() throws Exception {
// we are using no prefetch here
// prefetch is an ActiveMQ feature: to make efficient use of network resources the broker
// utilizes a 'push' model to dispatch messages to consumers. However, in the case of our
// test, it means that we can have some latency between the receiveNoWait() method used by
// the consumer and the prefetch buffer populated by the broker. Using a prefetch to 0 means
// that the consumer will poll for message, which is exactly what we want for the test.
// We are also sending message acknowledgements synchronously to ensure that they are
// processed before any subsequent assertions.
Connection connection =
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("test " + i));
}
producer.close();
session.close();
connection.close();
JmsIO.Read spec =
JmsIO.read()
.withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withQueue(QUEUE);
JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
// start the reader and move to the first record
assertTrue(reader.start());
// consume 3 messages (NB: start already consumed the first message)
for (int i = 0; i < 3; i++) {
assertTrue(reader.advance());
}
// the messages are still pending in the queue (no ACK yet)
assertEquals(10, count(QUEUE));
// we finalize the checkpoint
reader.getCheckpointMark().finalizeCheckpoint();
// the checkpoint finalize ack the messages, and so they are not pending in the queue anymore
assertEquals(6, count(QUEUE));
// we read the 6 pending messages
for (int i = 0; i < 6; i++) {
assertTrue(reader.advance());
}
// still 6 pending messages as we didn't finalize the checkpoint
assertEquals(6, count(QUEUE));
// we finalize the checkpoint: no more message in the queue
reader.getCheckpointMark().finalizeCheckpoint();
assertEquals(0, count(QUEUE));
}
@Test
public void testCheckpointMarkSafety() throws Exception {
final int messagesToProcess = 100;
// we are using no prefetch here
// prefetch is an ActiveMQ feature: to make efficient use of network resources the broker
// utilizes a 'push' model to dispatch messages to consumers. However, in the case of our
// test, it means that we can have some latency between the receiveNoWait() method used by
// the consumer and the prefetch buffer populated by the broker. Using a prefetch to 0 means
// that the consumer will poll for message, which is exactly what we want for the test.
// We are also sending message acknowledgements synchronously to ensure that they are
// processed before any subsequent assertions.
Connection connection =
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Fill the queue with messages
MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
for (int i = 0; i < messagesToProcess; i++) {
producer.send(session.createTextMessage("test " + i));
}
producer.close();
session.close();
connection.close();
// create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in
// sending
// acknowledgements - this should help uncover threading issues around checkpoint management.
JmsIO.Read spec =
JmsIO.read()
.withConnectionFactory(
withSlowAcks(connectionFactoryWithSyncAcksAndWithoutPrefetch, 10))
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withQueue(QUEUE);
JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
// start the reader and move to the first record
assertTrue(reader.start());
// consume half the messages (NB: start already consumed the first message)
for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
assertTrue(reader.advance());
}
// the messages are still pending in the queue (no ACK yet)
assertEquals(messagesToProcess, count(QUEUE));
// we finalize the checkpoint for the already-processed messages while simultaneously consuming
// the remainder of
// messages from the queue
Thread runner =
new Thread(
() -> {
try {
for (int i = 0; i < messagesToProcess / 2; i++) {
assertTrue(reader.advance());
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
runner.start();
reader.getCheckpointMark().finalizeCheckpoint();
// Concurrency issues would cause an exception to be thrown before this method exits, failing
// the test
runner.join();
}
/** Test the checkpoint mark default coder, which is actually AvroCoder. */
@Test
public void testCheckpointMarkDefaultCoder() throws Exception {
JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark();
jmsCheckpointMark.add(new ActiveMQMessage());
Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
CoderProperties.coderSerializable(coder);
CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
}
@Test
public void testDefaultAutoscaler() throws IOException {
JmsIO.Read spec =
JmsIO.read()
.withConnectionFactory(connectionFactory)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withQueue(QUEUE);
JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
// start the reader and check getSplitBacklogBytes and getTotalBacklogBytes values
reader.start();
assertEquals(BACKLOG_UNKNOWN, reader.getSplitBacklogBytes());
assertEquals(BACKLOG_UNKNOWN, reader.getTotalBacklogBytes());
reader.close();
}
@Test
public void testCustomAutoscaler() throws IOException {
long excpectedTotalBacklogBytes = 1111L;
AutoScaler autoScaler = mock(DefaultAutoscaler.class);
when(autoScaler.getTotalBacklogBytes()).thenReturn(excpectedTotalBacklogBytes);
JmsIO.Read spec =
JmsIO.read()
.withConnectionFactory(connectionFactory)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withQueue(QUEUE)
.withAutoScaler(autoScaler);
JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
// start the reader and check getSplitBacklogBytes and getTotalBacklogBytes values
reader.start();
verify(autoScaler, times(1)).start();
assertEquals(excpectedTotalBacklogBytes, reader.getTotalBacklogBytes());
verify(autoScaler, times(1)).getTotalBacklogBytes();
reader.close();
verify(autoScaler, times(1)).stop();
}
private int count(String queue) throws Exception {
Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(session.createQueue(queue));
Enumeration<Message> messages = browser.getEnumeration();
int count = 0;
while (messages.hasMoreElements()) {
messages.nextElement();
count++;
}
return count;
}
/** A test class that maps a {@link javax.jms.BytesMessage} into a {@link String}. */
public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper<String> {
@Override
public String mapMessage(Message message) throws Exception {
BytesMessage bytesMessage = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
return new String(bytes, StandardCharsets.UTF_8);
}
}
/*
* A utility method which replaces a ConnectionFactory with one where calling receiveNoWait() -- i.e. pulling a
* message -- will return a message with its acknowledgement callback decorated to include a sleep for a specified
* duration. This gives the effect of ensuring messages take at least {@code delay} milliseconds to be processed.
*/
private ConnectionFactory withSlowAcks(ConnectionFactory factory, long delay) {
return proxyMethod(
factory,
ConnectionFactory.class,
"createConnection",
(Connection connection) ->
proxyMethod(
connection,
Connection.class,
"createSession",
(Session session) ->
proxyMethod(
session,
Session.class,
"createConsumer",
(MessageConsumer consumer) ->
proxyMethod(
consumer,
MessageConsumer.class,
"receiveNoWait",
(ActiveMQMessage message) -> {
final Callback originalCallback =
message.getAcknowledgeCallback();
message.setAcknowledgeCallback(
() -> {
Thread.sleep(delay);
originalCallback.execute();
});
return message;
}))));
}
/*
* A utility method which decorates an existing object with a proxy instance adhering to a given interface, with the
* specified method name having its return value transformed by the provided function.
*/
private <T, MethodArgT, MethodResultT> T proxyMethod(
T target,
Class<? super T> proxyInterface,
String methodName,
Function<MethodArgT, MethodResultT> resultTransformer) {
return (T)
Proxy.newProxyInstance(
this.getClass().getClassLoader(),
new Class[] {proxyInterface},
(proxy, method, args) -> {
Object result = method.invoke(target, args);
if (method.getName().equals(methodName)) {
result = resultTransformer.apply((MethodArgT) result);
}
return result;
});
}
}