/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions that
* have actually been assigned elsewhere.
* <p>
* If we happen to assign a region before its old location is fully done with it -- i.e. the region
* is on two servers at the same time -- all can work fine until the region on the dying server
* decides to compact or otherwise change the region file set. The region in its new location then
* gets a surprise when it tries to use a file that the region in its old location, on the dying
* server, has removed.
* <p>
* Making a test for this case is a little tough: even if a file is deleted up on the namenode, a
* dfsclient that opened the file before the delete will continue to serve reads until something
* changes the state of its cached blocks (e.g. a block from the deleted file is cleaned from the
* datanode by the NN).
* <p>
* What we do below is an explicit existence check on the files listed in the region that has had
* files removed by a compaction. This hurries along, and makes certain, what is otherwise a chance
* occurrence.
*/
@Category({MiscTests.class, LargeTests.class})
public class TestIOFencing {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestIOFencing.class);
private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class);
static {
// Uncomment the following lines if more verbosity is needed for
// debugging (see HBASE-12285 for details).
//((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
// .getLogger().setLevel(Level.ALL);
//((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
}
public abstract static class CompactionBlockerRegion extends HRegion {
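// Both latches start open (count 0) so the region behaves normally until stopCompactions()
// swaps in fresh one-count latches: compactionsBlocked parks the compacting thread, while
// compactionsWaiting lets the test detect that a compaction has reached its blocking point.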
AtomicInteger compactCount = new AtomicInteger();
volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
@SuppressWarnings("deprecation")
public CompactionBlockerRegion(Path tableDir, WAL log,
FileSystem fs, Configuration confParam, RegionInfo info,
TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
public void stopCompactions() {
compactionsBlocked = new CountDownLatch(1);
compactionsWaiting = new CountDownLatch(1);
}
public void allowCompactions() {
LOG.debug("allowing compactions");
compactionsBlocked.countDown();
}
public void waitForCompactionToBlock() throws IOException {
try {
LOG.debug("waiting for compaction to block");
compactionsWaiting.await();
LOG.debug("compaction block reached");
} catch (InterruptedException ex) {
throw new IOException(ex);
}
}
@Override
public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController) throws IOException {
try {
return super.compact(compaction, store, throughputController);
} finally {
compactCount.getAndIncrement();
}
}
@Override
public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController, User user) throws IOException {
try {
return super.compact(compaction, store, throughputController, user);
} finally {
compactCount.getAndIncrement();
}
}
public int countStoreFiles() {
int count = 0;
for (HStore store : stores.values()) {
count += store.getStorefilesCount();
}
return count;
}
}
/**
* An override of HRegion that allows us to park compactions in a holding pattern and then, when
* appropriate for the test, allow them to proceed again.
*/
public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
FileSystem fs, Configuration confParam, RegionInfo info,
TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
@Override
protected void doRegionCompactionPrep() throws IOException {
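// Signal the test that a compaction has arrived, then park before any compaction work is done
// (and before the compaction marker reaches the WAL).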
compactionsWaiting.countDown();
try {
compactionsBlocked.await();
} catch (InterruptedException ex) {
throw new IOException(ex);
}
super.doRegionCompactionPrep();
}
}
/**
* An override of HRegion that allows us to park compactions in a holding pattern and then, when
* appropriate for the test, allow them to proceed again. This variant lets the compaction entry
* go to the WAL before blocking; it blocks afterwards, during completion.
*/
public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
FileSystem fs, Configuration confParam, RegionInfo info,
TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
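// Inject a store whose completion hook blocks, so a compaction parks only after its marker has
// been written and synced to the WAL.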
@Override
protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
throws IOException {
return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup);
}
}
public static class BlockCompactionsInCompletionHStore extends HStore {
CompactionBlockerRegion r;
protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
Configuration confParam, boolean warmup) throws IOException {
super(region, family, confParam, warmup);
r = (CompactionBlockerRegion) region;
}
@Override
protected void refreshStoreSizeAndTotalBytes() throws IOException {
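// This hook runs while a compaction is completing, after the compaction marker has gone to the
// WAL, so parking here blocks the finish of the compaction rather than its start.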
try {
r.compactionsWaiting.countDown();
r.compactionsBlocked.await();
} catch (InterruptedException ex) {
throw new IOException(ex);
}
super.refreshStoreSizeAndTotalBytes();
}
}
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static TableName TABLE_NAME = TableName.valueOf("tabletest");
private final static byte[] FAMILY = Bytes.toBytes("family");
private static final int FIRST_BATCH_COUNT = 4000;
private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
/**
* Test that puts up a regionserver, starts a compaction on a loaded region but holds the
* compaction until after we have killed the server and the region has come up on
* a new regionserver altogether. This fakes the double assignment case where the region in one
* location changes the files out from underneath a region being served elsewhere.
*/
@Test
public void testFencingAroundCompaction() throws Exception {
for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
doTest(BlockCompactionsInPrepRegion.class, policy);
}
}
/**
* Test that puts up a regionserver, starts a compaction on a loaded region but holds the
* compaction completion until after we have killed the server and the region has come up on
* a new regionserver altogether. This fakes the double assignment case where the region in one
* location changes the files out from underneath a region being served elsewhere.
*/
@Test
public void testFencingAroundCompactionAfterWALSync() throws Exception {
for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
doTest(BlockCompactionsInCompletionRegion.class, policy);
}
}
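/**
* Core scenario: start a mini cluster using the given blocking region implementation, load a
* region, park a compaction on the original server, reassign the region to a new server by
* expiring the original server's ZooKeeper session, then release the compaction and verify that
* the relocated region's store files are all still present and the region remains usable.
*/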
public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
// Encourage plenty of flushes
c.setLong("hbase.hregion.memstore.flush.size", 25000);
c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
// Only run compaction when we tell it to
c.setInt("hbase.hstore.compaction.min",1);
c.setInt("hbase.hstore.compactionThreshold", 1000);
c.setLong("hbase.hstore.blockingStoreFiles", 1000);
// Compact quickly after we tell it to!
c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
LOG.info("Starting mini cluster");
TEST_UTIL.startMiniCluster(1);
CompactionBlockerRegion compactingRegion = null;
Admin admin = null;
try {
LOG.info("Creating admin");
admin = TEST_UTIL.getConnection().getAdmin();
LOG.info("Creating table");
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
LOG.info("Loading test table");
// Find the region
List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
assertEquals(1, testRegions.size());
compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
LOG.info("Blocking compactions");
compactingRegion.stopCompactions();
long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
// Load some rows
TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
// Add a compaction marker from an older (non-existent) region to check that replay successfully
// skips such entries
RegionInfo oldHri = RegionInfoBuilder.newBuilder(table.getName()).build();
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
((HRegion)compactingRegion).getReplicationScope(),
oldHri, compactionDescriptor, compactingRegion.getMVCC());
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
compactingRegion.countStoreFiles() <= 1) {
LOG.info("Waiting for the region to flush " +
compactingRegion.getRegionInfo().getRegionNameAsString());
Thread.sleep(1000);
admin.flush(table.getName());
assertTrue("Timed out waiting for the region to flush",
System.currentTimeMillis() - startWaitTime < 30000);
}
assertTrue(compactingRegion.countStoreFiles() > 1);
final byte[] REGION_NAME = compactingRegion.getRegionInfo().getRegionName();
LOG.info("Asking for compaction");
admin.majorCompact(TABLE_NAME);
LOG.info("Waiting for compaction to be about to start");
compactingRegion.waitForCompactionToBlock();
LOG.info("Starting a new server");
RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
final HRegionServer newServer = newServerThread.getRegionServer();
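// Expire the original server's ZooKeeper session so the master reassigns the region while the
// parked compaction thread on the old server is still alive -- the double-assignment window
// this test exercises.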
LOG.info("Killing region server ZK lease");
TEST_UTIL.expireRegionServerSession(0);
CompactionBlockerRegion newRegion = null;
startWaitTime = System.currentTimeMillis();
LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
// wait for region to be assigned and to go out of log replay if applicable
Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Region newRegion = newServer.getOnlineRegion(REGION_NAME);
return newRegion != null;
}
});
newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
// After compaction of old region finishes on the server that was going down, make sure that
// all the files we expect are still working when region is up in new location.
FileSystem fs = newRegion.getFilesystem();
for (String f : newRegion.getStoreFileList(new byte[][] { FAMILY })) {
assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
}
LOG.info("Allowing compaction to proceed");
compactingRegion.allowCompactions();
while (compactingRegion.compactCount.get() == 0) {
Thread.sleep(1000);
}
// The server we killed stays up until the compaction that was started before it was killed
// completes. In logs you should see the old regionserver now going down.
LOG.info("Compaction finished");
// If we survive the split keep going...
// Now we make sure that the region isn't totally confused. Load up more rows.
TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
admin.majorCompact(TABLE_NAME);
startWaitTime = System.currentTimeMillis();
while (newRegion.compactCount.get() == 0) {
Thread.sleep(1000);
assertTrue("New region never compacted",
System.currentTimeMillis() - startWaitTime < 180000);
}
int count;
for (int i = 0;; i++) {
try {
count = HBaseTestingUtility.countRows(table);
break;
} catch (DoNotRetryIOException e) {
// Retry for up to 30s: the scan can transiently fail if it opens a store file that a
// compaction has just removed, before the region refreshes its store file list.
if (i >= 30 || !e.getMessage().contains("File does not exist")) {
throw e;
}
Thread.sleep(1000);
}
}
if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
} else {
assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
}
} finally {
if (compactingRegion != null) {
compactingRegion.allowCompactions();
}
if (admin != null) {
admin.close();
}
TEST_UTIL.shutdownMiniCluster();
}
}
}