blob: fa6e62a65617f2a97d61e53b2446c48dcf839714 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatcher;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, SmallTests.class})
public class TestStripeCompactionPolicy {
// Fixture row keys and KeyValues that straddle the stripe boundaries used throughout.
private static final byte[] KEY_A = Bytes.toBytes("aaa");
private static final byte[] KEY_B = Bytes.toBytes("bbb");
private static final byte[] KEY_C = Bytes.toBytes("ccc");
private static final byte[] KEY_D = Bytes.toBytes("ddd");
private static final byte[] KEY_E = Bytes.toBytes("eee");
private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
// Default stripe-store knobs shared by most tests. They are never reassigned, so they
// are declared final (consistent "private static final" ordering throughout).
private static final long defaultSplitSize = 18;
private static final float defaultSplitCount = 1.8F;
private static final int defaultInitialCount = 1;
private static final long defaultTtl = 1000 * 1000;
/** @return Both values of the private-readers switch; every test runs under each. */
@Parameters(name = "{index}: usePrivateReaders={0}")
public static Iterable<Object[]> data() {
  return Arrays.asList(new Object[] { true }, new Object[] { false });
}
/** Injected by the Parameterized runner from {@link #data()}. */
@Parameter
public boolean usePrivateReaders;
@Test
public void testNoStripesFromFlush() throws Exception {
// With flush-to-L0 enabled, flushing must produce a single L0 file with no stripe metadata.
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
StripeCompactionPolicy policy = createPolicy(conf);
StripeInformationProvider si = createStripesL0Only(0, 0);
KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
KeyValue[][] expected = new KeyValue[][] { input };
// null boundaries => no stripe metadata expected on the flushed file.
verifyFlush(policy, si, input, expected, null);
}
@Test
public void testOldStripesFromFlush() throws Exception {
// With existing stripes [-inf,C), [C,D), [D,+inf), a flush must route cells into the
// matching stripes and preserve the existing boundaries in the metadata.
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
}
@Test
public void testNewStripesFromFlush() throws Exception {
// No pre-existing stripes and no flush-to-L0: flush creates one open-ended stripe.
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
StripeInformationProvider si = createStripesL0Only(0, 0);
KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
// Starts with one stripe; unlike flush results, must have metadata
KeyValue[][] expected = new KeyValue[][] { input };
verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
}
@Test
public void testSingleStripeCompaction() throws Exception {
// Create a special policy that only compacts single stripes, using standard methods.
Configuration conf = HBaseConfiguration.create();
// Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
conf.unset("hbase.hstore.compaction.min.size");
conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure there are no splits
StoreConfigInformation sci = mock(StoreConfigInformation.class);
StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
// Anonymous subclass that bypasses L0 / split-off selection and exercises only the
// single-stripe selection path.
StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
@Override
public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
if (!filesCompacting.isEmpty()) return null;
return selectSingleStripeCompaction(si, false, false, isOffpeak);
}
@Override
public boolean needsCompactions(
StripeInformationProvider si, List<StoreFile> filesCompacting) {
if (!filesCompacting.isEmpty()) return false;
return needsSingleStripeCompaction(si);
}
};
// No compaction due to min files or ratio
StripeInformationProvider si = createStripesWithSizes(0, 0,
new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
verifyNoCompaction(policy, si);
// No compaction due to min files or ratio - will report needed, but not do any.
si = createStripesWithSizes(0, 0,
new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
assertNull(policy.selectCompaction(si, al(), false));
assertTrue(policy.needsCompactions(si, al()));
// One stripe has possible compaction
si = createStripesWithSizes(0, 0,
new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
verifySingleStripeCompaction(policy, si, 2, null);
// Several stripes have possible compactions; choose best quality (removes most files)
si = createStripesWithSizes(0, 0,
new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
verifySingleStripeCompaction(policy, si, 2, null);
si = createStripesWithSizes(0, 0,
new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
verifySingleStripeCompaction(policy, si, 1, null);
// Or with smallest files, if the count is the same
si = createStripesWithSizes(0, 0,
new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
verifySingleStripeCompaction(policy, si, 1, null);
// Verify max count is respected: only 4 of the 5 files in stripe 1 may be selected.
si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
List<StoreFile> sfs = si.getStripes().get(1).subList(1, 5);
verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
// Verify ratio is applied: the 50L file is too large relative to the rest.
si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
sfs = si.getStripes().get(1).subList(1, 5);
verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
}
@Test
public void testWithParallelCompaction() throws Exception {
// TODO: currently only one compaction at a time per store is allowed. If this changes,
// the appropriate file exclusion testing would need to be done in respective tests.
// A non-empty filesCompacting list must therefore suppress any new selection.
assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
mock(StripeInformationProvider.class), al(createFile()), false));
}
@Test
public void testWithReferences() throws Exception {
// Reference files (from a region split) must force a compaction of ALL store files
// into a single stripe spanning the whole key space (all OPEN_KEY boundaries).
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
StripeCompactor sc = mock(StripeCompactor.class);
StoreFile ref = createFile();
when(ref.isReference()).thenReturn(true);
StripeInformationProvider si = mock(StripeInformationProvider.class);
Collection<StoreFile> sfs = al(ref, createFile());
when(si.getStorefiles()).thenReturn(sfs);
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
any(NoLimitThroughputController.class), any(User.class));
}
@Test
public void testInitialCountFromL0() throws Exception {
// L0-only store: initial stripe count comes from config, unless the resulting stripes
// would be too large, in which case more (smaller) stripes are created.
Configuration conf = HBaseConfiguration.create();
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
StripeCompactionPolicy policy = createPolicy(
conf, defaultSplitSize, defaultSplitCount, 2, false);
StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
// 3 files * 8 = 24 total => 2 stripes of 12 each.
verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
}
@Test
public void testExistingStripesFromL0() throws Exception {
// Once stripes exist, hitting the L0 min-files threshold compacts only the L0 files,
// distributing them into the existing stripe boundaries.
Configuration conf = HBaseConfiguration.create();
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
verifyCompaction(
createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
}
@Test
public void testNothingToCompactFromL0() throws Exception {
// Below the L0 min-files threshold (4), no compaction is selected, with or without
// pre-existing stripes.
Configuration conf = HBaseConfiguration.create();
conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
StripeCompactionPolicy policy = createPolicy(conf);
verifyNoCompaction(policy, si);
si = createStripes(3, KEY_A);
verifyNoCompaction(policy, si);
}
@Test
public void testSplitOffStripe() throws Exception {
// A stripe whose total size exceeds the split size must be compacted into multiple
// target stripes (split-off), regardless of where it sits among the stripes.
Configuration conf = HBaseConfiguration.create();
// Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
conf.unset("hbase.hstore.compaction.min.size");
// First test everything with default split count of 2, then split into more.
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
// Don't split if not eligible for compaction.
StripeCompactionPolicy.StripeInformationProvider si =
createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
assertNull(createPolicy(conf).selectCompaction(si, al(), false));
// Make sure everything is eligible.
conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
StripeCompactionPolicy policy = createPolicy(conf);
verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
// Add some extra stripes...
si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
// In the middle.
si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
// No split-off with different config (larger split size).
// However, in this case some eligible stripe will just be compacted alone.
StripeCompactionPolicy specPolicy = createPolicy(
conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
verifySingleStripeCompaction(specPolicy, si, 1, null);
}
@Test
public void testSplitOffStripeOffPeak() throws Exception {
// for HBASE-11439: the off-peak compaction ratio must be honored during selection.
Configuration conf = HBaseConfiguration.create();
// Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
conf.unset("hbase.hstore.compaction.min.size");
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
// Select the last 2 files.
StripeCompactionPolicy.StripeInformationProvider si =
createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
.size());
// Make sure everything is eligible in offpeak.
conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
.size());
}
@Test
public void testSplitOffStripeDropDeletes() throws Exception {
// Dropping deletes during a split-off compaction is only safe when no L0 files exist
// (L0 may contain cells anywhere in the key range, including deleted ones).
Configuration conf = HBaseConfiguration.create();
conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
StripeCompactionPolicy policy = createPolicy(conf);
Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
Long[] noSplit = new Long[] { 1L };
long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
// Verify the deletes can be dropped if there are no L0 files.
StripeCompactionPolicy.StripeInformationProvider si =
createStripesWithSizes(0, 0, noSplit, toSplit);
verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
// But cannot be dropped if there are.
si = createStripesWithSizes(2, 2, noSplit, toSplit);
verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
}
@SuppressWarnings("unchecked")
@Test
public void testMergeExpiredFiles() throws Exception {
// Fully-expired adjacent stripes should be merged into one; a stripe with only SOME
// expired files must not participate. Uses an injected clock to control "now".
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
long now = defaultTtl + 2;
edge.setValue(now);
EnvironmentEdgeManager.injectEdge(edge);
try {
StoreFile expiredFile = createFile(), notExpiredFile = createFile();
// Expired: max timestamp is older than now - TTL; not expired: within TTL.
when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
List<StoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
// Merge expired if there are eligible stripes.
StripeCompactionPolicy.StripeInformationProvider si =
createStripesWithFiles(expired, expired, expired);
verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
// Don't merge if nothing expired.
si = createStripesWithFiles(notExpired, notExpired, notExpired);
assertNull(policy.selectCompaction(si, al(), false));
// Merge one expired stripe with next.
si = createStripesWithFiles(notExpired, expired, notExpired);
verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
// Merge the biggest run out of multiple options.
// Merge one expired stripe with next.
si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
// Stripe with a subset of expired files is not merged.
si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
} finally {
// Always restore the real clock, even if an assertion failed.
EnvironmentEdgeManager.reset();
}
}
@SuppressWarnings("unchecked")
@Test
public void testMergeExpiredStripes() throws Exception {
// HBASE-11397: merging stripes whose files are all expired should still produce a
// (single, empty) output file so that stripe metadata is preserved.
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
long now = defaultTtl + 2;
edge.setValue(now);
EnvironmentEdgeManager.injectEdge(edge);
try {
StoreFile expiredFile = createFile(), notExpiredFile = createFile();
when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
StripeCompactionPolicy policy =
createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
defaultInitialCount, true);
// Merge all three expired stripes into one.
StripeCompactionPolicy.StripeInformationProvider si =
createStripesWithFiles(expired, expired, expired);
verifyMergeCompatcion(policy, si, 0, 2);
// Merge two adjacent expired stripes into one.
si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
verifyMergeCompatcion(policy, si, 3, 4);
} finally {
// Always restore the real clock.
EnvironmentEdgeManager.reset();
}
}
/**
 * Builds a mock stripe layout from the given per-stripe file lists; boundaries are
 * generated automatically and L0 is left empty.
 */
@SuppressWarnings("unchecked")
private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
    List<StoreFile>... stripeFiles) throws Exception {
  List<byte[]> boundaries = createBoundaries(stripeFiles.length);
  List<List<StoreFile>> files = Lists.newArrayList(stripeFiles);
  List<StoreFile> emptyL0 = new ArrayList<StoreFile>();
  return createStripesWithFiles(boundaries, files, emptyL0);
}
@Test
public void testSingleStripeDropDeletes() throws Exception {
// Deletes may only be dropped when ALL files of the stripe are compacted and no L0
// files exist; otherwise older cells could resurface.
Configuration conf = HBaseConfiguration.create();
// Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
conf.unset("hbase.hstore.compaction.min.size");
StripeCompactionPolicy policy = createPolicy(conf);
// Verify the deletes can be dropped if there are no L0 files.
Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
verifySingleStripeCompaction(policy, si, 0, true);
// But cannot be dropped if there are.
si = createStripesWithSizes(2, 2, stripes);
verifySingleStripeCompaction(policy, si, 0, false);
// Unless there are enough to cause L0 compaction.
si = createStripesWithSizes(6, 2, stripes);
ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
sfs.addSublist(si.getLevel0Files());
sfs.addSublist(si.getStripes().get(0));
verifyCompaction(
policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
// If we cannot actually compact all files in some stripe, L0 is chosen.
si = createStripesWithSizes(6, 2,
new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
// even if L0 has no file
// if all files of stripe aren't selected, delete must not be dropped.
stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
si = createStripesWithSizes(0, 0, stripes);
List<StoreFile> compact_file = new ArrayList<StoreFile>();
// Skip the first (100L) file: the selection excludes it, so dropDeletes must be false.
Iterator<StoreFile> iter = si.getStripes().get(0).listIterator(1);
while (iter.hasNext()) {
compact_file.add(iter.next());
}
verifyCompaction(policy, si, compact_file, false, 1, null, si.getStartRow(0), si.getEndRow(0), true);
}
/********* HELPER METHODS ************/
/** Creates a policy with default split size/count, default initial count, and no TTL. */
private static StripeCompactionPolicy createPolicy(
Configuration conf) throws Exception {
return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
}
/**
 * Creates a policy backed by a mocked store config.
 * @param conf Configuration to mutate with the stripe-store settings below.
 * @param splitSize Stripe size at which a split-off is triggered.
 * @param splitCount Number of parts a split-off produces.
 * @param initialCount Initial stripe count when compacting from L0 only.
 * @param hasTtl Whether the store reports a (default) TTL; otherwise files never expire.
 */
private static StripeCompactionPolicy createPolicy(Configuration conf,
long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
StoreConfigInformation sci = mock(StoreConfigInformation.class);
when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
return new StripeCompactionPolicy(conf, sci, ssc);
}
/** Convenience: a mutable list containing the given store files, in order. */
private static ArrayList<StoreFile> al(StoreFile... sfs) {
  ArrayList<StoreFile> files = new ArrayList<StoreFile>(sfs.length);
  for (StoreFile sf : sfs) {
    files.add(sf);
  }
  return files;
}
/**
 * Verifies that stripes [from, to] (all expired) are selected and that executing the
 * compaction still produces exactly one output file (to preserve stripe metadata).
 * TODO: method name has a typo ("Compatcion" -> "Compaction"); rename together with callers.
 */
private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
int from, int to) throws Exception {
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
Collection<StoreFile> sfs = getAllFiles(si, from, to);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
// All the Stripes are expired, so the Compactor will not create any Writers. We need to create
// an empty file to preserve metadata
StripeCompactor sc = createCompactor();
List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
assertEquals(1, paths.size());
}
/**
 * Verify the compaction that includes several entire stripes.
 * @param policy Policy to test.
 * @param si Stripe information pre-set with stripes to test.
 * @param from Starting stripe.
 * @param to Ending stripe (inclusive).
 * @param dropDeletes Whether to drop deletes from compaction range.
 * @param count Expected # of resulting stripes, null if not checked.
 * @param size Expected target stripe size, null if not checked.
 * @param needsCompaction Whether the policy is expected to report a needed compaction.
 */
private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
StripeInformationProvider si, int from, int to, Boolean dropDeletes,
Integer count, Long size, boolean needsCompaction) throws IOException {
verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
}
/** Same as the full overload, with needsCompaction assumed true. */
private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
StripeInformationProvider si, int from, int to, Boolean dropDeletes,
Integer count, Long size) throws IOException {
verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
}
/** Verifies a compaction of the whole single stripe at {@code index} into one stripe. */
private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
}
/**
 * Verify no compaction is needed or selected.
 * @param policy Policy to test.
 * @param si Stripe information pre-set with stripes to test.
 */
private void verifyNoCompaction(
StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
assertNull(policy.selectCompaction(si, al(), false));
assertFalse(policy.needsCompactions(si, al()));
}
/**
 * Verify arbitrary compaction.
 * @param policy Policy to test.
 * @param si Stripe information pre-set with stripes to test.
 * @param sfs Files that should be compacted.
 * @param dropDeletesFrom Row from which to drop deletes; null means "expect null passed".
 * @param dropDeletesTo Row to which to drop deletes; null means "expect null passed".
 * @param boundaries Expected target stripe boundaries.
 */
private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
Collection<StoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
final List<byte[]> boundaries) throws Exception {
StripeCompactor sc = mock(StripeCompactor.class);
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
// Byte arrays need content comparison, so a custom matcher checks the boundary list
// element-by-element with Bytes.equals rather than reference equality.
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
@Override
public boolean matches(Object argument) {
@SuppressWarnings("unchecked")
List<byte[]> other = (List<byte[]>) argument;
if (other.size() != boundaries.size()) return false;
for (int i = 0; i < other.size(); ++i) {
if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
}
return true;
}
}), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
any(NoLimitThroughputController.class), any(User.class));
}
/**
 * Verify arbitrary compaction.
 * @param policy Policy to test.
 * @param si Stripe information pre-set with stripes to test.
 * @param sfs Files that should be compacted.
 * @param dropDeletes Whether to drop deletes from compaction range; null if not checked.
 * @param count Expected # of resulting stripes, null if not checked.
 * @param size Expected target stripe size, null if not checked.
 * @param start Left boundary of the compaction.
 * @param end Right boundary of the compaction.
 * @param needsCompaction Whether the policy is expected to report a needed compaction.
 */
private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
Collection<StoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
byte[] start, byte[] end, boolean needsCompaction) throws IOException {
StripeCompactor sc = mock(StripeCompactor.class);
assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()),
count == null ? anyInt() : eq(count.intValue()),
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
any(NoLimitThroughputController.class), any(User.class));
}
/**
 * Verify arbitrary flush: feeds {@code input} through the writer chosen by the policy
 * and checks the resulting files against {@code expected}.
 * @param boundaries Expected stripe boundaries on the results; null if no metadata expected.
 */
protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(CellComparator.COMPARATOR, si,
input.length);
StripeMultiFileWriter mw = req.createWriter();
mw.init(null, writers);
for (KeyValue kv : input) {
mw.append(kv);
}
boolean hasMetadata = boundaries != null;
mw.commitWriters(0, false);
writers.verifyKvs(expected, true, hasMetadata);
if (hasMetadata) {
writers.verifyBoundaries(boundaries);
}
}
/**
 * Mockito matcher for the drop-deletes boundary argument: any value if not checked (null),
 * the exact boundary if deletes should be dropped, or null otherwise.
 */
private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
return dropDeletes == null ? any(byte[].class)
: (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
}
/** Asserts the two collections contain the same elements (order-insensitive). */
private void verifyCollectionsEqual(Collection<StoreFile> sfs, Collection<StoreFile> scr) {
// Dumb: equal size + containsAll is sufficient here since the mocks are distinct objects.
assertEquals(sfs.size(), scr.size());
assertTrue(scr.containsAll(sfs));
}
/** Collects every file from stripes fromStripe..toStripe (inclusive), in stripe order. */
private static List<StoreFile> getAllFiles(
    StripeInformationProvider si, int fromStripe, int toStripe) {
  ArrayList<StoreFile> result = new ArrayList<StoreFile>();
  int stripe = fromStripe;
  while (stripe <= toStripe) {
    result.addAll(si.getStripes().get(stripe));
    ++stripe;
  }
  return result;
}
/**
 * @param l0Count Number of L0 files (each of size 5).
 * @param boundaries Target boundaries.
 * @return Mock stripes, one size-5 file per stripe.
 */
private static StripeInformationProvider createStripes(
    int l0Count, byte[]... boundaries) throws Exception {
  List<Long> l0Sizes = new ArrayList<Long>();
  for (int file = 0; file < l0Count; ++file) {
    l0Sizes.add(5L);
  }
  // N boundaries delimit N + 1 stripes.
  int stripeCount = boundaries.length + 1;
  List<List<Long>> sizes = new ArrayList<List<Long>>(stripeCount);
  while (sizes.size() < stripeCount) {
    sizes.add(Arrays.asList(Long.valueOf(5)));
  }
  return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
}
/**
 * @param l0Count Number of L0 files.
 * @param l0Size Size of each file.
 * @return Mock store state with only L0 files and no stripes.
 */
private static StripeInformationProvider createStripesL0Only(
    int l0Count, long l0Size) throws Exception {
  List<Long> l0Sizes = new ArrayList<Long>();
  int remaining = l0Count;
  while (remaining > 0) {
    l0Sizes.add(l0Size);
    --remaining;
  }
  // null boundaries + empty stripe list => L0-only layout.
  return createStripes(null, new ArrayList<List<Long>>(), l0Sizes);
}
/**
 * @param l0Count Number of L0 files.
 * @param l0Size Size of each L0 file.
 * @param sizes Sizes of the files; each sub-array representing a stripe.
 * @return Mock stripes.
 */
private static StripeInformationProvider createStripesWithSizes(
    int l0Count, long l0Size, Long[]... sizes) throws Exception {
  ArrayList<List<Long>> sizeList = new ArrayList<List<Long>>(sizes.length);
  for (int stripe = 0; stripe < sizes.length; ++stripe) {
    sizeList.add(Arrays.asList(sizes[stripe]));
  }
  return createStripesWithSizes(l0Count, l0Size, sizeList);
}
/**
 * Builds mock stripes with auto-generated boundaries (one per gap between stripes)
 * plus {@code l0Count} L0 files of {@code l0Size} each.
 */
private static StripeInformationProvider createStripesWithSizes(
int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
List<byte[]> boundaries = createBoundaries(sizes.size());
List<Long> l0Sizes = new ArrayList<Long>();
for (int i = 0; i < l0Count; ++i) {
l0Sizes.add(l0Size);
}
return createStripes(boundaries, sizes, l0Sizes);
}
/**
 * Produces stripeCount - 1 boundary keys from the fixed fixture key set.
 * Supports at most keys.length + 1 stripes.
 */
private static List<byte[]> createBoundaries(int stripeCount) {
  byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
  assert stripeCount <= keys.length + 1;
  List<byte[]> boundaries = new ArrayList<byte[]>();
  for (int i = 0; i < stripeCount - 1; ++i) {
    boundaries.add(keys[i]);
  }
  return boundaries;
}
/**
 * Materializes mock store files of the requested sizes, per stripe and for L0,
 * then delegates to the file-based factory.
 */
private static StripeInformationProvider createStripes(List<byte[]> boundaries,
List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
List<List<StoreFile>> stripeFiles = new ArrayList<List<StoreFile>>(stripeSizes.size());
for (List<Long> sizes : stripeSizes) {
List<StoreFile> sfs = new ArrayList<StoreFile>();
for (Long size : sizes) {
sfs.add(createFile(size));
}
stripeFiles.add(sfs);
}
List<StoreFile> l0Files = new ArrayList<StoreFile>();
for (Long size : l0Sizes) {
l0Files.add(createFile(size));
}
return createStripesWithFiles(boundaries, stripeFiles, l0Files);
}
/**
 * This method actually does all the work: wires a mock StripeInformationProvider so that
 * boundaries, per-stripe start/end rows, per-file stripe metadata, the flat file list,
 * and the L0 list are all mutually consistent.
 */
private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
List<List<StoreFile>> stripeFiles, List<StoreFile> l0Files) throws Exception {
ArrayList<ImmutableList<StoreFile>> stripes = new ArrayList<ImmutableList<StoreFile>>();
ArrayList<byte[]> boundariesList = new ArrayList<byte[]>();
StripeInformationProvider si = mock(StripeInformationProvider.class);
if (!stripeFiles.isEmpty()) {
// N boundaries delimit N + 1 stripes; the outer edges are open keys.
assert stripeFiles.size() == (boundaries.size() + 1);
boundariesList.add(OPEN_KEY);
for (int i = 0; i <= boundaries.size(); ++i) {
byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
boundariesList.add(endKey);
for (StoreFile sf : stripeFiles.get(i)) {
setFileStripe(sf, startKey, endKey);
}
stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
when(si.getStartRow(eq(i))).thenReturn(startKey);
when(si.getEndRow(eq(i))).thenReturn(endKey);
}
}
// getStorefiles() returns stripe files followed by L0 files.
ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
sfs.addAllSublists(stripes);
sfs.addSublist(l0Files);
when(si.getStorefiles()).thenReturn(sfs);
when(si.getStripes()).thenReturn(stripes);
when(si.getStripeBoundaries()).thenReturn(boundariesList);
when(si.getStripeCount()).thenReturn(stripes.size());
when(si.getLevel0Files()).thenReturn(l0Files);
return si;
}
/**
 * Creates a mock store file whose reader reports {@code size} for both entry count and
 * length, with enough stubbing (scanner, bloom type, HFile reader, clone/create-reader)
 * to survive the compaction code paths exercised by these tests.
 */
private static StoreFile createFile(long size) throws Exception {
StoreFile sf = mock(StoreFile.class);
when(sf.getPath()).thenReturn(new Path("moo"));
StoreFileReader r = mock(StoreFileReader.class);
when(r.getEntries()).thenReturn(size);
when(r.length()).thenReturn(size);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
anyBoolean())).thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader(anyBoolean())).thenReturn(r);
when(sf.createReader()).thenReturn(r);
// cloneForReader returns the same mock so stubbing applies to the clone too.
when(sf.cloneForReader()).thenReturn(sf);
return sf;
}
/** @return A zero-size mock store file. */
private static StoreFile createFile() throws Exception {
return createFile(0);
}
/** Stubs the stripe start/end key metadata on a mock store file. */
private static void setFileStripe(StoreFile sf, byte[] startKey, byte[] endKey) {
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
}
/**
 * Builds a StripeCompactor over a mocked store whose writers are captured, and whose
 * scanners are replaced by an empty {@link Scanner} (so compactions produce no cells).
 * Honors the parameterized usePrivateReaders setting.
 */
private StripeCompactor createCompactor() throws Exception {
HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
StoreFileWritersCapture writers = new StoreFileWritersCapture();
Store store = mock(Store.class);
HRegionInfo info = mock(HRegionInfo.class);
when(info.getRegionNameAsString()).thenReturn("testRegion");
when(store.getFamily()).thenReturn(col);
when(store.getRegionInfo()).thenReturn(info);
when(
store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
// Empty scanner: compactions driven by this compactor emit no cells.
final Scanner scanner = new Scanner();
return new StripeCompactor(conf, store) {
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
return scanner;
}
};
}
/** Trivial InternalScanner that hands out a fixed list of KVs, one per next() call. */
private static class Scanner implements InternalScanner {
  // Remaining cells to emit; the head is removed on each next() call.
  private final ArrayList<KeyValue> kvs;
  public Scanner(KeyValue... kvs) {
    this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
  }
  @Override
  public boolean next(List<Cell> results) throws IOException {
    if (kvs.isEmpty()) {
      return false;
    }
    KeyValue head = kvs.remove(0);
    results.add(head);
    // Keep scanning only while more cells remain.
    return !kvs.isEmpty();
  }
  @Override
  public boolean next(List<Cell> result, ScannerContext scannerContext)
      throws IOException {
    return next(result);
  }
  @Override
  public void close() throws IOException {
    // Nothing to release.
  }
}
}