[EAGLE-882] Stream leaf RunningQueueAPIEntity into Kafka for queue monitoring

https://issues.apache.org/jira/browse/EAGLE-882

Author: Zhao, Qingwen <qingwzhao@apache.org>

Closes #793 from qingwen220/EAGLE-882.
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
index 77dc0cb..68ca8c7 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
@@ -17,11 +17,13 @@
 package org.apache.eagle.hadoop.queue;
 
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
 
@@ -33,14 +35,19 @@
         HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig);
         TopologyBuilder builder = new TopologyBuilder();
 
-        int numOfParserTasks = appConfig.topology.numOfParserTasks;
+        int numOfPersistTasks = appConfig.topology.numPersistTasks;
+        int numOfSinkTasks = appConfig.topology.numSinkTasks;
         int numOfSpoutTasks = 1;
 
         String spoutName = "runningQueueSpout";
-        String boltName = "parserBolt";
+        String persistBoltName = "persistBolt";
 
         builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
-        builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName);
+        builder.setBolt(persistBoltName, bolt, numOfPersistTasks).setNumTasks(numOfPersistTasks).shuffleGrouping(spoutName);
+
+        StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_LEAF_QUEUE_STREAM", config);
+        builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks)
+                .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName);
 
         return builder.createTopology();
     }
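
The new topology is a three-stage pipeline: runningQueueSpout feeds persistBolt, which writes metrics and entities to the Eagle service and re-emits leaf-queue records; queueKafkaSink then streams those records into Kafka. A minimal sketch of the resulting wiring, assuming the default task counts of 2 from application.conf (spout parallelism is fixed at 1 in the patch):

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("runningQueueSpout", spout, 1).setNumTasks(1);
    // persist bolt: writes to the Eagle service and re-emits leaf-queue records
    builder.setBolt("persistBolt", bolt, 2).setNumTasks(2)
            .shuffleGrouping("runningQueueSpout");
    // sink bolt: ships (queue, message) pairs to Kafka
    builder.setBolt("queueKafkaSink", queueSinkBolt, 2).setNumTasks(2)
            .shuffleGrouping("persistBolt");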
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
index d398028..690ac6b 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
@@ -36,7 +36,8 @@
     }
 
     public static class Topology implements Serializable {
-        public int numOfParserTasks;
+        public int numSinkTasks;
+        public int numPersistTasks;
     }
 
     public static class DataSourceConfig implements Serializable {
@@ -67,7 +68,8 @@
     private void init(Config config) {
         this.config = config;
 
-        this.topology.numOfParserTasks = config.getInt("topology.numOfParserTasks");
+        this.topology.numPersistTasks = config.getInt("topology.numPersistTasks");
+        this.topology.numSinkTasks = config.getInt("topology.numSinkTasks");
 
         this.dataSourceConfig.rMEndPoints = config.getString("dataSourceConfig.rMEndPoints");
         this.dataSourceConfig.fetchIntervalSec = config.getString("dataSourceConfig.fetchIntervalSec");
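
Both task counts are mandatory: Typesafe Config's getInt throws ConfigException.Missing when a key is absent, so deployments upgrading from numOfParserTasks must rename the setting. A minimal sketch of the lookup, assuming the defaults shipped in application.conf below:

    Config config = ConfigFactory.load();  // reads application.conf from the classpath
    int numPersistTasks = config.getInt("topology.numPersistTasks");  // 2 by default
    int numSinkTasks = config.getInt("topology.numSinkTasks");        // 2 by default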
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 4a63343..9a08f05 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -61,6 +61,24 @@
 
     }
 
+    public static class LeafQueueInfo {
+        public static final String TIMESTAMP = "timestamp";
+        public static final String QUEUE_SITE = "site";
+        public static final String QUEUE_NAME = "queue";
+        public static final String QUEUE_STATE = "state";
+        public static final String QUEUE_SCHEDULER = "scheduler";
+        public static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity";
+        public static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity";
+        public static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity";
+        public static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity";
+        public static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity";
+        public static final String QUEUE_USED_MEMORY = "memory";
+        public static final String QUEUE_USED_VCORES = "vcores";
+        public static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications";
+        public static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications";
+        public static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications";
+    }
+
     public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
 
     // tag constants
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index 214e7f6..b0452c9 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -112,15 +112,17 @@
         _entity.setNumPendingApplications(queue.getNumPendingApplications());
         _entity.setMaxActiveApplications(queue.getMaxActiveApplications());
         _entity.setTimestamp(currentTimestamp);
+        _entity.setUserLimitFactor(queue.getUserLimitFactor());
 
         List<UserWrapper> userList = new ArrayList<>();
         if (queue.getUsers() != null && queue.getUsers().getUser() != null) {
             for (User user : queue.getUsers().getUser()) {
-                UserWrapper newUser = new UserWrapper(user);
-                userList.add(newUser);
+                userList.add(wrapUser(user));
             }
         }
-        _entity.setUsers(userList);
+        UserWrappers users = new UserWrappers();
+        users.setUsers(userList);
+        _entity.setUsers(users);
 
         runningQueueAPIEntities.add(_entity);
 
@@ -149,4 +151,14 @@
             }
         }
     }
+
+    private UserWrapper wrapUser(User user) {
+        UserWrapper wrapper = new UserWrapper();
+        wrapper.setUsername(user.getUsername());
+        wrapper.setMemory(user.getResourcesUsed().getMemory());
+        wrapper.setvCores(user.getResourcesUsed().getvCores());
+        wrapper.setNumActiveApplications(user.getNumActiveApplications());
+        wrapper.setNumPendingApplications(user.getNumPendingApplications());
+        return wrapper;
+    }
 }
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java
index e7563d4..be4ef31 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java
@@ -54,7 +54,9 @@
     @Column("j")
     private String scheduler;
     @Column("k")
-    private List<UserWrapper> users;
+    private UserWrappers users;
+    @Column("l")
+    private double userLimitFactor;
 
     public String getScheduler() {
         return scheduler;
@@ -147,12 +149,21 @@
         valueChanged("numPendingApplications");
     }
 
-    public List<UserWrapper> getUsers() {
+    public UserWrappers getUsers() {
         return users;
     }
 
-    public void setUsers(List<UserWrapper> users) {
+    public void setUsers(UserWrappers users) {
         this.users = users;
         valueChanged("users");
     }
+
+    public double getUserLimitFactor() {
+        return userLimitFactor;
+    }
+
+    public void setUserLimitFactor(double userLimitFactor) {
+        this.userLimitFactor = userLimitFactor;
+        valueChanged("userLimitFactor");
+    }
 }
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java
index 9303727..f5c15f5 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java
@@ -18,22 +18,20 @@
 
 package org.apache.eagle.hadoop.queue.model.scheduler;
 
-public class UserWrapper {
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+import java.io.Serializable;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWrapper implements Serializable {
     private String username;
     private long memory;
     private long vCores;
     private int numPendingApplications;
     private int numActiveApplications;
 
-    public UserWrapper(User user) {
-        this.username = user.getUsername();
-        this.memory = user.getResourcesUsed().getMemory();
-        this.vCores = user.getResourcesUsed().getvCores();
-        this.numActiveApplications = user.getNumActiveApplications();
-        this.numPendingApplications = user.getNumPendingApplications();
-    }
-
     public String getUsername() {
         return username;
     }
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java
new file mode 100644
index 0000000..9a6bf8a
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java
@@ -0,0 +1,39 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.Serializable;
+import java.util.List;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWrappers implements Serializable {
+    private List<UserWrapper> users;
+
+    public List<UserWrapper> getUsers() {
+        return users;
+    }
+
+    public void setUsers(List<UserWrapper> users) {
+        this.users = users;
+    }
+
+}
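
UserWrappers wraps the raw List<UserWrapper> in a Serializable bean that serves as the entity's "users" column. A round-trip sketch, assuming plain Jackson with the annotations the class already carries (values are illustrative):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Collections;

    UserWrapper user = new UserWrapper();
    user.setUsername("alice");            // illustrative user
    user.setMemory(4096L);
    user.setvCores(2L);
    user.setNumActiveApplications(1);
    user.setNumPendingApplications(0);

    UserWrappers users = new UserWrappers();
    users.setUsers(Collections.singletonList(user));
    String json = new ObjectMapper().writeValueAsString(users);
    // e.g. {"users":[{"username":"alice","memory":4096,...}]}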
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 3609184..1bafc13 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -22,10 +22,14 @@
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
 import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
+import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
@@ -33,6 +37,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -67,6 +73,11 @@
             writeMetrics(metrics);
         } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
             List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data;
+            for (RunningQueueAPIEntity queue : entities) {
+                if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
+                    collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), parseLeafQueueInfo(queue)));
+                }
+            }
             writeEntities(entities);
         }
         this.collector.ack(input);
@@ -74,7 +85,18 @@
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(LeafQueueInfo.QUEUE_NAME, "message"));
+    }
 
+    @Override
+    public void cleanup() {
+        if (client != null) {
+            try {
+                client.close();
+            } catch (IOException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
     }
 
     private void writeEntities(List<RunningQueueAPIEntity> entities) {
@@ -104,5 +126,40 @@
         }
     }
 
+    private Map<String, Object> parseLeafQueueInfo(RunningQueueAPIEntity queueAPIEntity) {
+        Map<String, Object> queueInfoMap = new HashMap<>();
+        queueInfoMap.put(LeafQueueInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE));
+        queueInfoMap.put(LeafQueueInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE));
+        queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_STATE, queueAPIEntity.getState());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory());
+        queueInfoMap.put(LeafQueueInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores());
+        queueInfoMap.put(LeafQueueInfo.TIMESTAMP, queueAPIEntity.getTimestamp());
 
+        double maxUserUsedCapacity = 0;
+        double userUsedCapacity;
+        for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) {
+            userUsedCapacity = calculateUserUsedCapacity(
+                    queueAPIEntity.getAbsoluteUsedCapacity(),
+                    queueAPIEntity.getMemory(),
+                    user.getMemory());
+            if (userUsedCapacity > maxUserUsedCapacity) {
+                maxUserUsedCapacity = userUsedCapacity;
+            }
+
+        }
+        queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity);
+        queueInfoMap.put(LeafQueueInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity());
+        return queueInfoMap;
+    }
+
+    private double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) {
+        return userUsedMem * absoluteUsedCapacity / queueUsedMem;
+    }
 }
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index 4984b43..4cf745c 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -17,36 +17,162 @@
   -->
 
 <application>
-  <type>HADOOP_QUEUE_RUNNING_APP</type>
-  <name>Hadoop Queue Monitor</name>
-  <configuration>
-    <!-- org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig -->
-    <property>
-      <name>dataSourceConfig.rMEndPoints</name>
-      <displayName>Resource Manager End Points</displayName>
-      <description>end points of resource manager, comma-separated for multiple</description>
-      <value>http://sandbox.hortonworks.com:8088/</value>
-      <required>true</required>
-    </property>
-    <property>
-      <name>workers</name>
-      <displayName>Storm Worker Number</displayName>
-      <description>the number of storm worker</description>
-      <value>1</value>
-    </property>
-    <property>
-      <name>topology.numOfParserTasks</name>
-      <displayName>Parallel Tasks Per Bolt</displayName>
-      <description>the number of tasks that should be assigned to execute a bolt</description>
-      <value>2</value>
-    </property>
-    <property>
-      <name>dataSourceConfig.fetchIntervalSec</name>
-      <displayName>Fetching Metric Interval in Seconds</displayName>
-      <description>interval seconds of fetching metric from resource manager</description>
-      <value>10</value>
-    </property>
-  </configuration>
+    <type>HADOOP_QUEUE_RUNNING_APP</type>
+    <name>Hadoop Queue Monitor</name>
+    <configuration>
+        <!-- org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig -->
+        <property>
+            <name>dataSourceConfig.rMEndPoints</name>
+            <displayName>Resource Manager End Points</displayName>
+            <description>endpoints of the resource manager, comma-separated for multiple</description>
+            <value>http://sandbox.hortonworks.com:8088/</value>
+            <required>true</required>
+        </property>
+        <property>
+            <name>workers</name>
+            <displayName>Storm Worker Number</displayName>
+            <description>the number of storm workers</description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>topology.numPersistTasks</name>
+            <displayName>Tasks for Data Storage Bolt</displayName>
+            <description>the number of tasks that persist metrics or entities into the database</description>
+            <value>2</value>
+        </property>
+        <property>
+            <name>topology.numSinkTasks</name>
+            <displayName>Tasks for Stream Sink Bolt</displayName>
+            <description>the number of tasks that stream leaf queue info into Kafka</description>
+            <value>2</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.fetchIntervalSec</name>
+            <displayName>Fetching Metric Interval in Seconds</displayName>
+            <description>interval in seconds for fetching metrics from the resource manager</description>
+            <value>10</value>
+        </property>
+
+        <!-- sink to kafka -->
+        <property>
+            <name>dataSinkConfig.topic</name>
+            <displayName>dataSinkConfig.topic</displayName>
+            <value>hadoop_leaf_queue</value>
+            <description>topic for the Kafka data sink</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>dataSinkConfig.brokerList</displayName>
+            <value>localhost:6667</value>
+            <description>Kafka broker list</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>dataSinkConfig.serializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for Kafka message values</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>dataSinkConfig.keySerializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for Kafka message keys</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.producerType</name>
+            <displayName>dataSinkConfig.producerType</displayName>
+            <value>async</value>
+            <description>whether messages are sent asynchronously in a background thread (async) or synchronously (sync)</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.numBatchMessages</name>
+            <displayName>dataSinkConfig.numBatchMessages</displayName>
+            <value>4096</value>
+            <description>number of messages to send in one batch when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.maxQueueBufferMs</name>
+            <displayName>dataSinkConfig.maxQueueBufferMs</displayName>
+            <value>5000</value>
+            <description>maximum time to buffer data when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.requestRequiredAcks</name>
+            <displayName>dataSinkConfig.requestRequiredAcks</displayName>
+            <value>0</value>
+            <description>controls when a produce request is considered complete (number of required acks)</description>
+        </property>
+    </configuration>
+    <streams>
+        <stream>
+            <streamId>HADOOP_LEAF_QUEUE_STREAM</streamId>
+            <description>Hadoop Leaf Queue Info Stream</description>
+            <validate>true</validate>
+            <columns>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>queue</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>state</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>scheduler</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>absoluteCapacity</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>absoluteMaxCapacity</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>absoluteUsedCapacity</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>maxUserUsedCapacity</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>userLimitCapacity</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>memory</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>vcores</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>numActiveApplications</name>
+                    <type>int</type>
+                </column>
+                <column>
+                    <name>numPendingApplications</name>
+                    <type>int</type>
+                </column>
+                <column>
+                    <name>maxActiveApplications</name>
+                    <type>int</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
   <docs>
     <install>
     </install>
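
Taken together with the dataSinkConfig defaults, each record on the hadoop_leaf_queue topic is keyed by queue name and carries the fifteen columns declared above. A hypothetical message, with illustrative values only:

    // key:     "default"                          (LeafQueueInfo.QUEUE_NAME)
    // message: {"timestamp":1490000000000,"site":"sandbox","queue":"default",
    //           "state":"RUNNING","scheduler":"capacityScheduler",
    //           "absoluteCapacity":25.0,"absoluteMaxCapacity":50.0,
    //           "absoluteUsedCapacity":40.0,"maxUserUsedCapacity":20.0,
    //           "userLimitCapacity":25.0,"memory":8192,"vcores":4,
    //           "numActiveApplications":1,"numPendingApplications":0,
    //           "maxActiveApplications":10}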
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
index e5c9b81..9d69084 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
@@ -15,7 +15,8 @@
 
 {
   "topology" : {
-    "numOfParserTasks" : 2,
+    "numSinkTasks" : 2,
+    "numPersistTasks" : 2
   },
   "dataSourceConfig": {
     "rMEndPoints" : "http://sandbox.hortonworks.com:8088/",
@@ -32,4 +33,15 @@
   "mode":"LOCAL",
   application.storm.nimbusHost=localhost,
   "workers":1,
+
+  "dataSinkConfig": {
+    "topic" : "hadoop_leaf_queue",
+    "brokerList" : "sandbox.hortonworks.com:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
+    "producerType" : "async",
+    "numBatchMessages" : "4096",
+    "maxQueueBufferMs" : "5000",
+    "requestRequiredAcks" : "0"
+  }
 }
diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
index 32ed320..da79ea2 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
@@ -21,7 +21,7 @@
 
 public class HadoopQueueRunningAppTest {
     @Test
-    public void testRun(){
+    public void testRun() {
         new HadoopQueueRunningApp().run(ConfigFactory.load());
     }
 }
diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
index e5c9b81..9d69084 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
+++ b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
@@ -15,7 +15,8 @@
 
 {
   "topology" : {
-    "numOfParserTasks" : 2,
+    "numSinkTasks" : 2,
+    "numPersistTasks" : 2
   },
   "dataSourceConfig": {
     "rMEndPoints" : "http://sandbox.hortonworks.com:8088/",
@@ -32,4 +33,15 @@
   "mode":"LOCAL",
   application.storm.nimbusHost=localhost,
   "workers":1,
+
+  "dataSinkConfig": {
+    "topic" : "hadoop_leaf_queue",
+    "brokerList" : "sandbox.hortonworks.com:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
+    "producerType" : "async",
+    "numBatchMessages" : "4096",
+    "maxQueueBufferMs" : "5000",
+    "requestRequiredAcks" : "0"
+  }
 }