| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellComparatorImpl; |
| import org.apache.hadoop.hbase.CompareOperator; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeepDeletedCells; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.KeyValueUtil; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.filter.Filter; |
| import org.apache.hadoop.hbase.filter.FilterList; |
| import org.apache.hadoop.hbase.filter.FilterList.Operator; |
| import org.apache.hadoop.hbase.filter.PageFilter; |
| import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; |
| import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.io.hfile.HFileContext; |
| import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; |
| import org.apache.hadoop.hbase.testclassification.MediumTests; |
| import org.apache.hadoop.hbase.testclassification.RegionServerTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
| |
| /** |
| * Test cases against ReversibleKeyValueScanner |
| */ |
| @Category({RegionServerTests.class, MediumTests.class}) |
| public class TestReversibleScanners { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestReversibleScanners.class); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class); |
| HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); |
| |
| private static byte[] FAMILYNAME = Bytes.toBytes("testCf"); |
| private static long TS = System.currentTimeMillis(); |
| private static int MAXMVCC = 7; |
| private static byte[] ROW = Bytes.toBytes("testRow"); |
| private static final int ROWSIZE = 200; |
| private static byte[][] ROWS = makeN(ROW, ROWSIZE); |
| private static byte[] QUAL = Bytes.toBytes("testQual"); |
| private static final int QUALSIZE = 5; |
| private static byte[][] QUALS = makeN(QUAL, QUALSIZE); |
| private static byte[] VALUE = Bytes.toBytes("testValue"); |
| private static final int VALUESIZE = 3; |
| private static byte[][] VALUES = makeN(VALUE, VALUESIZE); |
| |
| @Rule |
| public TestName name = new TestName(); |
| |
| @BeforeClass |
| public static void setUp() { |
| ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, |
| 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); |
| } |
| @Test |
| public void testReversibleStoreFileScanner() throws IOException { |
| FileSystem fs = TEST_UTIL.getTestFileSystem(); |
| Path hfilePath = new Path(new Path( |
| TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), |
| "regionname"), "familyname"); |
| CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); |
| for (DataBlockEncoding encoding : DataBlockEncoding.values()) { |
| HFileContextBuilder hcBuilder = new HFileContextBuilder(); |
| hcBuilder.withBlockSize(2 * 1024); |
| hcBuilder.withDataBlockEncoding(encoding); |
| HFileContext hFileContext = hcBuilder.build(); |
| StoreFileWriter writer = new StoreFileWriter.Builder( |
| TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath) |
| .withFileContext(hFileContext).build(); |
| writeStoreFile(writer); |
| |
| HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf, |
| BloomType.NONE, true); |
| |
| List<StoreFileScanner> scanners = StoreFileScanner |
| .getScannersForStoreFiles(Collections.singletonList(sf), |
| false, true, false, false, Long.MAX_VALUE); |
| StoreFileScanner scanner = scanners.get(0); |
| seekTestOfReversibleKeyValueScanner(scanner); |
| for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { |
| LOG.info("Setting read point to " + readPoint); |
| scanners = StoreFileScanner.getScannersForStoreFiles( |
| Collections.singletonList(sf), false, true, false, false, readPoint); |
| seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); |
| } |
| } |
| |
| } |
| |
| @Test |
| public void testReversibleMemstoreScanner() throws IOException { |
| MemStore memstore = new DefaultMemStore(); |
| writeMemstore(memstore); |
| List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE); |
| seekTestOfReversibleKeyValueScanner(scanners.get(0)); |
| for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { |
| LOG.info("Setting read point to " + readPoint); |
| scanners = memstore.getScanners(readPoint); |
| seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); |
| } |
| |
| } |
| |
| @Test |
| public void testReversibleKeyValueHeap() throws IOException { |
| // write data to one memstore and two store files |
| FileSystem fs = TEST_UTIL.getTestFileSystem(); |
| Path hfilePath = new Path(new Path( |
| TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), |
| "familyname"); |
| CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); |
| HFileContextBuilder hcBuilder = new HFileContextBuilder(); |
| hcBuilder.withBlockSize(2 * 1024); |
| HFileContext hFileContext = hcBuilder.build(); |
| StoreFileWriter writer1 = new StoreFileWriter.Builder( |
| TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( |
| hfilePath).withFileContext(hFileContext).build(); |
| StoreFileWriter writer2 = new StoreFileWriter.Builder( |
| TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( |
| hfilePath).withFileContext(hFileContext).build(); |
| |
| MemStore memstore = new DefaultMemStore(); |
| writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, |
| writer2 }); |
| |
| HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, |
| BloomType.NONE, true); |
| |
| HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, |
| BloomType.NONE, true); |
| /** |
| * Test without MVCC |
| */ |
| int startRowNum = ROWSIZE / 2; |
| ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, |
| ROWS[startRowNum], MAXMVCC); |
| internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); |
| |
| startRowNum = ROWSIZE - 1; |
| kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, |
| HConstants.EMPTY_START_ROW, MAXMVCC); |
| internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); |
| |
| /** |
| * Test with MVCC |
| */ |
| for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { |
| LOG.info("Setting read point to " + readPoint); |
| startRowNum = ROWSIZE - 1; |
| kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, |
| HConstants.EMPTY_START_ROW, readPoint); |
| for (int i = startRowNum; i >= 0; i--) { |
| if (i - 2 < 0) break; |
| i = i - 2; |
| kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1])); |
| Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan( |
| i, 0, readPoint); |
| if (nextReadableNum == null) break; |
| KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), |
| nextReadableNum.getSecond()); |
| assertEquals(expecedKey, kvHeap.peek()); |
| i = nextReadableNum.getFirst(); |
| int qualNum = nextReadableNum.getSecond(); |
| if (qualNum + 1 < QUALSIZE) { |
| kvHeap.backwardSeek(makeKV(i, qualNum + 1)); |
| nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, |
| readPoint); |
| if (nextReadableNum == null) break; |
| expecedKey = makeKV(nextReadableNum.getFirst(), |
| nextReadableNum.getSecond()); |
| assertEquals(expecedKey, kvHeap.peek()); |
| i = nextReadableNum.getFirst(); |
| qualNum = nextReadableNum.getSecond(); |
| } |
| |
| kvHeap.next(); |
| |
| if (qualNum + 1 >= QUALSIZE) { |
| nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, |
| readPoint); |
| } else { |
| nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, |
| readPoint); |
| } |
| if (nextReadableNum == null) break; |
| expecedKey = makeKV(nextReadableNum.getFirst(), |
| nextReadableNum.getSecond()); |
| assertEquals(expecedKey, kvHeap.peek()); |
| i = nextReadableNum.getFirst(); |
| } |
| } |
| } |
| |
| @Test |
| public void testReversibleStoreScanner() throws IOException { |
| // write data to one memstore and two store files |
| FileSystem fs = TEST_UTIL.getTestFileSystem(); |
| Path hfilePath = new Path(new Path( |
| TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), |
| "familyname"); |
| CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); |
| HFileContextBuilder hcBuilder = new HFileContextBuilder(); |
| hcBuilder.withBlockSize(2 * 1024); |
| HFileContext hFileContext = hcBuilder.build(); |
| StoreFileWriter writer1 = new StoreFileWriter.Builder( |
| TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( |
| hfilePath).withFileContext(hFileContext).build(); |
| StoreFileWriter writer2 = new StoreFileWriter.Builder( |
| TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( |
| hfilePath).withFileContext(hFileContext).build(); |
| |
| MemStore memstore = new DefaultMemStore(); |
| writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, |
| writer2 }); |
| |
| HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, |
| BloomType.NONE, true); |
| |
| HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, |
| BloomType.NONE, true); |
| |
| ScanInfo scanInfo = |
| new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE, |
| KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparatorImpl.COMPARATOR, false); |
| |
| // Case 1.Test a full reversed scan |
| Scan scan = new Scan(); |
| scan.setReversed(true); |
| StoreScanner storeScanner = |
| getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); |
| verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false); |
| |
| // Case 2.Test reversed scan with a specified start row |
| int startRowNum = ROWSIZE / 2; |
| byte[] startRow = ROWS[startRowNum]; |
| scan.withStartRow(startRow); |
| storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); |
| verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), |
| startRowNum + 1, false); |
| |
| // Case 3.Test reversed scan with a specified start row and specified |
| // qualifiers |
| assertTrue(QUALSIZE > 2); |
| scan.addColumn(FAMILYNAME, QUALS[0]); |
| scan.addColumn(FAMILYNAME, QUALS[2]); |
| storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); |
| verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, |
| false); |
| |
| // Case 4.Test reversed scan with mvcc based on case 3 |
| for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { |
| LOG.info("Setting read point to " + readPoint); |
| storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint); |
| int expectedRowCount = 0; |
| int expectedKVCount = 0; |
| for (int i = startRowNum; i >= 0; i--) { |
| int kvCount = 0; |
| if (makeMVCC(i, 0) <= readPoint) { |
| kvCount++; |
| } |
| if (makeMVCC(i, 2) <= readPoint) { |
| kvCount++; |
| } |
| if (kvCount > 0) { |
| expectedRowCount++; |
| expectedKVCount += kvCount; |
| } |
| } |
| verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, |
| false); |
| } |
| } |
| |
| @Test |
| public void testReversibleRegionScanner() throws IOException { |
| byte[] FAMILYNAME2 = Bytes.toBytes("testCf2"); |
| TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME)) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME2)).build(); |
| HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null); |
| loadDataToRegion(region, FAMILYNAME2); |
| |
| // verify row count with forward scan |
| Scan scan = new Scan(); |
| InternalScanner scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true); |
| |
| // Case1:Full reversed scan |
| scan.setReversed(true); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false); |
| |
| // Case2:Full reversed scan with one family |
| scan = new Scan(); |
| scan.setReversed(true); |
| scan.addFamily(FAMILYNAME); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false); |
| |
| // Case3:Specify qualifiers + One family |
| byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] }; |
| for (byte[] specifiedQualifier : specifiedQualifiers) |
| scan.addColumn(FAMILYNAME, specifiedQualifier); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false); |
| |
| // Case4:Specify qualifiers + Two families |
| for (byte[] specifiedQualifier : specifiedQualifiers) |
| scan.addColumn(FAMILYNAME2, specifiedQualifier); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false); |
| |
| // Case5: Case4 + specify start row |
| int startRowNum = ROWSIZE * 3 / 4; |
| scan.withStartRow(ROWS[startRowNum]); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), |
| false); |
| |
| // Case6: Case4 + specify stop row |
| int stopRowNum = ROWSIZE / 4; |
| scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY); |
| scan.withStopRow(ROWS[stopRowNum]); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE |
| - stopRowNum - 1), false); |
| |
| // Case7: Case4 + specify start row + specify stop row |
| scan.withStartRow(ROWS[startRowNum]); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, |
| (startRowNum - stopRowNum), false); |
| |
| // Case8: Case7 + SingleColumnValueFilter |
| int valueNum = startRowNum % VALUESIZE; |
| Filter filter = new SingleColumnValueFilter(FAMILYNAME, |
| specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[valueNum]); |
| scan.setFilter(filter); |
| scanner = region.getScanner(scan); |
| int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE |
| + (stopRowNum / VALUESIZE == valueNum ? 0 : 1); |
| verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, |
| false); |
| |
| // Case9: Case7 + PageFilter |
| int pageSize = 10; |
| filter = new PageFilter(pageSize); |
| scan.setFilter(filter); |
| scanner = region.getScanner(scan); |
| int expectedRowNum = pageSize; |
| verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); |
| |
| // Case10: Case7 + FilterList+MUST_PASS_ONE |
| SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter( |
| FAMILYNAME, specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[0]); |
| SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter( |
| FAMILYNAME, specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[1]); |
| expectedRowNum = 0; |
| for (int i = startRowNum; i > stopRowNum; i--) { |
| if (i % VALUESIZE == 0 || i % VALUESIZE == 1) { |
| expectedRowNum++; |
| } |
| } |
| filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2); |
| scan.setFilter(filter); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); |
| |
| // Case10: Case7 + FilterList+MUST_PASS_ALL |
| filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2); |
| expectedRowNum = 0; |
| scan.setFilter(filter); |
| scanner = region.getScanner(scan); |
| verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); |
| } |
| |
| private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2, |
| Scan scan, ScanInfo scanInfo, int readPoint) throws IOException { |
| List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint); |
| NavigableSet<byte[]> columns = null; |
| for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { |
| // Should only one family |
| columns = entry.getValue(); |
| } |
| StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners); |
| return storeScanner; |
| } |
| |
| private void verifyCountAndOrder(InternalScanner scanner, |
| int expectedKVCount, int expectedRowCount, boolean forward) |
| throws IOException { |
| List<Cell> kvList = new ArrayList<>(); |
| Result lastResult = null; |
| int rowCount = 0; |
| int kvCount = 0; |
| try { |
| while (scanner.next(kvList)) { |
| if (kvList.isEmpty()) continue; |
| rowCount++; |
| kvCount += kvList.size(); |
| if (lastResult != null) { |
| Result curResult = Result.create(kvList); |
| assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, |
| forward, |
| Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0); |
| } |
| lastResult = Result.create(kvList); |
| kvList.clear(); |
| } |
| } finally { |
| scanner.close(); |
| } |
| if (!kvList.isEmpty()) { |
| rowCount++; |
| kvCount += kvList.size(); |
| kvList.clear(); |
| } |
| assertEquals(expectedKVCount, kvCount); |
| assertEquals(expectedRowCount, rowCount); |
| } |
| |
| private void internalTestSeekAndNextForReversibleKeyValueHeap( |
| ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException { |
| // Test next and seek |
| for (int i = startRowNum; i >= 0; i--) { |
| if (i % 2 == 1 && i - 2 >= 0) { |
| i = i - 2; |
| kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1])); |
| } |
| for (int j = 0; j < QUALSIZE; j++) { |
| if (j % 2 == 1 && (j + 1) < QUALSIZE) { |
| j = j + 1; |
| kvHeap.backwardSeek(makeKV(i, j)); |
| } |
| assertEquals(makeKV(i, j), kvHeap.peek()); |
| kvHeap.next(); |
| } |
| } |
| assertEquals(null, kvHeap.peek()); |
| } |
| |
| private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1, |
| HStoreFile sf2, byte[] startRow, int readPoint) throws IOException { |
| List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint); |
| ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); |
| return kvHeap; |
| } |
| |
| private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2, |
| byte[] startRow, boolean doSeek, int readPoint) throws IOException { |
| List<StoreFileScanner> fileScanners = StoreFileScanner.getScannersForStoreFiles( |
| Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint); |
| List<KeyValueScanner> memScanners = memstore.getScanners(readPoint); |
| List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1); |
| scanners.addAll(fileScanners); |
| scanners.addAll(memScanners); |
| |
| if (doSeek) { |
| if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) { |
| for (KeyValueScanner scanner : scanners) { |
| scanner.seekToLastRow(); |
| } |
| } else { |
| KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow); |
| for (KeyValueScanner scanner : scanners) { |
| scanner.backwardSeek(startKey); |
| } |
| } |
| } |
| return scanners; |
| } |
| |
| private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) |
| throws IOException { |
| /** |
| * Test without MVCC |
| */ |
| // Test seek to last row |
| assertTrue(scanner.seekToLastRow()); |
| assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek()); |
| |
| // Test backward seek in three cases |
| // Case1: seek in the same row in backwardSeek |
| KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2); |
| assertTrue(scanner.backwardSeek(seekKey)); |
| assertEquals(seekKey, scanner.peek()); |
| |
| // Case2: seek to the previous row in backwardSeek |
| int seekRowNum = ROWSIZE - 2; |
| assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum]))); |
| KeyValue expectedKey = makeKV(seekRowNum - 1, 0); |
| assertEquals(expectedKey, scanner.peek()); |
| |
| // Case3: unable to backward seek |
| assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0]))); |
| assertEquals(null, scanner.peek()); |
| |
| // Test seek to previous row |
| seekRowNum = ROWSIZE - 4; |
| assertTrue(scanner.seekToPreviousRow(KeyValueUtil |
| .createFirstOnRow(ROWS[seekRowNum]))); |
| expectedKey = makeKV(seekRowNum - 1, 0); |
| assertEquals(expectedKey, scanner.peek()); |
| |
| // Test seek to previous row for the first row |
| assertFalse(scanner.seekToPreviousRow(makeKV(0, 0))); |
| assertEquals(null, scanner.peek()); |
| |
| } |
| |
| private void seekTestOfReversibleKeyValueScannerWithMVCC( |
| List<? extends KeyValueScanner> scanners, int readPoint) throws IOException { |
| /** |
| * Test with MVCC |
| */ |
| // Test seek to last row |
| KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan( |
| ROWSIZE - 1, 0, readPoint); |
| boolean res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= scanner.seekToLastRow(); |
| } |
| assertEquals(expectedKey != null, res); |
| res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= (expectedKey.equals(scanner.peek())); |
| } |
| assertTrue(res); |
| |
| // Test backward seek in two cases |
| // Case1: seek in the same row in backwardSeek |
| expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, |
| QUALSIZE - 2, readPoint); |
| res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= scanner.backwardSeek(expectedKey); |
| } |
| assertEquals(expectedKey != null, res); |
| res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= (expectedKey.equals(scanner.peek())); |
| } |
| assertTrue(res); |
| |
| // Case2: seek to the previous row in backwardSeek |
| int seekRowNum = ROWSIZE - 3; |
| res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= scanner.backwardSeek(expectedKey); |
| } |
| res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= (expectedKey.equals(scanner.peek())); |
| } |
| assertTrue(res); |
| |
| // Test seek to previous row |
| seekRowNum = ROWSIZE - 4; |
| expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, |
| readPoint); |
| res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])); |
| } |
| assertEquals(expectedKey != null, res); |
| res = false; |
| for (KeyValueScanner scanner : scanners) { |
| res |= (expectedKey.equals(scanner.peek())); |
| } |
| assertTrue(res); |
| } |
| |
| private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, |
| int startQualNum, int readPoint) { |
| Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan( |
| startRowNum, startQualNum, readPoint); |
| if (nextReadableNum == null) |
| return null; |
| return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); |
| } |
| |
| private Pair<Integer, Integer> getNextReadableNumWithBackwardScan( |
| int startRowNum, int startQualNum, int readPoint) { |
| Pair<Integer, Integer> nextReadableNum = null; |
| boolean findExpected = false; |
| for (int i = startRowNum; i >= 0; i--) { |
| for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) { |
| if (makeMVCC(i, j) <= readPoint) { |
| nextReadableNum = new Pair<>(i, j); |
| findExpected = true; |
| break; |
| } |
| } |
| if (findExpected) |
| break; |
| } |
| return nextReadableNum; |
| } |
| |
| private static void loadDataToRegion(HRegion region, byte[] additionalFamily) |
| throws IOException { |
| for (int i = 0; i < ROWSIZE; i++) { |
| Put put = new Put(ROWS[i]); |
| for (int j = 0; j < QUALSIZE; j++) { |
| put.add(makeKV(i, j)); |
| // put additional family |
| put.add(makeKV(i, j, additionalFamily)); |
| } |
| region.put(put); |
| if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) { |
| region.flush(true); |
| } |
| } |
| } |
| |
| private static void writeMemstoreAndStoreFiles(MemStore memstore, |
| final StoreFileWriter[] writers) throws IOException { |
| try { |
| for (int i = 0; i < ROWSIZE; i++) { |
| for (int j = 0; j < QUALSIZE; j++) { |
| if (i % 2 == 0) { |
| memstore.add(makeKV(i, j), null); |
| } else { |
| writers[(i + j) % writers.length].append(makeKV(i, j)); |
| } |
| } |
| } |
| } finally { |
| for (int i = 0; i < writers.length; i++) { |
| writers[i].close(); |
| } |
| } |
| } |
| |
| private static void writeStoreFile(final StoreFileWriter writer) |
| throws IOException { |
| try { |
| for (int i = 0; i < ROWSIZE; i++) { |
| for (int j = 0; j < QUALSIZE; j++) { |
| writer.append(makeKV(i, j)); |
| } |
| } |
| } finally { |
| writer.close(); |
| } |
| } |
| |
| private static void writeMemstore(MemStore memstore) throws IOException { |
| // Add half of the keyvalues to memstore |
| for (int i = 0; i < ROWSIZE; i++) { |
| for (int j = 0; j < QUALSIZE; j++) { |
| if ((i + j) % 2 == 0) { |
| memstore.add(makeKV(i, j), null); |
| } |
| } |
| } |
| memstore.snapshot(); |
| // Add another half of the keyvalues to snapshot |
| for (int i = 0; i < ROWSIZE; i++) { |
| for (int j = 0; j < QUALSIZE; j++) { |
| if ((i + j) % 2 == 1) { |
| memstore.add(makeKV(i, j), null); |
| } |
| } |
| } |
| } |
| |
| private static KeyValue makeKV(int rowNum, int cqNum) { |
| return makeKV(rowNum, cqNum, FAMILYNAME); |
| } |
| |
| private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) { |
| KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, |
| VALUES[rowNum % VALUESIZE]); |
| kv.setSequenceId(makeMVCC(rowNum, cqNum)); |
| return kv; |
| } |
| |
| private static long makeMVCC(int rowNum, int cqNum) { |
| return (rowNum + cqNum) % (MAXMVCC + 1); |
| } |
| |
| private static byte[][] makeN(byte[] base, int n) { |
| byte[][] ret = new byte[n][]; |
| for (int i = 0; i < n; i++) { |
| ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); |
| } |
| return ret; |
| } |
| } |