blob: 74b9e501ca409ba09e93b784838b2cab59d1c353 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests the FlowRun and FlowActivity Tables.
*/
public class TestHBaseStorageFlowRun {
private static HBaseTestingUtility util;
private static final String METRIC1 = "MAP_SLOT_MILLIS";
private static final String METRIC2 = "HDFS_BYTES_READ";
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setInt("hfile.format.version", 3);
util.startMiniCluster();
createSchema();
}
private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
@Test
public void checkCoProcessorOff() throws IOException, InterruptedException {
Configuration hbaseConf = util.getConfiguration();
TableName table = TableName.valueOf(hbaseConf.get(
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin();
if (admin == null) {
throw new IOException("Can't check tables since admin is null");
}
if (admin.tableExists(table)) {
// check the regions.
// check in flow run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
}
table = TableName.valueOf(hbaseConf.get(
FlowActivityTable.TABLE_NAME_CONF_NAME,
FlowActivityTable.DEFAULT_TABLE_NAME));
if (admin.tableExists(table)) {
// check the regions.
// check in flow activity table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
}
table = TableName.valueOf(hbaseConf.get(
EntityTable.TABLE_NAME_CONF_NAME,
EntityTable.DEFAULT_TABLE_NAME));
if (admin.tableExists(table)) {
// check the regions.
// check in entity run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
}
}
/**
* Writes 4 timeline entities belonging to one flow run through the
* {@link HBaseTimelineWriterImpl}
*
* Checks the flow run table contents
*
* The first entity has a created event, metrics and a finish event.
*
* The second entity has a created event and this is the entity with smallest
* start time. This should be the start time for the flow run.
*
* The third entity has a finish event and this is the entity with the max end
* time. This should be the end time for the flow run.
*
* The fourth entity has a created event which has a start time that is
* greater than min start time.
*
*/
@Test
public void testWriteFlowRunMinMax() throws Exception {
TimelineEntities te = new TimelineEntities();
te.addEntity(TestFlowDataGenerator.getEntity1());
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
String user = "testWriteFlowRunMinMaxToHBase_user1";
String flow = "testing_flowRun_flow_name";
String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L;
String appName = "application_100000000000_1111";
long minStartTs = 1425026900000L;
long greaterStartTs = 30000000000000L;
long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime(minStartTs);
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another entity with the right min start time
te = new TimelineEntities();
te.addEntity(entityMinStartTime);
appName = "application_100000000000_3333";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// writer another entity for max end time
TimelineEntity entityMaxEndTime = TestFlowDataGenerator
.getEntityMaxEndTime(endTs);
te = new TimelineEntities();
te.addEntity(entityMaxEndTime);
appName = "application_100000000000_4444";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// writer another entity with greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
.getEntityGreaterStartTime(greaterStartTs);
te = new TimelineEntities();
te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// flush everything to hbase
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
Table table1 = conn.getTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
Get g = new Get(startRow);
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
Result r1 = table1.get(g);
assertNotNull(r1);
assertTrue(!r1.isEmpty());
Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
.getBytes());
assertEquals(2, r1.size());
long starttime = Bytes.toLong(values.get(
FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
assertEquals(minStartTs, starttime);
assertEquals(endTs, Bytes.toLong(values
.get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
// get the flow run entity
TimelineEntity entity = hbr.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineDataToRetrieve());
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
FlowRunEntity flowRun = (FlowRunEntity)entity;
assertEquals(minStartTs, flowRun.getStartTime());
assertEquals(endTs, flowRun.getMaxEndTime());
} finally {
if (hbr != null) {
hbr.close();
}
}
}
/**
* Writes two application entities of the same flow run. Each application has
* two metrics: slot millis and hdfs bytes read. Each metric has values at two
* timestamps.
*
* Checks the metric values of the flow in the flow run table. Flow metric
* values should be the sum of individual metric values that belong to the
* latest timestamp for that metric
*/
@Test
public void testWriteFlowRunMetricsOneFlow() throws Exception {
String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
String user = "testWriteFlowRunMetricsOneFlow_user1";
String flow = "testing_flowRun_metrics_flow_name";
String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator
.getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);
// check various batch limits in scanning the table for this flow
checkFlowRunTableBatchLimit(cluster, user, flow, runid, c1);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
TimelineEntity entity = hbr.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineDataToRetrieve());
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(2, metrics.size());
for (TimelineMetric metric : metrics) {
String id = metric.getId();
Map<Long, Number> values = metric.getValues();
assertEquals(1, values.size());
Number value = null;
for (Number n : values.values()) {
value = n;
}
switch (id) {
case METRIC1:
assertEquals(141L, value);
break;
case METRIC2:
assertEquals(57L, value);
break;
default:
fail("unrecognized metric: " + id);
}
}
} finally {
if (hbr != null) {
hbr.close();
}
}
}
/*
* checks the batch limits on a scan
*/
void checkFlowRunTableBatchLimit(String cluster, String user,
String flow, long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
byte[] startRow =
new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
s.setStartRow(startRow);
// set a batch limit
int batchLimit = 2;
s.setBatch(batchLimit);
String clusterStop = cluster + "1";
byte[] stopRow =
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn
.getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertTrue(result.rawCells().length <= batchLimit);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
assertTrue(values.size() <= batchLimit);
loopCount++;
}
assertTrue(loopCount > 0);
// test with a diff batch limit
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = 1;
s.setBatch(batchLimit);
s.setMaxResultsPerColumnFamily(2);
s.setStopRow(stopRow);
scanner = table1.getScanner(s);
loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertEquals(batchLimit, result.rawCells().length);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
assertEquals(batchLimit, values.size());
loopCount++;
}
assertTrue(loopCount > 0);
// test with a diff batch limit
// set it high enough
// we expect back 3 since there are
// column = m!HDFS_BYTES_READ value=57
// column = m!MAP_SLOT_MILLIS value=141
// column min_start_time value=1425016501000
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = 100;
s.setBatch(batchLimit);
s.setStopRow(stopRow);
scanner = table1.getScanner(s);
loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertTrue(result.rawCells().length <= batchLimit);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
// assert that with every next invocation
// we get back <= batchLimit values
assertTrue(values.size() <= batchLimit);
assertTrue(values.size() == 3); // see comment above
loopCount++;
}
// should loop through only once
assertTrue(loopCount == 1);
// set it to a negative number
// we expect all 3 back since there are
// column = m!HDFS_BYTES_READ value=57
// column = m!MAP_SLOT_MILLIS value=141
// column min_start_time value=1425016501000
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = -671;
s.setBatch(batchLimit);
s.setStopRow(stopRow);
scanner = table1.getScanner(s);
loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertEquals(3, result.rawCells().length);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
// assert that with every next invocation
// we get back <= batchLimit values
assertEquals(3, values.size());
loopCount++;
}
// should loop through only once
assertEquals(1, loopCount);
// set it to 0
// we expect all 3 back since there are
// column = m!HDFS_BYTES_READ value=57
// column = m!MAP_SLOT_MILLIS value=141
// column min_start_time value=1425016501000
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = 0;
s.setBatch(batchLimit);
s.setStopRow(stopRow);
scanner = table1.getScanner(s);
loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertEquals(3, result.rawCells().length);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
// assert that with every next invocation
// we get back <= batchLimit values
assertEquals(3, values.size());
loopCount++;
}
// should loop through only once
assertEquals(1, loopCount);
}
private void checkFlowRunTable(String cluster, String user, String flow,
long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow =
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
.getBytes());
rowCount++;
// check metric1
byte[] q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1);
assertTrue(values.containsKey(q));
assertEquals(141L, Bytes.toLong(values.get(q)));
// check metric2
assertEquals(3, values.size());
q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2);
assertTrue(values.containsKey(q));
assertEquals(57L, Bytes.toLong(values.get(q)));
}
assertEquals(1, rowCount);
}
@Test
public void testWriteFlowRunMetricsPrefix() throws Exception {
String cluster = "testWriteFlowRunMetricsPrefix_cluster1";
String user = "testWriteFlowRunMetricsPrefix_user1";
String flow = "testWriteFlowRunMetricsPrefix_flow_name";
String flowVersion = "CF7022C10F1354";
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator
.getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
TimelineFilterList metricsToRetrieve = new TimelineFilterList(
Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
METRIC1.substring(0, METRIC1.indexOf("_") + 1)));
TimelineEntity entity = hbr.getEntity(
new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null));
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(1, metrics.size());
for (TimelineMetric metric : metrics) {
String id = metric.getId();
Map<Long, Number> values = metric.getValues();
assertEquals(1, values.size());
Number value = null;
for (Number n : values.values()) {
value = n;
}
switch (id) {
case METRIC1:
assertEquals(40L, value);
break;
default:
fail("unrecognized metric: " + id);
}
}
Set<TimelineEntity> entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null));
assertEquals(2, entities.size());
int metricCnt = 0;
for (TimelineEntity timelineEntity : entities) {
metricCnt += timelineEntity.getMetrics().size();
}
assertEquals(2, metricCnt);
} finally {
if (hbr != null) {
hbr.close();
}
}
}
@Test
public void testWriteFlowRunsMetricFields() throws Exception {
String cluster = "testWriteFlowRunsMetricFields_cluster1";
String user = "testWriteFlowRunsMetricFields_user1";
String flow = "testWriteFlowRunsMetricFields_flow_name";
String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator
.getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
Set<TimelineEntity> entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(),
new TimelineDataToRetrieve());
assertEquals(1, entities.size());
for (TimelineEntity timelineEntity : entities) {
assertEquals(0, timelineEntity.getMetrics().size());
}
entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(), new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null));
assertEquals(1, entities.size());
for (TimelineEntity timelineEntity : entities) {
Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
assertEquals(2, timelineMetrics.size());
for (TimelineMetric metric : timelineMetrics) {
String id = metric.getId();
Map<Long, Number> values = metric.getValues();
assertEquals(1, values.size());
Number value = null;
for (Number n : values.values()) {
value = n;
}
switch (id) {
case METRIC1:
assertEquals(141L, value);
break;
case METRIC2:
assertEquals(57L, value);
break;
default:
fail("unrecognized metric: " + id);
}
}
}
} finally {
if (hbr != null) {
hbr.close();
}
}
}
@Test
public void testWriteFlowRunFlush() throws Exception {
String cluster = "atestFlushFlowRun_cluster1";
String user = "atestFlushFlowRun__user1";
String flow = "atestFlushFlowRun_flow_name";
String flowVersion = "AF1021C19F1351";
long runid = 1449526652000L;
int start = 10;
int count = 20000;
int appIdSuffix = 1;
HBaseTimelineWriterImpl hbi = null;
long insertTs = 1449796654827L - count;
long minTS = insertTs + 1;
long startTs = insertTs;
Configuration c1 = util.getConfiguration();
TimelineEntities te1 = null;
TimelineEntity entityApp1 = null;
TimelineEntity entityApp2 = null;
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
for (int i = start; i < count; i++) {
String appName = "application_1060350000000_" + appIdSuffix;
insertTs++;
te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
te1.addEntity(entityApp1);
entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
te1.addEntity(entityApp2);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
Thread.sleep(1);
appName = "application_1001199480000_7" + appIdSuffix;
insertTs++;
appIdSuffix++;
te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
te1.addEntity(entityApp1);
entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
te1.addEntity(entityApp2);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
if (i % 1000 == 0) {
hbi.flush();
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
runid, false);
}
}
} finally {
if (hbi != null) {
hbi.flush();
hbi.close();
}
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
true);
}
}
private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
int count, String cluster, String user, String flow, long runid,
boolean checkMax) throws IOException {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
Table table1 = conn.getTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
Get g = new Get(startRow);
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
Result r1 = table1.get(g);
assertNotNull(r1);
assertTrue(!r1.isEmpty());
Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
.getBytes());
int start = 10;
assertEquals(2, r1.size());
long starttime = Bytes.toLong(values
.get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
assertEquals(minTS, starttime);
if (checkMax) {
assertEquals(startTs + 2 * (count - start)
+ TestFlowDataGenerator.END_TS_INCR,
Bytes.toLong(values
.get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
}
}
@Test
public void testFilterFlowRunsByCreatedTime() throws Exception {
String cluster = "cluster2";
String user = "user2";
String flow = "flow_name2";
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
System.currentTimeMillis());
entityApp1.setCreatedTime(1425016501000L);
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
"application_11111111111111_1111", te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
System.currentTimeMillis());
entityApp2.setCreatedTime(1425016502000L);
te.addEntity(entityApp2);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
"application_11111111111111_2222", te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
Set<TimelineEntity> entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow,
null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, 1425016501000L, 1425016502001L, null,
null, null, null, null, null), new TimelineDataToRetrieve());
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
if (!entity.getId().equals("user2@flow_name2/1002345678918") &&
!entity.getId().equals("user2@flow_name2/1002345678919")) {
fail("Entities with flow runs 1002345678918 and 1002345678919" +
"should be present.");
}
}
entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, 1425016501050L, null, null, null,
null, null, null, null), new TimelineDataToRetrieve());
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
if (!entity.getId().equals("user2@flow_name2/1002345678918")) {
fail("Entity with flow run 1002345678918 should be present.");
}
}
entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, null, 1425016501050L, null, null,
null, null, null, null), new TimelineDataToRetrieve());
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
if (!entity.getId().equals("user2@flow_name2/1002345678919")) {
fail("Entity with flow run 1002345678919 should be present.");
}
}
} finally {
if (hbr != null) {
hbr.close();
}
}
}
@Test
public void testMetricFilters() throws Exception {
String cluster = "cluster1";
String user = "user1";
String flow = "flow_name1";
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
"application_11111111111111_1111", te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
System.currentTimeMillis());
te.addEntity(entityApp2);
hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
"application_11111111111111_2222", te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
TimelineFilterList list1 = new TimelineFilterList();
list1.addFilter(new TimelineCompareFilter(
TimelineCompareOp.GREATER_OR_EQUAL, METRIC1, 101));
TimelineFilterList list2 = new TimelineFilterList();
list2.addFilter(new TimelineCompareFilter(
TimelineCompareOp.LESS_THAN, METRIC1, 43));
list2.addFilter(new TimelineCompareFilter(
TimelineCompareOp.EQUAL, METRIC2, 57));
TimelineFilterList metricFilterList =
new TimelineFilterList(Operator.OR, list1, list2);
Set<TimelineEntity> entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null,
null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, null, null, null, null, null, null,
metricFilterList, null), new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null));
assertEquals(2, entities.size());
int metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
}
assertEquals(3, metricCnt);
TimelineFilterList metricFilterList1 = new TimelineFilterList(
new TimelineCompareFilter(
TimelineCompareOp.LESS_OR_EQUAL, METRIC1, 127),
new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, METRIC2, 30));
entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, null, null, null, null, null, null,
metricFilterList1, null), new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null));
assertEquals(1, entities.size());
metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
}
assertEquals(2, metricCnt);
TimelineFilterList metricFilterList2 = new TimelineFilterList(
new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, METRIC1, 32),
new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, METRIC2, 57));
entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, null, null, null, null, null, null,
metricFilterList2, null), new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null));
assertEquals(0, entities.size());
TimelineFilterList metricFilterList3 = new TimelineFilterList(
new TimelineCompareFilter(TimelineCompareOp.EQUAL, "s_metric", 32));
entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, null, null, null, null, null, null,
metricFilterList3, null), new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null));
assertEquals(0, entities.size());
TimelineFilterList list3 = new TimelineFilterList();
list3.addFilter(new TimelineCompareFilter(
TimelineCompareOp.GREATER_OR_EQUAL, METRIC1, 101));
TimelineFilterList list4 = new TimelineFilterList();
list4.addFilter(new TimelineCompareFilter(
TimelineCompareOp.LESS_THAN, METRIC1, 43));
list4.addFilter(new TimelineCompareFilter(
TimelineCompareOp.EQUAL, METRIC2, 57));
TimelineFilterList metricFilterList4 =
new TimelineFilterList(Operator.OR, list3, list4);
TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
METRIC2.substring(0, METRIC2.indexOf("_") + 1)));
entities = hbr.getEntities(
new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters(null, null, null, null, null, null, null,
metricFilterList4, null),
new TimelineDataToRetrieve(null, metricsToRetrieve,
EnumSet.of(Field.ALL), null));
assertEquals(2, entities.size());
metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
}
assertEquals(1, metricCnt);
} finally {
if (hbr != null) {
hbr.close();
}
}
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
}