[EAGLE-882] Stream leaf RunningQueueAPIEntity into Kafka for queue monitoring
https://issues.apache.org/jira/browse/EAGLE-882
Author: Zhao, Qingwen <qingwzhao@apache.org>
Closes #793 from qingwen220/EAGLE-882.
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
index 77dc0cb..68ca8c7 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
@@ -17,11 +17,13 @@
package org.apache.eagle.hadoop.queue;
import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
@@ -33,14 +35,19 @@
HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig);
TopologyBuilder builder = new TopologyBuilder();
- int numOfParserTasks = appConfig.topology.numOfParserTasks;
+ int numOfPersistTasks = appConfig.topology.numPersistTasks;
+ int numOfSinkTasks = appConfig.topology.numSinkTasks;
int numOfSpoutTasks = 1;
String spoutName = "runningQueueSpout";
- String boltName = "parserBolt";
+ String persistBoltName = "persistBolt";
builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
- builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName);
+ builder.setBolt(persistBoltName, bolt, numOfPersistTasks).setNumTasks(numOfPersistTasks).shuffleGrouping(spoutName);
+
+ StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_LEAF_QUEUE_STREAM", config);
+ builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks)
+ .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName);
return builder.createTopology();
}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
index d398028..690ac6b 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
@@ -36,7 +36,8 @@
}
public static class Topology implements Serializable {
- public int numOfParserTasks;
+ public int numSinkTasks;
+ public int numPersistTasks;
}
public static class DataSourceConfig implements Serializable {
@@ -67,7 +68,8 @@
private void init(Config config) {
this.config = config;
- this.topology.numOfParserTasks = config.getInt("topology.numOfParserTasks");
+ this.topology.numPersistTasks = config.getInt("topology.numPersistTasks");
+ this.topology.numSinkTasks = config.getInt("topology.numSinkTasks");
this.dataSourceConfig.rMEndPoints = config.getString("dataSourceConfig.rMEndPoints");
this.dataSourceConfig.fetchIntervalSec = config.getString("dataSourceConfig.fetchIntervalSec");
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 4a63343..9a08f05 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -61,6 +61,24 @@
}
+ public static class LeafQueueInfo {
+ public static final String TIMESTAMP = "timestamp";
+ public static final String QUEUE_SITE = "site";
+ public static final String QUEUE_NAME = "queue";
+ public static final String QUEUE_STATE = "state";
+ public static final String QUEUE_SCHEDULER = "scheduler";
+ public static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity";
+ public static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity";
+ public static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity";
+ public static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity";
+ public static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity";
+ public static final String QUEUE_USED_MEMORY = "memory";
+ public static final String QUEUE_USED_VCORES = "vcores";
+ public static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications";
+ public static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications";
+ public static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications";
+ }
+
public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
// tag constants
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index 214e7f6..b0452c9 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -112,15 +112,17 @@
_entity.setNumPendingApplications(queue.getNumPendingApplications());
_entity.setMaxActiveApplications(queue.getMaxActiveApplications());
_entity.setTimestamp(currentTimestamp);
+ _entity.setUserLimitFactor(queue.getUserLimitFactor());
List<UserWrapper> userList = new ArrayList<>();
if (queue.getUsers() != null && queue.getUsers().getUser() != null) {
for (User user : queue.getUsers().getUser()) {
- UserWrapper newUser = new UserWrapper(user);
- userList.add(newUser);
+ userList.add(wrapUser(user));
}
}
- _entity.setUsers(userList);
+ UserWrappers users = new UserWrappers();
+ users.setUsers(userList);
+ _entity.setUsers(users);
runningQueueAPIEntities.add(_entity);
@@ -149,4 +151,14 @@
}
}
}
+
+ private UserWrapper wrapUser(User user) {
+ UserWrapper wrapper = new UserWrapper();
+ wrapper.setUsername(user.getUsername());
+ wrapper.setMemory(user.getResourcesUsed().getMemory());
+ wrapper.setvCores(user.getResourcesUsed().getvCores());
+ wrapper.setNumActiveApplications(user.getNumActiveApplications());
+ wrapper.setNumPendingApplications(user.getNumPendingApplications());
+ return wrapper;
+ }
}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java
index e7563d4..be4ef31 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java
@@ -54,7 +54,9 @@
@Column("j")
private String scheduler;
@Column("k")
- private List<UserWrapper> users;
+ private UserWrappers users;
+ @Column("l")
+ private double userLimitFactor;
public String getScheduler() {
return scheduler;
@@ -147,12 +149,21 @@
valueChanged("numPendingApplications");
}
- public List<UserWrapper> getUsers() {
+ public UserWrappers getUsers() {
return users;
}
- public void setUsers(List<UserWrapper> users) {
+ public void setUsers(UserWrappers users) {
this.users = users;
valueChanged("users");
}
+
+ public double getUserLimitFactor() {
+ return userLimitFactor;
+ }
+
+ public void setUserLimitFactor(double userLimitFactor) {
+ this.userLimitFactor = userLimitFactor;
+ valueChanged("userLimitFactor");
+ }
}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java
index 9303727..f5c15f5 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java
@@ -18,22 +18,20 @@
package org.apache.eagle.hadoop.queue.model.scheduler;
-public class UserWrapper {
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.io.Serializable;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWrapper implements Serializable {
private String username;
private long memory;
private long vCores;
private int numPendingApplications;
private int numActiveApplications;
- public UserWrapper(User user) {
- this.username = user.getUsername();
- this.memory = user.getResourcesUsed().getMemory();
- this.vCores = user.getResourcesUsed().getvCores();
- this.numActiveApplications = user.getNumActiveApplications();
- this.numPendingApplications = user.getNumPendingApplications();
- }
-
public String getUsername() {
return username;
}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java
new file mode 100644
index 0000000..9a6bf8a
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.Serializable;
+import java.util.List;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWrappers implements Serializable {
+ private List<UserWrapper> users;
+
+ public List<UserWrapper> getUsers() {
+ return users;
+ }
+
+ public void setUsers(List<UserWrapper> users) {
+ this.users = users;
+ }
+
+}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 3609184..1bafc13 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -22,10 +22,14 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo;
import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
+import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
@@ -33,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +73,11 @@
writeMetrics(metrics);
} else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data;
+ for (RunningQueueAPIEntity queue : entities) {
+ if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
+ collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), parseLeafQueueInfo(queue)));
+ }
+ }
writeEntities(entities);
}
this.collector.ack(input);
@@ -74,7 +85,18 @@
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(HadoopClusterConstants.LeafQueueInfo.QUEUE_NAME, "message"));
+ }
+ @Override
+ public void cleanup() {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
}
private void writeEntities(List<RunningQueueAPIEntity> entities) {
@@ -104,5 +126,40 @@
}
}
+ private Map<String, Object> parseLeafQueueInfo(RunningQueueAPIEntity queueAPIEntity) {
+ Map<String, Object> queueInfoMap = new HashMap<>();
+ queueInfoMap.put(LeafQueueInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE));
+ queueInfoMap.put(LeafQueueInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE));
+ queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_STATE, queueAPIEntity.getState());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory());
+ queueInfoMap.put(LeafQueueInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores());
+ queueInfoMap.put(LeafQueueInfo.TIMESTAMP, queueAPIEntity.getTimestamp());
+ double maxUserUsedCapacity = 0;
+ double userUsedCapacity;
+ for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) {
+ userUsedCapacity = calculateUserUsedCapacity(
+ queueAPIEntity.getAbsoluteUsedCapacity(),
+ queueAPIEntity.getMemory(),
+ user.getMemory());
+ if (userUsedCapacity > maxUserUsedCapacity) {
+ maxUserUsedCapacity = userUsedCapacity;
+ }
+
+ }
+ queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity);
+ queueInfoMap.put(LeafQueueInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity());
+ return queueInfoMap;
+ }
+
+ private double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) {
+ return userUsedMem * absoluteUsedCapacity / queueUsedMem;
+ }
}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index 4984b43..4cf745c 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -17,36 +17,162 @@
-->
<application>
- <type>HADOOP_QUEUE_RUNNING_APP</type>
- <name>Hadoop Queue Monitor</name>
- <configuration>
- <!-- org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig -->
- <property>
- <name>dataSourceConfig.rMEndPoints</name>
- <displayName>Resource Manager End Points</displayName>
- <description>end points of resource manager, comma-separated for multiple</description>
- <value>http://sandbox.hortonworks.com:8088/</value>
- <required>true</required>
- </property>
- <property>
- <name>workers</name>
- <displayName>Storm Worker Number</displayName>
- <description>the number of storm worker</description>
- <value>1</value>
- </property>
- <property>
- <name>topology.numOfParserTasks</name>
- <displayName>Parallel Tasks Per Bolt</displayName>
- <description>the number of tasks that should be assigned to execute a bolt</description>
- <value>2</value>
- </property>
- <property>
- <name>dataSourceConfig.fetchIntervalSec</name>
- <displayName>Fetching Metric Interval in Seconds</displayName>
- <description>interval seconds of fetching metric from resource manager</description>
- <value>10</value>
- </property>
- </configuration>
+ <type>HADOOP_QUEUE_RUNNING_APP</type>
+ <name>Hadoop Queue Monitor</name>
+ <configuration>
+ <!-- org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig -->
+ <property>
+ <name>dataSourceConfig.rMEndPoints</name>
+ <displayName>Resource Manager End Points</displayName>
+ <description>end points of resource manager, comma-separated for multiple</description>
+ <value>http://sandbox.hortonworks.com:8088/</value>
+ <required>true</required>
+ </property>
+ <property>
+ <name>workers</name>
+ <displayName>Storm Worker Number</displayName>
+ <description>the number of storm worker</description>
+ <value>1</value>
+ </property>
+ <property>
+ <name>topology.numPersistTasks</name>
+ <displayName>Tasks for Data Storage Bolt</displayName>
+ <description>the number of tasks that persist metrics or entities into the database</description>
+ <value>2</value>
+ </property>
+ <property>
+ <name>topology.numSinkTasks</name>
+ <displayName>Tasks for Stream Sink Bolt</displayName>
+ <description>the number of tasks that stream leaf queue info into Kafka</description>
+ <value>2</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.fetchIntervalSec</name>
+ <displayName>Fetching Metric Interval in Seconds</displayName>
+ <description>interval seconds of fetching metric from resource manager</description>
+ <value>10</value>
+ </property>
+
+ <!-- sink to kafka -->
+ <property>
+ <name>dataSinkConfig.topic</name>
+ <displayName>dataSinkConfig.topic</displayName>
+ <value>hadoop_leaf_queue</value>
+ <description>topic for kafka data sink</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.brokerList</name>
+ <displayName>dataSinkConfig.brokerList</displayName>
+ <value>localhost:6667</value>
+ <description>kafka broker list</description>
+ <required>true</required>
+ </property>
+ <property>
+ <name>dataSinkConfig.serializerClass</name>
+ <displayName>dataSinkConfig.serializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message value</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.keySerializerClass</name>
+ <displayName>dataSinkConfig.keySerializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message key</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.producerType</name>
+ <displayName>dataSinkConfig.producerType</displayName>
+ <value>async</value>
+ <description>whether the messages are sent asynchronously in a background thread</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.numBatchMessages</name>
+ <displayName>dataSinkConfig.numBatchMessages</displayName>
+ <value>4096</value>
+ <description>number of messages to send in one batch when using async mode</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.maxQueueBufferMs</name>
+ <displayName>dataSinkConfig.maxQueueBufferMs</displayName>
+ <value>5000</value>
+ <description>maximum time to buffer data when using async mode</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.requestRequiredAcks</name>
+ <displayName>dataSinkConfig.requestRequiredAcks</displayName>
+ <value>0</value>
+ <description>value controls when a produce request is considered completed</description>
+ </property>
+ </configuration>
+ <streams>
+ <stream>
+ <streamId>HADOOP_LEAF_QUEUE_STREAM</streamId>
+ <description>Hadoop Leaf Queue Info Stream</description>
+ <validate>true</validate>
+ <columns>
+ <column>
+ <name>timestamp</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>site</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>queue</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>state</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>scheduler</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>absoluteCapacity</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>absoluteMaxCapacity</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>absoluteUsedCapacity</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>maxUserUsedCapacity</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>userLimitCapacity</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>memory</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>vcores</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>numActiveApplications</name>
+ <type>int</type>
+ </column>
+ <column>
+ <name>numPendingApplications</name>
+ <type>int</type>
+ </column>
+ <column>
+ <name>maxActiveApplications</name>
+ <type>int</type>
+ </column>
+ </columns>
+ </stream>
+ </streams>
<docs>
<install>
</install>
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
index e5c9b81..9d69084 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
@@ -15,7 +15,8 @@
{
"topology" : {
- "numOfParserTasks" : 2,
+ "numSinkTasks" : 2,
+ "numPersistTasks" : 2
},
"dataSourceConfig": {
"rMEndPoints" : "http://sandbox.hortonworks.com:8088/",
@@ -32,4 +33,15 @@
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
"workers":1,
+
+ "dataSinkConfig": {
+ "topic" : "hadoop_leaf_queue",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
+ "producerType" : "async",
+ "numBatchMessages" : "4096",
+ "maxQueueBufferMs" : "5000",
+ "requestRequiredAcks" : "0"
+ }
}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
index 32ed320..da79ea2 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
@@ -21,7 +21,7 @@
public class HadoopQueueRunningAppTest {
@Test
- public void testRun(){
+ public void testRun() {
new HadoopQueueRunningApp().run(ConfigFactory.load());
}
}
diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
index e5c9b81..9d69084 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
+++ b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
@@ -15,7 +15,8 @@
{
"topology" : {
- "numOfParserTasks" : 2,
+ "numSinkTasks" : 2,
+ "numPersistTasks" : 2
},
"dataSourceConfig": {
"rMEndPoints" : "http://sandbox.hortonworks.com:8088/",
@@ -32,4 +33,15 @@
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
"workers":1,
+
+ "dataSinkConfig": {
+ "topic" : "hadoop_leaf_queue",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
+ "producerType" : "async",
+ "numBatchMessages" : "4096",
+ "maxQueueBufferMs" : "5000",
+ "requestRequiredAcks" : "0"
+ }
}