blob: 88f201efff6f83b86cffdfbac064ad4d28bfbd35 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
/**
 * Tests for the region server's SecureBulkLoadManager: concurrent secure bulk loads issued by
 * the same user into different regions must all succeed.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestSecureBulkLoadManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestSecureBulkLoadManager.class);

  private static final TableName TABLE =
      TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final byte[] COLUMN = Bytes.toBytes("column");
  private static final byte[] KEY1 = Bytes.toBytes("row1");
  private static final byte[] KEY2 = Bytes.toBytes("row2");
  private static final byte[] KEY3 = Bytes.toBytes("row3");
  private static final byte[] VALUE1 = Bytes.toBytes("t1");
  private static final byte[] VALUE3 = Bytes.toBytes("t3");
  // The table is split here, so KEY1 ("row1") and KEY3 ("row3") land in different regions.
  private static final byte[] SPLIT_ROWKEY = KEY2;

  // Bulk load worker threads. earlierBulkload is also read by the fsCreatedListener
  // installed in testForRaceCondition.
  private Thread earlierBulkload;
  private Thread laterBulkload;

  protected static final HBaseTestingUtility testUtil = new HBaseTestingUtility();
  private static final Configuration conf = testUtil.getConfiguration();

  @BeforeClass
  public static void setUp() throws Exception {
    testUtil.startMiniCluster();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }

  /**
   * After a secure bulk load finishes, the FileSystems used by that bulk load are cleaned up.
   * Those FileSystems might also be in use by other bulk load calls, and other FileSystems
   * created by the same user could be closed by a FileSystem.closeAllForUGI call. During the
   * clean-up, FileSystems that will still be used later must not be closed; otherwise a race
   * condition occurs.
   *
   * <p>testForRaceCondition covers the case where two secure bulk load calls from the same UGI
   * go into two different regions. One bulk load finishes while the other still needs its
   * FileSystems. The test checks that both bulk loads succeed.
   */
  @Test
  public void testForRaceCondition() throws Exception {
    // When the hook fires for the region holding KEY3 (the later bulk load), block until the
    // earlier bulk load thread has finished. This forces the earlier load's clean-up to run
    // while the later load is still in progress.
    Consumer<HRegion> fsCreatedListener = region -> {
      if (region.getRegionInfo().containsRow(KEY3)) {
        Threads.shutdown(earlierBulkload); // wait until the other bulk load has finished
      }
    };
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
        .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);

    // create table with two regions
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));

    // prepare one HFile per region
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
        .getRegionServer().getDataRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, KEY1, VALUE1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, KEY3, VALUE3);

    // run both bulk loads concurrently
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    earlierBulkload = newBulkloadThread(dir1, t1Exception);
    laterBulkload = newBulkloadThread(dir2, t2Exception);
    earlierBulkload.start();
    laterBulkload.start();
    Threads.shutdown(earlierBulkload);
    Threads.shutdown(laterBulkload);
    Assert.assertNull(t1Exception.get());
    Assert.assertNull(t2Exception.get());

    // check that both bulk loads are visible
    try (Table t = testUtil.getConnection().getTable(TABLE)) {
      Result r = t.get(new Get(KEY1));
      Assert.assertArrayEquals(VALUE1, r.getValue(FAMILY, COLUMN));
      r = t.get(new Get(KEY3));
      Assert.assertArrayEquals(VALUE3, r.getValue(FAMILY, COLUMN));
    }
  }

  /**
   * Creates, but does not start, a thread that bulk loads the HFiles under {@code dir}.
   *
   * @param dir directory prepared by {@link #prepareHFile(Path, byte[], byte[])}
   * @param failure receives any Throwable raised by the bulk load; left null on success
   * @return the unstarted worker thread
   */
  private Thread newBulkloadThread(Path dir, AtomicReference<Throwable> failure) {
    return new Thread(() -> {
      try {
        doBulkloadWithoutRetry(dir);
      } catch (Throwable e) {
        // Catch Throwable, not just Exception. An AssertionError (e.g. from Assert.fail in
        // doBulkloadWithoutRetry) would otherwise end the thread unnoticed, and the test
        // would still pass.
        LOG.error("bulk load failed .", e);
        failure.set(e);
      }
    });
  }

  /**
   * Marker exception thrown after the bulk load phase to stop the client from retrying.
   *
   * <p>This trick keeps server-side failures (if any) from being covered up by a client retry.
   * BulkLoadHFilesTool.bulkLoad keeps issuing bulk load calls as long as the HFile queue is not
   * empty. Server-side exceptions in the doAs block do not lead to a client exception. By
   * default, then, a bulk load always succeeds, and the client never learns that a failure
   * happened. To avoid this retry, MyExceptionToAvoidRetry is thrown after bulkLoadPhase
   * finishes and is caught silently outside the bulkLoad call. As a result, bulkLoadPhase runs
   * exactly once, and server-side failures, if any, can be detected by checking the data.
   */
  static class MyExceptionToAvoidRetry extends DoNotRetryIOException {
    private static final long serialVersionUID = -6802760664998771151L;
  }

  /**
   * Bulk loads the HFiles under {@code dir} into {@link #TABLE}, running bulkLoadPhase exactly
   * once (see {@link MyExceptionToAvoidRetry}).
   *
   * @param dir directory containing one sub-directory per column family
   * @throws AssertionError if the bulk load returns without reaching the overridden phase
   * @throws Exception any other failure raised by the bulk load tool
   */
  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    BulkLoadHFilesTool h = new BulkLoadHFilesTool(conf) {
      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
          Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    try {
      h.bulkLoad(TABLE, dir);
      Assert.fail("MyExceptionToAvoidRetry is expected");
    } catch (MyExceptionToAvoidRetry expected) {
      // expected: the single bulkLoadPhase has completed
    }
  }

  /**
   * Writes a single-cell HFile ({@code key}/FAMILY:COLUMN = {@code value}) under
   * {@code dir/<family name>}. The file uses the table's column family settings for block
   * size, encoding, bloom type and tag compression.
   *
   * @param dir bulk load source directory
   * @param key row key of the cell
   * @param value value of the cell
   */
  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
    writerCacheConf.setCacheDataOnWrite(false);
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(false)
        .withIncludesTags(true)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(StoreUtils.getChecksumType(conf))
        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
        .withBlockSize(family.getBlocksize())
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(Encryption.Context.NONE)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
    StoreFileWriter.Builder builder =
        new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
            .withOutputDir(new Path(dir, family.getNameAsString()))
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(Integer.MAX_VALUE)
            .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();
    try {
      Put put = new Put(key);
      put.addColumn(FAMILY, COLUMN, value);
      for (Cell c : put.get(FAMILY, COLUMN)) {
        writer.append(c);
      }
    } finally {
      // Always release the writer, even if append fails.
      writer.close();
    }
  }
}