[Issue 3275][pulsar-io]Support source and sink of flume (#3597)
* Support TLS authentication and authorization in standalone mode
* Compile success
To do: test channel, sink and source of flume
* Add conf file
* Add sink and source folder
Move file to folder
Add source
Compile success
To do -> test source
* test flume source paas
* Add config file and test case
* Add test and update pom.xml
To do add test of source and sink
* Add unit tests
* Add test case and test pass
To do test source
* Add license
Add test source of pulsar
* Handle if blockingQueue is null
* Move LOG to log
* Format code
* Add sinkClass in pulsar-io.yaml
* Add comment
* Default is false
* Modify pom file of flume
* Format pom.xml file
* Move pom version to 2.4.0-SNAPSHOT
diff --git a/pulsar-io/flume/pom.xml b/pulsar-io/flume/pom.xml
new file mode 100644
index 0000000..21dbc85
--- /dev/null
+++ b/pulsar-io/flume/pom.xml
@@ -0,0 +1,116 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-flume</artifactId>
+ <name>Pulsar IO :: Flume</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-node</artifactId>
+ <version>1.9.0</version>
+ <type>pom</type>
+ <exclusions>
+ <exclusion>
+ <artifactId>avro-ipc</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>4.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ <version>1.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.stefanbirkner</groupId>
+ <artifactId>system-rules</artifactId>
+ <scope>test</scope>
+ <version>1.17.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConfig.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConfig.java
new file mode 100644
index 0000000..54830cb
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConfig.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Flume general config.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class FlumeConfig {
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "the name of this agent")
+ private String name;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "specify a config file (required if -z missing)")
+ private String confFile;
+ @FieldDoc(
+ defaultValue = "false",
+ help = "do not reload config file if changed")
+ private Boolean noReloadConf;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "specify the ZooKeeper connection to use (required if -f missing)")
+ private String zkConnString;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "specify the base path in ZooKeeper for agent configs")
+ private String zkBasePath;
+
+ public static FlumeConfig load(String yamlFile) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return mapper.readValue(new File(yamlFile), FlumeConfig.class);
+ }
+
+
+ public static FlumeConfig load(Map<String, Object> map) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(new ObjectMapper().writeValueAsString(map), FlumeConfig.class);
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConnector.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConnector.java
new file mode 100644
index 0000000..29a9847
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConnector.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import org.apache.commons.cli.ParseException;
+import org.apache.flume.Constants;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.util.SSLUtil;
+import org.apache.pulsar.io.flume.node.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class FlumeConnector {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(FlumeConnector.class);
+
+ protected Application application;
+
+ public void StartConnector(FlumeConfig flumeConfig) throws Exception {
+ SSLUtil.initGlobalSSLParameters();
+ String agentName = flumeConfig.getName();
+ boolean reload = !flumeConfig.getNoReloadConf();
+ boolean isZkConfigured = false;
+ if (flumeConfig.getZkConnString().length() > 0) {
+ isZkConfigured = true;
+ }
+ if (isZkConfigured) {
+ // get options
+ String zkConnectionStr = flumeConfig.getZkConnString();
+ String baseZkPath = flumeConfig.getZkBasePath();
+ if (reload) {
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ List<LifecycleAware> components = Lists.newArrayList();
+ PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+ new PollingZooKeeperConfigurationProvider(
+ agentName, zkConnectionStr, baseZkPath, eventBus);
+ components.add(zookeeperConfigurationProvider);
+ application = new Application(components);
+ eventBus.register(application);
+ } else {
+ StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+ new StaticZooKeeperConfigurationProvider(
+ agentName, zkConnectionStr, baseZkPath);
+ application = new Application();
+ application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
+ }
+
+ } else {
+ File configurationFile = new File(flumeConfig.getConfFile());
+ /*
+ * The following is to ensure that by default the agent will fail on
+ * startup if the file does not exist.
+ */
+ if (!configurationFile.exists()) {
+ // If command line invocation, then need to fail fast
+ if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
+ null) {
+ String path = configurationFile.getPath();
+ try {
+ path = configurationFile.getCanonicalPath();
+ } catch (IOException ex) {
+ log.error("Failed to read canonical path for file: " + path,
+ ex);
+ }
+ throw new ParseException("The specified configuration file does not exist: " + path);
+ }
+ }
+ List<LifecycleAware> components = Lists.newArrayList();
+
+ if (reload) {
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ PollingPropertiesFileConfigurationProvider configurationProvider =
+ new PollingPropertiesFileConfigurationProvider(
+ agentName, configurationFile, eventBus, 30);
+ components.add(configurationProvider);
+ application = new Application(components);
+ eventBus.register(application);
+ } else {
+ PropertiesFileConfigurationProvider configurationProvider =
+ new PropertiesFileConfigurationProvider(agentName, configurationFile);
+ application = new Application();
+ application.handleConfigurationEvent(configurationProvider.getConfiguration());
+ }
+ }
+ application.start();
+
+ final Application appReference = application;
+ Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
+ @Override
+ public void run() {
+ appReference.stop();
+ }
+ });
+ }
+
+ public void stop() {
+ if (application != null) {
+ application.stop();
+ }
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java
new file mode 100644
index 0000000..36bbe55
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelFactory;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.ChannelSelectorFactory;
+import org.apache.flume.channel.DefaultChannelFactory;
+import org.apache.flume.conf.BasicConfigurationConstants;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.TransactionCapacitySupported;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.conf.sink.SinkGroupConfiguration;
+import org.apache.flume.conf.source.SourceConfiguration;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.SinkGroup;
+import org.apache.flume.source.DefaultSourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfigurationProvider.class);
+
+ private final String agentName;
+ private final SourceFactory sourceFactory;
+ private final SinkFactory sinkFactory;
+ private final ChannelFactory channelFactory;
+
+ private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;
+
+ public AbstractConfigurationProvider(String agentName) {
+ super();
+ this.agentName = agentName;
+ this.sourceFactory = new DefaultSourceFactory();
+ this.sinkFactory = new DefaultSinkFactory();
+ this.channelFactory = new DefaultChannelFactory();
+
+ channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
+ }
+
+ protected abstract FlumeConfiguration getFlumeConfiguration();
+
+ public MaterializedConfiguration getConfiguration() {
+ MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
+ FlumeConfiguration fconfig = getFlumeConfiguration();
+ AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
+ if (agentConf != null) {
+ Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
+ Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
+ Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
+ try {
+ loadChannels(agentConf, channelComponentMap);
+ loadSources(agentConf, channelComponentMap, sourceRunnerMap);
+ loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
+ Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
+ for (String channelName : channelNames) {
+ ChannelComponent channelComponent = channelComponentMap.get(channelName);
+ if (channelComponent.components.isEmpty()) {
+ LOGGER.warn(String.format("Channel %s has no components connected" +
+ " and has been removed.", channelName));
+ channelComponentMap.remove(channelName);
+ Map<String, Channel> nameChannelMap =
+ channelCache.get(channelComponent.channel.getClass());
+ if (nameChannelMap != null) {
+ nameChannelMap.remove(channelName);
+ }
+ } else {
+ LOGGER.info(String.format("Channel %s connected to %s",
+ channelName, channelComponent.components.toString()));
+ conf.addChannel(channelName, channelComponent.channel);
+ }
+ }
+ for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
+ conf.addSourceRunner(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
+ conf.addSinkRunner(entry.getKey(), entry.getValue());
+ }
+ } catch (InstantiationException ex) {
+ LOGGER.error("Failed to instantiate component", ex);
+ } finally {
+ channelComponentMap.clear();
+ sourceRunnerMap.clear();
+ sinkRunnerMap.clear();
+ }
+ } else {
+ LOGGER.warn("No configuration found for this host:{}", getAgentName());
+ }
+ return conf;
+ }
+
+ public String getAgentName() {
+ return agentName;
+ }
+
+ private void loadChannels(AgentConfiguration agentConf,
+ Map<String, ChannelComponent> channelComponentMap)
+ throws InstantiationException {
+ LOGGER.info("Creating channels");
+
+ /*
+ * Some channels will be reused across re-configurations. To handle this,
+ * we store all the names of current channels, perform the reconfiguration,
+ * and then if a channel was not used, we delete our reference to it.
+ * This supports the scenario where you enable channel "ch0" then remove it
+ * and add it back. Without this, channels like memory channel would cause
+ * the first instances data to show up in the seconds.
+ */
+ ListMultimap<Class<? extends Channel>, String> channelsNotReused =
+ ArrayListMultimap.create();
+ // assume all channels will not be re-used
+ for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
+ channelCache.entrySet()) {
+ Class<? extends Channel> channelKlass = entry.getKey();
+ Set<String> channelNames = entry.getValue().keySet();
+ channelsNotReused.get(channelKlass).addAll(channelNames);
+ }
+
+ Set<String> channelNames = agentConf.getChannelSet();
+ Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
+ /*
+ * Components which have a ComponentConfiguration object
+ */
+ for (String chName : channelNames) {
+ ComponentConfiguration comp = compMap.get(chName);
+ if (comp != null) {
+ Channel channel = getOrCreateChannel(channelsNotReused,
+ comp.getComponentName(), comp.getType());
+ try {
+ Configurables.configure(channel, comp);
+ channelComponentMap.put(comp.getComponentName(),
+ new ChannelComponent(channel));
+ LOGGER.info("Created channel " + chName);
+ } catch (Exception e) {
+ String msg = String.format("Channel %s has been removed due to an " +
+ "error during configuration", chName);
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+ /*
+ * Components which DO NOT have a ComponentConfiguration object
+ * and use only Context
+ */
+ for (String chName : channelNames) {
+ Context context = agentConf.getChannelContext().get(chName);
+ if (context != null) {
+ Channel channel = getOrCreateChannel(channelsNotReused, chName,
+ context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+ try {
+ Configurables.configure(channel, context);
+ channelComponentMap.put(chName, new ChannelComponent(channel));
+ LOGGER.info("Created channel " + chName);
+ } catch (Exception e) {
+ String msg = String.format("Channel %s has been removed due to an " +
+ "error during configuration", chName);
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+ /*
+ * Any channel which was not re-used, will have it's reference removed
+ */
+ for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
+ Map<String, Channel> channelMap = channelCache.get(channelKlass);
+ if (channelMap != null) {
+ for (String channelName : channelsNotReused.get(channelKlass)) {
+ if (channelMap.remove(channelName) != null) {
+ LOGGER.info("Removed {} of type {}", channelName, channelKlass);
+ }
+ }
+ if (channelMap.isEmpty()) {
+ channelCache.remove(channelKlass);
+ }
+ }
+ }
+ }
+
+ private Channel getOrCreateChannel(
+ ListMultimap<Class<? extends Channel>, String> channelsNotReused,
+ String name, String type)
+ throws FlumeException {
+
+ Class<? extends Channel> channelClass = channelFactory.getClass(type);
+ /*
+ * Channel has requested a new instance on each re-configuration
+ */
+ if (channelClass.isAnnotationPresent(Disposable.class)) {
+ Channel channel = channelFactory.create(name, type);
+ channel.setName(name);
+ return channel;
+ }
+ Map<String, Channel> channelMap = channelCache.get(channelClass);
+ if (channelMap == null) {
+ channelMap = new HashMap<String, Channel>();
+ channelCache.put(channelClass, channelMap);
+ }
+ Channel channel = channelMap.get(name);
+ if (channel == null) {
+ channel = channelFactory.create(name, type);
+ channel.setName(name);
+ channelMap.put(name, channel);
+ }
+ channelsNotReused.get(channelClass).remove(name);
+ return channel;
+ }
+
+ private void loadSources(AgentConfiguration agentConf,
+ Map<String, ChannelComponent> channelComponentMap,
+ Map<String, SourceRunner> sourceRunnerMap)
+ throws InstantiationException {
+
+ Set<String> sourceNames = agentConf.getSourceSet();
+ Map<String, ComponentConfiguration> compMap =
+ agentConf.getSourceConfigMap();
+ /*
+ * Components which have a ComponentConfiguration object
+ */
+ for (String sourceName : sourceNames) {
+ ComponentConfiguration comp = compMap.get(sourceName);
+ if (comp != null) {
+ SourceConfiguration config = (SourceConfiguration) comp;
+
+ Source source = sourceFactory.create(comp.getComponentName(),
+ comp.getType());
+ try {
+ Configurables.configure(source, config);
+ Set<String> channelNames = config.getChannels();
+ List<Channel> sourceChannels =
+ getSourceChannels(channelComponentMap, source, channelNames);
+ if (sourceChannels.isEmpty()) {
+ String msg = String.format("Source %s is not connected to a " +
+ "channel", sourceName);
+ throw new IllegalStateException(msg);
+ }
+ ChannelSelectorConfiguration selectorConfig =
+ config.getSelectorConfiguration();
+
+ ChannelSelector selector = ChannelSelectorFactory.create(
+ sourceChannels, selectorConfig);
+
+ ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+ Configurables.configure(channelProcessor, config);
+
+ source.setChannelProcessor(channelProcessor);
+ sourceRunnerMap.put(comp.getComponentName(),
+ SourceRunner.forSource(source));
+ for (Channel channel : sourceChannels) {
+ ChannelComponent channelComponent =
+ Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
+ String.format("Channel %s", channel.getName()));
+ channelComponent.components.add(sourceName);
+ }
+ } catch (Exception e) {
+ String msg = String.format("Source %s has been removed due to an " +
+ "error during configuration", sourceName);
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+ /*
+ * Components which DO NOT have a ComponentConfiguration object
+ * and use only Context
+ */
+ Map<String, Context> sourceContexts = agentConf.getSourceContext();
+ for (String sourceName : sourceNames) {
+ Context context = sourceContexts.get(sourceName);
+ if (context != null) {
+ Source source =
+ sourceFactory.create(sourceName,
+ context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+ try {
+ Configurables.configure(source, context);
+ String[] channelNames = context.getString(
+ BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
+ List<Channel> sourceChannels =
+ getSourceChannels(channelComponentMap, source, Arrays.asList(channelNames));
+ if (sourceChannels.isEmpty()) {
+ String msg = String.format("Source %s is not connected to a " +
+ "channel", sourceName);
+ throw new IllegalStateException(msg);
+ }
+ Map<String, String> selectorConfig = context.getSubProperties(
+ BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
+
+ ChannelSelector selector = ChannelSelectorFactory.create(
+ sourceChannels, selectorConfig);
+
+ ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+ Configurables.configure(channelProcessor, context);
+ source.setChannelProcessor(channelProcessor);
+ sourceRunnerMap.put(sourceName,
+ SourceRunner.forSource(source));
+ for (Channel channel : sourceChannels) {
+ ChannelComponent channelComponent =
+ Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
+ String.format("Channel %s", channel.getName()));
+ channelComponent.components.add(sourceName);
+ }
+ } catch (Exception e) {
+ String msg = String.format("Source %s has been removed due to an " +
+ "error during configuration", sourceName);
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+ }
+
+ private List<Channel> getSourceChannels(Map<String, ChannelComponent> channelComponentMap,
+ Source source, Collection<String> channelNames) throws InstantiationException {
+ List<Channel> sourceChannels = new ArrayList<Channel>();
+ for (String chName : channelNames) {
+ ChannelComponent channelComponent = channelComponentMap.get(chName);
+ if (channelComponent != null) {
+ checkSourceChannelCompatibility(source, channelComponent.channel);
+ sourceChannels.add(channelComponent.channel);
+ }
+ }
+ return sourceChannels;
+ }
+
+ private void checkSourceChannelCompatibility(Source source, Channel channel)
+ throws InstantiationException {
+ if (source instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported) {
+ long transCap = ((TransactionCapacitySupported) channel).getTransactionCapacity();
+ long batchSize = ((BatchSizeSupported) source).getBatchSize();
+ if (transCap < batchSize) {
+ String msg = String.format(
+ "Incompatible source and channel settings defined. " +
+ "source's batch size is greater than the channels transaction capacity. " +
+ "Source: %s, batch size = %d, channel %s, transaction capacity = %d",
+ source.getName(), batchSize,
+ channel.getName(), transCap);
+ throw new InstantiationException(msg);
+ }
+ }
+ }
+
+ private void checkSinkChannelCompatibility(Sink sink, Channel channel)
+ throws InstantiationException {
+ if (sink instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported) {
+ long transCap = ((TransactionCapacitySupported) channel).getTransactionCapacity();
+ long batchSize = ((BatchSizeSupported) sink).getBatchSize();
+ if (transCap < batchSize) {
+ String msg = String.format(
+ "Incompatible sink and channel settings defined. " +
+ "sink's batch size is greater than the channels transaction capacity. " +
+ "Sink: %s, batch size = %d, channel %s, transaction capacity = %d",
+ sink.getName(), batchSize,
+ channel.getName(), transCap);
+ throw new InstantiationException(msg);
+ }
+ }
+ }
+
+ private void loadSinks(AgentConfiguration agentConf,
+ Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
+ throws InstantiationException {
+ Set<String> sinkNames = agentConf.getSinkSet();
+ Map<String, ComponentConfiguration> compMap =
+ agentConf.getSinkConfigMap();
+ Map<String, Sink> sinks = new HashMap<String, Sink>();
+ /*
+ * Components which have a ComponentConfiguration object
+ */
+ for (String sinkName : sinkNames) {
+ ComponentConfiguration comp = compMap.get(sinkName);
+ if (comp != null) {
+ SinkConfiguration config = (SinkConfiguration) comp;
+ Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
+ try {
+ Configurables.configure(sink, config);
+ ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
+ if (channelComponent == null) {
+ String msg = String.format("Sink %s is not connected to a " +
+ "channel", sinkName);
+ throw new IllegalStateException(msg);
+ }
+ checkSinkChannelCompatibility(sink, channelComponent.channel);
+ sink.setChannel(channelComponent.channel);
+ sinks.put(comp.getComponentName(), sink);
+ channelComponent.components.add(sinkName);
+ } catch (Exception e) {
+ String msg = String.format("Sink %s has been removed due to an " +
+ "error during configuration", sinkName);
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+ /*
+ * Components which DO NOT have a ComponentConfiguration object
+ * and use only Context
+ */
+ Map<String, Context> sinkContexts = agentConf.getSinkContext();
+ for (String sinkName : sinkNames) {
+ Context context = sinkContexts.get(sinkName);
+ if (context != null) {
+ Sink sink = sinkFactory.create(sinkName, context.getString(
+ BasicConfigurationConstants.CONFIG_TYPE));
+ try {
+ Configurables.configure(sink, context);
+ ChannelComponent channelComponent =
+ channelComponentMap.get(
+ context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
+ if (channelComponent == null) {
+ String msg = String.format("Sink %s is not connected to a " +
+ "channel", sinkName);
+ throw new IllegalStateException(msg);
+ }
+ checkSinkChannelCompatibility(sink, channelComponent.channel);
+ sink.setChannel(channelComponent.channel);
+ sinks.put(sinkName, sink);
+ channelComponent.components.add(sinkName);
+ } catch (Exception e) {
+ String msg = String.format("Sink %s has been removed due to an " +
+ "error during configuration", sinkName);
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+
+ loadSinkGroups(agentConf, sinks, sinkRunnerMap);
+ }
+
+ private void loadSinkGroups(AgentConfiguration agentConf,
+ Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
+ throws InstantiationException {
+ Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
+ Map<String, ComponentConfiguration> compMap =
+ agentConf.getSinkGroupConfigMap();
+ Map<String, String> usedSinks = new HashMap<String, String>();
+ for (String groupName : sinkGroupNames) {
+ ComponentConfiguration comp = compMap.get(groupName);
+ if (comp != null) {
+ SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
+ List<Sink> groupSinks = new ArrayList<Sink>();
+ for (String sink : groupConf.getSinks()) {
+ Sink s = sinks.remove(sink);
+ if (s == null) {
+ String sinkUser = usedSinks.get(sink);
+ if (sinkUser != null) {
+ throw new InstantiationException(String.format(
+ "Sink %s of group %s already " +
+ "in use by group %s", sink, groupName, sinkUser));
+ } else {
+ throw new InstantiationException(String.format(
+ "Sink %s of group %s does "
+ + "not exist or is not properly configured", sink,
+ groupName));
+ }
+ }
+ groupSinks.add(s);
+ usedSinks.put(sink, groupName);
+ }
+ try {
+ SinkGroup group = new SinkGroup(groupSinks);
+ Configurables.configure(group, groupConf);
+ sinkRunnerMap.put(comp.getComponentName(),
+ new SinkRunner(group.getProcessor()));
+ } catch (Exception e) {
+ String msg = String.format("SinkGroup %s has been removed due to " +
+ "an error during configuration", groupName);
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+ // add any unassigned sinks to solo collectors
+ for (Entry<String, Sink> entry : sinks.entrySet()) {
+ if (!usedSinks.containsValue(entry.getKey())) {
+ try {
+ SinkProcessor pr = new DefaultSinkProcessor();
+ List<Sink> sinkMap = new ArrayList<Sink>();
+ sinkMap.add(entry.getValue());
+ pr.setSinks(sinkMap);
+ Configurables.configure(pr, new Context());
+ sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
+ } catch (Exception e) {
+ String msg = String.format("SinkGroup %s has been removed due to " +
+ "an error during configuration", entry.getKey());
+ LOGGER.error(msg, e);
+ }
+ }
+ }
+ }
+
+ private static class ChannelComponent {
+ final Channel channel;
+ final List<String> components;
+
+ ChannelComponent(Channel channel) {
+ this.channel = channel;
+ components = Lists.newArrayList();
+ }
+ }
+
+ protected Map<String, String> toMap(Properties properties) {
+ Map<String, String> result = Maps.newHashMap();
+ Enumeration<?> propertyNames = properties.propertyNames();
+ while (propertyNames.hasMoreElements()) {
+ String name = (String) propertyNames.nextElement();
+ String value = properties.getProperty(name);
+ result.put(name, value);
+ }
+ return result;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..8bff0e9
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Charsets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flume.conf.FlumeConfiguration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * ZooKeeper based configuration implementation provider.
+ *
+ * The Agent configuration can be uploaded in ZooKeeper under a base name, which
+ * defaults to /flume
+ *
+ * Currently the agent configuration is stored under the agent name node in
+ * ZooKeeper
+ *
+ * <PRE>
+ * /flume
+ * /a1 [agent config file]
+ * /a2 [agent config file]
+ * /a3 [agent config file]
+ * </PRE>
+ *
+ * Configuration format is same as PropertiesFileConfigurationProvider
+ *
+ * Configuration properties
+ *
+ * agentName - Name of Agent for which configuration needs to be pulled
+ *
+ * zkConnString - Connection string to ZooKeeper Ensemble
+ * (host:port,host1:port1)
+ *
+ * basePath - Base Path where agent configuration needs to be stored. Defaults
+ * to /flume
+ */
+public abstract class AbstractZooKeeperConfigurationProvider extends
+ AbstractConfigurationProvider {
+
+ static final String DEFAULT_ZK_BASE_PATH = "/flume";
+
+ protected final String basePath;
+
+ protected final String zkConnString;
+
+ protected AbstractZooKeeperConfigurationProvider(String agentName,
+ String zkConnString, String basePath) {
+ super(agentName);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(zkConnString),
+ "Invalid Zookeeper Connection String %s", zkConnString);
+ this.zkConnString = zkConnString;
+ if (basePath == null || basePath.isEmpty()) {
+ this.basePath = DEFAULT_ZK_BASE_PATH;
+ } else {
+ this.basePath = basePath;
+ }
+ }
+
+ protected CuratorFramework createClient() {
+ return CuratorFrameworkFactory.newClient(zkConnString,
+ new ExponentialBackoffRetry(1000, 1));
+ }
+
+ protected FlumeConfiguration configFromBytes(byte[] configData)
+ throws IOException {
+ Map<String, String> configMap;
+ if (configData == null || configData.length == 0) {
+ configMap = Collections.emptyMap();
+ } else {
+ String fileContent = new String(configData, Charsets.UTF_8);
+ Properties properties = new Properties();
+ properties.load(new StringReader(fileContent));
+ configMap = toMap(properties);
+ }
+ return new FlumeConfiguration(configMap);
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java
new file mode 100644
index 0000000..6538ad2
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.base.Throwables;
+import com.google.common.eventbus.Subscribe;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.MonitoringType;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Application {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(Application.class);
+
+ public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+ public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+
+ private final List<LifecycleAware> components;
+ private final LifecycleSupervisor supervisor;
+ private MaterializedConfiguration materializedConfiguration;
+ private MonitorService monitorServer;
+ private final ReentrantLock lifecycleLock = new ReentrantLock();
+
+ public Application() {
+ this(new ArrayList<LifecycleAware>(0));
+ }
+
+ public Application(List<LifecycleAware> components) {
+ this.components = components;
+ supervisor = new LifecycleSupervisor();
+ }
+
+ public void start() {
+ lifecycleLock.lock();
+ try {
+ for (LifecycleAware component : components) {
+ supervisor.supervise(component,
+ new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ }
+ } finally {
+ lifecycleLock.unlock();
+ }
+ }
+
+ @Subscribe
+ public void handleConfigurationEvent(MaterializedConfiguration conf) {
+ try {
+ lifecycleLock.lockInterruptibly();
+ stopAllComponents();
+ startAllComponents(conf);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted while trying to handle configuration event");
+ return;
+ } finally {
+ // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
+ if (lifecycleLock.isHeldByCurrentThread()) {
+ lifecycleLock.unlock();
+ }
+ }
+ }
+
+ public void stop() {
+ lifecycleLock.lock();
+ stopAllComponents();
+ try {
+ supervisor.stop();
+ if (monitorServer != null) {
+ monitorServer.stop();
+ }
+ } finally {
+ lifecycleLock.unlock();
+ }
+ }
+
+ private void stopAllComponents() {
+ if (this.materializedConfiguration != null) {
+ logger.info("Shutting down configuration: {}", this.materializedConfiguration);
+ for (Entry<String, SourceRunner> entry :
+ this.materializedConfiguration.getSourceRunners().entrySet()) {
+ try {
+ logger.info("Stopping Source " + entry.getKey());
+ supervisor.unsupervise(entry.getValue());
+ } catch (Exception e) {
+ logger.error("Error while stopping {}", entry.getValue(), e);
+ }
+ }
+
+ for (Entry<String, SinkRunner> entry :
+ this.materializedConfiguration.getSinkRunners().entrySet()) {
+ try {
+ logger.info("Stopping Sink " + entry.getKey());
+ supervisor.unsupervise(entry.getValue());
+ } catch (Exception e) {
+ logger.error("Error while stopping {}", entry.getValue(), e);
+ }
+ }
+
+ for (Entry<String, Channel> entry :
+ this.materializedConfiguration.getChannels().entrySet()) {
+ try {
+ logger.info("Stopping Channel " + entry.getKey());
+ supervisor.unsupervise(entry.getValue());
+ } catch (Exception e) {
+ logger.error("Error while stopping {}", entry.getValue(), e);
+ }
+ }
+ }
+ if (monitorServer != null) {
+ monitorServer.stop();
+ }
+ }
+
+ private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
+ logger.info("Starting new configuration:{}", materializedConfiguration);
+
+ this.materializedConfiguration = materializedConfiguration;
+
+ for (Entry<String, Channel> entry :
+ materializedConfiguration.getChannels().entrySet()) {
+ try {
+ logger.info("Starting Channel " + entry.getKey());
+ supervisor.supervise(entry.getValue(),
+ new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
+ }
+
+ /*
+ * Wait for all channels to start.
+ */
+ for (Channel ch : materializedConfiguration.getChannels().values()) {
+ while (ch.getLifecycleState() != LifecycleState.START
+ && !supervisor.isComponentInErrorState(ch)) {
+ try {
+ logger.info("Waiting for channel: " + ch.getName() +
+ " to start. Sleeping for 500 ms");
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while waiting for channel to start.", e);
+ Throwables.propagate(e);
+ }
+ }
+ }
+
+ for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
+ try {
+ logger.info("Starting Sink " + entry.getKey());
+ supervisor.supervise(entry.getValue(),
+ new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
+ }
+
+ for (Entry<String, SourceRunner> entry :
+ materializedConfiguration.getSourceRunners().entrySet()) {
+ try {
+ logger.info("Starting Source " + entry.getKey());
+ supervisor.supervise(entry.getValue(),
+ new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
+ }
+
+ this.loadMonitoring();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadMonitoring() {
+ Properties systemProps = System.getProperties();
+ Set<String> keys = systemProps.stringPropertyNames();
+ try {
+ if (keys.contains(CONF_MONITOR_CLASS)) {
+ String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
+ Class<? extends MonitorService> klass;
+ try {
+ //Is it a known type?
+ klass = MonitoringType.valueOf(
+ monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
+ } catch (Exception e) {
+ //Not a known type, use FQCN
+ klass = (Class<? extends MonitorService>) Class.forName(monitorType);
+ }
+ this.monitorServer = klass.newInstance();
+ Context context = new Context();
+ for (String key : keys) {
+ if (key.startsWith(CONF_MONITOR_PREFIX)) {
+ context.put(key.substring(CONF_MONITOR_PREFIX.length()),
+ systemProps.getProperty(key));
+ }
+ }
+ monitorServer.configure(context);
+ monitorServer.start();
+ }
+ } catch (Exception e) {
+ logger.warn("Error starting monitoring. "
+ + "Monitoring might not be available.", e);
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java
new file mode 100644
index 0000000..e2a7ffe
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+public interface ConfigurationProvider {
+ MaterializedConfiguration getConfiguration();
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java
new file mode 100644
index 0000000..d7b0c22
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * A class that extends the Java built-in Properties overriding
+ * {@link java.util.Properties#getProperty(String)} to allow ${ENV_VAR_NAME}-style environment
+ * variable inclusions
+ */
+public class EnvVarResolverProperties extends Properties {
+ /**
+ * @param input The input string with ${ENV_VAR_NAME}-style environment variable names
+ * @return The output string with ${ENV_VAR_NAME} replaced with their environment variable values
+ */
+ protected static String resolveEnvVars(String input) {
+ Preconditions.checkNotNull(input);
+ // match ${ENV_VAR_NAME}
+ Pattern p = Pattern.compile("\\$\\{(\\w+)\\}");
+ Matcher m = p.matcher(input);
+ StringBuffer sb = new StringBuffer();
+ while (m.find()) {
+ String envVarName = m.group(1);
+ String envVarValue = System.getenv(envVarName);
+ m.appendReplacement(sb, null == envVarValue ? "" : envVarValue);
+ }
+ m.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * @param key the property key
+ * @return the value of the property key with ${ENV_VAR_NAME}-style environment variables replaced
+ */
+ @Override
+ public String getProperty(String key) {
+ return resolveEnvVars(super.getProperty(key));
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java
new file mode 100644
index 0000000..c46fbf5
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * MaterializedConfiguration represents the materialization of a Flume
+ * properties file. That is it's the actual Source, Sink, and Channels
+ * represented in the configuration file.
+ */
+public interface MaterializedConfiguration {
+
+ public void addSourceRunner(String name, SourceRunner sourceRunner);
+
+ public void addSinkRunner(String name, SinkRunner sinkRunner);
+
+ public void addChannel(String name, Channel channel);
+
+ public ImmutableMap<String, SourceRunner> getSourceRunners();
+
+ public ImmutableMap<String, SinkRunner> getSinkRunners();
+
+ public ImmutableMap<String, Channel> getChannels();
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..2c628ae
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.CounterGroup;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PollingPropertiesFileConfigurationProvider
+ extends PropertiesFileConfigurationProvider
+ implements LifecycleAware {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);
+
+ private final EventBus eventBus;
+ private final File file;
+ private final int interval;
+ private final CounterGroup counterGroup;
+ private LifecycleState lifecycleState;
+
+ private ScheduledExecutorService executorService;
+
+ public PollingPropertiesFileConfigurationProvider(String agentName,
+ File file, EventBus eventBus, int interval) {
+ super(agentName, file);
+ this.eventBus = eventBus;
+ this.file = file;
+ this.interval = interval;
+ counterGroup = new CounterGroup();
+ lifecycleState = LifecycleState.IDLE;
+ }
+
+ @Override
+ public void start() {
+ LOGGER.info("Configuration provider starting");
+
+ Preconditions.checkState(file != null,
+ "The parameter file must not be null");
+
+ executorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
+ .build());
+
+ FileWatcherRunnable fileWatcherRunnable =
+ new FileWatcherRunnable(file, counterGroup);
+
+ executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
+ TimeUnit.SECONDS);
+
+ lifecycleState = LifecycleState.START;
+
+ LOGGER.debug("Configuration provider started");
+ }
+
+ @Override
+ public void stop() {
+ LOGGER.info("Configuration provider stopping");
+
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+ LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
+ executorService.shutdownNow();
+ while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+ LOGGER.debug("Waiting for file watcher to terminate");
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.debug("Interrupted while waiting for file watcher to terminate");
+ Thread.currentThread().interrupt();
+ }
+ lifecycleState = LifecycleState.STOP;
+ LOGGER.debug("Configuration provider stopped");
+ }
+
+ @Override
+ public synchronized LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+
+ @Override
+ public String toString() {
+ return "{ file:" + file + " counterGroup:" + counterGroup + " provider:"
+ + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
+ }
+
+ public class FileWatcherRunnable implements Runnable {
+
+ private final File file;
+ private final CounterGroup counterGroup;
+
+ private long lastChange;
+
+ public FileWatcherRunnable(File file, CounterGroup counterGroup) {
+ super();
+ this.file = file;
+ this.counterGroup = counterGroup;
+ this.lastChange = 0L;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.debug("Checking file:{} for changes", file);
+
+ counterGroup.incrementAndGet("file.checks");
+
+ long lastModified = file.lastModified();
+
+ if (lastModified > lastChange) {
+ LOGGER.info("Reloading configuration file:{}", file);
+
+ counterGroup.incrementAndGet("file.loads");
+
+ lastChange = lastModified;
+
+ try {
+ eventBus.post(getConfiguration());
+ } catch (Exception e) {
+ LOGGER.error("Failed to load configuration data. Exception follows.",
+ e);
+ } catch (NoClassDefFoundError e) {
+ LOGGER.error("Failed to start agent because dependencies were not " +
+ "found in classpath. Error follows.", e);
+ } catch (Throwable t) {
+ // caught because the caller does not handle or log Throwables
+ LOGGER.error("Unhandled error", t);
+ }
+ }
+ }
+ }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..b80eed3
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+
+public class PollingZooKeeperConfigurationProvider extends
+ AbstractZooKeeperConfigurationProvider implements LifecycleAware {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(PollingZooKeeperConfigurationProvider.class);
+
+ private final EventBus eventBus;
+
+ private final CuratorFramework client;
+
+ private NodeCache agentNodeCache;
+
+ private FlumeConfiguration flumeConfiguration;
+
+ private LifecycleState lifecycleState;
+
+ public PollingZooKeeperConfigurationProvider(String agentName,
+ String zkConnString, String basePath, EventBus eventBus) {
+ super(agentName, zkConnString, basePath);
+ this.eventBus = eventBus;
+ client = createClient();
+ agentNodeCache = null;
+ flumeConfiguration = null;
+ lifecycleState = LifecycleState.IDLE;
+ }
+
+ @Override
+ protected FlumeConfiguration getFlumeConfiguration() {
+ return flumeConfiguration;
+ }
+
+ @Override
+ public void start() {
+ LOGGER.debug("Starting...");
+ try {
+ client.start();
+ try {
+ agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName());
+ agentNodeCache.start();
+ agentNodeCache.getListenable().addListener(new NodeCacheListener() {
+ @Override
+ public void nodeChanged() throws Exception {
+ refreshConfiguration();
+ }
+ });
+ } catch (Exception e) {
+ client.close();
+ throw e;
+ }
+ } catch (Exception e) {
+ lifecycleState = LifecycleState.ERROR;
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ } else {
+ throw new FlumeException(e);
+ }
+ }
+ lifecycleState = LifecycleState.START;
+ }
+
+ private void refreshConfiguration() throws IOException {
+ LOGGER.info("Refreshing configuration from ZooKeeper");
+ byte[] data = null;
+ ChildData childData = agentNodeCache.getCurrentData();
+ if (childData != null) {
+ data = childData.getData();
+ }
+ flumeConfiguration = configFromBytes(data);
+ eventBus.post(getConfiguration());
+ }
+
+ @Override
+ public void stop() {
+ LOGGER.debug("Stopping...");
+ if (agentNodeCache != null) {
+ try {
+ agentNodeCache.close();
+ } catch (IOException e) {
+ LOGGER.warn("Encountered exception while stopping", e);
+ lifecycleState = LifecycleState.ERROR;
+ }
+ }
+
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error stopping Curator client", e);
+ lifecycleState = LifecycleState.ERROR;
+ }
+
+ if (lifecycleState != LifecycleState.ERROR) {
+ lifecycleState = LifecycleState.STOP;
+ }
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..03055b6
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * A configuration provider that uses properties file for specifying
+ * configuration. The configuration files follow the Java properties file syntax
+ * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
+ * configuration value specified in the properties file is prefixed by an
+ * <em>Agent Name</em> which helps isolate an individual agent's namespace.
+ * </p>
+ * <p>
+ * Valid configuration files must observe the following rules for every agent
+ * namespace.
+ * <ul>
+ * <li>For every <agent name> there must be three lists specified that
+ * include <tt><agent name>.sources</tt>,
+ * <tt><agent name>.sinks</tt>, and <tt><agent name>.channels</tt>.
+ * Each of these lists must contain a space separated list of names
+ * corresponding to that particular entity.</li>
+ * <li>For each source named in <tt><agent name>.sources</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of source
+ * types. For example:
+ * <tt><agent name>.sources.<source name>.type = event</tt></li>
+ * <li>For each source named in <tt><agent name>.sources</tt>, there must
+ * be a space-separated list of channel names that the source will associate
+ * with during runtime. Each of these names must be contained in the channels
+ * list specified by <tt><agent name>.channels</tt>. For example:
+ * <tt><agent name>.sources.<source name>.channels =
+ * <channel-1 name> <channel-2 name></tt></li>
+ * <li>For each source named in the <tt><agent name>.sources</tt>, there
+ * must be a <tt>runner</tt> namespace of configuration that configures the
+ * associated source runner. For example:
+ * <tt><agent name>.sources.<source name>.runner.type = avro</tt>.
+ * This namespace can also be used to configure other configuration of the
+ * source runner as needed. For example:
+ * <tt><agent name>.sources.<source name>.runner.port = 10101</tt>
+ * </li>
+ * <li>For each source named in <tt><sources>.sources</tt> there can
+ * be an optional <tt>selector.type</tt> specified that identifies the type
+ * of channel selector associated with the source. If not specified, the
+ * default replicating channel selector is used.
+ * </li><li>For each channel named in the <tt><agent name>.channels</tt>,
+ * there must be a non-empty <tt>type</tt> attribute specified from the valid
+ * set of channel types. For example:
+ * <tt><agent name>.channels.<channel name>.type = mem</tt></li>
+ * <li>For each sink named in the <tt><agent name>.sinks</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of sink
+ * types. For example:
+ * <tt><agent name>.sinks.<sink name>.type = hdfs</tt></li>
+ * <li>For each sink named in the <tt><agent name>.sinks</tt>, there must
+ * be a non-empty single-valued channel name specified as the value of the
+ * <tt>channel</tt> attribute. This value must be contained in the channels list
+ * specified by <tt><agent name>.channels</tt>. For example:
+ * <tt><agent name>.sinks.<sink name>.channel =
+ * <channel name></tt></li>
+ * <li>For each sink named in the <tt><agent name>.sinks</tt>, there must
+ * be a <tt>runner</tt> namespace of configuration that configures the
+ * associated sink runner. For example:
+ * <tt><agent name>.sinks.<sink name>.runner.type = polling</tt>.
+ * This namespace can also be used to configure other configuration of the sink
+ * runner as needed. For example:
+ * <tt><agent name>.sinks.<sink name>.runner.polling.interval =
+ * 60</tt></li>
+ * <li>A fourth optional list <tt><agent name>.sinkgroups</tt>
+ * may be added to each agent, consisting of unique space separated names
+ * for groups</li>
+ * <li>Each sinkgroup must specify sinks, containing a list of all sinks
+ * belonging to it. These cannot be shared by multiple groups.
+ * Further, one can set a processor and behavioral parameters to determine
+ * how sink selection is made via <tt><agent name>.sinkgroups.<
+ * group name<.processor</tt>. For further detail refer to individual processor
+ * documentation</li>
+ * <li>Sinks not assigned to a group will be assigned to default single sink
+ * groups.</li>
+ * </ul>
+ *
+ * Apart from the above required configuration values, each source, sink or
+ * channel can have its own set of arbitrary configuration as required by the
+ * implementation. Each of these configuration values are expressed by fully
+ * namespace qualified configuration keys. For example, the configuration
+ * property called <tt>capacity</tt> for a channel called <tt>ch1</tt> for the
+ * agent named <tt>host1</tt> with value <tt>1000</tt> will be expressed as:
+ * <tt>host1.channels.ch1.capacity = 1000</tt>.
+ * </p>
+ * <p>
+ * Any information contained in the configuration file other than what pertains
+ * to the configured agents, sources, sinks and channels via the explicitly
+ * enumerated list of sources, sinks and channels per agent name are ignored by
+ * this provider. Moreover, if any of the required configuration values are not
+ * present in the configuration file for the configured entities, that entity
+ * and anything that depends upon it is considered invalid and consequently not
+ * configured. For example, if a channel is missing its <tt>type</tt> attribute,
+ * it is considered misconfigured. Also, any sources or sinks that depend upon
+ * this channel are also considered misconfigured and not initialized.
+ * </p>
+ * <p>
+ * Example configuration file:
+ *
+ * <pre>
+ * #
+ * # Flume Configuration
+ * # This file contains configuration for one Agent identified as host1.
+ * #
+ *
+ * host1.sources = avroSource thriftSource
+ * host1.channels = jdbcChannel
+ * host1.sinks = hdfsSink
+ *
+ * # avroSource configuration
+ * host1.sources.avroSource.type = org.apache.flume.source.AvroSource
+ * host1.sources.avroSource.runner.type = avro
+ * host1.sources.avroSource.runner.port = 11001
+ * host1.sources.avroSource.channels = jdbcChannel
+ * host1.sources.avroSource.selector.type = replicating
+ *
+ * # thriftSource configuration
+ * host1.sources.thriftSource.type = org.apache.flume.source.ThriftSource
+ * host1.sources.thriftSource.runner.type = thrift
+ * host1.sources.thriftSource.runner.port = 12001
+ * host1.sources.thriftSource.channels = jdbcChannel
+ *
+ * # jdbcChannel configuration
+ * host1.channels.jdbcChannel.type = jdbc
+ * host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
+ * host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
+ * host1.channels.jdbcChannel.jdbc.username = flume
+ * host1.channels.jdbcChannel.jdbc.password = flume
+ *
+ * # hdfsSink configuration
+ * host1.sinks.hdfsSink.type = hdfs
+ * host1.sinks.hdfsSink.hdfs.path = hdfs://localhost/
+ * host1.sinks.hdfsSink.batchsize = 1000
+ * host1.sinks.hdfsSink.runner.type = polling
+ * host1.sinks.hdfsSink.runner.polling.interval = 60
+ * </pre>
+ *
+ * </p>
+ *
+ * @see java.util.Properties#load(java.io.Reader)
+ */
+public class PropertiesFileConfigurationProvider extends
+ AbstractConfigurationProvider {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(PropertiesFileConfigurationProvider.class);
+ private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = "java.util.Properties";
+
+ private final File file;
+
+ public PropertiesFileConfigurationProvider(String agentName, File file) {
+ super(agentName);
+ this.file = file;
+ }
+
+ @Override
+ public FlumeConfiguration getFlumeConfiguration() {
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new FileReader(file));
+ String resolverClassName = System.getProperty("propertiesImplementation",
+ DEFAULT_PROPERTIES_IMPLEMENTATION);
+ Class<? extends Properties> propsclass = Class.forName(resolverClassName)
+ .asSubclass(Properties.class);
+ Properties properties = propsclass.newInstance();
+ properties.load(reader);
+ return new FlumeConfiguration(toMap(properties));
+ } catch (IOException ex) {
+ LOGGER.error("Unable to load file:" + file
+ + " (I/O failure) - Exception follows.", ex);
+ } catch (ClassNotFoundException e) {
+ LOGGER.error("Configuration resolver class not found", e);
+ } catch (InstantiationException e) {
+ LOGGER.error("Instantiation exception", e);
+ } catch (IllegalAccessException e) {
+ LOGGER.error("Illegal access exception", e);
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException ex) {
+ LOGGER.warn(
+ "Unable to close file reader for file: " + file, ex);
+ }
+ }
+ }
+ return new FlumeConfiguration(new HashMap<String, String>());
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java
new file mode 100644
index 0000000..fd14d84
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SimpleMaterializedConfiguration implements MaterializedConfiguration {
+
+ private final Map<String, Channel> channels;
+ private final Map<String, SourceRunner> sourceRunners;
+ private final Map<String, SinkRunner> sinkRunners;
+
+ public SimpleMaterializedConfiguration() {
+ channels = new HashMap<String, Channel>();
+ sourceRunners = new HashMap<String, SourceRunner>();
+ sinkRunners = new HashMap<String, SinkRunner>();
+ }
+
+ @Override
+ public String toString() {
+ return "{ sourceRunners:" + sourceRunners + " sinkRunners:" + sinkRunners
+ + " channels:" + channels + " }";
+ }
+
+ @Override
+ public void addSourceRunner(String name, SourceRunner sourceRunner) {
+ sourceRunners.put(name, sourceRunner);
+ }
+
+ @Override
+ public void addSinkRunner(String name, SinkRunner sinkRunner) {
+ sinkRunners.put(name, sinkRunner);
+ }
+
+ @Override
+ public void addChannel(String name, Channel channel) {
+ channels.put(name, channel);
+ }
+
+ @Override
+ public ImmutableMap<String, Channel> getChannels() {
+ return ImmutableMap.copyOf(channels);
+ }
+
+ @Override
+ public ImmutableMap<String, SourceRunner> getSourceRunners() {
+ return ImmutableMap.copyOf(sourceRunners);
+ }
+
+ @Override
+ public ImmutableMap<String, SinkRunner> getSinkRunners() {
+ return ImmutableMap.copyOf(sinkRunners);
+ }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..2a927e0
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StaticZooKeeperConfigurationProvider extends
+ AbstractZooKeeperConfigurationProvider {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(StaticZooKeeperConfigurationProvider.class);
+
+ public StaticZooKeeperConfigurationProvider(String agentName,
+ String zkConnString, String basePath) {
+ super(agentName, zkConnString, basePath);
+ }
+
+ @Override
+ protected FlumeConfiguration getFlumeConfiguration() {
+ try {
+ CuratorFramework cf = createClient();
+ cf.start();
+ try {
+ byte[] data = cf.getData().forPath(basePath + "/" + getAgentName());
+ return configFromBytes(data);
+ } finally {
+ cf.close();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error getting configuration info from Zookeeper", e);
+ throw new FlumeException(e);
+ }
+ }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
new file mode 100644
index 0000000..3e707ed
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.flume.FlumeConfig;
+import org.apache.pulsar.io.flume.FlumeConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A Simple abstract sink class for pulsar to flume.
+ */
+public abstract class AbstractSink<T> implements Sink<T> {
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractSink.class);
+
+
+ public abstract T extractValue(Record<T> record);
+
+ protected static BlockingQueue<Map<String, Object>> records;
+
+ protected FlumeConnector flumeConnector;
+
+ public static BlockingQueue<Map<String, Object>> getQueue() {
+ return records;
+ }
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+
+ records = new LinkedBlockingQueue<Map<String, Object>>();
+
+ FlumeConfig flumeConfig = FlumeConfig.load(config);
+
+ flumeConnector = new FlumeConnector();
+ flumeConnector.StartConnector(flumeConfig);
+ }
+
+ @Override
+ public void write(Record<T> record) {
+ try {
+ T message = extractValue(record);
+ Map<String, Object> m = new HashMap();
+ m.put("body", message);
+ records.put(m);
+ record.ack();
+ } catch (InterruptedException e) {
+ record.fail();
+ log.error("error", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (flumeConnector != null) {
+ flumeConnector.stop();
+ }
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
new file mode 100644
index 0000000..a1be6e7
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractPollableSource;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+
+import com.google.common.base.Optional;
+
+import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
+
+
+public class SourceOfFlume extends AbstractPollableSource implements BatchSizeSupported {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(SourceOfFlume.class);
+
+ public static final String BATCH_DURATION_MS = "batchDurationMillis";
+
+ private long batchSize;
+
+ private int maxBatchDurationMillis;
+
+ private SourceCounter counter;
+
+ private final List<Event> eventList = new ArrayList<Event>();
+
+ private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent();
+
+
+ @Override
+ public synchronized void doStart() {
+ log.info("start source of flume ...");
+ this.counter = new SourceCounter("flume-source");
+ this.counter.start();
+ }
+
+ @Override
+ public void doStop() {
+ log.info("stop source of flume ...");
+ this.counter.stop();
+ }
+
+ @Override
+ public void doConfigure(Context context) {
+ batchSize = context.getInteger(BATCH_SIZE, 1000);
+ maxBatchDurationMillis = context.getInteger(BATCH_DURATION_MS, 1000);
+ log.info("context: {}", context);
+ }
+
+ @Override
+ public Status doProcess() {
+ Event event;
+ String eventBody;
+ try {
+ final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+
+ while (eventList.size() < this.getBatchSize() &&
+ System.currentTimeMillis() < maxBatchEndTime) {
+ BlockingQueue<Map<String, Object>> blockingQueue = StringSink.getQueue();
+ while (blockingQueue != null && !blockingQueue.isEmpty()) {
+ Map<String, Object> message = blockingQueue.take();
+ eventBody = message.get("body").toString();
+ event = EventBuilder.withBody(eventBody.getBytes());
+ eventList.add(event);
+ }
+ }
+ if (eventList.size() > 0) {
+ counter.addToEventReceivedCount((long) eventList.size());
+ getChannelProcessor().processEventBatch(eventList);
+ eventList.clear();
+ return Status.READY;
+ }
+ return Status.BACKOFF;
+
+ } catch (Exception e) {
+ log.error("Flume Source EXCEPTION, {}", e);
+ counter.incrementEventReadOrChannelFail(e);
+ return Status.BACKOFF;
+ }
+ }
+
+ @Override
+ public long getBatchSize() {
+ return batchSize;
+ }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/StringSink.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/StringSink.java
new file mode 100644
index 0000000..a05950a
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/StringSink.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+
+import org.apache.pulsar.functions.api.Record;
+
+public class StringSink extends AbstractSink<String> {
+
+ @Override
+ public String extractValue(Record<String> message) {
+ return message.getValue();
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSinkOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSinkOfFlume.java
new file mode 100644
index 0000000..81313fd
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSinkOfFlume.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import org.apache.flume.sink.AbstractSink;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public abstract class AbstractSinkOfFlume extends AbstractSink {
+
+ protected static BlockingQueue<Map<String, Object>> records;
+
+ public static BlockingQueue getQueue() {
+ return records;
+ }
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
new file mode 100644
index 0000000..8c80512
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.flume.FlumeConfig;
+import org.apache.pulsar.io.flume.FlumeConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * A Simple abstract source class for flume to pulsar.
+ */
+public abstract class AbstractSource<V> extends PushSource<V> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(AbstractSource.class);
+
+ protected Thread thread = null;
+
+ protected volatile boolean running = false;
+
+ protected final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error("[{}] parse events has an error", t.getName(), e);
+ }
+ };
+
+ @Override
+ public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+
+ FlumeConfig flumeConfig = FlumeConfig.load(config);
+
+ FlumeConnector flumeConnector = new FlumeConnector();
+ flumeConnector.StartConnector(flumeConfig);
+
+ this.start();
+
+ }
+
+ public abstract V extractValue(String message);
+
+ protected void start() {
+ thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ process();
+ }
+ });
+
+ thread.setName("flume source thread");
+ thread.setUncaughtExceptionHandler(handler);
+ running = true;
+ thread.start();
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ log.info("close flume source");
+ if (!running) {
+ return;
+ }
+ running = false;
+ if (thread != null) {
+ thread.interrupt();
+ thread.join();
+ }
+ }
+
+ protected void process() {
+ while (running) {
+ try {
+ log.info("start flume receive from sink process");
+ while (running) {
+ BlockingQueue<Map<String, Object>> blockingQueue = SinkOfFlume.getQueue();
+ while (blockingQueue != null && !blockingQueue.isEmpty()) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = null;
+ out = new ObjectOutputStream(bos);
+ Map<String, Object> message = blockingQueue.take();
+ out.writeObject(message.get("body"));
+ out.flush();
+ byte[] m = bos.toByteArray();
+ String m1 = new String(m);
+ bos.close();
+ FlumeRecord flumeRecord = new FlumeRecord<>();
+ flumeRecord.setRecord(extractValue(m1));
+ consume(flumeRecord);
+ }
+ }
+ } catch (Exception e) {
+ log.error("process error!", e);
+ }
+ }
+ }
+
+ @Getter
+ @Setter
+ static private class FlumeRecord<V> implements Record<V> {
+ private V record;
+ private Long id;
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.of(Long.toString(id));
+ }
+
+ @Override
+ public V getValue() {
+ return record;
+ }
+ }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/SinkOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/SinkOfFlume.java
new file mode 100644
index 0000000..f990bf0
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/SinkOfFlume.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import org.apache.flume.*;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
+
+public class SinkOfFlume extends AbstractSinkOfFlume implements Configurable, BatchSizeSupported {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SinkOfFlume.class);
+
+ private long batchSize;
+
+ private SinkCounter counter = null;
+
+ @Override
+ public void configure(Context context) {
+ batchSize = context.getInteger(BATCH_SIZE, 1000);
+ }
+
+ @Override
+ public long getBatchSize() {
+ return batchSize;
+ }
+
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ Status result = Status.READY;
+ Channel channel = getChannel();
+ Transaction transaction = null;
+ Event event = null;
+
+ try {
+ transaction = channel.getTransaction();
+ transaction.begin();
+ long processedEvents = 0;
+ for (; processedEvents < batchSize; processedEvents += 1) {
+ event = channel.take();
+
+ if (event == null) {
+ // no events available in the channel
+ break;
+ }
+ if (processedEvents == 0) {
+ result = Status.BACKOFF;
+ counter.incrementBatchEmptyCount();
+ } else if (processedEvents < batchSize) {
+ counter.incrementBatchUnderflowCount();
+ } else {
+ counter.incrementBatchCompleteCount();
+ }
+ event.getHeaders();
+ event.getBody();
+ Map<String, Object> m = new HashMap();
+ m.put("headers", event.getHeaders());
+ m.put("body", event.getBody());
+ records.put(m);
+ }
+ transaction.commit();
+ } catch (Exception ex) {
+ String errorMsg = "Failed to publish events";
+ LOG.error("Failed to publish events", ex);
+ counter.incrementEventWriteOrChannelFail(ex);
+ result = Status.BACKOFF;
+ if (transaction != null) {
+ try {
+ // If the transaction wasn't committed before we got the exception, we
+ // need to rollback.
+ transaction.rollback();
+ } catch (RuntimeException e) {
+ LOG.error("Transaction rollback failed: " + e.getLocalizedMessage());
+ LOG.debug("Exception follows.", e);
+ } finally {
+ transaction.close();
+ transaction = null;
+ }
+ }
+ } finally {
+ if (transaction != null) {
+ transaction.close();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public synchronized void start() {
+ records = new LinkedBlockingQueue<Map<String, Object>>();
+ this.counter = new SinkCounter("flume-sink");
+ }
+
+ @Override
+ public synchronized void stop() {
+ }
+
+}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/StringSource.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/StringSource.java
new file mode 100644
index 0000000..3c25986
--- /dev/null
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/StringSource.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+
+public class StringSource extends AbstractSource<String> {
+
+ @Override
+ public String extractValue(String message) {
+ return message;
+ }
+}
diff --git a/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..0e578b8
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: flume
+description: flume source and sink connector
+sourceClass: org.apache.pulsar.io.flume.source.StringSource
+sinkClass: org.apache.pulsar.io.flume.sink.StringSink
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/flume-io-sink.yaml b/pulsar-io/flume/src/main/resources/flume/flume-io-sink.yaml
new file mode 100644
index 0000000..4850d31
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/flume-io-sink.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+configs:
+ name: a1
+ confFile: sink.conf
+ noReloadConf: false
+ zkConnString: ""
+ zkBasePath: ""
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/flume-io-source.yaml b/pulsar-io/flume/src/main/resources/flume/flume-io-source.yaml
new file mode 100644
index 0000000..2df778c
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/flume-io-source.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+configs:
+ name: a1
+ confFile: source.conf
+ noReloadConf: false
+ zkConnString: ""
+ zkBasePath: ""
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/sink.conf b/pulsar-io/flume/src/main/resources/flume/sink.conf
new file mode 100644
index 0000000..8006195
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/sink.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = localhost
+a1.sources.r1.port = 44444
+
+# Describe the sink
+a1.sinks.k1.type = org.apache.pulsar.io.flume.source.SinkOfFlume
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/resources/flume/source.conf b/pulsar-io/flume/src/main/resources/flume/source.conf
new file mode 100644
index 0000000..93c713b
--- /dev/null
+++ b/pulsar-io/flume/src/main/resources/flume/source.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = org.apache.pulsar.io.flume.sink.SourceOfFlume
+
+# Describe the sink
+a1.sinks.k1.type = avro
+a1.sinks.k1.hostname = 127.0.0.1
+a1.sinks.k1.port = 44444
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/AbstractFlumeTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/AbstractFlumeTests.java
new file mode 100644
index 0000000..c9fe6c6
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/AbstractFlumeTests.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume;
+
+
+public abstract class AbstractFlumeTests {
+
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractConfigurationProvider.java
new file mode 100644
index 0000000..ed2cf3f
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractConfigurationProvider.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.annotations.Recyclable;
+import org.apache.flume.channel.AbstractChannel;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.source.AbstractSource;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TestAbstractConfigurationProvider {
+
+ @Test
+ public void testDispoableChannel() throws Exception {
+ String agentName = "agent1";
+ Map<String, String> properties = getPropertiesForChannel(agentName,
+ DisposableChannel.class.getName());
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+ MaterializedConfiguration config1 = provider.getConfiguration();
+ Channel channel1 = config1.getChannels().values().iterator().next();
+ Assert.assertTrue(channel1 instanceof DisposableChannel);
+ MaterializedConfiguration config2 = provider.getConfiguration();
+ Channel channel2 = config2.getChannels().values().iterator().next();
+ Assert.assertTrue(channel2 instanceof DisposableChannel);
+ Assert.assertNotSame(channel1, channel2);
+ }
+
+ @Test
+ public void testReusableChannel() throws Exception {
+ String agentName = "agent1";
+ Map<String, String> properties = getPropertiesForChannel(agentName,
+ RecyclableChannel.class.getName());
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+
+ MaterializedConfiguration config1 = provider.getConfiguration();
+ Channel channel1 = config1.getChannels().values().iterator().next();
+ Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+ MaterializedConfiguration config2 = provider.getConfiguration();
+ Channel channel2 = config2.getChannels().values().iterator().next();
+ Assert.assertTrue(channel2 instanceof RecyclableChannel);
+
+ Assert.assertSame(channel1, channel2);
+ }
+
+ @Test
+ public void testUnspecifiedChannel() throws Exception {
+ String agentName = "agent1";
+ Map<String, String> properties = getPropertiesForChannel(agentName,
+ UnspecifiedChannel.class.getName());
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+
+ MaterializedConfiguration config1 = provider.getConfiguration();
+ Channel channel1 = config1.getChannels().values().iterator().next();
+ Assert.assertTrue(channel1 instanceof UnspecifiedChannel);
+
+ MaterializedConfiguration config2 = provider.getConfiguration();
+ Channel channel2 = config2.getChannels().values().iterator().next();
+ Assert.assertTrue(channel2 instanceof UnspecifiedChannel);
+
+ Assert.assertSame(channel1, channel2);
+ }
+
+ @Test
+ public void testReusableChannelNotReusedLater() throws Exception {
+ String agentName = "agent1";
+ Map<String, String> propertiesReusable = getPropertiesForChannel(agentName,
+ RecyclableChannel.class
+ .getName());
+ Map<String, String> propertiesDispoable = getPropertiesForChannel(agentName,
+ DisposableChannel.class
+ .getName());
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, propertiesReusable);
+ MaterializedConfiguration config1 = provider.getConfiguration();
+ Channel channel1 = config1.getChannels().values().iterator().next();
+ Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+ provider.setProperties(propertiesDispoable);
+ MaterializedConfiguration config2 = provider.getConfiguration();
+ Channel channel2 = config2.getChannels().values().iterator().next();
+ Assert.assertTrue(channel2 instanceof DisposableChannel);
+
+ provider.setProperties(propertiesReusable);
+ MaterializedConfiguration config3 = provider.getConfiguration();
+ Channel channel3 = config3.getChannels().values().iterator().next();
+ Assert.assertTrue(channel3 instanceof RecyclableChannel);
+
+ Assert.assertNotSame(channel1, channel3);
+ }
+
+ @Test
+ public void testSourceThrowsExceptionDuringConfiguration() throws Exception {
+ String agentName = "agent1";
+ String sourceType = UnconfigurableSource.class.getName();
+ String channelType = "memory";
+ String sinkType = "null";
+ Map<String, String> properties = getProperties(agentName, sourceType,
+ channelType, sinkType);
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+ MaterializedConfiguration config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 0);
+ Assert.assertTrue(config.getChannels().size() == 1);
+ Assert.assertTrue(config.getSinkRunners().size() == 1);
+ }
+
+ @Test
+ public void testChannelThrowsExceptionDuringConfiguration() throws Exception {
+ String agentName = "agent1";
+ String sourceType = "seq";
+ String channelType = UnconfigurableChannel.class.getName();
+ String sinkType = "null";
+ Map<String, String> properties = getProperties(agentName, sourceType,
+ channelType, sinkType);
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+ MaterializedConfiguration config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 0);
+ Assert.assertTrue(config.getChannels().size() == 0);
+ Assert.assertTrue(config.getSinkRunners().size() == 0);
+ }
+
+ @Test
+ public void testSinkThrowsExceptionDuringConfiguration() throws Exception {
+ String agentName = "agent1";
+ String sourceType = "seq";
+ String channelType = "memory";
+ String sinkType = UnconfigurableSink.class.getName();
+ Map<String, String> properties = getProperties(agentName, sourceType,
+ channelType, sinkType);
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+ MaterializedConfiguration config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 1);
+ Assert.assertTrue(config.getChannels().size() == 1);
+ Assert.assertTrue(config.getSinkRunners().size() == 0);
+ }
+
+ @Test
+ public void testSourceAndSinkThrowExceptionDuringConfiguration()
+ throws Exception {
+ String agentName = "agent1";
+ String sourceType = UnconfigurableSource.class.getName();
+ String channelType = "memory";
+ String sinkType = UnconfigurableSink.class.getName();
+ Map<String, String> properties = getProperties(agentName, sourceType,
+ channelType, sinkType);
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+ MaterializedConfiguration config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 0);
+ Assert.assertTrue(config.getChannels().size() == 0);
+ Assert.assertTrue(config.getSinkRunners().size() == 0);
+ }
+
+ @Test
+ public void testSinkSourceMismatchDuringConfiguration() throws Exception {
+ String agentName = "agent1";
+ String sourceType = "seq";
+ String channelType = "memory";
+ String sinkType = "avro";
+ Map<String, String> properties = getProperties(agentName, sourceType,
+ channelType, sinkType);
+ properties.put(agentName + ".channels.channel1.capacity", "1000");
+ properties.put(agentName + ".channels.channel1.transactionCapacity", "1000");
+ properties.put(agentName + ".sources.source1.batchSize", "1000");
+ properties.put(agentName + ".sinks.sink1.batch-size", "1000");
+ properties.put(agentName + ".sinks.sink1.hostname", "10.10.10.10");
+ properties.put(agentName + ".sinks.sink1.port", "1010");
+
+ MemoryConfigurationProvider provider =
+ new MemoryConfigurationProvider(agentName, properties);
+ MaterializedConfiguration config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 1);
+ Assert.assertTrue(config.getChannels().size() == 1);
+ Assert.assertTrue(config.getSinkRunners().size() == 1);
+
+ properties.put(agentName + ".sources.source1.batchSize", "1001");
+ properties.put(agentName + ".sinks.sink1.batch-size", "1000");
+
+ provider = new MemoryConfigurationProvider(agentName, properties);
+ config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 0);
+ Assert.assertTrue(config.getChannels().size() == 1);
+ Assert.assertTrue(config.getSinkRunners().size() == 1);
+
+ properties.put(agentName + ".sources.source1.batchSize", "1000");
+ properties.put(agentName + ".sinks.sink1.batch-size", "1001");
+
+ provider = new MemoryConfigurationProvider(agentName, properties);
+ config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 1);
+ Assert.assertTrue(config.getChannels().size() == 1);
+ Assert.assertTrue(config.getSinkRunners().size() == 0);
+
+ properties.put(agentName + ".sources.source1.batchSize", "1001");
+ properties.put(agentName + ".sinks.sink1.batch-size", "1001");
+
+ provider = new MemoryConfigurationProvider(agentName, properties);
+ config = provider.getConfiguration();
+ Assert.assertTrue(config.getSourceRunners().size() == 0);
+ Assert.assertTrue(config.getChannels().size() == 0);
+ Assert.assertTrue(config.getSinkRunners().size() == 0);
+ }
+
+ private Map<String, String> getProperties(String agentName,
+ String sourceType, String channelType,
+ String sinkType) {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(agentName + ".sources", "source1");
+ properties.put(agentName + ".channels", "channel1");
+ properties.put(agentName + ".sinks", "sink1");
+ properties.put(agentName + ".sources.source1.type", sourceType);
+ properties.put(agentName + ".sources.source1.channels", "channel1");
+ properties.put(agentName + ".channels.channel1.type", channelType);
+ properties.put(agentName + ".channels.channel1.capacity", "100");
+ properties.put(agentName + ".sinks.sink1.type", sinkType);
+ properties.put(agentName + ".sinks.sink1.channel", "channel1");
+ return properties;
+ }
+
+ private Map<String, String> getPropertiesForChannel(String agentName, String channelType) {
+ return getProperties(agentName, "seq", channelType, "null");
+ }
+
+ public static class MemoryConfigurationProvider extends AbstractConfigurationProvider {
+ private Map<String, String> properties;
+
+ public MemoryConfigurationProvider(String agentName, Map<String, String> properties) {
+ super(agentName);
+ this.properties = properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ protected FlumeConfiguration getFlumeConfiguration() {
+ return new FlumeConfiguration(properties);
+ }
+ }
+
+ @Disposable
+ public static class DisposableChannel extends AbstractChannel {
+ @Override
+ public void put(Event event) throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Event take() throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Recyclable
+ public static class RecyclableChannel extends AbstractChannel {
+ @Override
+ public void put(Event event) throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Event take() throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class UnspecifiedChannel extends AbstractChannel {
+ @Override
+ public void put(Event event) throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Event take() throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class UnconfigurableChannel extends AbstractChannel {
+ @Override
+ public void configure(Context context) {
+ throw new RuntimeException("expected");
+ }
+
+ @Override
+ public void put(Event event) throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Event take() throws ChannelException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class UnconfigurableSource extends AbstractSource implements Configurable {
+ @Override
+ public void configure(Context context) {
+ throw new RuntimeException("expected");
+ }
+ }
+
+ public static class UnconfigurableSink extends AbstractSink implements Configurable {
+ @Override
+ public void configure(Context context) {
+ throw new RuntimeException("expected");
+ }
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..107e96a
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import junit.framework.Assert;
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public abstract class TestAbstractZooKeeperConfigurationProvider {
+
+ private static final String FLUME_CONF_FILE = "flume-conf.properties";
+
+ protected static final String AGENT_NAME = "a1";
+
+ protected static final String AGENT_PATH =
+ AbstractZooKeeperConfigurationProvider.DEFAULT_ZK_BASE_PATH + "/" + AGENT_NAME;
+
+ protected TestingServer zkServer;
+ protected CuratorFramework client;
+
+ @Before
+ public void setUp() throws Exception {
+ zkServer = new TestingServer();
+ client = CuratorFrameworkFactory
+ .newClient("localhost:" + zkServer.getPort(),
+ new ExponentialBackoffRetry(1000, 3));
+ client.start();
+
+ EnsurePath ensurePath = new EnsurePath(AGENT_PATH);
+ ensurePath.ensure(client.getZookeeperClient());
+ doSetUp();
+ }
+
+ protected abstract void doSetUp() throws Exception;
+
+ @After
+ public void tearDown() throws Exception {
+ doTearDown();
+ zkServer.close();
+ client.close();
+ }
+
+ protected abstract void doTearDown() throws Exception;
+
+ protected void addData() throws Exception {
+ Reader in = new InputStreamReader(getClass().getClassLoader()
+ .getResourceAsStream(FLUME_CONF_FILE), Charsets.UTF_8);
+ try {
+ String config = IOUtils.toString(in);
+ client.setData().forPath(AGENT_PATH, config.getBytes());
+ } finally {
+ in.close();
+ }
+ }
+
+ protected void verifyProperties(AbstractConfigurationProvider cp) {
+ FlumeConfiguration configuration = cp.getFlumeConfiguration();
+ Assert.assertNotNull(configuration);
+
+ /*
+ * Test the known errors in the file
+ */
+ List<String> expected = Lists.newArrayList();
+ expected.add("host5 CONFIG_ERROR");
+ expected.add("host5 INVALID_PROPERTY");
+ expected.add("host4 CONFIG_ERROR");
+ expected.add("host4 CONFIG_ERROR");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 AGENT_CONFIGURATION_INVALID");
+ expected.add("ch2 ATTRS_MISSING");
+ expected.add("host3 CONFIG_ERROR");
+ expected.add("host3 PROPERTY_VALUE_NULL");
+ expected.add("host3 AGENT_CONFIGURATION_INVALID");
+ expected.add("host2 PROPERTY_VALUE_NULL");
+ expected.add("host2 AGENT_CONFIGURATION_INVALID");
+ List<String> actual = Lists.newArrayList();
+ for (FlumeConfigurationError error : configuration.getConfigurationErrors()) {
+ actual.add(error.getComponentName() + " " + error.getErrorType().toString());
+ }
+ Collections.sort(expected);
+ Collections.sort(actual);
+ Assert.assertEquals(expected, actual);
+
+ FlumeConfiguration.AgentConfiguration agentConfiguration = configuration
+ .getConfigurationFor("host1");
+ Assert.assertNotNull(agentConfiguration);
+
+ Set<String> sources = Sets.newHashSet("source1");
+ Set<String> sinks = Sets.newHashSet("sink1");
+ Set<String> channels = Sets.newHashSet("channel1");
+
+ Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+ Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+ Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+ }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java
new file mode 100644
index 0000000..187ef67
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Files;
+
+public class TestApplication {
+
+ private File baseDir;
+
+ @Before
+ public void setup() throws Exception {
+ baseDir = Files.createTempDir();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(baseDir);
+ }
+
+ private <T extends LifecycleAware> T mockLifeCycle(Class<T> klass) {
+
+ T lifeCycleAware = mock(klass);
+
+ final AtomicReference<LifecycleState> state =
+ new AtomicReference<LifecycleState>();
+
+ state.set(LifecycleState.IDLE);
+
+ when(lifeCycleAware.getLifecycleState()).then(new Answer<LifecycleState>() {
+ @Override
+ public LifecycleState answer(InvocationOnMock invocation)
+ throws Throwable {
+ return state.get();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ state.set(LifecycleState.START);
+ return null;
+ }
+ }).when(lifeCycleAware).start();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ state.set(LifecycleState.STOP);
+ return null;
+ }
+ }).when(lifeCycleAware).stop();
+
+ return lifeCycleAware;
+ }
+
+ @Test
+ public void testBasicConfiguration() throws Exception {
+
+ EventBus eventBus = new EventBus("test-event-bus");
+
+ MaterializedConfiguration materializedConfiguration = new
+ SimpleMaterializedConfiguration();
+
+ SourceRunner sourceRunner = mockLifeCycle(SourceRunner.class);
+ materializedConfiguration.addSourceRunner("test", sourceRunner);
+
+ SinkRunner sinkRunner = mockLifeCycle(SinkRunner.class);
+ materializedConfiguration.addSinkRunner("test", sinkRunner);
+
+ Channel channel = mockLifeCycle(Channel.class);
+ materializedConfiguration.addChannel("test", channel);
+
+
+ ConfigurationProvider configurationProvider = mock(ConfigurationProvider.class);
+ when(configurationProvider.getConfiguration()).thenReturn(materializedConfiguration);
+
+ Application application = new Application();
+ eventBus.register(application);
+ eventBus.post(materializedConfiguration);
+ application.start();
+
+ Thread.sleep(1000L);
+
+ verify(sourceRunner).start();
+ verify(sinkRunner).start();
+ verify(channel).start();
+
+ application.stop();
+
+ Thread.sleep(1000L);
+
+ verify(sourceRunner).stop();
+ verify(sinkRunner).stop();
+ verify(channel).stop();
+ }
+
+ @Test
+ public void testFLUME1854() throws Exception {
+ File configFile = new File(baseDir, "flume-conf.properties");
+ Files.copy(new File(getClass().getClassLoader()
+ .getResource("flume-conf.properties").getFile()), configFile);
+ Random random = new Random();
+ for (int i = 0; i < 3; i++) {
+ EventBus eventBus = new EventBus("test-event-bus");
+ PollingPropertiesFileConfigurationProvider configurationProvider =
+ new PollingPropertiesFileConfigurationProvider("host1",
+ configFile, eventBus, 1);
+ List<LifecycleAware> components = Lists.newArrayList();
+ components.add(configurationProvider);
+ Application application = new Application(components);
+ eventBus.register(application);
+ application.start();
+ Thread.sleep(random.nextInt(10000));
+ application.stop();
+ }
+ }
+
+ @Test(timeout = 10000L)
+ public void testFLUME2786() throws Exception {
+ final String agentName = "test";
+ final int interval = 1;
+ final long intervalMs = 1000L;
+
+ File configFile = new File(baseDir, "flume-conf.properties");
+ Files.copy(new File(getClass().getClassLoader()
+ .getResource("flume-conf.properties.2786").getFile()), configFile);
+ File mockConfigFile = spy(configFile);
+ when(mockConfigFile.lastModified()).then(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(intervalMs);
+ return System.currentTimeMillis();
+ }
+ });
+
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ PollingPropertiesFileConfigurationProvider configurationProvider =
+ new PollingPropertiesFileConfigurationProvider(agentName,
+ mockConfigFile, eventBus, interval);
+ PollingPropertiesFileConfigurationProvider mockConfigurationProvider =
+ spy(configurationProvider);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(intervalMs);
+ invocation.callRealMethod();
+ return null;
+ }
+ }).when(mockConfigurationProvider).stop();
+
+ List<LifecycleAware> components = Lists.newArrayList();
+ components.add(mockConfigurationProvider);
+ Application application = new Application(components);
+ eventBus.register(application);
+ application.start();
+ Thread.sleep(1500L);
+ application.stop();
+ }
+
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestEnvVarResolverProperties.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestEnvVarResolverProperties.java
new file mode 100644
index 0000000..dc62c2b
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestEnvVarResolverProperties.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import java.io.File;
+
+public class TestEnvVarResolverProperties {
+ private static final File TESTFILE = new File(
+ TestEnvVarResolverProperties.class.getClassLoader()
+ .getResource("flume-conf-with-envvars.properties").getFile());
+
+ @Rule
+ public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+ private PropertiesFileConfigurationProvider provider;
+
+ @Before
+ public void setUp() throws Exception {
+ provider = new PropertiesFileConfigurationProvider("a1", TESTFILE);
+ }
+
+ @Test
+ public void resolveEnvVar() throws Exception {
+ environmentVariables.set("VARNAME", "varvalue");
+ String resolved = EnvVarResolverProperties.resolveEnvVars("padding ${VARNAME} padding");
+ Assert.assertEquals("padding varvalue padding", resolved);
+ }
+
+ @Test
+ public void resolveEnvVars() throws Exception {
+ environmentVariables.set("VARNAME1", "varvalue1");
+ environmentVariables.set("VARNAME2", "varvalue2");
+ String resolved = EnvVarResolverProperties
+ .resolveEnvVars("padding ${VARNAME1} ${VARNAME2} padding");
+ Assert.assertEquals("padding varvalue1 varvalue2 padding", resolved);
+ }
+
+ @Test
+ public void getProperty() throws Exception {
+ String NC_PORT = "6667";
+ environmentVariables.set("NC_PORT", NC_PORT);
+ System.setProperty("propertiesImplementation",
+ "org.apache.pulsar.io.flume.node.EnvVarResolverProperties");
+
+ Assert.assertEquals(NC_PORT, provider.getFlumeConfiguration()
+ .getConfigurationFor("a1")
+ .getSourceContext().get("r1").getParameters().get("port"));
+ }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingPropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..4f559b5
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Files;
+
+public class TestPollingPropertiesFileConfigurationProvider {
+
+ private static final File TESTFILE = new File(
+ TestPollingPropertiesFileConfigurationProvider.class.getClassLoader()
+ .getResource("flume-conf.properties").getFile());
+
+ private PollingPropertiesFileConfigurationProvider provider;
+ private File baseDir;
+ private File configFile;
+ private EventBus eventBus;
+
+ @Before
+ public void setUp() throws Exception {
+
+ baseDir = Files.createTempDir();
+
+ configFile = new File(baseDir, TESTFILE.getName());
+ Files.copy(TESTFILE, configFile);
+
+ eventBus = new EventBus("test");
+ provider =
+ new PollingPropertiesFileConfigurationProvider("host1",
+ configFile, eventBus, 1);
+ provider.start();
+ LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(baseDir);
+ provider.stop();
+ }
+
+ @Test
+ public void testPolling() throws Exception {
+
+ // let first event fire
+ Thread.sleep(2000L);
+
+ final List<MaterializedConfiguration> events = Lists.newArrayList();
+
+ Object eventHandler = new Object() {
+ @Subscribe
+ public synchronized void handleConfigurationEvent(MaterializedConfiguration event) {
+ events.add(event);
+ }
+ };
+ eventBus.register(eventHandler);
+ configFile.setLastModified(System.currentTimeMillis());
+
+ // now wait for second event to fire
+ Thread.sleep(2000L);
+
+ Assert.assertEquals(String.valueOf(events), 1, events.size());
+
+ MaterializedConfiguration materializedConfiguration = events.remove(0);
+
+ Assert.assertEquals(1, materializedConfiguration.getSourceRunners().size());
+ Assert.assertEquals(1, materializedConfiguration.getSinkRunners().size());
+ Assert.assertEquals(1, materializedConfiguration.getChannels().size());
+
+
+ }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..7191d33
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import junit.framework.Assert;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+public class TestPollingZooKeeperConfigurationProvider extends
+ TestAbstractZooKeeperConfigurationProvider {
+
+ private EventBus eb;
+
+ private EventSync es;
+
+ private PollingZooKeeperConfigurationProvider cp;
+
+ private class EventSync {
+
+ private boolean notified;
+
+ @Subscribe
+ public synchronized void notifyEvent(MaterializedConfiguration mConfig) {
+ notified = true;
+ notifyAll();
+ }
+
+ public synchronized void awaitEvent() throws InterruptedException {
+ while (!notified) {
+ wait();
+ }
+ }
+
+ public synchronized void reset() {
+ notified = false;
+ }
+ }
+
+ @Override
+ protected void doSetUp() throws Exception {
+ eb = new EventBus("test");
+ es = new EventSync();
+ es.reset();
+ eb.register(es);
+ cp = new PollingZooKeeperConfigurationProvider(AGENT_NAME, "localhost:"
+ + zkServer.getPort(), null, eb);
+ cp.start();
+ LifecycleController.waitForOneOf(cp, LifecycleState.START_OR_ERROR);
+ }
+
+ @Override
+ protected void doTearDown() throws Exception {
+ // do nothing
+ }
+
+ @Test
+ public void testPolling() throws Exception {
+ es.awaitEvent();
+ es.reset();
+
+ FlumeConfiguration fc = cp.getFlumeConfiguration();
+ Assert.assertTrue(fc.getConfigurationErrors().isEmpty());
+ AgentConfiguration ac = fc.getConfigurationFor(AGENT_NAME);
+ Assert.assertNull(ac);
+
+ addData();
+ es.awaitEvent();
+ es.reset();
+
+ verifyProperties(cp);
+ }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..7eab211
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestPropertiesFileConfigurationProvider.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import junit.framework.Assert;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TestPropertiesFileConfigurationProvider {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TestPropertiesFileConfigurationProvider.class);
+
+ private static final File TESTFILE = new File(
+ TestPropertiesFileConfigurationProvider.class.getClassLoader()
+ .getResource("flume-conf.properties").getFile());
+
+ private PropertiesFileConfigurationProvider provider;
+
+ @Before
+ public void setUp() throws Exception {
+ provider = new PropertiesFileConfigurationProvider("test", TESTFILE);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ @Test
+ public void testPropertyRead() throws Exception {
+
+ FlumeConfiguration configuration = provider.getFlumeConfiguration();
+ Assert.assertNotNull(configuration);
+
+ /*
+ * Test the known errors in the file
+ */
+ List<String> expected = Lists.newArrayList();
+ expected.add("host5 CONFIG_ERROR");
+ expected.add("host5 INVALID_PROPERTY");
+ expected.add("host4 CONFIG_ERROR");
+ expected.add("host4 CONFIG_ERROR");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 AGENT_CONFIGURATION_INVALID");
+ expected.add("ch2 ATTRS_MISSING");
+ expected.add("host3 CONFIG_ERROR");
+ expected.add("host3 PROPERTY_VALUE_NULL");
+ expected.add("host3 AGENT_CONFIGURATION_INVALID");
+ expected.add("host2 PROPERTY_VALUE_NULL");
+ expected.add("host2 AGENT_CONFIGURATION_INVALID");
+ List<String> actual = Lists.newArrayList();
+ for (FlumeConfigurationError error : configuration.getConfigurationErrors()) {
+ actual.add(error.getComponentName() + " " + error.getErrorType().toString());
+ }
+ Collections.sort(expected);
+ Collections.sort(actual);
+ Assert.assertEquals(expected, actual);
+
+ AgentConfiguration agentConfiguration =
+ configuration.getConfigurationFor("host1");
+ Assert.assertNotNull(agentConfiguration);
+
+ LOGGER.info(agentConfiguration.getPrevalidationConfig());
+ LOGGER.info(agentConfiguration.getPostvalidationConfig());
+
+ Set<String> sources = Sets.newHashSet("source1");
+ Set<String> sinks = Sets.newHashSet("sink1");
+ Set<String> channels = Sets.newHashSet("channel1");
+
+ Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+ Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+ Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+ }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestStaticZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestStaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..79bd38d
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestStaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.node;
+
+import org.junit.Test;
+
+public class TestStaticZooKeeperConfigurationProvider extends
+ TestAbstractZooKeeperConfigurationProvider {
+
+ private StaticZooKeeperConfigurationProvider configurationProvider;
+
+ @Override
+ protected void doSetUp() throws Exception {
+ addData();
+ configurationProvider = new StaticZooKeeperConfigurationProvider(
+ AGENT_NAME, "localhost:" + zkServer.getPort(), null);
+ }
+
+ @Override
+ protected void doTearDown() throws Exception {
+ // do nothing
+ }
+
+ @Test
+ public void testPropertyRead() throws Exception {
+ verifyProperties(configurationProvider);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
new file mode 100644
index 0000000..714deac
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.sink;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.*;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.AvroSource;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.flume.AbstractFlumeTests;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.junit.Assert;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.Mockito.*;
+
+public class StringSinkTests extends AbstractFlumeTests {
+
+ @Mock
+ protected SinkContext mockSinkContext;
+
+ @Mock
+ protected Record<String> mockRecord;
+
+
+ private AvroSource source;
+ private Channel channel;
+ private InetAddress localhost;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ mockRecord = mock(Record.class);
+ mockSinkContext = mock(SinkContext.class);
+ localhost = InetAddress.getByName("127.0.0.1");
+ source = new AvroSource();
+ channel = new MemoryChannel();
+ Context context = new Context();
+ context.put("port", String.valueOf(44444));
+ context.put("bind", "0.0.0.0");
+
+ Configurables.configure(source, context);
+ Configurables.configure(channel, context);
+
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ ChannelSelector rcs = new ReplicatingChannelSelector();
+ rcs.setChannels(channels);
+
+ source.setChannelProcessor(new ChannelProcessor(rcs));
+
+ source.start();
+
+ when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+ long sequenceCounter = 0;
+
+ public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
+ return Optional.of("key-" + sequenceCounter++);
+ }
+ });
+
+ when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
+ long sequenceCounter = 0;
+
+ public String answer(InvocationOnMock invocation) throws Throwable {
+ return new String("value-" + sequenceCounter++);
+ }
+ });
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ source.stop();
+ }
+
+ protected final void send(StringSink stringSink, int numRecords) throws Exception {
+ for (int idx = 0; idx < numRecords; idx++) {
+ stringSink.write(mockRecord);
+ }
+ }
+
+ @Test
+ public void TestOpenAndWriteSink() throws Exception {
+ Map<String, Object> conf = Maps.newHashMap();
+ StringSink stringSink = new StringSink();
+ conf.put("name", "a1");
+ conf.put("confFile", "./src/test/resources/flume/source.conf");
+ conf.put("noReloadConf", false);
+ conf.put("zkConnString", "");
+ conf.put("zkBasePath", "");
+ stringSink.open(conf, mockSinkContext);
+ send(stringSink, 100);
+
+ Thread.sleep(3 * 1000);
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ Event event = channel.take();
+
+ Assert.assertNotNull(event);
+ Assert.assertNotNull(mockRecord);
+
+ verify(mockRecord, times(100)).ack();
+ transaction.commit();
+ transaction.close();
+ }
+}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
new file mode 100644
index 0000000..8a3ebda
--- /dev/null
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.flume.source;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import org.apache.flume.*;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mock;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.apache.flume.sink.AvroSink;
+import org.apache.flume.channel.MemoryChannel;
+import org.junit.Assert;
+
+import java.util.Map;
+
+import org.apache.pulsar.io.flume.AbstractFlumeTests;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+public class StringSourceTests extends AbstractFlumeTests {
+
+ private AvroSink sink;
+
+ private Channel channel;
+
+ @Mock
+ private SourceContext mockSourceContext;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ if (sink != null) {
+ throw new RuntimeException("double setup");
+ }
+ Context context = new Context();
+ context.put("hostname", "127.0.0.1");
+ context.put("port", "44444");
+ context.put("batch-size", String.valueOf(2));
+ context.put("connect-timeout", String.valueOf(2000L));
+ context.put("request-timeout", String.valueOf(3000L));
+ sink = new AvroSink();
+ channel = new MemoryChannel();
+ sink.setChannel(channel);
+ Configurables.configure(sink, context);
+ Configurables.configure(channel, context);
+
+ mockSourceContext = mock(SourceContext.class);
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ sink.stop();
+ }
+
+
+ @Test
+ public void TestOpenAndReadSource() throws Exception {
+ Map<String, Object> conf = Maps.newHashMap();
+ StringSource stringSource = new StringSource();
+ conf.put("name", "a1");
+ conf.put("confFile", "./src/test/resources/flume/sink.conf");
+ conf.put("noReloadConf", false);
+ conf.put("zkConnString", "");
+ conf.put("zkBasePath", "");
+ Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+ stringSource.open(conf, mockSourceContext);
+ Thread.sleep(3 * 1000);
+ sink.start();
+ Transaction transaction = channel.getTransaction();
+
+ transaction.begin();
+ for (int i = 0; i < 10; i++) {
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ for (int i = 0; i < 5; i++) {
+ Sink.Status status = sink.process();
+ Assert.assertEquals(Sink.Status.READY, status);
+ }
+
+ Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
+ stringSource.close();
+ }
+}
diff --git a/pulsar-io/flume/src/test/resources/flume-conf-with-envvars.properties b/pulsar-io/flume/src/test/resources/flume-conf-with-envvars.properties
new file mode 100644
index 0000000..bc73d41
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume-conf-with-envvars.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = 0.0.0.0
+a1.sources.r1.port = ${NC_PORT}
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/pulsar-io/flume/src/test/resources/flume-conf.properties b/pulsar-io/flume/src/test/resources/flume-conf.properties
new file mode 100644
index 0000000..744314d
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume-conf.properties
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Flume Configuration
+# This file contains configuration for one Agent identified as host1.
+# This file also contains invalid configuration for few agents
+# host2, host3 etc.
+#
+
+host1.sources = source1
+host1.channels = channel1
+host1.sinks = sink1
+
+# avroSource configuration
+host1.sources.source1.type = seq
+host1.sources.source1.channels = channel1
+
+# memChannel1 configuration
+host1.channels.channel1.type = memory
+host1.channels.channel1.capacity = 10000
+
+
+# hdfsSink configuration
+host1.sinks.sink1.type = null
+host1.sinks.sink1.channel = channel1
+
+#
+# Agent configuration for host2 - invalid because channels is not
+# defined.
+#
+host2.sources = src1
+host2.sinks = sink1
+
+host2.sources.src1.type = foo
+host2.sources.src1.runner = xxx
+host2.sources.src1.runner.type = ttt
+host2.sinks.sink1.type = bar
+host2.sinks.sink1.runner = yyy
+host2.sinks.sink1.runner.type = yyy
+
+#
+# Agent configuration for host3 - invalid because the effective set of
+# channels is 0 since configured ones are not active, and active ones are
+# not configured.
+#
+host3.sources = src1 src2
+host3.channels = ch1 ch2
+
+host3.sources.src1.type = foo
+host3.sources.src1.runner.type = x
+host3.sources.src1.channels = ch1 ch3
+
+host3.channels.ch2.foo = bar
+host3.channels.ch3.type = foo
+host3.channels.ch3.xxx = yyy
+
+#
+# Agent configuration for host4 - invalid, same as host3 except that this
+# time one channel configuration is valid but no sources or sinks are
+# configured correctly.
+#
+host4.sources = src2
+host4.channels = ch1 ch2
+
+host4.sources.src1.type = foo
+host4.sources.src1.runner.type = x
+host4.sources.src1.channels = ch1 ch2
+
+host4.channels.ch2.foo = bar
+host4.channels.ch2.type = abc
+host4.channels.ch3.type = foo
+host4.channels.ch3.xxx = yyy
+
+#
+# Agent configuration for host5 - valid using a sinkgroup with a failover processor
+# One of the sinks isn't properly configured but the group should let it fail and drop down
+# to two sinks
+#
+
+host5.sources = src1
+host5.channels = ch1
+host5.sinks = sink1 sink2 sink3
+host5.sinkgroups = sg1
+
+host5.channels.ch1.type = abc
+
+host5.sources.src1.type = def
+host5.sources.src1.channels = ch1
+
+host5.sinks.sink1.type = foo
+host5.sinks.sink1.channel = ch1
+host5.sinks.sink2.type = bar
+host5.sinks.sink2.channel = ch1
+
+host5.sinkgroups.sg1.sinks = sink1 sink2 sink3
+host5.sinkgroups.sg1.policy.type = failover
+host5.sinkgroups.sg1.policy.priority.sink1 = 1
+host5.sinkgroups.sg1.policy.priority.sink2 = 2
diff --git a/pulsar-io/flume/src/test/resources/flume-conf.properties.2786 b/pulsar-io/flume/src/test/resources/flume-conf.properties.2786
new file mode 100755
index 0000000..2a7bea0
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume-conf.properties.2786
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Flume Configuration for testing FLUME-2786
+#
+
+test.sources = source1
+test.channels = channel1
+test.sinks = sink1
+
+test.sources.source1.type = seq
+test.sources.source1.totalEvents = 10000
+test.sources.source1.channels = channel1
+
+test.channels.channel1.type = memory
+test.channels.channel1.capacity = 10000
+
+test.sinks.sink1.type = null
+test.sinks.sink1.channel = channel1
diff --git a/pulsar-io/flume/src/test/resources/flume/sink.conf b/pulsar-io/flume/src/test/resources/flume/sink.conf
new file mode 100644
index 0000000..e45e6f4
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume/sink.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = avro
+a1.sources.r1.bind = 127.0.0.1
+a1.sources.r1.port = 44444
+
+# Describe the sink
+a1.sinks.k1.type = org.apache.pulsar.io.flume.source.SinkOfFlume
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/resources/flume/source.conf b/pulsar-io/flume/src/test/resources/flume/source.conf
new file mode 100644
index 0000000..93c713b
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/flume/source.conf
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# example.conf: A single-node Flume configuration
+
+# Name the components on this agent
+a1.sources = r1
+a1.sinks = k1
+a1.channels = c1
+
+# Describe/configure the source
+a1.sources.r1.type = org.apache.pulsar.io.flume.sink.SourceOfFlume
+
+# Describe the sink
+a1.sinks.k1.type = avro
+a1.sinks.k1.hostname = 127.0.0.1
+a1.sinks.k1.port = 44444
+# Use a channel which buffers events in memory
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 1000
+a1.channels.c1.transactionCapacity = 1000
+
+# Bind the source and sink to the channel
+a1.sources.r1.channels = c1
+a1.sinks.k1.channel = c1
\ No newline at end of file
diff --git a/pulsar-io/flume/src/test/resources/log4j.properties b/pulsar-io/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a98acea
--- /dev/null
+++ b/pulsar-io/flume/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootCategory = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = DEBUG
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 36a770b..ffd1e67 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -52,6 +52,7 @@
<module>netty</module>
<module>hbase</module>
<module>mongo</module>
+ <module>flume</module>
</modules>
</project>