A basic Flume configuration can be found in src/test/resources/flume/conf/flume_simple.conf.
A Flume configuration that uses the discovery service can be found in src/test/resources/flume/conf/flume_zkdiscovery.conf.
Configuration files should be placed in Flume's 'conf' directory and are selected explicitly when running flume-ng.
In the configuration file, set the sink type to org.apache.apex.malhar.flume.sink.FlumeSink and the storage to org.apache.apex.malhar.flume.storage.HDFSStorage, and set baseDir to an HDFS directory. This base directory must be created on HDFS beforehand.
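As a rough illustration of the sink settings above, a configuration fragment might look like the following. The agent name agent1, the sink name dt, the channel ch1 and the HDFS path are placeholders, and the nesting of the storage and baseDir keys under the sink is an assumption; the id, hostname and port keys are likewise assumed to correspond to the sinkid:host:port address the Apex operator connects to. flume_simple.conf remains the reference for the exact keys.

```properties
# Placeholder agent "agent1" with a single sink "dt" reading from channel "ch1".
agent1.sinks = dt
agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.FlumeSink
agent1.sinks.dt.channel = ch1
# Assumed key names: sink id and the address the sink listens on.
agent1.sinks.dt.id = sink1
agent1.sinks.dt.hostname = 127.0.0.1
agent1.sinks.dt.port = 8080
# Assumed key layout: storage class and its HDFS base directory nested under the sink.
agent1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
agent1.sinks.dt.storage.baseDir = /user/flume/apex-storage
```

The base directory can be created ahead of time with a command such as hdfs dfs -mkdir -p /user/flume/apex-storage.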
For discovery, configure each sink to use org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery and point it at the ZooKeeper service by setting connectionString to the ZooKeeper address and choosing a basePath. The same connectionString and basePath values must also be set for the ZKListener in the Apex application.
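A sketch of the discovery settings, continuing the placeholder agent1/dt names; the discovery key layout and the values shown are assumptions, and flume_zkdiscovery.conf is the authoritative reference.

```properties
# Assumed key layout: discovery implementation and its settings nested under the sink "dt".
agent1.sinks.dt.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery
# Placeholder ZooKeeper address and znode path; reuse the same values on the Apex side.
agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181
agent1.sinks.dt.discovery.basePath = /flume/sinks
```

The connectionString and basePath values here are the ones that have to be repeated for the listener in the Apex application.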
An implementation of AbstractFlumeInputOperator can either connect directly to a single Flume sink, or use discovery through ZooKeeper to detect Flume sinks automatically and partition the operator accordingly at startup.
Implement the abstract method that converts a Flume event into a tuple:
public abstract T convert(Event event);
Additionally, a StreamCodec for Flume events must be set. A codec implementation can be found in storage/EventCodec.java:
setCodec(new EventCodec());
See ApplicationDiscoveryTest.FlumeInputOperator for an example operator implementation.
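To show the shape of the convert/setCodec contract without pulling in the Flume and Apex jars, the sketch below uses hypothetical, simplified stand-ins for Event, EventCodec and AbstractFlumeInputOperator (the real types have more members, and the real codec is a StreamCodec). The example subclass turns each event body into a UTF-8 string; setting the codec in the constructor is an assumption about where to do it, not a requirement taken from the library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class FlumeConvertSketch {

  // Hypothetical stand-in for org.apache.flume.Event (headers plus a byte[] body).
  interface Event {
    Map<String, String> getHeaders();
    byte[] getBody();
  }

  // Hypothetical stand-in for the codec; the real operator expects a StreamCodec.
  static class EventCodec {}

  // Hypothetical, heavily simplified stand-in for AbstractFlumeInputOperator<T>.
  abstract static class AbstractFlumeInputOperator<T> {
    private EventCodec codec;

    public void setCodec(EventCodec codec) { this.codec = codec; }
    public EventCodec getCodec() { return codec; }

    // Subclasses turn each incoming Flume event into a tuple of type T.
    public abstract T convert(Event event);
  }

  // Example operator: emits each event body as a UTF-8 string.
  static class StringFlumeInputOperator extends AbstractFlumeInputOperator<String> {
    StringFlumeInputOperator() {
      setCodec(new EventCodec()); // assumption: set the codec once, at construction time
    }

    @Override
    public String convert(Event event) {
      return new String(event.getBody(), StandardCharsets.UTF_8);
    }
  }

  // Builds an in-memory event so the sketch runs without a Flume agent.
  static Event event(String body) {
    byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
    return new Event() {
      public Map<String, String> getHeaders() { return Collections.emptyMap(); }
      public byte[] getBody() { return bytes; }
    };
  }

  public static void main(String[] args) {
    StringFlumeInputOperator op = new StringFlumeInputOperator();
    System.out.println(op.convert(event("hello flume"))); // prints "hello flume"
  }
}
```

In a real operator the tuple type T would usually be a domain object parsed from the body and headers rather than a plain string.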
For a simple connection to a single Flume sink, set the connection address in the form sinkid:host:port:
public void setConnectAddresses(String[] specs)
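The sinkid:host:port format can be illustrated with a small hypothetical parser. SinkAddress and parse are not part of the library; they only show how each string in the specs array breaks down into a sink id, a host and a port.

```java
import java.util.ArrayList;
import java.util.List;

public class ConnectAddressSketch {

  // Hypothetical value holder for one "sinkid:host:port" spec.
  static final class SinkAddress {
    final String sinkId;
    final String host;
    final int port;

    SinkAddress(String sinkId, String host, int port) {
      this.sinkId = sinkId;
      this.host = host;
      this.port = port;
    }

    @Override
    public String toString() {
      return sinkId + " -> " + host + ":" + port;
    }
  }

  // Hypothetical parser illustrating the expected format; not the operator's own code.
  static List<SinkAddress> parse(String[] specs) {
    List<SinkAddress> result = new ArrayList<>();
    for (String spec : specs) {
      String[] parts = spec.split(":", 3); // sink id, host, port
      if (parts.length != 3) {
        throw new IllegalArgumentException("expected sinkid:host:port but got " + spec);
      }
      result.add(new SinkAddress(parts[0], parts[1], Integer.parseInt(parts[2])));
    }
    return result;
  }

  public static void main(String[] args) {
    // The same array is what would be passed to setConnectAddresses(...) on the operator.
    String[] specs = {"sink1:127.0.0.1:8080"};
    for (SinkAddress address : parse(specs)) {
      System.out.println(address); // prints "sink1 -> 127.0.0.1:8080"
    }
  }
}
```

With a split limit of 3, any extra colon ends up in the port field and surfaces as a NumberFormatException instead of being silently ignored.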
For a Flume input operator to discover Flume sinks and partition accordingly, a ZooKeeper service must be set up.
An implementation of AbstractFlumeInputOperator needs to initialize a ZKStatsListener. It also needs to override definePartitions to set up the ZKStatsListener, discover the sink addresses using discover(), and store them in discoveredFlumeSinks before calling the parent's definePartitions method.
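The order of steps in that override (set up the listener, discover, store the result, then delegate to the parent) can be sketched with plain Java stand-ins. ZKStatsListener, BaseOperator and the simplified, argument-free definePartitions below are hypothetical: the real Apex definePartitions receives the current partitions and a partitioning context, and a real listener queries ZooKeeper instead of returning fixed addresses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class DiscoveryPartitionSketch {

  // Hypothetical stand-in for the ZooKeeper-backed stats listener.
  static class ZKStatsListener {
    private String connectionString;
    private String basePath;

    void setup(String connectionString, String basePath) {
      this.connectionString = connectionString;
      this.basePath = basePath;
    }

    // A real listener would read sink addresses from ZooKeeper; this stub returns fixed ones.
    List<String> discover() {
      return Arrays.asList("sink1:host1:8080", "sink2:host2:8080");
    }
  }

  // Hypothetical stand-in for the parent operator's partitioning logic.
  static class BaseOperator {
    protected List<String> discoveredFlumeSinks = new ArrayList<>();

    // Creates one "partition" (here just the address) per discovered sink.
    Collection<String> definePartitions() {
      return new ArrayList<>(discoveredFlumeSinks);
    }
  }

  static class DiscoveringOperator extends BaseOperator {
    final ZKStatsListener zkListener = new ZKStatsListener();

    @Override
    Collection<String> definePartitions() {
      zkListener.setup("127.0.0.1:2181", "/flume/sinks"); // 1. set up the listener
      discoveredFlumeSinks = zkListener.discover();       // 2. discover and store sink addresses
      return super.definePartitions();                    // 3. let the parent partition
    }
  }

  public static void main(String[] args) {
    // prints "[sink1:host1:8080, sink2:host2:8080]"
    System.out.println(new DiscoveringOperator().definePartitions());
  }
}
```

Calling super last matters: the parent only sees the sinks that were stored in discoveredFlumeSinks before it runs.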
See src/test/java/org/apache/apex/malhar/flume/integration/ApplicationDiscoveryTest.java and src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java for test implementations.