Patch: add DataTorrent Flume integration module (flume/) — sink, discovery, interceptor and input operator sources
diff --git a/flume/pom.xml b/flume/pom.xml
new file mode 100644
index 0000000..ade05a0
--- /dev/null
+++ b/flume/pom.xml
@@ -0,0 +1,275 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>dt-megh</artifactId>
+ <groupId>com.datatorrent</groupId>
+ <version>3.6.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>dt-flume</artifactId>
+ <packaging>jar</packaging>
+ <name>DataTorrent Flume Integration</name>
+
+ <profiles>
+ <profile>
+ <id>release</id>
+ <properties>
+ <package.username>flume</package.username>
+ <rpm.skip>package</rpm.skip>
+ <rpm.phase>${rpm.skip}</rpm.phase>
+ </properties>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>rpm-maven-plugin</artifactId>
+ <version>2.1-alpha-4</version>
+ <executions>
+ <execution>
+ <phase>${rpm.phase}</phase>
+ <id>generate-sink-rpm</id>
+ <goals>
+ <goal>attached-rpm</goal>
+ </goals>
+ <configuration>
+ <license>Copyright © 2014 DataTorrent, Inc.</license>
+ <version>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.incrementalVersion}</version>
+ <release>${parsedVersion.qualifier}${parsedVersion.buildNumber}</release>
+ <workarea>target/sink-rpm</workarea>
+ <classifier>sink</classifier>
+ <name>datatorrent-flume-sink</name>
+ <distribution>DataTorrent Enterprise ${project.version}</distribution>
+ <group>Messaging Client Support</group>
+ <icon>src/main/resources/logo.gif</icon>
+ <packager>DataTorrent Build System</packager>
+ <prefix>${package.prefix}</prefix>
+ <changelogFile>src/changelog</changelogFile>
+ <defineStatements>
+ <defineStatement>_unpackaged_files_terminate_build 0</defineStatement>
+ </defineStatements>
+ <mappings>
+ <mapping>
+ <directory>${package.prefix}/flume-${project.version}/lib</directory>
+ <filemode>750</filemode>
+ <username>${package.username}</username>
+ <groupname>${package.groupname}</groupname>
+ <artifact></artifact>
+ <dependency>
+ <includes>
+ <include>org.apache.apex:apex-api:jar:${apex.core.version}</include>
+ <include>com.datatorrent:dt-netlet:jar:1.2.0</include>
+ <include>org.apache.apex:apex-common:jar:${apex.core.version}</include>
+ <include>com.esotericsoftware.kryo:kryo:jar:2.24.0</include>
+ <include>com.esotericsoftware.minlog:minlog:jar:1.2</include>
+ <include>org.objenesis:objenesis:jar:2.1</include>
+ <include>org.apache.curator:curator-client:jar:2.3.0</include>
+ <include>org.apache.curator:curator-x-discovery:jar:2.3.0</include>
+ <include>org.apache.curator:curator-framework:jar:2.3.0</include>
+ </includes>
+ </dependency>
+ </mapping>
+ <mapping>
+ <directory>${package.prefix}/flume-${project.version}/conf</directory>
+ <configuration>true</configuration>
+ <filemode>640</filemode>
+ <username>${package.username}</username>
+ <groupname>${package.groupname}</groupname>
+ <sources>
+ <source>
+ <location>src/main/resources/flume-conf</location>
+ </source>
+ </sources>
+ </mapping>
+ </mappings>
+ <preinstallScriptlet>
+ <script>groupadd -f ${package.groupname} &amp;&amp; id ${package.username} >/dev/null 2>&amp;1 &amp;&amp; usermod -aG ${package.groupname} ${package.username} || useradd -g ${package.groupname} ${package.username}</script>
+ </preinstallScriptlet>
+ </configuration>
+ </execution>
+
+ <execution>
+ <phase>${rpm.phase}</phase>
+ <id>generate-operator-rpm</id>
+ <goals>
+ <goal>attached-rpm</goal>
+ </goals>
+ <configuration>
+ <version>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.incrementalVersion}</version>
+ <license>Copyright © 2014 DataTorrent, Inc.</license>
+ <release>${parsedVersion.qualifier}${parsedVersion.buildNumber}</release>
+ <workarea>target/operator-rpm</workarea>
+ <classifier>operator</classifier>
+ <name>datatorrent-flume-operator</name>
+ <distribution>DataTorrent Enterprise ${project.version}</distribution>
+ <group>Messaging Client Support</group>
+ <icon>src/main/resources/logo.gif</icon>
+ <packager>DataTorrent Build System</packager>
+ <prefix>${package.prefix}</prefix>
+ <changelogFile>src/changelog</changelogFile>
+ <description>${rpm.release}</description>
+ <defineStatements>
+ <defineStatement>_unpackaged_files_terminate_build 0</defineStatement>
+ </defineStatements>
+ <mappings>
+ <mapping>
+ <directory>${package.prefix}/flume-operator-${project.version}/lib</directory>
+ <filemode>640</filemode>
+ <username>${package.username}</username>
+ <groupname>${package.groupname}</groupname>
+ <artifact></artifact>
+ <dependency>
+ <includes>
+ <include>org.apache.curator:curator-client:jar:2.3.0</include>
+ <include>org.apache.curator:curator-x-discovery:jar:2.3.0</include>
+ <include>org.apache.curator:curator-framework:jar:2.3.0</include>
+ <include>org.apache.flume:flume-ng-sdk:jar:1.5.0</include>
+ <include>org.apache.flume:flume-ng-core:jar:1.5.0</include>
+ <include>org.apache.flume:flume-ng-configuration:jar:1.5.0</include>
+ </includes>
+ </dependency>
+ </mapping>
+ </mappings>
+ <preinstallScriptlet>
+ <script>groupadd -f ${package.groupname} &amp;&amp; id ${package.username} >/dev/null 2>&amp;1 &amp;&amp; usermod -aG ${package.groupname} ${package.username} || useradd -g ${package.groupname} ${package.username}</script>
+ </preinstallScriptlet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.16</version>
+ <configuration>
+ <argLine>-Xmx5000M</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.core.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>1.5.0</version>
+ <exclusions>
+ <exclusion>
+ <!-- Curator requires later version of Guava -->
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-core-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-codec</artifactId>
+ <groupId>commons-codec</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-api</artifactId>
+ <version>${apex.core.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.datatorrent</groupId>
+ <artifactId>netlet</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <version>2.3.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>11.0.2</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java b/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java
new file mode 100644
index 0000000..d802002
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.discovery;
+
+import java.util.Collection;
+
+/**
+ * When DTFlumeSink server instance binds to the network interface, it can publish
+ * its whereabouts by invoking advertise method on the Discovery object. Similarly
+ * when it ceases accepting any more connections, it can publish its intent to do
+ * so by invoking unadvertise.<p />
+ * Interesting parties can call discover method to get the list of addresses where
+ * they can find an available DTFlumeSink server instance.
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @param <T> - Type of the objects which can be discovered
+ * @since 0.9.3
+ */
+public interface Discovery<T>
+{
+  /**
+   * Recall the previously published address as it's no longer valid.
+   *
+   * @param service
+   */
+  void unadvertise(Service<T> service);
+
+  /**
+   * Advertise the host/port address where DTFlumeSink is accepting a client connection.
+   *
+   * @param service
+   */
+  void advertise(Service<T> service);
+
+  /**
+   * Discover all the addresses which are actively accepting the client connections.
+   *
+   * @return - Active server addresses which can accept the connections.
+   */
+  Collection<Service<T>> discover();
+
+  /**
+   * A single discoverable service record: network coordinates, a unique id,
+   * and an opaque payload of type T published along with the address.
+   */
+  interface Service<T>
+  {
+    // Host name or IP address on which the service listens.
+    String getHost();
+
+    // TCP port on which the service accepts connections.
+    int getPort();
+
+    // Opaque payload published with the address; interpretation is up to the user.
+    T getPayload();
+
+    // Identifier unique to this service instance.
+    String getId();
+
+  }
+
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java
new file mode 100644
index 0000000..460a478
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java
@@ -0,0 +1,429 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.discovery;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.flume.conf.Configurable;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Component;
+
+/**
+ * <p>ZKAssistedDiscovery class.</p>
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 0.9.3
+ */
+public class ZKAssistedDiscovery implements Discovery<byte[]>,
+ Component<com.datatorrent.api.Context>, Configurable, Serializable
+{
+ @NotNull
+ private String serviceName;
+ @NotNull
+ private String connectionString;
+ @NotNull
+ private String basePath;
+ private int connectionTimeoutMillis;
+ private int connectionRetryCount;
+ private int conntectionRetrySleepMillis;
+ private transient InstanceSerializerFactory instanceSerializerFactory;
+ private transient CuratorFramework curatorFramework;
+ private transient ServiceDiscovery<byte[]> discovery;
+
+ public ZKAssistedDiscovery()
+ {
+ this.serviceName = "DTFlume";
+ this.conntectionRetrySleepMillis = 500;
+ this.connectionRetryCount = 10;
+ this.connectionTimeoutMillis = 1000;
+ }
+
+ @Override
+ public void unadvertise(Service<byte[]> service)
+ {
+ doAdvertise(service, false);
+ }
+
+ @Override
+ public void advertise(Service<byte[]> service)
+ {
+ doAdvertise(service, true);
+ }
+
+ public void doAdvertise(Service<byte[]> service, boolean flag)
+ {
+ try {
+ new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
+
+ ServiceInstance<byte[]> instance = getInstance(service);
+ if (flag) {
+ discovery.registerService(instance);
+ } else {
+ discovery.unregisterService(instance);
+ }
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public Collection<Service<byte[]>> discover()
+ {
+ try {
+ new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
+
+ Collection<ServiceInstance<byte[]>> services = discovery.queryForInstances(serviceName);
+ ArrayList<Service<byte[]>> returnable = new ArrayList<Service<byte[]>>(services.size());
+ for (final ServiceInstance<byte[]> service : services) {
+ returnable.add(new Service<byte[]>()
+ {
+ @Override
+ public String getHost()
+ {
+ return service.getAddress();
+ }
+
+ @Override
+ public int getPort()
+ {
+ return service.getPort();
+ }
+
+ @Override
+ public byte[] getPayload()
+ {
+ return service.getPayload();
+ }
+
+ @Override
+ public String getId()
+ {
+ return service.getId();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" + getId() + " => " + getHost() + ':' + getPort() + '}';
+ }
+
+ });
+ }
+ return returnable;
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ZKAssistedDiscovery{" + "serviceName=" + serviceName + ", connectionString=" + connectionString +
+ ", basePath=" + basePath + ", connectionTimeoutMillis=" + connectionTimeoutMillis + ", connectionRetryCount=" +
+ connectionRetryCount + ", conntectionRetrySleepMillis=" + conntectionRetrySleepMillis + '}';
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int hash = 7;
+ hash = 47 * hash + this.serviceName.hashCode();
+ hash = 47 * hash + this.connectionString.hashCode();
+ hash = 47 * hash + this.basePath.hashCode();
+ hash = 47 * hash + this.connectionTimeoutMillis;
+ hash = 47 * hash + this.connectionRetryCount;
+ hash = 47 * hash + this.conntectionRetrySleepMillis;
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final ZKAssistedDiscovery other = (ZKAssistedDiscovery)obj;
+ if (!this.serviceName.equals(other.serviceName)) {
+ return false;
+ }
+ if (!this.connectionString.equals(other.connectionString)) {
+ return false;
+ }
+ if (!this.basePath.equals(other.basePath)) {
+ return false;
+ }
+ if (this.connectionTimeoutMillis != other.connectionTimeoutMillis) {
+ return false;
+ }
+ if (this.connectionRetryCount != other.connectionRetryCount) {
+ return false;
+ }
+ if (this.conntectionRetrySleepMillis != other.conntectionRetrySleepMillis) {
+ return false;
+ }
+ return true;
+ }
+
+ ServiceInstance<byte[]> getInstance(Service<byte[]> service) throws Exception
+ {
+ return ServiceInstance.<byte[]>builder()
+ .name(serviceName)
+ .address(service.getHost())
+ .port(service.getPort())
+ .id(service.getId())
+ .payload(service.getPayload())
+ .build();
+ }
+
+ private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework)
+ {
+ return ServiceDiscoveryBuilder.builder(byte[].class)
+ .basePath(basePath)
+ .client(curatorFramework)
+ .serializer(instanceSerializerFactory.getInstanceSerializer(
+ new TypeReference<ServiceInstance<byte[]>>()
+ {})).build();
+ }
+
+ /**
+ * @return the instanceSerializerFactory
+ */
+ InstanceSerializerFactory getInstanceSerializerFactory()
+ {
+ return instanceSerializerFactory;
+ }
+
+ /**
+ * @return the connectionString
+ */
+ public String getConnectionString()
+ {
+ return connectionString;
+ }
+
+ /**
+ * @param connectionString the connectionString to set
+ */
+ public void setConnectionString(String connectionString)
+ {
+ this.connectionString = connectionString;
+ }
+
+ /**
+ * @return the basePath
+ */
+ public String getBasePath()
+ {
+ return basePath;
+ }
+
+ /**
+ * @param basePath the basePath to set
+ */
+ public void setBasePath(String basePath)
+ {
+ this.basePath = basePath;
+ }
+
+ /**
+ * @return the connectionTimeoutMillis
+ */
+ public int getConnectionTimeoutMillis()
+ {
+ return connectionTimeoutMillis;
+ }
+
+ /**
+ * @param connectionTimeoutMillis the connectionTimeoutMillis to set
+ */
+ public void setConnectionTimeoutMillis(int connectionTimeoutMillis)
+ {
+ this.connectionTimeoutMillis = connectionTimeoutMillis;
+ }
+
+ /**
+ * @return the connectionRetryCount
+ */
+ public int getConnectionRetryCount()
+ {
+ return connectionRetryCount;
+ }
+
+ /**
+ * @param connectionRetryCount the connectionRetryCount to set
+ */
+ public void setConnectionRetryCount(int connectionRetryCount)
+ {
+ this.connectionRetryCount = connectionRetryCount;
+ }
+
+ /**
+ * @return the conntectionRetrySleepMillis
+ */
+ public int getConntectionRetrySleepMillis()
+ {
+ return conntectionRetrySleepMillis;
+ }
+
+ /**
+ * @param conntectionRetrySleepMillis the conntectionRetrySleepMillis to set
+ */
+ public void setConntectionRetrySleepMillis(int conntectionRetrySleepMillis)
+ {
+ this.conntectionRetrySleepMillis = conntectionRetrySleepMillis;
+ }
+
+ /**
+ * @return the serviceName
+ */
+ public String getServiceName()
+ {
+ return serviceName;
+ }
+
+ /**
+ * @param serviceName the serviceName to set
+ */
+ public void setServiceName(String serviceName)
+ {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public void configure(org.apache.flume.Context context)
+ {
+ serviceName = context.getString("serviceName", "DTFlume");
+ connectionString = context.getString("connectionString");
+ basePath = context.getString("basePath");
+
+ connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", 1000);
+ connectionRetryCount = context.getInteger("connectionRetryCount", 10);
+ conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", 500);
+ }
+
+ @Override
+ public void setup(com.datatorrent.api.Context context)
+ {
+ ObjectMapper om = new ObjectMapper();
+ instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer());
+
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectionTimeoutMs(connectionTimeoutMillis)
+ .retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis))
+ .connectString(connectionString)
+ .build();
+ curatorFramework.start();
+
+ discovery = getDiscovery(curatorFramework);
+ try {
+ discovery.start();
+ } catch (Exception ex) {
+ Throwables.propagate(ex);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ discovery.close();
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ curatorFramework.close();
+ curatorFramework = null;
+ }
+ }
+
+ public class InstanceSerializerFactory
+ {
+ private final ObjectReader objectReader;
+ private final ObjectWriter objectWriter;
+
+ InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter)
+ {
+ this.objectReader = objectReader;
+ this.objectWriter = objectWriter;
+ }
+
+ public <T> InstanceSerializer<T> getInstanceSerializer(
+ TypeReference<ServiceInstance<T>> typeReference)
+ {
+ return new JacksonInstanceSerializer<T>(objectReader, objectWriter, typeReference);
+ }
+
+ final class JacksonInstanceSerializer<T> implements InstanceSerializer<T>
+ {
+ private final TypeReference<ServiceInstance<T>> typeRef;
+ private final ObjectWriter objectWriter;
+ private final ObjectReader objectReader;
+
+ JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter,
+ TypeReference<ServiceInstance<T>> typeRef)
+ {
+ this.objectReader = objectReader;
+ this.objectWriter = objectWriter;
+ this.typeRef = typeRef;
+ }
+
+ @Override
+ public ServiceInstance<T> deserialize(byte[] bytes) throws Exception
+ {
+ return objectReader.withType(typeRef).readValue(bytes);
+ }
+
+ @Override
+ public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ objectWriter.writeValue(out, serviceInstance);
+ return out.toByteArray();
+ }
+
+ }
+
+ }
+
+ private static final long serialVersionUID = 201401221145L;
+ private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java
new file mode 100644
index 0000000..90c3a04
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java
@@ -0,0 +1,204 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.COLUMNS;
+import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR;
+import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR_DFLT;
+import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR;
+import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT;
+
+/**
+ * <p>ColumnFilteringInterceptor class.</p>
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 0.9.4
+ */
+public class ColumnFilteringInterceptor implements Interceptor
+{
+  // Byte separating columns in the incoming event body.
+  private final byte srcSeparator;
+  // Byte used to join the retained columns in the outgoing event body.
+  private final byte dstSeparator;
+
+  // Largest zero-based column index requested via configuration.
+  private final int maxIndex;
+  // maxIndex + 1: count of columns that must be located in each body.
+  private final int maxColumn;
+  // Zero-based indices of the columns to retain, in output order.
+  private final int[] columns;
+  // positions[c] = offset in the body where column c starts (one past the
+  // separator terminating column c-1); positions[0] is always 0. Reused
+  // across events and partially reset at the end of intercept().
+  private final int[] positions;
+
+  private ColumnFilteringInterceptor(int[] columns, byte srcSeparator, byte dstSeparator)
+  {
+    this.columns = columns;
+
+    // Determine the highest requested column so each scan can stop early.
+    int tempMaxColumn = Integer.MIN_VALUE;
+    for (int column: columns) {
+      if (column > tempMaxColumn) {
+        tempMaxColumn = column;
+      }
+    }
+    maxIndex = tempMaxColumn;
+    maxColumn = tempMaxColumn + 1;
+    positions = new int[maxColumn + 1];
+
+    this.srcSeparator = srcSeparator;
+    this.dstSeparator = dstSeparator;
+  }
+
+  @Override
+  public void initialize()
+  {
+    /* no-op */
+  }
+
+  /**
+   * Rewrites the event body in place, keeping only the configured columns
+   * joined by dstSeparator. A column that is empty or missing in the input
+   * still contributes a single dstSeparator byte to the output.
+   */
+  @Override
+  public Event intercept(Event event)
+  {
+    byte[] body = event.getBody();
+    if (body == null) {
+      return event;
+    }
+
+    final int length = body.length;
+
+    /* store positions of character after the separators */
+    int i = 0;
+    int index = 0;
+    while (i < length) {
+      if (body[i++] == srcSeparator) {
+        positions[++index] = i;
+        // NOTE(review): the scan breaks once the START of column maxIndex is
+        // recorded; the end of that column is only recorded below when the
+        // body is exhausted (i == length). Verify the intended handling of
+        // the last requested column when the body has separators beyond it.
+        if (index >= maxIndex) {
+          break;
+        }
+      }
+    }
+
+    int nextVirginIndex;
+    boolean separatorTerminated;
+    if (i == length && index < maxColumn) {
+      // Body exhausted before all requested columns were terminated by a
+      // separator: treat end-of-body as the boundary of the last column.
+      nextVirginIndex = index + 2;
+      positions[nextVirginIndex - 1] = length;
+      separatorTerminated = length > 0 ? body[length - 1] != srcSeparator : false;
+    } else {
+      nextVirginIndex = index + 1;
+      separatorTerminated = true;
+    }
+
+    // Pre-compute the output size: an empty/missing column contributes one
+    // dstSeparator byte; a non-empty column contributes its bytes (a copied
+    // trailing srcSeparator is overwritten in place) plus one extra byte
+    // when a separator must be appended at end-of-body.
+    int newArrayLen = 0;
+    for (i = columns.length; i-- > 0;) {
+      int column = columns[i];
+      int len = positions[column + 1] - positions[column];
+      if (len <= 0) {
+        newArrayLen++;
+      } else {
+        if (separatorTerminated && positions[column + 1] == length) {
+          newArrayLen++;
+        }
+        newArrayLen += len;
+      }
+    }
+
+    // Copy the retained columns, converting each terminating srcSeparator to
+    // dstSeparator, or appending one when the column had no terminator.
+    byte[] newbody = new byte[newArrayLen];
+    int newoffset = 0;
+    for (int column: columns) {
+      int len = positions[column + 1] - positions[column];
+      if (len > 0) {
+        System.arraycopy(body, positions[column], newbody, newoffset, len);
+        newoffset += len;
+        if (newbody[newoffset - 1] == srcSeparator) {
+          newbody[newoffset - 1] = dstSeparator;
+        } else {
+          newbody[newoffset++] = dstSeparator;
+        }
+      } else {
+        newbody[newoffset++] = dstSeparator;
+      }
+    }
+
+    event.setBody(newbody);
+    // Clear only the slots written for this event so the array can be reused.
+    Arrays.fill(positions, 1, nextVirginIndex, 0);
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events)
+  {
+    for (Event event: events) {
+      intercept(event);
+    }
+    return events;
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  /**
+   * Flume builder: parses the column indices and separator bytes from the
+   * agent configuration and constructs the interceptor.
+   */
+  public static class Builder implements Interceptor.Builder
+  {
+    private int[] columns;
+    private byte srcSeparator;
+    private byte dstSeparator;
+
+    @Override
+    public Interceptor build()
+    {
+      return new ColumnFilteringInterceptor(columns, srcSeparator, dstSeparator);
+    }
+
+    @Override
+    public void configure(Context context)
+    {
+      // "columns" is mandatory: a space-separated list of zero-based indices.
+      String sColumns = context.getString(COLUMNS);
+      if (sColumns == null || sColumns.trim().isEmpty()) {
+        throw new Error("This interceptor requires filtered columns to be specified!");
+      }
+
+      String[] parts = sColumns.split(" ");
+      columns = new int[parts.length];
+      for (int i = parts.length; i-- > 0;) {
+        columns[i] = Integer.parseInt(parts[i]);
+      }
+
+      // Separators are configured as integers and narrowed to single bytes.
+      srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue();
+      dstSeparator = context.getInteger(DST_SEPARATOR, (int)DST_SEPARATOR_DFLT).byteValue();
+    }
+
+  }
+
+  @SuppressWarnings("ClassMayBeInterface") /* adhering to flume until i understand it completely */
+
+  // Configuration keys and default separator bytes (ASCII STX = 2, SOH = 1).
+  public static class Constants
+  {
+    public static final String SRC_SEPARATOR = "srcSeparator";
+    public static final byte SRC_SEPARATOR_DFLT = 2;
+
+    public static final String DST_SEPARATOR = "dstSeparator";
+    public static final byte DST_SEPARATOR_DFLT = 1;
+
+    public static final String COLUMNS = "columns";
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringInterceptor.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java
new file mode 100644
index 0000000..1ab7182
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java
@@ -0,0 +1,760 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.operator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.flume.discovery.Discovery.Service;
+import com.datatorrent.flume.discovery.ZKAssistedDiscovery;
+import com.datatorrent.flume.sink.Server;
+import com.datatorrent.flume.sink.Server.Command;
+import com.datatorrent.flume.sink.Server.Request;
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * <p>
+ * Abstract AbstractFlumeInputOperator class.</p>
+ *
+ * @param <T> Type of the output payload.
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 0.9.2
+ */
+public abstract class AbstractFlumeInputOperator<T>
+ implements InputOperator, Operator.ActivationListener<OperatorContext>, Operator.IdleTimeHandler,
+ Operator.CheckpointListener, Partitioner<AbstractFlumeInputOperator<T>>
+{
+ public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+ public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort<Slice>();
+ @NotNull
+ private String[] connectionSpecs;
+ @NotNull
+ private StreamCodec<Event> codec;
+ private final ArrayList<RecoveryAddress> recoveryAddresses;
+ @SuppressWarnings("FieldMayBeFinal") // it's not final because that mucks with the serialization somehow
+ private transient ArrayBlockingQueue<Slice> handoverBuffer;
+ private transient int idleCounter;
+ private transient int eventCounter;
+ private transient DefaultEventLoop eventloop;
+ private transient volatile boolean connected;
+ private transient OperatorContext context;
+ private transient Client client;
+ private transient long windowId;
+ private transient byte[] address;
+ @Min(0)
+ private long maxEventsPerSecond;
+ //This is calculated from maxEventsPerSecond, App window count and streaming window size
+ private transient long maxEventsPerWindow;
+
+  /**
+   * Initializes state with safe defaults: a bounded handover queue between the
+   * netlet event-loop thread and the operator thread, no connection specs, and
+   * an effectively unlimited event rate.
+   */
+  public AbstractFlumeInputOperator()
+  {
+    handoverBuffer = new ArrayBlockingQueue<Slice>(1024 * 5);
+    connectionSpecs = new String[0];
+    recoveryAddresses = new ArrayList<RecoveryAddress>();
+    maxEventsPerSecond = Long.MAX_VALUE;
+  }
+
+  /**
+   * Derives the per-window event cap from the configured events-per-second
+   * limit and the effective application window duration, then starts the
+   * dedicated netlet event loop used in activate() to reach the flume sink.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    long windowDurationMillis = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    // division by 1000.0 converts the window duration from millis to seconds
+    maxEventsPerWindow = (long)(windowDurationMillis / 1000.0 * maxEventsPerSecond);
+    logger.debug("max-events per-second {} per-window {}", maxEventsPerSecond, maxEventsPerWindow);
+
+    try {
+      eventloop = new DefaultEventLoop("EventLoop-" + context.getId());
+      eventloop.start();
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Connects this physical operator to its single assigned sink, if any.
+   * A spec has the form sinkid:host:port; more than one spec per physical
+   * operator is not supported and is rejected.
+   */
+  @Override
+  @SuppressWarnings({"unchecked"})
+  public void activate(OperatorContext ctx)
+  {
+    if (connectionSpecs.length == 0) {
+      logger.info("Discovered zero DTFlumeSink");
+    } else if (connectionSpecs.length == 1) {
+      final String spec = connectionSpecs[0];
+      logger.debug("Connection spec is {}", spec);
+      final String[] parts = spec.split(":");
+      client = new Client(parts[0]);
+      eventloop.connect(new InetSocketAddress(parts[1], Integer.parseInt(parts[2])), client);
+    } else {
+      throw new IllegalArgumentException(
+          String.format("A physical %s operator cannot connect to more than 1 addresses!",
+              this.getClass().getSimpleName()));
+    }
+
+    context = ctx;
+  }
+
+  /** Records the current window id and resets the per-window counters. */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.windowId = windowId;
+    eventCounter = 0;
+    idleCounter = 0;
+  }
+
+  /**
+   * Drains up to the per-window quota of events from the handover buffer.
+   * Each queued slice starts with an 8-byte storage address followed by the
+   * encoded event; the address of the LAST slice processed in this call is
+   * remembered so endWindow() can associate it with the current window.
+   * Events that convert() maps to null are routed to the drop port.
+   */
+  @Override
+  public void emitTuples()
+  {
+    int pending = handoverBuffer.size();
+    if (pending > 0 && eventCounter < maxEventsPerWindow) {
+      Slice slice = null;
+      /* dedup of the original loop + trailing copy: process min(pending, quota)
+         slices with a single body, keeping a handle on the last one */
+      while (pending-- > 0 && eventCounter < maxEventsPerWindow) {
+        slice = handoverBuffer.poll();
+        // skip the 8-byte recovery address prefix before decoding the event
+        slice.offset += 8;
+        slice.length -= 8;
+        T tuple = convert((Event)codec.fromByteArray(slice));
+        if (tuple == null) {
+          drop.emit(slice);
+        } else {
+          output.emit(tuple);
+        }
+        eventCounter++;
+      }
+
+      // capture the recovery address of the last slice handled in this window
+      address = Arrays.copyOfRange(slice.buffer, slice.offset - 8, slice.offset);
+    }
+  }
+
+  /**
+   * Reports per-window consumption back to the sink (WINDOWED message with
+   * event and idle counts) and files the last processed event's storage
+   * address under this window id so committed() can acknowledge it later.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (connected) {
+      byte[] array = new byte[Request.FIXED_SIZE];
+
+      // wire layout: [0]=command, [1..4]=eventCount, [5..8]=idleCount, then timestamp
+      array[0] = Command.WINDOWED.getOrdinal();
+      Server.writeInt(array, 1, eventCounter);
+      Server.writeInt(array, 5, idleCounter);
+      Server.writeLong(array, Request.TIME_OFFSET, System.currentTimeMillis());
+
+      logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", Command.WINDOWED, eventCounter, idleCounter);
+      client.write(array);
+    }
+
+    if (address != null) {
+      // remember the newest recovery address seen during this window
+      RecoveryAddress rAddress = new RecoveryAddress();
+      rAddress.address = address;
+      address = null;
+      rAddress.windowId = windowId;
+      recoveryAddresses.add(rAddress);
+    }
+  }
+
+  /** Disconnects from the sink (if connected) and drops the operator context. */
+  @Override
+  public void deactivate()
+  {
+    if (connected) {
+      eventloop.disconnect(client);
+    }
+    context = null;
+  }
+
+  /** Stops the event loop started in setup() and releases the reference. */
+  @Override
+  public void teardown()
+  {
+    eventloop.stop();
+    eventloop = null;
+  }
+
+  /**
+   * Invoked when there was nothing to emit: counts the idle period (reported
+   * to the sink in endWindow()) and backs off for the configured spin interval.
+   */
+  @Override
+  public void handleIdleTime()
+  {
+    idleCounter++;
+    try {
+      sleep(context.getValue(OperatorContext.SPIN_MILLIS));
+    } catch (InterruptedException ex) {
+      // NOTE(review): interrupt status is not restored before rethrowing -- confirm intended
+      throw new RuntimeException(ex);
+    }
+  }
+
+ public abstract T convert(Event event);
+
+  /**
+   * Returns a defensive copy of the sink connection specifications.
+   *
+   * @return the configured sinkid:host:port specs
+   */
+  public String[] getConnectAddresses()
+  {
+    return this.connectionSpecs.clone();
+  }
+
+  /**
+   * Stores a defensive copy of the sink connection specifications.
+   *
+   * @param specs sinkid:host:port specification of all the sinks
+   */
+  public void setConnectAddresses(String[] specs)
+  {
+    this.connectionSpecs = specs.clone();
+  }
+
+  /**
+   * @return the codec used to decode flume events received from the sink
+   */
+  public StreamCodec<Event> getCodec()
+  {
+    return this.codec;
+  }
+
+  /**
+   * @param codec the codec used to decode flume events received from the sink
+   */
+  public void setCodec(StreamCodec<Event> codec)
+  {
+    this.codec = codec;
+  }
+
+  /**
+   * Pairs a streaming window id with the storage address of the last event
+   * processed in that window. Used to tell the sink where to resume (SEEK)
+   * and up to where it may clean its storage (COMMITTED).
+   */
+  private static class RecoveryAddress implements Serializable
+  {
+    long windowId;   // window in which the event at 'address' was emitted
+    byte[] address;  // 8-byte storage location assigned by the sink's storage
+
+    @Override
+    public String toString()
+    {
+      return "RecoveryAddress{" + "windowId=" + windowId + ", address=" + Arrays.toString(address) + '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof RecoveryAddress)) {
+        return false;
+      }
+
+      RecoveryAddress that = (RecoveryAddress)o;
+
+      if (windowId != that.windowId) {
+        return false;
+      }
+      return Arrays.equals(address, that.address);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      int result = (int)(windowId ^ (windowId >>> 32));
+      result = 31 * result + (address != null ? Arrays.hashCode(address) : 0);
+      return result;
+    }
+
+    private static final long serialVersionUID = 201312021432L;
+  }
+
+  /** No-op: recovery bookkeeping happens in committed(), not at checkpoint. */
+  @Override
+  public void checkpointed(long windowId)
+  {
+    /* dont do anything */
+  }
+
+  /**
+   * Window-committed callback: prunes recovery addresses up to and including
+   * the committed window, then sends the sink a COMMITTED message carrying the
+   * 8-byte storage address of the newest event it may now clean up.
+   */
+  @Override
+  public void committed(long windowId)
+  {
+    if (!connected) {
+      return;
+    }
+
+    synchronized (recoveryAddresses) {
+      byte[] addr = null;
+
+      // drop every entry for windows at or before the committed one,
+      // remembering the newest non-null address among them
+      Iterator<RecoveryAddress> iterator = recoveryAddresses.iterator();
+      while (iterator.hasNext()) {
+        RecoveryAddress ra = iterator.next();
+        if (ra.windowId > windowId) {
+          break;
+        }
+
+        iterator.remove();
+        if (ra.address != null) {
+          addr = ra.address;
+        }
+      }
+
+      if (addr != null) {
+        /*
+         * Make sure that we store the last valid address processed
+         */
+        if (recoveryAddresses.isEmpty()) {
+          RecoveryAddress ra = new RecoveryAddress();
+          ra.address = addr;
+          recoveryAddresses.add(ra);
+        }
+
+        int arraySize = 1/* for the type of the message */
+            + 8 /* for the location to commit */
+            + 8 /* for storing the current time stamp*/;
+        byte[] array = new byte[arraySize];
+
+        array[0] = Command.COMMITTED.getOrdinal();
+        System.arraycopy(addr, 0, array, 1, 8);
+        Server.writeLong(array, Request.TIME_OFFSET, System.currentTimeMillis());
+        logger.debug("wrote {} with recoveryOffset = {}", Command.COMMITTED, Arrays.toString(addr));
+        client.write(array);
+      }
+    }
+  }
+
+  /**
+   * Defines one physical partition per discovered flume sink. Recovery state
+   * of existing partitions is carried over keyed by sink id; state belonging
+   * to sinks that disappeared is stashed in abandonedRecoveryAddresses in
+   * case the sink comes back.
+   *
+   * @param partitions existing partitions (cleared and repopulated in place)
+   * @param context    partitioning context (unused)
+   * @return the new partition collection; never empty
+   */
+  @Override
+  public Collection<Partition<AbstractFlumeInputOperator<T>>> definePartitions(
+      Collection<Partition<AbstractFlumeInputOperator<T>>> partitions, PartitioningContext context)
+  {
+    Collection<Service<byte[]>> discovered = discoveredFlumeSinks.get();
+    if (discovered == null) {
+      return partitions;
+    }
+
+    // collect current specs and index each partition's recovery state by sink id
+    HashMap<String, ArrayList<RecoveryAddress>> allRecoveryAddresses = abandonedRecoveryAddresses.get();
+    ArrayList<String> allConnectAddresses = new ArrayList<String>(partitions.size());
+    for (Partition<AbstractFlumeInputOperator<T>> partition: partitions) {
+      String[] lAddresses = partition.getPartitionedInstance().connectionSpecs;
+      allConnectAddresses.addAll(Arrays.asList(lAddresses));
+      for (int i = lAddresses.length; i-- > 0;) {
+        String[] parts = lAddresses[i].split(":", 2);
+        allRecoveryAddresses.put(parts[0], partition.getPartitionedInstance().recoveryAddresses);
+      }
+    }
+
+    /* map sink id -> latest spec; when two discovered sinks share an id, prefer
+       the one replacing a spec that is known to be disconnected */
+    HashMap<String, String> connections = new HashMap<String, String>(discovered.size());
+    for (Service<byte[]> service: discovered) {
+      String previousSpec = connections.get(service.getId());
+      String newspec = service.getId() + ':' + service.getHost() + ':' + service.getPort();
+      if (previousSpec == null) {
+        connections.put(service.getId(), newspec);
+      } else {
+        boolean found = false;
+        for (ConnectionStatus cs: partitionedInstanceStatus.get().values()) {
+          if (previousSpec.equals(cs.spec) && !cs.connected) {
+            connections.put(service.getId(), newspec);
+            found = true;
+            break;
+          }
+        }
+
+        if (!found) {
+          logger.warn("2 sinks found with the same id: {} and {}... Ignoring previous.", previousSpec, newspec);
+          connections.put(service.getId(), newspec);
+        }
+      }
+    }
+
+    // refresh known specs against discovery; drop specs whose sink went away
+    for (int i = allConnectAddresses.size(); i-- > 0;) {
+      String[] parts = allConnectAddresses.get(i).split(":");
+      String connection = connections.remove(parts[0]);
+      if (connection == null) {
+        allConnectAddresses.remove(i);
+      } else {
+        allConnectAddresses.set(i, connection);
+      }
+    }
+
+    // whatever remains in 'connections' is a newly discovered sink
+    allConnectAddresses.addAll(connections.values());
+
+    partitions.clear();
+    try {
+      if (allConnectAddresses.isEmpty()) {
+        /* return at least one of them; otherwise stram becomes grumpy */
+        @SuppressWarnings("unchecked")
+        AbstractFlumeInputOperator<T> operator = getClass().newInstance();
+        operator.setCodec(codec);
+        operator.setMaxEventsPerSecond(maxEventsPerSecond);
+        for (ArrayList<RecoveryAddress> lRecoveryAddresses: allRecoveryAddresses.values()) {
+          operator.recoveryAddresses.addAll(lRecoveryAddresses);
+        }
+        /* fix: populate the NEW operator's specs; the original wrote into this
+           partitioner's own connectionSpecs field instead */
+        operator.connectionSpecs = new String[allConnectAddresses.size()];
+        for (int i = operator.connectionSpecs.length; i-- > 0;) {
+          operator.connectionSpecs[i] = allConnectAddresses.get(i);
+        }
+
+        partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
+      } else {
+        // divide the global rate limit evenly among the physical partitions
+        long maxEventsPerSecondPerOperator = maxEventsPerSecond / allConnectAddresses.size();
+        for (int i = allConnectAddresses.size(); i-- > 0;) {
+          @SuppressWarnings("unchecked")
+          AbstractFlumeInputOperator<T> operator = getClass().newInstance();
+          operator.setCodec(codec);
+          operator.setMaxEventsPerSecond(maxEventsPerSecondPerOperator);
+          String connectAddress = allConnectAddresses.get(i);
+          operator.connectionSpecs = new String[] {connectAddress};
+
+          // hand the sink's stashed recovery state to its new owner
+          String[] parts = connectAddress.split(":", 2);
+          ArrayList<RecoveryAddress> remove = allRecoveryAddresses.remove(parts[0]);
+          if (remove != null) {
+            operator.recoveryAddresses.addAll(remove);
+          }
+
+          partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
+        }
+      }
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    } catch (InstantiationException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    logger.debug("Requesting partitions: {}", partitions);
+    return partitions;
+  }
+
+  /**
+   * Remembers the ids of the currently deployed partitions so the stats
+   * listener can track each one's connection status; the status values
+   * arrive later via custom counters and start out as null here.
+   */
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractFlumeInputOperator<T>>> partitions)
+  {
+    logger.debug("Partitioned Map: {}", partitions);
+    HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
+    map.clear();
+    /* fix: the original checked map.containsKey() right after clear(), which is
+       always false -- the dead branch is removed */
+    for (Integer operatorId: partitions.keySet()) {
+      map.put(operatorId, null);
+    }
+  }
+
+  /** Describes connection state, the first spec (if any) and recovery backlog. */
+  @Override
+  public String toString()
+  {
+    return "AbstractFlumeInputOperator{" + "connected=" + connected + ", connectionSpecs=" +
+        (connectionSpecs.length == 0 ? "empty" : connectionSpecs[0]) + ", recoveryAddresses=" + recoveryAddresses + '}';
+  }
+
+  /**
+   * Netlet client talking to the DTFlumeSink server. Received payloads
+   * (8-byte address prefix + encoded event) are handed to the operator thread
+   * through the handover buffer; connect/disconnect events trigger the SEEK
+   * handshake and publish ConnectionStatus through the operator's counters.
+   */
+  class Client extends AbstractLengthPrependerClient
+  {
+    private final String id;  // id of the sink this client connects to
+
+    Client(String id)
+    {
+      this.id = id;
+    }
+
+    /** Queues the raw message for consumption by emitTuples(). */
+    @Override
+    public void onMessage(byte[] buffer, int offset, int size)
+    {
+      try {
+        handoverBuffer.put(new Slice(buffer, offset, size));
+      } catch (InterruptedException ex) {
+        handleException(ex, eventloop);
+      }
+    }
+
+    /**
+     * On connect: sends a SEEK request carrying the most recent recovery
+     * address (all zeros for a fresh start), then reports connected status.
+     */
+    @Override
+    public void connected()
+    {
+      super.connected();
+
+      byte[] address;
+      synchronized (recoveryAddresses) {
+        if (recoveryAddresses.size() > 0) {
+          address = recoveryAddresses.get(recoveryAddresses.size() - 1).address;
+        } else {
+          // all-zero address: ask the sink to start from the beginning
+          address = new byte[8];
+        }
+      }
+
+      int len = 1 /* for the message type SEEK */
+          + 8 /* for the address */
+          + 8 /* for storing the current time stamp*/;
+
+      byte[] array = new byte[len];
+      array[0] = Command.SEEK.getOrdinal();
+      System.arraycopy(address, 0, array, 1, 8);
+      Server.writeLong(array, 9, System.currentTimeMillis());
+      write(array);
+
+      connected = true;
+      ConnectionStatus connectionStatus = new ConnectionStatus();
+      connectionStatus.connected = true;
+      connectionStatus.spec = connectionSpecs[0];
+      OperatorContext ctx = context;
+      synchronized (ctx) {
+        logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
+        context.setCounters(connectionStatus);
+      }
+    }
+
+    /** Publishes a disconnected status so the stats listener can repartition. */
+    @Override
+    public void disconnected()
+    {
+      connected = false;
+      ConnectionStatus connectionStatus = new ConnectionStatus();
+      connectionStatus.connected = false;
+      connectionStatus.spec = connectionSpecs[0];
+      OperatorContext ctx = context;
+      synchronized (ctx) {
+        logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
+        context.setCounters(connectionStatus);
+      }
+      super.disconnected();
+    }
+
+  }
+
+  /**
+   * StatsListener that combines per-partition ConnectionStatus counters with
+   * periodic ZooKeeper discovery to decide when the operator must repartition
+   * (sink lost, sink count changed, or a partition never connected).
+   */
+  public static class ZKStatsListner extends ZKAssistedDiscovery implements com.datatorrent.api.StatsListener,
+      Serializable
+  {
+    /*
+     * In the current design, one input operator is able to connect
+     * to only one flume adapter. Sometime in future, we should support
+     * any number of input operators connecting to any number of flume
+     * sinks and vice a versa.
+     *
+     * Until that happens the following map should be sufficient to
+     * keep track of which input operator is connected to which flume sink.
+     */
+    long intervalMillis;                   // how often to re-run discovery
+    private final Response response;       // reused response object
+    private transient long nextMillis;     // earliest time of the next discovery
+
+    public ZKStatsListner()
+    {
+      intervalMillis = 60 * 1000L;
+      response = new Response();
+    }
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      final HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
+      response.repartitionRequired = false;
+
+      // take the most recent custom counters reported in this batch
+      Object lastStat = null;
+      List<OperatorStats> lastWindowedStats = stats.getLastWindowedStats();
+      for (OperatorStats os: lastWindowedStats) {
+        if (os.counters != null) {
+          lastStat = os.counters;
+          logger.debug("Received custom stats = {}", lastStat);
+        }
+      }
+
+      if (lastStat instanceof ConnectionStatus) {
+        ConnectionStatus cs = (ConnectionStatus)lastStat;
+        map.put(stats.getOperatorId(), cs);
+        if (!cs.connected) {
+          logger.debug("setting repatitioned = true because of lastStat = {}", lastStat);
+          response.repartitionRequired = true;
+        }
+      }
+
+      // throttled discovery pass: compare discovered sinks with live partitions
+      if (System.currentTimeMillis() >= nextMillis) {
+        logger.debug("nextMillis = {}", nextMillis);
+        try {
+          super.setup(null);
+          Collection<Service<byte[]>> addresses;
+          try {
+            addresses = discover();
+          } finally {
+            super.teardown();
+          }
+          AbstractFlumeInputOperator.discoveredFlumeSinks.set(addresses);
+          logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", map, addresses);
+          switch (addresses.size()) {
+            case 0:
+              // no sinks: collapse to the single mandatory placeholder partition
+              response.repartitionRequired = map.size() != 1;
+              break;
+
+            default:
+              if (addresses.size() == map.size()) {
+                for (ConnectionStatus value: map.values()) {
+                  if (value == null || !value.connected) {
+                    response.repartitionRequired = true;
+                    break;
+                  }
+                }
+              } else {
+                response.repartitionRequired = true;
+              }
+              break;
+          }
+        } catch (Error er) {
+          throw er;
+        } catch (Throwable cause) {
+          logger.warn("Unable to discover services, using values from last successful discovery", cause);
+        } finally {
+          nextMillis = System.currentTimeMillis() + intervalMillis;
+          logger.debug("Proposed NextMillis = {}", nextMillis);
+        }
+      }
+
+      return response;
+    }
+
+    /**
+     * @return the intervalMillis
+     */
+    public long getIntervalMillis()
+    {
+      return intervalMillis;
+    }
+
+    /**
+     * @param intervalMillis the intervalMillis to set
+     */
+    public void setIntervalMillis(long intervalMillis)
+    {
+      this.intervalMillis = intervalMillis;
+    }
+
+    private static final long serialVersionUID = 201312241646L;
+  }
+
+  /**
+   * Custom stats counter reporting whether a physical operator is connected
+   * to its sink. Identity is determined solely by the connection spec.
+   */
+  public static class ConnectionStatus implements Serializable
+  {
+    int id;            // operator id (informational only)
+    String spec;       // sinkid:host:port this operator is (or was) wired to
+    boolean connected; // true while the netlet client is connected
+
+    /**
+     * Fix: tolerate a null spec (equals() already does) instead of throwing
+     * NullPointerException, keeping the hashCode/equals contract consistent.
+     */
+    @Override
+    public int hashCode()
+    {
+      return spec == null ? 0 : spec.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      final ConnectionStatus other = (ConnectionStatus)obj;
+      return spec == null ? other.spec == null : spec.equals(other.spec);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ConnectionStatus{" + "id=" + id + ", spec=" + spec + ", connected=" + connected + '}';
+    }
+
+    private static final long serialVersionUID = 201312261615L;
+  }
+
+  // per-thread view of each deployed partition's last reported connection status,
+  // shared between partitioned() and the ZKStatsListner running on the same thread
+  private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus =
+      new ThreadLocal<HashMap<Integer, ConnectionStatus>>()
+      {
+        @Override
+        protected HashMap<Integer, ConnectionStatus> initialValue()
+        {
+          return new HashMap<Integer, ConnectionStatus>();
+        }
+
+      };
+  /**
+   * When a sink goes away and a replacement sink is not found, we stash the recovery addresses associated
+   * with the sink in a hope that the new sink may show up in near future.
+   */
+  private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses =
+      new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>()
+      {
+        @Override
+        protected HashMap<String, ArrayList<RecoveryAddress>> initialValue()
+        {
+          return new HashMap<String, ArrayList<RecoveryAddress>>();
+        }
+
+      };
+  // most recent set of sinks reported by discovery, consumed by definePartitions()
+  private static final transient ThreadLocal<Collection<Service<byte[]>>> discoveredFlumeSinks =
+      new ThreadLocal<Collection<Service<byte[]>>>();
+
+  /**
+   * Logical equality based on the configured connection specs and the
+   * accumulated recovery addresses; transient runtime state is ignored.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof AbstractFlumeInputOperator)) {
+      return false;
+    }
+
+    AbstractFlumeInputOperator<?> that = (AbstractFlumeInputOperator<?>)o;
+
+    if (!Arrays.equals(connectionSpecs, that.connectionSpecs)) {
+      return false;
+    }
+    return recoveryAddresses.equals(that.recoveryAddresses);
+
+  }
+
+  /** Consistent with equals(): combines connection specs and recovery addresses. */
+  @Override
+  public int hashCode()
+  {
+    int result = connectionSpecs != null ? Arrays.hashCode(connectionSpecs) : 0;
+    result = 31 * result + (recoveryAddresses.hashCode());
+    return result;
+  }
+
+  /**
+   * @param maxEventsPerSecond upper bound on events emitted per second; used in
+   *                           setup() to derive the per-window emission cap
+   */
+  public void setMaxEventsPerSecond(long maxEventsPerSecond)
+  {
+    this.maxEventsPerSecond = maxEventsPerSecond;
+  }
+
+  /**
+   * @return the configured events-per-second limit
+   */
+  public long getMaxEventsPerSecond()
+  {
+    return maxEventsPerSecond;
+  }
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java b/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java
new file mode 100644
index 0000000..35d0c5f
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java
@@ -0,0 +1,571 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.sink;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ServiceConfigurationError;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.flume.discovery.Discovery;
+import com.datatorrent.flume.sink.Server.Client;
+import com.datatorrent.flume.sink.Server.Request;
+import com.datatorrent.flume.storage.EventCodec;
+import com.datatorrent.flume.storage.Storage;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.NetletThrowable;
+import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * DTFlumeSink is a flume sink developed to ingest the data into DataTorrent DAG
+ * from flume. It's essentially a flume sink which acts as a server capable of
+ * talking to one client at a time. The client for this server is AbstractFlumeInputOperator.
+ * <p />
+ * <experimental>DTFlumeSink auto adjusts the rate at which it consumes the data from channel to
+ * match the throughput of the DAG.</experimental>
+ * <p />
+ * The properties you can set on the DTFlumeSink are: <br />
+ * id - string unique value identifying this sink <br />
+ * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br />
+ * port - integer value indicating the numeric port to which the server should bind <br />
+ * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events
+ * before checking for next event again <br />
+ * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be
+ * adjusted upward or downward at a time <br />
+ * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br />
+ * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. This value can
+ * not be more than channel's transaction capacity.<br />
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 0.9.2
+ */
+public class DTFlumeSink extends AbstractSink implements Configurable
+{
+  private static final String HOSTNAME_STRING = "hostname";
+  // fix: default host was misspelled "locahost", which could never resolve
+  private static final String HOSTNAME_DEFAULT = "localhost";
+  private static final long ACCEPTED_TOLERANCE = 20000;
+  private DefaultEventLoop eventloop;
+  private Server server;                     // netlet server the operator connects to
+  private int outstandingEventsCount;        // events shipped but not yet acknowledged
+  private int lastConsumedEventsCount;       // events the client reported consuming last window
+  private int idleCount;                     // idle periods the client reported last window
+  private byte[] playback;                   // non-null while replaying stored events after SEEK
+  private Client client;                     // currently connected operator, if any
+  private String hostname;
+  private int port;
+  private String id;
+  private long acceptedTolerance;
+  private long sleepMillis;
+  private double throughputAdjustmentFactor; // throughputAdjustmentPercent / 100
+  private int minimumEventsPerTransaction;
+  private int maximumEventsPerTransaction;
+  private long commitEventTimeoutMillis;
+  private transient long lastCommitEventTimeMillis;
+  private Storage storage;
+  Discovery<byte[]> discovery;
+  StreamCodec<Event> codec;
+  /* Begin implementing Flume Sink interface */
+ /* Begin implementing Flume Sink interface */
+
+  /**
+   * Flume Sink contract: drains the request queue populated by connected
+   * clients, adapts the transaction size to the observed DAG throughput, then
+   * either replays stored events (after a SEEK) or takes fresh events from the
+   * flume channel, persisting each one in storage before shipping it.
+   *
+   * @return READY when events may flow, BACKOFF when flume should slow down
+   * @throws EventDeliveryException per the flume Sink contract
+   */
+  @Override
+  @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"})
+  public Status process() throws EventDeliveryException
+  {
+    Slice slice;
+    synchronized (server.requests) {
+      for (Request r : server.requests) {
+        logger.debug("found {}", r);
+        switch (r.type) {
+          case SEEK:
+            // client asks to (re)start from a stored address: begin playback
+            lastCommitEventTimeMillis = System.currentTimeMillis();
+            slice = r.getAddress();
+            playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
+            client = r.client;
+            break;
+
+          case COMMITTED:
+            // client committed a window: storage up to this address may be purged
+            lastCommitEventTimeMillis = System.currentTimeMillis();
+            slice = r.getAddress();
+            storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
+            break;
+
+          case CONNECTED:
+            logger.debug("Connected received, ignoring it!");
+            break;
+
+          case DISCONNECTED:
+            if (r.client == client) {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+            break;
+
+          case WINDOWED:
+            // throughput feedback from the client; drives maxTuples below
+            lastConsumedEventsCount = r.getEventCount();
+            idleCount = r.getIdleCount();
+            outstandingEventsCount -= lastConsumedEventsCount;
+            break;
+
+          case SERVER_ERROR:
+            throw new IOError(null);
+
+          default:
+            logger.debug("Cannot understand the request {}", r);
+            break;
+        }
+      }
+
+      server.requests.clear();
+    }
+
+    if (client == null) {
+      logger.info("No client expressed interest yet to consume the events.");
+      return Status.BACKOFF;
+    } else if (System.currentTimeMillis() - lastCommitEventTimeMillis > commitEventTimeoutMillis) {
+      logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.",
+          System.currentTimeMillis() - lastCommitEventTimeMillis);
+      return Status.BACKOFF;
+    }
+
+    int maxTuples;
+    // the following logic needs to be fixed... this is a quick put together.
+    if (outstandingEventsCount < 0) {
+      if (idleCount > 1) {
+        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
+      } else {
+        maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount);
+      }
+    } else if (outstandingEventsCount > lastConsumedEventsCount) {
+      // client is falling behind: shrink the batch
+      maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount);
+    } else {
+      if (idleCount > 0) {
+        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
+        if (maxTuples <= 0) {
+          maxTuples = minimumEventsPerTransaction;
+        }
+      } else {
+        maxTuples = lastConsumedEventsCount;
+      }
+    }
+
+    // clamp into [minimumEventsPerTransaction, maximumEventsPerTransaction]
+    if (maxTuples >= maximumEventsPerTransaction) {
+      maxTuples = maximumEventsPerTransaction;
+    } else if (maxTuples <= 0) {
+      maxTuples = minimumEventsPerTransaction;
+    }
+
+    if (maxTuples > 0) {
+      if (playback != null) {
+        // replay previously stored events until caught up or quota exhausted
+        try {
+          int i = 0;
+          do {
+            if (!client.write(playback)) {
+              retryWrite(playback, null);
+            }
+            outstandingEventsCount++;
+            playback = storage.retrieveNext();
+          }
+          while (++i < maxTuples && playback != null);
+        } catch (Exception ex) {
+          logger.warn("Playback Failed", ex);
+          if (ex instanceof NetletThrowable) {
+            try {
+              eventloop.disconnect(client);
+            } finally {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+          }
+          return Status.BACKOFF;
+        }
+      } else {
+        int storedTuples = 0;
+
+        Transaction t = getChannel().getTransaction();
+        try {
+          t.begin();
+
+          // persist each event before shipping so it survives a crash
+          Event e;
+          while (storedTuples < maxTuples && (e = getChannel().take()) != null) {
+            Slice event = codec.toByteArray(e);
+            byte[] address = storage.store(event);
+            if (address != null) {
+              if (!client.write(address, event)) {
+                retryWrite(address, event);
+              }
+              outstandingEventsCount++;
+            } else {
+              logger.debug("Detected the condition of recovery from flume crash!");
+            }
+            storedTuples++;
+          }
+
+          if (storedTuples > 0) {
+            storage.flush();
+          }
+
+          t.commit();
+
+          if (storedTuples > 0) { /* log less frequently */
+            logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}",
+                maxTuples, storedTuples, outstandingEventsCount);
+          }
+        } catch (Error er) {
+          t.rollback();
+          throw er;
+        } catch (Exception ex) {
+          logger.error("Transaction Failed", ex);
+          if (ex instanceof NetletRuntimeException && client != null) {
+            try {
+              eventloop.disconnect(client);
+            } finally {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+          }
+          t.rollback();
+          return Status.BACKOFF;
+        } finally {
+          t.close();
+        }
+
+        if (storedTuples == 0) {
+          sleep();
+        }
+      }
+    }
+
+    return Status.READY;
+  }
+
+  /** Backs off for the configured interval, preserving the interrupt status. */
+  private void sleep()
+  {
+    try {
+      Thread.sleep(sleepMillis);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts the sink: runs the optional Component lifecycle of storage,
+   * discovery and codec, then brings up the netlet server on the configured
+   * hostname/port so an AbstractFlumeInputOperator can connect.
+   */
+  @Override
+  public void start()
+  {
+    try {
+      if (storage instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
+        component.setup(null);
+      }
+      if (discovery instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
+        component.setup(null);
+      }
+      if (codec instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
+        component.setup(null);
+      }
+      eventloop = new DefaultEventLoop("EventLoop-" + id);
+      server = new Server(id, discovery,acceptedTolerance);
+    } catch (Error error) {
+      throw error;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    // start the loop thread first, then bind the server socket on it
+    eventloop.start();
+    eventloop.start(hostname, port, server);
+    super.start();
+  }
+
+  /**
+   * Stops the sink, then tears everything down in reverse order of start():
+   * client connection, server socket, event loop, and finally the optional
+   * Component lifecycles of codec, discovery and storage.
+   */
+  @Override
+  public void stop()
+  {
+    try {
+      super.stop();
+    } finally {
+      try {
+        if (client != null) {
+          eventloop.disconnect(client);
+          client = null;
+        }
+
+        eventloop.stop(server);
+        eventloop.stop();
+
+        if (codec instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
+          component.teardown();
+        }
+        if (discovery instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
+          component.teardown();
+        }
+        if (storage instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
+          component.teardown();
+        }
+      } catch (Throwable cause) {
+        throw new ServiceConfigurationError("Failed Stop", cause);
+      }
+    }
+  }
+
+ /* End implementing Flume Sink interface */
+
+ /* Begin Configurable Interface */
+ @Override
+ public void configure(Context context)
+ {
+ hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT);
+ port = context.getInteger("port", 0);
+ id = context.getString("id");
+ if (id == null) {
+ id = getName();
+ }
+ acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE);
+ sleepMillis = context.getLong("sleepMillis", 5L);
+ throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0;
+ maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000);
+ minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100);
+ commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE);
+
+ @SuppressWarnings("unchecked")
+ Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context);
+ if (ldiscovery == null) {
+ logger.warn("Discovery agent not configured for the sink!");
+ discovery = new Discovery<byte[]>()
+ {
+ @Override
+ public void unadvertise(Service<byte[]> service)
+ {
+ logger.debug("Sink {} stopped listening on {}:{}", service.getId(), service.getHost(), service.getPort());
+ }
+
+ @Override
+ public void advertise(Service<byte[]> service)
+ {
+ logger.debug("Sink {} started listening on {}:{}", service.getId(), service.getHost(), service.getPort());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection<Service<byte[]>> discover()
+ {
+ return Collections.EMPTY_SET;
+ }
+
+ };
+ } else {
+ discovery = ldiscovery;
+ }
+
+ storage = configure("storage", Storage.class, context);
+ if (storage == null) {
+ logger.warn("storage key missing... DTFlumeSink may lose data!");
+ storage = new Storage()
+ {
+ @Override
+ public byte[] store(Slice slice)
+ {
+ return null;
+ }
+
+ @Override
+ public byte[] retrieve(byte[] identifier)
+ {
+ return null;
+ }
+
+ @Override
+ public byte[] retrieveNext()
+ {
+ return null;
+ }
+
+ @Override
+ public void clean(byte[] identifier)
+ {
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context);
+ if (lCodec == null) {
+ codec = new EventCodec();
+ } else {
+ codec = lCodec;
+ }
+
+ }
+
+ /* End Configurable Interface */
+
+  @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"})
+  private static <T> T configure(String key, Class<T> clazz, Context context)
+  {
+    String classname = context.getString(key);
+    if (classname == null) {
+      return null;
+    }
+
+    try {
+      Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname);
+      if (clazz.isAssignableFrom(loadClass)) {
+        @SuppressWarnings("unchecked")
+        T object = (T)loadClass.newInstance();
+        if (object instanceof Configurable) {
+          Context context1 = new Context(context.getSubProperties(key + '.'));
+          String id = context1.getString(Storage.ID);
+          if (id == null) {
+            id = context.getString(Storage.ID);
+            logger.debug("{} inherited id={} from sink", key, id);
+            context1.put(Storage.ID, id);
+          }
+          ((Configurable)object).configure(context1);
+        }
+
+        return object;
+      } else {
+        logger.error("key class {} does not implement {} interface", classname, clazz.getCanonicalName()); // report the requested interface, not always Storage
+        throw new Error("Invalid " + key + " " + classname);
+      }
+    } catch (Error error) {
+      throw error;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
+
+ /**
+ * @return the hostname
+ */
+ String getHostname()
+ {
+ return hostname;
+ }
+
+ /**
+ * @param hostname the hostname to set
+ */
+ void setHostname(String hostname)
+ {
+ this.hostname = hostname;
+ }
+
+ /**
+ * @return the port
+ */
+ int getPort()
+ {
+ return port;
+ }
+
+ public long getAcceptedTolerance()
+ {
+ return acceptedTolerance;
+ }
+
+ public void setAcceptedTolerance(long acceptedTolerance)
+ {
+ this.acceptedTolerance = acceptedTolerance;
+ }
+
+ /**
+ * @param port the port to set
+ */
+ void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ /**
+ * @return the discovery
+ */
+ Discovery<byte[]> getDiscovery()
+ {
+ return discovery;
+ }
+
+ /**
+ * @param discovery the discovery to set
+ */
+ void setDiscovery(Discovery<byte[]> discovery)
+ {
+ this.discovery = discovery;
+ }
+
+  /**
+   * Retry the write, sleeping before each attempt, for as long as the client
+   * remains connected; once the client disconnects, give up with an IOException.
+   *
+   * @param address identifier of the event (carries the full payload during playback)
+   * @param event event payload; null during playback, when address alone is written
+   * @throws IOException if the client disconnects before a write succeeds
+   */
+  private void retryWrite(byte[] address, Slice event) throws IOException
+  {
+    if (event == null) { /* this happens for playback where address and event are sent as single object */
+      while (client.isConnected()) {
+        sleep();
+        if (client.write(address)) {
+          return;
+        }
+      }
+    } else { /* this happens when the events are taken from the flume channel and writing first time failed */
+      while (client.isConnected()) {
+        sleep();
+        if (client.write(address, event)) {
+          return;
+        }
+      }
+    }
+
+    throw new IOException("Client disconnected!");
+  }
+
+ private static final Logger logger = LoggerFactory.getLogger(DTFlumeSink.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/sink/Server.java b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
new file mode 100644
index 0000000..14d9ff4
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
@@ -0,0 +1,419 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.sink;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.flume.discovery.Discovery;
+import com.datatorrent.flume.discovery.Discovery.Service;
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.AbstractServer;
+import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>
+ * Server class.</p>
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 0.9.2
+ */
+public class Server extends AbstractServer
+{
+ private final String id;
+ private final Discovery<byte[]> discovery;
+ private final long acceptedTolerance;
+
+ public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance)
+ {
+ this.id = id;
+ this.discovery = discovery;
+ this.acceptedTolerance = acceptedTolerance;
+ }
+
+ @Override
+ public void handleException(Exception cce, EventLoop el)
+ {
+ logger.error("Server Error", cce);
+ Request r = new Request(Command.SERVER_ERROR, null)
+ {
+ @Override
+ public Slice getAddress()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public int getEventCount()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public int getIdleCount()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ };
+ synchronized (requests) {
+ requests.add(r);
+ }
+ }
+
+ private final Service<byte[]> service = new Service<byte[]>()
+ {
+ @Override
+ public String getHost()
+ {
+ return ((InetSocketAddress)getServerAddress()).getHostName();
+ }
+
+ @Override
+ public int getPort()
+ {
+ return ((InetSocketAddress)getServerAddress()).getPort();
+ }
+
+ @Override
+ public byte[] getPayload()
+ {
+ return null;
+ }
+
+ @Override
+ public String getId()
+ {
+ return id;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" +
+ Arrays.toString(getPayload()) + '}';
+ }
+
+ };
+
+ @Override
+ public void unregistered(final SelectionKey key)
+ {
+ discovery.unadvertise(service);
+ super.unregistered(key);
+ }
+
+ @Override
+ public void registered(final SelectionKey key)
+ {
+ super.registered(key);
+ discovery.advertise(service);
+ }
+
+  public enum Command
+  {
+    ECHO((byte)0),
+    SEEK((byte)1),
+    COMMITTED((byte)2),
+    CHECKPOINTED((byte)3),
+    CONNECTED((byte)4),
+    DISCONNECTED((byte)5),
+    WINDOWED((byte)6),
+    SERVER_ERROR((byte)7);
+
+    Command(byte b)
+    {
+      this.ord = b;
+    }
+
+    public byte getOrdinal()
+    {
+      return ord;
+    }
+
+    public static Command getCommand(byte b)
+    {
+      Command c;
+      switch (b) {
+        case 0:
+          c = ECHO;
+          break;
+
+        case 1:
+          c = SEEK;
+          break;
+
+        case 2:
+          c = COMMITTED;
+          break;
+
+        case 3:
+          c = CHECKPOINTED;
+          break;
+
+        case 4:
+          c = CONNECTED;
+          break;
+
+        case 5:
+          c = DISCONNECTED;
+          break;
+
+        case 6:
+          c = WINDOWED;
+          break;
+
+        case 7:
+          c = SERVER_ERROR;
+          break;
+
+        default: // %b would format any non-null argument as "true"; %d reports the offending ordinal
+          throw new IllegalArgumentException(String.format("No Command defined for ordinal %d", b));
+      }
+
+      assert (b == c.ord);
+      return c;
+    }
+
+    private final byte ord;
+  }
+
+ public final ArrayList<Request> requests = new ArrayList<Request>(4);
+
+ @Override
+ public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
+ {
+ Client lClient = new Client();
+ lClient.connected();
+ return lClient;
+ }
+
+ public class Client extends AbstractLengthPrependerClient
+ {
+
+ @Override
+ public void onMessage(byte[] buffer, int offset, int size)
+ {
+ if (size != Request.FIXED_SIZE) {
+ logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
+ key.channel());
+ return;
+ }
+
+ long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET);
+ if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) {
+ logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
+ key.channel());
+ return;
+ }
+
+ try {
+ if (Command.getCommand(buffer[offset]) == Command.ECHO) {
+ write(buffer, offset, size);
+ return;
+ }
+ } catch (IllegalArgumentException ex) {
+ logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size),
+ key.channel(), ex);
+ return;
+ }
+
+ Request r = Request.getRequest(buffer, offset, this);
+ synchronized (requests) {
+ requests.add(r);
+ }
+ }
+
+ @Override
+ public void disconnected()
+ {
+ synchronized (requests) {
+ requests.add(Request.getRequest(
+ new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this));
+ }
+ super.disconnected();
+ }
+
+ public boolean write(byte[] address, Slice event)
+ {
+ if (event.offset == 0 && event.length == event.buffer.length) {
+ return write(address, event.buffer);
+ }
+
+ // a better method would be to replace the write implementation and allow it to natively support writing slices
+ return write(address, event.toByteArray());
+ }
+
+ }
+
+ public abstract static class Request
+ {
+ public static final int FIXED_SIZE = 17;
+ public static final int TIME_OFFSET = 9;
+ public final Command type;
+ public final Client client;
+
+ public Request(Command type, Client client)
+ {
+ this.type = type;
+ this.client = client;
+ }
+
+ public abstract Slice getAddress();
+
+ public abstract int getEventCount();
+
+ public abstract int getIdleCount();
+
+ @Override
+ public String toString()
+ {
+ return "Request{" + "type=" + type + '}';
+ }
+
+ public static Request getRequest(final byte[] buffer, final int offset, Client client)
+ {
+ Command command = Command.getCommand(buffer[offset]);
+ switch (command) {
+ case WINDOWED:
+ return new Request(Command.WINDOWED, client)
+ {
+ final int eventCount;
+ final int idleCount;
+
+ {
+ eventCount = Server.readInt(buffer, offset + 1);
+ idleCount = Server.readInt(buffer, offset + 5);
+ }
+
+ @Override
+ public Slice getAddress()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getEventCount()
+ {
+ return eventCount;
+ }
+
+ @Override
+ public int getIdleCount()
+ {
+ return idleCount;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}';
+ }
+
+ };
+
+ default:
+ return new Request(command, client)
+ {
+ final Slice address;
+
+ {
+ address = new Slice(buffer, offset + 1, 8);
+ }
+
+ @Override
+ public Slice getAddress()
+ {
+ return address;
+ }
+
+ @Override
+ public int getEventCount()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getIdleCount()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Request{" + "type=" + type + ", address=" + address + '}';
+ }
+
+ };
+
+ }
+
+ }
+
+ }
+
+ public static int readInt(byte[] buffer, int offset)
+ {
+ return buffer[offset++] & 0xff
+ | (buffer[offset++] & 0xff) << 8
+ | (buffer[offset++] & 0xff) << 16
+ | (buffer[offset++] & 0xff) << 24;
+ }
+
+ public static void writeInt(byte[] buffer, int offset, int i)
+ {
+ buffer[offset++] = (byte)i;
+ buffer[offset++] = (byte)(i >>> 8);
+ buffer[offset++] = (byte)(i >>> 16);
+ buffer[offset++] = (byte)(i >>> 24);
+ }
+
+ public static long readLong(byte[] buffer, int offset)
+ {
+ return (long)buffer[offset++] & 0xff
+ | (long)(buffer[offset++] & 0xff) << 8
+ | (long)(buffer[offset++] & 0xff) << 16
+ | (long)(buffer[offset++] & 0xff) << 24
+ | (long)(buffer[offset++] & 0xff) << 32
+ | (long)(buffer[offset++] & 0xff) << 40
+ | (long)(buffer[offset++] & 0xff) << 48
+ | (long)(buffer[offset++] & 0xff) << 56;
+ }
+
+ public static void writeLong(byte[] buffer, int offset, long l)
+ {
+ buffer[offset++] = (byte)l;
+ buffer[offset++] = (byte)(l >>> 8);
+ buffer[offset++] = (byte)(l >>> 16);
+ buffer[offset++] = (byte)(l >>> 24);
+ buffer[offset++] = (byte)(l >>> 32);
+ buffer[offset++] = (byte)(l >>> 40);
+ buffer[offset++] = (byte)(l >>> 48);
+ buffer[offset++] = (byte)(l >>> 56);
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(Server.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
new file mode 100644
index 0000000..490ac35
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
@@ -0,0 +1,248 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.source;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * <p>TestSource class.</p>
+ *
+ * @since 0.9.4
+ */
+public class TestSource extends AbstractSource implements EventDrivenSource, Configurable
+{
+ public static final String SOURCE_FILE = "sourceFile";
+ public static final String LINE_NUMBER = "lineNumber";
+ public static final String RATE = "rate";
+ public static final String PERCENT_PAST_EVENTS = "percentPastEvents";
+ static byte FIELD_SEPARATOR = 1;
+ static int DEF_PERCENT_PAST_EVENTS = 5;
+ public Timer emitTimer;
+ @Nonnull
+ String filePath;
+ int rate;
+ int numberOfPastEvents;
+ transient List<Row> cache;
+ private transient int startIndex;
+ private transient Random random;
+ private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public TestSource()
+ {
+ super();
+ this.rate = 2500;
+ this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25;
+ this.random = new Random();
+
+ }
+
+ @Override
+ public void configure(Context context)
+ {
+ filePath = context.getString(SOURCE_FILE);
+ rate = context.getInteger(RATE, rate);
+ int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath));
+ try {
+ BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
+ try {
+ buildCache(lineReader);
+ } finally {
+ lineReader.close();
+ }
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) {
+ numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size());
+ }
+ }
+
+ @Override
+ public void start()
+ {
+ super.start();
+ emitTimer = new Timer();
+
+ final ChannelProcessor channel = getChannelProcessor();
+ final int cacheSize = cache.size();
+ emitTimer.scheduleAtFixedRate(new TimerTask()
+ {
+ @Override
+ public void run()
+ {
+ int lastIndex = startIndex + rate;
+ if (lastIndex > cacheSize) {
+ lastIndex -= cacheSize;
+ processBatch(channel, cache.subList(startIndex, cacheSize));
+ startIndex = 0;
+ while (lastIndex > cacheSize) {
+ processBatch(channel, cache);
+ lastIndex -= cacheSize;
+ }
+ processBatch(channel, cache.subList(0, lastIndex));
+ } else {
+ processBatch(channel, cache.subList(startIndex, lastIndex));
+ }
+ startIndex = lastIndex;
+ }
+
+ }, 0, 1000);
+ }
+
+  private void processBatch(ChannelProcessor channelProcessor, List<Row> rows)
+  {
+    if (rows.isEmpty()) {
+      return;
+    }
+
+    int noise = random.nextInt(numberOfPastEvents + 1);
+    Set<Integer> pastIndices = Sets.newHashSet();
+    for (int i = 0; i < noise; i++) {
+      pastIndices.add(random.nextInt(rows.size()));
+    }
+
+    Calendar calendar = Calendar.getInstance();
+    long high = calendar.getTimeInMillis();
+    calendar.add(Calendar.DATE, -2);
+    long low = calendar.getTimeInMillis();
+
+    // rows selected as "past" get a random timestamp from the [low, high]
+    // window covering the previous two days; all other rows get the current time
+    List<Event> events = Lists.newArrayList();
+    for (int i = 0; i < rows.size(); i++) {
+      Row eventRow = rows.get(i);
+      if (pastIndices.contains(i)) {
+        long pastTime = low + (long)(random.nextDouble() * (high - low)); // seeded instance Random, not Math.random(), for consistency
+        byte[] pastDateField = dateFormat.format(pastTime).getBytes();
+        byte[] pastTimeField = timeFormat.format(pastTime).getBytes();
+
+        System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length);
+        System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length);
+      } else {
+        calendar.setTimeInMillis(System.currentTimeMillis());
+        byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes();
+        byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes();
+
+        System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length);
+        System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length);
+      }
+
+      HashMap<String, String> headers = new HashMap<String, String>(2);
+      headers.put(SOURCE_FILE, filePath);
+      headers.put(LINE_NUMBER, String.valueOf(startIndex + i));
+      events.add(EventBuilder.withBody(eventRow.bytes, headers));
+    }
+    channelProcessor.processEventBatch(events);
+  }
+
+ @Override
+ public void stop()
+ {
+ emitTimer.cancel();
+ super.stop();
+ }
+
+  private void buildCache(BufferedReader lineReader) throws IOException
+  {
+    cache = Lists.newArrayListWithCapacity(rate);
+
+    String line;
+    while ((line = lineReader.readLine()) != null) {
+      byte[] bytes = line.getBytes();
+      Row row = new Row(bytes);
+      final int length = bytes.length;
+
+      /* skip past the leading guid field */
+      int separatorIndex = -1;
+      while (++separatorIndex < length) {
+        if (bytes[separatorIndex] == FIELD_SEPARATOR) {
+          break;
+        }
+      }
+      int fieldStart = separatorIndex + 1;
+      int cursor = separatorIndex + 1;
+      while (cursor < length) {
+        if (bytes[cursor++] == FIELD_SEPARATOR) {
+          row.dateFieldStart = fieldStart;
+          break;
+        }
+      }
+
+      /* the time field follows the date field */
+      int timeStart = cursor;
+      while (cursor < length) {
+        if (bytes[cursor++] == FIELD_SEPARATOR) {
+          row.timeFieldStart = timeStart;
+          break;
+        }
+      }
+
+      cache.add(row);
+    }
+  }
+
+ private static class Row
+ {
+ final byte[] bytes;
+ int dateFieldStart;
+ int timeFieldStart;
+// boolean past;
+
+ Row(byte[] bytes)
+ {
+ this.bytes = bytes;
+ }
+
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(TestSource.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
new file mode 100644
index 0000000..c416418
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>DebugWrapper class.</p>
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 0.9.4
+ */
+public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context>
+{
+ HDFSStorage storage = new HDFSStorage();
+
+ @Override
+ public byte[] store(Slice bytes)
+ {
+ byte[] ret = null;
+
+ try {
+ ret = storage.store(bytes);
+ } finally {
+ logger.debug("storage.store(new byte[]{{}});", bytes);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public byte[] retrieve(byte[] identifier)
+ {
+ byte[] ret = null;
+
+ try {
+ ret = storage.retrieve(identifier);
+ } finally {
+ logger.debug("storage.retrieve(new byte[]{{}});", identifier);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public byte[] retrieveNext()
+ {
+ byte[] ret = null;
+ try {
+ ret = storage.retrieveNext();
+ } finally {
+ logger.debug("storage.retrieveNext();");
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void clean(byte[] identifier)
+ {
+ try {
+ storage.clean(identifier);
+ } finally {
+ logger.debug("storage.clean(new byte[]{{}});", identifier);
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ try {
+ storage.flush();
+ } finally {
+ logger.debug("storage.flush();");
+ }
+ }
+
+ @Override
+ public void configure(Context cntxt)
+ {
+ try {
+ storage.configure(cntxt);
+ } finally {
+ logger.debug("storage.configure({});", cntxt);
+ }
+ }
+
+ @Override
+ public void setup(com.datatorrent.api.Context t1)
+ {
+ try {
+ storage.setup(t1);
+ } finally {
+ logger.debug("storage.setup({});", t1);
+ }
+
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ storage.teardown();
+ } finally {
+ logger.debug("storage.teardown();");
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(DebugWrapper.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
new file mode 100644
index 0000000..59c7fd3
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>ErrorMaskingEventCodec class.</p>
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 1.0.4
+ */
+public class ErrorMaskingEventCodec extends EventCodec
+{
+
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ try {
+ return super.fromByteArray(fragment);
+ } catch (RuntimeException re) {
+ logger.warn("Cannot deserialize event {}", fragment, re);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Slice toByteArray(Event event)
+ {
+ try {
+ return super.toByteArray(event);
+ } catch (RuntimeException re) {
+ logger.warn("Cannot serialize event {}", event, re);
+ }
+
+ return null;
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class);
+}
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
new file mode 100644
index 0000000..03d0d87
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>EventCodec class.</p>
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ * @since 0.9.4
+ */
+public class EventCodec implements StreamCodec<Event>
+{
+ private final transient Kryo kryo;
+
+ public EventCodec()
+ {
+ this.kryo = new Kryo();
+ this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ }
+
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+ Input input = new Input(is);
+
+ @SuppressWarnings("unchecked")
+ HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class);
+ byte[] body = kryo.readObjectOrNull(input, byte[].class);
+ return EventBuilder.withBody(body, headers);
+ }
+
+ @Override
+ public Slice toByteArray(Event event)
+ {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ Output output = new Output(os);
+
+ Map<String, String> headers = event.getHeaders();
+ if (headers != null && headers.getClass() != HashMap.class) {
+ HashMap<String, String> tmp = new HashMap<String, String>(headers.size());
+ tmp.putAll(headers);
+ headers = tmp;
+ }
+ kryo.writeObjectOrNull(output, headers, HashMap.class);
+ kryo.writeObjectOrNull(output, event.getBody(), byte[].class);
+ output.flush();
+ final byte[] bytes = os.toByteArray();
+ return new Slice(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public int getPartition(Event o)
+ {
+ return o.hashCode();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(EventCodec.class);
+}
diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
new file mode 100644
index 0000000..9d3e430
--- /dev/null
+++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
@@ -0,0 +1,45 @@
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#agent1 on node1
+ agent1.sources = netcatSource
+ agent1.channels = ch1
+ agent1.sinks = dt
+
+# first sink - dt
+ agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
+ agent1.sinks.dt.id = sink1
+ agent1.sinks.dt.hostname = localhost
+ agent1.sinks.dt.port = 8080
+ agent1.sinks.dt.sleepMillis = 7
+ agent1.sinks.dt.throughputAdjustmentFactor = 2
+ agent1.sinks.dt.maximumEventsPerTransaction = 5000
+ agent1.sinks.dt.minimumEventsPerTransaction = 1
+ agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
+ agent1.sinks.dt.storage.restore = false
+ agent1.sinks.dt.storage.baseDir = /tmp/flume101
+ agent1.sinks.dt.channel = ch1
+
+# channels
+ agent1.channels.ch1.type = file
+ agent1.channels.ch1.capacity = 10000000
+ agent1.channels.ch1.transactionCapacity = 10000
+ agent1.channels.ch1.maxFileSize = 67108864
+
+ agent1.sources.netcatSource.type = exec
+ agent1.sources.netcatSource.channels = ch1
+ agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1
diff --git a/flume/src/main/resources/flume-conf/flume-env.sample.sh b/flume/src/main/resources/flume-conf/flume-env.sample.sh
new file mode 100644
index 0000000..aca341c
--- /dev/null
+++ b/flume/src/main/resources/flume-conf/flume-env.sample.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# This script runs on a machine which has the maven repository populated under
+# $HOME/.m2. If that's not the case, please adjust the JARPATH variable below
+# to point to a colon-separated list of directories where jar files can be found
+if test -z "$DT_FLUME_JAR"
+then
+ echo [ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class >&2
+ exit 2
+fi
+
+echo JARPATH is set to ${JARPATH:=$HOME/.m2/repository:.}
+if test -z "$JAVA_HOME"
+then
+ JAVA=java
+else
+ JAVA=${JAVA_HOME}/bin/java
+fi
+FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $DT_FLUME_JAR com.datatorrent.jarpath.JarPath -N $DT_FLUME_JAR -Xdt-jarpath -Xdt-netlet`
diff --git a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
new file mode 100644
index 0000000..4acf764
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.discovery;
+
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+
+import com.datatorrent.flume.discovery.Discovery.Service;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+@Ignore
+public class ZKAssistedDiscoveryTest
+{
+ public ZKAssistedDiscoveryTest()
+ {
+ }
+
+ @Test
+ public void testSerialization() throws Exception
+ {
+ ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
+ discovery.setServiceName("DTFlumeTest");
+ discovery.setConnectionString("localhost:2181");
+ discovery.setBasePath("/HelloDT");
+ discovery.setup(null);
+ ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>()
+ {
+ @Override
+ public String getHost()
+ {
+ return "localhost";
+ }
+
+ @Override
+ public int getPort()
+ {
+ return 8080;
+ }
+
+ @Override
+ public byte[] getPayload()
+ {
+ return null;
+ }
+
+ @Override
+ public String getId()
+ {
+ return "localhost8080";
+ }
+
+ });
+ InstanceSerializer<byte[]> instanceSerializer =
+ discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>()
+ {
+ });
+ byte[] serialize = instanceSerializer.serialize(instance);
+ logger.debug("serialized json = {}", new String(serialize));
+ ServiceInstance<byte[]> deserialize = instanceSerializer.deserialize(serialize);
+ assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload());
+ }
+
+ @Test
+ public void testDiscover()
+ {
+ ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
+ discovery.setServiceName("DTFlumeTest");
+ discovery.setConnectionString("localhost:2181");
+ discovery.setBasePath("/HelloDT");
+ discovery.setup(null);
+ assertNotNull("Discovered Sinks", discovery.discover());
+ discovery.teardown();
+ }
+
+ @Test
+ public void testAdvertize()
+ {
+ ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
+ discovery.setServiceName("DTFlumeTest");
+ discovery.setConnectionString("localhost:2181");
+ discovery.setBasePath("/HelloDT");
+ discovery.setup(null);
+
+ Service<byte[]> service = new Service<byte[]>()
+ {
+ @Override
+ public String getHost()
+ {
+ return "chetan";
+ }
+
+ @Override
+ public int getPort()
+ {
+ return 5033;
+ }
+
+ @Override
+ public byte[] getPayload()
+ {
+ return new byte[] {3, 2, 1};
+ }
+
+ @Override
+ public String getId()
+ {
+ return "uniqueId";
+ }
+
+ };
+ discovery.advertise(service);
+ discovery.teardown();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class);
+}
diff --git a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
new file mode 100644
index 0000000..41364c8
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.integration;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.flume.operator.AbstractFlumeInputOperator;
+import com.datatorrent.flume.storage.EventCodec;
+
+/**
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+@Ignore
+public class ApplicationTest implements StreamingApplication
+{
+ public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
+ {
+ @Override
+ public Event convert(Event event)
+ {
+ return event;
+ }
+ }
+
+ public static class Counter implements Operator
+ {
+ private int count;
+ private transient Event event;
+ public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>()
+ {
+ @Override
+ public void process(Event tuple)
+ {
+ count++;
+ event = tuple;
+ }
+
+ };
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ logger.debug("total count = {}, tuple = {}", count, event);
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(Counter.class);
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000);
+ FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator());
+ flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"});
+ flume.setCodec(new EventCodec());
+ Counter counter = dag.addOperator("Counter", new Counter());
+
+ dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+ @Test
+ public void test()
+ {
+ try {
+ LocalMode.runApp(this, Integer.MAX_VALUE);
+ } catch (Exception ex) {
+ logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex);
+ }
+
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+}
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
new file mode 100644
index 0000000..464df42
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.flume.Context;
+import org.apache.flume.interceptor.Interceptor;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+public class ColumnFilteringInterceptorTest
+{
+ private static InterceptorTestHelper helper;
+
+ @BeforeClass
+ public static void startUp()
+ {
+ HashMap<String, String> contextMap = new HashMap<String, String>();
+ contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
+ contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+ contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3");
+
+ helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), contextMap);
+ }
+
+ @Test
+ public void testInterceptEvent()
+ {
+ helper.testIntercept_Event();
+ }
+
+ @Test
+ public void testFiles() throws IOException, URISyntaxException
+ {
+ helper.testFiles();
+ }
+
+ @Test
+ public void testInterceptEventWithColumnZero()
+ {
+ HashMap<String, String> contextMap = new HashMap<String, String>();
+ contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
+ contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+ contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0");
+
+ ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder();
+ builder.configure(new Context(contextMap));
+ Interceptor interceptor = builder.build();
+
+ assertArrayEquals("Empty Bytes",
+ "\001".getBytes(),
+ interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
+
+ assertArrayEquals("One Field",
+ "First\001".getBytes(),
+ interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
+
+ assertArrayEquals("Two Fields",
+ "\001".getBytes(),
+ interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
+ }
+}
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
new file mode 100644
index 0000000..739184f
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
@@ -0,0 +1,214 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import com.datatorrent.netlet.util.Slice;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+public class InterceptorTestHelper
+{
+ private static final byte FIELD_SEPARATOR = 1;
+
+ static class MyEvent implements Event
+ {
+ byte[] body;
+
+ MyEvent(byte[] bytes)
+ {
+ body = bytes;
+ }
+
+ @Override
+ public Map<String, String> getHeaders()
+ {
+ return null;
+ }
+
+ @Override
+ public void setHeaders(Map<String, String> map)
+ {
+ }
+
+ @Override
+ @SuppressWarnings("ReturnOfCollectionOrArrayField")
+ public byte[] getBody()
+ {
+ return body;
+ }
+
+ @Override
+ @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
+ public void setBody(byte[] bytes)
+ {
+ body = bytes;
+ }
+ }
+
+ private final Interceptor.Builder builder;
+ private final Map<String, String> context;
+
+ InterceptorTestHelper(Interceptor.Builder builder, Map<String, String> context)
+ {
+ this.builder = builder;
+ this.context = context;
+ }
+
+ public void testIntercept_Event()
+ {
+ builder.configure(new Context(context));
+ Interceptor interceptor = builder.build();
+
+ assertArrayEquals("Empty Bytes",
+ "\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("".getBytes())).getBody());
+
+ assertArrayEquals("One Separator",
+ "\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("\002".getBytes())).getBody());
+
+ assertArrayEquals("Two Separators",
+ "\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("\002\002".getBytes())).getBody());
+
+ assertArrayEquals("One Field",
+ "\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First".getBytes())).getBody());
+
+ assertArrayEquals("Two Fields",
+ "First\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("\002First".getBytes())).getBody());
+
+ assertArrayEquals("Two Fields",
+ "\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First\001".getBytes())).getBody());
+
+ assertArrayEquals("Two Fields",
+ "Second\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First\002Second".getBytes())).getBody());
+
+ assertArrayEquals("Three Fields",
+ "Second\001\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First\002Second\002".getBytes())).getBody());
+
+ assertArrayEquals("Three Fields",
+ "\001Second\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First\002\002Second".getBytes())).getBody());
+
+ assertArrayEquals("Four Fields",
+ "\001Second\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First\002\002Second\002".getBytes())).getBody());
+
+ assertArrayEquals("Five Fields",
+ "\001Second\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First\002\002Second\002\002".getBytes())).getBody());
+
+ assertArrayEquals("Six Fields",
+ "\001Second\001\001".getBytes(),
+ interceptor.intercept(new MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
+ }
+
+ public void testFiles() throws IOException, URISyntaxException
+ {
+ Properties properties = new Properties();
+ properties.load(getClass().getResourceAsStream("/flume/conf/flume-conf.properties"));
+
+ String interceptor = null;
+ for (Entry<Object, Object> entry : properties.entrySet()) {
+ logger.debug("{} => {}", entry.getKey(), entry.getValue());
+
+ if (builder.getClass().getName().equals(entry.getValue().toString())) {
+ String key = entry.getKey().toString();
+ if (key.endsWith(".type")) {
+ interceptor = key.substring(0, key.length() - "type".length());
+ break;
+ }
+ }
+ }
+
+ assertNotNull(builder.getClass().getName(), interceptor);
+ @SuppressWarnings({"null", "ConstantConditions"})
+ final int interceptorLength = interceptor.length();
+
+ HashMap<String, String> map = new HashMap<String, String>();
+ for (Entry<Object, Object> entry : properties.entrySet()) {
+ String key = entry.getKey().toString();
+ if (key.startsWith(interceptor)) {
+ map.put(key.substring(interceptorLength), entry.getValue().toString());
+ }
+ }
+
+ builder.configure(new Context(map));
+ Interceptor interceptorInstance = builder.build();
+
+ URL url = getClass().getResource("/test_data/gentxns/");
+ assertNotNull("Generated Transactions", url);
+
+ int records = 0;
+ File dir = new File(url.toURI());
+ for (File file : dir.listFiles()) {
+ records += processFile(file, interceptorInstance);
+ }
+
+ Assert.assertEquals("Total Records", 2200, records);
+ }
+
+ private int processFile(File file, Interceptor interceptor) throws IOException
+ {
+ InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName());
+ BufferedReader br = new BufferedReader(new InputStreamReader(stream));
+
+ String line;
+ int i = 0;
+ while ((line = br.readLine()) != null) {
+ byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody();
+ RawEvent event = RawEvent.from(body, FIELD_SEPARATOR);
+ Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid);
+ logger.debug("guid = {}, time = {}", event.guid, event.time);
+ i++;
+ }
+
+ br.close();
+ return i;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class);
+}
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
new file mode 100644
index 0000000..049609b
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
@@ -0,0 +1,119 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.Serializable;
+
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+public class RawEvent implements Serializable
+{
+ public Slice guid;
+ public long time;
+ public int dimensionsOffset;
+
+ public Slice getGUID()
+ {
+ return guid;
+ }
+
+ public long getTime()
+ {
+ return time;
+ }
+
+ RawEvent()
+ {
+ /* needed for Kryo serialization */
+ }
+
+ public static RawEvent from(byte[] row, byte separator)
+ {
+ final int rowsize = row.length;
+
+ /*
+ * Lets get the guid out of the current record
+ */
+ int sliceLengh = -1;
+ while (++sliceLengh < rowsize) {
+ if (row[sliceLengh] == separator) {
+ break;
+ }
+ }
+
+ int i = sliceLengh + 1;
+
+ /* lets parse the date */
+ int dateStart = i;
+ while (i < rowsize) {
+ if (row[i++] == separator) {
+ long time = DATE_PARSER.parseMillis(new String(row, dateStart, i - dateStart - 1));
+ RawEvent event = new RawEvent();
+ event.guid = new Slice(row, 0, sliceLengh);
+ event.time = time;
+ event.dimensionsOffset = i;
+ return event;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int hash = 5;
+ hash = 61 * hash + (this.guid != null ? this.guid.hashCode() : 0);
+ hash = 61 * hash + (int)(this.time ^ (this.time >>> 32));
+ return hash;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RawEvent{" + "guid=" + guid + ", time=" + time + '}';
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final RawEvent other = (RawEvent)obj;
+ if (this.guid != other.guid && (this.guid == null || !this.guid.equals(other.guid))) {
+ return false;
+ }
+ return this.time == other.time;
+ }
+
+ private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+ private static final Logger logger = LoggerFactory.getLogger(RawEvent.class);
+ private static final long serialVersionUID = 201312191312L;
+}
diff --git a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
new file mode 100644
index 0000000..a615496
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.operator;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+public class AbstractFlumeInputOperatorTest
+{
+ public AbstractFlumeInputOperatorTest()
+ {
+ }
+
+ @Test
+ public void testThreadLocal()
+ {
+ ThreadLocal<Set<Integer>> tl = new ThreadLocal<Set<Integer>>()
+ {
+ @Override
+ protected Set<Integer> initialValue()
+ {
+ return new HashSet<Integer>();
+ }
+
+ };
+ Set<Integer> get1 = tl.get();
+ get1.add(1);
+ assertTrue("Just Added Value", get1.contains(1));
+
+ Set<Integer> get2 = tl.get();
+ assertTrue("Previously added value", get2.contains(1));
+ }
+
+}
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
new file mode 100644
index 0000000..833a353
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.sink;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.channel.MemoryChannel;
+
+import com.datatorrent.flume.discovery.Discovery;
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+public class DTFlumeSinkTest
+{
+ static final String hostname = "localhost";
+ int port = 0;
+
+ @Test
+ @SuppressWarnings("SleepWhileInLoop")
+ public void testServer() throws InterruptedException, IOException
+ {
+ Discovery<byte[]> discovery = new Discovery<byte[]>()
+ {
+ @Override
+ public synchronized void unadvertise(Service<byte[]> service)
+ {
+ notify();
+ }
+
+ @Override
+ public synchronized void advertise(Service<byte[]> service)
+ {
+ port = service.getPort();
+ logger.debug("listening at {}", service);
+ notify();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized Collection<Service<byte[]>> discover()
+ {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ return Collections.EMPTY_LIST;
+ }
+
+ };
+ DTFlumeSink sink = new DTFlumeSink();
+ sink.setName("TeskSink");
+ sink.setHostname(hostname);
+ sink.setPort(0);
+ sink.setAcceptedTolerance(2000);
+ sink.setChannel(new MemoryChannel());
+ sink.setDiscovery(discovery);
+ sink.start();
+ AbstractLengthPrependerClient client = new AbstractLengthPrependerClient()
+ {
+ private byte[] array;
+ private int offset = 2;
+
+ @Override
+ public void onMessage(byte[] buffer, int offset, int size)
+ {
+ Slice received = new Slice(buffer, offset, size);
+ logger.debug("Client Received = {}", received);
+ Assert.assertEquals(received,
+ new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE));
+ synchronized (DTFlumeSinkTest.this) {
+ DTFlumeSinkTest.this.notify();
+ }
+ }
+
+ @Override
+ public void connected()
+ {
+ super.connected();
+ array = new byte[Server.Request.FIXED_SIZE + offset];
+ array[offset] = Server.Command.ECHO.getOrdinal();
+ array[offset + 1] = 1;
+ array[offset + 2] = 2;
+ array[offset + 3] = 3;
+ array[offset + 4] = 4;
+ array[offset + 5] = 5;
+ array[offset + 6] = 6;
+ array[offset + 7] = 7;
+ array[offset + 8] = 8;
+ Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis());
+ write(array, offset, Server.Request.FIXED_SIZE);
+ }
+
+ };
+
+ DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient");
+ eventloop.start();
+ discovery.discover();
+ try {
+ eventloop.connect(new InetSocketAddress(hostname, port), client);
+ try {
+ synchronized (this) {
+ this.wait();
+ }
+ } finally {
+ eventloop.disconnect(client);
+ }
+ } finally {
+ eventloop.stop();
+ }
+
+ sink.stop();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class);
+}
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
new file mode 100644
index 0000000..64495db
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.sink;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ * @author Chetan Narsude <chetan@datatorrent.com>
+ */
+public class ServerTest
+{
+ byte[] array;
+
+ public ServerTest()
+ {
+ array = new byte[1024];
+ }
+
+ @Test
+ public void testInt()
+ {
+ Server.writeInt(array, 0, Integer.MAX_VALUE);
+ Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readInt(array, 0));
+
+ Server.writeInt(array, 0, Integer.MIN_VALUE);
+ Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readInt(array, 0));
+
+ Server.writeInt(array, 0, 0);
+ Assert.assertEquals("Zero Integer", 0, Server.readInt(array, 0));
+
+ Random rand = new Random();
+ for (int i = 0; i < 128; i++) {
+ int n = rand.nextInt();
+ if (rand.nextBoolean()) {
+ n = -n;
+ }
+ Server.writeInt(array, 0, n);
+ Assert.assertEquals("Random Integer", n, Server.readInt(array, 0));
+ }
+ }
+
+ @Test
+ public void testLong()
+ {
+ Server.writeLong(array, 0, Integer.MAX_VALUE);
+ Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readLong(array, 0));
+
+ Server.writeLong(array, 0, Integer.MIN_VALUE);
+ Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readLong(array, 0));
+
+ Server.writeLong(array, 0, 0);
+ Assert.assertEquals("Zero Integer", 0L, Server.readLong(array, 0));
+
+ Server.writeLong(array, 0, Long.MAX_VALUE);
+ Assert.assertEquals("Max Long", Long.MAX_VALUE, Server.readLong(array, 0));
+
+ Server.writeLong(array, 0, Long.MIN_VALUE);
+ Assert.assertEquals("Min Long", Long.MIN_VALUE, Server.readLong(array, 0));
+
+ Server.writeLong(array, 0, 0L);
+ Assert.assertEquals("Zero Long", 0L, Server.readLong(array, 0));
+
+ Random rand = new Random();
+ for (int i = 0; i < 128; i++) {
+ long n = rand.nextLong();
+ if (rand.nextBoolean()) {
+ n = -n;
+ }
+ Server.writeLong(array, 0, n);
+ Assert.assertEquals("Random Long", n, Server.readLong(array, 0));
+ }
+ }
+
+}
diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties
new file mode 100644
index 0000000..c892c53
--- /dev/null
+++ b/flume/src/test/resources/flume/conf/flume-conf.properties
@@ -0,0 +1,85 @@
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#agent1 on node1
+agent1.channels = ch1
+agent1.sources = netcatSource
+agent1.sinks = dt
+
+# channels
+agent1.channels.ch1.type = file
+agent1.channels.ch1.capacity = 10000000
+agent1.channels.ch1.transactionCapacity = 10000
+agent1.channels.ch1.maxFileSize = 67108864
+
+agent1.sources.netcatSource.type = exec
+agent1.sources.netcatSource.channels = ch1
+agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1
+# Pick and Reorder the columns we need from a larger record for efficiency
+agent1.sources.netcatSource.interceptors = columnchooser
+agent1.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringInterceptor$Builder
+agent1.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
+agent1.sources.netcatSource.interceptors.columnchooser.dstSeparator = 1
+agent1.sources.netcatSource.interceptors.columnchooser.columns = 0 43 62 69 68 139 190 70 71 52 75 37 39 42 191 138
+
+# Alternative interceptor (for a hypothetical agent2, which is not declared
+# above): selects the same columns via a formatter string instead of
+# separate src/dst separator settings.
+agent2.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder
+agent2.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
+agent2.sources.netcatSource.interceptors.columnchooser.columnsFormatter = {0}\u0001{43}\u0001{62}\u0001{69}\u0001{68}\u0001{139}\u0001{190}\u0001{70}\u0001{71}\u0001{52}\u0001{75}\u0001{37}\u0001{39}\u0001{42}\u0001{191}\u0001{138}\u0001
+
+# index -- description -- type if different
+# 0 Slice guid; // long
+# 43 public long time // yyyy-MM-dd HH:mm:ss
+# 62 public long adv_id;
+# 69 public int cmp_type; // string
+# 68 public long cmp_id;
+# 139 public long line_id;
+# 190 public long bslice_id;
+# 70 public long ao_id;
+# 71 public long creative_id;
+# 52 public long algo_id;
+# 75 public int device_model_id; // string
+# 37 public long impressions;
+# 39 public long clicks;
+# 42 public double spend;
+# 191 public double bonus_spend;
+# 138 public double spend_local;
+#
+
+# first sink - dt
+agent1.sinks.dt.id = CEVL00P
+agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
+agent1.sinks.dt.hostname = localhost
+agent1.sinks.dt.port = 8080
+agent1.sinks.dt.sleepMillis = 7
+agent1.sinks.dt.throughputAdjustmentFactor = 2
+agent1.sinks.dt.maximumEventsPerTransaction = 5000
+agent1.sinks.dt.minimumEventsPerTransaction = 1
+
+# Ensure that we do not lose the data handed over to us by flume.
+agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
+agent1.sinks.dt.storage.restore = false
+agent1.sinks.dt.storage.baseDir = /tmp/flume101
+agent1.sinks.dt.channel = ch1
+
+# Ensure that we are able to detect flume sinks (and failures) automatically.
+agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery
+agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181
+agent1.sinks.dt.discovery.basePath = /HelloDT
+agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000
+agent1.sinks.dt.discovery.connectionRetryCount = 10
+agent1.sinks.dt.discovery.connectionRetrySleepMillis = 500
+
diff --git a/flume/src/test/resources/flume/conf/flume-env.sh b/flume/src/test/resources/flume/conf/flume-env.sh
new file mode 100644
index 0000000..c2232ea
--- /dev/null
+++ b/flume/src/test/resources/flume/conf/flume-env.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# This script runs on a machine which has a maven repository populated under
+# $HOME/.m2. If that's not the case, please adjust the JARPATH variable below
+# to point to a colon-separated list of directories where jar files can be found.
+if test -z "$DT_FLUME_JAR"
+then
+  # Message is quoted: unquoted, "[ERROR]" is a glob pattern and could expand
+  # to a matching single-character filename in the current directory.
+  echo "[ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class" >&2
+  exit 2
+fi
+
+# ${JARPATH:=...} assigns the default as a side effect of the expansion.
+echo "JARPATH is set to ${JARPATH:=$HOME/.m2/repository:.}"
+if test -z "$JAVA_HOME"
+then
+  JAVA=java
+else
+  JAVA=${JAVA_HOME}/bin/java
+fi
+# Quote expansions so paths containing spaces do not word-split.
+FLUME_CLASSPATH=$(JARPATH="$JARPATH" "$JAVA" -cp "$DT_FLUME_JAR" com.datatorrent.jarpath.JarPath -N "$DT_FLUME_JAR" -Xdt-jarpath -Xdt-netlet)
\ No newline at end of file
diff --git a/flume/src/test/resources/log4j.properties b/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ac0a107
--- /dev/null
+++ b/flume/src/test/resources/log4j.properties
@@ -0,0 +1,38 @@
+#
+# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=INFO,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.malhar=org.apache.log4j.RollingFileAppender
+log4j.appender.malhar.layout=org.apache.log4j.PatternLayout
+log4j.appender.malhar.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+#log4j.appender.malhar.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.ConversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
diff --git a/flume/src/test/resources/test_data/gentxns/2013121500 b/flume/src/test/resources/test_data/gentxns/2013121500
new file mode 100644
index 0000000..3ce5646
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121500
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121501 b/flume/src/test/resources/test_data/gentxns/2013121501
new file mode 100644
index 0000000..b2e70c0
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121501
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121502 b/flume/src/test/resources/test_data/gentxns/2013121502
new file mode 100644
index 0000000..ec13862
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121502
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121503 b/flume/src/test/resources/test_data/gentxns/2013121503
new file mode 100644
index 0000000..8267dd3
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121503
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121504 b/flume/src/test/resources/test_data/gentxns/2013121504
new file mode 100644
index 0000000..addfe62
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121504
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121505 b/flume/src/test/resources/test_data/gentxns/2013121505
new file mode 100644
index 0000000..d76aa9f
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121505
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121506 b/flume/src/test/resources/test_data/gentxns/2013121506
new file mode 100644
index 0000000..2f5bbb6
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121506
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121507 b/flume/src/test/resources/test_data/gentxns/2013121507
new file mode 100644
index 0000000..a022dad
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121507
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121508 b/flume/src/test/resources/test_data/gentxns/2013121508
new file mode 100644
index 0000000..d1e7f5c
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121508
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121509 b/flume/src/test/resources/test_data/gentxns/2013121509
new file mode 100644
index 0000000..10d61de
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121509
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121510 b/flume/src/test/resources/test_data/gentxns/2013121510
new file mode 100644
index 0000000..c2f76c8
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121510
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121511 b/flume/src/test/resources/test_data/gentxns/2013121511
new file mode 100644
index 0000000..bf16cfe
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121511
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121512 b/flume/src/test/resources/test_data/gentxns/2013121512
new file mode 100644
index 0000000..fe75419
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121512
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121513 b/flume/src/test/resources/test_data/gentxns/2013121513
new file mode 100644
index 0000000..3094cae
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121513
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121514 b/flume/src/test/resources/test_data/gentxns/2013121514
new file mode 100644
index 0000000..6e00e4a
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121514
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121515 b/flume/src/test/resources/test_data/gentxns/2013121515
new file mode 100644
index 0000000..b860e43
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121515
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121516 b/flume/src/test/resources/test_data/gentxns/2013121516
new file mode 100644
index 0000000..dfb5854
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121516
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121517 b/flume/src/test/resources/test_data/gentxns/2013121517
new file mode 100644
index 0000000..c8da2cc
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121517
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121518 b/flume/src/test/resources/test_data/gentxns/2013121518
new file mode 100644
index 0000000..2cb628b
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121518
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121519 b/flume/src/test/resources/test_data/gentxns/2013121519
new file mode 100644
index 0000000..6fab9d9
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121519
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121520 b/flume/src/test/resources/test_data/gentxns/2013121520
new file mode 100644
index 0000000..ba56d49
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121520
Binary files differ
diff --git a/flume/src/test/resources/test_data/gentxns/2013121521 b/flume/src/test/resources/test_data/gentxns/2013121521
new file mode 100644
index 0000000..37de926
--- /dev/null
+++ b/flume/src/test/resources/test_data/gentxns/2013121521
Binary files differ