/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.core.timeline.aggregators;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
import junit.framework.Assert;
/**
 * Unit tests for {@code TimelineMetricClusterAggregatorSecond}.
 *
 * Covers:
 * <ul>
 *   <li>slicing raw host metrics into fixed time slices with interpolation of
 *       missing points ({@code sliceFromTimelineMetric});</li>
 *   <li>the GAUGE vs COUNTER aggregation semantics of
 *       {@code processAggregateClusterMetrics};</li>
 *   <li>emission of the synthetic per-app "live_hosts" metric by
 *       {@code aggregateMetricsFromResultSet}.</li>
 * </ul>
 *
 * NOTE(review): the metric type string "GUAGE" (sic) is deliberate and must not
 * be "corrected" — the aggregator only special-cases the exact string
 * "COUNTER", so any other value is treated as a gauge. Also, these asserts use
 * the deprecated junit.framework.Assert whose (Object, Object) overload is
 * selected via autoboxing, and the (actual, expected) argument order is
 * reversed relative to JUnit convention — it only affects failure messages,
 * not pass/fail.
 */
public class TimelineMetricClusterAggregatorSecondTest {

  /**
   * Slices a 2-minute window into 30-second slices and verifies how
   * sliceFromTimelineMetric() condenses 10-second raw points into one value
   * per slice: COUNTER metrics accumulate (rate-style) while plain metrics
   * are averaged within the slice.
   */
  @Test
  public void testClusterSecondAggregatorWithInterpolation() {
    long aggregatorInterval = 120000l; // 2-minute aggregation window
    long sliceInterval = 30000l;       // 30 s slices -> 4 slices per window
    long metricInterval = 10000l;      // raw data points every 10 s

    // Nice mock: getUuid() is stubbed, but the metadata manager is not
    // actually exercised by sliceFromTimelineMetric() — presumably kept for
    // parity with the other tests. TODO(review): confirm it can be removed.
    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
    replay(metricMetadataManagerMock);

    // Align the [start, end) window on an aggregation boundary and cut it
    // into 30 s slices.
    long roundedEndTime = getRoundedAggregateTimeMillis(aggregatorInterval);
    long roundedStartTime = roundedEndTime - aggregatorInterval;
    List<Long[]> timeSlices = getTimeSlices(roundedStartTime,
      roundedEndTime, sliceInterval);

    TreeMap<Long, Double> metricValues = new TreeMap<>();
    long startTime = roundedEndTime - aggregatorInterval;

    // Populate only the 1st and 2nd of every six 10 s intervals (the first
    // 20 s of each minute), leaving gaps so the slicer must interpolate.
    for (int i = 1; startTime < roundedEndTime; i++) {
      startTime += metricInterval;
      if (i % 6 == 1 || i % 6 == 2) {
        metricValues.put(startTime, (double) i);
      }
    }

    // COUNTER semantics: per-slice values reflect accumulated deltas.
    TimelineMetric counterMetric = new TimelineMetric();
    counterMetric.setMetricName("TestMetric");
    counterMetric.setHostName("TestHost");
    counterMetric.setAppId("TestAppId");
    counterMetric.setMetricValues(metricValues);
    counterMetric.setType("COUNTER");

    Map<TimelineClusterMetric, Double> timelineClusterMetricMap = sliceFromTimelineMetric(counterMetric, timeSlices, true);

    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(counterMetric.getMetricName(), counterMetric.getAppId(),
      counterMetric.getInstanceId(), 0l);

    // 2nd slice boundary: counter delta expected to be 6.0.
    timelineClusterMetric.setTimestamp(roundedStartTime + 2 * sliceInterval);
    Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 6.0);

    // 4th slice boundary: counter delta expected to be 12.0.
    timelineClusterMetric.setTimestamp(roundedStartTime + 4 * sliceInterval);
    Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 12.0);

    // GAUGE semantics (no type set): per-slice values are averages of the
    // raw points attributed to the slice.
    TimelineMetric metric = new TimelineMetric();
    metric.setMetricName("TestMetric");
    metric.setHostName("TestHost");
    metric.setAppId("TestAppId");
    metric.setMetricValues(metricValues);

    timelineClusterMetricMap = sliceFromTimelineMetric(metric, timeSlices, true);

    timelineClusterMetric = new TimelineClusterMetric(metric.getMetricName(), metric.getAppId(),
      metric.getInstanceId(), 0l);

    // 2nd slice: average of the contributing points is 4.5.
    timelineClusterMetric.setTimestamp(roundedStartTime + 2 * sliceInterval);
    Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 4.5);

    // 4th slice: average of the contributing points is 7.5.
    timelineClusterMetric.setTimestamp(roundedStartTime + 4 * sliceInterval);
    Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 7.5);
  }

  /**
   * Exercises processAggregateClusterMetrics() over a 2-minute window of
   * 30 s slices against five data layouts (all points present; a middle,
   * end, or start point missing; all in-window points missing), checking
   * both GAUGE and COUNTER interpolation behavior plus the live-host count
   * return value. Each case clears and repopulates the shared maps, so the
   * statement order within this method is significant.
   */
  @Test
  public void testShouldAggregateProperly() {

    long aggregatorInterval = 120000l;
    long sliceInterval = 30000l;
    Configuration configuration = new Configuration();

    // No cached metadata: getMetadataCacheValue() always returns null so the
    // aggregator falls back to the type carried on the TimelineMetric itself.
    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject()))
      .andReturn(null).anyTimes();
    replay(metricMetadataManagerMock);

    TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
      aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval, null
    );

    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), aggregatorInterval);
    List<Long[]> timeslices = getTimeSlices(startTime, startTime + aggregatorInterval, sliceInterval);

    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
    long seconds = 1000;

    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setMetricName("m1");
    timelineMetric.setHostName("h1");
    timelineMetric.setAppId("a1");
    timelineMetric.setType("GUAGE"); // sic — anything but "COUNTER" is a gauge
    timelineMetric.setStartTime(startTime);

    /*
    0      +30s    +60s    +90s    +120s   +150s   +180s
    |       |       |       |       |        |       |
      (1)     (2)     (3)     (4)     (5)      (6)
    */
    // Case 1 : Points present in all the required timeslices.
    // Life is good! Ignore (5) and (6) — they fall outside the window.
    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
    metricValues.put(startTime + 15 * seconds, 1.0);
    metricValues.put(startTime + 45 * seconds, 2.0);
    metricValues.put(startTime + 75 * seconds, 3.0);
    metricValues.put(startTime + 105 * seconds, 4.0);
    metricValues.put(startTime + 135 * seconds, 5.0);
    metricValues.put(startTime + 165 * seconds, 6.0);

    timelineMetric.setMetricValues(metricValues);

    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    // One aggregate per in-window slice (4 slices of 30 s in 2 minutes).
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.0);

    timelineClusterMetric.setTimestamp(startTime + 4 * 30 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 4.0);

    metricValues.clear();
    aggregateClusterMetrics.clear();

    /*
    0      +30s    +60s    +90s    +120s   +150s   +180s
    |       |       |       |       |        |       |
      (1)             (3)     (4)     (5)      (6)
    */
    // Case 2 : Some "middle" point missing in the required timeslices.
    // Interpolate the middle point. Ignore (5) and (6).
    metricValues.put(startTime + 15 * seconds, 1.0);
    metricValues.put(startTime + 75 * seconds, 3.0);
    metricValues.put(startTime + 105 * seconds, 4.0);
    metricValues.put(startTime + 135 * seconds, 5.0);
    metricValues.put(startTime + 165 * seconds, 6.0);

    timelineMetric.setMetricValues(metricValues);

    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    // Missing slice at +60s linearly interpolated between 1.0 and 3.0.
    timelineClusterMetric.setTimestamp(startTime + 60 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0);

    metricValues.clear();
    aggregateClusterMetrics.clear();

    /*
    0      +30s    +60s    +90s    +120s   +150s   +180s
    |       |       |       |       |        |       |
      (1)     (2)     (3)             (5)      (6)
    */
    // Case 3 : "end" point missing in the required timeslices.
    // Use all points to get missing point if COUNTER. Else use just (3). Ignore (6).
    metricValues.put(startTime + 15 * seconds, 1.0);
    metricValues.put(startTime + 45 * seconds, 2.0);
    metricValues.put(startTime + 75 * seconds, 3.0);
    metricValues.put(startTime + 135 * seconds, 5.0);
    metricValues.put(startTime + 165 * seconds, 6.0);

    timelineMetric.setMetricValues(metricValues);
    timelineMetric.setType("GUAGE");

    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    // Gauge: the missing +120s slice repeats the last known value (3.0).
    timelineClusterMetric.setTimestamp(startTime + 120 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 3.0);

    aggregateClusterMetrics.clear();
    timelineMetric.setType("COUNTER");

    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    // Counter: the missing +120s slice is interpolated from the trend (4.5).
    timelineClusterMetric.setTimestamp(startTime + 120 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 4.5);

    metricValues.clear();
    aggregateClusterMetrics.clear();

    /*
    0      +30s    +60s    +90s    +120s   +150s   +180s
    |       |       |       |       |        |       |
              (2)     (3)     (4)     (5)      (6)
    */
    // Case 4 : "start" point missing in the required timeslices.
    // Interpolate with only (2) to get missing point if GUAGE metric. Else use all points for COUNTER.
    metricValues.put(startTime + 45 * seconds, 2.0);
    metricValues.put(startTime + 75 * seconds, 3.0);
    metricValues.put(startTime + 105 * seconds, 4.0);
    metricValues.put(startTime + 135 * seconds, 5.0);
    metricValues.put(startTime + 165 * seconds, 6.0);

    timelineMetric.setMetricValues(metricValues);
    timelineMetric.setType("GUAGE");

    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    // Gauge: missing first slice backfilled with the next value (2.0).
    timelineClusterMetric.setTimestamp(startTime + 30 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0);

    aggregateClusterMetrics.clear();
    timelineMetric.setType("COUNTER");

    // A host that contributed any data in the window counts as one live host.
    int liveHosts = secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    Assert.assertEquals(liveHosts, 1);
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    // Counter: missing first slice extrapolated from the trend (1.5).
    timelineClusterMetric.setTimestamp(startTime + 30 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.5);

    metricValues.clear();
    aggregateClusterMetrics.clear();

    /*
    0      +30s    +60s    +90s    +120s   +150s   +180s
    |       |       |       |       |        |       |
                                      (5)      (6)
    */
    // Case 5 : Well, we have nothing in the 2 min window.
    // Use (5) to paint the 2 min window as (5).
    metricValues.put(startTime + 135 * seconds, 5.0);
    metricValues.put(startTime + 165 * seconds, 6.0);

    timelineMetric.setMetricValues(metricValues);
    timelineMetric.setType("GUAGE");

    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    // Gauge: every slice painted with the first post-window value (5.0).
    timelineClusterMetric.setTimestamp(startTime + 30 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 5.0);

    aggregateClusterMetrics.clear();
    timelineMetric.setType("COUNTER");

    liveHosts = secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices);
    Assert.assertEquals(liveHosts, 1);
    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
    // Counter: slices interpolated toward 5.0; +60s slice expected 2.5.
    timelineClusterMetric.setTimestamp(startTime + 60 * seconds);
    Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
    Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.5);

    metricValues.clear();
    aggregateClusterMetrics.clear();
  }

  /**
   * Verifies that aggregateMetricsFromResultSet() emits the synthetic
   * "live_hosts" metric with a per-appId distinct-host count. Six mocked
   * result-set rows resolve (in order, via the chained andReturn calls) to
   * metrics on hosts h1-h3 under apps a1/a2; a1 spans 2 hosts and a2 spans 3.
   */
  @Test
  public void testLiveHostCounterMetrics() throws Exception {
    long aggregatorInterval = 120000;
    long sliceInterval = 30000;

    Configuration configuration = new Configuration();
    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();

    /*
    m1-h1-a1
    m2-h1-a1
    m2-h1-a2
    m2-h2-a1
    m2-h2-a2
    m2-h3-a2

    So live_hosts : a1 = 2   (h1, h2)
       live_hosts : a2 = 3   (h1, h2, h3)
    */
    TimelineMetric metric1 = new TimelineMetric("m1", "h1", "a1", null);
    TimelineMetric metric2 = new TimelineMetric("m2", "h1", "a1", null);
    TimelineMetric metric3 = new TimelineMetric("m2", "h1", "a2", null);
    TimelineMetric metric4 = new TimelineMetric("m2", "h2", "a1", null);
    TimelineMetric metric5 = new TimelineMetric("m2", "h2", "a2", null);
    TimelineMetric metric6 = new TimelineMetric("m2", "h3", "a2", null);

    // Order matters: each ResultSet row consumes the next queued return.
    expect(metricMetadataManagerMock.getMetricFromUuid((byte[]) anyObject())).
      andReturn(metric1).andReturn(metric2).andReturn(metric3).
      andReturn(metric4).andReturn(metric5).andReturn(metric6);
    replay(metricMetadataManagerMock);

    TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
      aggregatorInterval, 2, "false", "", "", aggregatorInterval,
      sliceInterval, null);

    long now = System.currentTimeMillis();
    long startTime = now - 120000;
    long seconds = 1000;
    List<Long[]> slices = getTimeSlices(startTime, now, sliceInterval);
    ResultSet rs = createNiceMock(ResultSet.class);

    TreeMap<Long, Double> metricValues = new TreeMap<>();
    metricValues.put(startTime + 15 * seconds, 1.0);
    metricValues.put(startTime + 45 * seconds, 2.0);
    metricValues.put(startTime + 75 * seconds, 3.0);
    metricValues.put(startTime + 105 * seconds, 4.0);

    // Six rows, then end-of-results; every row shares the same timestamps
    // and serialized metric values.
    expect(rs.next()).andReturn(true).times(6);
    expect(rs.next()).andReturn(false);
    expect(rs.getLong("SERVER_TIME")).andReturn(now - 150000).times(6);
    expect(rs.getLong("START_TIME")).andReturn(now - 150000).times(6);

    ObjectMapper mapper = new ObjectMapper();
    expect(rs.getString("METRICS")).andReturn(mapper.writeValueAsString(metricValues)).times(6);

    replay(rs);

    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromResultSet(rs, slices);

    Assert.assertNotNull(aggregates);

    // Pick out the synthetic live_hosts aggregates for each appId.
    MetricClusterAggregate a1 = null, a2 = null;

    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) {
      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) {
        a1 = m.getValue();
      }
      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) {
        a2 = m.getValue();
      }
    }

    Assert.assertNotNull(a1);
    Assert.assertNotNull(a2);

    Assert.assertEquals(2d, a1.getSum()); // distinct hosts for a1: h1, h2
    Assert.assertEquals(3d, a2.getSum()); // distinct hosts for a2: h1, h2, h3
  }
}