blob: 7b29fa49a1c7602762464bf621c72c5f730bc612 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.metricscachemgr.metricscache;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import org.apache.heron.metricscachemgr.metricscache.query.MetricDatum;
import org.apache.heron.metricscachemgr.metricscache.query.MetricRequest;
import org.apache.heron.metricscachemgr.metricscache.query.MetricResponse;
import org.apache.heron.metricscachemgr.metricscache.query.MetricTimeRangeValue;
import org.apache.heron.proto.tmaster.TopologyMaster;
import org.apache.heron.spi.metricsmgr.metrics.MetricsFilter;
import static org.apache.heron.metricscachemgr.metricscache.query.MetricGranularity.RAW;
import static org.junit.Assert.assertEquals;
/**
 * Unit tests for {@link CacheCore}: hash-index lookups (component/instance/metric
 * filtering), tree-index lookups (time-bucket range queries) and bucket purging.
 */
public class CacheCoreTest {
  // test target; re-created by each prepare* helper before every test
  private static CacheCore cacheCore;
  // timestamp captured when the test data is built; used as the time origin for queries.
  // It may differ slightly from the time origin inside CacheCore itself.
  private static long now;
  // maps metric names to their aggregation type, consumed by getMetrics()
  private static MetricsFilter metricsFilter;

  // Orders MetricTimeRangeValue by start time so query results can be compared
  // deterministically. Initialized eagerly (not inside a prepare* helper) so that
  // assertMetricValue never sorts with a null comparator, and implemented with
  // comparingLong to avoid the truncation/overflow of casting a long delta to int.
  private static final Comparator<MetricTimeRangeValue> TIME_RANGE_VALUE_COMPARATOR =
      Comparator.comparingLong(MetricTimeRangeValue::getStartTime);

  // Orders MetricDatum by <component, instance, metric> for deterministic comparison.
  private static final Comparator<MetricDatum> DATUM_COMPARATOR =
      Comparator.comparing(MetricDatum::getComponentName)
          .thenComparing(MetricDatum::getInstanceId)
          .thenComparing(MetricDatum::getMetricName);

  /**
   * Asserts that {@code actualIn} holds exactly the values in {@code expected}.
   * The actual list is copied and sorted by start time first, because the cache
   * does not guarantee the order of the values it returns.
   */
  private static void assertMetricValue(
      List<MetricTimeRangeValue> expected, List<MetricTimeRangeValue> actualIn) {
    List<MetricTimeRangeValue> actual = new ArrayList<>(actualIn);
    actual.sort(TIME_RANGE_VALUE_COMPARATOR);
    int len = expected.size();
    assertEquals(len, actual.size());
    for (int i = 0; i < len; i++) {
      MetricTimeRangeValue expectedVal = expected.get(i);
      MetricTimeRangeValue actualVal = actual.get(i);
      assertEquals(expectedVal.getStartTime(), actualVal.getStartTime());
      assertEquals(expectedVal.getEndTime(), actualVal.getEndTime());
      assertEquals(expectedVal.getValue(), actualVal.getValue());
    }
  }

  /**
   * Asserts that {@code metricListIn} contains exactly the given data, ignoring order.
   * Compares component name, instance id, metric name and the metric values of each datum.
   */
  private static void assertMetricResponse(
      List<MetricDatum> metricListIn, MetricDatum... metricData) {
    List<MetricDatum> metricList = new ArrayList<>(metricListIn);
    assertEquals(metricData.length, metricList.size());
    metricList.sort(DATUM_COMPARATOR);
    for (int i = 0; i < metricData.length; i++) {
      MetricDatum expected = metricData[i];
      MetricDatum actual = metricList.get(i);
      assertEquals(expected.getComponentName(), actual.getComponentName());
      assertEquals(expected.getInstanceId(), actual.getInstanceId());
      assertEquals(expected.getMetricName(), actual.getMetricName());
      assertMetricValue(expected.getMetricValue(), actual.getMetricValue());
    }
  }

  /**
   * Builds a cache populated with one datum per (component, instance, metric)
   * combination of {c1,c2} x {i1,i2} x {m1,m2}, all timestamped in bucket 1,
   * so tests can exercise the hash (name-based) index.
   */
  private void prepareDataForHashIndex() {
    // create cache with time window 100 seconds, bucket size 30 seconds and no exception store.
    // the cache should be initialized with 4 buckets:
    // bucket 1: [now-100 seconds ~ now-70 seconds)
    // bucket 2: [now-70 seconds ~ now-40 seconds)
    // bucket 3: [now-40 seconds ~ now-10 seconds)
    // bucket 4: [now-10 seconds ~ now]
    cacheCore = new CacheCore(Duration.ofSeconds(100), Duration.ofSeconds(30), 0);
    // current timestamp used as time origin
    // although it may be slightly different from the time origin
    // in the CacheCore initialization.
    now = System.currentTimeMillis();
    TopologyMaster.PublishMetrics.Builder builder = TopologyMaster.PublishMetrics.newBuilder();
    // should be in bucket 1
    long ts = now - 90 * 1000;
    String[] components = new String[]{
        "c1", "c2"
    };
    String[] instances = new String[]{
        "i1", "i2"
    };
    String[] metrics = new String[]{
        "m1", "m2"
    };
    // values assigned in nested-loop order:
    // (c1,i1,m1)=0.1 (c1,i1,m2)=0.2 (c1,i2,m1)=0.3 (c1,i2,m2)=0.4
    // (c2,i1,m1)=0.5 (c2,i1,m2)=0.6 (c2,i2,m1)=0.7 (c2,i2,m2)=0.8
    String[] vals = new String[]{
        "0.1", "0.2", "0.3", "0.4", "0.5", "0.6", "0.7", "0.8"
    };
    int valIdx = 0;
    for (String component : components) {
      for (String instance : instances) {
        for (String metric : metrics) {
          builder.addMetrics(TopologyMaster.MetricDatum.newBuilder()
              .setTimestamp(ts)
              .setComponentName(component).setInstanceId(instance)
              .setName(metric)
              .setValue(vals[valIdx++]));
        }
      }
    }
    // inserts the published metrics into the cache
    cacheCore.addMetricException(builder.build());

    metricsFilter = new MetricsFilter();
    metricsFilter.setMetricToType("m1", MetricsFilter.MetricAggregationType.SUM);
    metricsFilter.setMetricToType("m2", MetricsFilter.MetricAggregationType.SUM);
  }

  /*
   * query 1 metric
   */
  @Test
  public void test1() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    componentNameInstanceId.get("c1").add("i1");
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there is only one <component, instance, metric> tuple
    assertMetricResponse(response.getMetricList(),
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        ))
    );
  }

  /*
   * query instances: null (means "all instances of the component")
   */
  @Test
  public void testInstanceNull() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", null);
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 2 <component, instance, metric> tuples
    MetricDatum[] expected = new MetricDatum[]{
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        )),
        new MetricDatum("c1", "i2", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.3")
        ))
    };
    assertMetricResponse(response.getMetricList(), expected);
  }

  /*
   * query instances: i1, i2
   */
  @Test
  public void testInstances() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    componentNameInstanceId.get("c1").add("i1");
    componentNameInstanceId.get("c1").add("i2");
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 2 <component, instance, metric> tuples
    MetricDatum[] expected = new MetricDatum[]{
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        )),
        new MetricDatum("c1", "i2", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.3")
        ))
    };
    assertMetricResponse(response.getMetricList(), expected);
  }

  /*
   * query instances: empty set (matches nothing, unlike null)
   */
  @Test
  public void testInstanceEmpty() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 0 <component, instance, metric> tuples
    assertEquals(0, response.getMetricList().size());
  }

  /*
   * query components: null (means "all components")
   */
  @Test
  public void testComponentNull() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(null, metricNames, startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 4 <component, instance, metric> tuples
    MetricDatum[] expected = new MetricDatum[]{
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        )),
        new MetricDatum("c1", "i2", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.3")
        )),
        new MetricDatum("c2", "i1", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.5")
        )),
        new MetricDatum("c2", "i2", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.7")
        ))
    };
    assertMetricResponse(response.getMetricList(), expected);
  }

  /*
   * query components: c1, c2 (each with a null instance set = all instances)
   */
  @Test
  public void testComponent() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", null);
    componentNameInstanceId.put("c2", null);
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 4 <component, instance, metric> tuples
    MetricDatum[] expected = new MetricDatum[]{
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        )),
        new MetricDatum("c1", "i2", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.3")
        )),
        new MetricDatum("c2", "i1", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.5")
        )),
        new MetricDatum("c2", "i2", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.7")
        ))
    };
    assertMetricResponse(response.getMetricList(), expected);
  }

  /*
   * query components: empty map (matches nothing, unlike null)
   */
  @Test
  public void testComponents() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 0 <component, instance, metric> tuples
    assertEquals(0, response.getMetricList().size());
  }

  /*
   * query metrics: m1, m2 for the same component/instance
   */
  @Test
  public void testMetricsSameComponentInstance() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    componentNameInstanceId.get("c1").add("i1");
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    metricNames.add("m2");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 2 <component, instance, metric> tuples
    MetricDatum[] expected = new MetricDatum[]{
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        )),
        new MetricDatum("c1", "i1", "m2", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.2")
        ))
    };
    assertMetricResponse(response.getMetricList(), expected);
  }

  /*
   * query metrics: c1, c2, i1, m1, m2
   */
  @Test
  public void testMetrics() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    componentNameInstanceId.get("c1").add("i1");
    componentNameInstanceId.put("c2", new HashSet<String>());
    componentNameInstanceId.get("c2").add("i1");
    Set<String> metricNames = new HashSet<String>();
    metricNames.add("m1");
    metricNames.add("m2");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 4 <component, instance, metric> tuples
    MetricDatum[] expected = new MetricDatum[]{
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        )),
        new MetricDatum("c1", "i1", "m2", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.2")
        )),
        new MetricDatum("c2", "i1", "m1", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.5")
        )),
        new MetricDatum("c2", "i1", "m2", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.6")
        ))
    };
    assertMetricResponse(response.getMetricList(), expected);
  }

  /*
   * query metrics: null (means "all metrics")
   */
  @Test
  public void testMetricsNull() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    componentNameInstanceId.get("c1").add("i1");
    MetricRequest request =
        new MetricRequest(componentNameInstanceId, null, startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 2 <component, instance, metric> tuples
    MetricDatum[] expected = new MetricDatum[]{
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1")
        )),
        new MetricDatum("c1", "i1", "m2", Arrays.asList(
            new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.2")
        ))
    };
    assertMetricResponse(response.getMetricList(), expected);
  }

  /*
   * query metrics: empty set (matches nothing, unlike null)
   */
  @Test
  public void testMetricsEmpty() {
    prepareDataForHashIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 85 * 1000;
    Map<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    componentNameInstanceId.get("c1").add("i1");
    Set<String> metricNames = new HashSet<String>();
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there are 0 <component, instance, metric> tuples
    assertEquals(0, response.getMetricList().size());
  }

  /**
   * Builds a cache with one (c1, i1, m1) datum per time bucket — plus one datum
   * older than the cache window that should be dropped — so tests can exercise
   * the tree (time-based) index.
   */
  private void prepareDataForTreeIndex() {
    // create cache with time window 100 seconds, bucket size 30 seconds and no exception store.
    // the cache should be initialized with 4 buckets:
    // bucket 1: [now-100 seconds ~ now-70 seconds)
    // bucket 2: [now-70 seconds ~ now-40 seconds)
    // bucket 3: [now-40 seconds ~ now-10 seconds)
    // bucket 4: [now-10 seconds ~ now]
    cacheCore = new CacheCore(Duration.ofSeconds(100), Duration.ofSeconds(30), 0);
    // current timestamp used as time origin
    // although it may be slightly different from the time origin
    // in the CacheCore initialization.
    now = System.currentTimeMillis();
    TopologyMaster.PublishMetrics.Builder builder = TopologyMaster.PublishMetrics.newBuilder();
    long[] ts = new long[]{
        // the timestamp falls outside cache time window. too old to be in the cache
        now - 120 * 1000,
        // should be in bucket 1
        now - 90 * 1000,
        // should be in bucket 1
        now - 80 * 1000,
        // should be in bucket 2
        now - 60 * 1000,
        // should be in bucket 2
        now - 50 * 1000,
        // should be in bucket 3
        now - 30 * 1000,
        // should be in bucket 3
        now - 20 * 1000,
        // should be in bucket 4
        now
    };
    String[] vals = new String[]{
        "0.0", "0.1", "0.2", "0.3", "0.4", "0.5", "0.6", "0.7"
    };
    for (int i = 0; i < ts.length; i++) {
      builder.addMetrics(TopologyMaster.MetricDatum.newBuilder()
          .setTimestamp(ts[i])
          .setComponentName("c1").setInstanceId("i1")
          .setName("m1")
          .setValue(vals[i]));
    }
    // inserts the published metrics into the cache
    cacheCore.addMetricException(builder.build());

    // initialization
    metricsFilter = new MetricsFilter();
    metricsFilter.setMetricToType("m1", MetricsFilter.MetricAggregationType.SUM);
  }

  /*
   * query 1 bucket
   */
  @Test
  public void testTreeIndex1() {
    prepareDataForTreeIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 75 * 1000;
    MetricRequest request = new MetricRequest(null, null, startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there is only one <component, instance, metric> tuple
    List<MetricDatum> metricList = response.getMetricList();
    assertEquals(1, metricList.size());
    // there should be 2 metrics
    List<MetricTimeRangeValue> expected = Arrays.asList(
        new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1"),
        new MetricTimeRangeValue(now - 80 * 1000, now - 80 * 1000, "0.2")
    );
    assertMetricValue(expected, metricList.get(0).getMetricValue());
  }

  /*
   * query 2 buckets
   */
  @Test
  public void testTreeIndex2() {
    prepareDataForTreeIndex();

    long startTime = now - 95 * 1000;
    long endTime = now - 45 * 1000;
    MetricRequest request = new MetricRequest(null, null, startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there is only one <component, instance, metric> tuple
    List<MetricDatum> metricList = response.getMetricList();
    assertEquals(1, metricList.size());
    // there should be 4 metrics
    List<MetricTimeRangeValue> expected = Arrays.asList(
        new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1"),
        new MetricTimeRangeValue(now - 80 * 1000, now - 80 * 1000, "0.2"),
        new MetricTimeRangeValue(now - 60 * 1000, now - 60 * 1000, "0.3"),
        new MetricTimeRangeValue(now - 50 * 1000, now - 50 * 1000, "0.4")
    );
    assertMetricValue(expected, metricList.get(0).getMetricValue());
  }

  /*
   * query all buckets; the datum older than the cache window must not appear
   */
  @Test
  public void testTreeIndexAll() {
    prepareDataForTreeIndex();

    long startTime = now - 200 * 1000;
    long endTime = now;
    MetricRequest request = new MetricRequest(null, null, startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there is only one <component, instance, metric> tuple
    List<MetricDatum> metricList = response.getMetricList();
    assertEquals(1, metricList.size());
    // there should be 7 metrics ("0.0" is too old to be cached)
    List<MetricTimeRangeValue> expected = Arrays.asList(
        new MetricTimeRangeValue(now - 90 * 1000, now - 90 * 1000, "0.1"),
        new MetricTimeRangeValue(now - 80 * 1000, now - 80 * 1000, "0.2"),
        new MetricTimeRangeValue(now - 60 * 1000, now - 60 * 1000, "0.3"),
        new MetricTimeRangeValue(now - 50 * 1000, now - 50 * 1000, "0.4"),
        new MetricTimeRangeValue(now - 30 * 1000, now - 30 * 1000, "0.5"),
        new MetricTimeRangeValue(now - 20 * 1000, now - 20 * 1000, "0.6"),
        new MetricTimeRangeValue(now, now, "0.7")
    );
    assertMetricValue(expected, metricList.get(0).getMetricValue());
  }

  /*
   * query the last bucket
   */
  @Test
  public void testTreeIndexLast() {
    prepareDataForTreeIndex();

    long startTime = now - 5 * 1000;
    long endTime = now;
    MetricRequest request = new MetricRequest(null, null, startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there is only one <component, instance, metric> tuple
    List<MetricDatum> metricList = response.getMetricList();
    assertEquals(1, metricList.size());
    // there should be 1 metric
    List<MetricTimeRangeValue> list = new ArrayList<>(metricList.get(0).getMetricValue());
    assertEquals(1, list.size());
    // check value
    assertEquals("0.7", list.get(0).getValue());
  }

  /**
   * Verifies that purge() evicts the oldest bucket(s), that the metric's metadata
   * survives with an empty value list, and that inserts still work after a purge.
   * Uses a FakeTicker so time can be advanced deterministically.
   */
  @Test
  public void testPurge() throws InterruptedException {
    // create cache with time window 10 seconds, bucket size 3 seconds and no exception store.
    // the cache should be initialized with 4 buckets:
    // bucket 1: [now-10 seconds ~ now-7 seconds)
    // bucket 2: [now-7 seconds ~ now-4 seconds)
    // bucket 3: [now-4 seconds ~ now-1 seconds)
    // bucket 4: [now-1 seconds ~ now]
    FakeTicker ticker = new FakeTicker();
    cacheCore = new CacheCore(Duration.ofSeconds(10), Duration.ofSeconds(3), 0, ticker);
    // current timestamp used as time origin
    // although it may be slightly different from the time origin
    // in the CacheCore initialization.
    now = ticker.read();
    TopologyMaster.PublishMetrics.Builder builder = TopologyMaster.PublishMetrics.newBuilder();
    // should be in bucket 1
    long ts = now - 9 * 1000;
    // c1-i1, m1: 0.1
    builder.addMetrics(TopologyMaster.MetricDatum.newBuilder()
        .setTimestamp(ts)
        .setComponentName("c1").setInstanceId("i1")
        .setName("m1")
        .setValue("0.1"));
    cacheCore.addMetricException(builder.build());

    metricsFilter = new MetricsFilter();
    metricsFilter.setMetricToType("m1", MetricsFilter.MetricAggregationType.SUM);

    // query before purge
    long startTime = now - 20 * 1000;
    long endTime = now;
    HashMap<String, Set<String>> componentNameInstanceId = new HashMap<>();
    componentNameInstanceId.put("c1", new HashSet<String>());
    componentNameInstanceId.get("c1").add("i1");
    Set<String> metricNames = new HashSet<>();
    metricNames.add("m1");
    MetricRequest request = new MetricRequest(componentNameInstanceId, metricNames,
        startTime, endTime, RAW);
    MetricResponse response = cacheCore.getMetrics(request, metricsFilter);

    // there is only one <component, instance, metric> tuple
    assertMetricResponse(response.getMetricList(),
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 9 * 1000, now - 9 * 1000, "0.1")
        ))
    );

    // purge
    ticker.advance(Duration.ofSeconds(3)); // assure more than 1 bucket is purged
    cacheCore.purge();

    // query after purge
    response = cacheCore.getMetrics(request, metricsFilter);
    // there is 1 <component, instance, metric> tuple: how to trim the gone metric in metadata?
    assertMetricResponse(response.getMetricList(),
        new MetricDatum("c1", "i1", "m1", Arrays.asList(new MetricTimeRangeValue[]{}))
    );

    // insert-select after purge
    TopologyMaster.PublishMetrics.Builder builder2 = TopologyMaster.PublishMetrics.newBuilder();
    // should be in bucket 1
    ts = now - 3 * 1000;
    // c1-i1, m1: 0.2
    builder2.addMetrics(TopologyMaster.MetricDatum.newBuilder()
        .setTimestamp(ts)
        .setComponentName("c1").setInstanceId("i1")
        .setName("m1")
        .setValue("0.2"));
    cacheCore.addMetricException(builder2.build());
    response = cacheCore.getMetrics(request, metricsFilter);

    // there is only one <component, instance, metric> tuple
    assertMetricResponse(response.getMetricList(),
        new MetricDatum("c1", "i1", "m1", Arrays.asList(
            // there should be 1 metric for each instance
            new MetricTimeRangeValue(now - 3 * 1000, now - 3 * 1000, "0.2")
        ))
    );
  }

  /**
   * A manually-advanced clock for CacheCore, letting tests control "now"
   * deterministically instead of sleeping.
   */
  private static final class FakeTicker extends CacheCore.Ticker {
    // current fake time, in epoch milliseconds
    private AtomicLong now = new AtomicLong(System.currentTimeMillis());

    /** Moves the fake clock forward by the given duration. */
    void advance(Duration duration) {
      now.addAndGet(duration.toMillis());
    }

    @Override
    long read() {
      return now.get();
    }
  }
}