[Issue 3275][pulsar-io]Support source and sink of flume (#3597)

* Support TLS authentication and authorization in standalone mode

* Compile success
To do: test channel, sink and source of flume

* Add conf file

* Add sink and source folder
Move file to folder
Add source
Compile success
To do -> test source

* test flume source paas

* Add config file and test case

* Add test and update pom.xml
To do add test of source and sink

* Add unit tests

* Add test case and test pass
To do test source

* Add license
Add test source of pulsar

* Handle if blockingQueue is null

* Move LOG to log

* Format code

* Add sinkClass in pulsar-io.yaml

* Add comment

* Default is false

* Modify pom file of flume

* Format pom.xml file

* Move pom version to 2.4.0-SNAPSHOT
diff --git a/pulsar-io/flume/pom.xml b/pulsar-io/flume/pom.xml
new file mode 100644
index 0000000..21dbc85
--- /dev/null
+++ b/pulsar-io/flume/pom.xml
@@ -0,0 +1,116 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-io</artifactId>
+        <version>2.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-io-flume</artifactId>
+    <name>Pulsar IO :: Flume</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-node</artifactId>
+            <version>1.9.0</version>
+            <type>pom</type>
+            <exclusions>
+                <exclusion>
+                    <artifactId>avro-ipc</artifactId>
+                    <groupId>org.apache.avro</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>avro</artifactId>
+                    <groupId>org.apache.avro</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-ipc</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <version>4.10</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+            <version>1.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>2.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.stefanbirkner</groupId>
+            <artifactId>system-rules</artifactId>
+            <scope>test</scope>
+            <version>1.17.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConfig.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConfig.java
new file mode 100644
index 0000000..54830cb
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConfig.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ *  Flume general config.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class FlumeConfig {
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "the name of this agent")
+    private String name;
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "specify a config file (required if -z missing)")
+    private String confFile;
+    @FieldDoc(
+            defaultValue = "false",
+            help = "do not reload config file if changed")
+    private Boolean noReloadConf;
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "specify the ZooKeeper connection to use (required if -f missing)")
+    private String zkConnString;
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "specify the base path in ZooKeeper for agent configs")
+    private String zkBasePath;
+
+    public static FlumeConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), FlumeConfig.class);
+    }
+
+
+    public static FlumeConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), FlumeConfig.class);
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConnector.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConnector.java
new file mode 100644
index 0000000..29a9847
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConnector.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import org.apache.commons.cli.ParseException;
+import org.apache.flume.Constants;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.util.SSLUtil;
+import org.apache.pulsar.io.flume.node.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class FlumeConnector {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(FlumeConnector.class);
+
+    protected Application application;
+
+    public void StartConnector(FlumeConfig flumeConfig) throws Exception {
+        SSLUtil.initGlobalSSLParameters();
+        String agentName = flumeConfig.getName();
+        boolean reload = !flumeConfig.getNoReloadConf();
+        boolean isZkConfigured = false;
+        if (flumeConfig.getZkConnString().length() > 0) {
+            isZkConfigured = true;
+        }
+        if (isZkConfigured) {
+            // get options
+            String zkConnectionStr = flumeConfig.getZkConnString();
+            String baseZkPath = flumeConfig.getZkBasePath();
+            if (reload) {
+                EventBus eventBus = new EventBus(agentName + "-event-bus");
+                List<LifecycleAware> components = Lists.newArrayList();
+                PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+                        new PollingZooKeeperConfigurationProvider(
+                                agentName, zkConnectionStr, baseZkPath, eventBus);
+                components.add(zookeeperConfigurationProvider);
+                application = new Application(components);
+                eventBus.register(application);
+            } else {
+                StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+                        new StaticZooKeeperConfigurationProvider(
+                                agentName, zkConnectionStr, baseZkPath);
+                application = new Application();
+                application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
+            }
+
+        } else {
+            File configurationFile = new File(flumeConfig.getConfFile());
+             /*
+         * The following is to ensure that by default the agent will fail on
+         * startup if the file does not exist.
+         */
+            if (!configurationFile.exists()) {
+                // If command line invocation, then need to fail fast
+                if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
+                        null) {
+                    String path = configurationFile.getPath();
+                    try {
+                        path = configurationFile.getCanonicalPath();
+                    } catch (IOException ex) {
+                        log.error("Failed to read canonical path for file: " + path,
+                                ex);
+                    }
+                    throw new ParseException("The specified configuration file does not exist: " + path);
+                }
+            }
+            List<LifecycleAware> components = Lists.newArrayList();
+
+            if (reload) {
+                EventBus eventBus = new EventBus(agentName + "-event-bus");
+                PollingPropertiesFileConfigurationProvider configurationProvider =
+                        new PollingPropertiesFileConfigurationProvider(
+                                agentName, configurationFile, eventBus, 30);
+                components.add(configurationProvider);
+                application = new Application(components);
+                eventBus.register(application);
+            } else {
+                PropertiesFileConfigurationProvider configurationProvider =
+                        new PropertiesFileConfigurationProvider(agentName, configurationFile);
+                application = new Application();
+                application.handleConfigurationEvent(configurationProvider.getConfiguration());
+            }
+        }
+        application.start();
+
+        final Application appReference = application;
+        Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
+            @Override
+            public void run() {
+                appReference.stop();
+            }
+        });
+    }
+
+    public void stop() {
+        if (application != null) {
+            application.stop();
+        }
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java
new file mode 100644
index 0000000..36bbe55
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelFactory;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.ChannelSelectorFactory;
+import org.apache.flume.channel.DefaultChannelFactory;
+import org.apache.flume.conf.BasicConfigurationConstants;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.TransactionCapacitySupported;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.conf.sink.SinkGroupConfiguration;
+import org.apache.flume.conf.source.SourceConfiguration;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.SinkGroup;
+import org.apache.flume.source.DefaultSourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfigurationProvider.class);
+
+    private final String agentName;
+    private final SourceFactory sourceFactory;
+    private final SinkFactory sinkFactory;
+    private final ChannelFactory channelFactory;
+
+    private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;
+
+    public AbstractConfigurationProvider(String agentName) {
+        super();
+        this.agentName = agentName;
+        this.sourceFactory = new DefaultSourceFactory();
+        this.sinkFactory = new DefaultSinkFactory();
+        this.channelFactory = new DefaultChannelFactory();
+
+        channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
+    }
+
+    protected abstract FlumeConfiguration getFlumeConfiguration();
+
+    public MaterializedConfiguration getConfiguration() {
+        MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
+        FlumeConfiguration fconfig = getFlumeConfiguration();
+        AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
+        if (agentConf != null) {
+            Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
+            Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
+            Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
+            try {
+                loadChannels(agentConf, channelComponentMap);
+                loadSources(agentConf, channelComponentMap, sourceRunnerMap);
+                loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
+                Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
+                for (String channelName : channelNames) {
+                    ChannelComponent channelComponent = channelComponentMap.get(channelName);
+                    if (channelComponent.components.isEmpty()) {
+                        LOGGER.warn(String.format("Channel %s has no components connected" +
+                                " and has been removed.", channelName));
+                        channelComponentMap.remove(channelName);
+                        Map<String, Channel> nameChannelMap =
+                                channelCache.get(channelComponent.channel.getClass());
+                        if (nameChannelMap != null) {
+                            nameChannelMap.remove(channelName);
+                        }
+                    } else {
+                        LOGGER.info(String.format("Channel %s connected to %s",
+                                channelName, channelComponent.components.toString()));
+                        conf.addChannel(channelName, channelComponent.channel);
+                    }
+                }
+                for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
+                    conf.addSourceRunner(entry.getKey(), entry.getValue());
+                }
+                for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
+                    conf.addSinkRunner(entry.getKey(), entry.getValue());
+                }
+            } catch (InstantiationException ex) {
+                LOGGER.error("Failed to instantiate component", ex);
+            } finally {
+                channelComponentMap.clear();
+                sourceRunnerMap.clear();
+                sinkRunnerMap.clear();
+            }
+        } else {
+            LOGGER.warn("No configuration found for this host:{}", getAgentName());
+        }
+        return conf;
+    }
+
+    public String getAgentName() {
+        return agentName;
+    }
+
+    private void loadChannels(AgentConfiguration agentConf,
+                              Map<String, ChannelComponent> channelComponentMap)
+            throws InstantiationException {
+        LOGGER.info("Creating channels");
+
+    /*
+     * Some channels will be reused across re-configurations. To handle this,
+     * we store all the names of current channels, perform the reconfiguration,
+     * and then if a channel was not used, we delete our reference to it.
+     * This supports the scenario where you enable channel "ch0" then remove it
+     * and add it back. Without this, channels like memory channel would cause
+     * the first instances data to show up in the seconds.
+     */
+        ListMultimap<Class<? extends Channel>, String> channelsNotReused =
+                ArrayListMultimap.create();
+        // assume all channels will not be re-used
+        for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
+                channelCache.entrySet()) {
+            Class<? extends Channel> channelKlass = entry.getKey();
+            Set<String> channelNames = entry.getValue().keySet();
+            channelsNotReused.get(channelKlass).addAll(channelNames);
+        }
+
+        Set<String> channelNames = agentConf.getChannelSet();
+        Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
+    /*
+     * Components which have a ComponentConfiguration object
+     */
+        for (String chName : channelNames) {
+            ComponentConfiguration comp = compMap.get(chName);
+            if (comp != null) {
+                Channel channel = getOrCreateChannel(channelsNotReused,
+                        comp.getComponentName(), comp.getType());
+                try {
+                    Configurables.configure(channel, comp);
+                    channelComponentMap.put(comp.getComponentName(),
+                            new ChannelComponent(channel));
+                    LOGGER.info("Created channel " + chName);
+                } catch (Exception e) {
+                    String msg = String.format("Channel %s has been removed due to an " +
+                            "error during configuration", chName);
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+    /*
+     * Components which DO NOT have a ComponentConfiguration object
+     * and use only Context
+     */
+        for (String chName : channelNames) {
+            Context context = agentConf.getChannelContext().get(chName);
+            if (context != null) {
+                Channel channel = getOrCreateChannel(channelsNotReused, chName,
+                        context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+                try {
+                    Configurables.configure(channel, context);
+                    channelComponentMap.put(chName, new ChannelComponent(channel));
+                    LOGGER.info("Created channel " + chName);
+                } catch (Exception e) {
+                    String msg = String.format("Channel %s has been removed due to an " +
+                            "error during configuration", chName);
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+    /*
+     * Any channel which was not re-used, will have it's reference removed
+     */
+        for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
+            Map<String, Channel> channelMap = channelCache.get(channelKlass);
+            if (channelMap != null) {
+                for (String channelName : channelsNotReused.get(channelKlass)) {
+                    if (channelMap.remove(channelName) != null) {
+                        LOGGER.info("Removed {} of type {}", channelName, channelKlass);
+                    }
+                }
+                if (channelMap.isEmpty()) {
+                    channelCache.remove(channelKlass);
+                }
+            }
+        }
+    }
+
+    private Channel getOrCreateChannel(
+            ListMultimap<Class<? extends Channel>, String> channelsNotReused,
+            String name, String type)
+            throws FlumeException {
+
+        Class<? extends Channel> channelClass = channelFactory.getClass(type);
+    /*
+     * Channel has requested a new instance on each re-configuration
+     */
+        if (channelClass.isAnnotationPresent(Disposable.class)) {
+            Channel channel = channelFactory.create(name, type);
+            channel.setName(name);
+            return channel;
+        }
+        Map<String, Channel> channelMap = channelCache.get(channelClass);
+        if (channelMap == null) {
+            channelMap = new HashMap<String, Channel>();
+            channelCache.put(channelClass, channelMap);
+        }
+        Channel channel = channelMap.get(name);
+        if (channel == null) {
+            channel = channelFactory.create(name, type);
+            channel.setName(name);
+            channelMap.put(name, channel);
+        }
+        channelsNotReused.get(channelClass).remove(name);
+        return channel;
+    }
+
+    private void loadSources(AgentConfiguration agentConf,
+                             Map<String, ChannelComponent> channelComponentMap,
+                             Map<String, SourceRunner> sourceRunnerMap)
+            throws InstantiationException {
+
+        Set<String> sourceNames = agentConf.getSourceSet();
+        Map<String, ComponentConfiguration> compMap =
+                agentConf.getSourceConfigMap();
+    /*
+     * Components which have a ComponentConfiguration object
+     */
+        for (String sourceName : sourceNames) {
+            ComponentConfiguration comp = compMap.get(sourceName);
+            if (comp != null) {
+                SourceConfiguration config = (SourceConfiguration) comp;
+
+                Source source = sourceFactory.create(comp.getComponentName(),
+                        comp.getType());
+                try {
+                    Configurables.configure(source, config);
+                    Set<String> channelNames = config.getChannels();
+                    List<Channel> sourceChannels =
+                            getSourceChannels(channelComponentMap, source, channelNames);
+                    if (sourceChannels.isEmpty()) {
+                        String msg = String.format("Source %s is not connected to a " +
+                                "channel", sourceName);
+                        throw new IllegalStateException(msg);
+                    }
+                    ChannelSelectorConfiguration selectorConfig =
+                            config.getSelectorConfiguration();
+
+                    ChannelSelector selector = ChannelSelectorFactory.create(
+                            sourceChannels, selectorConfig);
+
+                    ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+                    Configurables.configure(channelProcessor, config);
+
+                    source.setChannelProcessor(channelProcessor);
+                    sourceRunnerMap.put(comp.getComponentName(),
+                            SourceRunner.forSource(source));
+                    for (Channel channel : sourceChannels) {
+                        ChannelComponent channelComponent =
+                                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
+                                        String.format("Channel %s", channel.getName()));
+                        channelComponent.components.add(sourceName);
+                    }
+                } catch (Exception e) {
+                    String msg = String.format("Source %s has been removed due to an " +
+                            "error during configuration", sourceName);
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+    /*
+     * Components which DO NOT have a ComponentConfiguration object
+     * and use only Context
+     */
+        Map<String, Context> sourceContexts = agentConf.getSourceContext();
+        for (String sourceName : sourceNames) {
+            Context context = sourceContexts.get(sourceName);
+            if (context != null) {
+                Source source =
+                        sourceFactory.create(sourceName,
+                                context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+                try {
+                    Configurables.configure(source, context);
+                    String[] channelNames = context.getString(
+                            BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
+                    List<Channel> sourceChannels =
+                            getSourceChannels(channelComponentMap, source, Arrays.asList(channelNames));
+                    if (sourceChannels.isEmpty()) {
+                        String msg = String.format("Source %s is not connected to a " +
+                                "channel", sourceName);
+                        throw new IllegalStateException(msg);
+                    }
+                    Map<String, String> selectorConfig = context.getSubProperties(
+                            BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
+
+                    ChannelSelector selector = ChannelSelectorFactory.create(
+                            sourceChannels, selectorConfig);
+
+                    ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+                    Configurables.configure(channelProcessor, context);
+                    source.setChannelProcessor(channelProcessor);
+                    sourceRunnerMap.put(sourceName,
+                            SourceRunner.forSource(source));
+                    for (Channel channel : sourceChannels) {
+                        ChannelComponent channelComponent =
+                                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
+                                        String.format("Channel %s", channel.getName()));
+                        channelComponent.components.add(sourceName);
+                    }
+                } catch (Exception e) {
+                    String msg = String.format("Source %s has been removed due to an " +
+                            "error during configuration", sourceName);
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+    }
+
+    private List<Channel> getSourceChannels(Map<String, ChannelComponent> channelComponentMap,
+                                            Source source, Collection<String> channelNames) throws InstantiationException {
+        List<Channel> sourceChannels = new ArrayList<Channel>();
+        for (String chName : channelNames) {
+            ChannelComponent channelComponent = channelComponentMap.get(chName);
+            if (channelComponent != null) {
+                checkSourceChannelCompatibility(source, channelComponent.channel);
+                sourceChannels.add(channelComponent.channel);
+            }
+        }
+        return sourceChannels;
+    }
+
+    private void checkSourceChannelCompatibility(Source source, Channel channel)
+            throws InstantiationException {
+        if (source instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported) {
+            long transCap = ((TransactionCapacitySupported) channel).getTransactionCapacity();
+            long batchSize = ((BatchSizeSupported) source).getBatchSize();
+            if (transCap < batchSize) {
+                String msg = String.format(
+                        "Incompatible source and channel settings defined. " +
+                                "source's batch size is greater than the channels transaction capacity. " +
+                                "Source: %s, batch size = %d, channel %s, transaction capacity = %d",
+                        source.getName(), batchSize,
+                        channel.getName(), transCap);
+                throw new InstantiationException(msg);
+            }
+        }
+    }
+
+    private void checkSinkChannelCompatibility(Sink sink, Channel channel)
+            throws InstantiationException {
+        if (sink instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported) {
+            long transCap = ((TransactionCapacitySupported) channel).getTransactionCapacity();
+            long batchSize = ((BatchSizeSupported) sink).getBatchSize();
+            if (transCap < batchSize) {
+                String msg = String.format(
+                        "Incompatible sink and channel settings defined. " +
+                                "sink's batch size is greater than the channels transaction capacity. " +
+                                "Sink: %s, batch size = %d, channel %s, transaction capacity = %d",
+                        sink.getName(), batchSize,
+                        channel.getName(), transCap);
+                throw new InstantiationException(msg);
+            }
+        }
+    }
+
+    private void loadSinks(AgentConfiguration agentConf,
+                           Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
+            throws InstantiationException {
+        Set<String> sinkNames = agentConf.getSinkSet();
+        Map<String, ComponentConfiguration> compMap =
+                agentConf.getSinkConfigMap();
+        Map<String, Sink> sinks = new HashMap<String, Sink>();
+    /*
+     * Components which have a ComponentConfiguration object
+     */
+        for (String sinkName : sinkNames) {
+            ComponentConfiguration comp = compMap.get(sinkName);
+            if (comp != null) {
+                SinkConfiguration config = (SinkConfiguration) comp;
+                Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
+                try {
+                    Configurables.configure(sink, config);
+                    ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
+                    if (channelComponent == null) {
+                        String msg = String.format("Sink %s is not connected to a " +
+                                "channel", sinkName);
+                        throw new IllegalStateException(msg);
+                    }
+                    checkSinkChannelCompatibility(sink, channelComponent.channel);
+                    sink.setChannel(channelComponent.channel);
+                    sinks.put(comp.getComponentName(), sink);
+                    channelComponent.components.add(sinkName);
+                } catch (Exception e) {
+                    String msg = String.format("Sink %s has been removed due to an " +
+                            "error during configuration", sinkName);
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+    /*
+     * Components which DO NOT have a ComponentConfiguration object
+     * and use only Context
+     */
+        Map<String, Context> sinkContexts = agentConf.getSinkContext();
+        for (String sinkName : sinkNames) {
+            Context context = sinkContexts.get(sinkName);
+            if (context != null) {
+                Sink sink = sinkFactory.create(sinkName, context.getString(
+                        BasicConfigurationConstants.CONFIG_TYPE));
+                try {
+                    Configurables.configure(sink, context);
+                    ChannelComponent channelComponent =
+                            channelComponentMap.get(
+                                    context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
+                    if (channelComponent == null) {
+                        String msg = String.format("Sink %s is not connected to a " +
+                                "channel", sinkName);
+                        throw new IllegalStateException(msg);
+                    }
+                    checkSinkChannelCompatibility(sink, channelComponent.channel);
+                    sink.setChannel(channelComponent.channel);
+                    sinks.put(sinkName, sink);
+                    channelComponent.components.add(sinkName);
+                } catch (Exception e) {
+                    String msg = String.format("Sink %s has been removed due to an " +
+                            "error during configuration", sinkName);
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+
+        loadSinkGroups(agentConf, sinks, sinkRunnerMap);
+    }
+
+    private void loadSinkGroups(AgentConfiguration agentConf,
+                                Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
+            throws InstantiationException {
+        Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
+        Map<String, ComponentConfiguration> compMap =
+                agentConf.getSinkGroupConfigMap();
+        Map<String, String> usedSinks = new HashMap<String, String>();
+        for (String groupName : sinkGroupNames) {
+            ComponentConfiguration comp = compMap.get(groupName);
+            if (comp != null) {
+                SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
+                List<Sink> groupSinks = new ArrayList<Sink>();
+                for (String sink : groupConf.getSinks()) {
+                    Sink s = sinks.remove(sink);
+                    if (s == null) {
+                        String sinkUser = usedSinks.get(sink);
+                        if (sinkUser != null) {
+                            throw new InstantiationException(String.format(
+                                    "Sink %s of group %s already " +
+                                            "in use by group %s", sink, groupName, sinkUser));
+                        } else {
+                            throw new InstantiationException(String.format(
+                                    "Sink %s of group %s does "
+                                            + "not exist or is not properly configured", sink,
+                                    groupName));
+                        }
+                    }
+                    groupSinks.add(s);
+                    usedSinks.put(sink, groupName);
+                }
+                try {
+                    SinkGroup group = new SinkGroup(groupSinks);
+                    Configurables.configure(group, groupConf);
+                    sinkRunnerMap.put(comp.getComponentName(),
+                            new SinkRunner(group.getProcessor()));
+                } catch (Exception e) {
+                    String msg = String.format("SinkGroup %s has been removed due to " +
+                            "an error during configuration", groupName);
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+        // add any unassigned sinks to solo collectors
+        for (Entry<String, Sink> entry : sinks.entrySet()) {
+            if (!usedSinks.containsValue(entry.getKey())) {
+                try {
+                    SinkProcessor pr = new DefaultSinkProcessor();
+                    List<Sink> sinkMap = new ArrayList<Sink>();
+                    sinkMap.add(entry.getValue());
+                    pr.setSinks(sinkMap);
+                    Configurables.configure(pr, new Context());
+                    sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
+                } catch (Exception e) {
+                    String msg = String.format("SinkGroup %s has been removed due to " +
+                            "an error during configuration", entry.getKey());
+                    LOGGER.error(msg, e);
+                }
+            }
+        }
+    }
+
+    private static class ChannelComponent {
+        final Channel channel;
+        final List<String> components;
+
+        ChannelComponent(Channel channel) {
+            this.channel = channel;
+            components = Lists.newArrayList();
+        }
+    }
+
+    protected Map<String, String> toMap(Properties properties) {
+        Map<String, String> result = Maps.newHashMap();
+        Enumeration<?> propertyNames = properties.propertyNames();
+        while (propertyNames.hasMoreElements()) {
+            String name = (String) propertyNames.nextElement();
+            String value = properties.getProperty(name);
+            result.put(name, value);
+        }
+        return result;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..8bff0e9
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Charsets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flume.conf.FlumeConfiguration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * ZooKeeper based configuration implementation provider.
+ *
+ * The Agent configuration can be uploaded in ZooKeeper under a base name, which
+ * defaults to /flume
+ *
+ * Currently the agent configuration is stored under the agent name node in
+ * ZooKeeper
+ *
+ * <PRE>
+ *   /flume
+ *       /a1 [agent config file]
+ *       /a2 [agent config file]
+ *       /a3 [agent config file]
+ * </PRE>
+ *
+ * Configuration format is same as PropertiesFileConfigurationProvider
+ *
+ * Configuration properties
+ *
+ * agentName - Name of Agent for which configuration needs to be pulled
+ *
+ * zkConnString - Connection string to ZooKeeper Ensemble
+ * (host:port,host1:port1)
+ *
+ * basePath - Base Path where agent configuration needs to be stored. Defaults
+ * to /flume
+ */
+public abstract class AbstractZooKeeperConfigurationProvider extends
+        AbstractConfigurationProvider {
+
+    static final String DEFAULT_ZK_BASE_PATH = "/flume";
+
+    protected final String basePath;
+
+    protected final String zkConnString;
+
+    protected AbstractZooKeeperConfigurationProvider(String agentName,
+                                                     String zkConnString, String basePath) {
+        super(agentName);
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(zkConnString),
+                "Invalid Zookeeper Connection String %s", zkConnString);
+        this.zkConnString = zkConnString;
+        if (basePath == null || basePath.isEmpty()) {
+            this.basePath = DEFAULT_ZK_BASE_PATH;
+        } else {
+            this.basePath = basePath;
+        }
+    }
+
+    protected CuratorFramework createClient() {
+        return CuratorFrameworkFactory.newClient(zkConnString,
+                new ExponentialBackoffRetry(1000, 1));
+    }
+
+    protected FlumeConfiguration configFromBytes(byte[] configData)
+            throws IOException {
+        Map<String, String> configMap;
+        if (configData == null || configData.length == 0) {
+            configMap = Collections.emptyMap();
+        } else {
+            String fileContent = new String(configData, Charsets.UTF_8);
+            Properties properties = new Properties();
+            properties.load(new StringReader(fileContent));
+            configMap = toMap(properties);
+        }
+        return new FlumeConfiguration(configMap);
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java
new file mode 100644
index 0000000..6538ad2
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.base.Throwables;
+import com.google.common.eventbus.Subscribe;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.MonitoringType;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Application {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(Application.class);
+
+    public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+    public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+
+    private final List<LifecycleAware> components;
+    private final LifecycleSupervisor supervisor;
+    private MaterializedConfiguration materializedConfiguration;
+    private MonitorService monitorServer;
+    private final ReentrantLock lifecycleLock = new ReentrantLock();
+
+    public Application() {
+        this(new ArrayList<LifecycleAware>(0));
+    }
+
+    public Application(List<LifecycleAware> components) {
+        this.components = components;
+        supervisor = new LifecycleSupervisor();
+    }
+
+    public void start() {
+        lifecycleLock.lock();
+        try {
+            for (LifecycleAware component : components) {
+                supervisor.supervise(component,
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            }
+        } finally {
+            lifecycleLock.unlock();
+        }
+    }
+
+    @Subscribe
+    public void handleConfigurationEvent(MaterializedConfiguration conf) {
+        try {
+            lifecycleLock.lockInterruptibly();
+            stopAllComponents();
+            startAllComponents(conf);
+        } catch (InterruptedException e) {
+            logger.info("Interrupted while trying to handle configuration event");
+            return;
+        } finally {
+            // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
+            if (lifecycleLock.isHeldByCurrentThread()) {
+                lifecycleLock.unlock();
+            }
+        }
+    }
+
+    public void stop() {
+        lifecycleLock.lock();
+        stopAllComponents();
+        try {
+            supervisor.stop();
+            if (monitorServer != null) {
+                monitorServer.stop();
+            }
+        } finally {
+            lifecycleLock.unlock();
+        }
+    }
+
+    private void stopAllComponents() {
+        if (this.materializedConfiguration != null) {
+            logger.info("Shutting down configuration: {}", this.materializedConfiguration);
+            for (Entry<String, SourceRunner> entry :
+                    this.materializedConfiguration.getSourceRunners().entrySet()) {
+                try {
+                    logger.info("Stopping Source " + entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    logger.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+
+            for (Entry<String, SinkRunner> entry :
+                    this.materializedConfiguration.getSinkRunners().entrySet()) {
+                try {
+                    logger.info("Stopping Sink " + entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    logger.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+
+            for (Entry<String, Channel> entry :
+                    this.materializedConfiguration.getChannels().entrySet()) {
+                try {
+                    logger.info("Stopping Channel " + entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    logger.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+        }
+        if (monitorServer != null) {
+            monitorServer.stop();
+        }
+    }
+
+    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
+        logger.info("Starting new configuration:{}", materializedConfiguration);
+
+        this.materializedConfiguration = materializedConfiguration;
+
+        for (Entry<String, Channel> entry :
+                materializedConfiguration.getChannels().entrySet()) {
+            try {
+                logger.info("Starting Channel " + entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                logger.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+    /*
+     * Wait for all channels to start.
+     */
+        for (Channel ch : materializedConfiguration.getChannels().values()) {
+            while (ch.getLifecycleState() != LifecycleState.START
+                    && !supervisor.isComponentInErrorState(ch)) {
+                try {
+                    logger.info("Waiting for channel: " + ch.getName() +
+                            " to start. Sleeping for 500 ms");
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    logger.error("Interrupted while waiting for channel to start.", e);
+                    Throwables.propagate(e);
+                }
+            }
+        }
+
+        for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
+            try {
+                logger.info("Starting Sink " + entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                logger.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        for (Entry<String, SourceRunner> entry :
+                materializedConfiguration.getSourceRunners().entrySet()) {
+            try {
+                logger.info("Starting Source " + entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                logger.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        this.loadMonitoring();
+    }
+
+    @SuppressWarnings("unchecked")
+    private void loadMonitoring() {
+        Properties systemProps = System.getProperties();
+        Set<String> keys = systemProps.stringPropertyNames();
+        try {
+            if (keys.contains(CONF_MONITOR_CLASS)) {
+                String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
+                Class<? extends MonitorService> klass;
+                try {
+                    //Is it a known type?
+                    klass = MonitoringType.valueOf(
+                            monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
+                } catch (Exception e) {
+                    //Not a known type, use FQCN
+                    klass = (Class<? extends MonitorService>) Class.forName(monitorType);
+                }
+                this.monitorServer = klass.newInstance();
+                Context context = new Context();
+                for (String key : keys) {
+                    if (key.startsWith(CONF_MONITOR_PREFIX)) {
+                        context.put(key.substring(CONF_MONITOR_PREFIX.length()),
+                                systemProps.getProperty(key));
+                    }
+                }
+                monitorServer.configure(context);
+                monitorServer.start();
+            }
+        } catch (Exception e) {
+            logger.warn("Error starting monitoring. "
+                    + "Monitoring might not be available.", e);
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java
new file mode 100644
index 0000000..e2a7ffe
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+public interface ConfigurationProvider {
+    MaterializedConfiguration getConfiguration();
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java
new file mode 100644
index 0000000..d7b0c22
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * A class that extends the Java built-in Properties overriding
+ * {@link java.util.Properties#getProperty(String)} to allow ${ENV_VAR_NAME}-style environment
+ * variable inclusions
+ */
+public class EnvVarResolverProperties extends Properties {
+    /**
+     * @param input The input string with ${ENV_VAR_NAME}-style environment variable names
+     * @return The output string with ${ENV_VAR_NAME} replaced with their environment variable values
+     */
+    protected static String resolveEnvVars(String input) {
+        Preconditions.checkNotNull(input);
+        // match ${ENV_VAR_NAME}
+        Pattern p = Pattern.compile("\\$\\{(\\w+)\\}");
+        Matcher m = p.matcher(input);
+        StringBuffer sb = new StringBuffer();
+        while (m.find()) {
+            String envVarName = m.group(1);
+            String envVarValue = System.getenv(envVarName);
+            m.appendReplacement(sb, null == envVarValue ? "" : envVarValue);
+        }
+        m.appendTail(sb);
+        return sb.toString();
+    }
+
+    /**
+     * @param key the property key
+     * @return the value of the property key with ${ENV_VAR_NAME}-style environment variables replaced
+     */
+    @Override
+    public String getProperty(String key) {
+        return resolveEnvVars(super.getProperty(key));
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java
new file mode 100644
index 0000000..c46fbf5
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * MaterializedConfiguration represents the materialization of a Flume
+ * properties file. That is it's the actual Source, Sink, and Channels
+ * represented in the configuration file.
+ */
+public interface MaterializedConfiguration {
+
+    public void addSourceRunner(String name, SourceRunner sourceRunner);
+
+    public void addSinkRunner(String name, SinkRunner sinkRunner);
+
+    public void addChannel(String name, Channel channel);
+
+    public ImmutableMap<String, SourceRunner> getSourceRunners();
+
+    public ImmutableMap<String, SinkRunner> getSinkRunners();
+
+    public ImmutableMap<String, Channel> getChannels();
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..2c628ae
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.CounterGroup;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PollingPropertiesFileConfigurationProvider
+        extends PropertiesFileConfigurationProvider
+        implements LifecycleAware {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);
+
+    private final EventBus eventBus;
+    private final File file;
+    private final int interval;
+    private final CounterGroup counterGroup;
+    private LifecycleState lifecycleState;
+
+    private ScheduledExecutorService executorService;
+
+    public PollingPropertiesFileConfigurationProvider(String agentName,
+                                                      File file, EventBus eventBus, int interval) {
+        super(agentName, file);
+        this.eventBus = eventBus;
+        this.file = file;
+        this.interval = interval;
+        counterGroup = new CounterGroup();
+        lifecycleState = LifecycleState.IDLE;
+    }
+
+    @Override
+    public void start() {
+        LOGGER.info("Configuration provider starting");
+
+        Preconditions.checkState(file != null,
+                "The parameter file must not be null");
+
+        executorService = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
+                        .build());
+
+        FileWatcherRunnable fileWatcherRunnable =
+                new FileWatcherRunnable(file, counterGroup);
+
+        executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
+                TimeUnit.SECONDS);
+
+        lifecycleState = LifecycleState.START;
+
+        LOGGER.debug("Configuration provider started");
+    }
+
+    @Override
+    public void stop() {
+        LOGGER.info("Configuration provider stopping");
+
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+                LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
+                executorService.shutdownNow();
+                while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+                    LOGGER.debug("Waiting for file watcher to terminate");
+                }
+            }
+        } catch (InterruptedException e) {
+            LOGGER.debug("Interrupted while waiting for file watcher to terminate");
+            Thread.currentThread().interrupt();
+        }
+        lifecycleState = LifecycleState.STOP;
+        LOGGER.debug("Configuration provider stopped");
+    }
+
+    @Override
+    public synchronized LifecycleState getLifecycleState() {
+        return lifecycleState;
+    }
+
+
+    @Override
+    public String toString() {
+        return "{ file:" + file + " counterGroup:" + counterGroup + "  provider:"
+                + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
+    }
+
+    public class FileWatcherRunnable implements Runnable {
+
+        private final File file;
+        private final CounterGroup counterGroup;
+
+        private long lastChange;
+
+        public FileWatcherRunnable(File file, CounterGroup counterGroup) {
+            super();
+            this.file = file;
+            this.counterGroup = counterGroup;
+            this.lastChange = 0L;
+        }
+
+        @Override
+        public void run() {
+            LOGGER.debug("Checking file:{} for changes", file);
+
+            counterGroup.incrementAndGet("file.checks");
+
+            long lastModified = file.lastModified();
+
+            if (lastModified > lastChange) {
+                LOGGER.info("Reloading configuration file:{}", file);
+
+                counterGroup.incrementAndGet("file.loads");
+
+                lastChange = lastModified;
+
+                try {
+                    eventBus.post(getConfiguration());
+                } catch (Exception e) {
+                    LOGGER.error("Failed to load configuration data. Exception follows.",
+                            e);
+                } catch (NoClassDefFoundError e) {
+                    LOGGER.error("Failed to start agent because dependencies were not " +
+                            "found in classpath. Error follows.", e);
+                } catch (Throwable t) {
+                    // caught because the caller does not handle or log Throwables
+                    LOGGER.error("Unhandled error", t);
+                }
+            }
+        }
+    }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..b80eed3
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+
+public class PollingZooKeeperConfigurationProvider extends
+        AbstractZooKeeperConfigurationProvider implements LifecycleAware {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(PollingZooKeeperConfigurationProvider.class);
+
+    private final EventBus eventBus;
+
+    private final CuratorFramework client;
+
+    private NodeCache agentNodeCache;
+
+    private FlumeConfiguration flumeConfiguration;
+
+    private LifecycleState lifecycleState;
+
+    public PollingZooKeeperConfigurationProvider(String agentName,
+                                                 String zkConnString, String basePath, EventBus eventBus) {
+        super(agentName, zkConnString, basePath);
+        this.eventBus = eventBus;
+        client = createClient();
+        agentNodeCache = null;
+        flumeConfiguration = null;
+        lifecycleState = LifecycleState.IDLE;
+    }
+
+    @Override
+    protected FlumeConfiguration getFlumeConfiguration() {
+        return flumeConfiguration;
+    }
+
+    @Override
+    public void start() {
+        LOGGER.debug("Starting...");
+        try {
+            client.start();
+            try {
+                agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName());
+                agentNodeCache.start();
+                agentNodeCache.getListenable().addListener(new NodeCacheListener() {
+                    @Override
+                    public void nodeChanged() throws Exception {
+                        refreshConfiguration();
+                    }
+                });
+            } catch (Exception e) {
+                client.close();
+                throw e;
+            }
+        } catch (Exception e) {
+            lifecycleState = LifecycleState.ERROR;
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            } else {
+                throw new FlumeException(e);
+            }
+        }
+        lifecycleState = LifecycleState.START;
+    }
+
+    private void refreshConfiguration() throws IOException {
+        LOGGER.info("Refreshing configuration from ZooKeeper");
+        byte[] data = null;
+        ChildData childData = agentNodeCache.getCurrentData();
+        if (childData != null) {
+            data = childData.getData();
+        }
+        flumeConfiguration = configFromBytes(data);
+        eventBus.post(getConfiguration());
+    }
+
+    @Override
+    public void stop() {
+        LOGGER.debug("Stopping...");
+        if (agentNodeCache != null) {
+            try {
+                agentNodeCache.close();
+            } catch (IOException e) {
+                LOGGER.warn("Encountered exception while stopping", e);
+                lifecycleState = LifecycleState.ERROR;
+            }
+        }
+
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error stopping Curator client", e);
+            lifecycleState = LifecycleState.ERROR;
+        }
+
+        if (lifecycleState != LifecycleState.ERROR) {
+            lifecycleState = LifecycleState.STOP;
+        }
+    }
+
+    @Override
+    public LifecycleState getLifecycleState() {
+        return lifecycleState;
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..03055b6
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * A configuration provider that uses properties file for specifying
+ * configuration. The configuration files follow the Java properties file syntax
+ * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
+ * configuration value specified in the properties file is prefixed by an
+ * <em>Agent Name</em> which helps isolate an individual agent&apos;s namespace.
+ * </p>
+ * <p>
+ * Valid configuration files must observe the following rules for every agent
+ * namespace.
+ * <ul>
+ * <li>For every &lt;agent name&gt; there must be three lists specified that
+ * include <tt>&lt;agent name&gt;.sources</tt>,
+ * <tt>&lt;agent name&gt;.sinks</tt>, and <tt>&lt;agent name&gt;.channels</tt>.
+ * Each of these lists must contain a space separated list of names
+ * corresponding to that particular entity.</li>
+ * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of source
+ * types. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.type = event</tt></li>
+ * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
+ * be a space-separated list of channel names that the source will associate
+ * with during runtime. Each of these names must be contained in the channels
+ * list specified by <tt>&lt;agent name&gt;.channels</tt>. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.channels =
+ * &lt;channel-1 name&gt; &lt;channel-2 name&gt;</tt></li>
+ * <li>For each source named in the <tt>&lt;agent name&gt;.sources</tt>, there
+ * must be a <tt>runner</tt> namespace of configuration that configures the
+ * associated source runner. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.type = avro</tt>.
+ * This namespace can also be used to configure other configuration of the
+ * source runner as needed. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.port = 10101</tt>
+ * </li>
+ * <li>For each source named in <tt>&lt;sources&gt;.sources</tt> there can
+ * be an optional <tt>selector.type</tt> specified that identifies the type
+ * of channel selector associated with the source. If not specified, the
+ * default replicating channel selector is used.
+ * </li><li>For each channel named in the <tt>&lt;agent name&gt;.channels</tt>,
+ * there must be a non-empty <tt>type</tt> attribute specified from the valid
+ * set of channel types. For example:
+ * <tt>&lt;agent name&gt;.channels.&lt;channel name&gt;.type = mem</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of sink
+ * types. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.type = hdfs</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a non-empty single-valued channel name specified as the value of the
+ * <tt>channel</tt> attribute. This value must be contained in the channels list
+ * specified by <tt>&lt;agent name&gt;.channels</tt>. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.channel =
+ * &lt;channel name&gt;</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a <tt>runner</tt> namespace of configuration that configures the
+ * associated sink runner. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.type = polling</tt>.
+ * This namespace can also be used to configure other configuration of the sink
+ * runner as needed. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.polling.interval =
+ * 60</tt></li>
+ * <li>A fourth optional list <tt>&lt;agent name&gt;.sinkgroups</tt>
+ * may be added to each agent, consisting of unique space separated names
+ * for groups</li>
+ * <li>Each sinkgroup must specify sinks, containing a list of all sinks
+ * belonging to it. These cannot be shared by multiple groups.
+ * Further, one can set a processor and behavioral parameters to determine
+ * how sink selection is made via <tt>&lt;agent name&gt;.sinkgroups.&lt;
+ * group name&lt.processor</tt>. For further detail refer to individual processor
+ * documentation</li>
+ * <li>Sinks not assigned to a group will be assigned to default single sink
+ * groups.</li>
+ * </ul>
+ *
+ * Apart from the above required configuration values, each source, sink or
+ * channel can have its own set of arbitrary configuration as required by the
+ * implementation. Each of these configuration values are expressed by fully
+ * namespace qualified configuration keys. For example, the configuration
+ * property called <tt>capacity</tt> for a channel called <tt>ch1</tt> for the
+ * agent named <tt>host1</tt> with value <tt>1000</tt> will be expressed as:
+ * <tt>host1.channels.ch1.capacity = 1000</tt>.
+ * </p>
+ * <p>
+ * Any information contained in the configuration file other than what pertains
+ * to the configured agents, sources, sinks and channels via the explicitly
+ * enumerated list of sources, sinks and channels per agent name are ignored by
+ * this provider. Moreover, if any of the required configuration values are not
+ * present in the configuration file for the configured entities, that entity
+ * and anything that depends upon it is considered invalid and consequently not
+ * configured. For example, if a channel is missing its <tt>type</tt> attribute,
+ * it is considered misconfigured. Also, any sources or sinks that depend upon
+ * this channel are also considered misconfigured and not initialized.
+ * </p>
+ * <p>
+ * Example configuration file:
+ *
+ * <pre>
+ * #
+ * # Flume Configuration
+ * # This file contains configuration for one Agent identified as host1.
+ * #
+ *
+ * host1.sources = avroSource thriftSource
+ * host1.channels = jdbcChannel
+ * host1.sinks = hdfsSink
+ *
+ * # avroSource configuration
+ * host1.sources.avroSource.type = org.apache.flume.source.AvroSource
+ * host1.sources.avroSource.runner.type = avro
+ * host1.sources.avroSource.runner.port = 11001
+ * host1.sources.avroSource.channels = jdbcChannel
+ * host1.sources.avroSource.selector.type = replicating
+ *
+ * # thriftSource configuration
+ * host1.sources.thriftSource.type = org.apache.flume.source.ThriftSource
+ * host1.sources.thriftSource.runner.type = thrift
+ * host1.sources.thriftSource.runner.port = 12001
+ * host1.sources.thriftSource.channels = jdbcChannel
+ *
+ * # jdbcChannel configuration
+ * host1.channels.jdbcChannel.type = jdbc
+ * host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
+ * host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
+ * host1.channels.jdbcChannel.jdbc.username = flume
+ * host1.channels.jdbcChannel.jdbc.password = flume
+ *
+ * # hdfsSink configuration
+ * host1.sinks.hdfsSink.type = hdfs
+ * host1.sinks.hdfsSink.hdfs.path = hdfs://localhost/
+ * host1.sinks.hdfsSink.batchsize = 1000
+ * host1.sinks.hdfsSink.runner.type = polling
+ * host1.sinks.hdfsSink.runner.polling.interval = 60
+ * </pre>
+ *
+ * </p>
+ *
+ * @see java.util.Properties#load(java.io.Reader)
+ */
+public class PropertiesFileConfigurationProvider extends
+        AbstractConfigurationProvider {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(PropertiesFileConfigurationProvider.class);
+    private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = "java.util.Properties";
+
+    private final File file;
+
+    public PropertiesFileConfigurationProvider(String agentName, File file) {
+        super(agentName);
+        this.file = file;
+    }
+
+    @Override
+    public FlumeConfiguration getFlumeConfiguration() {
+        BufferedReader reader = null;
+        try {
+            reader = new BufferedReader(new FileReader(file));
+            String resolverClassName = System.getProperty("propertiesImplementation",
+                    DEFAULT_PROPERTIES_IMPLEMENTATION);
+            Class<? extends Properties> propsclass = Class.forName(resolverClassName)
+                    .asSubclass(Properties.class);
+            Properties properties = propsclass.newInstance();
+            properties.load(reader);
+            return new FlumeConfiguration(toMap(properties));
+        } catch (IOException ex) {
+            LOGGER.error("Unable to load file:" + file
+                    + " (I/O failure) - Exception follows.", ex);
+        } catch (ClassNotFoundException e) {
+            LOGGER.error("Configuration resolver class not found", e);
+        } catch (InstantiationException e) {
+            LOGGER.error("Instantiation exception", e);
+        } catch (IllegalAccessException e) {
+            LOGGER.error("Illegal access exception", e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (IOException ex) {
+                    LOGGER.warn(
+                            "Unable to close file reader for file: " + file, ex);
+                }
+            }
+        }
+        return new FlumeConfiguration(new HashMap<String, String>());
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java
new file mode 100644
index 0000000..fd14d84
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SimpleMaterializedConfiguration implements MaterializedConfiguration {
+
+    private final Map<String, Channel> channels;
+    private final Map<String, SourceRunner> sourceRunners;
+    private final Map<String, SinkRunner> sinkRunners;
+
+    public SimpleMaterializedConfiguration() {
+        channels = new HashMap<String, Channel>();
+        sourceRunners = new HashMap<String, SourceRunner>();
+        sinkRunners = new HashMap<String, SinkRunner>();
+    }
+
+    @Override
+    public String toString() {
+        return "{ sourceRunners:" + sourceRunners + " sinkRunners:" + sinkRunners
+                + " channels:" + channels + " }";
+    }
+
+    @Override
+    public void addSourceRunner(String name, SourceRunner sourceRunner) {
+        sourceRunners.put(name, sourceRunner);
+    }
+
+    @Override
+    public void addSinkRunner(String name, SinkRunner sinkRunner) {
+        sinkRunners.put(name, sinkRunner);
+    }
+
+    @Override
+    public void addChannel(String name, Channel channel) {
+        channels.put(name, channel);
+    }
+
+    @Override
+    public ImmutableMap<String, Channel> getChannels() {
+        return ImmutableMap.copyOf(channels);
+    }
+
+    @Override
+    public ImmutableMap<String, SourceRunner> getSourceRunners() {
+        return ImmutableMap.copyOf(sourceRunners);
+    }
+
+    @Override
+    public ImmutableMap<String, SinkRunner> getSinkRunners() {
+        return ImmutableMap.copyOf(sinkRunners);
+    }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..2a927e0
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StaticZooKeeperConfigurationProvider extends
+        AbstractZooKeeperConfigurationProvider {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(StaticZooKeeperConfigurationProvider.class);
+
+    public StaticZooKeeperConfigurationProvider(String agentName,
+                                                String zkConnString, String basePath) {
+        super(agentName, zkConnString, basePath);
+    }
+
+    @Override
+    protected FlumeConfiguration getFlumeConfiguration() {
+        try {
+            CuratorFramework cf = createClient();
+            cf.start();
+            try {
+                byte[] data = cf.getData().forPath(basePath + "/" + getAgentName());
+                return configFromBytes(data);
+            } finally {
+                cf.close();
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error getting configuration info from Zookeeper", e);
+            throw new FlumeException(e);
+        }
+    }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
new file mode 100644
index 0000000..3e707ed
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.flume.FlumeConfig;
+import org.apache.pulsar.io.flume.FlumeConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A Simple abstract sink class for pulsar to flume.
+ */
+public abstract class AbstractSink<T> implements Sink<T> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractSink.class);
+
+
+    public abstract T extractValue(Record<T> record);
+
+    protected static BlockingQueue<Map<String, Object>> records;
+
+    protected FlumeConnector flumeConnector;
+
+    public static BlockingQueue<Map<String, Object>> getQueue() {
+        return records;
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+
+        records = new LinkedBlockingQueue<Map<String, Object>>();
+
+        FlumeConfig flumeConfig = FlumeConfig.load(config);
+
+        flumeConnector = new FlumeConnector();
+        flumeConnector.StartConnector(flumeConfig);
+    }
+
+    @Override
+    public void write(Record<T> record) {
+        try {
+            T message = extractValue(record);
+            Map<String, Object> m = new HashMap();
+            m.put("body", message);
+            records.put(m);
+            record.ack();
+        } catch (InterruptedException e) {
+            record.fail();
+            log.error("error", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (flumeConnector != null) {
+            flumeConnector.stop();
+        }
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
new file mode 100644
index 0000000..a1be6e7
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractPollableSource;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+
+import com.google.common.base.Optional;
+
+import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
+
+
+public class SourceOfFlume extends AbstractPollableSource implements BatchSizeSupported {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(SourceOfFlume.class);
+
+    public static final String BATCH_DURATION_MS = "batchDurationMillis";
+
+    private long batchSize;
+
+    private int maxBatchDurationMillis;
+
+    private SourceCounter counter;
+
+    private final List<Event> eventList = new ArrayList<Event>();
+
+    private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent();
+
+
+    @Override
+    public synchronized void doStart() {
+        log.info("start source of flume ...");
+        this.counter = new SourceCounter("flume-source");
+        this.counter.start();
+    }
+
+    @Override
+    public void doStop() {
+        log.info("stop source of flume ...");
+        this.counter.stop();
+    }
+
+    @Override
+    public void doConfigure(Context context) {
+        batchSize = context.getInteger(BATCH_SIZE, 1000);
+        maxBatchDurationMillis = context.getInteger(BATCH_DURATION_MS, 1000);
+        log.info("context: {}", context);
+    }
+
+    @Override
+    public Status doProcess() {
+        Event event;
+        String eventBody;
+        try {
+            final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+
+            while (eventList.size() < this.getBatchSize() &&
+                    System.currentTimeMillis() < maxBatchEndTime) {
+                BlockingQueue<Map<String, Object>> blockingQueue = StringSink.getQueue();
+                while (blockingQueue != null && !blockingQueue.isEmpty()) {
+                    Map<String, Object> message = blockingQueue.take();
+                    eventBody = message.get("body").toString();
+                    event = EventBuilder.withBody(eventBody.getBytes());
+                    eventList.add(event);
+                }
+            }
+            if (eventList.size() > 0) {
+                counter.addToEventReceivedCount((long) eventList.size());
+                getChannelProcessor().processEventBatch(eventList);
+                eventList.clear();
+                return Status.READY;
+            }
+            return Status.BACKOFF;
+
+        } catch (Exception e) {
+            log.error("Flume Source EXCEPTION, {}", e);
+            counter.incrementEventReadOrChannelFail(e);
+            return Status.BACKOFF;
+        }
+    }
+
+    @Override
+    public long getBatchSize() {
+        return batchSize;
+    }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/StringSink.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/StringSink.java
new file mode 100644
index 0000000..a05950a
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/StringSink.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+
+import org.apache.pulsar.functions.api.Record;
+
+public class StringSink extends AbstractSink<String> {
+
+    @Override
+    public String extractValue(Record<String> message) {
+        return message.getValue();
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSinkOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSinkOfFlume.java
new file mode 100644
index 0000000..81313fd
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSinkOfFlume.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import org.apache.flume.sink.AbstractSink;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public abstract class AbstractSinkOfFlume extends AbstractSink {
+
+    protected static BlockingQueue<Map<String, Object>> records;
+
+    public static BlockingQueue getQueue() {
+        return records;
+    }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
new file mode 100644
index 0000000..8c80512
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.flume.FlumeConfig;
+import org.apache.pulsar.io.flume.FlumeConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * A Simple abstract source class for flume to pulsar.
+ */
+public abstract class AbstractSource<V> extends PushSource<V> {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(AbstractSource.class);
+
+    protected Thread thread = null;
+
+    protected volatile boolean running = false;
+
+    protected final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            log.error("[{}] parse events has an error", t.getName(), e);
+        }
+    };
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+
+        FlumeConfig flumeConfig = FlumeConfig.load(config);
+
+        FlumeConnector flumeConnector = new FlumeConnector();
+        flumeConnector.StartConnector(flumeConfig);
+
+        this.start();
+
+    }
+
+    public abstract V extractValue(String message);
+
+    protected void start() {
+        thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                process();
+            }
+        });
+
+        thread.setName("flume source thread");
+        thread.setUncaughtExceptionHandler(handler);
+        running = true;
+        thread.start();
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        log.info("close flume source");
+        if (!running) {
+            return;
+        }
+        running = false;
+        if (thread != null) {
+            thread.interrupt();
+            thread.join();
+        }
+    }
+
+    protected void process() {
+        while (running) {
+            try {
+                log.info("start flume receive from sink process");
+                while (running) {
+                    BlockingQueue<Map<String, Object>> blockingQueue = SinkOfFlume.getQueue();
+                    while (blockingQueue != null && !blockingQueue.isEmpty()) {
+                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                        ObjectOutput out = null;
+                        out = new ObjectOutputStream(bos);
+                        Map<String, Object> message = blockingQueue.take();
+                        out.writeObject(message.get("body"));
+                        out.flush();
+                        byte[] m = bos.toByteArray();
+                        String m1 = new String(m);
+                        bos.close();
+                        FlumeRecord flumeRecord = new FlumeRecord<>();
+                        flumeRecord.setRecord(extractValue(m1));
+                        consume(flumeRecord);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("process error!", e);
+            }
+        }
+    }
+
+    @Getter
+    @Setter
+    static private class FlumeRecord<V> implements Record<V> {
+        private V record;
+        private Long id;
+
+        @Override
+        public Optional<String> getKey() {
+            return Optional.of(Long.toString(id));
+        }
+
+        @Override
+        public V getValue() {
+            return record;
+        }
+    }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/SinkOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/SinkOfFlume.java
new file mode 100644
index 0000000..f990bf0
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/SinkOfFlume.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import org.apache.flume.*;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
+
+public class SinkOfFlume extends AbstractSinkOfFlume implements Configurable, BatchSizeSupported {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SinkOfFlume.class);
+
+    private long batchSize;
+
+    private SinkCounter counter = null;
+
+    @Override
+    public void configure(Context context) {
+        batchSize = context.getInteger(BATCH_SIZE, 1000);
+    }
+
+    @Override
+    public long getBatchSize() {
+        return batchSize;
+    }
+
+
+    @Override
+    public Status process() throws EventDeliveryException {
+        Status result = Status.READY;
+        Channel channel = getChannel();
+        Transaction transaction = null;
+        Event event = null;
+
+        try {
+            transaction = channel.getTransaction();
+            transaction.begin();
+            long processedEvents = 0;
+            for (; processedEvents < batchSize; processedEvents += 1) {
+                event = channel.take();
+
+                if (event == null) {
+                    // no events available in the channel
+                    break;
+                }
+                if (processedEvents == 0) {
+                    result = Status.BACKOFF;
+                    counter.incrementBatchEmptyCount();
+                } else if (processedEvents < batchSize) {
+                    counter.incrementBatchUnderflowCount();
+                } else {
+                    counter.incrementBatchCompleteCount();
+                }
+                event.getHeaders();
+                event.getBody();
+                Map<String, Object> m = new HashMap();
+                m.put("headers", event.getHeaders());
+                m.put("body", event.getBody());
+                records.put(m);
+            }
+            transaction.commit();
+        } catch (Exception ex) {
+            String errorMsg = "Failed to publish events";
+            LOG.error("Failed to publish events", ex);
+            counter.incrementEventWriteOrChannelFail(ex);
+            result = Status.BACKOFF;
+            if (transaction != null) {
+                try {
+                    // If the transaction wasn't committed before we got the exception, we
+                    // need to rollback.
+                    transaction.rollback();
+                } catch (RuntimeException e) {
+                    LOG.error("Transaction rollback failed: " + e.getLocalizedMessage());
+                    LOG.debug("Exception follows.", e);
+                } finally {
+                    transaction.close();
+                    transaction = null;
+                }
+            }
+        } finally {
+            if (transaction != null) {
+                transaction.close();
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public synchronized void start() {
+        records = new LinkedBlockingQueue<Map<String, Object>>();
+        this.counter = new SinkCounter("flume-sink");
+    }
+
+    @Override
+    public synchronized void stop() {
+    }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/StringSource.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/StringSource.java
new file mode 100644
index 0000000..3c25986
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/StringSource.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+
+public class StringSource extends AbstractSource<String> {
+
+    @Override
+    public String extractValue(String message) {
+        return message;
+    }
+}
diff --git a/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..0e578b8
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: flume
+description: flume source and sink connector
+sourceClass: org.apache.pulsar.io.flume.source.StringSource
+sinkClass: org.apache.pulsar.io.flume.sink.StringSink
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/flume-io-sink.yaml b/pulsar-io/flume/src/main/resources/flume/flume-io-sink.yaml
new file mode 100644
index 0000000..4850d31
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/flume-io-sink.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+configs:
+  name: a1
+  confFile: sink.conf
+  noReloadConf: false
+  zkConnString: ""
+  zkBasePath: ""
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/flume-io-source.yaml b/pulsar-io/flume/src/main/resources/flume/flume-io-source.yaml
new file mode 100644
index 0000000..2df778c
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/flume-io-source.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+configs:
+  name: a1
+  confFile: source.conf
+  noReloadConf: false
+  zkConnString: ""
+  zkBasePath: ""
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/sink.conf b/pulsar-io/flume/src/main/resources/flume/sink.conf
new file mode 100644
index 0000000..8006195
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/sink.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = localhost
+a1.sources.r1.port = 44444
+
+# Describe the sink
+a1.sinks.k1.type = org.apache.pulsar.io.flume.source.SinkOfFlume
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/source.conf b/pulsar-io/flume/src/main/resources/flume/source.conf
new file mode 100644
index 0000000..93c713b
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/source.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = org.apache.pulsar.io.flume.sink.SourceOfFlume
+
+# Describe the sink
+a1.sinks.k1.type = avro
+a1.sinks.k1.hostname = 127.0.0.1
+a1.sinks.k1.port = 44444
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/AbstractFlumeTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/AbstractFlumeTests.java
new file mode 100644
index 0000000..c9fe6c6
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/AbstractFlumeTests.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume;
+
+
+public abstract class AbstractFlumeTests {
+
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractConfigurationProvider.java
new file mode 100644
index 0000000..ed2cf3f
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractConfigurationProvider.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.annotations.Recyclable;
+import org.apache.flume.channel.AbstractChannel;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.source.AbstractSource;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TestAbstractConfigurationProvider {
+
+    @Test
+    public void testDispoableChannel() throws Exception {
+        String agentName = "agent1";
+        Map<String, String> properties = getPropertiesForChannel(agentName,
+                DisposableChannel.class.getName());
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+        MaterializedConfiguration config1 = provider.getConfiguration();
+        Channel channel1 = config1.getChannels().values().iterator().next();
+        Assert.assertTrue(channel1 instanceof DisposableChannel);
+        MaterializedConfiguration config2 = provider.getConfiguration();
+        Channel channel2 = config2.getChannels().values().iterator().next();
+        Assert.assertTrue(channel2 instanceof DisposableChannel);
+        Assert.assertNotSame(channel1, channel2);
+    }
+
+    @Test
+    public void testReusableChannel() throws Exception {
+        String agentName = "agent1";
+        Map<String, String> properties = getPropertiesForChannel(agentName,
+                RecyclableChannel.class.getName());
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+
+        MaterializedConfiguration config1 = provider.getConfiguration();
+        Channel channel1 = config1.getChannels().values().iterator().next();
+        Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+        MaterializedConfiguration config2 = provider.getConfiguration();
+        Channel channel2 = config2.getChannels().values().iterator().next();
+        Assert.assertTrue(channel2 instanceof RecyclableChannel);
+
+        Assert.assertSame(channel1, channel2);
+    }
+
+    @Test
+    public void testUnspecifiedChannel() throws Exception {
+        String agentName = "agent1";
+        Map<String, String> properties = getPropertiesForChannel(agentName,
+                UnspecifiedChannel.class.getName());
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+
+        MaterializedConfiguration config1 = provider.getConfiguration();
+        Channel channel1 = config1.getChannels().values().iterator().next();
+        Assert.assertTrue(channel1 instanceof UnspecifiedChannel);
+
+        MaterializedConfiguration config2 = provider.getConfiguration();
+        Channel channel2 = config2.getChannels().values().iterator().next();
+        Assert.assertTrue(channel2 instanceof UnspecifiedChannel);
+
+        Assert.assertSame(channel1, channel2);
+    }
+
+    @Test
+    public void testReusableChannelNotReusedLater() throws Exception {
+        String agentName = "agent1";
+        Map<String, String> propertiesReusable = getPropertiesForChannel(agentName,
+                RecyclableChannel.class
+                        .getName());
+        Map<String, String> propertiesDispoable = getPropertiesForChannel(agentName,
+                DisposableChannel.class
+                        .getName());
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, propertiesReusable);
+        MaterializedConfiguration config1 = provider.getConfiguration();
+        Channel channel1 = config1.getChannels().values().iterator().next();
+        Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+        provider.setProperties(propertiesDispoable);
+        MaterializedConfiguration config2 = provider.getConfiguration();
+        Channel channel2 = config2.getChannels().values().iterator().next();
+        Assert.assertTrue(channel2 instanceof DisposableChannel);
+
+        provider.setProperties(propertiesReusable);
+        MaterializedConfiguration config3 = provider.getConfiguration();
+        Channel channel3 = config3.getChannels().values().iterator().next();
+        Assert.assertTrue(channel3 instanceof RecyclableChannel);
+
+        Assert.assertNotSame(channel1, channel3);
+    }
+
+    @Test
+    public void testSourceThrowsExceptionDuringConfiguration() throws Exception {
+        String agentName = "agent1";
+        String sourceType = UnconfigurableSource.class.getName();
+        String channelType = "memory";
+        String sinkType = "null";
+        Map<String, String> properties = getProperties(agentName, sourceType,
+                channelType, sinkType);
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+        MaterializedConfiguration config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 0);
+        Assert.assertTrue(config.getChannels().size() == 1);
+        Assert.assertTrue(config.getSinkRunners().size() == 1);
+    }
+
+    @Test
+    public void testChannelThrowsExceptionDuringConfiguration() throws Exception {
+        String agentName = "agent1";
+        String sourceType = "seq";
+        String channelType = UnconfigurableChannel.class.getName();
+        String sinkType = "null";
+        Map<String, String> properties = getProperties(agentName, sourceType,
+                channelType, sinkType);
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+        MaterializedConfiguration config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 0);
+        Assert.assertTrue(config.getChannels().size() == 0);
+        Assert.assertTrue(config.getSinkRunners().size() == 0);
+    }
+
+    @Test
+    public void testSinkThrowsExceptionDuringConfiguration() throws Exception {
+        String agentName = "agent1";
+        String sourceType = "seq";
+        String channelType = "memory";
+        String sinkType = UnconfigurableSink.class.getName();
+        Map<String, String> properties = getProperties(agentName, sourceType,
+                channelType, sinkType);
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+        MaterializedConfiguration config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 1);
+        Assert.assertTrue(config.getChannels().size() == 1);
+        Assert.assertTrue(config.getSinkRunners().size() == 0);
+    }
+
+    @Test
+    public void testSourceAndSinkThrowExceptionDuringConfiguration()
+            throws Exception {
+        String agentName = "agent1";
+        String sourceType = UnconfigurableSource.class.getName();
+        String channelType = "memory";
+        String sinkType = UnconfigurableSink.class.getName();
+        Map<String, String> properties = getProperties(agentName, sourceType,
+                channelType, sinkType);
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+        MaterializedConfiguration config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 0);
+        Assert.assertTrue(config.getChannels().size() == 0);
+        Assert.assertTrue(config.getSinkRunners().size() == 0);
+    }
+
+    @Test
+    public void testSinkSourceMismatchDuringConfiguration() throws Exception {
+        String agentName = "agent1";
+        String sourceType = "seq";
+        String channelType = "memory";
+        String sinkType = "avro";
+        Map<String, String> properties = getProperties(agentName, sourceType,
+                channelType, sinkType);
+        properties.put(agentName + ".channels.channel1.capacity", "1000");
+        properties.put(agentName + ".channels.channel1.transactionCapacity", "1000");
+        properties.put(agentName + ".sources.source1.batchSize", "1000");
+        properties.put(agentName + ".sinks.sink1.batch-size", "1000");
+        properties.put(agentName + ".sinks.sink1.hostname", "10.10.10.10");
+        properties.put(agentName + ".sinks.sink1.port", "1010");
+
+        MemoryConfigurationProvider provider =
+                new MemoryConfigurationProvider(agentName, properties);
+        MaterializedConfiguration config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 1);
+        Assert.assertTrue(config.getChannels().size() == 1);
+        Assert.assertTrue(config.getSinkRunners().size() == 1);
+
+        properties.put(agentName + ".sources.source1.batchSize", "1001");
+        properties.put(agentName + ".sinks.sink1.batch-size", "1000");
+
+        provider = new MemoryConfigurationProvider(agentName, properties);
+        config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 0);
+        Assert.assertTrue(config.getChannels().size() == 1);
+        Assert.assertTrue(config.getSinkRunners().size() == 1);
+
+        properties.put(agentName + ".sources.source1.batchSize", "1000");
+        properties.put(agentName + ".sinks.sink1.batch-size", "1001");
+
+        provider = new MemoryConfigurationProvider(agentName, properties);
+        config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 1);
+        Assert.assertTrue(config.getChannels().size() == 1);
+        Assert.assertTrue(config.getSinkRunners().size() == 0);
+
+        properties.put(agentName + ".sources.source1.batchSize", "1001");
+        properties.put(agentName + ".sinks.sink1.batch-size", "1001");
+
+        provider = new MemoryConfigurationProvider(agentName, properties);
+        config = provider.getConfiguration();
+        Assert.assertTrue(config.getSourceRunners().size() == 0);
+        Assert.assertTrue(config.getChannels().size() == 0);
+        Assert.assertTrue(config.getSinkRunners().size() == 0);
+    }
+
+    private Map<String, String> getProperties(String agentName,
+                                              String sourceType, String channelType,
+                                              String sinkType) {
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(agentName + ".sources", "source1");
+        properties.put(agentName + ".channels", "channel1");
+        properties.put(agentName + ".sinks", "sink1");
+        properties.put(agentName + ".sources.source1.type", sourceType);
+        properties.put(agentName + ".sources.source1.channels", "channel1");
+        properties.put(agentName + ".channels.channel1.type", channelType);
+        properties.put(agentName + ".channels.channel1.capacity", "100");
+        properties.put(agentName + ".sinks.sink1.type", sinkType);
+        properties.put(agentName + ".sinks.sink1.channel", "channel1");
+        return properties;
+    }
+
+    private Map<String, String> getPropertiesForChannel(String agentName, String channelType) {
+        return getProperties(agentName, "seq", channelType, "null");
+    }
+
+    public static class MemoryConfigurationProvider extends AbstractConfigurationProvider {
+        private Map<String, String> properties;
+
+        public MemoryConfigurationProvider(String agentName, Map<String, String> properties) {
+            super(agentName);
+            this.properties = properties;
+        }
+
+        public void setProperties(Map<String, String> properties) {
+            this.properties = properties;
+        }
+
+        @Override
+        protected FlumeConfiguration getFlumeConfiguration() {
+            return new FlumeConfiguration(properties);
+        }
+    }
+
+    @Disposable
+    public static class DisposableChannel extends AbstractChannel {
+        @Override
+        public void put(Event event) throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Event take() throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Transaction getTransaction() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    @Recyclable
+    public static class RecyclableChannel extends AbstractChannel {
+        @Override
+        public void put(Event event) throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Event take() throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Transaction getTransaction() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public static class UnspecifiedChannel extends AbstractChannel {
+        @Override
+        public void put(Event event) throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Event take() throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Transaction getTransaction() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public static class UnconfigurableChannel extends AbstractChannel {
+        @Override
+        public void configure(Context context) {
+            throw new RuntimeException("expected");
+        }
+
+        @Override
+        public void put(Event event) throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Event take() throws ChannelException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Transaction getTransaction() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public static class UnconfigurableSource extends AbstractSource implements Configurable {
+        @Override
+        public void configure(Context context) {
+            throw new RuntimeException("expected");
+        }
+    }
+
+    public static class UnconfigurableSink extends AbstractSink implements Configurable {
+        @Override
+        public void configure(Context context) {
+            throw new RuntimeException("expected");
+        }
+
+        @Override
+        public Status process() throws EventDeliveryException {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..107e96a
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import junit.framework.Assert;
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public abstract class TestAbstractZooKeeperConfigurationProvider {
+
+    private static final String FLUME_CONF_FILE = "flume-conf.properties";
+
+    protected static final String AGENT_NAME = "a1";
+
+    protected static final String AGENT_PATH =
+            AbstractZooKeeperConfigurationProvider.DEFAULT_ZK_BASE_PATH + "/" + AGENT_NAME;
+
+    protected TestingServer zkServer;
+    protected CuratorFramework client;
+
+    @Before
+    public void setUp() throws Exception {
+        zkServer = new TestingServer();
+        client = CuratorFrameworkFactory
+                .newClient("localhost:" + zkServer.getPort(),
+                        new ExponentialBackoffRetry(1000, 3));
+        client.start();
+
+        EnsurePath ensurePath = new EnsurePath(AGENT_PATH);
+        ensurePath.ensure(client.getZookeeperClient());
+        doSetUp();
+    }
+
+    protected abstract void doSetUp() throws Exception;
+
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+        zkServer.close();
+        client.close();
+    }
+
+    protected abstract void doTearDown() throws Exception;
+
+    protected void addData() throws Exception {
+        Reader in = new InputStreamReader(getClass().getClassLoader()
+                .getResourceAsStream(FLUME_CONF_FILE), Charsets.UTF_8);
+        try {
+            String config = IOUtils.toString(in);
+            client.setData().forPath(AGENT_PATH, config.getBytes());
+        } finally {
+            in.close();
+        }
+    }
+
+    protected void verifyProperties(AbstractConfigurationProvider cp) {
+        FlumeConfiguration configuration = cp.getFlumeConfiguration();
+        Assert.assertNotNull(configuration);
+
+    /*
+     * Test the known errors in the file
+     */
+        List<String> expected = Lists.newArrayList();
+        expected.add("host5 CONFIG_ERROR");
+        expected.add("host5 INVALID_PROPERTY");
+        expected.add("host4 CONFIG_ERROR");
+        expected.add("host4 CONFIG_ERROR");
+        expected.add("host4 PROPERTY_VALUE_NULL");
+        expected.add("host4 PROPERTY_VALUE_NULL");
+        expected.add("host4 PROPERTY_VALUE_NULL");
+        expected.add("host4 AGENT_CONFIGURATION_INVALID");
+        expected.add("ch2 ATTRS_MISSING");
+        expected.add("host3 CONFIG_ERROR");
+        expected.add("host3 PROPERTY_VALUE_NULL");
+        expected.add("host3 AGENT_CONFIGURATION_INVALID");
+        expected.add("host2 PROPERTY_VALUE_NULL");
+        expected.add("host2 AGENT_CONFIGURATION_INVALID");
+        List<String> actual = Lists.newArrayList();
+        for (FlumeConfigurationError error : configuration.getConfigurationErrors()) {
+            actual.add(error.getComponentName() + " " + error.getErrorType().toString());
+        }
+        Collections.sort(expected);
+        Collections.sort(actual);
+        Assert.assertEquals(expected, actual);
+
+        FlumeConfiguration.AgentConfiguration agentConfiguration = configuration
+                .getConfigurationFor("host1");
+        Assert.assertNotNull(agentConfiguration);
+
+        Set<String> sources = Sets.newHashSet("source1");
+        Set<String> sinks = Sets.newHashSet("sink1");
+        Set<String> channels = Sets.newHashSet("channel1");
+
+        Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+        Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+        Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+    }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java
new file mode 100644
index 0000000..187ef67
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Files;
+
+public class TestApplication {
+
+    private File baseDir;
+
+    @Before
+    public void setup() throws Exception {
+        baseDir = Files.createTempDir();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        FileUtils.deleteDirectory(baseDir);
+    }
+
+    private <T extends LifecycleAware> T mockLifeCycle(Class<T> klass) {
+
+        T lifeCycleAware = mock(klass);
+
+        final AtomicReference<LifecycleState> state =
+                new AtomicReference<LifecycleState>();
+
+        state.set(LifecycleState.IDLE);
+
+        when(lifeCycleAware.getLifecycleState()).then(new Answer<LifecycleState>() {
+            @Override
+            public LifecycleState answer(InvocationOnMock invocation)
+                    throws Throwable {
+                return state.get();
+            }
+        });
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                state.set(LifecycleState.START);
+                return null;
+            }
+        }).when(lifeCycleAware).start();
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                state.set(LifecycleState.STOP);
+                return null;
+            }
+        }).when(lifeCycleAware).stop();
+
+        return lifeCycleAware;
+    }
+
+    @Test
+    public void testBasicConfiguration() throws Exception {
+
+        EventBus eventBus = new EventBus("test-event-bus");
+
+        MaterializedConfiguration materializedConfiguration = new
+                SimpleMaterializedConfiguration();
+
+        SourceRunner sourceRunner = mockLifeCycle(SourceRunner.class);
+        materializedConfiguration.addSourceRunner("test", sourceRunner);
+
+        SinkRunner sinkRunner = mockLifeCycle(SinkRunner.class);
+        materializedConfiguration.addSinkRunner("test", sinkRunner);
+
+        Channel channel = mockLifeCycle(Channel.class);
+        materializedConfiguration.addChannel("test", channel);
+
+
+        ConfigurationProvider configurationProvider = mock(ConfigurationProvider.class);
+        when(configurationProvider.getConfiguration()).thenReturn(materializedConfiguration);
+
+        Application application = new Application();
+        eventBus.register(application);
+        eventBus.post(materializedConfiguration);
+        application.start();
+
+        Thread.sleep(1000L);
+
+        verify(sourceRunner).start();
+        verify(sinkRunner).start();
+        verify(channel).start();
+
+        application.stop();
+
+        Thread.sleep(1000L);
+
+        verify(sourceRunner).stop();
+        verify(sinkRunner).stop();
+        verify(channel).stop();
+    }
+
+    @Test
+    public void testFLUME1854() throws Exception {
+        File configFile = new File(baseDir, "flume-conf.properties");
+        Files.copy(new File(getClass().getClassLoader()
+                .getResource("flume-conf.properties").getFile()), configFile);
+        Random random = new Random();
+        for (int i = 0; i < 3; i++) {
+            EventBus eventBus = new EventBus("test-event-bus");
+            PollingPropertiesFileConfigurationProvider configurationProvider =
+                    new PollingPropertiesFileConfigurationProvider("host1",
+                            configFile, eventBus, 1);
+            List<LifecycleAware> components = Lists.newArrayList();
+            components.add(configurationProvider);
+            Application application = new Application(components);
+            eventBus.register(application);
+            application.start();
+            Thread.sleep(random.nextInt(10000));
+            application.stop();
+        }
+    }
+
+    @Test(timeout = 10000L)
+    public void testFLUME2786() throws Exception {
+        final String agentName = "test";
+        final int interval = 1;
+        final long intervalMs = 1000L;
+
+        File configFile = new File(baseDir, "flume-conf.properties");
+        Files.copy(new File(getClass().getClassLoader()
+                .getResource("flume-conf.properties.2786").getFile()), configFile);
+        File mockConfigFile = spy(configFile);
+        when(mockConfigFile.lastModified()).then(new Answer<Long>() {
+            @Override
+            public Long answer(InvocationOnMock invocation) throws Throwable {
+                Thread.sleep(intervalMs);
+                return System.currentTimeMillis();
+            }
+        });
+
+        EventBus eventBus = new EventBus(agentName + "-event-bus");
+        PollingPropertiesFileConfigurationProvider configurationProvider =
+                new PollingPropertiesFileConfigurationProvider(agentName,
+                        mockConfigFile, eventBus, interval);
+        PollingPropertiesFileConfigurationProvider mockConfigurationProvider =
+                spy(configurationProvider);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Thread.sleep(intervalMs);
+                invocation.callRealMethod();
+                return null;
+            }
+        }).when(mockConfigurationProvider).stop();
+
+        List<LifecycleAware> components = Lists.newArrayList();
+        components.add(mockConfigurationProvider);
+        Application application = new Application(components);
+        eventBus.register(application);
+        application.start();
+        Thread.sleep(1500L);
+        application.stop();
+    }
+
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestEnvVarResolverProperties.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestEnvVarResolverProperties.java
new file mode 100644
index 0000000..dc62c2b
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestEnvVarResolverProperties.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import java.io.File;
+
+public class TestEnvVarResolverProperties {
+    private static final File TESTFILE = new File(
+            TestEnvVarResolverProperties.class.getClassLoader()
+                    .getResource("flume-conf-with-envvars.properties").getFile());
+
+    @Rule
+    public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+    private PropertiesFileConfigurationProvider provider;
+
+    @Before
+    public void setUp() throws Exception {
+        provider = new PropertiesFileConfigurationProvider("a1", TESTFILE);
+    }
+
+    @Test
+    public void resolveEnvVar() throws Exception {
+        environmentVariables.set("VARNAME", "varvalue");
+        String resolved = EnvVarResolverProperties.resolveEnvVars("padding ${VARNAME} padding");
+        Assert.assertEquals("padding varvalue padding", resolved);
+    }
+
+    @Test
+    public void resolveEnvVars() throws Exception {
+        environmentVariables.set("VARNAME1", "varvalue1");
+        environmentVariables.set("VARNAME2", "varvalue2");
+        String resolved = EnvVarResolverProperties
+                .resolveEnvVars("padding ${VARNAME1} ${VARNAME2} padding");
+        Assert.assertEquals("padding varvalue1 varvalue2 padding", resolved);
+    }
+
+    @Test
+    public void getProperty() throws Exception {
+        String NC_PORT = "6667";
+        environmentVariables.set("NC_PORT", NC_PORT);
+        System.setProperty("propertiesImplementation",
+                "org.apache.pulsar.io.flume.node.EnvVarResolverProperties");
+
+        Assert.assertEquals(NC_PORT, provider.getFlumeConfiguration()
+                .getConfigurationFor("a1")
+                .getSourceContext().get("r1").getParameters().get("port"));
+    }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingPropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..4f559b5
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Files;
+
+public class TestPollingPropertiesFileConfigurationProvider {
+
+    private static final File TESTFILE = new File(
+            TestPollingPropertiesFileConfigurationProvider.class.getClassLoader()
+                    .getResource("flume-conf.properties").getFile());
+
+    private PollingPropertiesFileConfigurationProvider provider;
+    private File baseDir;
+    private File configFile;
+    private EventBus eventBus;
+
+    @Before
+    public void setUp() throws Exception {
+
+        baseDir = Files.createTempDir();
+
+        configFile = new File(baseDir, TESTFILE.getName());
+        Files.copy(TESTFILE, configFile);
+
+        eventBus = new EventBus("test");
+        provider =
+                new PollingPropertiesFileConfigurationProvider("host1",
+                        configFile, eventBus, 1);
+        provider.start();
+        LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        FileUtils.deleteDirectory(baseDir);
+        provider.stop();
+    }
+
+    @Test
+    public void testPolling() throws Exception {
+
+        // let first event fire
+        Thread.sleep(2000L);
+
+        final List<MaterializedConfiguration> events = Lists.newArrayList();
+
+        Object eventHandler = new Object() {
+            @Subscribe
+            public synchronized void handleConfigurationEvent(MaterializedConfiguration event) {
+                events.add(event);
+            }
+        };
+        eventBus.register(eventHandler);
+        configFile.setLastModified(System.currentTimeMillis());
+
+        // now wait for second event to fire
+        Thread.sleep(2000L);
+
+        Assert.assertEquals(String.valueOf(events), 1, events.size());
+
+        MaterializedConfiguration materializedConfiguration = events.remove(0);
+
+        Assert.assertEquals(1, materializedConfiguration.getSourceRunners().size());
+        Assert.assertEquals(1, materializedConfiguration.getSinkRunners().size());
+        Assert.assertEquals(1, materializedConfiguration.getChannels().size());
+
+
+    }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..7191d33
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import junit.framework.Assert;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+public class TestPollingZooKeeperConfigurationProvider extends
+        TestAbstractZooKeeperConfigurationProvider {
+
+    private EventBus eb;
+
+    private EventSync es;
+
+    private PollingZooKeeperConfigurationProvider cp;
+
+    private class EventSync {
+
+        private boolean notified;
+
+        @Subscribe
+        public synchronized void notifyEvent(MaterializedConfiguration mConfig) {
+            notified = true;
+            notifyAll();
+        }
+
+        public synchronized void awaitEvent() throws InterruptedException {
+            while (!notified) {
+                wait();
+            }
+        }
+
+        public synchronized void reset() {
+            notified = false;
+        }
+    }
+
+    @Override
+    protected void doSetUp() throws Exception {
+        eb = new EventBus("test");
+        es = new EventSync();
+        es.reset();
+        eb.register(es);
+        cp = new PollingZooKeeperConfigurationProvider(AGENT_NAME, "localhost:"
+                + zkServer.getPort(), null, eb);
+        cp.start();
+        LifecycleController.waitForOneOf(cp, LifecycleState.START_OR_ERROR);
+    }
+
+    @Override
+    protected void doTearDown() throws Exception {
+        // do nothing
+    }
+
+    @Test
+    public void testPolling() throws Exception {
+        es.awaitEvent();
+        es.reset();
+
+        FlumeConfiguration fc = cp.getFlumeConfiguration();
+        Assert.assertTrue(fc.getConfigurationErrors().isEmpty());
+        AgentConfiguration ac = fc.getConfigurationFor(AGENT_NAME);
+        Assert.assertNull(ac);
+
+        addData();
+        es.awaitEvent();
+        es.reset();
+
+        verifyProperties(cp);
+    }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..7eab211
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPropertiesFileConfigurationProvider.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import junit.framework.Assert;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TestPropertiesFileConfigurationProvider {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(TestPropertiesFileConfigurationProvider.class);
+
+    private static final File TESTFILE = new File(
+            TestPropertiesFileConfigurationProvider.class.getClassLoader()
+                    .getResource("flume-conf.properties").getFile());
+
+    private PropertiesFileConfigurationProvider provider;
+
+    @Before
+    public void setUp() throws Exception {
+        provider = new PropertiesFileConfigurationProvider("test", TESTFILE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+
+    }
+
+    @Test
+    public void testPropertyRead() throws Exception {
+
+        FlumeConfiguration configuration = provider.getFlumeConfiguration();
+        Assert.assertNotNull(configuration);
+
+    /*
+     * Test the known errors in the file
+     */
+        List<String> expected = Lists.newArrayList();
+        expected.add("host5 CONFIG_ERROR");
+        expected.add("host5 INVALID_PROPERTY");
+        expected.add("host4 CONFIG_ERROR");
+        expected.add("host4 CONFIG_ERROR");
+        expected.add("host4 PROPERTY_VALUE_NULL");
+        expected.add("host4 PROPERTY_VALUE_NULL");
+        expected.add("host4 PROPERTY_VALUE_NULL");
+        expected.add("host4 AGENT_CONFIGURATION_INVALID");
+        expected.add("ch2 ATTRS_MISSING");
+        expected.add("host3 CONFIG_ERROR");
+        expected.add("host3 PROPERTY_VALUE_NULL");
+        expected.add("host3 AGENT_CONFIGURATION_INVALID");
+        expected.add("host2 PROPERTY_VALUE_NULL");
+        expected.add("host2 AGENT_CONFIGURATION_INVALID");
+        List<String> actual = Lists.newArrayList();
+        for (FlumeConfigurationError error : configuration.getConfigurationErrors()) {
+            actual.add(error.getComponentName() + " " + error.getErrorType().toString());
+        }
+        Collections.sort(expected);
+        Collections.sort(actual);
+        Assert.assertEquals(expected, actual);
+
+        AgentConfiguration agentConfiguration =
+                configuration.getConfigurationFor("host1");
+        Assert.assertNotNull(agentConfiguration);
+
+        LOGGER.info(agentConfiguration.getPrevalidationConfig());
+        LOGGER.info(agentConfiguration.getPostvalidationConfig());
+
+        Set<String> sources = Sets.newHashSet("source1");
+        Set<String> sinks = Sets.newHashSet("sink1");
+        Set<String> channels = Sets.newHashSet("channel1");
+
+        Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+        Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+        Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+    }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestStaticZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestStaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..79bd38d
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestStaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import org.junit.Test;
+
+public class TestStaticZooKeeperConfigurationProvider extends
+        TestAbstractZooKeeperConfigurationProvider {
+
+    private StaticZooKeeperConfigurationProvider configurationProvider;
+
+    @Override
+    protected void doSetUp() throws Exception {
+        addData();
+        configurationProvider = new StaticZooKeeperConfigurationProvider(
+                AGENT_NAME, "localhost:" + zkServer.getPort(), null);
+    }
+
+    @Override
+    protected void doTearDown() throws Exception {
+        // do nothing
+    }
+
+    @Test
+    public void testPropertyRead() throws Exception {
+        verifyProperties(configurationProvider);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
new file mode 100644
index 0000000..714deac
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.*;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.AvroSource;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.flume.AbstractFlumeTests;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.junit.Assert;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.Mockito.*;
+
+public class StringSinkTests extends AbstractFlumeTests {
+
+    @Mock
+    protected SinkContext mockSinkContext;
+
+    @Mock
+    protected Record<String> mockRecord;
+
+
+    private AvroSource source;
+    private Channel channel;
+    private InetAddress localhost;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        mockRecord = mock(Record.class);
+        mockSinkContext = mock(SinkContext.class);
+        localhost = InetAddress.getByName("127.0.0.1");
+        source = new AvroSource();
+        channel = new MemoryChannel();
+        Context context = new Context();
+        context.put("port", String.valueOf(44444));
+        context.put("bind", "0.0.0.0");
+
+        Configurables.configure(source, context);
+        Configurables.configure(channel, context);
+
+        List<Channel> channels = new ArrayList<Channel>();
+        channels.add(channel);
+
+        ChannelSelector rcs = new ReplicatingChannelSelector();
+        rcs.setChannels(channels);
+
+        source.setChannelProcessor(new ChannelProcessor(rcs));
+
+        source.start();
+
+        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+            long sequenceCounter = 0;
+
+            public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
+                return Optional.of("key-" + sequenceCounter++);
+            }
+        });
+
+        when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
+            long sequenceCounter = 0;
+
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                return new String("value-" + sequenceCounter++);
+            }
+        });
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        source.stop();
+    }
+
+    protected final void send(StringSink stringSink, int numRecords) throws Exception {
+        for (int idx = 0; idx < numRecords; idx++) {
+            stringSink.write(mockRecord);
+        }
+    }
+
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> conf = Maps.newHashMap();
+        StringSink stringSink = new StringSink();
+        conf.put("name", "a1");
+        conf.put("confFile", "./src/test/resources/flume/source.conf");
+        conf.put("noReloadConf", false);
+        conf.put("zkConnString", "");
+        conf.put("zkBasePath", "");
+        stringSink.open(conf, mockSinkContext);
+        send(stringSink, 100);
+
+        Thread.sleep(3 * 1000);
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+        Event event = channel.take();
+
+        Assert.assertNotNull(event);
+        Assert.assertNotNull(mockRecord);
+
+        verify(mockRecord, times(100)).ack();
+        transaction.commit();
+        transaction.close();
+    }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
new file mode 100644
index 0000000..8a3ebda
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import org.apache.flume.*;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mock;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.apache.flume.sink.AvroSink;
+import org.apache.flume.channel.MemoryChannel;
+import org.junit.Assert;
+
+import java.util.Map;
+
+import org.apache.pulsar.io.flume.AbstractFlumeTests;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+public class StringSourceTests extends AbstractFlumeTests {
+
+    private AvroSink sink;
+
+    private Channel channel;
+
+    @Mock
+    private SourceContext mockSourceContext;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        if (sink != null) {
+            throw new RuntimeException("double setup");
+        }
+        Context context = new Context();
+        context.put("hostname", "127.0.0.1");
+        context.put("port", "44444");
+        context.put("batch-size", String.valueOf(2));
+        context.put("connect-timeout", String.valueOf(2000L));
+        context.put("request-timeout", String.valueOf(3000L));
+        sink = new AvroSink();
+        channel = new MemoryChannel();
+        sink.setChannel(channel);
+        Configurables.configure(sink, context);
+        Configurables.configure(channel, context);
+
+        mockSourceContext = mock(SourceContext.class);
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        sink.stop();
+    }
+
+
+    @Test
+    public void TestOpenAndReadSource() throws Exception {
+        Map<String, Object> conf = Maps.newHashMap();
+        StringSource stringSource = new StringSource();
+        conf.put("name", "a1");
+        conf.put("confFile", "./src/test/resources/flume/sink.conf");
+        conf.put("noReloadConf", false);
+        conf.put("zkConnString", "");
+        conf.put("zkBasePath", "");
+        Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+        stringSource.open(conf, mockSourceContext);
+        Thread.sleep(3 * 1000);
+        sink.start();
+        Transaction transaction = channel.getTransaction();
+
+        transaction.begin();
+        for (int i = 0; i < 10; i++) {
+            channel.put(event);
+        }
+        transaction.commit();
+        transaction.close();
+
+        for (int i = 0; i < 5; i++) {
+            Sink.Status status = sink.process();
+            Assert.assertEquals(Sink.Status.READY, status);
+        }
+
+        Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
+        stringSource.close();
+    }
+}
diff --git a/pulsar-io/flume/src/test/resources/flume-conf-with-envvars.properties b/pulsar-io/flume/src/test/resources/flume-conf-with-envvars.properties
new file mode 100644
index 0000000..bc73d41
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume-conf-with-envvars.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = 0.0.0.0
+a1.sources.r1.port = ${NC_PORT}
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/pulsar-io/flume/src/test/resources/flume-conf.properties b/pulsar-io/flume/src/test/resources/flume-conf.properties
new file mode 100644
index 0000000..744314d
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume-conf.properties
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Flume Configuration
+# This file contains configuration for one Agent identified as host1.
+# This file also contains invalid configuration for few agents
+# host2, host3 etc.
+#
+
+host1.sources = source1
+host1.channels = channel1
+host1.sinks = sink1
+
+# avroSource configuration
+host1.sources.source1.type = seq
+host1.sources.source1.channels = channel1
+
+# memChannel1 configuration
+host1.channels.channel1.type = memory
+host1.channels.channel1.capacity = 10000
+
+
+# hdfsSink configuration
+host1.sinks.sink1.type = null
+host1.sinks.sink1.channel = channel1
+
+#
+# Agent configuration for host2 - invalid because channels is not
+# defined.
+#
+host2.sources = src1
+host2.sinks = sink1
+
+host2.sources.src1.type = foo
+host2.sources.src1.runner = xxx
+host2.sources.src1.runner.type = ttt
+host2.sinks.sink1.type = bar
+host2.sinks.sink1.runner = yyy
+host2.sinks.sink1.runner.type = yyy
+
+#
+# Agent configuration for host3 - invalid because the effective set of
+# channels is 0 since configured ones are not active, and active ones are
+# not configured.
+#
+host3.sources = src1 src2
+host3.channels = ch1 ch2
+
+host3.sources.src1.type = foo
+host3.sources.src1.runner.type = x
+host3.sources.src1.channels = ch1 ch3
+
+host3.channels.ch2.foo = bar
+host3.channels.ch3.type = foo
+host3.channels.ch3.xxx = yyy
+
+#
+# Agent configuration for host4 - invalid, same as host3 except that this
+# time one channel configuration is valid but no sources or sinks are
+# configured correctly.
+#
+host4.sources = src2
+host4.channels = ch1 ch2
+
+host4.sources.src1.type = foo
+host4.sources.src1.runner.type = x
+host4.sources.src1.channels = ch1 ch2
+
+host4.channels.ch2.foo = bar
+host4.channels.ch2.type = abc
+host4.channels.ch3.type = foo
+host4.channels.ch3.xxx = yyy
+
+#
+# Agent configuration for host5 - valid using a sinkgroup with a failover processor
+# One of the sinks isn't properly configured but the group should let it fail and drop down
+# to two sinks
+#
+
+host5.sources = src1
+host5.channels = ch1
+host5.sinks = sink1 sink2 sink3
+host5.sinkgroups = sg1
+
+host5.channels.ch1.type = abc
+
+host5.sources.src1.type = def
+host5.sources.src1.channels = ch1
+
+host5.sinks.sink1.type = foo
+host5.sinks.sink1.channel = ch1
+host5.sinks.sink2.type = bar
+host5.sinks.sink2.channel = ch1
+
+host5.sinkgroups.sg1.sinks = sink1 sink2 sink3
+host5.sinkgroups.sg1.policy.type = failover
+host5.sinkgroups.sg1.policy.priority.sink1 = 1
+host5.sinkgroups.sg1.policy.priority.sink2 = 2
diff --git a/pulsar-io/flume/src/test/resources/flume-conf.properties.2786 b/pulsar-io/flume/src/test/resources/flume-conf.properties.2786
new file mode 100755
index 0000000..2a7bea0
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume-conf.properties.2786
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Flume Configuration for testing FLUME-2786
+#
+
+test.sources = source1
+test.channels = channel1
+test.sinks = sink1
+
+test.sources.source1.type = seq
+test.sources.source1.totalEvents = 10000
+test.sources.source1.channels = channel1
+
+test.channels.channel1.type = memory
+test.channels.channel1.capacity = 10000
+
+test.sinks.sink1.type = null
+test.sinks.sink1.channel = channel1
diff --git a/pulsar-io/flume/src/test/resources/flume/sink.conf b/pulsar-io/flume/src/test/resources/flume/sink.conf
new file mode 100644
index 0000000..e45e6f4
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume/sink.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = avro
+a1.sources.r1.bind = 127.0.0.1
+a1.sources.r1.port = 44444
+
+# Describe the sink
+a1.sinks.k1.type = org.apache.pulsar.io.flume.source.SinkOfFlume
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/resources/flume/source.conf b/pulsar-io/flume/src/test/resources/flume/source.conf
new file mode 100644
index 0000000..93c713b
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume/source.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = org.apache.pulsar.io.flume.sink.SourceOfFlume
+
+# Describe the sink
+a1.sinks.k1.type = avro
+a1.sinks.k1.hostname = 127.0.0.1
+a1.sinks.k1.port = 44444
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/resources/log4j.properties b/pulsar-io/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a98acea
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootCategory = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = DEBUG
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 36a770b..ffd1e67 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -52,6 +52,7 @@
     <module>netty</module>
     <module>hbase</module>
     <module>mongo</module>
+    <module>flume</module>
   </modules>
 
 </project>