[BAHIR-58] Add ActiveMQ connector
Closes #3
diff --git a/.travis.yml b/.travis.yml
index 4c3ba91..fd3733a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,7 +18,7 @@
language: java
env:
- - FLINK_VERSION="1.0.3"
+# - FLINK_VERSION="1.1.0"
- FLINK_VERSION="1.1.1"
jdk:
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
new file mode 100644
index 0000000..528b546
--- /dev/null
+++ b/flink-connector-activemq/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink_parent_2.11</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-activemq_2.11</artifactId>
+ <name>flink-connector-activemq</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <activemq.version>5.14.0</activemq.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ <version>${activemq.version}</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests_2.11</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.11</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq.tooling</groupId>
+ <artifactId>activemq-junit</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.5.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.5.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
new file mode 100644
index 0000000..a494162
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * <p>
+ * To create an instance of AMQSink class one should initialize and configure an
+ * instance of a connection factory that will be used to create a connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
+
+
+ // Factory that is used to create AMQ connection
+ private final ActiveMQConnectionFactory connectionFactory;
+ // Name of a queue or topic
+ private final String destinationName;
+ // Serialization scheme that is used to convert input message to bytes
+ private final SerializationSchema<IN> serializationSchema;
+ // Defines if persistent delivery in AMQ is used
+ private final boolean persistentDelivery;
+ // Type of AMQ destination (topic or a queue)
+ private final DestinationType destinationType;
+ // Throw exceptions or just log them
+ private boolean logFailuresOnly = false;
+ // Used to send messages
+ private transient MessageProducer producer;
+ // AMQ session
+ private transient Session session;
+ // AMQ connection
+ private transient Connection connection;
+
+ /**
+ * Create AMQSink.
+ *
+ * @param config AMQSink configuration
+ */
+ public AMQSink(AMQSinkConfig<IN> config) {
+ this.connectionFactory = config.getConnectionFactory();
+ this.destinationName = config.getDestinationName();
+ this.serializationSchema = config.getSerializationSchema();
+ this.persistentDelivery = config.isPersistentDelivery();
+ this.destinationType = config.getDestinationType();
+ }
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them.
+ * If this is set to true, then exceptions will be only logged, if set to false,
+ * exceptions will be eventually thrown and cause the streaming program to
+ * fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ super.open(config);
+ // Create a Connection
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ // Create a Session
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the destination (Topic or Queue)
+ Destination destination = AMQUtil.getDestination(session, destinationType, destinationName);
+
+ // Create a MessageProducer from the Session to the Topic or
+ // Queue
+ producer = session.createProducer(destination);
+ producer.setDeliveryMode(getDeliveryMode());
+ }
+
+ private int getDeliveryMode() {
+ if (persistentDelivery) {
+ return DeliveryMode.PERSISTENT;
+ }
+
+ return DeliveryMode.NON_PERSISTENT;
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to RMQ.
+ *
+ * @param value
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN value) {
+ try {
+ byte[] bytes = serializationSchema.serialize(value);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(bytes);
+ producer.send(message);
+ } catch (JMSException e) {
+ if (logFailuresOnly) {
+ LOG.error("Failed to send message to ActiveMQ", e);
+ } else {
+ throw new RuntimeException("Failed to send message to ActiveMQ", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ RuntimeException t = null;
+ try {
+ session.close();
+ } catch (JMSException e) {
+ if (logFailuresOnly) {
+ LOG.error("Failed to close ActiveMQ session", e);
+ } else {
+ t = new RuntimeException("Failed to close ActiveMQ session", e);
+ }
+ }
+
+ try {
+ connection.close();
+ } catch (JMSException e) {
+ if (logFailuresOnly) {
+ LOG.error("Failed to close ActiveMQ connection", e);
+ } else {
+ t = t == null ? new RuntimeException("Failed to close ActiveMQ session", e)
+ : t;
+ }
+ }
+
+ if (t != null) {
+ throw t;
+ }
+ }
+
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
new file mode 100644
index 0000000..86254ff
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Immutable configuration for AMQSink
+ * @param <IN> type of input messages in configured sink
+ */
+public class AMQSinkConfig<IN> {
+ private final ActiveMQConnectionFactory connectionFactory;
+ private final String queueName;
+ private final SerializationSchema<IN> serializationSchema;
+ private final boolean persistentDelivery;
+ private final DestinationType destinationType;
+
+ public AMQSinkConfig(ActiveMQConnectionFactory connectionFactory, String queueName,
+ SerializationSchema<IN> serializationSchema, boolean persistentDelivery,
+ DestinationType destinationType) {
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
+ this.queueName = Preconditions.checkNotNull(queueName, "destinationName");
+ this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+ this.persistentDelivery = persistentDelivery;
+ this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
+ }
+
+ public ActiveMQConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public String getDestinationName() {
+ return queueName;
+ }
+
+ public SerializationSchema<IN> getSerializationSchema() {
+ return serializationSchema;
+ }
+
+ public boolean isPersistentDelivery() {
+ return persistentDelivery;
+ }
+
+ public DestinationType getDestinationType() {
+ return destinationType;
+ }
+
+
+ /**
+ * Builder for {@link AMQSinkConfig}
+ * @param <IN> type of input messages in configured sink
+ */
+ public static class AMQSinkConfigBuilder<IN> {
+ private ActiveMQConnectionFactory connectionFactory;
+ private String destinationName;
+ private SerializationSchema<IN> serializationSchema;
+ private boolean persistentDelivery;
+ private DestinationType destinationType = DestinationType.QUEUE;
+
+ public AMQSinkConfigBuilder<IN> setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory);
+ return this;
+ }
+
+ public AMQSinkConfigBuilder<IN> setDestinationName(String queueName) {
+ this.destinationName = Preconditions.checkNotNull(queueName);
+ return this;
+ }
+
+ public AMQSinkConfigBuilder<IN> setSerializationSchema(SerializationSchema<IN> serializationSchema) {
+ this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+ return this;
+ }
+
+ public AMQSinkConfigBuilder<IN> setPersistentDelivery(boolean persistentDelivery) {
+ this.persistentDelivery = persistentDelivery;
+ return this;
+ }
+
+ public AMQSinkConfigBuilder<IN> setDestinationType(DestinationType destinationType) {
+ this.destinationType = Preconditions.checkNotNull(destinationType);
+ return this;
+ }
+
+ public AMQSinkConfig<IN> build() {
+ return new AMQSinkConfig<IN>(connectionFactory, destinationName, serializationSchema, persistentDelivery, destinationType);
+ }
+
+ }
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
new file mode 100644
index 0000000..49f2cf7
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
+import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of AMQSink class one should initialize and configure an
+ * instance of a connection factory that will be used to create a connection.
+ * This source is waiting for incoming messages from ActiveMQ and converts them from
+ * an array of bytes into an instance of the output type. If an incoming
+ * message is not a message with an array of bytes, this message is ignored
+ * and warning message is logged.
+ *
+ * If checkpointing is enabled AMQSink will not acknowledge received AMQ messages as they arrive,
+ * but will store them internally and will acknowledge a bulk of messages during checkpointing.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+ implements ResultTypeQueryable<OUT> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class);
+
+ // Factory that is used to create AMQ connection
+ private final ActiveMQConnectionFactory connectionFactory;
+ // Name of a queue or topic
+ private final String destinationName;
+ // Deserialization scheme that is used to convert bytes to output message
+ private final DeserializationSchema<OUT> deserializationSchema;
+ // Type of AMQ destination (topic or a queue)
+ private final DestinationType destinationType;
+ // Throw exceptions or just log them
+ private boolean logFailuresOnly = false;
+ // Stores if source is running (used for testing)
+ private RunningChecker runningChecker;
+ // AMQ connection
+ private transient Connection connection;
+ // AMQ session
+ private transient Session session;
+ // Used to receive incoming messages
+ private transient MessageConsumer consumer;
+ // If source should immediately acknowledge incoming message
+ private boolean autoAck;
+ // Map of message ids to currently unacknowledged AMQ messages
+ private HashMap<String, Message> unacknowledgedMessages = new HashMap<>();
+ // Listener for AMQ exceptions
+ private AMQExceptionListener exceptionListener;
+
+ /**
+ * Create AMQSource.
+ *
+ * @param config AMQSource configuration
+ */
+ AMQSource(AMQSourceConfig<OUT> config) {
+ super(String.class);
+ this.connectionFactory = config.getConnectionFactory();
+ this.destinationName = config.getDestinationName();
+ this.deserializationSchema = config.getDeserializationSchema();
+ this.runningChecker = config.getRunningChecker();
+ this.destinationType = config.getDestinationType();
+ }
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them.
+ * If this is set to true, then exceptions will be only logged, if set to false,
+ * exceptions will be eventually thrown and cause the streaming program to
+ * fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ // Visible for testing
+ void setExceptionListener(AMQExceptionListener exceptionListener) {
+ this.exceptionListener = exceptionListener;
+ }
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ super.open(config);
+ // Create a Connection
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ exceptionListener = new AMQExceptionListener(LOG, logFailuresOnly);
+ connection.setExceptionListener(exceptionListener);
+
+ RuntimeContext runtimeContext = getRuntimeContext();
+ int acknowledgeType;
+ if (runtimeContext instanceof StreamingRuntimeContext
+ && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
+ autoAck = false;
+ acknowledgeType = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
+ } else {
+ autoAck = true;
+ acknowledgeType = ActiveMQSession.AUTO_ACKNOWLEDGE;
+ }
+ // Create a Session
+ session = connection.createSession(false, acknowledgeType);
+
+ // Create the destination (Topic or Queue)
+ Destination destination = AMQUtil.getDestination(session, destinationType, destinationName);
+
+ // Create a MessageConsumer from the Session to the Topic or
+ // Queue
+ consumer = session.createConsumer(destination);
+ runningChecker.setIsRunning(true);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ RuntimeException exception = null;
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ if (logFailuresOnly) {
+ LOG.error("Failed to close ActiveMQ session", e);
+ } else {
+ exception = new RuntimeException("Failed to close ActiveMQ consumer", e);
+ }
+ }
+ try {
+ session.close();
+ } catch (JMSException e) {
+ if (logFailuresOnly) {
+ LOG.error("Failed to close ActiveMQ session", e);
+ } else {
+ exception = exception == null ? new RuntimeException("Failed to close ActiveMQ session", e)
+ : exception;
+ }
+
+ }
+ try {
+ connection.close();
+ } catch (JMSException e) {
+ if (logFailuresOnly) {
+ LOG.error("Failed to close ActiveMQ session", e);
+ } else {
+ exception = exception == null ? new RuntimeException("Failed to close ActiveMQ connection", e)
+ : exception;
+ }
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ @Override
+ protected void acknowledgeIDs(long checkpointId, List<String> UIds) {
+ try {
+ for (String messageId : UIds) {
+ Message unacknowledgedMessage = unacknowledgedMessages.get(messageId);
+ if (unacknowledgedMessage != null) {
+ unacknowledgedMessage.acknowledge();
+ unacknowledgedMessages.remove(messageId);
+ } else {
+ LOG.warn("Tried to acknowledge unknown ActiveMQ message id: {}", messageId);
+ }
+ }
+ } catch (JMSException e) {
+ if (logFailuresOnly) {
+ LOG.error("Failed to acknowledge ActiveMQ message");
+ } else {
+ throw new RuntimeException("Failed to acknowledge ActiveMQ message");
+ }
+ }
+ }
+
+ @Override
+ public void run(SourceContext<OUT> ctx) throws Exception {
+ while (runningChecker.isRunning()) {
+ exceptionListener.checkErroneous();
+
+ Message message = consumer.receive(1000);
+ if (! (message instanceof BytesMessage)) {
+ LOG.warn("Active MQ source received non bytes message: {}");
+ return;
+ }
+ BytesMessage bytesMessage = (BytesMessage) message;
+ byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
+ bytesMessage.readBytes(bytes);
+ OUT value = deserializationSchema.deserialize(bytes);
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(value);
+ if (!autoAck) {
+ addId(bytesMessage.getJMSMessageID());
+ unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ runningChecker.setIsRunning(false);
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
new file mode 100644
index 0000000..2dcb2cb
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Immutable AMQ source config.
+ *
+ * @param <OUT> type of source output messages
+ */
+public class AMQSourceConfig<OUT> {
+
+ private final ActiveMQConnectionFactory connectionFactory;
+ private final String destinationName;
+ private final DeserializationSchema<OUT> deserializationSchema;
+ private final RunningChecker runningChecker;
+ private final DestinationType destinationType;
+
+ AMQSourceConfig(ActiveMQConnectionFactory connectionFactory, String destinationName,
+ DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker,
+ DestinationType destinationType) {
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
+ this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName");
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
+ this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker");
+ this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
+ }
+
+ public ActiveMQConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ public DeserializationSchema<OUT> getDeserializationSchema() {
+ return deserializationSchema;
+ }
+
+ public RunningChecker getRunningChecker() {
+ return runningChecker;
+ }
+
+ public DestinationType getDestinationType() {
+ return destinationType;
+ }
+
+ /**
+ * Builder for {@link AMQSourceConfig}
+ *
+ * @param <OUT> type of source output messages
+ */
+ public static class AMQSourceConfigBuilder<OUT> {
+ private ActiveMQConnectionFactory connectionFactory;
+ private String destinationName;
+ private DeserializationSchema<OUT> deserializationSchema;
+ private RunningChecker runningChecker = new RunningChecker();
+ private DestinationType destinationType = DestinationType.QUEUE;
+
+ public AMQSourceConfigBuilder<OUT> setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory);
+ return this;
+ }
+
+ public AMQSourceConfigBuilder<OUT> setDestinationName(String destinationName) {
+ this.destinationName = Preconditions.checkNotNull(destinationName);
+ return this;
+ }
+
+ public AMQSourceConfigBuilder<OUT> setDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema);
+ return this;
+ }
+
+ public AMQSourceConfigBuilder<OUT> setRunningChecker(RunningChecker runningChecker) {
+ this.runningChecker = Preconditions.checkNotNull(runningChecker);
+ return this;
+ }
+
+ public AMQSourceConfigBuilder<OUT> setDestinationType(DestinationType destinationType) {
+ this.destinationType = Preconditions.checkNotNull(destinationType);
+ return this;
+ }
+
+ public AMQSourceConfig<OUT> build() {
+ return new AMQSourceConfig<OUT>(connectionFactory, destinationName, deserializationSchema, runningChecker, destinationType);
+ }
+
+ }
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java
new file mode 100644
index 0000000..fa111a4
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+/**
+ * Type of AMQ destination
+ */
+public enum DestinationType {
+ QUEUE,
+ TOPIC
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQExceptionListener.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQExceptionListener.java
new file mode 100644
index 0000000..94fcd56
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQExceptionListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq.internal;
+
+import org.slf4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+public class AMQExceptionListener implements ExceptionListener {
+
+ private final boolean logFailuresOnly;
+ private final Logger logger;
+ private JMSException exception;
+
+ public AMQExceptionListener(Logger logger, boolean logFailuresOnly) {
+ this.logger = logger;
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ @Override
+ public void onException(JMSException e) {
+ this.exception = e;
+ }
+
+ /**
+ * Check if the listener received an asynchronous exception. Throws an exception if it was
+ * received and if logFailuresOnly was set to true. Resets the state after the call
+ * so a single exception can be thrown only once.
+ *
+ * @throws JMSException if exception was received and logFailuresOnly was set to true.
+ */
+ public void checkErroneous() throws JMSException {
+ if (exception == null) {
+ return;
+ }
+
+ JMSException recordedException = exception;
+ exception = null;
+ if (logFailuresOnly) {
+ logger.error("Received ActiveMQ exception", recordedException);
+ } else {
+ throw recordedException;
+ }
+ }
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java
new file mode 100644
index 0000000..e5ae524
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq.internal;
+
+import org.apache.flink.streaming.connectors.activemq.DestinationType;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * Utilities for AMQ connector
+ */
+public class AMQUtil {
+ private AMQUtil() {}
+
+ /**
+ * Create ActiveMQ destination (queue or topic).
+ *
+ * @param session AMQ session
+ * @param destinationType destination type to create
+ * @param destinationName name of the destination
+ * @return created destination
+ * @throws JMSException
+ */
+ public static Destination getDestination(Session session, DestinationType destinationType,
+ String destinationName) throws JMSException {
+ switch (destinationType) {
+ case QUEUE:
+ return session.createQueue(destinationName);
+ case TOPIC:
+ return session.createTopic(destinationName);
+ default:
+ throw new IllegalArgumentException("Unknown destination type: " + destinationType);
+ }
+ }
+
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java
new file mode 100644
index 0000000..8c46695
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq.internal;
+
+import java.io.Serializable;
+
+/**
+ * Class that is used to store current status of source execution
+ */
+public class RunningChecker implements Serializable {
+ private volatile boolean isRunning = false;
+
+ /**
+ * Check if source should run.
+ *
+ * @return true if source should run, false otherwise
+ */
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ /**
+ * Set if source should run.
+ *
+ * @param isRunning true if source should run, false otherwise
+ */
+ public void setIsRunning(boolean isRunning) {
+ this.isRunning = isRunning;
+ }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQExceptionListenerTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQExceptionListenerTest.java
new file mode 100644
index 0000000..81bb926
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQExceptionListenerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import javax.jms.JMSException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class AMQExceptionListenerTest {
+ @Test
+ public void logMessageOnException() throws JMSException {
+ Logger logger = mock(Logger.class);
+ AMQExceptionListener listener = new AMQExceptionListener(logger, true);
+ JMSException exception = new JMSException("error");
+ listener.onException(exception);
+ listener.checkErroneous();
+ verify(logger).error("Received ActiveMQ exception", exception);
+ }
+
+ @Test
+ public void logMessageWrittenOnlyOnce() throws JMSException {
+ Logger logger = mock(Logger.class);
+ AMQExceptionListener listener = new AMQExceptionListener(logger, true);
+ JMSException exception = new JMSException("error");
+ listener.onException(exception);
+ listener.checkErroneous();
+ listener.checkErroneous();
+ verify(logger, times(1)).error("Received ActiveMQ exception", exception);
+ }
+
+ @Test(expected = JMSException.class)
+ public void throwException() throws JMSException {
+ Logger logger = mock(Logger.class);
+ AMQExceptionListener listener = new AMQExceptionListener(logger, false);
+ listener.onException(new JMSException("error"));
+ listener.checkErroneous();
+ }
+
+ @Test
+ public void throwExceptionOnlyOnce() throws JMSException {
+ Logger logger = mock(Logger.class);
+ AMQExceptionListener listener = new AMQExceptionListener(logger, false);
+ listener.onException(new JMSException("error"));
+
+ try {
+ listener.checkErroneous();
+ } catch (JMSException ignore) {
+ // ignore
+ }
+ listener.checkErroneous();
+ }
+
+ @Test
+ public void logMessageNotWrittenIfNoException() throws JMSException {
+ Logger logger = mock(Logger.class);
+ AMQExceptionListener listener = new AMQExceptionListener(logger, false);
+ listener.checkErroneous();
+ verify(logger, times(0)).error(any(String.class), any(Throwable.class));
+ }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
new file mode 100644
index 0000000..b9ecfd8
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AMQSinkTest {
+
+ private final String DESTINATION_NAME = "queue";
+
+ private ActiveMQConnectionFactory connectionFactory;
+ private MessageProducer producer;
+ private Session session;
+ private Connection connection;
+ private Destination destination;
+ private BytesMessage message;
+
+ private AMQSink<String> amqSink;
+ private SerializationSchema<String> serializationSchema;
+
+ @Before
+ public void before() throws Exception {
+ connectionFactory = mock(ActiveMQConnectionFactory.class);
+ producer = mock(MessageProducer.class);
+ session = mock(Session.class);
+ connection = mock(Connection.class);
+ destination = mock(Destination.class);
+ message = mock(BytesMessage.class);
+
+ when(connectionFactory.createConnection()).thenReturn(connection);
+ when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session);
+ when(session.createProducer(null)).thenReturn(producer);
+ when(session.createBytesMessage()).thenReturn(message);
+ serializationSchema = new SimpleStringSchema();
+
+ AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+ .setConnectionFactory(connectionFactory)
+ .setDestinationName(DESTINATION_NAME)
+ .setSerializationSchema(serializationSchema)
+ .build();
+ amqSink = new AMQSink<>(config);
+ amqSink.open(new Configuration());
+ }
+
+ @Test
+ public void messageSentToProducer() throws Exception {
+ byte[] expectedMessage = serializationSchema.serialize("msg");
+ amqSink.invoke("msg");
+
+ verify(producer).send(message);
+ verify(message).writeBytes(expectedMessage);
+ }
+
+ @Test
+ public void setPersistentDeliveryMode() throws Exception {
+ AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+ .setConnectionFactory(connectionFactory)
+ .setDestinationName(DESTINATION_NAME)
+ .setSerializationSchema(serializationSchema)
+ .setPersistentDelivery(true)
+ .build();
+ amqSink = new AMQSink<>(config);
+ amqSink.open(new Configuration());
+ verify(producer).setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+
+ @Test
+ public void writeToTopic() throws Exception {
+ AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+ .setConnectionFactory(connectionFactory)
+ .setDestinationName(DESTINATION_NAME)
+ .setSerializationSchema(serializationSchema)
+ .setDestinationType(DestinationType.TOPIC)
+ .build();
+ amqSink = new AMQSink<>(config);
+ amqSink.open(new Configuration());
+ verify(session).createTopic(DESTINATION_NAME);
+ }
+
+ @Test
+ public void exceptionOnSendAreNotThrown() throws Exception {
+ when(session.createBytesMessage()).thenThrow(JMSException.class);
+ amqSink.setLogFailuresOnly(true);
+
+ amqSink.invoke("msg");
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void exceptionOnSendAreThrownByDefault() throws Exception {
+ when(session.createBytesMessage()).thenThrow(JMSException.class);
+
+ amqSink.invoke("msg");
+ }
+
+ @Test
+ public void sessionAndConnectionAreClosed() throws Exception {
+ amqSink.close();
+ verify(session).close();
+ verify(connection).close();
+ }
+
+ @Test
+ public void connectionCloseExceptionIsIgnored() throws Exception {
+ doThrow(new JMSException("session")).when(session).close();
+ doThrow(new JMSException("connection")).when(connection).close();
+
+ try {
+ amqSink.close();
+ fail("Should throw an exception");
+ } catch (RuntimeException ex) {
+ assertEquals("session", ex.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void connectionCloseExceptionIsPassed() throws Exception {
+ doThrow(new JMSException("connection")).when(connection).close();
+
+ try {
+ amqSink.close();
+ fail("Should throw an exception");
+ } catch (RuntimeException ex) {
+ assertEquals("connection", ex.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void exceptionDuringCloseAsIgnored() throws Exception {
+ doThrow(new JMSException("session")).when(session).close();
+ doThrow(new JMSException("connection")).when(connection).close();
+
+ amqSink.setLogFailuresOnly(true);
+ amqSink.close();
+ }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
new file mode 100644
index 0000000..05d0d60
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
+import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Array;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AMQSourceTest {
+
+ private static final long CHECKPOINT_ID = 1;
+ private final String DESTINATION_NAME = "queue";
+ private final String MSG_ID = "msgId";
+
+ private ActiveMQConnectionFactory connectionFactory;
+ private Session session;
+ private Connection connection;
+ private Destination destination;
+ private MessageConsumer consumer;
+ private BytesMessage message;
+
+ private AMQSource<String> amqSource;
+ private SimpleStringSchema deserializationSchema;
+ SourceFunction.SourceContext<String> context;
+
+ @Before
+ public void before() throws Exception {
+ connectionFactory = mock(ActiveMQConnectionFactory.class);
+ session = mock(Session.class);
+ connection = mock(Connection.class);
+ destination = mock(Destination.class);
+ consumer = mock(MessageConsumer.class);
+ context = mock(SourceFunction.SourceContext.class);
+
+ message = mock(BytesMessage.class);
+
+ when(connectionFactory.createConnection()).thenReturn(connection);
+ when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session);
+ when(consumer.receive(anyInt())).thenReturn(message);
+ when(session.createConsumer(any(Destination.class))).thenReturn(consumer);
+ when(context.getCheckpointLock()).thenReturn(new Object());
+ when(message.getJMSMessageID()).thenReturn(MSG_ID);
+
+ deserializationSchema = new SimpleStringSchema();
+ AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+ .setConnectionFactory(connectionFactory)
+ .setDestinationName(DESTINATION_NAME)
+ .setDeserializationSchema(deserializationSchema)
+ .setRunningChecker(new SingleLoopRunChecker())
+ .build();
+ amqSource = new AMQSource<>(config);
+ amqSource.setRuntimeContext(createRuntimeContext());
+ amqSource.open(new Configuration());
+ }
+
+ private RuntimeContext createRuntimeContext() {
+ StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+ when(runtimeContext.isCheckpointingEnabled()).thenReturn(true);
+ return runtimeContext;
+ }
+
+ @Test
+ public void readFromTopic() throws Exception {
+ AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+ .setConnectionFactory(connectionFactory)
+ .setDestinationName(DESTINATION_NAME)
+ .setDeserializationSchema(deserializationSchema)
+ .setDestinationType(DestinationType.TOPIC)
+ .setRunningChecker(new SingleLoopRunChecker())
+ .build();
+ amqSource = new AMQSource<>(config);
+ amqSource.setRuntimeContext(createRuntimeContext());
+ amqSource.open(new Configuration());
+ verify(session).createTopic(DESTINATION_NAME);
+ }
+
+ @Test
+ public void parseReceivedMessage() throws Exception {
+ final byte[] bytes = deserializationSchema.serialize("msg");
+ when(message.getBodyLength()).thenReturn((long) bytes.length);
+ when(message.readBytes(any(byte[].class))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ byte[] inputBytes = (byte[]) invocationOnMock.getArguments()[0];
+ Array.copy(bytes, 0, inputBytes, 0, bytes.length);
+ return null;
+ }
+ });
+
+ amqSource.run(context);
+
+ verify(context).collect("msg");
+ }
+
+ @Test
+ public void acknowledgeReceivedMessage() throws Exception {
+ amqSource.run(context);
+ amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+
+ verify(message).acknowledge();
+ }
+
+ @Test
+ public void handleUnknownIds() throws Exception {
+ amqSource.run(context);
+ amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList("unknown-id"));
+
+ verify(message, never()).acknowledge();
+ }
+
+ @Test
+ public void doNotAcknowledgeMessageTwice() throws Exception {
+ amqSource.run(context);
+ amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+ amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+
+ verify(message, times(1)).acknowledge();
+ }
+
+ @Test(expected = JMSException.class)
+ public void propagateAsyncException() throws Exception {
+ AMQExceptionListener exceptionListener = mock(AMQExceptionListener.class);
+ amqSource.setExceptionListener(exceptionListener);
+ doThrow(JMSException.class).when(exceptionListener).checkErroneous();
+ amqSource.run(context);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void throwAcknowledgeExceptionByDefault() throws Exception {
+ doThrow(JMSException.class).when(message).acknowledge();
+
+ amqSource.run(context);
+ amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+ }
+
+ @Test
+ public void doNotThrowAcknowledgeExceptionByDefault() throws Exception {
+ amqSource.setLogFailuresOnly(true);
+
+ doThrow(JMSException.class).when(message).acknowledge();
+
+ amqSource.run(context);
+ amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+ }
+
+ @Test
+ public void closeResources() throws Exception {
+ amqSource.close();
+
+ verify(consumer).close();
+ verify(session).close();
+ verify(connection).close();
+ }
+
+ @Test
+ public void consumerCloseExceptionShouldBePased() throws Exception {
+ doThrow(new JMSException("consumer")).when(consumer).close();
+ doThrow(new JMSException("session")).when(session).close();
+ doThrow(new JMSException("connection")).when(connection).close();
+
+ try {
+ amqSource.close();
+ fail("Should throw an exception");
+ } catch (RuntimeException ex) {
+ assertEquals("consumer", ex.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void sessionCloseExceptionShouldBePased() throws Exception {
+ doThrow(new JMSException("session")).when(session).close();
+ doThrow(new JMSException("connection")).when(connection).close();
+
+ try {
+ amqSource.close();
+ fail("Should throw an exception");
+ } catch (RuntimeException ex) {
+ assertEquals("session", ex.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void connectionCloseExceptionShouldBePased() throws Exception {
+ doThrow(new JMSException("connection")).when(connection).close();
+
+ try {
+ amqSource.close();
+ fail("Should throw an exception");
+ } catch (RuntimeException ex) {
+ assertEquals("connection", ex.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void exceptionsShouldNotBePassedIfLogFailuresOnly() throws Exception {
+ doThrow(new JMSException("consumer")).when(consumer).close();
+ doThrow(new JMSException("session")).when(session).close();
+ doThrow(new JMSException("connection")).when(connection).close();
+
+ amqSource.setLogFailuresOnly(true);
+ amqSource.close();
+ }
+
+ class SingleLoopRunChecker extends RunningChecker {
+
+ int count = 0;
+
+ @Override
+ public boolean isRunning() {
+ return (count++ == 0);
+ }
+
+ @Override
+ public void setIsRunning(boolean isRunning) {
+
+ }
+ }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
new file mode 100644
index 0000000..985e06d
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashSet;
+import java.util.Random;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ActiveMQConnectorITCase {
+
+ public static final int MESSAGES_NUM = 10000;
+ public static final String QUEUE_NAME = "queue";
+ public static final String TOPIC_NAME = "topic";
+ private static ForkableFlinkMiniCluster flink;
+ private static int flinkPort;
+
+ @BeforeClass
+ public static void beforeClass() {
+ // start also a re-usable Flink mini cluster
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+ flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+ flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ flink.start();
+
+ flinkPort = flink.getLeaderRPCPort();
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ flinkPort = -1;
+ if (flink != null) {
+ flink.shutdown();
+ }
+ }
+
+ @Test
+ public void amqTopologyWithQueue() throws Exception {
+ StreamExecutionEnvironment env = createExecutionEnvironment();
+ AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+ .setConnectionFactory(createConnectionFactory())
+ .setDestinationName(QUEUE_NAME)
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+ createProducerTopology(env, sinkConfig);
+
+ ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory();
+ AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+ .setConnectionFactory(sourceConnectionFactory)
+ .setDestinationName(QUEUE_NAME)
+ .setDeserializationSchema(new SimpleStringSchema())
+ .build();
+ createConsumerTopology(env, sourceConfig);
+
+ tryExecute(env, "AMQTest");
+ }
+
+ @Test
+ public void amqTopologyWithTopic() throws Exception {
+ StreamExecutionEnvironment env = createExecutionEnvironment();
+ AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+ .setConnectionFactory(createConnectionFactory())
+ .setDestinationName(TOPIC_NAME)
+ .setSerializationSchema(new SimpleStringSchema())
+ .setDestinationType(DestinationType.TOPIC)
+ .build();
+ createProducerTopology(env, sinkConfig);
+
+ ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory();
+ AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+ .setConnectionFactory(sourceConnectionFactory)
+ .setDestinationName(TOPIC_NAME)
+ .setDeserializationSchema(new SimpleStringSchema())
+ .setDestinationType(DestinationType.TOPIC)
+ .build();
+ createConsumerTopology(env, sourceConfig);
+
+ tryExecute(env, "AMQTest");
+ }
+
+ private StreamExecutionEnvironment createExecutionEnvironment() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+ return env;
+ }
+
+ private ActiveMQConnectionFactory createConnectionFactory() {
+ return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ }
+
+ private void createProducerTopology(StreamExecutionEnvironment env, AMQSinkConfig<String> config) {
+ DataStreamSource<String> stream = env.addSource(new SourceFunction<String>() {
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ for (int i = 0; i < MESSAGES_NUM; i++) {
+ ctx.collect("amq-" + i);
+ }
+ }
+
+ @Override
+ public void cancel() {}
+ });
+
+
+ AMQSink<String> sink = new AMQSink<>(config);
+ stream.addSink(sink);
+ }
+
+ private void createConsumerTopology(StreamExecutionEnvironment env, AMQSourceConfig<String> config) {
+ AMQSource<String> source = new AMQSource<>(config);
+
+ env.addSource(source)
+ .addSink(new SinkFunction<String>() {
+ final HashSet<Integer> set = new HashSet<>();
+ @Override
+ public void invoke(String value) throws Exception {
+ int val = Integer.parseInt(value.split("-")[1]);
+ set.add(val);
+
+ if (set.size() == MESSAGES_NUM) {
+ throw new SuccessException();
+ }
+ }
+ });
+ }
+
+ @Test
+ public void amqTopologyWithCheckpointing() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
+ AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+ .setConnectionFactory(connectionFactory)
+ .setDestinationName("queue2")
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+ AMQSink<String> sink = new AMQSink<>(sinkConfig);
+ sink.open(new Configuration());
+
+ for (int i = 0; i < MESSAGES_NUM; i++) {
+ sink.invoke("amq-" + i);
+ }
+
+ AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+ .setConnectionFactory(connectionFactory)
+ .setDestinationName("queue2")
+ .setDeserializationSchema(new SimpleStringSchema())
+ .build();
+
+ final AMQSource<String> source = new AMQSource<>(sourceConfig);
+ RuntimeContext runtimeContext = createMockRuntimeContext();
+ source.setRuntimeContext(runtimeContext);
+ source.open(new Configuration());
+
+ final TestSourceContext sourceContext = new TestSourceContext();
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ source.run(sourceContext);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ thread.start();
+
+ Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+ while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
+ Thread.sleep(100);
+ Random random = new Random();
+ long checkpointId = random.nextLong();
+ synchronized (sourceContext.getCheckpointLock()) {
+ source.snapshotState(checkpointId, System.currentTimeMillis());
+ source.notifyCheckpointComplete(checkpointId);
+ }
+ }
+ assertEquals(MESSAGES_NUM, sourceContext.getIdsNum());
+ }
+
+ private RuntimeContext createMockRuntimeContext() {
+ StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+ when(runtimeContext.isCheckpointingEnabled()).thenReturn(true);
+ return runtimeContext;
+ }
+
+ class TestSourceContext implements SourceFunction.SourceContext<String> {
+
+ private HashSet<Integer> ids = new HashSet<>();
+ private Object contextLock = new Object();
+ @Override
+ public void collect(String value) {
+ int val = Integer.parseInt(value.split("-")[1]);
+ ids.add(val);
+ }
+
+ @Override
+ public void collectWithTimestamp(String element, long timestamp) { }
+
+ @Override
+ public void emitWatermark(Watermark mark) { }
+
+ @Override
+ public Object getCheckpointLock() {
+ return contextLock;
+ }
+
+ @Override
+ public void close() { }
+
+ public int getIdsNum() {
+ synchronized (contextLock) {
+ return ids.size();
+ }
+ }
+ };
+}
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index f8b98f1..8551abd 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -152,6 +152,7 @@
<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.3</version>
<executions>
<execution>
<id>shade-flink</id>
diff --git a/pom.xml b/pom.xml
index 01fb4fc..31af3a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
<modules>
<module>flink-connector-redis</module>
<module>flink-connector-flume</module>
+ <module>flink-connector-activemq</module>
</modules>
<properties>