[BAHIR-58] Add ActiveMQ connector

Closes #3
diff --git a/.travis.yml b/.travis.yml
index 4c3ba91..fd3733a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,7 +18,7 @@
 language: java
 
 env:
-  - FLINK_VERSION="1.0.3"
+#  - FLINK_VERSION="1.1.0"
   - FLINK_VERSION="1.1.1"
 
 jdk:
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
new file mode 100644
index 0000000..528b546
--- /dev/null
+++ b/flink-connector-activemq/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink_parent_2.11</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-activemq_2.11</artifactId>
+    <name>flink-connector-activemq</name>
+
+    <packaging>jar</packaging>
+
+    <!-- Allow users to pass custom connector versions -->
+    <properties>
+        <activemq.version>5.14.0</activemq.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <version>${activemq.version}</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_2.11</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_2.11</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq.tooling</groupId>
+            <artifactId>activemq-junit</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.5.5</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.5.5</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
new file mode 100644
index 0000000..a494162
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * <p>
+ * To create an instance of AMQSink class one should initialize and configure an
+ * instance of a connection factory that will be used to create a connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
+
+
+    // Factory that is used to create AMQ connection
+    private final ActiveMQConnectionFactory connectionFactory;
+    // Name of a queue or topic
+    private final String destinationName;
+    // Serialization scheme that is used to convert input message to bytes
+    private final SerializationSchema<IN> serializationSchema;
+    // Defines if persistent delivery in AMQ is used
+    private final boolean persistentDelivery;
+    // Type of AMQ destination (topic or a queue)
+    private final DestinationType destinationType;
+    // Throw exceptions or just log them
+    private boolean logFailuresOnly = false;
+    // Used to send messages
+    private transient MessageProducer producer;
+    // AMQ session
+    private transient Session session;
+    // AMQ connection
+    private transient Connection connection;
+
+    /**
+     * Create AMQSink.
+     *
+     * @param config AMQSink configuration
+     */
+    public AMQSink(AMQSinkConfig<IN> config) {
+        this.connectionFactory = config.getConnectionFactory();
+        this.destinationName = config.getDestinationName();
+        this.serializationSchema = config.getSerializationSchema();
+        this.persistentDelivery = config.isPersistentDelivery();
+        this.destinationType = config.getDestinationType();
+    }
+
+    /**
+     * Defines whether the producer should fail on errors, or only log them.
+     * If this is set to true, then exceptions will be only logged, if set to false,
+     * exceptions will be eventually thrown and cause the streaming program to
+     * fail (and enter recovery).
+     *
+     * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+     */
+    public void setLogFailuresOnly(boolean logFailuresOnly) {
+        this.logFailuresOnly = logFailuresOnly;
+    }
+
+
+    @Override
+    public void open(Configuration config) throws Exception {
+        super.open(config);
+        // Create a Connection
+        connection = connectionFactory.createConnection();
+        connection.start();
+
+        // Create a Session
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the destination (Topic or Queue)
+        Destination destination = AMQUtil.getDestination(session, destinationType, destinationName);
+
+        // Create a MessageProducer from the Session to the Topic or
+        // Queue
+        producer = session.createProducer(destination);
+        producer.setDeliveryMode(getDeliveryMode());
+    }
+
+    private int getDeliveryMode() {
+        if (persistentDelivery) {
+            return DeliveryMode.PERSISTENT;
+        }
+
+        return DeliveryMode.NON_PERSISTENT;
+    }
+
+    /**
+     * Called when new data arrives to the sink, and forwards it to RMQ.
+     *
+     * @param value
+     *            The incoming data
+     */
+    @Override
+    public void invoke(IN value) {
+        try {
+            byte[] bytes = serializationSchema.serialize(value);
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(bytes);
+            producer.send(message);
+        } catch (JMSException e) {
+            if (logFailuresOnly) {
+                LOG.error("Failed to send message to ActiveMQ", e);
+            } else {
+                throw new RuntimeException("Failed to send message to ActiveMQ", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        RuntimeException t = null;
+        try {
+            session.close();
+        } catch (JMSException e) {
+            if (logFailuresOnly) {
+                LOG.error("Failed to close ActiveMQ session", e);
+            } else {
+                t = new RuntimeException("Failed to close ActiveMQ session", e);
+            }
+        }
+
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            if (logFailuresOnly) {
+                LOG.error("Failed to close ActiveMQ connection", e);
+            } else {
+                t = t == null    ? new RuntimeException("Failed to close ActiveMQ session", e)
+                                : t;
+            }
+        }
+
+        if (t != null) {
+            throw t;
+        }
+    }
+
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
new file mode 100644
index 0000000..86254ff
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Immutable configuration for AMQSink
+ * @param <IN> type of input messages in configured sink
+ */
+public class AMQSinkConfig<IN> {
+    private final ActiveMQConnectionFactory connectionFactory;
+    private final String queueName;
+    private final SerializationSchema<IN> serializationSchema;
+    private final boolean persistentDelivery;
+    private final DestinationType destinationType;
+
+    public AMQSinkConfig(ActiveMQConnectionFactory connectionFactory, String queueName,
+                        SerializationSchema<IN> serializationSchema, boolean persistentDelivery,
+                        DestinationType destinationType) {
+        this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
+        this.queueName = Preconditions.checkNotNull(queueName, "destinationName");
+        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+        this.persistentDelivery = persistentDelivery;
+        this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
+    }
+
+    public ActiveMQConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public String getDestinationName() {
+        return queueName;
+    }
+
+    public SerializationSchema<IN> getSerializationSchema() {
+        return serializationSchema;
+    }
+
+    public boolean isPersistentDelivery() {
+        return persistentDelivery;
+    }
+
+    public DestinationType getDestinationType() {
+        return destinationType;
+    }
+
+
+    /**
+     * Builder for {@link AMQSinkConfig}
+     * @param <IN> type of input messages in configured sink
+     */
+    public static class AMQSinkConfigBuilder<IN> {
+        private ActiveMQConnectionFactory connectionFactory;
+        private String destinationName;
+        private SerializationSchema<IN> serializationSchema;
+        private boolean persistentDelivery;
+        private DestinationType destinationType = DestinationType.QUEUE;
+
+        public AMQSinkConfigBuilder<IN> setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+            this.connectionFactory = Preconditions.checkNotNull(connectionFactory);
+            return this;
+        }
+
+        public AMQSinkConfigBuilder<IN> setDestinationName(String queueName) {
+            this.destinationName = Preconditions.checkNotNull(queueName);
+            return this;
+        }
+
+        public AMQSinkConfigBuilder<IN> setSerializationSchema(SerializationSchema<IN> serializationSchema) {
+            this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+            return this;
+        }
+
+        public AMQSinkConfigBuilder<IN> setPersistentDelivery(boolean persistentDelivery) {
+            this.persistentDelivery = persistentDelivery;
+            return this;
+        }
+
+        public AMQSinkConfigBuilder<IN> setDestinationType(DestinationType destinationType) {
+            this.destinationType = Preconditions.checkNotNull(destinationType);
+            return this;
+        }
+
+        public AMQSinkConfig<IN> build() {
+            return new AMQSinkConfig<IN>(connectionFactory, destinationName, serializationSchema, persistentDelivery, destinationType);
+        }
+
+    }
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
new file mode 100644
index 0000000..49f2cf7
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
+import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * <p>
+ * To create an instance of AMQSink class one should initialize and configure an
+ * instance of a connection factory that will be used to create a connection.
+ * This source is waiting for incoming messages from ActiveMQ and converts them from
+ * an array of bytes into an instance of the output type. If an incoming
+ * message is not a message with an array of bytes, this message is ignored
+ * and warning message is logged.
+ *
+ * If checkpointing is enabled AMQSink will not acknowledge received AMQ messages as they arrive,
+ * but will store them internally and will acknowledge a bulk of messages during checkpointing.
+ *
+ * @param <OUT> type of output messages
+ */
+public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
+    implements ResultTypeQueryable<OUT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class);
+
+    // Factory that is used to create AMQ connection
+    private final ActiveMQConnectionFactory connectionFactory;
+    // Name of a queue or topic
+    private final String destinationName;
+    // Deserialization scheme that is used to convert bytes to output message
+    private final DeserializationSchema<OUT> deserializationSchema;
+    // Type of AMQ destination (topic or a queue)
+    private final DestinationType destinationType;
+    // Throw exceptions or just log them
+    private boolean logFailuresOnly = false;
+    // Stores if source is running (used for testing)
+    private RunningChecker runningChecker;
+    // AMQ connection
+    private transient Connection connection;
+    // AMQ session
+    private transient Session session;
+    // Used to receive incoming messages
+    private transient MessageConsumer consumer;
+    // If source should immediately acknowledge incoming message
+    private boolean autoAck;
+    // Map of message ids to currently unacknowledged AMQ messages
+    private HashMap<String, Message> unacknowledgedMessages = new HashMap<>();
+    // Listener for AMQ exceptions
+    private AMQExceptionListener exceptionListener;
+
+    /**
+     * Create AMQSource.
+     *
+     * @param config AMQSource configuration
+     */
+    AMQSource(AMQSourceConfig<OUT> config) {
+        super(String.class);
+        this.connectionFactory = config.getConnectionFactory();
+        this.destinationName = config.getDestinationName();
+        this.deserializationSchema = config.getDeserializationSchema();
+        this.runningChecker = config.getRunningChecker();
+        this.destinationType = config.getDestinationType();
+    }
+
+    /**
+     * Defines whether the producer should fail on errors, or only log them.
+     * If this is set to true, then exceptions will be only logged, if set to false,
+     * exceptions will be eventually thrown and cause the streaming program to
+     * fail (and enter recovery).
+     *
+     * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+     */
+    public void setLogFailuresOnly(boolean logFailuresOnly) {
+        this.logFailuresOnly = logFailuresOnly;
+    }
+
+    // Visible for testing
+    void setExceptionListener(AMQExceptionListener exceptionListener) {
+        this.exceptionListener = exceptionListener;
+    }
+
+    @Override
+    public void open(Configuration config) throws Exception {
+        super.open(config);
+        // Create a Connection
+        connection = connectionFactory.createConnection();
+        connection.start();
+
+        exceptionListener = new AMQExceptionListener(LOG, logFailuresOnly);
+        connection.setExceptionListener(exceptionListener);
+
+        RuntimeContext runtimeContext = getRuntimeContext();
+        int acknowledgeType;
+        if (runtimeContext instanceof StreamingRuntimeContext
+            && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
+            autoAck = false;
+            acknowledgeType = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
+        } else {
+            autoAck = true;
+            acknowledgeType = ActiveMQSession.AUTO_ACKNOWLEDGE;
+        }
+        // Create a Session
+        session = connection.createSession(false, acknowledgeType);
+
+        // Create the destination (Topic or Queue)
+        Destination destination = AMQUtil.getDestination(session, destinationType, destinationName);
+
+        // Create a MessageConsumer from the Session to the Topic or
+        // Queue
+        consumer = session.createConsumer(destination);
+        runningChecker.setIsRunning(true);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        RuntimeException exception = null;
+        try {
+            consumer.close();
+        } catch (JMSException e) {
+            if (logFailuresOnly) {
+                LOG.error("Failed to close ActiveMQ session", e);
+            } else {
+                exception = new RuntimeException("Failed to close ActiveMQ consumer", e);
+            }
+        }
+        try {
+            session.close();
+        } catch (JMSException e) {
+            if (logFailuresOnly) {
+                LOG.error("Failed to close ActiveMQ session", e);
+            } else {
+                exception = exception == null    ? new RuntimeException("Failed to close ActiveMQ session", e)
+                                                : exception;
+            }
+
+        }
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            if (logFailuresOnly) {
+                LOG.error("Failed to close ActiveMQ session", e);
+            } else {
+                exception = exception == null    ? new RuntimeException("Failed to close ActiveMQ connection", e)
+                                                : exception;
+            }
+        }
+
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    @Override
+    protected void acknowledgeIDs(long checkpointId, List<String> UIds) {
+        try {
+            for (String messageId : UIds) {
+                Message unacknowledgedMessage = unacknowledgedMessages.get(messageId);
+                if (unacknowledgedMessage != null) {
+                    unacknowledgedMessage.acknowledge();
+                    unacknowledgedMessages.remove(messageId);
+                } else {
+                    LOG.warn("Tried to acknowledge unknown ActiveMQ message id: {}", messageId);
+                }
+            }
+        } catch (JMSException e) {
+            if (logFailuresOnly) {
+                LOG.error("Failed to acknowledge ActiveMQ message");
+            } else {
+                throw new RuntimeException("Failed to acknowledge ActiveMQ message");
+            }
+        }
+    }
+
+    @Override
+    public void run(SourceContext<OUT> ctx) throws Exception {
+        while (runningChecker.isRunning()) {
+            exceptionListener.checkErroneous();
+
+            Message message = consumer.receive(1000);
+            if (! (message instanceof BytesMessage)) {
+                LOG.warn("Active MQ source received non bytes message: {}");
+                return;
+            }
+            BytesMessage bytesMessage = (BytesMessage) message;
+            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
+            bytesMessage.readBytes(bytes);
+            OUT value = deserializationSchema.deserialize(bytes);
+            synchronized (ctx.getCheckpointLock()) {
+                ctx.collect(value);
+                if (!autoAck) {
+                    addId(bytesMessage.getJMSMessageID());
+                    unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        runningChecker.setIsRunning(false);
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
new file mode 100644
index 0000000..2dcb2cb
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Immutable AMQ source config.
+ *
+ * @param <OUT> type of source output messages
+ */
+public class AMQSourceConfig<OUT> {
+
+    private final ActiveMQConnectionFactory connectionFactory;
+    private final String destinationName;
+    private final DeserializationSchema<OUT> deserializationSchema;
+    private final RunningChecker runningChecker;
+    private final DestinationType destinationType;
+
+    AMQSourceConfig(ActiveMQConnectionFactory connectionFactory, String destinationName,
+                    DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker,
+                    DestinationType destinationType) {
+        this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
+        this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName");
+        this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
+        this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker");
+        this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
+    }
+
+    public ActiveMQConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public DeserializationSchema<OUT> getDeserializationSchema() {
+        return deserializationSchema;
+    }
+
+    public RunningChecker getRunningChecker() {
+        return runningChecker;
+    }
+
+    public DestinationType getDestinationType() {
+        return destinationType;
+    }
+
+    /**
+     * Builder for {@link AMQSourceConfig}
+     *
+     * @param <OUT> type of source output messages
+     */
+    public static class AMQSourceConfigBuilder<OUT> {
+        private ActiveMQConnectionFactory connectionFactory;
+        private String destinationName;
+        private DeserializationSchema<OUT> deserializationSchema;
+        private RunningChecker runningChecker = new RunningChecker();
+        private DestinationType destinationType = DestinationType.QUEUE;
+
+        public AMQSourceConfigBuilder<OUT> setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+            this.connectionFactory = Preconditions.checkNotNull(connectionFactory);
+            return this;
+        }
+
+        public AMQSourceConfigBuilder<OUT> setDestinationName(String destinationName) {
+            this.destinationName = Preconditions.checkNotNull(destinationName);
+            return this;
+        }
+
+        public AMQSourceConfigBuilder<OUT> setDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+            this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema);
+            return this;
+        }
+
+        public AMQSourceConfigBuilder<OUT> setRunningChecker(RunningChecker runningChecker) {
+            this.runningChecker = Preconditions.checkNotNull(runningChecker);
+            return this;
+        }
+
+        public AMQSourceConfigBuilder<OUT> setDestinationType(DestinationType destinationType) {
+            this.destinationType = Preconditions.checkNotNull(destinationType);
+            return this;
+        }
+
+        public AMQSourceConfig<OUT> build() {
+            return new AMQSourceConfig<OUT>(connectionFactory, destinationName, deserializationSchema, runningChecker, destinationType);
+        }
+
+    }
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java
new file mode 100644
index 0000000..fa111a4
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+/**
+ * Type of AMQ destination
+ */
+public enum DestinationType {
+    QUEUE,
+    TOPIC
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQExceptionListener.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQExceptionListener.java
new file mode 100644
index 0000000..94fcd56
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQExceptionListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq.internal;
+
+import org.slf4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+public class AMQExceptionListener implements ExceptionListener {
+
+    private final boolean logFailuresOnly;
+    private final Logger logger;
+    private JMSException exception;
+
+    public AMQExceptionListener(Logger logger, boolean logFailuresOnly) {
+        this.logger = logger;
+        this.logFailuresOnly = logFailuresOnly;
+    }
+
+    @Override
+    public void onException(JMSException e) {
+        this.exception = e;
+    }
+
+    /**
+     * Check if the listener received an asynchronous exception. Throws an exception if it was
+     * received and if logFailuresOnly was set to true. Resets the state after the call
+     * so a single exception can be thrown only once.
+     *
+     * @throws JMSException if exception was received and logFailuresOnly was set to true.
+     */
+    public void checkErroneous() throws JMSException {
+        if (exception == null) {
+            return;
+        }
+
+        JMSException recordedException = exception;
+        exception = null;
+        if (logFailuresOnly) {
+            logger.error("Received ActiveMQ exception", recordedException);
+        } else {
+            throw recordedException;
+        }
+    }
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java
new file mode 100644
index 0000000..e5ae524
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq.internal;
+
+import org.apache.flink.streaming.connectors.activemq.DestinationType;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * Utilities for AMQ connector
+ */
+public class AMQUtil {
+    private AMQUtil() {}
+
+    /**
+     * Create ActiveMQ destination (queue or topic).
+     *
+     * @param session AMQ session
+     * @param destinationType destination type to create
+     * @param destinationName name of the destination
+     * @return created destination
+     * @throws JMSException
+     */
+    public static Destination getDestination(Session session, DestinationType destinationType,
+                                            String destinationName) throws JMSException {
+        switch (destinationType) {
+            case QUEUE:
+                return session.createQueue(destinationName);
+            case TOPIC:
+                return session.createTopic(destinationName);
+            default:
+                throw new IllegalArgumentException("Unknown destination type: " + destinationType);
+        }
+    }
+
+}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java
new file mode 100644
index 0000000..8c46695
--- /dev/null
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq.internal;
+
+import java.io.Serializable;
+
+/**
+ * Class that is used to store current status of source execution
+ */
+public class RunningChecker implements Serializable {
+    private volatile boolean isRunning = false;
+
+    /**
+     * Check if source should run.
+     *
+     * @return true if source should run, false otherwise
+     */
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    /**
+     * Set if source should run.
+     *
+     * @param isRunning true if source should run, false otherwise
+     */
+    public void setIsRunning(boolean isRunning) {
+        this.isRunning = isRunning;
+    }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQExceptionListenerTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQExceptionListenerTest.java
new file mode 100644
index 0000000..81bb926
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQExceptionListenerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import javax.jms.JMSException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class AMQExceptionListenerTest {
+    @Test
+    public void logMessageOnException() throws JMSException {
+        Logger logger = mock(Logger.class);
+        AMQExceptionListener listener = new AMQExceptionListener(logger, true);
+        JMSException exception = new JMSException("error");
+        listener.onException(exception);
+        listener.checkErroneous();
+        verify(logger).error("Received ActiveMQ exception", exception);
+    }
+
+    @Test
+    public void logMessageWrittenOnlyOnce() throws JMSException {
+        Logger logger = mock(Logger.class);
+        AMQExceptionListener listener = new AMQExceptionListener(logger, true);
+        JMSException exception = new JMSException("error");
+        listener.onException(exception);
+        listener.checkErroneous();
+        listener.checkErroneous();
+        verify(logger, times(1)).error("Received ActiveMQ exception", exception);
+    }
+
+    @Test(expected = JMSException.class)
+    public void throwException() throws JMSException {
+        Logger logger = mock(Logger.class);
+        AMQExceptionListener listener = new AMQExceptionListener(logger, false);
+        listener.onException(new JMSException("error"));
+        listener.checkErroneous();
+    }
+
+    @Test
+    public void throwExceptionOnlyOnce() throws JMSException {
+        Logger logger = mock(Logger.class);
+        AMQExceptionListener listener = new AMQExceptionListener(logger, false);
+        listener.onException(new JMSException("error"));
+
+        try {
+            listener.checkErroneous();
+        } catch (JMSException ignore) {
+            // ignore
+        }
+        listener.checkErroneous();
+    }
+
+    @Test
+    public void logMessageNotWrittenIfNoException() throws JMSException {
+        Logger logger = mock(Logger.class);
+        AMQExceptionListener listener = new AMQExceptionListener(logger, false);
+        listener.checkErroneous();
+        verify(logger, times(0)).error(any(String.class), any(Throwable.class));
+    }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
new file mode 100644
index 0000000..b9ecfd8
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AMQSinkTest {
+
+    private final String DESTINATION_NAME = "queue";
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private MessageProducer producer;
+    private Session session;
+    private Connection connection;
+    private Destination destination;
+    private BytesMessage message;
+
+    private AMQSink<String> amqSink;
+    private SerializationSchema<String> serializationSchema;
+
+    @Before
+    public void before() throws Exception {
+        connectionFactory = mock(ActiveMQConnectionFactory.class);
+        producer = mock(MessageProducer.class);
+        session = mock(Session.class);
+        connection = mock(Connection.class);
+        destination = mock(Destination.class);
+        message = mock(BytesMessage.class);
+
+        when(connectionFactory.createConnection()).thenReturn(connection);
+        when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session);
+        when(session.createProducer(null)).thenReturn(producer);
+        when(session.createBytesMessage()).thenReturn(message);
+        serializationSchema = new SimpleStringSchema();
+
+        AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+            .setConnectionFactory(connectionFactory)
+            .setDestinationName(DESTINATION_NAME)
+            .setSerializationSchema(serializationSchema)
+            .build();
+        amqSink = new AMQSink<>(config);
+        amqSink.open(new Configuration());
+    }
+
+    @Test
+    public void messageSentToProducer() throws Exception {
+        byte[] expectedMessage = serializationSchema.serialize("msg");
+        amqSink.invoke("msg");
+
+        verify(producer).send(message);
+        verify(message).writeBytes(expectedMessage);
+    }
+
+    @Test
+    public void setPersistentDeliveryMode() throws Exception {
+        AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+            .setConnectionFactory(connectionFactory)
+            .setDestinationName(DESTINATION_NAME)
+            .setSerializationSchema(serializationSchema)
+            .setPersistentDelivery(true)
+            .build();
+        amqSink = new AMQSink<>(config);
+        amqSink.open(new Configuration());
+        verify(producer).setDeliveryMode(DeliveryMode.PERSISTENT);
+    }
+
+    @Test
+    public void writeToTopic() throws Exception {
+        AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+            .setConnectionFactory(connectionFactory)
+            .setDestinationName(DESTINATION_NAME)
+            .setSerializationSchema(serializationSchema)
+            .setDestinationType(DestinationType.TOPIC)
+            .build();
+        amqSink = new AMQSink<>(config);
+        amqSink.open(new Configuration());
+        verify(session).createTopic(DESTINATION_NAME);
+    }
+
+    @Test
+    public void exceptionOnSendAreNotThrown() throws Exception {
+        when(session.createBytesMessage()).thenThrow(JMSException.class);
+        amqSink.setLogFailuresOnly(true);
+
+        amqSink.invoke("msg");
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void exceptionOnSendAreThrownByDefault() throws Exception {
+        when(session.createBytesMessage()).thenThrow(JMSException.class);
+
+        amqSink.invoke("msg");
+    }
+
+    @Test
+    public void sessionAndConnectionAreClosed() throws Exception {
+        amqSink.close();
+        verify(session).close();
+        verify(connection).close();
+    }
+
+    @Test
+    public void connectionCloseExceptionIsIgnored() throws Exception {
+        doThrow(new JMSException("session")).when(session).close();
+        doThrow(new JMSException("connection")).when(connection).close();
+
+        try {
+            amqSink.close();
+            fail("Should throw an exception");
+        } catch (RuntimeException ex) {
+            assertEquals("session", ex.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void connectionCloseExceptionIsPassed() throws Exception {
+        doThrow(new JMSException("connection")).when(connection).close();
+
+        try {
+            amqSink.close();
+            fail("Should throw an exception");
+        } catch (RuntimeException ex) {
+            assertEquals("connection", ex.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void exceptionDuringCloseAsIgnored() throws Exception {
+        doThrow(new JMSException("session")).when(session).close();
+        doThrow(new JMSException("connection")).when(connection).close();
+
+        amqSink.setLogFailuresOnly(true);
+        amqSink.close();
+    }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
new file mode 100644
index 0000000..05d0d60
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
+import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Array;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AMQSourceTest {
+
+    private static final long CHECKPOINT_ID = 1;
+    private final String DESTINATION_NAME = "queue";
+    private final String MSG_ID = "msgId";
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private Session session;
+    private Connection connection;
+    private Destination destination;
+    private MessageConsumer consumer;
+    private BytesMessage message;
+
+    private AMQSource<String> amqSource;
+    private SimpleStringSchema deserializationSchema;
+    SourceFunction.SourceContext<String> context;
+
+    @Before
+    public void before() throws Exception {
+        connectionFactory = mock(ActiveMQConnectionFactory.class);
+        session = mock(Session.class);
+        connection = mock(Connection.class);
+        destination = mock(Destination.class);
+        consumer = mock(MessageConsumer.class);
+        context = mock(SourceFunction.SourceContext.class);
+
+        message = mock(BytesMessage.class);
+
+        when(connectionFactory.createConnection()).thenReturn(connection);
+        when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session);
+        when(consumer.receive(anyInt())).thenReturn(message);
+        when(session.createConsumer(any(Destination.class))).thenReturn(consumer);
+        when(context.getCheckpointLock()).thenReturn(new Object());
+        when(message.getJMSMessageID()).thenReturn(MSG_ID);
+
+        deserializationSchema = new SimpleStringSchema();
+        AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+            .setConnectionFactory(connectionFactory)
+            .setDestinationName(DESTINATION_NAME)
+            .setDeserializationSchema(deserializationSchema)
+            .setRunningChecker(new SingleLoopRunChecker())
+            .build();
+        amqSource = new AMQSource<>(config);
+        amqSource.setRuntimeContext(createRuntimeContext());
+        amqSource.open(new Configuration());
+    }
+
+    private RuntimeContext createRuntimeContext() {
+        StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+        when(runtimeContext.isCheckpointingEnabled()).thenReturn(true);
+        return runtimeContext;
+    }
+
+    @Test
+    public void readFromTopic() throws Exception {
+        AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+            .setConnectionFactory(connectionFactory)
+            .setDestinationName(DESTINATION_NAME)
+            .setDeserializationSchema(deserializationSchema)
+            .setDestinationType(DestinationType.TOPIC)
+            .setRunningChecker(new SingleLoopRunChecker())
+            .build();
+        amqSource = new AMQSource<>(config);
+        amqSource.setRuntimeContext(createRuntimeContext());
+        amqSource.open(new Configuration());
+        verify(session).createTopic(DESTINATION_NAME);
+    }
+
+    @Test
+    public void parseReceivedMessage() throws Exception {
+        final byte[] bytes = deserializationSchema.serialize("msg");
+        when(message.getBodyLength()).thenReturn((long) bytes.length);
+        when(message.readBytes(any(byte[].class))).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                byte[] inputBytes = (byte[]) invocationOnMock.getArguments()[0];
+                Array.copy(bytes, 0, inputBytes, 0, bytes.length);
+                return null;
+            }
+        });
+
+        amqSource.run(context);
+
+        verify(context).collect("msg");
+    }
+
+    @Test
+    public void acknowledgeReceivedMessage() throws Exception {
+        amqSource.run(context);
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+
+        verify(message).acknowledge();
+    }
+
+    @Test
+    public void handleUnknownIds() throws Exception {
+        amqSource.run(context);
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList("unknown-id"));
+
+        verify(message, never()).acknowledge();
+    }
+
+    @Test
+    public void doNotAcknowledgeMessageTwice() throws Exception {
+        amqSource.run(context);
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+
+        verify(message, times(1)).acknowledge();
+    }
+
+    @Test(expected = JMSException.class)
+    public void propagateAsyncException() throws Exception {
+        AMQExceptionListener exceptionListener = mock(AMQExceptionListener.class);
+        amqSource.setExceptionListener(exceptionListener);
+        doThrow(JMSException.class).when(exceptionListener).checkErroneous();
+        amqSource.run(context);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void throwAcknowledgeExceptionByDefault() throws Exception {
+        doThrow(JMSException.class).when(message).acknowledge();
+
+        amqSource.run(context);
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+    }
+
+    @Test
+    public void doNotThrowAcknowledgeExceptionByDefault() throws Exception {
+        amqSource.setLogFailuresOnly(true);
+
+        doThrow(JMSException.class).when(message).acknowledge();
+
+        amqSource.run(context);
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+    }
+
+    @Test
+    public void closeResources() throws Exception {
+        amqSource.close();
+
+        verify(consumer).close();
+        verify(session).close();
+        verify(connection).close();
+    }
+
+    @Test
+    public void consumerCloseExceptionShouldBePased() throws Exception {
+        doThrow(new JMSException("consumer")).when(consumer).close();
+        doThrow(new JMSException("session")).when(session).close();
+        doThrow(new JMSException("connection")).when(connection).close();
+
+        try {
+            amqSource.close();
+            fail("Should throw an exception");
+        } catch (RuntimeException ex) {
+            assertEquals("consumer", ex.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void sessionCloseExceptionShouldBePased() throws Exception {
+        doThrow(new JMSException("session")).when(session).close();
+        doThrow(new JMSException("connection")).when(connection).close();
+
+        try {
+            amqSource.close();
+            fail("Should throw an exception");
+        } catch (RuntimeException ex) {
+            assertEquals("session", ex.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void connectionCloseExceptionShouldBePased() throws Exception {
+        doThrow(new JMSException("connection")).when(connection).close();
+
+        try {
+            amqSource.close();
+            fail("Should throw an exception");
+        } catch (RuntimeException ex) {
+            assertEquals("connection", ex.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void exceptionsShouldNotBePassedIfLogFailuresOnly() throws Exception {
+        doThrow(new JMSException("consumer")).when(consumer).close();
+        doThrow(new JMSException("session")).when(session).close();
+        doThrow(new JMSException("connection")).when(connection).close();
+
+        amqSource.setLogFailuresOnly(true);
+        amqSource.close();
+    }
+
+    class SingleLoopRunChecker extends RunningChecker {
+
+        int count = 0;
+
+        @Override
+        public boolean isRunning() {
+            return (count++ == 0);
+        }
+
+        @Override
+        public void setIsRunning(boolean isRunning) {
+
+        }
+    }
+}
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
new file mode 100644
index 0000000..985e06d
--- /dev/null
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashSet;
+import java.util.Random;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ActiveMQConnectorITCase {
+
+    public static final int MESSAGES_NUM = 10000;
+    public static final String QUEUE_NAME = "queue";
+    public static final String TOPIC_NAME = "topic";
+    private static ForkableFlinkMiniCluster flink;
+    private static int flinkPort;
+
+    @BeforeClass
+    public static void beforeClass() {
+        // start also a re-usable Flink mini cluster
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+        flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+        flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+        flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+        flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+        flink.start();
+
+        flinkPort = flink.getLeaderRPCPort();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        flinkPort = -1;
+        if (flink != null) {
+            flink.shutdown();
+        }
+    }
+
+    @Test
+    public void amqTopologyWithQueue() throws Exception {
+        StreamExecutionEnvironment env = createExecutionEnvironment();
+        AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+            .setConnectionFactory(createConnectionFactory())
+            .setDestinationName(QUEUE_NAME)
+            .setSerializationSchema(new SimpleStringSchema())
+            .build();
+        createProducerTopology(env, sinkConfig);
+
+        ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory();
+        AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+            .setConnectionFactory(sourceConnectionFactory)
+            .setDestinationName(QUEUE_NAME)
+            .setDeserializationSchema(new SimpleStringSchema())
+            .build();
+        createConsumerTopology(env, sourceConfig);
+
+        tryExecute(env, "AMQTest");
+    }
+
+    @Test
+    public void amqTopologyWithTopic() throws Exception {
+        StreamExecutionEnvironment env = createExecutionEnvironment();
+        AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+            .setConnectionFactory(createConnectionFactory())
+            .setDestinationName(TOPIC_NAME)
+            .setSerializationSchema(new SimpleStringSchema())
+            .setDestinationType(DestinationType.TOPIC)
+            .build();
+        createProducerTopology(env, sinkConfig);
+
+        ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory();
+        AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+            .setConnectionFactory(sourceConnectionFactory)
+            .setDestinationName(TOPIC_NAME)
+            .setDeserializationSchema(new SimpleStringSchema())
+            .setDestinationType(DestinationType.TOPIC)
+            .build();
+        createConsumerTopology(env, sourceConfig);
+
+        tryExecute(env, "AMQTest");
+    }
+
+    private StreamExecutionEnvironment createExecutionEnvironment() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.getConfig().disableSysoutLogging();
+        return env;
+    }
+
+    private ActiveMQConnectionFactory createConnectionFactory() {
+        return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+    }
+
+    private void createProducerTopology(StreamExecutionEnvironment env, AMQSinkConfig<String> config) {
+        DataStreamSource<String> stream = env.addSource(new SourceFunction<String>() {
+            @Override
+            public void run(SourceContext<String> ctx) throws Exception {
+                for (int i = 0; i < MESSAGES_NUM; i++) {
+                    ctx.collect("amq-" + i);
+                }
+            }
+
+            @Override
+            public void cancel() {}
+        });
+
+
+        AMQSink<String> sink = new AMQSink<>(config);
+        stream.addSink(sink);
+    }
+
+    private void createConsumerTopology(StreamExecutionEnvironment env, AMQSourceConfig<String> config) {
+        AMQSource<String> source = new AMQSource<>(config);
+
+        env.addSource(source)
+            .addSink(new SinkFunction<String>() {
+                final HashSet<Integer> set = new HashSet<>();
+                @Override
+                public void invoke(String value) throws Exception {
+                    int val = Integer.parseInt(value.split("-")[1]);
+                    set.add(val);
+
+                    if (set.size() == MESSAGES_NUM) {
+                        throw new SuccessException();
+                    }
+                }
+            });
+    }
+
+    @Test
+    public void amqTopologyWithCheckpointing() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
+        AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
+            .setConnectionFactory(connectionFactory)
+            .setDestinationName("queue2")
+            .setSerializationSchema(new SimpleStringSchema())
+            .build();
+        AMQSink<String> sink = new AMQSink<>(sinkConfig);
+        sink.open(new Configuration());
+
+        for (int i = 0; i < MESSAGES_NUM; i++) {
+            sink.invoke("amq-" + i);
+        }
+
+        AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
+            .setConnectionFactory(connectionFactory)
+            .setDestinationName("queue2")
+            .setDeserializationSchema(new SimpleStringSchema())
+            .build();
+
+        final AMQSource<String> source = new AMQSource<>(sourceConfig);
+        RuntimeContext runtimeContext = createMockRuntimeContext();
+        source.setRuntimeContext(runtimeContext);
+        source.open(new Configuration());
+
+        final TestSourceContext sourceContext = new TestSourceContext();
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    source.run(sourceContext);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        thread.start();
+
+        Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+        while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
+            Thread.sleep(100);
+            Random random = new Random();
+            long checkpointId = random.nextLong();
+            synchronized (sourceContext.getCheckpointLock()) {
+                source.snapshotState(checkpointId, System.currentTimeMillis());
+                source.notifyCheckpointComplete(checkpointId);
+            }
+        }
+        assertEquals(MESSAGES_NUM, sourceContext.getIdsNum());
+    }
+
+    private RuntimeContext createMockRuntimeContext() {
+        StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+        when(runtimeContext.isCheckpointingEnabled()).thenReturn(true);
+        return runtimeContext;
+    }
+
+    class TestSourceContext implements SourceFunction.SourceContext<String> {
+
+        private HashSet<Integer> ids = new HashSet<>();
+        private Object contextLock = new Object();
+        @Override
+        public void collect(String value) {
+            int val = Integer.parseInt(value.split("-")[1]);
+            ids.add(val);
+        }
+
+        @Override
+        public void collectWithTimestamp(String element, long timestamp) { }
+
+        @Override
+        public void emitWatermark(Watermark mark) { }
+
+        @Override
+        public Object getCheckpointLock() {
+            return contextLock;
+        }
+
+        @Override
+        public void close() { }
+
+        public int getIdsNum() {
+            synchronized (contextLock) {
+                return ids.size();
+            }
+        }
+    };
+}
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index f8b98f1..8551abd 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -152,6 +152,7 @@
 				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.4.3</version>
 				<executions>
 					<execution>
 						<id>shade-flink</id>
diff --git a/pom.xml b/pom.xml
index 01fb4fc..31af3a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
   <modules>
     <module>flink-connector-redis</module>
     <module>flink-connector-flume</module>
+    <module>flink-connector-activemq</module>
   </modules>
 
   <properties>