| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.timelineservice.storage.flow; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| /** |
| * Tests the FlowRun and FlowActivity Tables |
| */ |
| public class TestHBaseStorageFlowRun { |
| |
| private static HBaseTestingUtility util; |
| |
| private static final String METRIC1 = "MAP_SLOT_MILLIS"; |
| private static final String METRIC2 = "HDFS_BYTES_READ"; |
| |
| @BeforeClass |
| public static void setupBeforeClass() throws Exception { |
| util = new HBaseTestingUtility(); |
| Configuration conf = util.getConfiguration(); |
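    // HFile format version 3 is needed for cell tags, which the
    // flow run coprocessor relies on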
| conf.setInt("hfile.format.version", 3); |
| util.startMiniCluster(); |
| createSchema(); |
| } |
| |
| private static void createSchema() throws IOException { |
| TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); |
| } |
| |
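  /**
   * Verifies that regions of the flow run table are recognized as flow run
   * regions, while regions of the flow activity and entity tables are not.
   */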
| @Test |
| public void checkCoProcessorOff() throws IOException, InterruptedException { |
| Configuration hbaseConf = util.getConfiguration(); |
| TableName table = TableName.valueOf(hbaseConf.get( |
| FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); |
    Connection conn = ConnectionFactory.createConnection(hbaseConf);
| Admin admin = conn.getAdmin(); |
| if (admin == null) { |
| throw new IOException("Can't check tables since admin is null"); |
| } |
| if (admin.tableExists(table)) { |
      // check the regions in the flow run table
| util.waitUntilAllRegionsAssigned(table); |
| HRegionServer server = util.getRSForFirstRegionInTable(table); |
| List<Region> regions = server.getOnlineRegions(table); |
| for (Region region : regions) { |
| assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), |
| hbaseConf)); |
| } |
| } |
| |
| table = TableName.valueOf(hbaseConf.get( |
| FlowActivityTable.TABLE_NAME_CONF_NAME, |
| FlowActivityTable.DEFAULT_TABLE_NAME)); |
| if (admin.tableExists(table)) { |
      // check the regions in the flow activity table
| util.waitUntilAllRegionsAssigned(table); |
| HRegionServer server = util.getRSForFirstRegionInTable(table); |
| List<Region> regions = server.getOnlineRegions(table); |
| for (Region region : regions) { |
| assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), |
| hbaseConf)); |
| } |
| } |
| |
| table = TableName.valueOf(hbaseConf.get( |
| EntityTable.TABLE_NAME_CONF_NAME, |
| EntityTable.DEFAULT_TABLE_NAME)); |
| if (admin.tableExists(table)) { |
      // check the regions in the entity table
| util.waitUntilAllRegionsAssigned(table); |
| HRegionServer server = util.getRSForFirstRegionInTable(table); |
| List<Region> regions = server.getOnlineRegions(table); |
| for (Region region : regions) { |
| assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), |
| hbaseConf)); |
| } |
| } |
| } |
| |
| /** |
| * Writes 4 timeline entities belonging to one flow run through the |
| * {@link HBaseTimelineWriterImpl} |
| * |
| * Checks the flow run table contents |
| * |
| * The first entity has a created event, metrics and a finish event. |
| * |
| * The second entity has a created event and this is the entity with smallest |
| * start time. This should be the start time for the flow run. |
| * |
| * The third entity has a finish event and this is the entity with the max end |
| * time. This should be the end time for the flow run. |
| * |
| * The fourth entity has a created event which has a start time that is |
| * greater than min start time. |
| * |
| */ |
| @Test |
| public void testWriteFlowRunMinMax() throws Exception { |
| |
| TimelineEntities te = new TimelineEntities(); |
| te.addEntity(TestFlowDataGenerator.getEntity1()); |
| |
| HBaseTimelineWriterImpl hbi = null; |
| Configuration c1 = util.getConfiguration(); |
| String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; |
| String user = "testWriteFlowRunMinMaxToHBase_user1"; |
| String flow = "testing_flowRun_flow_name"; |
| String flowVersion = "CF7022C10F1354"; |
| long runid = 1002345678919L; |
| String appName = "application_100000000000_1111"; |
| long minStartTs = 1425026900000L; |
| long greaterStartTs = 30000000000000L; |
| long endTs = 1439750690000L; |
| TimelineEntity entityMinStartTime = TestFlowDataGenerator |
| .getEntityMinStartTime(minStartTs); |
| |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
| |
| // write another entity with the right min start time |
| te = new TimelineEntities(); |
| te.addEntity(entityMinStartTime); |
| appName = "application_100000000000_3333"; |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
| |
      // write another entity for the max end time
| TimelineEntity entityMaxEndTime = TestFlowDataGenerator |
| .getEntityMaxEndTime(endTs); |
| te = new TimelineEntities(); |
| te.addEntity(entityMaxEndTime); |
| appName = "application_100000000000_4444"; |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
| |
      // write another entity with a greater start time
| TimelineEntity entityGreaterStartTime = TestFlowDataGenerator |
| .getEntityGreaterStartTime(greaterStartTs); |
| te = new TimelineEntities(); |
| te.addEntity(entityGreaterStartTime); |
| appName = "application_1000000000000000_2222"; |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
| |
| // flush everything to hbase |
| hbi.flush(); |
| } finally { |
| if (hbi != null) { |
| hbi.close(); |
| } |
| } |
| |
| Connection conn = ConnectionFactory.createConnection(c1); |
| // check in flow run table |
| Table table1 = conn.getTable(TableName |
| .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); |
| // scan the table and see that we get back the right min and max |
| // timestamps |
| byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); |
| Get g = new Get(startRow); |
| g.addColumn(FlowRunColumnFamily.INFO.getBytes(), |
| FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); |
| g.addColumn(FlowRunColumnFamily.INFO.getBytes(), |
| FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); |
| Result r1 = table1.get(g); |
| assertNotNull(r1); |
    assertFalse(r1.isEmpty());
| Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO |
| .getBytes()); |
| |
| assertEquals(2, r1.size()); |
| long starttime = Bytes.toLong(values.get( |
| FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); |
| assertEquals(minStartTs, starttime); |
    assertEquals(endTs, Bytes.toLong(values
        .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
    table1.close();
    conn.close();
| |
| // use the timeline reader to verify data |
| HBaseTimelineReaderImpl hbr = null; |
| try { |
| hbr = new HBaseTimelineReaderImpl(); |
| hbr.init(c1); |
| hbr.start(); |
| // get the flow run entity |
| TimelineEntity entity = hbr.getEntity( |
| new TimelineReaderContext(cluster, user, flow, runid, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineDataToRetrieve()); |
| assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); |
| FlowRunEntity flowRun = (FlowRunEntity)entity; |
| assertEquals(minStartTs, flowRun.getStartTime()); |
| assertEquals(endTs, flowRun.getMaxEndTime()); |
| } finally { |
| if (hbr != null) { |
| hbr.close(); |
| } |
| } |
| } |
| |
| /** |
| * Writes two application entities of the same flow run. Each application has |
| * two metrics: slot millis and hdfs bytes read. Each metric has values at two |
| * timestamps. |
| * |
| * Checks the metric values of the flow in the flow run table. Flow metric |
| * values should be the sum of individual metric values that belong to the |
| * latest timestamp for that metric |
| */ |
| @Test |
| public void testWriteFlowRunMetricsOneFlow() throws Exception { |
| String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; |
| String user = "testWriteFlowRunMetricsOneFlow_user1"; |
| String flow = "testing_flowRun_metrics_flow_name"; |
| String flowVersion = "CF7022C10F1354"; |
| long runid = 1002345678919L; |
| |
| TimelineEntities te = new TimelineEntities(); |
| TimelineEntity entityApp1 = TestFlowDataGenerator |
| .getEntityMetricsApp1(System.currentTimeMillis()); |
| te.addEntity(entityApp1); |
| |
| HBaseTimelineWriterImpl hbi = null; |
| Configuration c1 = util.getConfiguration(); |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| String appName = "application_11111111111111_1111"; |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
      // write another application with the same metrics to this flow
| te = new TimelineEntities(); |
| TimelineEntity entityApp2 = TestFlowDataGenerator |
| .getEntityMetricsApp2(System.currentTimeMillis()); |
| te.addEntity(entityApp2); |
| appName = "application_11111111111111_2222"; |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
| hbi.flush(); |
| } finally { |
| if (hbi != null) { |
| hbi.close(); |
| } |
| } |
| |
| // check flow run |
| checkFlowRunTable(cluster, user, flow, runid, c1); |
| |
| // check various batch limits in scanning the table for this flow |
| checkFlowRunTableBatchLimit(cluster, user, flow, runid, c1); |
| |
| // use the timeline reader to verify data |
| HBaseTimelineReaderImpl hbr = null; |
| try { |
| hbr = new HBaseTimelineReaderImpl(); |
| hbr.init(c1); |
| hbr.start(); |
| TimelineEntity entity = hbr.getEntity( |
| new TimelineReaderContext(cluster, user, flow, runid, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineDataToRetrieve()); |
| assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); |
| Set<TimelineMetric> metrics = entity.getMetrics(); |
| assertEquals(2, metrics.size()); |
| for (TimelineMetric metric : metrics) { |
| String id = metric.getId(); |
| Map<Long, Number> values = metric.getValues(); |
| assertEquals(1, values.size()); |
| Number value = null; |
| for (Number n : values.values()) { |
| value = n; |
| } |
| switch (id) { |
| case METRIC1: |
| assertEquals(141L, value); |
| break; |
| case METRIC2: |
| assertEquals(57L, value); |
| break; |
| default: |
| fail("unrecognized metric: " + id); |
| } |
| } |
| } finally { |
| if (hbr != null) { |
| hbr.close(); |
| } |
| } |
| } |
| |
| /* |
| * checks the batch limits on a scan |
| */ |
| void checkFlowRunTableBatchLimit(String cluster, String user, |
| String flow, long runid, Configuration c1) throws IOException { |
| |
| Scan s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); |
| s.setStartRow(startRow); |
| // set a batch limit |
| int batchLimit = 2; |
| s.setBatch(batchLimit); |
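    // appending "1" to the cluster name yields a stop row that sorts just
    // past every row belonging to this cluster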
| String clusterStop = cluster + "1"; |
    byte[] stopRow =
        new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
| s.setStopRow(stopRow); |
| Connection conn = ConnectionFactory.createConnection(c1); |
| Table table1 = conn |
| .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); |
| ResultScanner scanner = table1.getScanner(s); |
| |
| int loopCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
      assertFalse(result.isEmpty());
| assertTrue(result.rawCells().length <= batchLimit); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertNotNull(values); |
| assertTrue(values.size() <= batchLimit); |
| loopCount++; |
| } |
| assertTrue(loopCount > 0); |
| |
    // test with a different batch limit
| s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(startRow); |
| // set a batch limit |
| batchLimit = 1; |
| s.setBatch(batchLimit); |
| s.setMaxResultsPerColumnFamily(2); |
| s.setStopRow(stopRow); |
| scanner = table1.getScanner(s); |
| |
| loopCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
      assertFalse(result.isEmpty());
| assertEquals(batchLimit, result.rawCells().length); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertNotNull(values); |
| assertEquals(batchLimit, values.size()); |
| loopCount++; |
| } |
| assertTrue(loopCount > 0); |
| |
    // test with a batch limit set high enough that all cells come back
    // at once; we expect 3 cells since there are
    // column = m!HDFS_BYTES_READ value=57
    // column = m!MAP_SLOT_MILLIS value=141
    // column = min_start_time value=1425016501000
| s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(startRow); |
| // set a batch limit |
| batchLimit = 100; |
| s.setBatch(batchLimit); |
| s.setStopRow(stopRow); |
| scanner = table1.getScanner(s); |
| |
| loopCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
      assertFalse(result.isEmpty());
| assertTrue(result.rawCells().length <= batchLimit); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertNotNull(values); |
      // with a batch limit this large, all cells fit in one batch
      assertTrue(values.size() <= batchLimit);
      assertEquals(3, values.size()); // the 3 columns listed above
| loopCount++; |
| } |
| // should loop through only once |
    assertEquals(1, loopCount);
| |
| // set it to a negative number |
| // we expect all 3 back since there are |
| // column = m!HDFS_BYTES_READ value=57 |
| // column = m!MAP_SLOT_MILLIS value=141 |
    // column = min_start_time value=1425016501000
| s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(startRow); |
| // set a batch limit |
| batchLimit = -671; |
| s.setBatch(batchLimit); |
| s.setStopRow(stopRow); |
| scanner = table1.getScanner(s); |
| |
| loopCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
      assertFalse(result.isEmpty());
| assertEquals(3, result.rawCells().length); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertNotNull(values); |
      // a negative batch limit is ignored, so all 3 values
      // come back in a single Result
      assertEquals(3, values.size());
| loopCount++; |
| } |
| // should loop through only once |
| assertEquals(1, loopCount); |
| |
| // set it to 0 |
| // we expect all 3 back since there are |
| // column = m!HDFS_BYTES_READ value=57 |
| // column = m!MAP_SLOT_MILLIS value=141 |
    // column = min_start_time value=1425016501000
| s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(startRow); |
| // set a batch limit |
| batchLimit = 0; |
| s.setBatch(batchLimit); |
| s.setStopRow(stopRow); |
| scanner = table1.getScanner(s); |
| |
| loopCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
      assertFalse(result.isEmpty());
| assertEquals(3, result.rawCells().length); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertNotNull(values); |
      // a batch limit of 0 means no batching, so all 3 values
      // come back in a single Result
      assertEquals(3, values.size());
| loopCount++; |
| } |
    // should loop through only once
    assertEquals(1, loopCount);
    scanner.close();
    table1.close();
    conn.close();
| } |
| |
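  /**
   * Scans the flow run table for the given flow run and verifies that exactly
   * one row comes back, with the expected aggregated values for
   * MAP_SLOT_MILLIS and HDFS_BYTES_READ.
   */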
| private void checkFlowRunTable(String cluster, String user, String flow, |
| long runid, Configuration c1) throws IOException { |
| Scan s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); |
| s.setStartRow(startRow); |
| String clusterStop = cluster + "1"; |
| byte[] stopRow = |
| new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); |
| s.setStopRow(stopRow); |
| Connection conn = ConnectionFactory.createConnection(c1); |
| Table table1 = conn.getTable(TableName |
| .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); |
| ResultScanner scanner = table1.getScanner(s); |
| |
| int rowCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
      assertFalse(result.isEmpty());
| Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO |
| .getBytes()); |
| rowCount++; |
| // check metric1 |
| byte[] q = ColumnHelper.getColumnQualifier( |
| FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1); |
| assertTrue(values.containsKey(q)); |
| assertEquals(141L, Bytes.toLong(values.get(q))); |
| |
| // check metric2 |
| assertEquals(3, values.size()); |
| q = ColumnHelper.getColumnQualifier( |
| FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2); |
| assertTrue(values.containsKey(q)); |
| assertEquals(57L, Bytes.toLong(values.get(q))); |
| } |
    assertEquals(1, rowCount);
    scanner.close();
    table1.close();
    conn.close();
| } |
| |
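  /**
   * Writes metrics for two flow runs of the same flow, then reads them back
   * with a metric prefix filter so that only the metrics whose ids match the
   * prefix are returned.
   */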
| @Test |
| public void testWriteFlowRunMetricsPrefix() throws Exception { |
| String cluster = "testWriteFlowRunMetricsPrefix_cluster1"; |
| String user = "testWriteFlowRunMetricsPrefix_user1"; |
| String flow = "testWriteFlowRunMetricsPrefix_flow_name"; |
| String flowVersion = "CF7022C10F1354"; |
| |
| TimelineEntities te = new TimelineEntities(); |
| TimelineEntity entityApp1 = TestFlowDataGenerator |
| .getEntityMetricsApp1(System.currentTimeMillis()); |
| te.addEntity(entityApp1); |
| |
| HBaseTimelineWriterImpl hbi = null; |
| Configuration c1 = util.getConfiguration(); |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| String appName = "application_11111111111111_1111"; |
| hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te); |
      // write another application with the same metrics to this flow
| te = new TimelineEntities(); |
| TimelineEntity entityApp2 = TestFlowDataGenerator |
| .getEntityMetricsApp2(System.currentTimeMillis()); |
| te.addEntity(entityApp2); |
| appName = "application_11111111111111_2222"; |
| hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te); |
| hbi.flush(); |
| } finally { |
| if (hbi != null) { |
| hbi.close(); |
| } |
| } |
| |
| // use the timeline reader to verify data |
| HBaseTimelineReaderImpl hbr = null; |
| try { |
| hbr = new HBaseTimelineReaderImpl(); |
| hbr.init(c1); |
| hbr.start(); |
| TimelineFilterList metricsToRetrieve = new TimelineFilterList( |
| Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL, |
| METRIC1.substring(0, METRIC1.indexOf("_") + 1))); |
| TimelineEntity entity = hbr.getEntity( |
| new TimelineReaderContext(cluster, user, flow, 1002345678919L, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); |
| assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); |
| Set<TimelineMetric> metrics = entity.getMetrics(); |
| assertEquals(1, metrics.size()); |
| for (TimelineMetric metric : metrics) { |
| String id = metric.getId(); |
| Map<Long, Number> values = metric.getValues(); |
| assertEquals(1, values.size()); |
| Number value = null; |
| for (Number n : values.values()) { |
| value = n; |
| } |
| switch (id) { |
| case METRIC1: |
| assertEquals(40L, value); |
| break; |
| default: |
| fail("unrecognized metric: " + id); |
| } |
| } |
| |
| Set<TimelineEntity> entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(), |
| new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); |
| assertEquals(2, entities.size()); |
| int metricCnt = 0; |
| for (TimelineEntity timelineEntity : entities) { |
| metricCnt += timelineEntity.getMetrics().size(); |
| } |
| assertEquals(2, metricCnt); |
| } finally { |
| if (hbr != null) { |
| hbr.close(); |
| } |
| } |
| } |
| |
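  /**
   * Verifies that flow run metrics are returned only when the METRICS field
   * is requested: a read without any fields yields no metrics, while a read
   * with {@link Field#METRICS} yields the aggregated metric values.
   */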
| @Test |
| public void testWriteFlowRunsMetricFields() throws Exception { |
| String cluster = "testWriteFlowRunsMetricFields_cluster1"; |
| String user = "testWriteFlowRunsMetricFields_user1"; |
| String flow = "testWriteFlowRunsMetricFields_flow_name"; |
| String flowVersion = "CF7022C10F1354"; |
| long runid = 1002345678919L; |
| |
| TimelineEntities te = new TimelineEntities(); |
| TimelineEntity entityApp1 = TestFlowDataGenerator |
| .getEntityMetricsApp1(System.currentTimeMillis()); |
| te.addEntity(entityApp1); |
| |
| HBaseTimelineWriterImpl hbi = null; |
| Configuration c1 = util.getConfiguration(); |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| String appName = "application_11111111111111_1111"; |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
      // write another application with the same metrics to this flow
| te = new TimelineEntities(); |
| TimelineEntity entityApp2 = TestFlowDataGenerator |
| .getEntityMetricsApp2(System.currentTimeMillis()); |
| te.addEntity(entityApp2); |
| appName = "application_11111111111111_2222"; |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te); |
| hbi.flush(); |
| } finally { |
| if (hbi != null) { |
| hbi.close(); |
| } |
| } |
| |
| // check flow run |
| checkFlowRunTable(cluster, user, flow, runid, c1); |
| |
| // use the timeline reader to verify data |
| HBaseTimelineReaderImpl hbr = null; |
| try { |
| hbr = new HBaseTimelineReaderImpl(); |
| hbr.init(c1); |
| hbr.start(); |
| Set<TimelineEntity> entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, runid, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(), |
| new TimelineDataToRetrieve()); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity timelineEntity : entities) { |
| assertEquals(0, timelineEntity.getMetrics().size()); |
| } |
| |
| entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, runid, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(), new TimelineDataToRetrieve(null, null, |
| EnumSet.of(Field.METRICS), null)); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity timelineEntity : entities) { |
| Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics(); |
| assertEquals(2, timelineMetrics.size()); |
| for (TimelineMetric metric : timelineMetrics) { |
| String id = metric.getId(); |
| Map<Long, Number> values = metric.getValues(); |
| assertEquals(1, values.size()); |
| Number value = null; |
| for (Number n : values.values()) { |
| value = n; |
| } |
| switch (id) { |
| case METRIC1: |
| assertEquals(141L, value); |
| break; |
| case METRIC2: |
| assertEquals(57L, value); |
| break; |
| default: |
| fail("unrecognized metric: " + id); |
| } |
| } |
| } |
| } finally { |
| if (hbr != null) { |
| hbr.close(); |
| } |
| } |
| } |
| |
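  /**
   * Writes a large number of entities, flushing every 1000 iterations, and
   * checks after each flush that the min start time stored in the flow run
   * table is correct; the max end time is checked once everything has been
   * written and the writer is closed.
   */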
| @Test |
| public void testWriteFlowRunFlush() throws Exception { |
| String cluster = "atestFlushFlowRun_cluster1"; |
| String user = "atestFlushFlowRun__user1"; |
| String flow = "atestFlushFlowRun_flow_name"; |
| String flowVersion = "AF1021C19F1351"; |
| long runid = 1449526652000L; |
| |
| int start = 10; |
| int count = 20000; |
| int appIdSuffix = 1; |
| HBaseTimelineWriterImpl hbi = null; |
| long insertTs = 1449796654827L - count; |
| long minTS = insertTs + 1; |
| long startTs = insertTs; |
| Configuration c1 = util.getConfiguration(); |
| TimelineEntities te1 = null; |
| TimelineEntity entityApp1 = null; |
| TimelineEntity entityApp2 = null; |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| |
| for (int i = start; i < count; i++) { |
| String appName = "application_1060350000000_" + appIdSuffix; |
| insertTs++; |
| te1 = new TimelineEntities(); |
| entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); |
| te1.addEntity(entityApp1); |
| entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); |
| te1.addEntity(entityApp2); |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); |
| Thread.sleep(1); |
| |
| appName = "application_1001199480000_7" + appIdSuffix; |
| insertTs++; |
| appIdSuffix++; |
| te1 = new TimelineEntities(); |
| entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); |
| te1.addEntity(entityApp1); |
| entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); |
| te1.addEntity(entityApp2); |
| |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); |
| if (i % 1000 == 0) { |
| hbi.flush(); |
| checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, |
| runid, false); |
| } |
| } |
| } finally { |
| if (hbi != null) { |
| hbi.flush(); |
| hbi.close(); |
| } |
| checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid, |
| true); |
| } |
| } |
| |
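  /**
   * Reads the min start time and max end time columns of the flow run row
   * and checks them against the expected values. The max end time is checked
   * only when {@code checkMax} is true.
   */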
| private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, |
| int count, String cluster, String user, String flow, long runid, |
| boolean checkMax) throws IOException { |
| Connection conn = ConnectionFactory.createConnection(c1); |
| // check in flow run table |
| Table table1 = conn.getTable(TableName |
| .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); |
| // scan the table and see that we get back the right min and max |
| // timestamps |
| byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); |
| Get g = new Get(startRow); |
| g.addColumn(FlowRunColumnFamily.INFO.getBytes(), |
| FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); |
| g.addColumn(FlowRunColumnFamily.INFO.getBytes(), |
| FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); |
| |
| Result r1 = table1.get(g); |
| assertNotNull(r1); |
    assertFalse(r1.isEmpty());
| Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO |
| .getBytes()); |
    int start = 10; // same as the start index in testWriteFlowRunFlush
| assertEquals(2, r1.size()); |
| long starttime = Bytes.toLong(values |
| .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); |
| assertEquals(minTS, starttime); |
| if (checkMax) { |
| assertEquals(startTs + 2 * (count - start) |
| + TestFlowDataGenerator.END_TS_INCR, |
| Bytes.toLong(values |
| .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); |
| } |
| } |
| |
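  /**
   * Writes two flow runs with different created times and verifies that
   * created-time range filters return the expected flow runs: both runs for
   * a range covering both created times, and a single run when the range
   * excludes the other run's created time.
   */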
| @Test |
| public void testFilterFlowRunsByCreatedTime() throws Exception { |
| String cluster = "cluster2"; |
| String user = "user2"; |
| String flow = "flow_name2"; |
| |
| TimelineEntities te = new TimelineEntities(); |
| TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1( |
| System.currentTimeMillis()); |
| entityApp1.setCreatedTime(1425016501000L); |
| te.addEntity(entityApp1); |
| |
| HBaseTimelineWriterImpl hbi = null; |
| Configuration c1 = util.getConfiguration(); |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, |
| "application_11111111111111_1111", te); |
      // write another application with the same metrics to this flow
| te = new TimelineEntities(); |
| TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( |
| System.currentTimeMillis()); |
| entityApp2.setCreatedTime(1425016502000L); |
| te.addEntity(entityApp2); |
| hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, |
| "application_11111111111111_2222", te); |
| hbi.flush(); |
| } finally { |
| if (hbi != null) { |
| hbi.close(); |
| } |
| } |
| |
| // use the timeline reader to verify data |
| HBaseTimelineReaderImpl hbr = null; |
| try { |
| hbr = new HBaseTimelineReaderImpl(); |
| hbr.init(c1); |
| hbr.start(); |
| |
| Set<TimelineEntity> entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, |
| null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, 1425016501000L, 1425016502001L, null, |
| null, null, null, null, null), new TimelineDataToRetrieve()); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| if (!entity.getId().equals("user2@flow_name2/1002345678918") && |
| !entity.getId().equals("user2@flow_name2/1002345678919")) { |
| fail("Entities with flow runs 1002345678918 and 1002345678919" + |
| "should be present."); |
| } |
| } |
| entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, 1425016501050L, null, null, null, |
| null, null, null, null), new TimelineDataToRetrieve()); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| if (!entity.getId().equals("user2@flow_name2/1002345678918")) { |
| fail("Entity with flow run 1002345678918 should be present."); |
| } |
| } |
| entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, null, 1425016501050L, null, null, |
| null, null, null, null), new TimelineDataToRetrieve()); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| if (!entity.getId().equals("user2@flow_name2/1002345678919")) { |
| fail("Entity with flow run 1002345678919 should be present."); |
| } |
| } |
| } finally { |
| if (hbr != null) { |
| hbr.close(); |
| } |
| } |
| } |
| |
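  /**
   * Writes two flow runs and exercises metric filters on reads: compare
   * filters combined with AND and OR, a filter on a nonexistent metric, and
   * a metric filter combined with a metric prefix for data retrieval.
   */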
| @Test |
| public void testMetricFilters() throws Exception { |
| String cluster = "cluster1"; |
| String user = "user1"; |
| String flow = "flow_name1"; |
| |
| TimelineEntities te = new TimelineEntities(); |
| TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1( |
| System.currentTimeMillis()); |
| te.addEntity(entityApp1); |
| |
| HBaseTimelineWriterImpl hbi = null; |
| Configuration c1 = util.getConfiguration(); |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, |
| "application_11111111111111_1111", te); |
      // write another application with the same metrics to this flow
| te = new TimelineEntities(); |
| TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( |
| System.currentTimeMillis()); |
| te.addEntity(entityApp2); |
| hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, |
| "application_11111111111111_2222", te); |
| hbi.flush(); |
| } finally { |
| if (hbi != null) { |
| hbi.close(); |
| } |
| } |
| |
| // use the timeline reader to verify data |
| HBaseTimelineReaderImpl hbr = null; |
| try { |
| hbr = new HBaseTimelineReaderImpl(); |
| hbr.init(c1); |
| hbr.start(); |
| |
| TimelineFilterList list1 = new TimelineFilterList(); |
| list1.addFilter(new TimelineCompareFilter( |
| TimelineCompareOp.GREATER_OR_EQUAL, METRIC1, 101)); |
| TimelineFilterList list2 = new TimelineFilterList(); |
| list2.addFilter(new TimelineCompareFilter( |
| TimelineCompareOp.LESS_THAN, METRIC1, 43)); |
| list2.addFilter(new TimelineCompareFilter( |
| TimelineCompareOp.EQUAL, METRIC2, 57)); |
| TimelineFilterList metricFilterList = |
| new TimelineFilterList(Operator.OR, list1, list2); |
| Set<TimelineEntity> entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, |
| null, TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, null, null, null, null, null, null, |
| metricFilterList, null), new TimelineDataToRetrieve(null, null, |
| EnumSet.of(Field.METRICS), null)); |
| assertEquals(2, entities.size()); |
| int metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| } |
| assertEquals(3, metricCnt); |
| |
| TimelineFilterList metricFilterList1 = new TimelineFilterList( |
| new TimelineCompareFilter( |
| TimelineCompareOp.LESS_OR_EQUAL, METRIC1, 127), |
| new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, METRIC2, 30)); |
| entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, null, null, null, null, null, null, |
| metricFilterList1, null), new TimelineDataToRetrieve(null, null, |
| EnumSet.of(Field.METRICS), null)); |
| assertEquals(1, entities.size()); |
| metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| } |
| assertEquals(2, metricCnt); |
| |
| TimelineFilterList metricFilterList2 = new TimelineFilterList( |
| new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, METRIC1, 32), |
| new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, METRIC2, 57)); |
| entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, null, null, null, null, null, null, |
| metricFilterList2, null), new TimelineDataToRetrieve(null, null, |
| EnumSet.of(Field.METRICS), null)); |
| assertEquals(0, entities.size()); |
| |
| TimelineFilterList metricFilterList3 = new TimelineFilterList( |
| new TimelineCompareFilter(TimelineCompareOp.EQUAL, "s_metric", 32)); |
| entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, null, null, null, null, null, null, |
| metricFilterList3, null), new TimelineDataToRetrieve(null, null, |
| EnumSet.of(Field.METRICS), null)); |
| assertEquals(0, entities.size()); |
| |
| TimelineFilterList list3 = new TimelineFilterList(); |
| list3.addFilter(new TimelineCompareFilter( |
| TimelineCompareOp.GREATER_OR_EQUAL, METRIC1, 101)); |
| TimelineFilterList list4 = new TimelineFilterList(); |
| list4.addFilter(new TimelineCompareFilter( |
| TimelineCompareOp.LESS_THAN, METRIC1, 43)); |
| list4.addFilter(new TimelineCompareFilter( |
| TimelineCompareOp.EQUAL, METRIC2, 57)); |
| TimelineFilterList metricFilterList4 = |
| new TimelineFilterList(Operator.OR, list3, list4); |
| TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR, |
| new TimelinePrefixFilter(TimelineCompareOp.EQUAL, |
| METRIC2.substring(0, METRIC2.indexOf("_") + 1))); |
| entities = hbr.getEntities( |
| new TimelineReaderContext(cluster, user, flow, null, null, |
| TimelineEntityType.YARN_FLOW_RUN.toString(), null), |
| new TimelineEntityFilters(null, null, null, null, null, null, null, |
| metricFilterList4, null), |
| new TimelineDataToRetrieve(null, metricsToRetrieve, |
| EnumSet.of(Field.ALL), null)); |
| assertEquals(2, entities.size()); |
| metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| } |
| assertEquals(1, metricCnt); |
| } finally { |
| if (hbr != null) { |
| hbr.close(); |
| } |
| } |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| util.shutdownMiniCluster(); |
| } |
| } |