blob: 5b4754565420f0d092692461867feef34319f709 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.core.timeline;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.powermock.api.easymock.PowerMock.mockStatic;
import static org.powermock.api.easymock.PowerMock.replayAll;
@RunWith(PowerMockRunner.class)
@PrepareForTest(TimelineMetricConfiguration.class)
@PowerMockIgnore("javax.management.*")
public class TimelineMetricsIgniteCacheTest {
  // Shared cache under test; built once per class because constructing it
  // spins up an embedded Ignite node.
  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;

  /**
   * Mocks the static {@code TimelineMetricConfiguration.getInstance()} so the
   * cache can be constructed without a deployed metrics-collector config, then
   * creates the cache instance shared by all tests in this class.
   */
  @BeforeClass
  public static void setupConf() throws Exception {
    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
      Configuration(), new Configuration());
    mockStatic(TimelineMetricConfiguration.class);
    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();

    // App id consumed by updateAppAggregatesFromHostMetricTest: host metrics
    // are additionally aggregated against apps listed under this key.
    conf.getMetricsConf().set(CLUSTER_AGGREGATOR_APP_IDS, "appIdForHostsAggr");
    // Pin Ignite discovery to localhost so the test never probes the network.
    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
    replayAll();

    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
  }

  /**
   * Verifies that metric data points put into the cache are sliced into
   * 30-second intervals, averaged within each slice, and merged across hosts
   * on eviction.
   */
  @Test
  public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
    long cacheSliceIntervalMillis = 30000L;
    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
    // NOTE(review): only one getUuid() call is expected; as a nice mock,
    // subsequent calls return the default (null) — presumably the cache
    // tolerates that. Confirm against putMetrics' uuid handling.
    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
    replay(metricMetadataManagerMock);

    // Round the window start down to a slice boundary so each data point
    // lands in a predictable 30-second slice.
    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
    long seconds = 1000;
    TreeMap<Long, Double> metricValues = new TreeMap<>();

    /*
      0        +30s      +60s
      |         |         |
      (1)(2)(3) (4)(5)(6)   h1
    */
    // Case 1 : data points are distributed equally, no values are lost, single host.
    metricValues.put(startTime + 4*seconds, 1.0);
    metricValues.put(startTime + 14*seconds, 2.0);
    metricValues.put(startTime + 24*seconds, 3.0);
    metricValues.put(startTime + 34*seconds, 4.0);
    metricValues.put(startTime + 44*seconds, 5.0);
    metricValues.put(startTime + 54*seconds, 6.0);

    TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
    timelineMetric.setStartTime(metricValues.firstKey());
    timelineMetric.addMetricValues(metricValues);

    Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
    timelineMetrics.add(timelineMetric);
    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);

    // Two slices were populated: [0,30s) and [30s,60s).
    Assert.assertEquals(2, aggregateMap.size());
    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);

    // First slice holds (1,2,3) -> average 2.0.
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(2.0, aggregateMap.get(timelineClusterMetric).getSum());

    // Second slice holds (4,5,6) -> average 5.0.
    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());

    metricValues.clear();
    timelineMetrics.clear();

    /*
      0        +30s      +60s
      |         |         |
      (1)(2)(3) (4)(5)(6)   h1, h2
    */
    // Case 2 : data points are distributed equally, no values are lost, two hosts.
    metricValues.put(startTime + 4*seconds, 1.0);
    metricValues.put(startTime + 14*seconds, 2.0);
    metricValues.put(startTime + 24*seconds, 3.0);
    metricValues.put(startTime + 34*seconds, 4.0);
    metricValues.put(startTime + 44*seconds, 5.0);
    metricValues.put(startTime + 54*seconds, 6.0);
    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
    timelineMetric.setMetricValues(metricValues);

    metricValues = new TreeMap<>();
    metricValues.put(startTime + 5*seconds, 2.0);
    metricValues.put(startTime + 15*seconds, 4.0);
    metricValues.put(startTime + 25*seconds, 6.0);
    metricValues.put(startTime + 35*seconds, 8.0);
    metricValues.put(startTime + 45*seconds, 10.0);
    metricValues.put(startTime + 55*seconds, 12.0);
    TimelineMetric timelineMetric2 = new TimelineMetric("metric1", "host2", "app1", "instance1");
    timelineMetric2.setMetricValues(metricValues);

    timelineMetrics = new ArrayList<>();
    timelineMetrics.add(timelineMetric);
    timelineMetrics.add(timelineMetric2);
    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);

    // Still two slices; per-host averages are summed across hosts.
    Assert.assertEquals(2, aggregateMap.size());
    timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);

    // First slice: h1 avg 2.0 + h2 avg 4.0 = 6.0.
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(6.0, aggregateMap.get(timelineClusterMetric).getSum());

    // Second slice: h1 avg 5.0 + h2 avg 10.0 = 15.0.
    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(15.0, aggregateMap.get(timelineClusterMetric).getSum());

    metricValues.clear();
    timelineMetrics.clear();

    // Eviction must leave the cache empty.
    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
  }

  /**
   * Verifies that host ({@code HOST_APP_ID}) metrics are additionally
   * aggregated under every app id configured via
   * "timeline.metrics.service.cluster.aggregator.appIds" (set to
   * "appIdForHostsAggr" in {@link #setupConf()}).
   */
  @Test
  public void updateAppAggregatesFromHostMetricTest() {
    //make sure hosts metrics are aggregated for appIds from "timeline.metrics.service.cluster.aggregator.appIds"
    long cacheSliceIntervalMillis = 30000L;
    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
    expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
    replay(metricMetadataManagerMock);

    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
    long seconds = 1000;
    List<TimelineMetric> timelineMetrics = new ArrayList<>();
    TimelineMetric timelineMetric;

    // host1 host-level metric: slices at +30s (1.0) and +60s (2.0).
    TreeMap<Long, Double> metricValues = new TreeMap<>();
    metricValues.put(startTime + 15*seconds, 1.0);
    metricValues.put(startTime + 55*seconds, 2.0);
    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    // host1 app-level metric for the configured aggregation app id.
    metricValues = new TreeMap<>();
    metricValues.put(startTime + 45*seconds, 3.0);
    metricValues.put(startTime + 85*seconds, 4.0);
    timelineMetric = new TimelineMetric("app_metric", "host1", "appIdForHostsAggr", "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    // host1 host metric in the +90s slice.
    metricValues = new TreeMap<>();
    metricValues.put(startTime + 85*seconds, 5.0);
    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    // host2 host metric in the same +90s slice.
    metricValues = new TreeMap<>();
    metricValues.put(startTime + 85*seconds, 6.0);
    timelineMetric = new TimelineMetric("host_metric", "host2", HOST_APP_ID, "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);

    Assert.assertEquals(6, aggregateMap.size());

    // Host-level aggregate in the +90s slice: 5.0 (host1) + 6.0 (host2).
    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 90*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(11.0, aggregateMap.get(timelineClusterMetric).getSum());

    // App's own metric is aggregated under its app id as usual.
    timelineClusterMetric = new TimelineClusterMetric("app_metric",
      "appIdForHostsAggr", "instance1", startTime + 90*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(4.0, aggregateMap.get(timelineClusterMetric).getSum());

    // host1's host metric is re-aggregated under the configured app id
    // (host2 is not hosting the app, so only host1's 5.0 contributes).
    timelineClusterMetric = new TimelineClusterMetric("host_metric",
      "appIdForHostsAggr", "instance1", startTime + 90*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());

    // Eviction must leave the cache empty.
    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
  }
}