/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.core.timeline.aggregators;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest;
import org.apache.ambari.metrics.core.timeline.MetricTestHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
import org.junit.Test;
import junit.framework.Assert;
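/**
 * Integration tests for the cluster-level metric aggregators
 * (second, minute, hourly and daily resolutions), run against the
 * mini HBase cluster provided by {@link AbstractMiniHBaseClusterTest}.
 */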
public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(metadataManager, false);
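// Builds a test Configuration that toggles GROUP BY based aggregation on or off.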
private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
Configuration configuration = new Configuration();
configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
return configuration;
}
@Test
public void testShouldAggregateClusterProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
getConfigurationForTest(false), metadataManager, null, null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000;
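// Two hosts (local1, local2) each report disk_free twice, two minutes apart.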
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
"disk_free", 1));
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
"disk_free", 2));
ctime += 2*minute;
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
"disk_free", 2));
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
"disk_free", 1));
// WHEN
long endTime = ctime + minute + 1;
boolean success = agg.doWork(startTime, endTime);
assertTrue("Aggregation should report success", success);
// THEN
Condition condition = new DefaultCondition(null, null, null, null, startTime,
endTime, null, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
while (rs.next()) {
TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
MetricClusterAggregate currentHostAggregate =
metricReader.getMetricClusterAggregateFromResultSet(rs);
if ("disk_free".equals(currentMetric.getMetricName())) {
assertEquals(2, currentHostAggregate.getNumberOfHosts());
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(1.0, currentHostAggregate.getMin());
assertEquals(3.0, currentHostAggregate.getSum());
recordCount++;
} else {
fail("Unexpected entry");
}
}
assertTrue("Expected at least one aggregated disk_free record", recordCount > 0);
}
@Test
public void testShouldAggregateClusterIgnoringInstance() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
getConfigurationForTest(false), metadataManager, null, null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000 * 2;
/*
 * Two hosts with two instances each:
 *             | local1 | local2 |
 * instance i1 |   1    |   2    |
 * instance i2 |   3    |   4    |
 */
// Four 1's at ctime - 100
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local1",
"i1", "disk_free", 1), true);
// Four 2's at ctime - 100: different host
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local2",
"i1", "disk_free", 2), true);
// Write i2 values at ctime + 100 so they do not overwrite the i1 rows
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local1",
"i2", "disk_free", 3), true);
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local2",
"i2", "disk_free", 4), true);
ctime += minute;
// Four 1's at ctime + 2 min
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local1",
"i1", "disk_free", 1), true);
// Four 3's at ctime + 2 min - different host
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local2",
"i1", "disk_free", 3), true);
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local1",
"i2", "disk_free", 2), true);
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local2",
"i2", "disk_free", 4), true);
// WHEN
long endTime = ctime + minute;
boolean success = agg.doWork(startTime - 1000, endTime + 1000);
assertTrue("Aggregation should report success", success);
// THEN
Condition condition = new DefaultCondition(null, null, null, null, startTime,
endTime, null, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
while (rs.next()) {
TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
MetricClusterAggregate currentHostAggregate =
readHelper.getMetricClusterAggregateFromResultSet(rs);
if ("disk_free".equals(currentMetric.getMetricName())) {
System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
assertEquals(2, currentHostAggregate.getNumberOfHosts());
assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
recordCount++;
} else {
if (!currentMetric.getMetricName().equals("live_hosts")) {
fail("Unexpected entry");
}
}
}
Assert.assertEquals(6, recordCount); // Interpolation adds 1 record.
}
@Test
public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
getConfigurationForTest(false), metadataManager, null, null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
// Insert some metrics that will be aggregated
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000;
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
"disk_free", 1), true);
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
"disk_free", 2), true);
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
"disk_used", 1), true);
ctime += 2*minute;
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
"disk_free", 2), true);
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
"disk_free", 1), true);
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
"disk_used", 1), true);
// WHEN
long endTime = ctime + minute;
boolean success = agg.doWork(startTime, endTime);
assertTrue("Aggregation should report success", success);
// THEN
Condition condition = new DefaultCondition(null, null, null, null, startTime,
endTime, null, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
while (rs.next()) {
TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
MetricClusterAggregate currentHostAggregate =
readHelper.getMetricClusterAggregateFromResultSet(rs);
if ("disk_free".equals(currentMetric.getMetricName())) {
assertEquals(2, currentHostAggregate.getNumberOfHosts());
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(1.0, currentHostAggregate.getMin());
assertEquals(3.0, currentHostAggregate.getSum());
recordCount++;
} else if ("disk_used".equals(currentMetric.getMetricName())) {
assertEquals(1, currentHostAggregate.getNumberOfHosts());
assertEquals(1.0, currentHostAggregate.getMax());
assertEquals(1.0, currentHostAggregate.getMin());
assertEquals(1.0, currentHostAggregate.getSum());
recordCount++;
} else {
if (!currentMetric.getMetricName().equals("live_hosts")) {
fail("Unexpected entry");
}
}
}
assertTrue("Expected aggregated disk_free and disk_used records", recordCount > 0);
}
@Test
public void testAggregateDailyClusterMetrics() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(
false),
metadataManager,
null);
// TODO: this time could be virtualized / made independent of the real clock
long startTime = System.currentTimeMillis();
long ctime = startTime;
long hour = 3600 * 1000;
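// Seed four hourly aggregates one hour apart; the daily aggregator should
// roll them into a single row (sum 16 = 4 * 4.0, count 8 = 4 * 2).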
Map<TimelineClusterMetric, MetricHostAggregate> records =
new HashMap<TimelineClusterMetric, MetricHostAggregate>();
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour),
MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour),
MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour),
MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
// WHEN
agg.doWork(startTime, ctime + hour + 1000);
// THEN
ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY");
int count = 0;
while (rs.next()) {
TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
assertEquals("APP_ID", "test_app", metric.getAppId());
assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
count++;
}
assertEquals("Day aggregated row expected ", 1, count);
}
@Test
public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
hdb,
getConfigurationForTest(false),
metadataManager,
null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
long second = 1000;
long minute = 60*second;
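// Seed two batches of second-level aggregates, one minute apart; each
// minute window should produce its own row in METRIC_AGGREGATE_MINUTE.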
Map<TimelineClusterMetric, MetricClusterAggregate> records =
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
hdb.saveClusterAggregateRecords(records);
agg.doWork(startTime, ctime + second);
long oldCtime = ctime + second;
// Next minute
ctime = startTime + minute;
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
hdb.saveClusterAggregateRecords(records);
agg.doWork(oldCtime, ctime + second);
ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE");
int count = 0;
long diff = 0;
while (rs.next()) {
TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
assertEquals("APP_ID", "test_app", metric.getAppId());
assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
if (count == 0) {
diff = rs.getLong("SERVER_TIME");
} else {
assertEquals("Consecutive rows should be one minute apart",
minute, Math.abs(diff - rs.getLong("SERVER_TIME")));
}
count++;
}
assertEquals("Two minute-aggregated rows expected", 2, count);
}
@Test
public void testShouldAggregateClusterOnHourProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
hdb,
getConfigurationForTest(false),
metadataManager,
null);
// TODO: this time could be virtualized / made independent of the real clock
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000;
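// Four second-level aggregate slices within one hour; the hourly rollup
// should yield a single row (sum 16 = 4 * 4.0, count 8 = 4 * 2).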
Map<TimelineClusterMetric, MetricClusterAggregate> records =
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
hdb.saveClusterAggregateRecords(records);
// WHEN
agg.doWork(startTime, ctime + minute);
// THEN
ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
int count = 0;
while (rs.next()) {
TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
assertEquals("APP_ID", "test_app", metric.getAppId());
assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
count++;
}
assertEquals("One hourly aggregated row expected ", 1, count);
}
@Test
public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
hdb,
getConfigurationForTest(false),
metadataManager,
null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000;
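// Interleave disk_used and disk_free slices; each metric should roll up
// into its own hourly row.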
Map<TimelineClusterMetric, MetricClusterAggregate> records =
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
hdb.saveClusterAggregateRecords(records);
// WHEN
agg.doWork(startTime, ctime + minute);
// THEN
ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
int count = 0;
while (rs.next()) {
TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
if ("disk_used".equals(metric.getMetricName())) {
assertEquals("APP_ID", "test_app", metric.getAppId());
assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
} else if ("disk_free".equals(metric.getMetricName())) {
assertEquals("APP_ID", "test_app", metric.getAppId());
assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM"));
assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
}
count++;
}
assertEquals("Two hourly aggregated row expected ", 2, count);
}
@Test
public void testAppLevelHostMetricAggregates() throws Exception {
Configuration conf = getConfigurationForTest(false);
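// Limit app-level aggregation to "app1" so host metrics reported on its
// hosts (e.g. cpu_user) are also aggregated under that appId.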
conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
hdb,
conf,
metadataManager,
null,
null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000;
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric((ctime), "local1",
"app1", null, "app_metric_random", 1), true);
ctime += 10;
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
"cpu_user", 1), true);
ctime += 10;
hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
"cpu_user", 2), true);
// WHEN
long endTime = ctime + minute;
boolean success = agg.doWork(startTime, endTime);
assertTrue("Aggregation should report success", success);
// THEN
List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("cpu_user"); }},
null,
"app1", null);
Condition condition = new DefaultCondition(uuids,
new ArrayList<String>() {{ add("cpu_user"); }}, null, "app1", null,
startTime - 90000, endTime, null, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
TimelineClusterMetric currentMetric = null;
MetricClusterAggregate currentHostAggregate = null;
while (rs.next()) {
currentMetric = readHelper.fromResultSet(rs);
currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
recordCount++;
}
assertEquals(3, recordCount);
assertNotNull(currentMetric);
assertEquals("cpu_user", currentMetric.getMetricName());
assertEquals("app1", currentMetric.getAppId());
assertNotNull(currentHostAggregate);
assertEquals(1, currentHostAggregate.getNumberOfHosts());
assertEquals(1.0d, currentHostAggregate.getSum());
}
@Test
public void testClusterAggregateMetricNormalization() throws Exception {
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
hdb,
getConfigurationForTest(false),
metadataManager,
null,
null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
long currentTime = System.currentTimeMillis();
// Sample data
TimelineMetric metric1 = new TimelineMetric();
metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
metric1.setAppId("resourcemanager");
metric1.setHostName("h1");
metric1.setStartTime(currentTime);
metric1.setMetricValues(new TreeMap<Long, Double>() {{
put(currentTime + 10000, 1.0);
put(currentTime + 20000, 1.0);
put(currentTime + 30000, 1.0);
put(currentTime + 40000, 1.0);
put(currentTime + 50000, 1.0);
put(currentTime + 60000, 1.0);
put(currentTime + 70000, 1.0);
}});
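// metric2 starts where metric1 ends (currentTime + 70000); the duplicate
// boundary point exercises normalization so the value is not double-counted.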
TimelineMetric metric2 = new TimelineMetric();
metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
metric2.setAppId("resourcemanager");
metric2.setHostName("h1");
metric2.setStartTime(currentTime + 70000);
metric2.setMetricValues(new TreeMap<Long, Double>() {{
put(currentTime + 70000, 1.0);
put(currentTime + 80000, 1.0);
put(currentTime + 90000, 1.0);
put(currentTime + 100000, 1.0);
put(currentTime + 110000, 1.0);
put(currentTime + 120000, 1.0);
put(currentTime + 130000, 1.0);
}});
TimelineMetrics metrics = new TimelineMetrics();
metrics.setMetrics(Collections.singletonList(metric1));
insertMetricRecords(conn, metrics);
metrics.setMetrics(Collections.singletonList(metric2));
insertMetricRecords(conn, metrics);
long startTime = currentTime - 3*60*1000;
long endTime = currentTime + 3*60*1000;
agg.doWork(startTime, endTime);
Condition condition = new DefaultCondition(null, null, null, null, startTime,
endTime, null, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
while (rs.next()) {
TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
assertEquals(1, currentHostAggregate.getNumberOfHosts());
assertEquals(1.0, currentHostAggregate.getMax());
assertEquals(1.0, currentHostAggregate.getMin());
assertEquals(1.0, currentHostAggregate.getSum());
recordCount++;
} else {
if (!currentMetric.getMetricName().equals("live_hosts")) {
fail("Unexpected entry");
}
}
}
Assert.assertEquals(10, recordCount); // With interpolation.
}
@Test
public void testAggregationUsingGroupByQuery() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
hdb,
getConfigurationForTest(true),
metadataManager,
null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000;
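// Same dataset as testShouldAggregateDifferentMetricsOnHourProperly, but
// aggregated via the GROUP BY query path (getConfigurationForTest(true)).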
Map<TimelineClusterMetric, MetricClusterAggregate> records =
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
hdb.saveClusterAggregateRecords(records);
// WHEN
agg.doWork(startTime, ctime + minute);
// THEN
ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
int count = 0;
while (rs.next()) {
TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
if ("disk_used".equals(metric.getMetricName())) {
assertEquals("APP_ID", "test_app", metric.getAppId());
assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
} else if ("disk_free".equals(metric.getMetricName())) {
assertEquals("APP_ID", "test_app", metric.getAppId());
assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM"));
assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
}
count++;
}
assertEquals("Two hourly aggregated row expected ", 2, count);
}
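// Helper: run a raw SQL query against the Phoenix test connection.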
private ResultSet executeQuery(String query) throws SQLException {
Connection conn = getConnection(getUrl());
Statement stmt = conn.createStatement();
return stmt.executeQuery(query);
}
}