[BAHIR-57] Add Flume sink

Closes #2
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
new file mode 100644
index 0000000..f8b98f1
--- /dev/null
+++ b/flink-connector-flume/pom.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink_parent_2.11</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+	<artifactId>flink-connector-flume_2.11</artifactId>
+	<name>flink-connector-flume</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<flume-ng.version>1.5.0</flume-ng.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.11</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flume</groupId>
+			<artifactId>flume-ng-core</artifactId>
+			<version>${flume-ng.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-io</groupId>
+					<artifactId>commons-io</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-cli</groupId>
+					<artifactId>commons-cli</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-lang</groupId>
+					<artifactId>commons-lang</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-core-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-mapper-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.thoughtworks.paranamer</groupId>
+					<artifactId>paranamer</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.tukaani</groupId>
+					<artifactId>xz</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.velocity</groupId>
+					<artifactId>velocity</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-collections</groupId>
+					<artifactId>commons-collections</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.code.gson</groupId>
+					<artifactId>gson</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.thrift</groupId>
+					<artifactId>libthrift</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<!-- We include all dependencies that transitively depend on guava -->
+									<include>org.apache.flume:*</include>
+								</includes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..00bfc39
--- /dev/null
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+    private transient FlinkRpcClientFacade client;
+    boolean initDone = false;
+    String host;
+    int port;
+    SerializationSchema<IN> schema;
+
+    public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
+        this.host = host;
+        this.port = port;
+        this.schema = schema;
+    }
+
+    /**
+     * Receives tuples from the Apache Flink {@link DataStream} and forwards
+     * them to Apache Flume.
+     *
+     * @param value
+     *            The tuple arriving from the datastream
+     */
+    @Override
+    public void invoke(IN value) {
+
+        byte[] data = schema.serialize(value);
+        client.sendDataToFlume(data);
+
+    }
+
+    private class FlinkRpcClientFacade {
+        private RpcClient client;
+        private String hostname;
+        private int port;
+
+        /**
+         * Initializes the connection to Apache Flume.
+         *
+         * @param hostname
+         *            The host
+         * @param port
+         *            The port.
+         */
+        public void init(String hostname, int port) {
+            // Setup the RPC connection
+            this.hostname = hostname;
+            this.port = port;
+            int initCounter = 0;
+            while (true) {
+                if (initCounter >= 90) {
+                    throw new RuntimeException("Cannot establish connection with" + port + " at "
+                            + host);
+                }
+                try {
+                    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+                } catch (FlumeException e) {
+                    // Wait one second if the connection failed before the next
+                    // try
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        if (LOG.isErrorEnabled()) {
+                            LOG.error("Interrupted while trying to connect {} at {}", port, host);
+                        }
+                    }
+                }
+                if (client != null) {
+                    break;
+                }
+                initCounter++;
+            }
+            initDone = true;
+        }
+
+        /**
+         * Sends byte arrays as {@link Event} series to Apache Flume.
+         *
+         * @param data
+         *            The byte array to send to Apache FLume
+         */
+        public void sendDataToFlume(byte[] data) {
+            Event event = EventBuilder.withBody(data);
+
+            try {
+                client.append(event);
+
+            } catch (EventDeliveryException e) {
+                // clean up and recreate the client
+                client.close();
+                client = null;
+                client = RpcClientFactory.getDefaultInstance(hostname, port);
+            }
+        }
+
+    }
+
+    @Override
+    public void close() {
+        client.client.close();
+    }
+
+    @Override
+    public void open(Configuration config) {
+        client = new FlinkRpcClientFacade();
+        client.init(host, port);
+    }
+}
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index c34711e..df78295 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -44,7 +44,6 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_2.11</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/pom.xml b/pom.xml
index b26982f..01fb4fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
 
   <modules>
     <module>flink-connector-redis</module>
+    <module>flink-connector-flume</module>
   </modules>
 
   <properties>