/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.collector;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Sets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
 * Tests the functionality of the timeline collector.
*/
public class TestTimelineCollector {
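  /**
   * Generates {@code groups * entities} test entities spread across
   * {@code groups} entity types. Each entity carries two SUM metrics
   * (HDFS_BYTES_WRITE and VCORES_USED), one MAX metric (TXN_FINISH_TIME),
   * and one metric with no real-time aggregation op (UNRELATED_VALUES),
   * which should be ignored by the aggregation logic.
   */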
private TimelineEntities generateTestEntities(int groups, int entities) {
TimelineEntities te = new TimelineEntities();
for (int j = 0; j < groups; j++) {
for (int i = 0; i < entities; i++) {
TimelineEntity entity = new TimelineEntity();
String containerId = "container_1000178881110_2002_" + i;
entity.setId(containerId);
String entityType = "TEST_" + j;
entity.setType(entityType);
long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId("HDFS_BYTES_WRITE");
m1.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
long ts = System.currentTimeMillis();
m1.addValue(ts - 20000, 100L);
metrics.add(m1);
TimelineMetric m2 = new TimelineMetric();
m2.setId("VCORES_USED");
m2.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
m2.addValue(ts - 20000, 3L);
metrics.add(m2);
// m3 should not show up in the aggregation
TimelineMetric m3 = new TimelineMetric();
m3.setId("UNRELATED_VALUES");
m3.addValue(ts - 20000, 3L);
metrics.add(m3);
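        // m4 uses MAX, so its aggregated value should be the largest i seen.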
TimelineMetric m4 = new TimelineMetric();
m4.setId("TXN_FINISH_TIME");
m4.setRealtimeAggregationOp(TimelineMetricOperation.MAX);
m4.addValue(ts - 20000, i);
metrics.add(m4);
entity.addMetrics(metrics);
te.addEntity(entity);
}
}
return te;
  }

  @Test
public void testAggregation() throws Exception {
// Test aggregation with multiple groups.
int groups = 3;
int n = 50;
TimelineEntities testEntities = generateTestEntities(groups, n);
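    // The final argument tells aggregateEntities() to keep each entity
    // type's metrics separate, so every aggregatable metric id appears once
    // per group (hence the startsWith() checks below).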
TimelineEntity resultEntity = TimelineCollector.aggregateEntities(
testEntities, "test_result", "TEST_AGGR", true);
    assertEquals(groups * 3, resultEntity.getMetrics().size());
for (int i = 0; i < groups; i++) {
Set<TimelineMetric> metrics = resultEntity.getMetrics();
for (TimelineMetric m : metrics) {
if (m.getId().startsWith("HDFS_BYTES_WRITE")) {
assertEquals(100 * n, m.getSingleDataValue().intValue());
} else if (m.getId().startsWith("VCORES_USED")) {
assertEquals(3 * n, m.getSingleDataValue().intValue());
} else if (m.getId().startsWith("TXN_FINISH_TIME")) {
assertEquals(n - 1, m.getSingleDataValue());
} else {
fail("Unrecognized metric! " + m.getId());
}
}
}
// Test aggregation with a single group.
TimelineEntities testEntities1 = generateTestEntities(1, n);
TimelineEntity resultEntity1 = TimelineCollector.aggregateEntities(
testEntities1, "test_result", "TEST_AGGR", false);
    assertEquals(3, resultEntity1.getMetrics().size());
Set<TimelineMetric> metrics = resultEntity1.getMetrics();
for (TimelineMetric m : metrics) {
if (m.getId().equals("HDFS_BYTES_WRITE")) {
assertEquals(100 * n, m.getSingleDataValue().intValue());
} else if (m.getId().equals("VCORES_USED")) {
assertEquals(3 * n, m.getSingleDataValue().intValue());
} else if (m.getId().equals("TXN_FINISH_TIME")) {
assertEquals(n - 1, m.getSingleDataValue());
} else {
fail("Unrecognized metric! " + m.getId());
}
}
  }

  /**
* Test TimelineCollector's interaction with TimelineWriter upon
* putEntity() calls.
*/
@Test
public void testPutEntity() throws IOException {
TimelineWriter writer = mock(TimelineWriter.class);
TimelineCollector collector = new TimelineCollectorForTest(writer);
TimelineEntities entities = generateTestEntities(1, 1);
collector.putEntities(
entities, UserGroupInformation.createRemoteUser("test-user"));
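    // A synchronous put should write the entities through to storage and
    // flush exactly once.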
verify(writer, times(1)).write(any(TimelineCollectorContext.class),
any(TimelineEntities.class), any(UserGroupInformation.class));
verify(writer, times(1)).flush();
  }

  /**
* Test TimelineCollector's interaction with TimelineWriter upon
* putEntityAsync() calls.
*/
@Test
public void testPutEntityAsync() throws IOException {
TimelineWriter writer = mock(TimelineWriter.class);
TimelineCollector collector = new TimelineCollectorForTest(writer);
TimelineEntities entities = generateTestEntities(1, 1);
collector.putEntitiesAsync(
entities, UserGroupInformation.createRemoteUser("test-user"));
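    // An asynchronous put should write the entities through to storage but
    // never flush.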
verify(writer, times(1)).write(any(TimelineCollectorContext.class),
any(TimelineEntities.class), any(UserGroupInformation.class));
verify(writer, never()).flush();
}
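
  // A minimal collector for testing. Production collectors, such as
  // AppLevelTimelineCollector in this package, derive their context from the
  // running application; a fixed context is enough to exercise the writer.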
private static class TimelineCollectorForTest extends TimelineCollector {
private final TimelineCollectorContext context =
        new TimelineCollectorContext();

    TimelineCollectorForTest(TimelineWriter writer) {
super("TimelineCollectorForTest");
setWriter(writer);
    }

    @Override
public TimelineCollectorContext getTimelineEntityContext() {
return context;
}
  }

  private static TimelineEntity createEntity(String id, String type) {
TimelineEntity entity = new TimelineEntity();
entity.setId(id);
entity.setType(type);
return entity;
}
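
  /**
   * Builds a single-value metric named "dummy_metric" carrying a SUM
   * real-time aggregation op; used to drive the aggregation test below.
   */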
private static TimelineMetric createDummyMetric(long ts, Long value) {
TimelineMetric metric = new TimelineMetric();
metric.setId("dummy_metric");
metric.addValue(ts, value);
metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
return metric;
  }

  @Test
public void testClearPreviousEntitiesOnAggregation() throws Exception {
final long ts = System.currentTimeMillis();
TimelineCollector collector = new TimelineCollector("") {
@Override
public TimelineCollectorContext getTimelineEntityContext() {
return new TimelineCollectorContext("cluster", "user", "flow", "1",
1L, ApplicationId.newInstance(ts, 1).toString());
}
};
collector.init(new Configuration());
collector.setWriter(mock(TimelineWriter.class));
// Put 5 entities with different metric values.
TimelineEntities entities = new TimelineEntities();
    for (int i = 1; i <= 5; i++) {
TimelineEntity entity = createEntity("e" + i, "type");
entity.addMetric(createDummyMetric(ts + i, Long.valueOf(i * 50)));
entities.addEntity(entity);
}
collector.putEntities(entities, UserGroupInformation.getCurrentUser());
TimelineCollectorContext currContext = collector.getTimelineEntityContext();
// Aggregate the entities.
Map<String, AggregationStatusTable> aggregationGroups
= collector.getAggregationGroups();
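    // All five entities share the entity type "type", so exactly one
    // aggregation group is expected.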
assertEquals(Sets.newHashSet("type"), aggregationGroups.keySet());
TimelineEntity aggregatedEntity = TimelineCollector.
aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
TimelineEntityType.YARN_APPLICATION.toString());
TimelineMetric aggregatedMetric =
aggregatedEntity.getMetrics().iterator().next();
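    // SUM across the five entities: 50 + 100 + 150 + 200 + 250 = 750.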
assertEquals(750L, aggregatedMetric.getValues().values().iterator().next());
assertEquals(TimelineMetricOperation.SUM,
aggregatedMetric.getRealtimeAggregationOp());
    // Aggregate again without any new entities having been put in between.
aggregatedEntity = TimelineCollector.
aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
TimelineEntityType.YARN_APPLICATION.toString());
aggregatedMetric = aggregatedEntity.getMetrics().iterator().next();
    // No values are aggregated, because no new metrics were put for any
    // entity between this aggregation and the previous one.
assertTrue(aggregatedMetric.getValues().isEmpty());
assertEquals(TimelineMetricOperation.NOP,
aggregatedMetric.getRealtimeAggregationOp());
// Put 3 entities.
entities = new TimelineEntities();
    for (int i = 1; i <= 3; i++) {
TimelineEntity entity = createEntity("e" + i, "type");
entity.addMetric(createDummyMetric(System.currentTimeMillis() + i, 50L));
entities.addEntity(entity);
}
aggregationGroups = collector.getAggregationGroups();
collector.putEntities(entities, UserGroupInformation.getCurrentUser());
    // Aggregate the newly put entities.
aggregatedEntity = TimelineCollector.
aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
TimelineEntityType.YARN_APPLICATION.toString());
    // Only the 3 newly put entities are picked up for this aggregation.
aggregatedMetric = aggregatedEntity.getMetrics().iterator().next();
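    // SUM across the three new entities: 3 * 50 = 150.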
assertEquals(150L, aggregatedMetric.getValues().values().iterator().next());
assertEquals(TimelineMetricOperation.SUM,
aggregatedMetric.getRealtimeAggregationOp());
collector.close();
}
}