YARN-5070. upgrade HBase version for first merge (Vrushali C via sjlee)
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 0a4f058..4d064e8 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -49,8 +49,8 @@
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
 
     <kafka.version>0.8.2.1</kafka.version>
-    <hbase.version>1.0.1</hbase.version>
-    <phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
+    <hbase.version>1.1.3</hbase.version>
+    <phoenix.version>4.7.0-HBase-1.1</phoenix.version>
     <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
 
     <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 328b25a..6c4c810 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -41,7 +41,7 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@@ -107,8 +107,8 @@
       // check in flow run table
       util.waitUntilAllRegionsAssigned(table);
       HRegionServer server = util.getRSForFirstRegionInTable(table);
-      List<HRegion> regions = server.getOnlineRegions(table);
-      for (HRegion region : regions) {
+      List<Region> regions = server.getOnlineRegions(table);
+      for (Region region : regions) {
         assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
             hbaseConf));
       }
@@ -122,8 +122,8 @@
       // check in flow activity table
       util.waitUntilAllRegionsAssigned(table);
       HRegionServer server = util.getRSForFirstRegionInTable(table);
-      List<HRegion> regions = server.getOnlineRegions(table);
-      for (HRegion region : regions) {
+      List<Region> regions = server.getOnlineRegions(table);
+      for (Region region : regions) {
         assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
             hbaseConf));
       }
@@ -137,8 +137,8 @@
       // check in entity run table
       util.waitUntilAllRegionsAssigned(table);
       HRegionServer server = util.getRSForFirstRegionInTable(table);
-      List<HRegion> regions = server.getOnlineRegions(table);
-      for (HRegion region : regions) {
+      List<Region> regions = server.getOnlineRegions(table);
+      for (Region region : regions) {
         assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
             hbaseConf));
       }
@@ -311,6 +311,9 @@
     // check flow run
     checkFlowRunTable(cluster, user, flow, runid, c1);
 
+    // check various batch limits in scanning the table for this flow
+    checkFlowRunTableBatchLimit(cluster, user, flow, runid, c1);
+
     // use the timeline reader to verify data
     HBaseTimelineReaderImpl hbr = null;
     try {
@@ -350,6 +353,157 @@
     }
   }
 
+  /*
+   * checks the batch limits on a scan
+   */
+   void checkFlowRunTableBatchLimit(String cluster, String user,
+      String flow, long runid, Configuration c1) throws IOException {
+
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow =  new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
+    s.setStartRow(startRow);
+    // set a batch limit
+    int batchLimit = 2;
+    s.setBatch(batchLimit);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn
+        .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      assertTrue(values.size() <= batchLimit);
+      loopCount++;
+    }
+    assertTrue(loopCount > 0);
+
+    // test with a diff batch limit
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = 1;
+    s.setBatch(batchLimit);
+    s.setMaxResultsPerColumnFamily(2);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(batchLimit, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      assertEquals(batchLimit, values.size());
+      loopCount++;
+    }
+    assertTrue(loopCount > 0);
+
+    // test with a diff batch limit
+    // set it high enough
+    // we expect back 3 since there are
+    // column = m!HDFS_BYTES_READ value=57
+    // column = m!MAP_SLOT_MILLIS value=141
+    // column min_start_time value=1425016501000
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = 100;
+    s.setBatch(batchLimit);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      // assert that with every next invocation
+      // we get back <= batchLimit values
+      assertTrue(values.size() <= batchLimit);
+      assertTrue(values.size() == 3); // see comment above
+      loopCount++;
+    }
+    // should loop through only once
+    assertTrue(loopCount == 1);
+
+    // set it to a negative number
+    // we expect all 3 back since there are
+    // column = m!HDFS_BYTES_READ value=57
+    // column = m!MAP_SLOT_MILLIS value=141
+    // column min_start_time value=1425016501000
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = -671;
+    s.setBatch(batchLimit);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(3, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      // assert that with every next invocation
+      // we get back <= batchLimit values
+      assertEquals(3, values.size());
+      loopCount++;
+    }
+    // should loop through only once
+    assertEquals(1, loopCount);
+
+    // set it to 0
+    // we expect all 3 back since there are
+    // column = m!HDFS_BYTES_READ value=57
+    // column = m!MAP_SLOT_MILLIS value=141
+    // column min_start_time value=1425016501000
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = 0;
+    s.setBatch(batchLimit);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(3, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      // assert that with every next invocation
+      // we get back <= batchLimit values
+      assertEquals(3, values.size());
+      loopCount++;
+    }
+    // should loop through only once
+    assertEquals(1, loopCount);
+  }
+
   private void checkFlowRunTable(String cluster, String user, String flow,
       long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index e1bef53..71523b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -45,7 +45,7 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -124,6 +124,153 @@
   }
 
   @Test
+  public void testWriteScanBatchLimit() throws Exception {
+    String rowKey = "nonNumericRowKey";
+    String column = "nonNumericColumnName";
+    String value = "nonNumericValue";
+    String column2 = "nonNumericColumnName2";
+    String value2 = "nonNumericValue2";
+    String column3 = "nonNumericColumnName3";
+    String value3 = "nonNumericValue3";
+    String column4 = "nonNumericColumnName4";
+    String value4 = "nonNumericValue4";
+
+    byte[] rowKeyBytes = Bytes.toBytes(rowKey);
+    byte[] columnNameBytes = Bytes.toBytes(column);
+    byte[] valueBytes = Bytes.toBytes(value);
+    byte[] columnName2Bytes = Bytes.toBytes(column2);
+    byte[] value2Bytes = Bytes.toBytes(value2);
+    byte[] columnName3Bytes = Bytes.toBytes(column3);
+    byte[] value3Bytes = Bytes.toBytes(value3);
+    byte[] columnName4Bytes = Bytes.toBytes(column4);
+    byte[] value4Bytes = Bytes.toBytes(value4);
+
+    Put p = new Put(rowKeyBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+
+    Configuration hbaseConf = util.getConfiguration();
+    TableName table = TableName.valueOf(hbaseConf.get(
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = null;
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    Table flowRunTable = conn.getTable(table);
+    flowRunTable.put(p);
+
+    String rowKey2 = "nonNumericRowKey2";
+    byte[] rowKey2Bytes = Bytes.toBytes(rowKey2);
+    p = new Put(rowKey2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+    flowRunTable.put(p);
+
+    String rowKey3 = "nonNumericRowKey3";
+    byte[] rowKey3Bytes = Bytes.toBytes(rowKey3);
+    p = new Put(rowKey3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+    flowRunTable.put(p);
+
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    int batchLimit = 2;
+    s.setBatch(batchLimit);
+    ResultScanner scanner = flowRunTable.getScanner(s);
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+    }
+
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = 3;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+    }
+
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = 1000;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+      // we expect all back in one next call
+      assertEquals(4, values.size());
+      rowCount++;
+    }
+    // should get back 1 row with each invocation
+    // if scan batch is set sufficiently high
+    assertEquals(3, rowCount);
+
+    // test with a negative number
+    // should have same effect as setting it to a high number
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = -2992;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(4, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      // we expect all back in one next call
+      assertEquals(4, values.size());
+      System.out.println(" values size " + values.size() +  " " + batchLimit );
+      rowCount++;
+    }
+    // should get back 1 row with each invocation
+    // if scan batch is set sufficiently high
+    assertEquals(3, rowCount);
+  }
+
+  @Test
   public void testWriteFlowRunCompaction() throws Exception {
     String cluster = "kompaction_cluster1";
     String user = "kompaction_FlowRun__user1";
@@ -176,13 +323,13 @@
     // check in flow run table
     HRegionServer server = util.getRSForFirstRegionInTable(TableName
         .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
-    List<HRegion> regions = server.getOnlineRegions(TableName
+    List<Region> regions = server.getOnlineRegions(TableName
         .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
     assertTrue("Didn't find any regions for primary table!", regions.size() > 0);
     // flush and compact all the regions of the primary table
-    for (HRegion region : regions) {
-       region.flushcache();
-      region.compactStores(true);
+    for (Region region : regions) {
+       region.flush(true);
+       region.compact(true);
     }
 
     // check flow run for one flow many apps
@@ -237,7 +384,7 @@
     request.setIsMajor(true, true);
     // okay to pass in nulls for the constructor arguments
     // because all we want to do is invoke the process summation
-    FlowScanner fs = new FlowScanner(null, -1, null,
+    FlowScanner fs = new FlowScanner(null, null,
         (request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION
             : FlowScannerOperation.MINOR_COMPACTION));
     assertNotNull(fs);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 8ea51a1..a9dcfaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -39,7 +39,7 @@
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -59,7 +59,7 @@
   private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
   private boolean isFlowRunRegion = false;
 
-  private HRegion region;
+  private Region region;
   /**
    * generate a timestamp that is unique per row in a region this is per region.
    */
@@ -178,7 +178,7 @@
     scan.setMaxVersions();
     RegionScanner scanner = null;
     try {
-      scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
+      scanner = new FlowScanner(e.getEnvironment(), scan,
           region.getScanner(scan), FlowScannerOperation.READ);
       scanner.next(results);
       e.bypass();
@@ -233,7 +233,7 @@
     if (!isFlowRunRegion) {
       return scanner;
     }
-    return new FlowScanner(e.getEnvironment(), scan.getBatch(),
+    return new FlowScanner(e.getEnvironment(), scan,
         scanner, FlowScannerOperation.READ);
   }
 
@@ -257,7 +257,7 @@
             + " storeFilesCount=" + store.getStorefilesCount());
       }
     }
-    return new FlowScanner(c.getEnvironment(), -1, scanner,
+    return new FlowScanner(c.getEnvironment(), scanner,
         FlowScannerOperation.FLUSH);
   }
 
@@ -296,10 +296,9 @@
       requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
           : FlowScannerOperation.MINOR_COMPACTION);
       LOG.info("Compactionrequest= " + request.toString() + " "
-          + requestOp.toString() + " RegionName="
-          + e.getEnvironment().getRegion().getRegionNameAsString());
+          + requestOp.toString() + " RegionName=" + e.getEnvironment()
+              .getRegion().getRegionInfo().getRegionNameAsString());
     }
-
-    return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
+    return new FlowScanner(e.getEnvironment(), scanner, requestOp);
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 648c77b..6e67722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -35,10 +35,12 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,9 +70,9 @@
    */
   private static final String FLOW_APP_ID = "application_00000000000_0000";
 
-  private final HRegion region;
+  private final Region region;
   private final InternalScanner flowRunScanner;
-  private final int limit;
+  private final int batchSize;
   private final long appFinalValueRetentionThreshold;
   private RegionScanner regionScanner;
   private boolean hasMore;
@@ -79,9 +81,15 @@
   private int currentIndex;
   private FlowScannerOperation action = FlowScannerOperation.READ;
 
-  FlowScanner(RegionCoprocessorEnvironment env, int limit,
+  FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner,
+      FlowScannerOperation action) {
+    this(env, null, internalScanner, action);
+  }
+
+  FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
       InternalScanner internalScanner, FlowScannerOperation action) {
-    this.limit = limit;
+    this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
+    // TODO initialize other scan attributes like Scan#maxResultSize
     this.flowRunScanner = internalScanner;
     if (internalScanner instanceof RegionScanner) {
       this.regionScanner = (RegionScanner) internalScanner;
@@ -98,8 +106,12 @@
           YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
           YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(" batch size=" + batchSize);
+    }
   }
 
+
   /*
    * (non-Javadoc)
    *
@@ -112,22 +124,24 @@
 
   @Override
   public boolean nextRaw(List<Cell> cells) throws IOException {
-    return nextRaw(cells, limit);
+    return nextRaw(cells, ScannerContext.newBuilder().build());
   }
 
   @Override
-  public boolean nextRaw(List<Cell> cells, int cellLimit) throws IOException {
-    return nextInternal(cells, cellLimit);
+  public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
   }
 
   @Override
   public boolean next(List<Cell> cells) throws IOException {
-    return next(cells, limit);
+    return next(cells, ScannerContext.newBuilder().build());
   }
 
   @Override
-  public boolean next(List<Cell> cells, int cellLimit) throws IOException {
-    return nextInternal(cells, cellLimit);
+  public boolean next(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
   }
 
   /**
@@ -159,29 +173,17 @@
   }
 
   /**
-   * Checks if the converter is a numeric converter or not. For a converter to
-   * be numeric, it must implement {@link NumericValueConverter} interface.
-   * @param converter
-   * @return true, if converter is of type NumericValueConverter, false
-   * otherwise.
-   */
-  private static boolean isNumericConverter(ValueConverter converter) {
-    return (converter instanceof NumericValueConverter);
-  }
-
-  /**
    * This method loops through the cells in a given row of the
    * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
    * to process the contents. It then calculates the sum or min or max for each
    * column or returns the cell as is.
    *
    * @param cells
-   * @param cellLimit
+   * @param scannerContext
    * @return true if next row is available for the scanner, false otherwise
    * @throws IOException
    */
-  @SuppressWarnings("deprecation")
-  private boolean nextInternal(List<Cell> cells, int cellLimit)
+  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
       throws IOException {
     Cell cell = null;
     startNext();
@@ -194,48 +196,47 @@
     // So all cells in one qualifier come one after the other before we see the
     // next column qualifier
     ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] currentColumnQualifier = Separator.EMPTY_BYTES;
+    byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
     AggregationOperation currentAggOp = null;
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
     int addedCnt = 0;
     long currentTimestamp = System.currentTimeMillis();
     ValueConverter converter = null;
+    int limit = batchSize;
 
-    while (cellLimit <= 0 || addedCnt < cellLimit) {
-      cell = peekAtNextCell(cellLimit);
+    while (limit <= 0 || addedCnt < limit) {
+      cell = peekAtNextCell(scannerContext);
       if (cell == null) {
         break;
       }
-      byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
-      if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
-        if (converter != null && isNumericConverter(converter)) {
-          addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-              converter, currentTimestamp);
-        }
-        resetState(currentColumnCells, alreadySeenAggDim);
-        currentColumnQualifier = newColumnQualifier;
-        currentAggOp = getCurrentAggOp(cell);
-        converter = getValueConverter(newColumnQualifier);
+      byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
+      if (previousColumnQualifier == null) {
+        // first time in loop
+        previousColumnQualifier = currentColumnQualifier;
       }
-      // No operation needs to be performed on non numeric converters.
-      if (!isNumericConverter(converter)) {
-        currentColumnCells.add(cell);
-        nextCell(cellLimit);
-        continue;
+
+      converter = getValueConverter(currentColumnQualifier);
+      if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
+        addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+            converter, currentTimestamp);
+        resetState(currentColumnCells, alreadySeenAggDim);
+        previousColumnQualifier = currentColumnQualifier;
+        currentAggOp = getCurrentAggOp(cell);
+        converter = getValueConverter(currentColumnQualifier);
       }
       collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
-          (NumericValueConverter)converter);
-      nextCell(cellLimit);
+          converter, scannerContext);
+      nextCell(scannerContext);
     }
-    if (!currentColumnCells.isEmpty()) {
-      addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-          converter, currentTimestamp);
+    if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) {
+      addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
+          currentTimestamp);
       if (LOG.isDebugEnabled()) {
         if (addedCnt > 0) {
           LOG.debug("emitted cells. " + addedCnt + " for " + this.action
               + " rowKey="
-              + FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString());
+              + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
         } else {
           LOG.debug("emitted no cells for " + this.action);
         }
@@ -252,7 +253,7 @@
   }
 
   /**
-   * resets the parameters to an intialized state for next loop iteration.
+   * resets the parameters to an initialized state for next loop iteration.
    *
    * @param cell
    * @param currentAggOp
@@ -268,12 +269,12 @@
 
   private void collectCells(SortedSet<Cell> currentColumnCells,
       AggregationOperation currentAggOp, Cell cell,
-      Set<String> alreadySeenAggDim, NumericValueConverter converter)
-      throws IOException {
+      Set<String> alreadySeenAggDim, ValueConverter converter,
+      ScannerContext scannerContext) throws IOException {
+
     if (currentAggOp == null) {
       // not a min/max/metric cell, so just return it as is
       currentColumnCells.add(cell);
-      nextCell(limit);
       return;
     }
 
@@ -284,7 +285,7 @@
       } else {
         Cell currentMinCell = currentColumnCells.first();
         Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
-            converter);
+            (NumericValueConverter) converter);
         if (!currentMinCell.equals(newMinCell)) {
           currentColumnCells.remove(currentMinCell);
           currentColumnCells.add(newMinCell);
@@ -297,7 +298,7 @@
       } else {
         Cell currentMaxCell = currentColumnCells.first();
         Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
-            converter);
+            (NumericValueConverter) converter);
         if (!currentMaxCell.equals(newMaxCell)) {
           currentColumnCells.remove(currentMaxCell);
           currentColumnCells.add(newMaxCell);
@@ -610,15 +611,14 @@
    * pointer to the next cell. This method can be called multiple times in a row
    * to advance through all the available cells.
    *
-   * @param cellLimit
-   *          the limit of number of cells to return if the next batch must be
-   *          fetched by the wrapped scanner
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
    * @return the next available cell or null if no more cells are available for
    *         the current row
    * @throws IOException
    */
-  public Cell nextCell(int cellLimit) throws IOException {
-    Cell cell = peekAtNextCell(cellLimit);
+  public Cell nextCell(ScannerContext scannerContext) throws IOException {
+    Cell cell = peekAtNextCell(scannerContext);
     if (cell != null) {
       currentIndex++;
     }
@@ -630,20 +630,19 @@
    * pointer. Calling this method multiple times in a row will continue to
    * return the same cell.
    *
-   * @param cellLimit
-   *          the limit of number of cells to return if the next batch must be
-   *          fetched by the wrapped scanner
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
    * @return the next available cell or null if no more cells are available for
    *         the current row
    * @throws IOException if any problem is encountered while grabbing the next
    *     cell.
    */
-  public Cell peekAtNextCell(int cellLimit) throws IOException {
+  public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
     if (currentIndex >= availableCells.size()) {
       // done with current batch
       availableCells.clear();
       currentIndex = 0;
-      hasMore = flowRunScanner.next(availableCells, cellLimit);
+      hasMore = flowRunScanner.next(availableCells, scannerContext);
     }
     Cell cell = null;
     if (currentIndex < availableCells.size()) {
@@ -720,4 +719,9 @@
     }
     return regionScanner.reseek(bytes);
   }
+
+  @Override
+  public int getBatch() {
+    return batchSize;
+  }
 }