/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
* Test class for {@link HStore}.
*/
@Category({ RegionServerTests.class, MediumTests.class })
public class TestHStore {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);
private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
@Rule
public TestName name = new TestName();
HRegion region;
HStore store;
byte[] table = Bytes.toBytes("table");
byte[] family = Bytes.toBytes("family");
byte[] row = Bytes.toBytes("row");
byte[] row2 = Bytes.toBytes("row2");
byte[] qf1 = Bytes.toBytes("qf1");
byte[] qf2 = Bytes.toBytes("qf2");
byte[] qf3 = Bytes.toBytes("qf3");
byte[] qf4 = Bytes.toBytes("qf4");
byte[] qf5 = Bytes.toBytes("qf5");
byte[] qf6 = Bytes.toBytes("qf6");
NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
List<Cell> expected = new ArrayList<>();
List<Cell> result = new ArrayList<>();
long id = EnvironmentEdgeManager.currentTime();
Get get = new Get(row);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
@Before
public void setUp() throws IOException {
qualifiers.clear();
qualifiers.add(qf1);
qualifiers.add(qf3);
qualifiers.add(qf5);
Iterator<byte[]> iter = qualifiers.iterator();
while (iter.hasNext()) {
byte[] next = iter.next();
expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
get.addColumn(family, next);
}
}
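// The init(...) overloads below create an HRegion plus a single HStore for the current test
// method; the most specific overload accepts an optional MyStoreHook so a test can intercept
// store internals (e.g. getScanners) and optionally switch reads to pread.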
private void init(String methodName) throws IOException {
init(methodName, TEST_UTIL.getConfiguration());
}
private HStore init(String methodName, Configuration conf) throws IOException {
// some of the tests write 4 versions and then flush
// (with HBASE-4241, lower versions are collected on flush)
return init(methodName, conf,
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
}
private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
throws IOException {
return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
}
private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
ColumnFamilyDescriptor hcd) throws IOException {
return init(methodName, conf, builder, hcd, null);
}
private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
return init(methodName, conf, builder, hcd, hook, false);
}
private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
TableDescriptor htd = builder.setColumnFamily(hcd).build();
Path basedir = new Path(DIR + methodName);
Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
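// ChunkCreator backs MemStoreLAB allocations; (re)initialize this JVM-wide singleton so the
// region's memstores can allocate chunks.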
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, basedir);
WALFactory wals = new WALFactory(walConf, methodName);
region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
htd, null);
region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
}
private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
if (hook == null) {
store = new HStore(region, hcd, conf, false);
} else {
store = new MyStore(region, hcd, conf, hook, switchToPread);
}
region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
return store;
}
/**
* Test that we do not lose data if we fail a flush and then close. Part of HBASE-10466.
*/
@Test
public void testFlushSizeSizing() throws Exception {
LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
// Only retry once.
conf.setInt("hbase.hstore.flush.retries.number", 1);
User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
// Inject our faulty LocalFileSystem
conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
user.runAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Make sure it worked (above is sensitive to caching details in hadoop core)
FileSystem fs = FileSystem.get(conf);
assertEquals(FaultyFileSystem.class, fs.getClass());
FaultyFileSystem ffs = (FaultyFileSystem) fs;
// Initialize region
init(name.getMethodName(), conf);
MemStoreSize mss = store.memstore.getFlushableSize();
assertEquals(0, mss.getDataSize());
LOG.info("Adding some data");
MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
// add the heap size of active (mutable) segment
kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
mss = store.memstore.getFlushableSize();
assertEquals(kvSize.getMemStoreSize(), mss);
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
try {
LOG.info("Flushing");
flushStore(store, id++);
fail("Didn't bubble up IOE!");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Fault injected"));
}
// due to snapshot, change mutable to immutable segment
kvSize.incMemStoreSize(0,
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
mss = store.memstore.getFlushableSize();
assertEquals(kvSize.getMemStoreSize(), mss);
MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
// Even though we add a new kv, we expect the flushable size to be 'same' since we have
// not yet cleared the snapshot -- the above flush failed.
assertEquals(kvSize.getMemStoreSize(), mss);
ffs.fault.set(false);
flushStore(store, id++);
mss = store.memstore.getFlushableSize();
// Size should be the foreground kv size.
assertEquals(kvSize2.getMemStoreSize(), mss);
flushStore(store, id++);
mss = store.memstore.getFlushableSize();
assertEquals(0, mss.getDataSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
return null;
}
});
}
@Test
public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
int numStoreFiles = 5;
writeAndRead(BloomType.ROWCOL, numStoreFiles);
assertEquals(0, store.getBloomFilterEligibleRequestsCount());
// hard to know the exact numbers here; we are just trying to
// prove that they are incrementing
assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
}
@Test
public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
int numStoreFiles = 5;
writeAndRead(BloomType.ROW, numStoreFiles);
assertEquals(0, store.getBloomFilterEligibleRequestsCount());
// hard to know the exact numbers here; we are just trying to
// prove that they are incrementing
assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
}
@Test
public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
int numStoreFiles = 5;
writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
assertEquals(0, store.getBloomFilterEligibleRequestsCount());
// hard to know the exact numbers here; we are just trying to
// prove that they are incrementing
assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
}
@Test
public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
int numStoreFiles = 5;
writeAndRead(BloomType.NONE, numStoreFiles);
assertEquals(0, store.getBloomFilterRequestsCount());
assertEquals(0, store.getBloomFilterNegativeResultsCount());
// hard to know the exact numbers here; we are just trying to
// prove that they are incrementing
assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
}
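/**
 * Writes {@code numStoreFiles} store files (one row per file, three qualifiers each) with the
 * given bloom filter type, then opens a scanner per row so the bloom filters are consulted.
 */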
private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
.setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
.setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
init(name.getMethodName(), conf, hcd);
for (int i = 1; i <= numStoreFiles; i++) {
byte[] row = Bytes.toBytes("row" + i);
LOG.info("Adding some data for the store file #" + i);
long timeStamp = EnvironmentEdgeManager.currentTime();
this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
flush(i);
}
// Verify the total number of store files
assertEquals(numStoreFiles, this.store.getStorefiles().size());
TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
columns.add(qf1);
for (int i = 1; i <= numStoreFiles; i++) {
KeyValueScanner scanner =
store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
scanner.peek();
}
}
/**
* Verify that compression and data block encoding are respected by the createWriter method, used
* on store flush.
*/
@Test
public void testCreateWriter() throws Exception {
Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
ColumnFamilyDescriptor hcd =
ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
.setDataBlockEncoding(DataBlockEncoding.DIFF).build();
init(name.getMethodName(), conf, hcd);
// Test createWriter
StoreFileWriter writer = store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
.compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
.includesTag(false).shouldDropBehind(false));
Path path = writer.getPath();
writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
writer.close();
// Verify that compression and encoding settings are respected
HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
reader.close();
}
@Test
public void testDeleteExpiredStoreFiles() throws Exception {
testDeleteExpiredStoreFiles(0);
testDeleteExpiredStoreFiles(1);
}
/**
* @param minVersions the MIN_VERSIONS for the column family
*/
public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
int storeFileNum = 4;
int ttl = 4;
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManagerTestHelper.injectEdge(edge);
Configuration conf = HBaseConfiguration.create();
// Enable the expired store file deletion
conf.setBoolean("hbase.store.delete.expired.storefile", true);
// Set the compaction threshold higher to avoid normal compactions.
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
.newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
long storeTtl = this.store.getScanInfo().getTtl();
long sleepTime = storeTtl / storeFileNum;
long timeStamp;
// There are 4 store files and the max time stamp difference among these
// store files will be (this.store.ttl / storeFileNum)
for (int i = 1; i <= storeFileNum; i++) {
LOG.info("Adding some data for the store file #" + i);
timeStamp = EnvironmentEdgeManager.currentTime();
this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
flush(i);
edge.incrementTime(sleepTime);
}
// Verify the total number of store files
assertEquals(storeFileNum, this.store.getStorefiles().size());
// Each call will find one expired store file and delete it before compaction happens.
// There will be no compaction due to the threshold above. The last file will not be removed.
for (int i = 1; i <= storeFileNum - 1; i++) {
// verify the expired store file.
assertFalse(this.store.requestCompaction().isPresent());
Collection<HStoreFile> sfs = this.store.getStorefiles();
// Ensure i files are gone.
if (minVersions == 0) {
assertEquals(storeFileNum - i, sfs.size());
// Ensure only non-expired files remain.
for (HStoreFile sf : sfs) {
assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
}
} else {
assertEquals(storeFileNum, sfs.size());
}
// Let the next store file expire.
edge.incrementTime(sleepTime);
}
assertFalse(this.store.requestCompaction().isPresent());
Collection<HStoreFile> sfs = this.store.getStorefiles();
// Assert the last expired file is not removed.
if (minVersions == 0) {
assertEquals(1, sfs.size());
}
long ts = sfs.iterator().next().getReader().getMaxTimestamp();
assertTrue(ts < (edge.currentTime() - storeTtl));
for (HStoreFile sf : sfs) {
sf.closeStoreFile(true);
}
}
@Test
public void testLowestModificationTime() throws Exception {
Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
// Initialize region
init(name.getMethodName(), conf);
int storeFileNum = 4;
for (int i = 1; i <= storeFileNum; i++) {
LOG.info("Adding some data for the store file #" + i);
this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
flush(i);
}
// after flush, check the lowest time stamp
long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
// after compaction, check the lowest time stamp
store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
}
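// Returns the smallest filesystem modification time among the given store files, or
// Long.MAX_VALUE if there are none.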
private static long getLowestTimeStampFromFS(FileSystem fs,
final Collection<HStoreFile> candidates) throws IOException {
long minTs = Long.MAX_VALUE;
if (candidates.isEmpty()) {
return minTs;
}
Path[] p = new Path[candidates.size()];
int i = 0;
for (HStoreFile sf : candidates) {
p[i] = sf.getPath();
++i;
}
FileStatus[] stats = fs.listStatus(p);
if (stats == null || stats.length == 0) {
return minTs;
}
for (FileStatus s : stats) {
minTs = Math.min(minTs, s.getModificationTime());
}
return minTs;
}
//////////////////////////////////////////////////////////////////////////////
// Get tests
//////////////////////////////////////////////////////////////////////////////
private static final int BLOCKSIZE_SMALL = 8192;
/**
* Test for HBASE-1686.
*/
@Test
public void testEmptyStoreFile() throws IOException {
init(this.name.getMethodName());
// Write a store file.
this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
flush(1);
// Now put in place an empty store file. It's a little tricky. We have to
// do it manually with a hacked-in sequence id.
HStoreFile f = this.store.getStorefiles().iterator().next();
Path storedir = f.getPath().getParent();
long seqid = f.getMaxSequenceId();
Configuration c = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(c);
HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
.withOutputDir(storedir).withFileContext(meta).build();
w.appendMetadata(seqid + 1, false);
w.close();
this.store.close();
// Reopen it... should pick up two files
this.store =
new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
assertEquals(2, this.store.getStorefilesCount());
result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
assertEquals(1, result.size());
}
/**
* Getting data from memstore only
*/
@Test
public void testGet_FromMemStoreOnly() throws IOException {
init(this.name.getMethodName());
// Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
// Get
result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
// Compare
assertCheck();
}
@Test
public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
testTimeRangeIfSomeCellsAreDroppedInFlush(1);
testTimeRangeIfSomeCellsAreDroppedInFlush(3);
testTimeRangeIfSomeCellsAreDroppedInFlush(5);
}
private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
long currentTs = 100;
long minTs = currentTs;
// the extra cell won't be flushed to disk,
// so the minimum of the time range will differ between the memstore and the hfile.
for (int i = 0; i != (maxVersion + 1); ++i) {
this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
if (i == 1) {
minTs = currentTs;
}
}
flushStore(store, id++);
Collection<HStoreFile> files = store.getStorefiles();
assertEquals(1, files.size());
HStoreFile f = files.iterator().next();
f.initReader();
StoreFileReader reader = f.getReader();
assertEquals(minTs, reader.timeRange.getMin());
assertEquals(currentTs, reader.timeRange.getMax());
}
/**
* Getting data from files only
*/
@Test
public void testGet_FromFilesOnly() throws IOException {
init(this.name.getMethodName());
// Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
// flush
flush(1);
// Add more data
this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
// flush
flush(2);
// Add more data
this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
// flush
flush(3);
// Get
result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
// this.store.get(get, qualifiers, result);
// Need to sort the result since it comes from multiple files
Collections.sort(result, CellComparatorImpl.COMPARATOR);
// Compare
assertCheck();
}
/**
* Getting data from memstore and files
*/
@Test
public void testGet_FromMemStoreAndFiles() throws IOException {
init(this.name.getMethodName());
// Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
// flush
flush(1);
// Add more data
this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
// flush
flush(2);
// Add more data
this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
// Get
result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
// Need to sort the result since it comes from multiple files
Collections.sort(result, CellComparatorImpl.COMPARATOR);
// Compare
assertCheck();
}
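// Flushes the memstore, then asserts the expected store file count and an empty active segment.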
private void flush(int storeFilesSize) throws IOException {
flushStore(store, id++);
assertEquals(storeFilesSize, this.store.getStorefiles().size());
assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
}
private void assertCheck() {
assertEquals(expected.size(), result.size());
for (int i = 0; i < expected.size(); i++) {
assertEquals(expected.get(i), result.get(i));
}
}
@After
public void tearDown() throws Exception {
EnvironmentEdgeManagerTestHelper.reset();
if (store != null) {
try {
store.close();
} catch (IOException e) {
}
store = null;
}
if (region != null) {
region.close();
region = null;
}
}
@AfterClass
public static void tearDownAfterClass() throws IOException {
TEST_UTIL.cleanupTestDir();
}
@Test
public void testHandleErrorsInFlush() throws Exception {
LOG.info("Setting up a faulty file system that cannot write");
final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
// Inject our faulty LocalFileSystem
conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
user.runAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Make sure it worked (above is sensitive to caching details in hadoop core)
FileSystem fs = FileSystem.get(conf);
assertEquals(FaultyFileSystem.class, fs.getClass());
// Initialize region
init(name.getMethodName(), conf);
LOG.info("Adding some data");
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
LOG.info("Before flush, we should have no files");
Collection<StoreFileInfo> files =
store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
assertEquals(0, files != null ? files.size() : 0);
// flush
try {
LOG.info("Flushing");
flush(1);
fail("Didn't bubble up IOE!");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Fault injected"));
}
LOG.info("After failed flush, we should still have no files!");
files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
assertEquals(0, files != null ? files.size() : 0);
store.getHRegion().getWAL().close();
return null;
}
});
FileSystem.closeAllForUGI(user.getUGI());
}
/**
* Faulty file system that will fail if you write past its fault position the FIRST TIME only;
* thereafter it will succeed. Used by {@link TestHRegion} too.
*/
static class FaultyFileSystem extends FilterFileSystem {
List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
private long faultPos = 200;
AtomicBoolean fault = new AtomicBoolean(true);
public FaultyFileSystem() {
super(new LocalFileSystem());
LOG.info("Creating faulty!");
}
@Override
public FSDataOutputStream create(Path p) throws IOException {
return new FaultyOutputStream(super.create(p), faultPos, fault);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
return new FaultyOutputStream(
super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
faultPos, fault);
}
@Override
public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {
// Fake it. Call create instead. The default implementation throws an IOE
// saying this is not supported.
return create(f, overwrite, bufferSize, replication, blockSize, progress);
}
}
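/**
 * Stream that throws "Fault injected" on any write at or beyond {@code faultPos} while the
 * shared {@code fault} flag is set.
 */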
static class FaultyOutputStream extends FSDataOutputStream {
volatile long faultPos = Long.MAX_VALUE;
private final AtomicBoolean fault;
public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
throws IOException {
super(out, null);
this.faultPos = faultPos;
this.fault = fault;
}
@Override
public synchronized void write(byte[] buf, int offset, int length) throws IOException {
LOG.info("faulty stream write at pos " + getPos());
injectFault();
super.write(buf, offset, length);
}
private void injectFault() throws IOException {
if (this.fault.get() && getPos() >= faultPos) {
throw new IOException("Fault injected");
}
}
}
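// Drives a complete flush through its three phases: prepare (take the snapshot), flushCache
// (write the store file) and commit (swap it into the store). Task status is mocked away.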
private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
return storeFlushCtx;
}
/**
* Generate a list of KeyValues for testing based on the given parameters.
* @return the generated key-value list
*/
private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
byte[] family) {
List<Cell> kvList = new ArrayList<>();
for (int i = 1; i <= numRows; i++) {
byte[] b = Bytes.toBytes(i);
for (long timestamp : timestamps) {
kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
}
}
return kvList;
}
/**
* Test to ensure correctness when using Stores with multiple timestamps
*/
@Test
public void testMultipleTimestamps() throws IOException {
int numRows = 1;
long[] timestamps1 = new long[] { 1, 5, 10, 20 };
long[] timestamps2 = new long[] { 30, 80 };
init(this.name.getMethodName());
List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
for (Cell kv : kvList1) {
this.store.add(kv, null);
}
flushStore(store, id++);
List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
for (Cell kv : kvList2) {
this.store.add(kv, null);
}
List<Cell> result;
Get get = new Get(Bytes.toBytes(1));
get.addColumn(family, qf1);
get.setTimeRange(0, 15);
result = HBaseTestingUtil.getFromStoreFile(store, get);
assertTrue(result.size() > 0);
get.setTimeRange(40, 90);
result = HBaseTestingUtil.getFromStoreFile(store, get);
assertTrue(result.size() > 0);
get.setTimeRange(10, 45);
result = HBaseTestingUtil.getFromStoreFile(store, get);
assertTrue(result.size() > 0);
get.setTimeRange(80, 145);
result = HBaseTestingUtil.getFromStoreFile(store, get);
assertTrue(result.size() > 0);
get.setTimeRange(1, 2);
result = HBaseTestingUtil.getFromStoreFile(store, get);
assertTrue(result.size() > 0);
get.setTimeRange(90, 200);
result = HBaseTestingUtil.getFromStoreFile(store, get);
assertTrue(result.size() == 0);
}
/**
* Test for HBASE-3492 - Test split on empty colfam (no store files).
* @throws IOException When the IO operations fail.
*/
@Test
public void testSplitWithEmptyColFam() throws IOException {
init(this.name.getMethodName());
assertFalse(store.getSplitPoint().isPresent());
}
@Test
public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
long anyValue = 10;
// We'll check that it uses the correct config and propagates it appropriately by going through
// the simplest "real" path I can find - "throttleCompaction", which just checks whether
// a number we pass in is higher than some config value, inside compactionPolicy.
Configuration conf = HBaseConfiguration.create();
conf.setLong(CONFIG_KEY, anyValue);
init(name.getMethodName() + "-xml", conf);
assertTrue(store.throttleCompaction(anyValue + 1));
assertFalse(store.throttleCompaction(anyValue));
// HTD overrides XML.
--anyValue;
init(
name.getMethodName() + "-htd", conf, TableDescriptorBuilder
.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
ColumnFamilyDescriptorBuilder.of(family));
assertTrue(store.throttleCompaction(anyValue + 1));
assertFalse(store.throttleCompaction(anyValue));
// HCD overrides them both.
--anyValue;
init(name.getMethodName() + "-hcd", conf,
TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
Long.toString(anyValue)),
ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
.build());
assertTrue(store.throttleCompaction(anyValue + 1));
assertFalse(store.throttleCompaction(anyValue));
}
public static class DummyStoreEngine extends DefaultStoreEngine {
public static DefaultCompactor lastCreatedCompactor = null;
@Override
protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
throws IOException {
super.createComponents(conf, store, comparator);
lastCreatedCompactor = this.compactor;
}
}
@Test
public void testStoreUsesSearchEngineOverride() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
init(this.name.getMethodName(), conf);
assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
}
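// Writes an extra empty store file (with a bumped sequence id) directly into the store
// directory, bypassing the store, so that only refreshStoreFiles() will pick it up.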
private void addStoreFile() throws IOException {
HStoreFile f = this.store.getStorefiles().iterator().next();
Path storedir = f.getPath().getParent();
long seqid = this.store.getMaxSequenceId().orElse(0L);
Configuration c = TEST_UTIL.getConfiguration();
FileSystem fs = FileSystem.get(c);
HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
.withOutputDir(storedir).withFileContext(fileContext).build();
w.appendMetadata(seqid + 1, false);
w.close();
LOG.info("Added store file:" + w.getPath());
}
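// Archives the index-th store file on the filesystem without updating the store's in-memory
// file list.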
private void archiveStoreFile(int index) throws IOException {
Collection<HStoreFile> files = this.store.getStorefiles();
HStoreFile sf = null;
Iterator<HStoreFile> it = files.iterator();
for (int i = 0; i <= index; i++) {
sf = it.next();
}
store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
Lists.newArrayList(sf));
}
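// Closes the index-th compacted-away store file and removes it from the store file manager.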
private void closeCompactedFile(int index) throws IOException {
Collection<HStoreFile> files =
this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
if (files.size() > 0) {
HStoreFile sf = null;
Iterator<HStoreFile> it = files.iterator();
for (int i = 0; i <= index; i++) {
sf = it.next();
}
sf.closeStoreFile(true);
store.getStoreEngine().getStoreFileManager()
.removeCompactedFiles(Collections.singletonList(sf));
}
}
@Test
public void testRefreshStoreFiles() throws Exception {
init(name.getMethodName());
assertEquals(0, this.store.getStorefilesCount());
// Test refreshing store files when no store files are there
store.refreshStoreFiles();
assertEquals(0, this.store.getStorefilesCount());
// add some data, flush
this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
flush(1);
assertEquals(1, this.store.getStorefilesCount());
// add one more file
addStoreFile();
assertEquals(1, this.store.getStorefilesCount());
store.refreshStoreFiles();
assertEquals(2, this.store.getStorefilesCount());
// add three more files
addStoreFile();
addStoreFile();
addStoreFile();
assertEquals(2, this.store.getStorefilesCount());
store.refreshStoreFiles();
assertEquals(5, this.store.getStorefilesCount());
closeCompactedFile(0);
archiveStoreFile(0);
assertEquals(5, this.store.getStorefilesCount());
store.refreshStoreFiles();
assertEquals(4, this.store.getStorefilesCount());
archiveStoreFile(0);
archiveStoreFile(1);
archiveStoreFile(2);
assertEquals(4, this.store.getStorefilesCount());
store.refreshStoreFiles();
assertEquals(1, this.store.getStorefilesCount());
archiveStoreFile(0);
store.refreshStoreFiles();
assertEquals(0, this.store.getStorefilesCount());
}
@Test
public void testRefreshStoreFilesNotChanged() throws IOException {
init(name.getMethodName());
assertEquals(0, this.store.getStorefilesCount());
// add some data, flush
this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
flush(1);
// add one more file
addStoreFile();
StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
// call first time after files changed
spiedStoreEngine.refreshStoreFiles();
assertEquals(2, this.store.getStorefilesCount());
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
// call second time
spiedStoreEngine.refreshStoreFiles();
// ensure that replaceStoreFiles is not called, i.e. the invocation count does not change, if
// the files have not been refreshed
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
}
@Test
public void testScanWithCompactionAfterFlush() throws Exception {
TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
EverythingPolicy.class.getName());
init(name.getMethodName());
assertEquals(0, this.store.getStorefilesCount());
KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(1);
kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(2);
kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(3);
ExecutorService service = Executors.newFixedThreadPool(2);
Scan scan = new Scan(new Get(row));
Future<KeyValueScanner> scanFuture = service.submit(() -> {
try {
LOG.info(">>>> creating scanner");
return this.store.createScanner(scan,
new ScanInfo(HBaseConfiguration.create(),
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
Long.MAX_VALUE, 0, CellComparator.getInstance()),
scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
} catch (IOException e) {
e.printStackTrace();
return null;
}
});
Future compactFuture = service.submit(() -> {
try {
LOG.info(">>>>>> starting compaction");
Optional<CompactionContext> opCompaction = this.store.requestCompaction();
assertTrue(opCompaction.isPresent());
store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
LOG.info(">>>>>> Compaction is finished");
this.store.closeAndArchiveCompactedFiles();
LOG.info(">>>>>> Compacted files deleted");
} catch (IOException e) {
e.printStackTrace();
}
});
KeyValueScanner kvs = scanFuture.get();
compactFuture.get();
((StoreScanner) kvs).currentScanners.forEach(s -> {
if (s instanceof StoreFileScanner) {
assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
}
});
kvs.seek(kv);
service.shutdownNow();
}
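// Counts the memstore (non-file) scanners currently held by the given StoreScanner.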
private long countMemStoreScanner(StoreScanner scanner) {
if (scanner.currentScanners == null) {
return 0;
}
return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
}
@Test
public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
long seqId = 100;
long timestamp = EnvironmentEdgeManager.currentTime();
Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
.setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
PrivateCellUtil.setSequenceId(cell0, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
.setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
PrivateCellUtil.setSequenceId(cell1, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
seqId = 101;
timestamp = EnvironmentEdgeManager.currentTime();
Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
.setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
PrivateCellUtil.setSequenceId(cell2, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
}
private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
List<Cell> inputCellsAfterSnapshot) throws IOException {
init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
long seqId = Long.MIN_VALUE;
for (Cell c : inputCellsBeforeSnapshot) {
quals.add(CellUtil.cloneQualifier(c));
seqId = Math.max(seqId, c.getSequenceId());
}
for (Cell c : inputCellsAfterSnapshot) {
quals.add(CellUtil.cloneQualifier(c));
seqId = Math.max(seqId, c.getSequenceId());
}
inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
// snapshot + active (if inputCellsAfterSnapshot isn't empty)
assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
// snapshot has no data after flush
int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
boolean more;
int cellCount = 0;
do {
List<Cell> cells = new ArrayList<>();
more = s.next(cells);
cellCount += cells.size();
assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
} while (more);
assertEquals(
"The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
+ ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
// the current scanners are cleared
assertEquals(0, countMemStoreScanner(s));
}
}
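// Builds a Put-type cell for this test's family and stamps it with the given mvcc sequence id.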
private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
throws IOException {
return createCell(row, qualifier, ts, sequenceId, value);
}
private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
throws IOException {
Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
.setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build();
PrivateCellUtil.setSequenceId(c, sequenceId);
return c;
}
@Test
public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
final int expectedSize = 3;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterCell(final Cell c) throws IOException {
return ReturnCode.INCLUDE;
}
}, expectedSize);
}
@Test
public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterCell(final Cell c) throws IOException {
if (timeToGoNextRow.get()) {
timeToGoNextRow.set(false);
return ReturnCode.NEXT_ROW;
} else {
return ReturnCode.INCLUDE;
}
}
}, expectedSize);
}
@Test
public void testFlushBeforeCompletingScanWithFilterHint()
throws IOException, InterruptedException {
final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGetHint.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterCell(final Cell c) throws IOException {
if (timeToGetHint.get()) {
timeToGetHint.set(false);
return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
} else {
return Filter.ReturnCode.INCLUDE;
}
}
@Override
public Cell getNextCellHint(Cell currentCell) throws IOException {
return currentCell;
}
}, expectedSize);
}
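// Scans row r1 while the MyListHook triggers a flush mid-row; the supplied filter then decides
// how the remainder of the row is consumed. The scan must still return a consistent view of r1
// (and of r2 afterwards) despite the concurrent flush.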
private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
byte[] r0 = Bytes.toBytes("row0");
byte[] r1 = Bytes.toBytes("row1");
byte[] r2 = Bytes.toBytes("row2");
byte[] value0 = Bytes.toBytes("value0");
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
new MyStoreHook() {
@Override
public long getSmallestReadPoint(HStore store) {
return seqId + 3;
}
});
// The cells with value0 won't be flushed to disk because max versions is 1
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
List<Cell> myList = new MyList<>(hook);
Scan scan = new Scan().withStartRow(r1).setFilter(filter);
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
// r1
scanner.next(myList);
assertEquals(expectedSize, myList.size());
for (Cell c : myList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
+ Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
}
List<Cell> normalList = new ArrayList<>(3);
// r2
scanner.next(normalList);
assertEquals(3, normalList.size());
for (Cell c : normalList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
+ Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
}
}
}
@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("value");
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data which shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
MyCompactingMemStore.START_TEST.set(true);
Runnable flush = () -> {
// this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
// recreate the active memstore -- phase (4/5)
storeFlushCtx.prepare();
};
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(flush);
// we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
// this is blocked until we recreate the active memstore -- phase (3/5)
// we get scanner from active memstore but it is empty -- phase (5/5)
InternalScanner scanner =
(InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
service.shutdown();
service.awaitTermination(20, TimeUnit.SECONDS);
try {
try {
List<Cell> results = new ArrayList<>();
scanner.next(results);
assertEquals(3, results.size());
for (Cell c : results) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
+ Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
}
} finally {
scanner.close();
}
} finally {
MyCompactingMemStore.START_TEST.set(false);
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
}
@Test
public void testScanWithDoubleFlush() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Initialize region
MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
@Override
public void getScanners(MyStore store) throws IOException {
final long tmpId = id++;
ExecutorService s = Executors.newSingleThreadExecutor();
s.execute(() -> {
try {
// flush the store before the StoreScanner updates its scanners from the store.
// The current data will be flushed into files, and the memstore will
// be cleared.
// -- phase (4/4)
flushStore(store, tmpId);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
s.shutdown();
try {
// wait for the flush; the thread will be blocked in HStore#notifyChangedReadersObservers.
s.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
}
}
});
byte[] oldValue = Bytes.toBytes("oldValue");
byte[] currentValue = Bytes.toBytes("currentValue");
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data which shouldn't be "seen" by client
myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
long snapshotId = id++;
// push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx =
store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
// insert current data into active -- phase (2/4)
myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
try (InternalScanner scanner =
(InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
// complete the flush -- phase (3/4)
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
List<Cell> results = new ArrayList<>();
scanner.next(results);
assertEquals(3, results.size());
for (Cell c : results) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
+ Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
}
}
}
/**
* This test is for HBASE-27519: while the {@link StoreScanner} is scanning, a flush and a
* compaction execute concurrently, and the compaction compacts and archives the flushed
* {@link HStoreFile} which is being used by {@link StoreScanner#updateReaders}. Before
* HBASE-27519, {@link StoreScanner#updateReaders} would throw a {@link FileNotFoundException}.
*/
@Test
public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(WALFactory.WAL_ENABLED, false);
conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
byte[] r0 = Bytes.toBytes("row0");
byte[] r1 = Bytes.toBytes("row1");
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
// Initialize region
final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
@Override
public void getScanners(MyStore store) throws IOException {
try {
// Here this method is called by StoreScanner.updateReaders which is invoked by the
// following TestHStore.flushStore
if (shouldWaitRef.get()) {
// wait for the following compaction task to start
cyclicBarrier.await();
// wait for the following HStore.closeAndArchiveCompactedFiles to end.
cyclicBarrier.await();
}
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
});
final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
Runnable compactionTask = () -> {
try {
// compactionTask may start only after StoreScanner.updateReaders, invoked by
// TestHStore.flushStore, is about to enter MyStore.getScanners.
cyclicBarrier.await();
region.compactStore(family, new NoLimitThroughputController());
myStore.closeAndArchiveCompactedFiles();
// Notify StoreScanner.updateReaders that it may now enter MyStore.getScanners.
cyclicBarrier.await();
} catch (Throwable e) {
compactionExceptionRef.set(e);
}
};
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
byte[] value = Bytes.toBytes("value");
// add cells for r0, flushing between adds so that there are multiple store files to compact
myStore.add(createCell(r0, qf1, ts, seqId, value), null);
flushStore(myStore, id++);
myStore.add(createCell(r0, qf2, ts, seqId, value), null);
flushStore(myStore, id++);
myStore.add(createCell(r0, qf3, ts, seqId, value), null);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
myStore.add(createCell(r1, qf1, ts, seqId, value), null);
myStore.add(createCell(r1, qf2, ts, seqId, value), null);
myStore.add(createCell(r1, qf3, ts, seqId, value), null);
Thread.currentThread()
.setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
Scan scan = new Scan();
scan.withStartRow(r0, true);
try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
List<Cell> results = new MyList<>(size -> {
switch (size) {
case 1:
shouldWaitRef.set(true);
Thread thread = new Thread(compactionTask);
thread.setName("MyCompacting Thread.");
thread.start();
try {
flushStore(myStore, id++);
thread.join();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
shouldWaitRef.set(false);
break;
default:
break;
}
});
// Before HBASE-27519, this would throw java.io.FileNotFoundException because the store file
// used by StoreScanner.updateReaders was deleted by compactionTask.
scanner.next(results);
// The results are the cells of row r0.
assertEquals(3, results.size());
assertNull(compactionExceptionRef.get());
}
}
@Test
public void testReclaimChunkWhenScanning() throws IOException {
init("testReclaimChunkWhenScanning");
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
byte[] value = Bytes.toBytes("value");
// data which should be "seen" by the client
store.add(createCell(qf1, ts, seqId, value), null);
store.add(createCell(qf2, ts, seqId, value), null);
store.add(createCell(qf3, ts, seqId, value), null);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
try (InternalScanner scanner =
(InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
List<Cell> results = new MyList<>(size -> {
switch (size) {
// 1) we get the first cell (qf1)
// 2) flush the data to have StoreScanner update inner scanners
// 3) the chunk will be reclaimed after updating
case 1:
try {
flushStore(store, id++);
} catch (IOException e) {
throw new RuntimeException(e);
}
break;
// 1) we get the second cell (qf2)
// 2) add some cell to fill some byte into the chunk (we have only one chunk)
case 2:
try {
byte[] newValue = Bytes.toBytes("newValue");
// newer data which shouldn't be "seen" by the scanner (its read point is seqId)
store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
} catch (IOException e) {
throw new RuntimeException(e);
}
break;
default:
break;
}
});
scanner.next(results);
assertEquals(3, results.size());
for (Cell c : results) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
+ Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
}
}
}
/**
* If there are two running InMemoryFlushRunnables, the later one may change the versionedList,
* and the first InMemoryFlushRunnable will then use the changed versionedList to remove the
* corresponding segments. In short, some segments which are not part of the merge would be
* removed.
*/
@Test
public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
int flushSize = 500;
Configuration conf = HBaseConfiguration.create();
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
// Set the lower threshold to invoke the "MERGE" policy
conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("thisisavarylargevalue");
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data which shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
// This shouldn't invoke another in-memory flush because the first compactor thread
// hasn't completed the in-memory compaction yet.
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
// okay. Let the compaction be completed
MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
while (mem.isMemStoreFlushingInMemory()) {
TimeUnit.SECONDS.sleep(1);
}
// This should invoke another in-memory flush.
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
@Test
public void testAge() throws IOException {
long currentTime = EnvironmentEdgeManager.currentTime();
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
edge.setValue(currentTime);
EnvironmentEdgeManager.injectEdge(edge);
Configuration conf = TEST_UTIL.getConfiguration();
ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
initHRegion(name.getMethodName(), conf,
TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
HStore store = new HStore(region, hcd, conf, false) {
@Override
protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
CellComparator kvComparator) throws IOException {
List<HStoreFile> storefiles =
Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
StoreFileManager sfm = mock(StoreFileManager.class);
when(sfm.getStorefiles()).thenReturn(storefiles);
StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
when(storeEngine.getStoreFileManager()).thenReturn(sfm);
return storeEngine;
}
};
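// With the injected edge frozen at currentTime, the store file ages are exactly the deltas
// used in mockStoreFile above: 10, 100, 1000 and 10000 ms.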
assertEquals(10L, store.getMinStoreFileAge().getAsLong());
assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
}
private HStoreFile mockStoreFile(long createdTime) {
StoreFileInfo info = mock(StoreFileInfo.class);
when(info.getCreatedTimestamp()).thenReturn(createdTime);
HStoreFile sf = mock(HStoreFile.class);
when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
when(sf.isHFile()).thenReturn(true);
when(sf.getFileInfo()).thenReturn(info);
return sf;
}
private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
throws IOException {
return (MyStore) init(methodName, conf,
TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
}
private static class MyStore extends HStore {
private final MyStoreHook hook;
MyStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam, false);
this.hook = hook;
}
@Override
public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
hook.getScanners(this);
return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
stopRow, false, readPt, includeMemstoreScanner);
}
@Override
public long getSmallestReadPoint() {
return hook.getSmallestReadPoint(this);
}
}
private abstract static class MyStoreHook {
void getScanners(MyStore store) throws IOException {
}
long getSmallestReadPoint(HStore store) {
return store.getHRegion().getSmallestReadPoint();
}
}
@Test
public void testSwitchingPreadToStreamInParallelWithCompactionDischarger() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
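// A pread budget of 0 bytes means any read exceeds it, so the scanner is expected to switch
// from pread to stream read at the first opportunity.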
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
});
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqID = 1L;
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSizing);
}
// flush them
flushStore(store, seqID);
for (int i = 11; i < 20; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSizing);
}
// flush them
flushStore(store, seqID);
for (int i = 21; i < 30; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSizing);
}
// flush them
flushStore(store, seqID);
assertEquals(3, store.getStorefilesCount());
Scan scan = new Scan();
scan.addFamily(family);
Collection<HStoreFile> storefiles2 = store.getStorefiles();
ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
StoreScanner storeScanner =
(StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
// get the current heap
KeyValueHeap heap = storeScanner.heap;
// create more store files
for (int i = 31; i < 40; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSizing);
}
// flush them
flushStore(store, seqID);
for (int i = 41; i < 50; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSizing);
}
// flush them
flushStore(store, seqID);
storefiles2 = store.getStorefiles();
ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
actualStorefiles1.removeAll(actualStorefiles);
// Simulate a compaction replacing the store files while the scanner switches to stream read
MyThread thread = new MyThread(storeScanner);
thread.start();
store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
thread.join();
KeyValueHeap heap2 = thread.getHeap();
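// Switching from pread to stream read rebuilds the scanner's KeyValueHeap, so the heap
// captured before the switch must differ from the one captured after.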
assertFalse(heap.equals(heap2));
}
@Test
public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
// Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
});
Scan scan = new Scan();
scan.addFamily(family);
// The ReadType on the Scan remains DEFAULT.
assertEquals(ReadType.DEFAULT, scan.getReadType());
StoreScanner storeScanner =
(StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
assertFalse(storeScanner.isScanUsePread());
}
@Test
public void testSpaceQuotaChangeAfterReplacement() throws IOException {
final TableName tn = TableName.valueOf(name.getMethodName());
init(name.getMethodName());
RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
HStoreFile sf1 = mockStoreFileWithLength(1024L);
HStoreFile sf2 = mockStoreFileWithLength(2048L);
HStoreFile sf3 = mockStoreFileWithLength(4096L);
HStoreFile sf4 = mockStoreFileWithLength(8192L);
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
.setEndKey(Bytes.toBytes("b")).build();
// Compacting two files down to one, reducing size
sizeStore.put(regionInfo, 1024L + 4096L);
store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
Arrays.asList(sf2));
assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
// The same file length in and out should have no change
store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
Arrays.asList(sf2));
assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
// Increase the total size used
store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
Arrays.asList(sf3));
assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
.setEndKey(Bytes.toBytes("c")).build();
store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
}
@Test
public void testHFileContextSetWithCFAndTable() throws Exception {
init(this.name.getMethodName());
StoreFileWriter writer = store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
.compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
.includesTag(false).shouldDropBehind(true));
HFileContext hFileContext = writer.getHFileWriter().getFileContext();
assertArrayEquals(family, hFileContext.getColumnFamily());
assertArrayEquals(table, hFileContext.getTableName());
}
// This test is for HBASE-26026, where an HBase write could get stuck when the active segment has
// no cell but its dataSize exceeds inmemoryFlushSize.
@Test
public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
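// With IN_MEMORY_FLUSH_THRESHOLD_FACTOR = 0.005 and a region flush size of flushByteSize * 200,
// the in-memory flush size works out to 0.005 * flushByteSize * 200 = flushByteSize, which the
// assert below double-checks.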
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
myCompactingMemStore.smallCellPreUpdateCounter.set(0);
myCompactingMemStore.largeCellPreUpdateCounter.set(0);
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
Thread smallCellThread = new Thread(() -> {
try {
store.add(smallCell, new NonThreadSafeMemStoreSizing());
} catch (Throwable exception) {
exceptionRef.set(exception);
}
});
smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
smallCellThread.start();
String oldThreadName = Thread.currentThread().getName();
try {
/**
* 1. smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
* largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
* invokes flushInMemory.
* <p/>
* 2. After largeCellThread has finished the CompactingMemStore.flushInMemory method,
* smallCellThread can add its cell to the current active segment. That is to say, when
* largeCellThread called flushInMemory, CompactingMemStore.active had no cell.
*/
Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
store.add(largeCell, new NonThreadSafeMemStoreSizing());
smallCellThread.join();
for (int i = 0; i < 100; i++) {
long currentTimestamp = timestamp + 100 + i;
Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
store.add(cell, new NonThreadSafeMemStoreSizing());
}
} finally {
Thread.currentThread().setName(oldThreadName);
}
assertNull(exceptionRef.get());
}
// This test is for HBASE-26210, where an HBase write could get stuck when there is a cell whose
// size exceeds inmemoryFlushSize.
@Test(timeout = 60000)
public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
byte[] value = new byte[size + 1];
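// The value is one byte larger than inmemoryFlushSize, so this single cell alone exceeds the
// in-memory flush threshold.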
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long timestamp = EnvironmentEdgeManager.currentTime();
long seqId = 100;
Cell cell = createCell(qf1, timestamp, seqId, value);
int cellByteSize = MutableSegment.getCellLength(cell);
store.add(cell, memStoreSizing);
assertTrue(memStoreSizing.getCellsCount() == 1);
assertTrue(memStoreSizing.getDataSize() == cellByteSize);
// Wait for the in-memory compaction to complete, see HBASE-26438
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
}
/**
* This test is for HBASE-27464. Before that JIRA, when initializing a
* {@link CellChunkImmutableSegment} for the 'COMPACT' action, we did not force a copy into the
* current MSLAB. When the cell size is bigger than {@link MemStoreLABImpl#maxAlloc}, the cell
* stays in the previous chunk, which is recycled after the segment replacement, and we may read
* wrong data when those chunks are reused by others.
*/
@Test
public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
Configuration conf = HBaseConfiguration.create();
int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
// Construct a big cell which is larger than {@link MemStoreLABImpl#maxAlloc}.
byte[] cellValue = new byte[maxAllocByteSize + 1];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final byte[] rowKey1 = Bytes.toBytes("rowKey1");
final Cell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
final byte[] rowKey2 = Bytes.toBytes("rowKey2");
final Cell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
int cellByteSize = MutableSegment.getCellLength(originalCell1);
int inMemoryFlushByteSize = cellByteSize - 1;
// set CompactingMemStore.inmemoryFlushSize to inMemoryFlushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
conf.setBoolean(WALFactory.WAL_ENABLED, false);
// Use {@link MemoryCompactionPolicy#EAGER} so that compaction always runs.
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
// Data chunk Pool is disabled.
assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
// First compact
store.add(originalCell1, memStoreSizing);
// Wait for the first in-memory compaction to finish
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
StoreScanner storeScanner =
(StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
Cell resultCell1 = segmentScanner.next();
assertTrue(CellUtil.equals(resultCell1, originalCell1));
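// After HBASE-27464 the big cell is expected to be force-copied into the compacted segment's
// own MSLAB, so it must be chunk-backed rather than referencing a recyclable chunk of the old
// segment.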
int cell1ChunkId = ((ExtendedCell) resultCell1).getChunkId();
assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
assertNull(segmentScanner.next());
segmentScanner.close();
storeScanner.close();
Segment segment = segmentScanner.segment;
assertTrue(segment instanceof CellChunkImmutableSegment);
MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
assertFalse(memStoreLAB1.isClosed());
assertFalse(memStoreLAB1.chunks.isEmpty());
assertFalse(memStoreLAB1.isReclaimed());
// Second compact
store.add(originalCell2, memStoreSizing);
// Wait for the second in-memory compaction to finish
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
// Before HBASE-27464, this could throw java.lang.IllegalArgumentException: "In CellChunkMap,
// cell must be associated with chunk... We were looking for a cell at index 0." The cause is
// that the data chunk pool is disabled: when the data chunks are recycled after the second
// in-memory compaction finishes, the {@link ChunkCreator.putbackChunks} method does not put the
// chunks back into the data chunk pool, it just removes them from
// {@link ChunkCreator#chunkIdMap}, so in {@link CellChunkMap#getCell} we could not get the data
// chunk by chunkId.
storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
Cell newResultCell1 = segmentScanner.next();
assertTrue(newResultCell1 != resultCell1);
assertTrue(CellUtil.equals(newResultCell1, originalCell1));
Cell resultCell2 = segmentScanner.next();
assertTrue(CellUtil.equals(resultCell2, originalCell2));
assertNull(segmentScanner.next());
segmentScanner.close();
storeScanner.close();
segment = segmentScanner.segment;
assertTrue(segment instanceof CellChunkImmutableSegment);
MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
assertFalse(memStoreLAB2.isClosed());
assertFalse(memStoreLAB2.chunks.isEmpty());
assertFalse(memStoreLAB2.isReclaimed());
assertTrue(memStoreLAB1.isClosed());
assertTrue(memStoreLAB1.chunks.isEmpty());
assertTrue(memStoreLAB1.isReclaimed());
}
// This test is also for HBASE-26210: write a large cell and a small cell concurrently when
// inmemoryFlushSize is smaller than, equal to, and larger than the cell sizes.
@Test
public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
throws IOException, InterruptedException {
doWriteTestLargeCellAndSmallCellConcurrently(
(smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
doWriteTestLargeCellAndSmallCellConcurrently(
(smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
doWriteTestLargeCellAndSmallCellConcurrently(
(smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
doWriteTestLargeCellAndSmallCellConcurrently(
(smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
doWriteTestLargeCellAndSmallCellConcurrently(
(smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
}
private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[100];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
boolean flushByteSizeLessThanSmallAndLargeCellSize =
flushByteSize < (smallCellByteSize + largeCellByteSize);
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
myCompactingMemStore.disableCompaction();
myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize =
flushByteSizeLessThanSmallAndLargeCellSize;
final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
final AtomicLong totalCellByteSize = new AtomicLong(0);
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
Thread smallCellThread = new Thread(() -> {
try {
for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
long currentTimestamp = timestamp + i;
Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
store.add(cell, memStoreSizing);
}
} catch (Throwable exception) {
exceptionRef.set(exception);
}
});
smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
smallCellThread.start();
String oldThreadName = Thread.currentThread().getName();
try {
/**
* When flushByteSizeLessThanSmallAndLargeCellSize is true:
* <p/>
* 1. smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
* largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
* largeCellThread invokes flushInMemory.
* <p/>
* 2. After largeCellThread has finished the CompactingMemStore.flushInMemory method,
* smallCellThread can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
* <p/>
* When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
* largeCellThread concurrently write one cell each and wait for each other, and then write
* another cell, and so on.
*/
Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
long currentTimestamp = timestamp + i;
Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
store.add(cell, memStoreSizing);
}
smallCellThread.join();
assertNull(exceptionRef.get());
assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
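// When the threshold is below the combined size of a small and a large cell, each large-cell
// add is expected to push the active segment over the threshold, giving exactly CELL_COUNT
// in-memory flushes; otherwise some adds fit into the active segment and strictly fewer
// flushes happen.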
if (flushByteSizeLessThanSmallAndLargeCellSize) {
assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
} else {
assertTrue(
myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
}
} finally {
Thread.currentThread().setName(oldThreadName);
}
}
/**
* <pre>
* This test is for HBASE-26384: test that {@link CompactingMemStore#flattenOneSegment} and
* {@link CompactingMemStore#snapshot()} execute concurrently.
* The thread sequence before HBASE-26384 is (the bug only exists on branch-2; UTs are added
* for both branch-2 and master):
* 1. The {@link CompactingMemStore} size exceeds
* {@link CompactingMemStore#getInmemoryFlushSize()}, the write thread adds a new
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline}, and starts an
* in-memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
* 2. The in-memory compact thread starts, then stops just before
* {@link CompactingMemStore#flattenOneSegment}.
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
* snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in-memory
* compact thread continues.
* Assume the {@link VersionedSegmentsList#version} returned from
* {@link CompactingMemStore#getImmutableSegments} is v.
* 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
* 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
* {@link CompactionPipeline#version} is still v.
* 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
* {@link CompactionPipeline#version} is still v,
* {@link CompactingMemStore#swapPipelineWithNull} thinks it succeeded and continues flushing,
* but the {@link ImmutableSegment} in {@link CompactionPipeline} has been changed by
* {@link CompactingMemStore#flattenOneSegment}, so that {@link ImmutableSegment} is in fact
* not removed and still remains in {@link CompactionPipeline}.
*
* After HBASE-26384, steps 5-6 change to the following, which is the expected behavior:
* 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
* {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
* v+1.
* 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
* fails and retries the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
* again; because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
* {@link CompactingMemStore#swapPipelineWithNull} succeeds.
* </pre>
*/
@Test
public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
int flushByteSize = totalCellByteSize - 2;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
store.add(smallCell, new NonThreadSafeMemStoreSizing());
store.add(largeCell, new NonThreadSafeMemStoreSizing());
String oldThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
/**
* {@link CompactingMemStore#snapshot} must wait until the in-memory compact thread enters
* {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
* would invoke {@link CompactingMemStore#stopCompaction}.
*/
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
assertTrue(memStoreSnapshot.getCellsCount() == 2);
assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
assertTrue(segments.getNumOfSegments() == 0);
assertTrue(segments.getNumOfCells() == 0);
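// Two swap attempts are expected: the first swapPipelineWithNull fails because
// flattenOneSegment bumped the pipeline version, and the retry succeeds.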
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
} finally {
Thread.currentThread().setName(oldThreadName);
}
}
/**
* <pre>
* This test is for HBASE-26384: test that {@link CompactingMemStore#flattenOneSegment},
* {@link CompactingMemStore#snapshot()} and a memstore write execute concurrently.
* The thread sequence before HBASE-26384 is (the bug only exists on branch-2; UTs are added
* for both branch-2 and master):
* 1. The {@link CompactingMemStore} size exceeds
* {@link CompactingMemStore#getInmemoryFlushSize()}, the write thread adds a new
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline}, and starts an
* in-memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
* 2. The in-memory compact thread starts, then stops just before
* {@link CompactingMemStore#flattenOneSegment}.
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
* snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in-memory
* compact thread continues.
* Assume the {@link VersionedSegmentsList#version} returned from
* {@link CompactingMemStore#getImmutableSegments} is v.
* 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
* 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
* {@link CompactionPipeline#version} is still v.
* 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
* {@link CompactionPipeline#version} is still v,
* {@link CompactingMemStore#swapPipelineWithNull} thinks it succeeded and continues flushing,
* but the {@link ImmutableSegment} in {@link CompactionPipeline} has been changed by
* {@link CompactingMemStore#flattenOneSegment}, so that {@link ImmutableSegment} is in fact
* not removed and still remains in {@link CompactionPipeline}.
*
* After HBASE-26384, steps 5-6 change to the following, which is the expected behavior, and
* steps 7-8 are added to test that a new segment is added before the retry:
* 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
* {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
* v+1.
* 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
* fails and retries; the {@link VersionedSegmentsList#version} returned from
* {@link CompactingMemStore#getImmutableSegments} is now v+1.
* 7. The write thread continues writing to the {@link CompactingMemStore}; when the
* {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
* {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
* {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline};
* {@link CompactionPipeline#version} is still v+1.
* 8. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
* {@link CompactionPipeline#version} is still v+1,
* {@link CompactingMemStore#swapPipelineWithNull} succeeds. The new {@link ImmutableSegment}
* remains at the head of {@link CompactingMemStore#pipeline}; the old one is removed by
* {@link CompactingMemStore#swapPipelineWithNull}.
* </pre>
*/
@Test
public void testFlattenSnapshotWriteCompactingMemStoreConcurrently() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
int flushByteSize = firstWriteCellByteSize - 2;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
store.add(smallCell, new NonThreadSafeMemStoreSizing());
store.add(largeCell, new NonThreadSafeMemStoreSizing());
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
final int writeAgainCellByteSize =
MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
final Thread writeAgainThread = new Thread(() -> {
try {
myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
} catch (Throwable exception) {
exceptionRef.set(exception);
}
});
writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
writeAgainThread.start();
String oldThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
/**
* {@link CompactingMemStore#snapshot} must wait until the in-memory compact thread enters
* {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
* would invoke {@link CompactingMemStore#stopCompaction}.
*/
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
writeAgainThread.join();
assertTrue(memStoreSnapshot.getCellsCount() == 2);
assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
assertTrue(segments.getNumOfSegments() == 1);
assertTrue(
((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
assertTrue(segments.getNumOfCells() == 2);
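// As in the previous test, the first swapPipelineWithNull fails on the version bump and the
// retry succeeds; only the segment written by writeAgainThread remains in the pipeline.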
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
assertNull(exceptionRef.get());
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
} finally {
Thread.currentThread().setName(oldThreadName);
}
}
/**
* <pre>
* This test is for HBASE-26465: test that {@link DefaultMemStore#clearSnapshot} and
* {@link DefaultMemStore#getScanners} execute concurrently. The thread sequence before
* HBASE-26465 is:
* 1. The flush thread starts flushing the {@link DefaultMemStore} after some cells have been
* added to it.
* 2. The flush thread stops just before {@link DefaultMemStore#clearSnapshot} in
* {@link HStore#updateStorefiles}, after it has completed flushing the memstore to an hfile.
* 3. The scan thread starts and stops just after {@link DefaultMemStore#getSnapshotSegments} in
* {@link DefaultMemStore#getScanners}; here the scan thread gets the
* {@link DefaultMemStore#snapshot} which was created by the flush thread.
* 4. The flush thread continues with {@link DefaultMemStore#clearSnapshot} and closes
* {@link DefaultMemStore#snapshot}; because the reference count of the corresponding
* {@link MemStoreLABImpl} is 0, the {@link Chunk}s in the corresponding
* {@link MemStoreLABImpl} are recycled.
* 5. The scan thread continues {@link DefaultMemStore#getScanners} and creates a
* {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, increasing the reference
* count of the corresponding {@link MemStoreLABImpl}; but the {@link Chunk}s in the
* corresponding {@link MemStoreLABImpl} were already recycled in step 4 and may be
* overwritten by other write threads, which may cause serious problems.
* After HBASE-26465, {@link DefaultMemStore#getScanners} and
* {@link DefaultMemStore#clearSnapshot} can no longer execute concurrently.
* </pre>
*/
@Test
public void testClearSnapshotGetScannerConcurrently() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
conf.setBoolean(WALFactory.WAL_ENABLED, false);
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
myDefaultMemStore.store = store;
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
store.add(smallCell, memStoreSizing);
store.add(largeCell, memStoreSizing);
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
final Thread flushThread = new Thread(() -> {
try {
flushStore(store, id++);
} catch (Throwable exception) {
exceptionRef.set(exception);
}
});
flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
flushThread.start();
String oldThreadName = Thread.currentThread().getName();
StoreScanner storeScanner = null;
try {
Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
/**
* Wait until the flush thread is blocked just before {@link DefaultMemStore#doClearSnapshot}
*/
myDefaultMemStore.getScannerCyclicBarrier.await();
storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
flushThread.join();
if (myDefaultMemStore.shouldWait) {
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
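// The snapshot LAB is closed, but its chunks must not be reclaimed while this scanner still
// references them; that is what HBASE-26465 guarantees.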
assertTrue(memStoreLAB.isClosed());
assertFalse(memStoreLAB.chunks.isEmpty());
assertFalse(memStoreLAB.isReclaimed());
Cell cell1 = segmentScanner.next();
assertTrue(CellUtil.equals(smallCell, cell1));
Cell cell2 = segmentScanner.next();
assertTrue(CellUtil.equals(largeCell, cell2));
assertNull(segmentScanner.next());
} else {
List<Cell> results = new ArrayList<>();
storeScanner.next(results);
assertEquals(2, results.size());
assertTrue(CellUtil.equals(smallCell, results.get(0)));
assertTrue(CellUtil.equals(largeCell, results.get(1)));
}
assertNull(exceptionRef.get());
} finally {
if (storeScanner != null) {
storeScanner.close();
}
Thread.currentThread().setName(oldThreadName);
}
}
@SuppressWarnings("unchecked")
private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
List<T> resultScanners = new ArrayList<T>();
for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
if (keyValueScannerClass.isInstance(keyValueScanner)) {
resultScanners.add((T) keyValueScanner);
}
}
assertTrue(resultScanners.size() == 1);
return resultScanners.get(0);
}
@Test
public void testOnConfigurationChange() throws IOException {
final int COMMON_MAX_FILES_TO_COMPACT = 10;
final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
final int STORE_MAX_FILES_TO_COMPACT = 6;
// Build a table whose maxFilesToCompact differs from the common configuration.
Configuration conf = HBaseConfiguration.create();
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
COMMON_MAX_FILES_TO_COMPACT);
conf.setBoolean(CACHE_DATA_ON_READ_KEY, false);
conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
.setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
String.valueOf(STORE_MAX_FILES_TO_COMPACT))
.build();
init(this.name.getMethodName(), conf, hcd);
// After updating the common configuration, the conf in the HStore itself must not change.
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
NEW_COMMON_MAX_FILES_TO_COMPACT);
this.store.onConfigurationChange(conf);
assertEquals(STORE_MAX_FILES_TO_COMPACT,
store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
assertFalse(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ));
assertTrue(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE));
assertTrue(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE));
// reset to default values
conf.setBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ);
conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
this.store.onConfigurationChange(conf);
}
/**
* This test is for HBASE-26476
*/
@Test
public void testExtendsDefaultMemStore() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(WALFactory.WAL_ENABLED, false);
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
tearDown();
conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
}
static class CustomDefaultMemStore extends DefaultMemStore {
public CustomDefaultMemStore(Configuration conf, CellComparator c,
RegionServicesForStores regionServices) {
super(conf, c, regionServices);
}
}
/**
* This test is for HBASE-26488
*/
@Test
public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
conf.setBoolean(WALFactory.WAL_ENABLED, false);
conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
MyDefaultStoreFlusher.class.getName());
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
store.add(smallCell, memStoreSizing);
store.add(largeCell, memStoreSizing);
flushStore(store, id++);
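// After the retried flush finally succeeds, the snapshot segment's LAB must be fully released:
// closed, refCnt 0, reclaimed, with no chunks left, i.e. no memory leak (HBASE-26488).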
MemStoreLABImpl memStoreLAB =
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
assertTrue(memStoreLAB.getRefCntValue() == 0);
assertTrue(memStoreLAB.isReclaimed());
assertTrue(memStoreLAB.chunks.isEmpty());
StoreScanner storeScanner = null;
try {
storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
assertTrue(store.memstore.size().getCellsCount() == 0);
assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
assertTrue(storeScanner.currentScanners.size() == 1);
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
List<Cell> results = new ArrayList<>();
storeScanner.next(results);
assertEquals(2, results.size());
assertTrue(CellUtil.equals(smallCell, results.get(0)));
assertTrue(CellUtil.equals(largeCell, results.get(1)));
} finally {
if (storeScanner != null) {
storeScanner.close();
}
}
}
static class MyDefaultMemStore1 extends DefaultMemStore {
private ImmutableSegment snapshotImmutableSegment;
public MyDefaultMemStore1(Configuration conf, CellComparator c,
RegionServicesForStores regionServices) {
super(conf, c, regionServices);
}
@Override
public MemStoreSnapshot snapshot() {
MemStoreSnapshot result = super.snapshot();
this.snapshotImmutableSegment = snapshot;
return result;
}
}
public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
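// Flushing fails on the first attempt (counter == 1 <= failCounter) and succeeds on the retry
// (counter == 2), simulating the flush-retry path exercised by HBASE-26488.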
private static final AtomicInteger failCounter = new AtomicInteger(1);
private static final AtomicInteger counter = new AtomicInteger(0);
public MyDefaultStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
}
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
counter.incrementAndGet();
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
writerCreationTracker);
}
@Override
protected void performFlush(InternalScanner scanner, final CellSink sink,
ThroughputController throughputController) throws IOException {
final int currentCount = counter.get();
CellSink newCellSink = (cell) -> {
if (currentCount <= failCounter.get()) {
throw new IOException("Simulated exception by tests");
}
sink.append(cell);
};
super.performFlush(scanner, newCellSink, throughputController);
}
}
/**
* This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
*/
@Test
public void testImmutableMemStoreLABRefCnt() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
int flushByteSize = firstWriteCellByteSize - 2;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
conf.setBoolean(WALFactory.WAL_ENABLED, false);
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
myCompactingMemStore.allowCompaction.set(false);
NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
store.add(smallCell1, memStoreSizing);
store.add(largeCell1, memStoreSizing);
store.add(smallCell2, memStoreSizing);
store.add(largeCell2, memStoreSizing);
store.add(smallCell3, memStoreSizing);
store.add(largeCell3, memStoreSizing);
VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
for (ImmutableSegment segment : segments) {
memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
}
List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
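// Each immutable segment already holds one reference to its MSLAB; the scanners opened here
// are expected to add a second one.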
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 2);
}
myCompactingMemStore.allowCompaction.set(true);
myCompactingMemStore.flushInMemory();
versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
ImmutableMemStoreLAB immutableMemStoreLAB =
(ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 2);
}
List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 2);
}
assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
for (KeyValueScanner scanner : scanners1) {
scanner.close();
}
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 1);
}
for (KeyValueScanner scanner : scanners2) {
scanner.close();
}
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 1);
}
assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
flushStore(store, id++);
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 0);
}
assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
assertTrue(immutableMemStoreLAB.isClosed());
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.isClosed());
assertTrue(memStoreLAB.isReclaimed());
assertTrue(memStoreLAB.chunks.isEmpty());
}
}
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
when(sf.isHFile()).thenReturn(true);
when(sf.getReader()).thenReturn(sfr);
when(sfr.length()).thenReturn(length);
return sf;
}
private static class MyThread extends Thread {
private StoreScanner scanner;
private KeyValueHeap heap;
public MyThread(StoreScanner scanner) {
this.scanner = scanner;
}
public KeyValueHeap getHeap() {
return this.heap;
}
@Override
public void run() {
scanner.trySwitchToStreamRead();
heap = scanner.heap;
}
}
private static class MyMemStoreCompactor extends MemStoreCompactor {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
super(compactingMemStore, compactionPolicy);
}
@Override
public boolean start() throws IOException {
boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
if (isFirst) {
try {
START_COMPACTOR_LATCH.await();
return super.start();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
return super.start();
}
}
public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, c, store, regionServices, compactionPolicy);
}
@Override
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
throws IllegalArgumentIOException {
return new MyMemStoreCompactor(this, compactionPolicy);
}
@Override
protected boolean setInMemoryCompactionFlag() {
boolean rval = super.setInMemoryCompactionFlag();
if (rval) {
RUNNER_COUNT.incrementAndGet();
LOG.debug("runner count: {}", RUNNER_COUNT.get());
}
return rval;
}
}
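/**
* Once START_TEST is set, {@link #createList(int)} (called while building the scanner list)
* releases getScannerLatch and then blocks until {@link #pushActiveToPipeline} completes, while
* {@link #pushActiveToPipeline} first waits for getScannerLatch before pushing; together the two
* latches force the active segment to be pushed into the pipeline while scanner creation is in
* flight.
*/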
public static class MyCompactingMemStore extends CompactingMemStore {
private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
private final CountDownLatch getScannerLatch = new CountDownLatch(1);
private final CountDownLatch snapshotLatch = new CountDownLatch(1);
public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, c, store, regionServices, compactionPolicy);
}
@Override
protected List<KeyValueScanner> createList(int capacity) {
if (START_TEST.get()) {
try {
getScannerLatch.countDown();
snapshotLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return new ArrayList<>(capacity);
}
@Override
protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
if (START_TEST.get()) {
try {
getScannerLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
super.pushActiveToPipeline(active, checkEmpty);
if (START_TEST.get()) {
snapshotLatch.countDown();
}
}
}
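/** Hook invoked by {@link MyList#add(Object)} with the list's size before the element is added. */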
interface MyListHook {
void hook(int currentSize);
}
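/**
* List that delegates everything to an ArrayList but invokes the {@link MyListHook} before each
* {@link #add(Object)}. Note the hook fires only for add(T); the indexed add and the bulk
* mutators bypass it.
*/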
private static class MyList<T> implements List<T> {
private final List<T> delegatee = new ArrayList<>();
private final MyListHook hookAtAdd;
MyList(final MyListHook hookAtAdd) {
this.hookAtAdd = hookAtAdd;
}
@Override
public int size() {
return delegatee.size();
}
@Override
public boolean isEmpty() {
return delegatee.isEmpty();
}
@Override
public boolean contains(Object o) {
return delegatee.contains(o);
}
@Override
public Iterator<T> iterator() {
return delegatee.iterator();
}
@Override
public Object[] toArray() {
return delegatee.toArray();
}
@Override
public <R> R[] toArray(R[] a) {
return delegatee.toArray(a);
}
@Override
public boolean add(T e) {
hookAtAdd.hook(size());
return delegatee.add(e);
}
@Override
public boolean remove(Object o) {
return delegatee.remove(o);
}
@Override
public boolean containsAll(Collection<?> c) {
return delegatee.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends T> c) {
return delegatee.addAll(c);
}
@Override
public boolean addAll(int index, Collection<? extends T> c) {
return delegatee.addAll(index, c);
}
@Override
public boolean removeAll(Collection<?> c) {
return delegatee.removeAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
return delegatee.retainAll(c);
}
@Override
public void clear() {
delegatee.clear();
}
@Override
public T get(int index) {
return delegatee.get(index);
}
@Override
public T set(int index, T element) {
return delegatee.set(index, element);
}
@Override
public void add(int index, T element) {
delegatee.add(index, element);
}
@Override
public T remove(int index) {
return delegatee.remove(index);
}
@Override
public int indexOf(Object o) {
return delegatee.indexOf(o);
}
@Override
public int lastIndexOf(Object o) {
return delegatee.lastIndexOf(o);
}
@Override
public ListIterator<T> listIterator() {
return delegatee.listIterator();
}
@Override
public ListIterator<T> listIterator(int index) {
return delegatee.listIterator(index);
}
@Override
public List<T> subList(int fromIndex, int toIndex) {
return delegatee.subList(fromIndex, toIndex);
}
}
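/**
* Coordinates a "largeCellThread" and a "smallCellThread" with two barriers so that
* smallCellThread passes {@link #checkAndAddToActiveSize} first, largeCellThread then triggers
* {@link #flushInMemory}, and only afterwards may smallCellThread add its cell; the active
* segment therefore holds no cells at the moment the in-memory flush runs.
*/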
public static class MyCompactingMemStore2 extends CompactingMemStore {
private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, cellComparator, store, regionServices, compactionPolicy);
}
@Override
protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
int currentCount = largeCellPreUpdateCounter.incrementAndGet();
if (currentCount <= 1) {
try {
/**
* smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
* largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
* largeCellThread invokes flushInMemory.
*/
preCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
try {
preCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
return returnValue;
}
@Override
protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
try {
/**
* smallCellThread may add its cell to currentActive only after largeCellThread has finished
* flushInMemory; in other words, currentActive holds no cells at the moment largeCellThread
* invokes flushInMemory.
*/
postCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
super.doAdd(currentActive, cell, memstoreSizing);
}
@Override
protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
super.flushInMemory(currentActiveMutableSegment);
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
if (largeCellPreUpdateCounter.get() <= 1) {
try {
postCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
}
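/**
* Variant of the two-thread choreography above: when flushByteSizeLessThanSmallAndLargeCellSize
* is set, largeCellThread waits at preCyclicBarrier before sizing its cell, is expected to be
* the thread that performs {@link #flushInMemory}, and the threads meet again at
* postCyclicBarrier; flushCounter records how many in-memory flushes actually ran.
*/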
public static class MyCompactingMemStore3 extends CompactingMemStore {
private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger flushCounter = new AtomicInteger(0);
private static final int CELL_COUNT = 5;
private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, cellComparator, store, regionServices, compactionPolicy);
}
@Override
protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
if (!flushByteSizeLessThanSmallAndLargeCellSize) {
return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
}
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
try {
preCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
try {
preCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
return returnValue;
}
@Override
protected void postUpdate(MutableSegment currentActiveMutableSegment) {
super.postUpdate(currentActiveMutableSegment);
if (!flushByteSizeLessThanSmallAndLargeCellSize) {
try {
postCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
return;
}
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
try {
postCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
@Override
protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
super.flushInMemory(currentActiveMutableSegment);
flushCounter.incrementAndGet();
if (!flushByteSizeLessThanSmallAndLargeCellSize) {
return;
}
assertEquals(LARGE_CELL_THREAD_NAME, Thread.currentThread().getName());
try {
postCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
void disableCompaction() {
allowCompaction.set(false);
}
void enableCompaction() {
allowCompaction.set(true);
}
}
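/**
* Interleaves the in-memory compaction thread's {@link #flattenOneSegment} with a
* "takeSnapShotThread" executing {@link CompactingMemStore#snapshot}: the snapshot thread reads
* the immutable segments before the flatten, so its first {@link #swapPipelineWithNull} sees a
* stale version and must fail, and only the retry (after the flatten completes) succeeds.
*/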
public static class MyCompactingMemStore4 extends CompactingMemStore {
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
/**
* Ensures {@link CompactingMemStore#flattenOneSegment} executes only after
* {@link CompactingMemStore#getImmutableSegments} has returned.
*/
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
/**
* Ensures {@link CompactingMemStore#swapPipelineWithNull} executes only after
* {@link CompactingMemStore#flattenOneSegment} has completed.
*/
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
/**
* The snapshot thread starts {@link CompactingMemStore#snapshot} only once the in-memory
* compaction thread has entered {@link CompactingMemStore#flattenOneSegment}, because
* {@link CompactingMemStore#snapshot} invokes {@link CompactingMemStore#stopCompaction}.
*/
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
/**
* Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to finish.
*/
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, cellComparator, store, regionServices, compactionPolicy);
}
@Override
public VersionedSegmentsList getImmutableSegments() {
VersionedSegmentsList result = super.getImmutableSegments();
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
return result;
}
@Override
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
boolean result = super.swapPipelineWithNull(segments);
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.get();
if (currentCount <= 1) {
assertFalse(result);
}
if (currentCount == 2) {
assertTrue(result);
}
}
return result;
}
@Override
public void flattenOneSegment(long requesterVersion, Action action) {
int currentCount = flattenOneSegmentCounter.incrementAndGet();
if (currentCount <= 1) {
try {
/**
* Signal that {@link CompactingMemStore#snapshot} may start.
*/
snapShotStartCyclicCyclicBarrier.await();
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
super.flattenOneSegment(requesterVersion, action);
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
@Override
protected boolean setInMemoryCompactionFlag() {
boolean result = super.setInMemoryCompactionFlag();
assertTrue(result);
setInMemoryCompactionFlagCounter.incrementAndGet();
return result;
}
@Override
void inMemoryCompaction() {
try {
super.inMemoryCompaction();
} finally {
try {
inMemoryCompactionEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
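/**
* Extends the scenario of {@link MyCompactingMemStore4} with a "writeAgainThread" that writes to
* the memstore between the snapshot thread's failed first {@link #swapPipelineWithNull} and its
* retry; the second call to {@link #setInMemoryCompactionFlag} is expected to fail because
* in-memory compaction must run exactly once.
*/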
public static class MyCompactingMemStore5 extends CompactingMemStore {
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
/**
* Ensures {@link CompactingMemStore#flattenOneSegment} executes only after
* {@link CompactingMemStore#getImmutableSegments} has returned.
*/
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
/**
* Ensures {@link CompactingMemStore#swapPipelineWithNull} executes only after
* {@link CompactingMemStore#flattenOneSegment} has completed.
*/
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
/**
* The snapshot thread starts {@link CompactingMemStore#snapshot} only once the in-memory
* compaction thread has entered {@link CompactingMemStore#flattenOneSegment}, because
* {@link CompactingMemStore#snapshot} invokes {@link CompactingMemStore#stopCompaction}.
*/
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
/**
* Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to finish.
*/
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
/**
* The writeAgain thread may start only once the snapshot thread retries
* {@link CompactingMemStore#swapPipelineWithNull}.
*/
private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
/**
* Shared by the snapshot thread, the writeAgain thread and the in-memory compaction thread.
* Only after the writeAgain thread completes may the retried
* {@link CompactingMemStore#swapPipelineWithNull} execute and the in-memory compaction thread
* exit, because in-memory compaction is expected to run exactly once.
*/
private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, cellComparator, store, regionServices, compactionPolicy);
}
@Override
public VersionedSegmentsList getImmutableSegments() {
VersionedSegmentsList result = super.getImmutableSegments();
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
return result;
}
@Override
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
if (currentCount == 2) {
try {
/**
* The snapshot thread is retrying {@link CompactingMemStore#swapPipelineWithNull}; the
* writeAgain thread may now start.
*/
writeMemStoreAgainStartCyclicBarrier.await();
/**
* Wait until the writeAgain thread completes before executing the retried
* {@link CompactingMemStore#swapPipelineWithNull}.
*/
writeMemStoreAgainEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
boolean result = super.swapPipelineWithNull(segments);
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.get();
if (currentCount <= 1) {
assertFalse(result);
}
if (currentCount == 2) {
assertTrue(result);
}
}
return result;
}
@Override
public void flattenOneSegment(long requesterVersion, Action action) {
int currentCount = flattenOneSegmentCounter.incrementAndGet();
if (currentCount <= 1) {
try {
/**
* Signal that {@link CompactingMemStore#snapshot} may start.
*/
snapShotStartCyclicCyclicBarrier.await();
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
super.flattenOneSegment(requesterVersion, action);
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
/**
* Wait for the writeAgain thread to complete before the in-memory compaction thread exits,
* because in-memory compaction is expected to run exactly once.
*/
writeMemStoreAgainEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
@Override
protected boolean setInMemoryCompactionFlag() {
boolean result = super.setInMemoryCompactionFlag();
int count = setInMemoryCompactionFlagCounter.incrementAndGet();
if (count <= 1) {
assertTrue(result);
}
if (count == 2) {
assertFalse(result);
}
return result;
}
@Override
void inMemoryCompaction() {
try {
super.inMemoryCompaction();
} finally {
try {
inMemoryCompactionEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
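/**
* Exposes inMemoryCompactionEndCyclicBarrier so the test can wait until an in-memory compaction
* has completely finished.
*/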
public static class MyCompactingMemStore6 extends CompactingMemStore {
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, cellComparator, store, regionServices, compactionPolicy);
}
@Override
void inMemoryCompaction() {
try {
super.inMemoryCompaction();
} finally {
try {
inMemoryCompactionEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
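/**
* Coordinates a "flushMyThread" inside {@link DefaultMemStore#doClearSnapShot} with a
* "getScannerMyThread" calling {@link DefaultMemStore#getSnapshotSegments}, so the scanner
* observes the snapshot while the clear is in progress; the choreography is skipped
* (shouldWait=false) when the flush thread already holds the store engine's write lock.
*/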
public static class MyDefaultMemStore extends DefaultMemStore {
private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
private static final String FLUSH_THREAD_NAME = "flushMyThread";
/**
* The getScanner thread may start only once the flush thread has entered
* {@link DefaultMemStore#doClearSnapShot}.
*/
private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
/**
* Used by the getScanner thread to notify the flush thread that
* {@link DefaultMemStore#getSnapshotSegments} has completed, so
* {@link DefaultMemStore#doClearSnapShot} may continue.
*/
private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
/**
* Used by the flush thread to notify the getScanner thread that
* {@link DefaultMemStore#doClearSnapShot} has completed, so
* {@link DefaultMemStore#getScanners} may continue.
*/
private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
private volatile boolean shouldWait = true;
private volatile HStore store = null;
public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
RegionServicesForStores regionServices) throws IOException {
super(conf, cellComparator, regionServices);
}
@Override
protected List<Segment> getSnapshotSegments() {
List<Segment> result = super.getSnapshotSegments();
if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
if (currentCount == 1) {
if (this.shouldWait) {
try {
/**
* Notify the flush thread that {@link DefaultMemStore#getSnapshotSegments} has completed, so
* {@link DefaultMemStore#doClearSnapShot} may continue.
*/
preClearSnapShotCyclicBarrier.await();
/**
* Wait for {@link DefaultMemStore#doClearSnapShot} to complete.
*/
postClearSnapShotCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
return result;
}
@Override
protected void doClearSnapShot() {
if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
int currentCount = clearSnapshotCounter.incrementAndGet();
if (currentCount == 1) {
try {
if (
((ReentrantReadWriteLock) store.getStoreEngine().getLock())
.isWriteLockedByCurrentThread()
) {
shouldWait = false;
}
/**
* The getScanner thread may start only once the flush thread has entered
* {@link DefaultMemStore#doClearSnapShot}.
*/
getScannerCyclicBarrier.await();
if (shouldWait) {
/**
* Wait for {@link DefaultMemStore#getSnapshotSegments} to complete.
*/
preClearSnapShotCyclicBarrier.await();
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
super.doClearSnapShot();
if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
int currentCount = clearSnapshotCounter.get();
if (currentCount == 1) {
if (shouldWait) {
try {
/**
* Notify the getScanner thread that {@link DefaultMemStore#doClearSnapShot} has completed, so
* {@link DefaultMemStore#getScanners} may continue.
*/
postClearSnapShotCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
}
}
}