blob: 1160128919b8db4069203fc1d3b4b0d076e3eb8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.test.util.SuccessException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Disabled;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.util.HashSet;
import java.util.Random;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Disabled("failing")
public class ActiveMQConnectorITCase {
public static final int MESSAGES_NUM = 10000;
public static final String QUEUE_NAME = "queue";
public static final String TOPIC_NAME = "topic";
private static LocalFlinkMiniCluster flink;
private static int flinkPort;
@BeforeClass
public static void beforeClass() {
// start also a re-usable Flink mini cluster
Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS.key(), 8);
flinkConfig.setInteger(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
flinkPort = flink.getLeaderRPCPort();
}
@AfterClass
public static void afterClass() {
flinkPort = -1;
if (flink != null) {
flink.startInternalShutdown();
}
}
@Test
public void amqTopologyWithQueue() throws Exception {
StreamExecutionEnvironment env = createExecutionEnvironment();
AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
.setConnectionFactory(createConnectionFactory())
.setDestinationName(QUEUE_NAME)
.setSerializationSchema(new SimpleStringSchema())
.build();
createProducerTopology(env, sinkConfig);
ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory();
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(sourceConnectionFactory)
.setDestinationName(QUEUE_NAME)
.setDeserializationSchema(new SimpleStringSchema())
.build();
createConsumerTopology(env, sourceConfig);
tryExecute(env, "AMQTest");
}
@Test
public void amqTopologyWithTopic() throws Exception {
StreamExecutionEnvironment env = createExecutionEnvironment();
AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
.setConnectionFactory(createConnectionFactory())
.setDestinationName(TOPIC_NAME)
.setSerializationSchema(new SimpleStringSchema())
.setDestinationType(DestinationType.TOPIC)
.build();
createProducerTopology(env, sinkConfig);
ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory();
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(sourceConnectionFactory)
.setDestinationName(TOPIC_NAME)
.setDeserializationSchema(new SimpleStringSchema())
.setDestinationType(DestinationType.TOPIC)
.build();
createConsumerTopology(env, sourceConfig);
tryExecute(env, "AMQTest");
}
private StreamExecutionEnvironment createExecutionEnvironment() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
return env;
}
private ActiveMQConnectionFactory createConnectionFactory() {
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
}
private void createProducerTopology(StreamExecutionEnvironment env, AMQSinkConfig<String> config) {
DataStreamSource<String> stream = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (int i = 0; i < MESSAGES_NUM; i++) {
ctx.collect("amq-" + i);
}
}
@Override
public void cancel() {}
});
AMQSink<String> sink = new AMQSink<>(config);
stream.addSink(sink);
}
private void createConsumerTopology(StreamExecutionEnvironment env, AMQSourceConfig<String> config) {
AMQSource<String> source = new AMQSource<>(config);
env.addSource(source)
.addSink(new SinkFunction<String>() {
final HashSet<Integer> set = new HashSet<>();
@Override
public void invoke(String value, Context context) throws Exception {
int val = Integer.parseInt(value.split("-")[1]);
set.add(val);
if (set.size() == MESSAGES_NUM) {
throw new SuccessException();
}
}
});
}
@Test
public void amqTopologyWithCheckpointing() throws Exception {
ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
.setConnectionFactory(connectionFactory)
.setDestinationName("queue2")
.setSerializationSchema(new SimpleStringSchema())
.build();
AMQSink<String> sink = new AMQSink<>(sinkConfig);
sink.open(new Configuration());
for (int i = 0; i < MESSAGES_NUM; i++) {
sink.invoke("amq-" + i, null);
}
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(connectionFactory)
.setDestinationName("queue2")
.setDeserializationSchema(new SimpleStringSchema())
.build();
final AMQSource<String> source = new AMQSource<>(sourceConfig);
RuntimeContext runtimeContext = createMockRuntimeContext();
source.setRuntimeContext(runtimeContext);
source.open(new Configuration());
final TestSourceContext sourceContext = new TestSourceContext();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
source.run(sourceContext);
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
Thread.sleep(100);
Random random = new Random();
final long checkpointId = random.nextLong();
synchronized (sourceContext.getCheckpointLock()) {
source.snapshotState(new FunctionSnapshotContext() {
@Override
public long getCheckpointId() {
return checkpointId;
}
@Override
public long getCheckpointTimestamp() {
return System.currentTimeMillis();
}
});
source.notifyCheckpointComplete(checkpointId);
}
}
assertEquals(MESSAGES_NUM, sourceContext.getIdsNum());
}
private RuntimeContext createMockRuntimeContext() {
StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
when(runtimeContext.isCheckpointingEnabled()).thenReturn(true);
return runtimeContext;
}
class TestSourceContext implements SourceFunction.SourceContext<String> {
private HashSet<Integer> ids = new HashSet<>();
private Object contextLock = new Object();
@Override
public void collect(String value) {
int val = Integer.parseInt(value.split("-")[1]);
ids.add(val);
}
@Override
public void collectWithTimestamp(String element, long timestamp) { }
@Override
public void emitWatermark(Watermark mark) { }
@Override
public void markAsTemporarilyIdle() {
throw new UnsupportedOperationException();
}
@Override
public Object getCheckpointLock() {
return contextLock;
}
@Override
public void close() { }
public int getIdsNum() {
synchronized (contextLock) {
return ids.size();
}
}
};
}