[BAHIR-114] Update Flume to 1.8 and add tests

Closes #33
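
A minimal usage sketch of the sink after this change (it mirrors FlumeSinkTest; the
hostname and port are placeholders for a reachable Flume Avro source):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.flume.FlumeSink;

    public class FlumeSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // serialize each record with SimpleStringSchema and forward it to the Flume Avro source
            env.fromElements("event-1", "event-2")
                    .addSink(new FlumeSink<>("localhost", 44444, new SimpleStringSchema()));
            env.execute("flume-sink-example");
        }
    }
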
diff --git a/.travis.yml b/.travis.yml
index 691667c..3049224 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,49 +18,48 @@
 sudo: required
 dist: trusty
 
-cache:
-  directories:
-  - $HOME/.m2
+language: java
 
 # do not cache our own artifacts
 before_cache:
   - rm -rf $HOME/.m2/repository/org/apache/flink/
 
-language: java
+cache:
+  directories:
+  - $HOME/.m2
 
 services:
   - docker
 
-matrix:
-  include:
-    - jdk: oraclejdk8
-      env:
-        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
-        - MAVEN_PROFILE="default"
-        - CACHE_NAME=JDK8_F130_A
-    - jdk: openjdk8
-      env:
-        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
-        - CACHE_NAME=JDK8_F130_C
-        - MAVEN_PROFILE="default"
-        - CACHE_NAME=JDK8_F130_B
-    - jdk: openjdk8
-      env:
-        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
-        - MAVEN_PROFILE="test-kudu"
-        - CACHE_NAME=JDK8_F130_KUDU
+jdk:
+  - oraclejdk8
+  - openjdk8
+
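+# each env entry builds one group of modules; entries with DOCKER=true start that
+# module's docker-compose services in before_script and tear them down in after_script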
+env:
+  - |
+    FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="false"
+    PROJECTS="flink-connector-activemq,flink-connector-akka,flink-connector-influxdb,flink-connector-netty,flink-connector-redis,flink-library-siddhi"
+  - |
+    FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="true"
+    PROJECTS="flink-connector-flume"
+  - |
+    FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="true"
+    PROJECTS="flink-connector-kudu"
+
 
 before_install:
   - ./dev/change-scala-version.sh $SCALA_VERSION
 
 install: true
 
-script:
-  - |
-    if [[ $MAVEN_PROFILE == "default" ]]; then
-      mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
+before_script:
+  - if [[ $DOCKER == "true" ]]; then
+    docker-compose -f "$PROJECTS/dockers/docker-compose.yml" up -d;
     fi
-  - |
-    if [[ $MAVEN_PROFILE == "test-kudu" ]]; then
-      flink-connector-kudu/dockers/run_kudu_tests.sh
-    fi
+
+script: mvn clean verify -pl $PROJECTS -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
+
+after_script:
+  - if [[ $DOCKER == "true" ]]; then
+    docker-compose -f "$PROJECTS/dockers/docker-compose.yml" down;
+    fi
\ No newline at end of file
diff --git a/flink-connector-flume/README.md b/flink-connector-flume/README.md
index c7d7f67..ebcd3f1 100644
--- a/flink-connector-flume/README.md
+++ b/flink-connector-flume/README.md
@@ -9,7 +9,7 @@
       <version>1.1-SNAPSHOT</version>
     </dependency>
 
-*Version Compatibility*: This module is compatible with Flume 1.5.0.
+*Version Compatibility*: This module is compatible with Flume 1.8.0.
 
 Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
 See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).
diff --git a/flink-connector-flume/dockers/conf/sink.conf b/flink-connector-flume/dockers/conf/sink.conf
new file mode 100644
index 0000000..81c246f
--- /dev/null
+++ b/flink-connector-flume/dockers/conf/sink.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+docker.sinks = fileSink
+docker.sources = avroSource
+docker.channels = inMemoryChannel
+
+docker.sources.avroSource.type = avro
+docker.sources.avroSource.bind = 0.0.0.0
+docker.sources.avroSource.port = 4545
+docker.sources.avroSource.channels = inMemoryChannel
+
+docker.channels.inMemoryChannel.type = memory
+docker.channels.inMemoryChannel.capacity = 1000
+docker.channels.inMemoryChannel.transactionCapacity = 100
+
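+# the file_roll sink writes received events to /var/tmp/output; rollInterval = 0 disables time-based rolling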
+docker.sinks.fileSink.type = file_roll
+docker.sinks.fileSink.channel = inMemoryChannel
+docker.sinks.fileSink.sink.directory = /var/tmp/output
+docker.sinks.fileSink.rollInterval = 0
\ No newline at end of file
diff --git a/flink-connector-flume/dockers/conf/source.conf b/flink-connector-flume/dockers/conf/source.conf
new file mode 100644
index 0000000..f883f41
--- /dev/null
+++ b/flink-connector-flume/dockers/conf/source.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+docker.sinks = avroSink
+docker.sources = netcatSource
+docker.channels = inMemoryChannel
+
+docker.sources.netcatSource.type = avro
+docker.sources.netcatSource.bind = 0.0.0.0
+docker.sources.netcatSource.port = 44444
+docker.sources.netcatSource.channels = inMemoryChannel
+
+docker.channels.inMemoryChannel.type = memory
+docker.channels.inMemoryChannel.capacity = 1000
+docker.channels.inMemoryChannel.transactionCapacity = 100
+
+docker.sinks.avroSink.type = avro
+docker.sinks.avroSink.channel = inMemoryChannel
+docker.sinks.avroSink.hostname = sink
+docker.sinks.avroSink.port = 4545
diff --git a/flink-connector-flume/dockers/docker-compose.yml b/flink-connector-flume/dockers/docker-compose.yml
new file mode 100644
index 0000000..042bd5e
--- /dev/null
+++ b/flink-connector-flume/dockers/docker-compose.yml
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+version: '2'
+
+services:
+
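+  # "source" runs the Flume agent whose Avro source the connector tests reach at 172.25.0.3:44444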
+  source:
+    image: eskabetxe/flume
+    container_name: flume-source
+    hostname: 172.25.0.3
+    ports:
+      - "44444:44444"
+    volumes:
+      - ./conf/source.conf:/opt/flume-config/flume.conf
+    environment:
+      - FLUME_AGENT_NAME=docker
+    links:
+      - "sink:sink"
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.3
+
+  sink:
+    image:  eskabetxe/flume
+    container_name: flume-sink
+    hostname: 172.25.0.4
+    volumes:
+      - ./conf/sink.conf:/opt/flume-config/flume.conf
+      - ./output:/var/tmp/output
+    environment:
+      - FLUME_AGENT_NAME=docker
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.4
+
+networks:
+  mynet:
+    driver: bridge
+    ipam:
+      config:
+      - subnet: 172.25.0.0/24
+        ip_range: 172.25.0.2/24
+        gateway: 172.25.0.1
\ No newline at end of file
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index 1f4cf6d..c202c6d 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -35,7 +35,7 @@
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<flume-ng.version>1.5.0</flume-ng.version>
+		<flume-ng.version>1.8.0</flume-ng.version>
 	</properties>
 
 	<dependencies>
@@ -50,86 +50,23 @@
 			<groupId>org.apache.flume</groupId>
 			<artifactId>flume-ng-core</artifactId>
 			<version>${flume-ng.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-io</groupId>
-					<artifactId>commons-io</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-codec</groupId>
-					<artifactId>commons-codec</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-cli</groupId>
-					<artifactId>commons-cli</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-lang</groupId>
-					<artifactId>commons-lang</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.avro</groupId>
-					<artifactId>avro</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-core-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-mapper-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.thoughtworks.paranamer</groupId>
-					<artifactId>paranamer</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.tukaani</groupId>
-					<artifactId>xz</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.velocity</groupId>
-					<artifactId>velocity</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-collections</groupId>
-					<artifactId>commons-collections</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>servlet-api</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.google.code.gson</groupId>
-					<artifactId>gson</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.thrift</groupId>
-					<artifactId>libthrift</artifactId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
+		<dependency>
+			<groupId>org.junit.jupiter</groupId>
+			<artifactId>junit-jupiter-api</artifactId>
+			<version>5.2.0</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
new file mode 100644
index 0000000..e918f56
--- /dev/null
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+class FlumeRpcClient implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlumeRpcClient.class);
+
+    protected RpcClient client;
+    private String hostname;
+    private int port;
+
+
+    FlumeRpcClient(String hostname, int port) {
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    /**
+     * Initializes the connection to Apache Flume.
+     */
+    public boolean init() {
+        // Setup the RPC connection
+        int initCounter = 0;
+        while (true) {
+            verifyCounter(initCounter, "Cannot establish connection");
+
+            try {
+                this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+            } catch (FlumeException e) {
+                // Wait one second if the connection failed before the next
+                // try
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    if (LOG.isErrorEnabled()) {
+                        LOG.error("Interrupted while trying to connect to {} on port {}", hostname, port);
+                    }
+                }
+            }
+            if (client != null) {
+                break;
+            }
+            initCounter++;
+        }
+        return client.isActive();
+    }
+
+
+    public boolean sendData(String data) {
+        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
+        return sendData(event);
+    }
+    public boolean sendData(byte[] data) {
+        Event event = EventBuilder.withBody(data);
+        return sendData(event);
+    }
+
+    private boolean sendData(Event event) {
+        return sendData(event, 0);
+    }
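+    // retries the append after reconnecting; verifyCounter aborts after 10 failed attempts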
+    private boolean sendData(Event event, int retryCount) {
+        verifyCounter(retryCount, "Cannot send message");
+        try {
+            client.append(event);
+            return true;
+        } catch (EventDeliveryException e) {
+            // clean up and recreate the client
+            reconnect();
+            return sendData(event, ++retryCount);
+        }
+    }
+
+
+    private void verifyCounter(int counter, String message) {
+        if (counter >= 10) {
+            throw new RuntimeException(message + " at " + hostname + ":" + port);
+        }
+    }
+
+    private void reconnect() {
+        close();
+        client = null;
+        init();
+    }
+
+    @Override
+    public void close() {
+        if (this.client == null) return;
+
+        this.client.close();
+    }
+}
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 41b1b25..7a80fd2 100644
--- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -14,32 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class FlumeSink<IN> extends RichSinkFunction<IN> {
-    private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+    private transient FlumeRpcClient client;
 
-    private transient FlinkRpcClientFacade client;
-    boolean initDone = false;
-    String host;
-    int port;
-    SerializationSchema<IN> schema;
+    private String host;
+    private int port;
+    private SerializationSchema<IN> schema;
 
     public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
         this.host = host;
@@ -57,84 +45,20 @@
     @Override
     public void invoke(IN value, Context context) throws Exception {
         byte[] data = schema.serialize(value);
-        client.sendDataToFlume(data);
-
-    }
-
-    private class FlinkRpcClientFacade {
-        private RpcClient client;
-        private String hostname;
-        private int port;
-
-        /**
-         * Initializes the connection to Apache Flume.
-         *
-         * @param hostname
-         *            The host
-         * @param port
-         *            The port.
-         */
-        public void init(String hostname, int port) {
-            // Setup the RPC connection
-            this.hostname = hostname;
-            this.port = port;
-            int initCounter = 0;
-            while (true) {
-                if (initCounter >= 90) {
-                    throw new RuntimeException("Cannot establish connection with" + port + " at "
-                            + host);
-                }
-                try {
-                    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
-                } catch (FlumeException e) {
-                    // Wait one second if the connection failed before the next
-                    // try
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e1) {
-                        if (LOG.isErrorEnabled()) {
-                            LOG.error("Interrupted while trying to connect {} at {}", port, host);
-                        }
-                    }
-                }
-                if (client != null) {
-                    break;
-                }
-                initCounter++;
-            }
-            initDone = true;
-        }
-
-        /**
-         * Sends byte arrays as {@link Event} series to Apache Flume.
-         *
-         * @param data
-         *            The byte array to send to Apache FLume
-         */
-        public void sendDataToFlume(byte[] data) {
-            Event event = EventBuilder.withBody(data);
-
-            try {
-                client.append(event);
-
-            } catch (EventDeliveryException e) {
-                // clean up and recreate the client
-                client.close();
-                client = null;
-                client = RpcClientFactory.getDefaultInstance(hostname, port);
-            }
-        }
-
-    }
-
-    @Override
-    public void close() {
-        client.client.close();
+        client.sendData(data);
     }
 
     @Override
     public void open(Configuration config) {
-        client = new FlinkRpcClientFacade();
-        client.init(host, port);
+        client = new FlumeRpcClient(host, port);
+        client.init();
     }
+
+    @Override
+    public void close() {
+        if (client == null) return;
+        client.close();
+    }
+
+
 }
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
new file mode 100644
index 0000000..7bab666
--- /dev/null
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.flume;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FlumeRpcClientTest {
+
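+    // 172.25.0.3:44444 is the Avro source exposed by dockers/docker-compose.yml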
+    public FlumeRpcClient createGoodClient() {
+        return new FlumeRpcClient("172.25.0.3", 44444);
+    }
+
+
+    @Test
+    public void testInitClientMustFail() {
+        FlumeRpcClient client = new FlumeRpcClient("172.25.0.3", 44445);
+        Assertions.assertThrows(RuntimeException.class, () -> client.init(), "init() should fail when no Flume agent is listening");
+    }
+
+    @Test
+    public void testSendStringData() {
+        FlumeRpcClient client = createGoodClient();
+        boolean init = client.init();
+        Assertions.assertTrue(init, "client did not start");
+
+        boolean send = client.sendData("xpto");
+        Assertions.assertTrue(send, "data was not sent");
+
+    }
+
+    @Test
+    public void testSendBytesData() {
+        FlumeRpcClient client = createGoodClient();
+        boolean init = client.init();
+        Assertions.assertTrue(init, "client did not start");
+
+        boolean send = client.sendData("xpto".getBytes());
+        Assertions.assertTrue(send, "data was not sent");
+
+    }
+
+    @Test
+    public void testSendDataWhenConnectionClosed() {
+        FlumeRpcClient client = createGoodClient();
+        boolean init = client.init();
+        Assertions.assertTrue(init, "client did not start");
+        client.close();
+
+        boolean send = client.sendData("xpto");
+        Assertions.assertTrue(send, "data was not sent");
+
+    }
+}
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
new file mode 100644
index 0000000..9d87642
--- /dev/null
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+public class FlumeSinkTest {
+
+
+    @Test
+    public void testSink() throws Exception {
+        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        environment.fromElements("string1", "string2")
+                .addSink(new FlumeSink<>("172.25.0.3", 44444, new SimpleStringSchema()));
+
+        tryExecute(environment, "FlumeTest");
+    }
+
+
+}
diff --git a/flink-connector-flume/src/test/resources/log4j.properties b/flink-connector-flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..15efe08
--- /dev/null
+++ b/flink-connector-flume/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=WARN, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/pom.xml b/pom.xml
index 80766be..c40e194 100644
--- a/pom.xml
+++ b/pom.xml
@@ -708,7 +708,6 @@
         </plugins>
       </build>
     </profile>
-
     <profile>
       <id>test-java-home</id>
       <activation>