YARN-5070. upgrade HBase version for first merge (Vrushali C via sjlee)
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 0a4f058..4d064e8 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -49,8 +49,8 @@
<xerces.jdiff.version>2.11.0</xerces.jdiff.version>
<kafka.version>0.8.2.1</kafka.version>
- <hbase.version>1.0.1</hbase.version>
- <phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
+ <hbase.version>1.1.3</hbase.version>
+ <phoenix.version>4.7.0-HBase-1.1</phoenix.version>
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 328b25a..6c4c810 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -41,7 +41,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@@ -107,8 +107,8 @@
// check in flow run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
+ List<Region> regions = server.getOnlineRegions(table);
+ for (Region region : regions) {
assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
@@ -122,8 +122,8 @@
// check in flow activity table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
+ List<Region> regions = server.getOnlineRegions(table);
+ for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
@@ -137,8 +137,8 @@
// check in entity run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
+ List<Region> regions = server.getOnlineRegions(table);
+ for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
@@ -311,6 +311,9 @@
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);
+ // check various batch limits in scanning the table for this flow
+ checkFlowRunTableBatchLimit(cluster, user, flow, runid, c1);
+
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
@@ -350,6 +353,159 @@
}
}
+  /*
+   * Checks that batch limits are honored when scanning the flow run table.
+   */
+ void checkFlowRunTableBatchLimit(String cluster, String user,
+ String flow, long runid, Configuration c1) throws IOException {
+
+ Scan s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
+ s.setStartRow(startRow);
+ // set a batch limit
+ int batchLimit = 2;
+ s.setBatch(batchLimit);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn
+ .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+
+ int loopCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+      assertFalse(result.isEmpty());
+ assertTrue(result.rawCells().length <= batchLimit);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertNotNull(values);
+ assertTrue(values.size() <= batchLimit);
+ loopCount++;
+ }
+ assertTrue(loopCount > 0);
+
+    // test with a different batch limit
+ s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(startRow);
+ // set a batch limit
+ batchLimit = 1;
+ s.setBatch(batchLimit);
+ s.setMaxResultsPerColumnFamily(2);
+ s.setStopRow(stopRow);
+ scanner = table1.getScanner(s);
+
+ loopCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+      assertFalse(result.isEmpty());
+ assertEquals(batchLimit, result.rawCells().length);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertNotNull(values);
+ assertEquals(batchLimit, values.size());
+ loopCount++;
+ }
+ assertTrue(loopCount > 0);
+
+    // test with a different batch limit,
+    // set high enough that all 3 cells
+    // come back in a single Result:
+    // column = m!HDFS_BYTES_READ value=57
+    // column = m!MAP_SLOT_MILLIS value=141
+    // column min_start_time value=1425016501000
+ s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(startRow);
+ // set a batch limit
+ batchLimit = 100;
+ s.setBatch(batchLimit);
+ s.setStopRow(stopRow);
+ scanner = table1.getScanner(s);
+
+ loopCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+      assertFalse(result.isEmpty());
+ assertTrue(result.rawCells().length <= batchLimit);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertNotNull(values);
+ // assert that with every next invocation
+ // we get back <= batchLimit values
+ assertTrue(values.size() <= batchLimit);
+      assertEquals(3, values.size()); // see comment above
+ loopCount++;
+ }
+ // should loop through only once
+    assertEquals(1, loopCount);
+
+    // set it to a negative number;
+    // a non-positive batch is ignored, so all 3 cells come back:
+ // column = m!HDFS_BYTES_READ value=57
+ // column = m!MAP_SLOT_MILLIS value=141
+ // column min_start_time value=1425016501000
+ s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(startRow);
+ // set a batch limit
+ batchLimit = -671;
+ s.setBatch(batchLimit);
+ s.setStopRow(stopRow);
+ scanner = table1.getScanner(s);
+
+ loopCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+      assertFalse(result.isEmpty());
+ assertEquals(3, result.rawCells().length);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertNotNull(values);
+      // a negative batch is ignored, so each next
+      // invocation returns every value in the row
+ assertEquals(3, values.size());
+ loopCount++;
+ }
+ // should loop through only once
+ assertEquals(1, loopCount);
+
+    // set it to 0;
+    // a batch of 0 is likewise ignored, so all 3 cells come back:
+ // column = m!HDFS_BYTES_READ value=57
+ // column = m!MAP_SLOT_MILLIS value=141
+ // column min_start_time value=1425016501000
+ s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(startRow);
+ // set a batch limit
+ batchLimit = 0;
+ s.setBatch(batchLimit);
+ s.setStopRow(stopRow);
+ scanner = table1.getScanner(s);
+
+ loopCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+      assertFalse(result.isEmpty());
+ assertEquals(3, result.rawCells().length);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertNotNull(values);
+      // a batch of 0 is ignored, so each next
+      // invocation returns every value in the row
+ assertEquals(3, values.size());
+ loopCount++;
+ }
+ // should loop through only once
+    assertEquals(1, loopCount);
+    scanner.close();
+    conn.close();
+  }
+
private void checkFlowRunTable(String cluster, String user, String flow,
long runid, Configuration c1) throws IOException {
Scan s = new Scan();
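
For context on what the new checkFlowRunTableBatchLimit assertions rely on: Scan#setBatch caps how many cells a single Result may carry, so a wide row comes back in chunks across successive Results, while a non-positive batch leaves rows whole. A minimal sketch of that behavior, assuming a Configuration named conf and a hypothetical table "demo" with column family "info":

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));
scan.setBatch(2); // at most 2 cells per Result; a wider row spans several Results
try (Connection conn = ConnectionFactory.createConnection(conf);
     Table table = conn.getTable(TableName.valueOf("demo"));
     ResultScanner scanner = table.getScanner(scan)) {
  for (Result result : scanner) {
    // each chunk of a row respects the batch limit
    assertTrue(result.rawCells().length <= 2);
  }
}
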
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index e1bef53..71523b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -45,7 +45,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
@@ -124,6 +124,153 @@
}
@Test
+ public void testWriteScanBatchLimit() throws Exception {
+ String rowKey = "nonNumericRowKey";
+ String column = "nonNumericColumnName";
+ String value = "nonNumericValue";
+ String column2 = "nonNumericColumnName2";
+ String value2 = "nonNumericValue2";
+ String column3 = "nonNumericColumnName3";
+ String value3 = "nonNumericValue3";
+ String column4 = "nonNumericColumnName4";
+ String value4 = "nonNumericValue4";
+
+ byte[] rowKeyBytes = Bytes.toBytes(rowKey);
+ byte[] columnNameBytes = Bytes.toBytes(column);
+ byte[] valueBytes = Bytes.toBytes(value);
+ byte[] columnName2Bytes = Bytes.toBytes(column2);
+ byte[] value2Bytes = Bytes.toBytes(value2);
+ byte[] columnName3Bytes = Bytes.toBytes(column3);
+ byte[] value3Bytes = Bytes.toBytes(value3);
+ byte[] columnName4Bytes = Bytes.toBytes(column4);
+ byte[] value4Bytes = Bytes.toBytes(value4);
+
+ Put p = new Put(rowKeyBytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+ valueBytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+ value2Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+ value3Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+ value4Bytes);
+
+ Configuration hbaseConf = util.getConfiguration();
+ TableName table = TableName.valueOf(hbaseConf.get(
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = ConnectionFactory.createConnection(hbaseConf);
+ Table flowRunTable = conn.getTable(table);
+ flowRunTable.put(p);
+
+ String rowKey2 = "nonNumericRowKey2";
+ byte[] rowKey2Bytes = Bytes.toBytes(rowKey2);
+ p = new Put(rowKey2Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+ valueBytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+ value2Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+ value3Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+ value4Bytes);
+ flowRunTable.put(p);
+
+ String rowKey3 = "nonNumericRowKey3";
+ byte[] rowKey3Bytes = Bytes.toBytes(rowKey3);
+ p = new Put(rowKey3Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+ valueBytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+ value2Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+ value3Bytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+ value4Bytes);
+ flowRunTable.put(p);
+
+ Scan s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(rowKeyBytes);
+ // set number of cells to fetch per scanner next invocation
+ int batchLimit = 2;
+ s.setBatch(batchLimit);
+ ResultScanner scanner = flowRunTable.getScanner(s);
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ assertTrue(result.rawCells().length <= batchLimit);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertTrue(values.size() <= batchLimit);
+ }
+
+ s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(rowKeyBytes);
+ // set number of cells to fetch per scanner next invocation
+ batchLimit = 3;
+ s.setBatch(batchLimit);
+ scanner = flowRunTable.getScanner(s);
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ assertTrue(result.rawCells().length <= batchLimit);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertTrue(values.size() <= batchLimit);
+ }
+
+ s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(rowKeyBytes);
+ // set number of cells to fetch per scanner next invocation
+ batchLimit = 1000;
+ s.setBatch(batchLimit);
+ scanner = flowRunTable.getScanner(s);
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ assertTrue(result.rawCells().length <= batchLimit);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ assertTrue(values.size() <= batchLimit);
+ // we expect all back in one next call
+ assertEquals(4, values.size());
+ rowCount++;
+ }
+ // should get back 1 row with each invocation
+ // if scan batch is set sufficiently high
+ assertEquals(3, rowCount);
+
+    // test with a negative number;
+    // a non-positive batch behaves like an unlimited one
+ s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ s.setStartRow(rowKeyBytes);
+ // set number of cells to fetch per scanner next invocation
+ batchLimit = -2992;
+ s.setBatch(batchLimit);
+ scanner = flowRunTable.getScanner(s);
+ rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ assertEquals(4, result.rawCells().length);
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+ // we expect all back in one next call
+ assertEquals(4, values.size());
+ rowCount++;
+ }
+    // one Result per row: a non-positive batch
+    // places no cap on cells per Result
+    assertEquals(3, rowCount);
+    scanner.close();
+    conn.close();
+  }
+
+ @Test
public void testWriteFlowRunCompaction() throws Exception {
String cluster = "kompaction_cluster1";
String user = "kompaction_FlowRun__user1";
@@ -176,13 +323,13 @@
// check in flow run table
HRegionServer server = util.getRSForFirstRegionInTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
- List<HRegion> regions = server.getOnlineRegions(TableName
+ List<Region> regions = server.getOnlineRegions(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!", regions.size() > 0);
// flush and compact all the regions of the primary table
- for (HRegion region : regions) {
- region.flushcache();
- region.compactStores(true);
+ for (Region region : regions) {
+ region.flush(true);
+ region.compact(true);
}
// check flow run for one flow many apps
@@ -237,7 +384,7 @@
request.setIsMajor(true, true);
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
- FlowScanner fs = new FlowScanner(null, -1, null,
+ FlowScanner fs = new FlowScanner(null, null,
(request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION));
assertNotNull(fs);
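
The HRegion-to-Region switch above also renames the maintenance calls used when forcing flushes and compactions in tests. A minimal sketch of the HBase 1.1 equivalents, assuming a running HBaseTestingUtility named util:

TableName table = TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME);
HRegionServer server = util.getRSForFirstRegionInTable(table);
for (Region region : server.getOnlineRegions(table)) {
  region.flush(true);   // replaces HRegion#flushcache()
  region.compact(true); // replaces HRegion#compactStores(true)
}
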
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 8ea51a1..a9dcfaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -39,7 +39,7 @@
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -59,7 +59,7 @@
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
private boolean isFlowRunRegion = false;
- private HRegion region;
+ private Region region;
/**
* generate a timestamp that is unique per row in a region this is per region.
*/
@@ -178,7 +178,7 @@
scan.setMaxVersions();
RegionScanner scanner = null;
try {
- scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
+ scanner = new FlowScanner(e.getEnvironment(), scan,
region.getScanner(scan), FlowScannerOperation.READ);
scanner.next(results);
e.bypass();
@@ -233,7 +233,7 @@
if (!isFlowRunRegion) {
return scanner;
}
- return new FlowScanner(e.getEnvironment(), scan.getBatch(),
+ return new FlowScanner(e.getEnvironment(), scan,
scanner, FlowScannerOperation.READ);
}
@@ -257,7 +257,7 @@
+ " storeFilesCount=" + store.getStorefilesCount());
}
}
- return new FlowScanner(c.getEnvironment(), -1, scanner,
+ return new FlowScanner(c.getEnvironment(), scanner,
FlowScannerOperation.FLUSH);
}
@@ -296,10 +296,9 @@
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION);
LOG.info("Compactionrequest= " + request.toString() + " "
- + requestOp.toString() + " RegionName="
- + e.getEnvironment().getRegion().getRegionNameAsString());
+ + requestOp.toString() + " RegionName=" + e.getEnvironment()
+ .getRegion().getRegionInfo().getRegionNameAsString());
}
-
- return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
+ return new FlowScanner(e.getEnvironment(), scanner, requestOp);
}
}
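
The constructor change above means the coprocessor now forwards the client's Scan instead of a pre-extracted batch value, and FlowScanner derives the batch itself. A condensed sketch of the rewritten postScannerOpen hook:

@Override
public RegionScanner postScannerOpen(
    ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
    RegionScanner scanner) throws IOException {
  if (!isFlowRunRegion) {
    return scanner;
  }
  // the Scan travels with the scanner; FlowScanner reads Scan#getBatch()
  return new FlowScanner(e.getEnvironment(), scan, scanner,
      FlowScannerOperation.READ);
}
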
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 648c77b..6e67722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -35,10 +35,12 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,9 +70,9 @@
*/
private static final String FLOW_APP_ID = "application_00000000000_0000";
- private final HRegion region;
+ private final Region region;
private final InternalScanner flowRunScanner;
- private final int limit;
+ private final int batchSize;
private final long appFinalValueRetentionThreshold;
private RegionScanner regionScanner;
private boolean hasMore;
@@ -79,9 +81,15 @@
private int currentIndex;
private FlowScannerOperation action = FlowScannerOperation.READ;
- FlowScanner(RegionCoprocessorEnvironment env, int limit,
+ FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner,
+ FlowScannerOperation action) {
+ this(env, null, internalScanner, action);
+ }
+
+ FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
InternalScanner internalScanner, FlowScannerOperation action) {
- this.limit = limit;
+ this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
+ // TODO initialize other scan attributes like Scan#maxResultSize
this.flowRunScanner = internalScanner;
if (internalScanner instanceof RegionScanner) {
this.regionScanner = (RegionScanner) internalScanner;
@@ -98,8 +106,12 @@
YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" batch size=" + batchSize);
+ }
}
+
/*
* (non-Javadoc)
*
@@ -112,22 +124,24 @@
@Override
public boolean nextRaw(List<Cell> cells) throws IOException {
- return nextRaw(cells, limit);
+ return nextRaw(cells, ScannerContext.newBuilder().build());
}
@Override
- public boolean nextRaw(List<Cell> cells, int cellLimit) throws IOException {
- return nextInternal(cells, cellLimit);
+ public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+ throws IOException {
+ return nextInternal(cells, scannerContext);
}
@Override
public boolean next(List<Cell> cells) throws IOException {
- return next(cells, limit);
+ return next(cells, ScannerContext.newBuilder().build());
}
@Override
- public boolean next(List<Cell> cells, int cellLimit) throws IOException {
- return nextInternal(cells, cellLimit);
+ public boolean next(List<Cell> cells, ScannerContext scannerContext)
+ throws IOException {
+ return nextInternal(cells, scannerContext);
}
/**
@@ -159,29 +173,17 @@
}
/**
- * Checks if the converter is a numeric converter or not. For a converter to
- * be numeric, it must implement {@link NumericValueConverter} interface.
- * @param converter
- * @return true, if converter is of type NumericValueConverter, false
- * otherwise.
- */
- private static boolean isNumericConverter(ValueConverter converter) {
- return (converter instanceof NumericValueConverter);
- }
-
- /**
* This method loops through the cells in a given row of the
* {@link FlowRunTable}. It looks at the tags of each cell to figure out how
* to process the contents. It then calculates the sum or min or max for each
* column or returns the cell as is.
*
* @param cells
- * @param cellLimit
+ * @param scannerContext
* @return true if next row is available for the scanner, false otherwise
* @throws IOException
*/
- @SuppressWarnings("deprecation")
- private boolean nextInternal(List<Cell> cells, int cellLimit)
+ private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
throws IOException {
Cell cell = null;
startNext();
@@ -194,48 +196,47 @@
// So all cells in one qualifier come one after the other before we see the
// next column qualifier
ByteArrayComparator comp = new ByteArrayComparator();
- byte[] currentColumnQualifier = Separator.EMPTY_BYTES;
+ byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
AggregationOperation currentAggOp = null;
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>();
int addedCnt = 0;
long currentTimestamp = System.currentTimeMillis();
ValueConverter converter = null;
+ int limit = batchSize;
- while (cellLimit <= 0 || addedCnt < cellLimit) {
- cell = peekAtNextCell(cellLimit);
+ while (limit <= 0 || addedCnt < limit) {
+ cell = peekAtNextCell(scannerContext);
if (cell == null) {
break;
}
- byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
- if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
- if (converter != null && isNumericConverter(converter)) {
- addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
- converter, currentTimestamp);
- }
- resetState(currentColumnCells, alreadySeenAggDim);
- currentColumnQualifier = newColumnQualifier;
- currentAggOp = getCurrentAggOp(cell);
- converter = getValueConverter(newColumnQualifier);
+ byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
+ if (previousColumnQualifier == null) {
+ // first time in loop
+ previousColumnQualifier = currentColumnQualifier;
}
- // No operation needs to be performed on non numeric converters.
- if (!isNumericConverter(converter)) {
- currentColumnCells.add(cell);
- nextCell(cellLimit);
- continue;
+
+ converter = getValueConverter(currentColumnQualifier);
+ if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+ converter, currentTimestamp);
+ resetState(currentColumnCells, alreadySeenAggDim);
+ previousColumnQualifier = currentColumnQualifier;
+ currentAggOp = getCurrentAggOp(cell);
+ converter = getValueConverter(currentColumnQualifier);
}
collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
- (NumericValueConverter)converter);
- nextCell(cellLimit);
+ converter, scannerContext);
+ nextCell(scannerContext);
}
- if (!currentColumnCells.isEmpty()) {
- addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
- converter, currentTimestamp);
+ if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) {
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
+ currentTimestamp);
if (LOG.isDebugEnabled()) {
if (addedCnt > 0) {
LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+ " rowKey="
- + FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString());
+ + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
} else {
LOG.debug("emitted no cells for " + this.action);
}
@@ -252,7 +253,7 @@
}
/**
- * resets the parameters to an intialized state for next loop iteration.
+   * Resets the parameters to an initialized state for the next loop iteration.
*
* @param cell
* @param currentAggOp
@@ -268,12 +269,12 @@
private void collectCells(SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp, Cell cell,
- Set<String> alreadySeenAggDim, NumericValueConverter converter)
- throws IOException {
+ Set<String> alreadySeenAggDim, ValueConverter converter,
+ ScannerContext scannerContext) throws IOException {
+
if (currentAggOp == null) {
// not a min/max/metric cell, so just return it as is
currentColumnCells.add(cell);
- nextCell(limit);
return;
}
@@ -284,7 +285,7 @@
} else {
Cell currentMinCell = currentColumnCells.first();
Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
- converter);
+ (NumericValueConverter) converter);
if (!currentMinCell.equals(newMinCell)) {
currentColumnCells.remove(currentMinCell);
currentColumnCells.add(newMinCell);
@@ -297,7 +298,7 @@
} else {
Cell currentMaxCell = currentColumnCells.first();
Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
- converter);
+ (NumericValueConverter) converter);
if (!currentMaxCell.equals(newMaxCell)) {
currentColumnCells.remove(currentMaxCell);
currentColumnCells.add(newMaxCell);
@@ -610,15 +611,14 @@
* pointer to the next cell. This method can be called multiple times in a row
* to advance through all the available cells.
*
- * @param cellLimit
- * the limit of number of cells to return if the next batch must be
- * fetched by the wrapped scanner
+ * @param scannerContext
+ * context information for the batch of cells under consideration
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException
*/
- public Cell nextCell(int cellLimit) throws IOException {
- Cell cell = peekAtNextCell(cellLimit);
+ public Cell nextCell(ScannerContext scannerContext) throws IOException {
+ Cell cell = peekAtNextCell(scannerContext);
if (cell != null) {
currentIndex++;
}
@@ -630,20 +630,19 @@
* pointer. Calling this method multiple times in a row will continue to
* return the same cell.
*
- * @param cellLimit
- * the limit of number of cells to return if the next batch must be
- * fetched by the wrapped scanner
+ * @param scannerContext
+ * context information for the batch of cells under consideration
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException if any problem is encountered while grabbing the next
* cell.
*/
- public Cell peekAtNextCell(int cellLimit) throws IOException {
+ public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
if (currentIndex >= availableCells.size()) {
// done with current batch
availableCells.clear();
currentIndex = 0;
- hasMore = flowRunScanner.next(availableCells, cellLimit);
+ hasMore = flowRunScanner.next(availableCells, scannerContext);
}
Cell cell = null;
if (currentIndex < availableCells.size()) {
@@ -720,4 +719,9 @@
}
return regionScanner.reseek(bytes);
}
+
+ @Override
+ public int getBatch() {
+ return batchSize;
+ }
}
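
For callers that still need to push a cell limit down to InternalScanner#next, the 1.1 API expresses it through ScannerContext rather than a bare int. A sketch, assuming cells, internalScanner, and batchSize are in scope, and assuming ScannerContext.Builder#setBatchLimit is available (it is what HBase 1.1 itself uses to carry Scan#getBatch); FlowScanner above instead builds an unbounded context and enforces batchSize on its own:

ScannerContext context = ScannerContext.newBuilder()
    .setBatchLimit(batchSize) // assumed builder method; mirrors HBase's internal use
    .build();
boolean hasMore = internalScanner.next(cells, context);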