| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import java.io.IOException; |
| import java.lang.management.ManagementFactory; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellComparator; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.KeepDeletedCells; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.KeyValueTestUtil; |
| import org.apache.hadoop.hbase.MemoryCompactionPolicy; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; |
| import org.apache.hadoop.hbase.io.util.MemorySizeUtil; |
| import org.apache.hadoop.hbase.testclassification.MediumTests; |
| import org.apache.hadoop.hbase.testclassification.RegionServerTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.EnvironmentEdge; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.hbase.wal.WAL; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| /** |
| * compacted memstore test case |
| */ |
| @Category({RegionServerTests.class, MediumTests.class}) |
| public class TestCompactingMemStore extends TestDefaultMemStore { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); |
| protected static ChunkCreator chunkCreator; |
| protected HRegion region; |
| protected RegionServicesForStores regionServicesForStores; |
| protected HStore store; |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Helpers |
| ////////////////////////////////////////////////////////////////////////////// |
| protected static byte[] makeQualifier(final int i1, final int i2) { |
| return Bytes.toBytes(Integer.toString(i1) + ";" + |
| Integer.toString(i2)); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| chunkCreator.clearChunksInPool(); |
| } |
| |
| @Override |
| @Before |
| public void setUp() throws Exception { |
| compactingSetUp(); |
| this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparator.getInstance(), |
| store, regionServicesForStores, MemoryCompactionPolicy.EAGER); |
| ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); |
| } |
| |
| protected void compactingSetUp() throws Exception { |
| super.internalSetUp(); |
| Configuration conf = new Configuration(); |
| conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); |
| conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); |
| conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); |
| HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); |
| HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); |
| HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); |
| htd.addFamily(hcd); |
| HRegionInfo info = |
| new HRegionInfo(TableName.valueOf("foobar"), null, null, false); |
| WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); |
| this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); |
| //this.region = hbaseUtility.createTestRegion("foobar", hcd); |
| this.regionServicesForStores = region.getRegionServicesForStores(); |
| this.store = new HStore(region, hcd, conf); |
| |
| long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() |
| .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); |
| chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, |
| globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); |
| assertTrue(chunkCreator != null); |
| } |
| |
| /** |
| * A simple test which verifies the 3 possible states when scanning across snapshot. |
| * |
| * @throws IOException |
| * @throws CloneNotSupportedException |
| */ |
| @Override |
| @Test |
| public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { |
| // we are going to the scanning across snapshot with two kvs |
| // kv1 should always be returned before kv2 |
| final byte[] one = Bytes.toBytes(1); |
| final byte[] two = Bytes.toBytes(2); |
| final byte[] f = Bytes.toBytes("f"); |
| final byte[] q = Bytes.toBytes("q"); |
| final byte[] v = Bytes.toBytes(3); |
| |
| final KeyValue kv1 = new KeyValue(one, f, q, 10, v); |
| final KeyValue kv2 = new KeyValue(two, f, q, 10, v); |
| |
| // use case 1: both kvs in kvset |
| this.memstore.add(kv1.clone(), null); |
| this.memstore.add(kv2.clone(), null); |
| verifyScanAcrossSnapshot2(kv1, kv2); |
| |
| // use case 2: both kvs in snapshot |
| this.memstore.snapshot(); |
| verifyScanAcrossSnapshot2(kv1, kv2); |
| |
| // use case 3: first in snapshot second in kvset |
| this.memstore = new CompactingMemStore(HBaseConfiguration.create(), |
| CellComparator.getInstance(), store, regionServicesForStores, |
| MemoryCompactionPolicy.EAGER); |
| this.memstore.add(kv1.clone(), null); |
| // As compaction is starting in the background the repetition |
| // of the k1 might be removed BUT the scanners created earlier |
| // should look on the OLD MutableCellSetSegment, so this should be OK... |
| this.memstore.snapshot(); |
| this.memstore.add(kv2.clone(), null); |
| verifyScanAcrossSnapshot2(kv1,kv2); |
| } |
| |
| /** |
| * Test memstore snapshots |
| * @throws IOException |
| */ |
| @Override |
| @Test |
| public void testSnapshotting() throws IOException { |
| final int snapshotCount = 5; |
| // Add some rows, run a snapshot. Do it a few times. |
| for (int i = 0; i < snapshotCount; i++) { |
| addRows(this.memstore); |
| runSnapshot(this.memstore, true); |
| assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); |
| } |
| } |
| |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Get tests |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** Test getNextRow from memstore |
| * @throws InterruptedException |
| */ |
| @Override |
| @Test |
| public void testGetNextRow() throws Exception { |
| addRows(this.memstore); |
| // Add more versions to make it a little more interesting. |
| Thread.sleep(1); |
| addRows(this.memstore); |
| Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY); |
| assertTrue(CellComparator.getInstance().compareRows(closestToEmpty, |
| new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0); |
| for (int i = 0; i < ROW_COUNT; i++) { |
| Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i), |
| System.currentTimeMillis())); |
| if (i + 1 == ROW_COUNT) { |
| assertEquals(nr, null); |
| } else { |
| assertTrue(CellComparator.getInstance().compareRows(nr, |
| new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0); |
| } |
| } |
| //starting from each row, validate results should contain the starting row |
| Configuration conf = HBaseConfiguration.create(); |
| for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { |
| ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, |
| KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); |
| try (InternalScanner scanner = |
| new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, |
| memstore.getScanners(0))) { |
| List<Cell> results = new ArrayList<>(); |
| for (int i = 0; scanner.next(results); i++) { |
| int rowId = startRowId + i; |
| Cell left = results.get(0); |
| byte[] row1 = Bytes.toBytes(rowId); |
| assertTrue("Row name", |
| CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0); |
| assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); |
| List<Cell> row = new ArrayList<>(); |
| for (Cell kv : results) { |
| row.add(kv); |
| } |
| isExpectedRowWithoutTimestamps(rowId, row); |
| // Clear out set. Otherwise row results accumulate. |
| results.clear(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| @Test |
| public void testGet_memstoreAndSnapShot() throws IOException { |
| byte[] row = Bytes.toBytes("testrow"); |
| byte[] fam = Bytes.toBytes("testfamily"); |
| byte[] qf1 = Bytes.toBytes("testqualifier1"); |
| byte[] qf2 = Bytes.toBytes("testqualifier2"); |
| byte[] qf3 = Bytes.toBytes("testqualifier3"); |
| byte[] qf4 = Bytes.toBytes("testqualifier4"); |
| byte[] qf5 = Bytes.toBytes("testqualifier5"); |
| byte[] val = Bytes.toBytes("testval"); |
| |
| //Setting up memstore |
| memstore.add(new KeyValue(row, fam, qf1, val), null); |
| memstore.add(new KeyValue(row, fam, qf2, val), null); |
| memstore.add(new KeyValue(row, fam, qf3, val), null); |
| //Pushing to pipeline |
| ((CompactingMemStore)memstore).flushInMemory(); |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| //Creating a snapshot |
| memstore.snapshot(); |
| assertEquals(3, memstore.getSnapshot().getCellsCount()); |
| //Adding value to "new" memstore |
| assertEquals(0, memstore.getActive().getCellsCount()); |
| memstore.add(new KeyValue(row, fam, qf4, val), null); |
| memstore.add(new KeyValue(row, fam, qf5, val), null); |
| assertEquals(2, memstore.getActive().getCellsCount()); |
| } |
| |
| //////////////////////////////////// |
| // Test for periodic memstore flushes |
| // based on time of oldest edit |
| //////////////////////////////////// |
| |
| /** |
| * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased |
| * as older keyvalues are deleted from the memstore. |
| * |
| * @throws Exception |
| */ |
| @Override |
| @Test |
| public void testUpsertMemstoreSize() throws Exception { |
| MemStoreSize oldSize = memstore.size(); |
| |
| List<Cell> l = new ArrayList<>(); |
| KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); |
| KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); |
| KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); |
| |
| kv1.setSequenceId(1); |
| kv2.setSequenceId(1); |
| kv3.setSequenceId(1); |
| l.add(kv1); |
| l.add(kv2); |
| l.add(kv3); |
| |
| this.memstore.upsert(l, 2, null);// readpoint is 2 |
| MemStoreSize newSize = this.memstore.size(); |
| assert (newSize.getDataSize() > oldSize.getDataSize()); |
| //The kv1 should be removed. |
| assert (memstore.getActive().getCellsCount() == 2); |
| |
| KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); |
| kv4.setSequenceId(1); |
| l.clear(); |
| l.add(kv4); |
| this.memstore.upsert(l, 3, null); |
| assertEquals(newSize, this.memstore.size()); |
| //The kv2 should be removed. |
| assert (memstore.getActive().getCellsCount() == 2); |
| //this.memstore = null; |
| } |
| |
| /** |
| * Tests that the timeOfOldestEdit is updated correctly for the |
| * various edit operations in memstore. |
| * @throws Exception |
| */ |
| @Override |
| @Test |
| public void testUpdateToTimeOfOldestEdit() throws Exception { |
| try { |
| EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); |
| EnvironmentEdgeManager.injectEdge(edge); |
| long t = memstore.timeOfOldestEdit(); |
| assertEquals(t, Long.MAX_VALUE); |
| |
| // test the case that the timeOfOldestEdit is updated after a KV add |
| memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); |
| t = memstore.timeOfOldestEdit(); |
| assertTrue(t == 1234); |
| // The method will also assert |
| // the value is reset to Long.MAX_VALUE |
| t = runSnapshot(memstore, true); |
| |
| // test the case that the timeOfOldestEdit is updated after a KV delete |
| memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); |
| t = memstore.timeOfOldestEdit(); |
| assertTrue(t == 1234); |
| t = runSnapshot(memstore, true); |
| |
| // test the case that the timeOfOldestEdit is updated after a KV upsert |
| List<Cell> l = new ArrayList<>(); |
| KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); |
| kv1.setSequenceId(100); |
| l.add(kv1); |
| memstore.upsert(l, 1000, null); |
| t = memstore.timeOfOldestEdit(); |
| assertTrue(t == 1234); |
| } finally { |
| EnvironmentEdgeManager.reset(); |
| } |
| } |
| |
| private long runSnapshot(final AbstractMemStore hmc, boolean useForce) |
| throws IOException { |
| // Save off old state. |
| long oldHistorySize = hmc.getSnapshot().keySize(); |
| long prevTimeStamp = hmc.timeOfOldestEdit(); |
| |
| hmc.snapshot(); |
| MemStoreSnapshot snapshot = hmc.snapshot(); |
| if (useForce) { |
| // Make some assertions about what just happened. |
| assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize()); |
| long t = hmc.timeOfOldestEdit(); |
| assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); |
| hmc.clearSnapshot(snapshot.getId()); |
| } else { |
| long t = hmc.timeOfOldestEdit(); |
| assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp); |
| } |
| return prevTimeStamp; |
| } |
| |
| private void isExpectedRowWithoutTimestamps(final int rowIndex, |
| List<Cell> kvs) { |
| int i = 0; |
| for (Cell kv : kvs) { |
| byte[] expectedColname = makeQualifier(rowIndex, i++); |
| assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname)); |
| // Value is column name as bytes. Usually result is |
| // 100 bytes in size at least. This is the default size |
| // for BytesWriteable. For comparison, convert bytes to |
| // String and trim to remove trailing null bytes. |
| assertTrue("Content", CellUtil.matchingValue(kv, expectedColname)); |
| } |
| } |
| |
| @Test |
| public void testPuttingBackChunksAfterFlushing() throws IOException { |
| byte[] row = Bytes.toBytes("testrow"); |
| byte[] fam = Bytes.toBytes("testfamily"); |
| byte[] qf1 = Bytes.toBytes("testqualifier1"); |
| byte[] qf2 = Bytes.toBytes("testqualifier2"); |
| byte[] qf3 = Bytes.toBytes("testqualifier3"); |
| byte[] qf4 = Bytes.toBytes("testqualifier4"); |
| byte[] qf5 = Bytes.toBytes("testqualifier5"); |
| byte[] val = Bytes.toBytes("testval"); |
| |
| // Setting up memstore |
| memstore.add(new KeyValue(row, fam, qf1, val), null); |
| memstore.add(new KeyValue(row, fam, qf2, val), null); |
| memstore.add(new KeyValue(row, fam, qf3, val), null); |
| |
| // Creating a snapshot |
| MemStoreSnapshot snapshot = memstore.snapshot(); |
| assertEquals(3, memstore.getSnapshot().getCellsCount()); |
| |
| // Adding value to "new" memstore |
| assertEquals(0, memstore.getActive().getCellsCount()); |
| memstore.add(new KeyValue(row, fam, qf4, val), null); |
| memstore.add(new KeyValue(row, fam, qf5, val), null); |
| assertEquals(2, memstore.getActive().getCellsCount()); |
| // close the scanners |
| for(KeyValueScanner scanner : snapshot.getScanners()) { |
| scanner.close(); |
| } |
| memstore.clearSnapshot(snapshot.getId()); |
| |
| int chunkCount = chunkCreator.getPoolSize(); |
| assertTrue(chunkCount > 0); |
| |
| } |
| |
| @Test |
| public void testPuttingBackChunksWithOpeningScanner() |
| throws IOException { |
| byte[] row = Bytes.toBytes("testrow"); |
| byte[] fam = Bytes.toBytes("testfamily"); |
| byte[] qf1 = Bytes.toBytes("testqualifier1"); |
| byte[] qf2 = Bytes.toBytes("testqualifier2"); |
| byte[] qf3 = Bytes.toBytes("testqualifier3"); |
| byte[] qf4 = Bytes.toBytes("testqualifier4"); |
| byte[] qf5 = Bytes.toBytes("testqualifier5"); |
| byte[] qf6 = Bytes.toBytes("testqualifier6"); |
| byte[] qf7 = Bytes.toBytes("testqualifier7"); |
| byte[] val = Bytes.toBytes("testval"); |
| |
| // Setting up memstore |
| memstore.add(new KeyValue(row, fam, qf1, val), null); |
| memstore.add(new KeyValue(row, fam, qf2, val), null); |
| memstore.add(new KeyValue(row, fam, qf3, val), null); |
| |
| // Creating a snapshot |
| MemStoreSnapshot snapshot = memstore.snapshot(); |
| assertEquals(3, memstore.getSnapshot().getCellsCount()); |
| |
| // Adding value to "new" memstore |
| assertEquals(0, memstore.getActive().getCellsCount()); |
| memstore.add(new KeyValue(row, fam, qf4, val), null); |
| memstore.add(new KeyValue(row, fam, qf5, val), null); |
| assertEquals(2, memstore.getActive().getCellsCount()); |
| |
| // opening scanner before clear the snapshot |
| List<KeyValueScanner> scanners = memstore.getScanners(0); |
| // Shouldn't putting back the chunks to pool,since some scanners are opening |
| // based on their data |
| // close the scanners |
| for(KeyValueScanner scanner : snapshot.getScanners()) { |
| scanner.close(); |
| } |
| memstore.clearSnapshot(snapshot.getId()); |
| |
| assertTrue(chunkCreator.getPoolSize() == 0); |
| |
| // Chunks will be put back to pool after close scanners; |
| for (KeyValueScanner scanner : scanners) { |
| scanner.close(); |
| } |
| assertTrue(chunkCreator.getPoolSize() > 0); |
| |
| // clear chunks |
| chunkCreator.clearChunksInPool(); |
| |
| // Creating another snapshot |
| |
| snapshot = memstore.snapshot(); |
| // Adding more value |
| memstore.add(new KeyValue(row, fam, qf6, val), null); |
| memstore.add(new KeyValue(row, fam, qf7, val), null); |
| // opening scanners |
| scanners = memstore.getScanners(0); |
| // close scanners before clear the snapshot |
| for (KeyValueScanner scanner : scanners) { |
| scanner.close(); |
| } |
| // Since no opening scanner, the chunks of snapshot should be put back to |
| // pool |
| // close the scanners |
| for(KeyValueScanner scanner : snapshot.getScanners()) { |
| scanner.close(); |
| } |
| memstore.clearSnapshot(snapshot.getId()); |
| assertTrue(chunkCreator.getPoolSize() > 0); |
| } |
| |
| @Test |
| public void testPuttingBackChunksWithOpeningPipelineScanner() |
| throws IOException { |
| |
| // set memstore to do data compaction and not to use the speculative scan |
| MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; |
| memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, |
| String.valueOf(compactionType)); |
| ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); |
| |
| byte[] row = Bytes.toBytes("testrow"); |
| byte[] fam = Bytes.toBytes("testfamily"); |
| byte[] qf1 = Bytes.toBytes("testqualifier1"); |
| byte[] qf2 = Bytes.toBytes("testqualifier2"); |
| byte[] qf3 = Bytes.toBytes("testqualifier3"); |
| byte[] val = Bytes.toBytes("testval"); |
| |
| // Setting up memstore |
| memstore.add(new KeyValue(row, fam, qf1, 1, val), null); |
| memstore.add(new KeyValue(row, fam, qf2, 1, val), null); |
| memstore.add(new KeyValue(row, fam, qf3, 1, val), null); |
| |
| // Creating a pipeline |
| ((MyCompactingMemStore)memstore).disableCompaction(); |
| ((CompactingMemStore)memstore).flushInMemory(); |
| |
| // Adding value to "new" memstore |
| assertEquals(0, memstore.getActive().getCellsCount()); |
| memstore.add(new KeyValue(row, fam, qf1, 2, val), null); |
| memstore.add(new KeyValue(row, fam, qf2, 2, val), null); |
| assertEquals(2, memstore.getActive().getCellsCount()); |
| |
| // pipeline bucket 2 |
| ((CompactingMemStore)memstore).flushInMemory(); |
| // opening scanner before force flushing |
| List<KeyValueScanner> scanners = memstore.getScanners(0); |
| // Shouldn't putting back the chunks to pool,since some scanners are opening |
| // based on their data |
| ((MyCompactingMemStore)memstore).enableCompaction(); |
| // trigger compaction |
| ((CompactingMemStore)memstore).flushInMemory(); |
| |
| // Adding value to "new" memstore |
| assertEquals(0, memstore.getActive().getCellsCount()); |
| memstore.add(new KeyValue(row, fam, qf3, 3, val), null); |
| memstore.add(new KeyValue(row, fam, qf2, 3, val), null); |
| memstore.add(new KeyValue(row, fam, qf1, 3, val), null); |
| assertEquals(3, memstore.getActive().getCellsCount()); |
| |
| assertTrue(chunkCreator.getPoolSize() == 0); |
| |
| // Chunks will be put back to pool after close scanners; |
| for (KeyValueScanner scanner : scanners) { |
| scanner.close(); |
| } |
| assertTrue(chunkCreator.getPoolSize() > 0); |
| |
| // clear chunks |
| chunkCreator.clearChunksInPool(); |
| |
| // Creating another snapshot |
| |
| MemStoreSnapshot snapshot = memstore.snapshot(); |
| // close the scanners |
| for(KeyValueScanner scanner : snapshot.getScanners()) { |
| scanner.close(); |
| } |
| memstore.clearSnapshot(snapshot.getId()); |
| |
| snapshot = memstore.snapshot(); |
| // Adding more value |
| memstore.add(new KeyValue(row, fam, qf2, 4, val), null); |
| memstore.add(new KeyValue(row, fam, qf3, 4, val), null); |
| // opening scanners |
| scanners = memstore.getScanners(0); |
| // close scanners before clear the snapshot |
| for (KeyValueScanner scanner : scanners) { |
| scanner.close(); |
| } |
| // Since no opening scanner, the chunks of snapshot should be put back to |
| // pool |
| // close the scanners |
| for(KeyValueScanner scanner : snapshot.getScanners()) { |
| scanner.close(); |
| } |
| memstore.clearSnapshot(snapshot.getId()); |
| assertTrue(chunkCreator.getPoolSize() > 0); |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Compaction tests |
| ////////////////////////////////////////////////////////////////////////////// |
| @Test |
| public void testCompaction1Bucket() throws IOException { |
| |
| // set memstore to do basic structure flattening, the "eager" option is tested in |
| // TestCompactingToCellFlatMapMemStore |
| MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; |
| memstore.getConfiguration() |
| .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType)); |
| ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); |
| |
| String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 |
| |
| // test 1 bucket |
| int totalCellsLen = addRowsByKeys(memstore, keys1); |
| int oneCellOnCSLMHeapSize = 120; |
| int oneCellOnCAHeapSize = 88; |
| long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; |
| assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); |
| assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); |
| |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| // There is no compaction, as the compacting memstore type is basic. |
| // totalCellsLen remains the same |
| totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM |
| + 4 * oneCellOnCAHeapSize; |
| assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); |
| assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); |
| |
| MemStoreSize size = memstore.getFlushableSize(); |
| MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot |
| region.decrMemStoreSize(size); // simulate flusher |
| ImmutableSegment s = memstore.getSnapshot(); |
| assertEquals(4, s.getCellsCount()); |
| assertEquals(0, regionServicesForStores.getMemStoreSize()); |
| |
| memstore.clearSnapshot(snapshot.getId()); |
| } |
| |
| @Test |
| public void testCompaction2Buckets() throws IOException { |
| |
| // set memstore to do basic structure flattening, the "eager" option is tested in |
| // TestCompactingToCellFlatMapMemStore |
| MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; |
| memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, |
| String.valueOf(compactionType)); |
| memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, |
| String.valueOf(1)); |
| ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); |
| String[] keys1 = { "A", "A", "B", "C" }; |
| String[] keys2 = { "A", "B", "D" }; |
| |
| int totalCellsLen1 = addRowsByKeys(memstore, keys1); |
| int oneCellOnCSLMHeapSize = 120; |
| int oneCellOnCAHeapSize = 88; |
| long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; |
| |
| assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); |
| assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); |
| |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact |
| int counter = 0; |
| for ( Segment s : memstore.getSegments()) { |
| counter += s.getCellsCount(); |
| } |
| assertEquals(4, counter); |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| // There is no compaction, as the compacting memstore type is basic. |
| // totalCellsLen remains the same |
| assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); |
| totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM |
| + 4 * oneCellOnCAHeapSize; |
| assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); |
| |
| int totalCellsLen2 = addRowsByKeys(memstore, keys2); |
| totalHeapSize += 3 * oneCellOnCSLMHeapSize; |
| assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); |
| assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); |
| |
| MemStoreSize size = memstore.getFlushableSize(); |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); |
| totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM |
| + 7 * oneCellOnCAHeapSize; |
| assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); |
| |
| size = memstore.getFlushableSize(); |
| MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot |
| region.decrMemStoreSize(size); // simulate flusher |
| ImmutableSegment s = memstore.getSnapshot(); |
| assertEquals(7, s.getCellsCount()); |
| assertEquals(0, regionServicesForStores.getMemStoreSize()); |
| |
| memstore.clearSnapshot(snapshot.getId()); |
| } |
| |
| @Test |
| public void testCompaction3Buckets() throws IOException { |
| |
| // set memstore to do data compaction and not to use the speculative scan |
| MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; |
| memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, |
| String.valueOf(compactionType)); |
| ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); |
| String[] keys1 = { "A", "A", "B", "C" }; |
| String[] keys2 = { "A", "B", "D" }; |
| String[] keys3 = { "D", "B", "B" }; |
| |
| int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. |
| int oneCellOnCSLMHeapSize = 120; |
| int oneCellOnCAHeapSize = 88; |
| assertEquals(totalCellsLen1, region.getMemStoreSize()); |
| long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; |
| assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact |
| |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting |
| // totalCellsLen |
| totalCellsLen1 = (totalCellsLen1 * 3) / 4; |
| assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); |
| // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. |
| totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM |
| + 3 * oneCellOnCAHeapSize; |
| assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); |
| |
| int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. |
| long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; |
| |
| assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); |
| assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); |
| |
| ((MyCompactingMemStore) memstore).disableCompaction(); |
| MemStoreSize size = memstore.getFlushableSize(); |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| // No change in the cells data size. ie. memstore size. as there is no compaction. |
| assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); |
| assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, |
| ((CompactingMemStore) memstore).heapSize()); |
| |
| int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added |
| assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, |
| regionServicesForStores.getMemStoreSize()); |
| long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM |
| + 3 * oneCellOnCSLMHeapSize; |
| assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); |
| |
| ((MyCompactingMemStore)memstore).enableCompaction(); |
| size = memstore.getFlushableSize(); |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. |
| // Out of total 10, only 4 cells are unique |
| totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated |
| totalCellsLen3 = 0;// All duplicated cells. |
| assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, |
| regionServicesForStores.getMemStoreSize()); |
| // Only 4 unique cells left |
| assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD |
| + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); |
| |
| size = memstore.getFlushableSize(); |
| MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot |
| region.decrMemStoreSize(size); // simulate flusher |
| ImmutableSegment s = memstore.getSnapshot(); |
| assertEquals(4, s.getCellsCount()); |
| assertEquals(0, regionServicesForStores.getMemStoreSize()); |
| |
| memstore.clearSnapshot(snapshot.getId()); |
| } |
| |
| @Test |
| public void testMagicCompaction3Buckets() throws IOException { |
| |
| MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; |
| memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, |
| String.valueOf(compactionType)); |
| memstore.getConfiguration().setDouble( |
| AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); |
| memstore.getConfiguration().setInt( |
| AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); |
| memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); |
| ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); |
| |
| String[] keys1 = { "A", "B", "D" }; |
| String[] keys2 = { "A" }; |
| String[] keys3 = { "A", "A", "B", "C" }; |
| String[] keys4 = { "D", "B", "B" }; |
| |
| int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. |
| int oneCellOnCSLMHeapSize = 120; |
| assertEquals(totalCellsLen1, region.getMemStoreSize()); |
| long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; |
| assertEquals(totalHeapSize, memstore.heapSize()); |
| |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten |
| assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); |
| assertEquals(1.0, |
| ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| |
| addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction |
| assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); |
| assertEquals(1.0, |
| ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| |
| addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction |
| assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); |
| assertEquals((4.0 / 8.0), |
| ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| |
| addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) |
| ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact |
| int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); |
| assertTrue(4 == numCells || 11 == numCells); |
| assertEquals(0, memstore.getSnapshot().getCellsCount()); |
| |
| MemStoreSize size = memstore.getFlushableSize(); |
| MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot |
| region.decrMemStoreSize(size); // simulate flusher |
| ImmutableSegment s = memstore.getSnapshot(); |
| numCells = s.getCellsCount(); |
| assertTrue(4 == numCells || 11 == numCells); |
| assertEquals(0, regionServicesForStores.getMemStoreSize()); |
| |
| memstore.clearSnapshot(snapshot.getId()); |
| } |
| |
| protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { |
| byte[] fam = Bytes.toBytes("testfamily"); |
| byte[] qf = Bytes.toBytes("testqualifier"); |
| long size = hmc.getActive().keySize(); |
| long heapOverhead = hmc.getActive().heapSize(); |
| int totalLen = 0; |
| for (int i = 0; i < keys.length; i++) { |
| long timestamp = System.currentTimeMillis(); |
| Threads.sleep(1); // to make sure each kv gets a different ts |
| byte[] row = Bytes.toBytes(keys[i]); |
| byte[] val = Bytes.toBytes(keys[i] + i); |
| KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); |
| totalLen += kv.getLength(); |
| hmc.add(kv, null); |
| LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); |
| } |
| regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size, |
| hmc.getActive().heapSize() - heapOverhead)); |
| return totalLen; |
| } |
| |
| // for controlling the val size when adding a new cell |
| protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { |
| byte[] fam = Bytes.toBytes("testfamily"); |
| byte[] qf = Bytes.toBytes("testqualifier"); |
| long size = hmc.getActive().keySize(); |
| long heapOverhead = hmc.getActive().heapSize(); |
| int totalLen = 0; |
| for (int i = 0; i < keys.length; i++) { |
| long timestamp = System.currentTimeMillis(); |
| Threads.sleep(1); // to make sure each kv gets a different ts |
| byte[] row = Bytes.toBytes(keys[i]); |
| KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); |
| totalLen += kv.getLength(); |
| hmc.add(kv, null); |
| LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); |
| } |
| regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size, |
| hmc.getActive().heapSize() - heapOverhead)); |
| return totalLen; |
| } |
| |
| private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { |
| long t = 1234; |
| |
| @Override |
| public long currentTime() { |
| return t; |
| } |
| public void setCurrentTimeMillis(long t) { |
| this.t = t; |
| } |
| } |
| |
| static protected class MyCompactingMemStore extends CompactingMemStore { |
| |
| public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, |
| RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) |
| throws IOException { |
| super(conf, c, store, regionServices, compactionPolicy); |
| } |
| |
| void disableCompaction() { |
| allowCompaction.set(false); |
| } |
| |
| void enableCompaction() { |
| allowCompaction.set(true); |
| } |
| void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) |
| throws IllegalArgumentIOException { |
| compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); |
| } |
| |
| } |
| } |