/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test mob store compaction
*/
@Category(MediumTests.class)
public class TestMobStoreCompaction {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobStoreCompaction.class);
@Rule
public TestName name = new TestName();
  static final Logger LOG = LoggerFactory.getLogger(TestMobStoreCompaction.class);
private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private Configuration conf = null;
private HRegion region = null;
private TableDescriptor tableDescriptor = null;
private ColumnFamilyDescriptor familyDescriptor = null;
private long mobCellThreshold = 1000;
private FileSystem fs;
private static final byte[] COLUMN_FAMILY = fam1;
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
private int compactionThreshold;
private void init(Configuration conf, long mobThreshold) throws Exception {
this.conf = conf;
this.mobCellThreshold = mobThreshold;
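    // Note: this local UTIL shadows the static field, so the calls below in this
    // method run against the Configuration passed in by the test.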
HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
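    // Create a mob-enabled family: values larger than mobThreshold bytes are stored in mob files.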
familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
.setMobThreshold(mobThreshold).setMaxVersions(1).build();
tableDescriptor = UTIL.createModifyableTableDescriptor(name.getMethodName())
.modifyColumnFamily(familyDescriptor).build();
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
region = HBaseTestingUtility.createRegionAndWAL(regionInfo,
UTIL.getDataTestDir(), conf, tableDescriptor, new MobFileCache(conf));
fs = FileSystem.get(conf);
}
@After
public void tearDown() throws Exception {
region.close();
fs.delete(UTIL.getDataTestDir(), true);
}
/**
* During compaction, cells smaller than the threshold won't be affected.
*/
@Test
public void testSmallerValue() throws Exception {
init(UTIL.getConfiguration(), 500);
byte[] dummyData = makeDummyData(300); // smaller than mob threshold
Table loader = new RegionAsTable(region);
// one hfile per row
for (int i = 0; i < compactionThreshold; i++) {
Put p = createPut(i, dummyData);
loader.put(p);
region.flush(true);
}
assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
assertEquals("Before compaction: mob file count", 0, countMobFiles());
assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
assertEquals("Before compaction: mob rows", 0, countMobRows());
region.compactStores();
assertEquals("After compaction: store files", 1, countStoreFiles());
assertEquals("After compaction: mob file count", 0, countMobFiles());
assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
assertEquals("After compaction: mob rows", 0, countMobRows());
}
  /**
   * Values are initially larger than the mob threshold; the threshold is then raised
   * above the value size, so a major compaction writes the cells back into regular
   * store files.
   */
@Test
public void testLargerValue() throws Exception {
init(UTIL.getConfiguration(), 200);
byte[] dummyData = makeDummyData(300); // larger than mob threshold
Table loader = new RegionAsTable(region);
for (int i = 0; i < compactionThreshold; i++) {
Put p = createPut(i, dummyData);
loader.put(p);
region.flush(true);
}
assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
assertEquals("Before compaction: number of mob cells", compactionThreshold,
countMobCellsInMetadata());
// Change the threshold larger than the data size
setMobThreshold(region, COLUMN_FAMILY, 500);
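    // Re-initialize the region so its stores pick up the modified table descriptor.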
region.initialize();
List<HStore> stores = region.getStores();
for (HStore store: stores) {
// Force major compaction
store.triggerMajorCompaction();
Optional<CompactionContext> context =
store.requestCompaction(HStore.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
User.getCurrent());
if (!context.isPresent()) {
continue;
}
region.compact(context.get(), store,
NoLimitThroughputController.INSTANCE, User.getCurrent());
}
assertEquals("After compaction: store files", 1, countStoreFiles());
assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
assertEquals("After compaction: mob rows", 0, countMobRows());
}
  private static HRegion setMobThreshold(HRegion region, byte[] cfName, long mobThreshold) {
    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
      .newBuilder(region.getTableDescriptor().getColumnFamily(cfName))
      .setMobThreshold(mobThreshold)
      .build();
TableDescriptor td = TableDescriptorBuilder
.newBuilder(region.getTableDescriptor())
.removeColumnFamily(cfName)
.setColumnFamily(cfd)
.build();
region.setTableDescriptor(td);
return region;
}
  /**
   * First generates store files, then bulk loads them and triggers a compaction.
   * During the compaction the cell values are larger than the threshold, so the
   * compacted cells are moved into mob files.
   */
@Test
public void testMobCompactionWithBulkload() throws Exception {
    // The following will produce store files whose cell values are 600 bytes,
    // above the 300 byte mob threshold.
init(UTIL.getConfiguration(), 300);
byte[] dummyData = makeDummyData(600);
Path hbaseRootDir = CommonFSUtils.getRootDir(conf);
Path basedir = new Path(hbaseRootDir, tableDescriptor.getTableName().getNameAsString());
List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
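    // Each pair is (column family, path to the hfile), the input format bulkLoadHFiles expects.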
for (int i = 0; i < compactionThreshold; i++) {
Path hpath = new Path(basedir, "hfile" + i);
hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
createHFile(hpath, i, dummyData);
}
    // Bulk load the generated store files and compact; the 600 byte values exceed
    // the 300 byte threshold, so the compaction moves them into mob files.
Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
assertTrue("Bulkload result:", !map.isEmpty());
assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
assertEquals("Before compaction: mob file count", 0, countMobFiles());
assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
assertEquals("Before compaction: mob rows", 0, countMobRows());
assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
region.compactStores();
assertEquals("After compaction: store files", 1, countStoreFiles());
assertEquals("After compaction: mob file count:", 1, countMobFiles());
assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
assertEquals("After compaction: number of mob cells", compactionThreshold,
countMobCellsInMetadata());
}
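  /**
   * Deletes rows that contain mob cells and verifies that a major compaction
   * collapses the store files.
   */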
@Test
public void testMajorCompactionAfterDelete() throws Exception {
init(UTIL.getConfiguration(), 100);
byte[] dummyData = makeDummyData(200); // larger than mob threshold
Table loader = new RegionAsTable(region);
// create hfiles and mob hfiles but don't trigger compaction
int numHfiles = compactionThreshold - 1;
byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
for (int i = 0; i < numHfiles; i++) {
Put p = createPut(i, dummyData);
loader.put(p);
region.flush(true);
}
assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
assertEquals("Before compaction: rows", numHfiles, UTIL.countRows(region));
assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
// now let's delete some cells that contain mobs
Delete delete = new Delete(deleteRow);
delete.addFamily(COLUMN_FAMILY);
region.delete(delete);
region.flush(true);
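    // Flushing the delete marker creates one more store file.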
assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
    // Force a major compaction so the deleted cells and the delete marker are dropped.
    region.compact(true);
assertEquals("After compaction: store files", 1, countStoreFiles());
}
private int countStoreFiles() throws IOException {
HStore store = region.getStore(COLUMN_FAMILY);
return store.getStorefilesCount();
}
private int countMobFiles() throws IOException {
Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
familyDescriptor.getNameAsString());
if (fs.exists(mobDirPath)) {
FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
return files.length;
}
return 0;
}
private long countMobCellsInMetadata() throws IOException {
long mobCellsCount = 0;
Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
familyDescriptor.getNameAsString());
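    // Read with the block cache disabled; these files are only inspected once.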
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
CacheConfig cacheConfig = new CacheConfig(copyOfConf);
if (fs.exists(mobDirPath)) {
FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
for (FileStatus file : files) {
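        // Each mob file records its cell count in the HFile metadata under MOB_CELLS_COUNT.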
HStoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true);
sf.initReader();
Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
byte[] count = fileInfo.get(MOB_CELLS_COUNT);
        assertNotNull(count);
mobCellsCount += Bytes.toLong(count);
}
}
return mobCellsCount;
}
private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
p.setDurability(Durability.SKIP_WAL);
p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
return p;
}
  /**
   * Create an HFile containing a single cell for the given row index and value.
   */
private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
HFileContext meta = new HFileContextBuilder().build();
HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
.withFileContext(meta).create();
long now = System.currentTimeMillis();
try {
KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
Bytes.toBytes("colX"), now, dummyData);
writer.append(kv);
} finally {
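      // Stamp the file with a bulk load time so it is treated as a bulk loaded hfile.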
writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
writer.close();
}
}
private int countMobRows() throws IOException {
Scan scan = new Scan();
    // Raw mob scan: return the mob reference cells instead of resolving the mob values.
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
InternalScanner scanner = region.getScanner(scan);
int scannedCount = 0;
List<Cell> results = new ArrayList<>();
boolean hasMore = true;
while (hasMore) {
hasMore = scanner.next(results);
for (Cell c : results) {
if (MobUtils.isMobReferenceCell(c)) {
scannedCount++;
}
}
results.clear();
}
scanner.close();
return scannedCount;
}
private byte[] makeDummyData(int size) {
byte[] dummyData = new byte[size];
new Random().nextBytes(dummyData);
return dummyData;
}
private int countReferencedMobFiles() throws IOException {
Scan scan = new Scan();
    // Raw mob scan: return the mob reference cells instead of resolving the mob values.
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
InternalScanner scanner = region.getScanner(scan);
List<Cell> kvs = new ArrayList<>();
boolean hasMore = true;
Set<String> files = new HashSet<>();
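    // Scan the raw reference cells and collect the distinct mob file names they point
    // to, verifying that each referenced file actually exists in the mob directory.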
do {
kvs.clear();
hasMore = scanner.next(kvs);
for (Cell kv : kvs) {
if (!MobUtils.isMobReferenceCell(kv)) {
continue;
}
if (!MobUtils.hasValidMobRefCellValue(kv)) {
continue;
}
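        // Ignore references whose mob value length is within the current threshold.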
int size = MobUtils.getMobValueLength(kv);
if (size <= mobCellThreshold) {
continue;
}
        String fileName = MobUtils.getMobFileName(kv);
if (fileName.isEmpty()) {
continue;
}
files.add(fileName);
Path familyPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
familyDescriptor.getNameAsString());
assertTrue(fs.exists(new Path(familyPath, fileName)));
}
} while (hasMore);
scanner.close();
return files.size();
}
}