| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.timelineservice.storage.flow; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.Tag; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.regionserver.Region; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
/**
 * Tests writes and scans against the FlowRun table, and the summation that
 * FlowScanner performs during major compaction.
 */
| public class TestHBaseStorageFlowRunCompaction { |
| |
| private static HBaseTestingUtility util; |
| |
| private static final String metric1 = "MAP_SLOT_MILLIS"; |
| private static final String metric2 = "HDFS_BYTES_READ"; |
| |
| private final byte[] aRowKey = Bytes.toBytes("a"); |
| private final byte[] aFamily = Bytes.toBytes("family"); |
| private final byte[] aQualifier = Bytes.toBytes("qualifier"); |
| |
| @BeforeClass |
| public static void setupBeforeClass() throws Exception { |
| util = new HBaseTestingUtility(); |
| Configuration conf = util.getConfiguration(); |
| conf.setInt("hfile.format.version", 3); |
| util.startMiniCluster(); |
| createSchema(); |
| } |
| |
| private static void createSchema() throws IOException { |
| TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); |
| } |
| |
| /** writes non numeric data into flow run table |
| * reads it back |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testWriteNonNumericData() throws Exception { |
| String rowKey = "nonNumericRowKey"; |
| String column = "nonNumericColumnName"; |
| String value = "nonNumericValue"; |
| byte[] rowKeyBytes = Bytes.toBytes(rowKey); |
| byte[] columnNameBytes = Bytes.toBytes(column); |
| byte[] valueBytes = Bytes.toBytes(value); |
| Put p = new Put(rowKeyBytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, |
| valueBytes); |
| Configuration hbaseConf = util.getConfiguration(); |
| TableName table = TableName.valueOf(hbaseConf.get( |
| FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); |
| Connection conn = null; |
| conn = ConnectionFactory.createConnection(hbaseConf); |
| Table flowRunTable = conn.getTable(table); |
| flowRunTable.put(p); |
| |
| Get g = new Get(rowKeyBytes); |
| Result r = flowRunTable.get(g); |
| assertNotNull(r); |
| assertTrue(r.size() >= 1); |
| Cell actualValue = r.getColumnLatestCell( |
| FlowRunColumnFamily.INFO.getBytes(), columnNameBytes); |
| assertNotNull(CellUtil.cloneValue(actualValue)); |
| assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value); |
| } |
| |
| @Test |
| public void testWriteScanBatchLimit() throws Exception { |
| String rowKey = "nonNumericRowKey"; |
| String column = "nonNumericColumnName"; |
| String value = "nonNumericValue"; |
| String column2 = "nonNumericColumnName2"; |
| String value2 = "nonNumericValue2"; |
| String column3 = "nonNumericColumnName3"; |
| String value3 = "nonNumericValue3"; |
| String column4 = "nonNumericColumnName4"; |
| String value4 = "nonNumericValue4"; |
| |
| byte[] rowKeyBytes = Bytes.toBytes(rowKey); |
| byte[] columnNameBytes = Bytes.toBytes(column); |
| byte[] valueBytes = Bytes.toBytes(value); |
| byte[] columnName2Bytes = Bytes.toBytes(column2); |
| byte[] value2Bytes = Bytes.toBytes(value2); |
| byte[] columnName3Bytes = Bytes.toBytes(column3); |
| byte[] value3Bytes = Bytes.toBytes(value3); |
| byte[] columnName4Bytes = Bytes.toBytes(column4); |
| byte[] value4Bytes = Bytes.toBytes(value4); |
| |
| Put p = new Put(rowKeyBytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, |
| valueBytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, |
| value2Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, |
| value3Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, |
| value4Bytes); |
| |
| Configuration hbaseConf = util.getConfiguration(); |
| TableName table = TableName.valueOf(hbaseConf.get( |
| FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); |
| Connection conn = null; |
| conn = ConnectionFactory.createConnection(hbaseConf); |
| Table flowRunTable = conn.getTable(table); |
| flowRunTable.put(p); |
| |
| String rowKey2 = "nonNumericRowKey2"; |
| byte[] rowKey2Bytes = Bytes.toBytes(rowKey2); |
| p = new Put(rowKey2Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, |
| valueBytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, |
| value2Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, |
| value3Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, |
| value4Bytes); |
| flowRunTable.put(p); |
| |
| String rowKey3 = "nonNumericRowKey3"; |
| byte[] rowKey3Bytes = Bytes.toBytes(rowKey3); |
| p = new Put(rowKey3Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, |
| valueBytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, |
| value2Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, |
| value3Bytes); |
| p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, |
| value4Bytes); |
| flowRunTable.put(p); |
| |
| Scan s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(rowKeyBytes); |
| // set number of cells to fetch per scanner next invocation |
| int batchLimit = 2; |
| s.setBatch(batchLimit); |
| ResultScanner scanner = flowRunTable.getScanner(s); |
| for (Result result : scanner) { |
| assertNotNull(result); |
| assertTrue(!result.isEmpty()); |
| assertTrue(result.rawCells().length <= batchLimit); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertTrue(values.size() <= batchLimit); |
| } |
| |
| s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(rowKeyBytes); |
| // set number of cells to fetch per scanner next invocation |
| batchLimit = 3; |
| s.setBatch(batchLimit); |
| scanner = flowRunTable.getScanner(s); |
| for (Result result : scanner) { |
| assertNotNull(result); |
| assertTrue(!result.isEmpty()); |
| assertTrue(result.rawCells().length <= batchLimit); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertTrue(values.size() <= batchLimit); |
| } |
| |
| s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(rowKeyBytes); |
| // set number of cells to fetch per scanner next invocation |
| batchLimit = 1000; |
| s.setBatch(batchLimit); |
| scanner = flowRunTable.getScanner(s); |
| int rowCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
| assertTrue(!result.isEmpty()); |
| assertTrue(result.rawCells().length <= batchLimit); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| assertTrue(values.size() <= batchLimit); |
| // we expect all back in one next call |
| assertEquals(4, values.size()); |
| rowCount++; |
| } |
| // should get back 1 row with each invocation |
| // if scan batch is set sufficiently high |
| assertEquals(3, rowCount); |
| |
| // test with a negative number |
| // should have same effect as setting it to a high number |
| s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| s.setStartRow(rowKeyBytes); |
| // set number of cells to fetch per scanner next invocation |
| batchLimit = -2992; |
| s.setBatch(batchLimit); |
| scanner = flowRunTable.getScanner(s); |
| rowCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
| assertTrue(!result.isEmpty()); |
| assertEquals(4, result.rawCells().length); |
| Map<byte[], byte[]> values = result |
| .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); |
| // we expect all back in one next call |
| assertEquals(4, values.size()); |
| System.out.println(" values size " + values.size() + " " + batchLimit ); |
| rowCount++; |
| } |
| // should get back 1 row with each invocation |
| // if scan batch is set sufficiently high |
| assertEquals(3, rowCount); |
| } |
| |
| @Test |
| public void testWriteFlowRunCompaction() throws Exception { |
| String cluster = "kompaction_cluster1"; |
| String user = "kompaction_FlowRun__user1"; |
| String flow = "kompaction_flowRun_flow_name"; |
| String flowVersion = "AF1021C19F1351"; |
| long runid = 1449526652000L; |
| |
| int start = 10; |
| int count = 2000; |
| int appIdSuffix = 1; |
| HBaseTimelineWriterImpl hbi = null; |
| long insertTs = System.currentTimeMillis() - count; |
| Configuration c1 = util.getConfiguration(); |
| TimelineEntities te1 = null; |
| TimelineEntity entityApp1 = null; |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| // now insert count * ( 100 + 100) metrics |
| // each call to getEntityMetricsApp1 brings back 100 values |
| // of metric1 and 100 of metric2 |
| for (int i = start; i < start + count; i++) { |
| String appName = "application_10240000000000_" + appIdSuffix; |
| insertTs++; |
| te1 = new TimelineEntities(); |
| entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1); |
| te1.addEntity(entityApp1); |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); |
| |
| appName = "application_2048000000000_7" + appIdSuffix; |
| insertTs++; |
| te1 = new TimelineEntities(); |
| entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs); |
| te1.addEntity(entityApp1); |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); |
| } |
| } finally { |
| String appName = "application_10240000000000_" + appIdSuffix; |
| te1 = new TimelineEntities(); |
| entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete( |
| insertTs + 1, c1); |
| te1.addEntity(entityApp1); |
| if (hbi != null) { |
| hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); |
| hbi.flush(); |
| hbi.close(); |
| } |
| } |
| |
| // check in flow run table |
| HRegionServer server = util.getRSForFirstRegionInTable(TableName |
| .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); |
| List<Region> regions = server.getOnlineRegions(TableName |
| .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); |
| assertTrue("Didn't find any regions for primary table!", regions.size() > 0); |
| // flush and compact all the regions of the primary table |
| for (Region region : regions) { |
| region.flush(true); |
| region.compact(true); |
| } |
| |
| // check flow run for one flow many apps |
| checkFlowRunTable(cluster, user, flow, runid, c1, 4); |
| } |
| |
| |
| private void checkFlowRunTable(String cluster, String user, String flow, |
| long runid, Configuration c1, int valueCount) throws IOException { |
| Scan s = new Scan(); |
| s.addFamily(FlowRunColumnFamily.INFO.getBytes()); |
| byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); |
| s.setStartRow(startRow); |
| String clusterStop = cluster + "1"; |
| byte[] stopRow = |
| new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); |
| s.setStopRow(stopRow); |
| Connection conn = ConnectionFactory.createConnection(c1); |
| Table table1 = conn.getTable(TableName |
| .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); |
| ResultScanner scanner = table1.getScanner(s); |
| |
| int rowCount = 0; |
| for (Result result : scanner) { |
| assertNotNull(result); |
| assertTrue(!result.isEmpty()); |
| Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO |
| .getBytes()); |
| assertEquals(valueCount, values.size()); |
| |
| rowCount++; |
| // check metric1 |
| byte[] q = ColumnHelper.getColumnQualifier( |
| FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); |
| assertTrue(values.containsKey(q)); |
| assertEquals(141, Bytes.toLong(values.get(q))); |
| |
| // check metric2 |
| q = ColumnHelper.getColumnQualifier( |
| FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); |
| assertTrue(values.containsKey(q)); |
| assertEquals(57, Bytes.toLong(values.get(q))); |
| } |
| assertEquals(1, rowCount); |
| } |
| |
| |
| private FlowScanner getFlowScannerForTestingCompaction() { |
| // create a FlowScanner object with the sole purpose of invoking a process |
| // summation; |
| CompactionRequest request = new CompactionRequest(); |
| request.setIsMajor(true, true); |
| // okay to pass in nulls for the constructor arguments |
| // because all we want to do is invoke the process summation |
| FlowScanner fs = new FlowScanner(null, null, |
| (request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION |
| : FlowScannerOperation.MINOR_COMPACTION)); |
| assertNotNull(fs); |
| return fs; |
| } |
| |
| @Test |
| public void checkProcessSummationMoreCellsSumFinal2() |
| throws IOException { |
| long cellValue1 = 1236L; |
| long cellValue2 = 28L; |
| long cellValue3 = 1236L; |
| long cellValue4 = 1236L; |
| FlowScanner fs = getFlowScannerForTestingCompaction(); |
| |
| // note down the current timestamp |
| long currentTimestamp = System.currentTimeMillis(); |
| long cell1Ts = 1200120L; |
| long cell2Ts = TimestampGenerator.getSupplementedTimestamp( |
| System.currentTimeMillis(),"application_123746661110_11202"); |
| long cell3Ts = 1277719L; |
| long cell4Ts = currentTimestamp - 10; |
| |
| SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); |
| |
| List<Tag> tags = new ArrayList<>(); |
| Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), |
| "application_1234588888_91188"); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| // create a cell with a VERY old timestamp and attribute SUM_FINAL |
| Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); |
| currentColumnCells.add(c1); |
| |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), |
| "application_12700000001_29102"); |
| tags.add(t); |
| tagByteArray = Tag.fromList(tags); |
| // create a cell with a recent timestamp and attribute SUM_FINAL |
| Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); |
| currentColumnCells.add(c2); |
| |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM.getTagType(), |
| "application_191780000000001_8195"); |
| tags.add(t); |
| tagByteArray = Tag.fromList(tags); |
| // create a cell with a VERY old timestamp but has attribute SUM |
| Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); |
| currentColumnCells.add(c3); |
| |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM.getTagType(), |
| "application_191780000000001_98104"); |
| tags.add(t); |
| tagByteArray = Tag.fromList(tags); |
| // create a cell with a VERY old timestamp but has attribute SUM |
| Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); |
| currentColumnCells.add(c4); |
| |
| List<Cell> cells = |
| fs.processSummationMajorCompaction(currentColumnCells, |
| new LongConverter(), currentTimestamp); |
| assertNotNull(cells); |
| |
| // we should be getting back 4 cells |
| // one is the flow sum cell |
| // two are the cells with SUM attribute |
| // one cell with SUM_FINAL |
| assertEquals(4, cells.size()); |
| |
| for (int i = 0; i < cells.size(); i++) { |
| Cell returnedCell = cells.get(0); |
| assertNotNull(returnedCell); |
| |
| long returnTs = returnedCell.getTimestamp(); |
| long returnValue = Bytes.toLong(CellUtil |
| .cloneValue(returnedCell)); |
| if (returnValue == cellValue2) { |
| assertTrue(returnTs == cell2Ts); |
| } else if (returnValue == cellValue3) { |
| assertTrue(returnTs == cell3Ts); |
| } else if (returnValue == cellValue4) { |
| assertTrue(returnTs == cell4Ts); |
| } else if (returnValue == cellValue1) { |
| assertTrue(returnTs != cell1Ts); |
| assertTrue(returnTs > cell1Ts); |
| assertTrue(returnTs >= currentTimestamp); |
| } else { |
| // raise a failure since we expect only these two values back |
| Assert.fail(); |
| } |
| } |
| } |
| |
| // tests with many cells |
| // of type SUM and SUM_FINAL |
| // all cells of SUM_FINAL will expire |
| @Test |
| public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { |
| FlowScanner fs = getFlowScannerForTestingCompaction(); |
| int count = 200000; |
| |
| long cellValueFinal = 1000L; |
| long cellValueNotFinal = 28L; |
| |
| // note down the current timestamp |
| long currentTimestamp = System.currentTimeMillis(); |
| long cellTsFinalStart = 10001120L; |
| long cellTsFinal = cellTsFinalStart; |
| long cellTsNotFinalStart = currentTimestamp - 5; |
| long cellTsNotFinal = cellTsNotFinalStart; |
| |
| SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); |
| List<Tag> tags = null; |
| Tag t = null; |
| Cell c1 = null; |
| |
| // insert SUM_FINAL cells |
| for (int i = 0; i < count; i++) { |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), |
| "application_123450000" + i + "01_19" + i); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| // create a cell with a VERY old timestamp and attribute SUM_FINAL |
| c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); |
| currentColumnCells.add(c1); |
| cellTsFinal++; |
| } |
| |
| // add SUM cells |
| for (int i = 0; i < count; i++) { |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM.getTagType(), |
| "application_1987650000" + i + "83_911" + i); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| // create a cell with attribute SUM |
| c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); |
| currentColumnCells.add(c1); |
| cellTsNotFinal++; |
| } |
| |
| List<Cell> cells = |
| fs.processSummationMajorCompaction(currentColumnCells, |
| new LongConverter(), currentTimestamp); |
| assertNotNull(cells); |
| |
| // we should be getting back count + 1 cells |
| // one is the flow sum cell |
| // others are the cells with SUM attribute |
| assertEquals(count + 1, cells.size()); |
| |
| for (int i = 0; i < cells.size(); i++) { |
| Cell returnedCell = cells.get(0); |
| assertNotNull(returnedCell); |
| |
| long returnTs = returnedCell.getTimestamp(); |
| long returnValue = Bytes.toLong(CellUtil |
| .cloneValue(returnedCell)); |
| if (returnValue == (count * cellValueFinal)) { |
| assertTrue(returnTs > (cellTsFinalStart + count)); |
| assertTrue(returnTs >= currentTimestamp); |
| } else if ((returnValue >= cellValueNotFinal) |
| && (returnValue <= cellValueNotFinal * count)) { |
| assertTrue(returnTs >= cellTsNotFinalStart); |
| assertTrue(returnTs <= cellTsNotFinalStart * count); |
| } else { |
| // raise a failure since we expect only these values back |
| Assert.fail(); |
| } |
| } |
| } |
| |
| // tests with many cells |
| // of type SUM and SUM_FINAL |
| // NOT cells of SUM_FINAL will expire |
| @Test |
| public void checkProcessSummationMoreCellsSumFinalVariedTags() throws IOException { |
| FlowScanner fs = getFlowScannerForTestingCompaction(); |
| int countFinal = 20100; |
| int countNotFinal = 1000; |
| int countFinalNotExpire = 7009; |
| |
| long cellValueFinal = 1000L; |
| long cellValueNotFinal = 28L; |
| |
| // note down the current timestamp |
| long currentTimestamp = System.currentTimeMillis(); |
| long cellTsFinalStart = 10001120L; |
| long cellTsFinal = cellTsFinalStart; |
| |
| long cellTsFinalStartNotExpire = TimestampGenerator.getSupplementedTimestamp( |
| System.currentTimeMillis(), "application_10266666661166_118821"); |
| long cellTsFinalNotExpire = cellTsFinalStartNotExpire; |
| |
| long cellTsNotFinalStart = currentTimestamp - 5; |
| long cellTsNotFinal = cellTsNotFinalStart; |
| |
| SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); |
| List<Tag> tags = null; |
| Tag t = null; |
| Cell c1 = null; |
| |
| // insert SUM_FINAL cells which will expire |
| for (int i = 0; i < countFinal; i++) { |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), |
| "application_123450000" + i + "01_19" + i); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| // create a cell with a VERY old timestamp and attribute SUM_FINAL |
| c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); |
| currentColumnCells.add(c1); |
| cellTsFinal++; |
| } |
| |
| // insert SUM_FINAL cells which will NOT expire |
| for (int i = 0; i < countFinalNotExpire; i++) { |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), |
| "application_123450000" + i + "01_19" + i); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| // create a cell with a VERY old timestamp and attribute SUM_FINAL |
| c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); |
| currentColumnCells.add(c1); |
| cellTsFinalNotExpire++; |
| } |
| |
| // add SUM cells |
| for (int i = 0; i < countNotFinal; i++) { |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM.getTagType(), |
| "application_1987650000" + i + "83_911" + i); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| // create a cell with attribute SUM |
| c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); |
| currentColumnCells.add(c1); |
| cellTsNotFinal++; |
| } |
| |
| List<Cell> cells = |
| fs.processSummationMajorCompaction(currentColumnCells, |
| new LongConverter(), currentTimestamp); |
| assertNotNull(cells); |
| |
| // we should be getting back |
| // countNotFinal + countFinalNotExpire + 1 cells |
| // one is the flow sum cell |
| // count = the cells with SUM attribute |
| // count = the cells with SUM_FINAL attribute but not expired |
| assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size()); |
| |
| for (int i = 0; i < cells.size(); i++) { |
| Cell returnedCell = cells.get(0); |
| assertNotNull(returnedCell); |
| |
| long returnTs = returnedCell.getTimestamp(); |
| long returnValue = Bytes.toLong(CellUtil |
| .cloneValue(returnedCell)); |
| if (returnValue == (countFinal * cellValueFinal)) { |
| assertTrue(returnTs > (cellTsFinalStart + countFinal)); |
| assertTrue(returnTs >= currentTimestamp); |
| } else if (returnValue == cellValueNotFinal) { |
| assertTrue(returnTs >= cellTsNotFinalStart); |
| assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal); |
| } else if (returnValue == cellValueFinal){ |
| assertTrue(returnTs >= cellTsFinalStartNotExpire); |
| assertTrue(returnTs <= cellTsFinalStartNotExpire + countFinalNotExpire); |
| } else { |
| // raise a failure since we expect only these values back |
| Assert.fail(); |
| } |
| } |
| } |
| |
| @Test |
| public void testProcessSummationMoreCellsSumFinal() throws IOException { |
| FlowScanner fs = getFlowScannerForTestingCompaction(); |
| // note down the current timestamp |
| long currentTimestamp = System.currentTimeMillis(); |
| long cellValue1 = 1236L; |
| long cellValue2 = 28L; |
| |
| List<Tag> tags = new ArrayList<>(); |
| Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), |
| "application_1234588888_999888"); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); |
| |
| // create a cell with a VERY old timestamp and attribute SUM_FINAL |
| Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| 120L, Bytes.toBytes(cellValue1), tagByteArray); |
| currentColumnCells.add(c1); |
| |
| tags = new ArrayList<>(); |
| t = new Tag(AggregationOperation.SUM.getTagType(), |
| "application_100000000001_119101"); |
| tags.add(t); |
| tagByteArray = Tag.fromList(tags); |
| |
| // create a cell with a VERY old timestamp but has attribute SUM |
| Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| 130L, Bytes.toBytes(cellValue2), tagByteArray); |
| currentColumnCells.add(c2); |
| List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, |
| new LongConverter(), currentTimestamp); |
| assertNotNull(cells); |
| |
| // we should be getting back two cells |
| // one is the flow sum cell |
| // another is the cell with SUM attribute |
| assertEquals(2, cells.size()); |
| |
| Cell returnedCell = cells.get(0); |
| assertNotNull(returnedCell); |
| long inputTs1 = c1.getTimestamp(); |
| long inputTs2 = c2.getTimestamp(); |
| |
| long returnTs = returnedCell.getTimestamp(); |
| long returnValue = Bytes.toLong(CellUtil |
| .cloneValue(returnedCell)); |
| // the returned Ts will be far greater than input ts as well as the noted |
| // current timestamp |
| if (returnValue == cellValue2) { |
| assertTrue(returnTs == inputTs2); |
| } else if (returnValue == cellValue1) { |
| assertTrue(returnTs >= currentTimestamp); |
| assertTrue(returnTs != inputTs1); |
| } else { |
| // raise a failure since we expect only these two values back |
| Assert.fail(); |
| } |
| } |
| |
| @Test |
| public void testProcessSummationOneCellSumFinal() throws IOException { |
| FlowScanner fs = getFlowScannerForTestingCompaction(); |
| |
| // note down the current timestamp |
| long currentTimestamp = System.currentTimeMillis(); |
| List<Tag> tags = new ArrayList<>(); |
| Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), |
| "application_123458888888_999888"); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); |
| |
| // create a cell with a VERY old timestamp |
| Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| 120L, Bytes.toBytes(1110L), tagByteArray); |
| currentColumnCells.add(c1); |
| |
| List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, |
| new LongConverter(), currentTimestamp); |
| assertNotNull(cells); |
| // we should not get the same cell back |
| // but we get back the flow cell |
| assertEquals(1, cells.size()); |
| |
| Cell returnedCell = cells.get(0); |
| // it's NOT the same cell |
| assertNotEquals(c1, returnedCell); |
| long inputTs = c1.getTimestamp(); |
| long returnTs = returnedCell.getTimestamp(); |
| // the returned Ts will be far greater than input ts as well as the noted |
| // current timestamp |
| assertTrue(returnTs > inputTs); |
| assertTrue(returnTs >= currentTimestamp); |
| } |
| |
| @Test |
| public void testProcessSummationOneCell() throws IOException { |
| FlowScanner fs = getFlowScannerForTestingCompaction(); |
| |
| // note down the current timestamp |
| long currentTimestamp = System.currentTimeMillis(); |
| |
| // try for 1 cell with tag SUM |
| List<Tag> tags = new ArrayList<>(); |
| Tag t = new Tag(AggregationOperation.SUM.getTagType(), |
| "application_123458888888_999888"); |
| tags.add(t); |
| byte[] tagByteArray = Tag.fromList(tags); |
| |
| SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); |
| |
| Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, |
| currentTimestamp, Bytes.toBytes(1110L), tagByteArray); |
| currentColumnCells.add(c1); |
| List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, |
| new LongConverter(), currentTimestamp); |
| assertNotNull(cells); |
| // we expect the same cell back |
| assertEquals(1, cells.size()); |
| Cell c2 = cells.get(0); |
| assertEquals(c1, c2); |
| assertEquals(currentTimestamp, c2.getTimestamp()); |
| } |
| |
| @Test |
| public void testProcessSummationEmpty() throws IOException { |
| FlowScanner fs = getFlowScannerForTestingCompaction(); |
| long currentTimestamp = System.currentTimeMillis(); |
| |
| LongConverter longConverter = new LongConverter(); |
| |
| SortedSet<Cell> currentColumnCells = null; |
| List<Cell> cells = |
| fs.processSummationMajorCompaction(currentColumnCells, longConverter, |
| currentTimestamp); |
| assertNotNull(cells); |
| assertEquals(0, cells.size()); |
| |
| currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); |
| cells = |
| fs.processSummationMajorCompaction(currentColumnCells, longConverter, |
| currentTimestamp); |
| assertNotNull(cells); |
| assertEquals(0, cells.size()); |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| util.shutdownMiniCluster(); |
| } |
| } |