/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.*;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
/**
* These tests exercise region splitting against a running cluster.
*/
@Category({RegionServerTests.class, LargeTests.class})
@SuppressWarnings("deprecation")
public class TestSplitTransactionOnCluster {
private static final Log LOG = LogFactory.getLog(TestSplitTransactionOnCluster.class);
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
withLookingForStuckThread(true).build();
private Admin admin = null;
private MiniHBaseCluster cluster = null;
private static final int NB_SERVERS = 3;
static final HBaseTestingUtility TESTING_UTIL =
new HBaseTestingUtility();
@Rule
public TestName name = new TestName();
@BeforeClass public static void before() throws Exception {
TESTING_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_PERIOD, 60000);
TESTING_UTIL.startMiniCluster(1, NB_SERVERS, null, MyMaster.class, null);
}
@AfterClass public static void after() throws Exception {
TESTING_UTIL.shutdownMiniCluster();
}
@Before public void setup() throws IOException {
TESTING_UTIL.ensureSomeNonStoppedRegionServersAvailable(NB_SERVERS);
this.admin = TESTING_UTIL.getAdmin();
this.cluster = TESTING_UTIL.getMiniHBaseCluster();
}
@After
public void tearDown() throws Exception {
this.admin.close();
for (HTableDescriptor htd: this.admin.listTables()) {
LOG.info("Tear down, remove table=" + htd.getTableName());
TESTING_UTIL.deleteTable(htd.getTableName());
}
}
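/**
* Asserts that the table has exactly one region, waits for that region's assignment to
* complete, and returns its HRegionInfo.
*/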
private HRegionInfo getAndCheckSingleTableRegion(final List<HRegion> regions)
throws IOException, InterruptedException {
assertEquals(1, regions.size());
HRegionInfo hri = regions.get(0).getRegionInfo();
try {
cluster.getMaster().getAssignmentManager().waitForAssignment(hri, 600000);
} catch (NoSuchProcedureException e) {
LOG.info("Presume the procedure has been cleaned up so just proceed: " + e.toString());
}
return hri;
}
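/**
* Asks the master to split the given region at midKey and blocks until the split procedure
* completes.
*/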
private void requestSplitRegion(
final HRegionServer rsServer,
final Region region,
final byte[] midKey) throws IOException {
long procId = cluster.getMaster().splitRegion(region.getRegionInfo(), midKey, 0, 0);
// Wait for the split to complete or get interrupted. If the split completes successfully,
// the procedure finishes normally; if the split fails, the procedure throws an exception.
ProcedureTestingUtility.waitProcedure(cluster.getMaster().getMasterProcedureExecutor(), procId);
}
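/**
* Installs a master observer that fails the split before the PONR, requests a split, and
* verifies the parent region comes back online after the rollback instead of staying in RIT.
*/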
@Test
public void testRITStateForRollback() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final HMaster master = cluster.getMaster();
try {
// Create table then get the single region for our new table.
Table t = createTableAndWait(tableName, Bytes.toBytes("cf"));
final List<HRegion> regions = cluster.getRegions(tableName);
final HRegionInfo hri = getAndCheckSingleTableRegion(regions);
insertData(tableName, admin, t);
t.close();
// Turn off balancer so it doesn't cut in and mess up our placements.
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it doesn't remove the parent on us.
master.setCatalogJanitorEnabled(false);
// find a splittable region
final HRegion region = findSplittableRegion(regions);
assertTrue("not able to find a splittable region", region != null);
// install master co-processor to fail splits
master.getMasterCoprocessorHost().load(
FailingSplitMasterObserver.class,
Coprocessor.PRIORITY_USER,
master.getConfiguration());
// split async
this.admin.splitRegion(region.getRegionInfo().getRegionName(), new byte[] {42});
// we have to wait until the SPLITTING state is seen by the master
FailingSplitMasterObserver observer =
(FailingSplitMasterObserver) master.getMasterCoprocessorHost().findCoprocessor(
FailingSplitMasterObserver.class.getName());
assertNotNull(observer);
observer.latch.await();
LOG.info("Waiting for region to come out of RIT");
while (!cluster.getMaster().getAssignmentManager().getRegionStates().isRegionOnline(hri)) {
Threads.sleep(100);
}
assertTrue(cluster.getMaster().getAssignmentManager().getRegionStates().isRegionOnline(hri));
} finally {
admin.setBalancerRunning(true, false);
master.setCatalogJanitorEnabled(true);
abortAndWaitForMaster();
TESTING_UTIL.deleteTable(tableName);
}
}
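/**
* Exercises the interaction of a pending compaction request with a region close and
* re-initialize (as happens on a timed-out split rollback), then verifies the region can
* still be split afterwards.
*/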
@Test
public void testSplitFailedCompactionAndSplit() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
// Create table then get the single region for our new table.
HTableDescriptor htd = new HTableDescriptor(tableName);
byte[] cf = Bytes.toBytes("cf");
htd.addFamily(new HColumnDescriptor(cf));
admin.createTable(htd);
for (int i = 0; cluster.getRegions(tableName).isEmpty() && i < 100; i++) {
Thread.sleep(100);
}
assertEquals(1, cluster.getRegions(tableName).size());
HRegion region = cluster.getRegions(tableName).get(0);
Store store = region.getStore(cf);
int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
Table t = TESTING_UTIL.getConnection().getTable(tableName);
// insert data
insertData(tableName, admin, t);
insertData(tableName, admin, t);
int fileNum = store.getStorefiles().size();
// 0, Compaction Request
store.triggerMajorCompaction();
CompactionContext cc = store.requestCompaction();
assertNotNull(cc);
// 1, A timeout split
// 1.1 close region
assertEquals(2, region.close(false).get(cf).size());
// 1.2 roll back and initialize the Region again
region.initialize();
// 2, Run Compaction cc
assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE));
assertTrue(fileNum > store.getStorefiles().size());
// 3, Split
requestSplitRegion(regionServer, region, Bytes.toBytes("row3"));
assertEquals(2, cluster.getRegions(tableName).size());
}
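/** Master observer that counts down a latch and then fails the split just before the PONR. */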
public static class FailingSplitMasterObserver implements MasterObserver {
volatile CountDownLatch latch;
@Override
public void start(CoprocessorEnvironment e) throws IOException {
latch = new CountDownLatch(1);
}
@Override
public void preSplitRegionBeforePONRAction(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
final byte[] splitKey,
final List<Mutation> metaEntries) throws IOException {
latch.countDown();
throw new IOException("Causing rollback of region split");
}
}
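/**
* Marks the parent region CLOSING so split requests roll back and the region count stays
* constant, then marks it OPEN again and verifies a split succeeds.
*/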
@Test
public void testSplitRollbackOnRegionClosing() throws IOException, InterruptedException {
final TableName tableName = TableName.valueOf(name.getMethodName());
// Create table then get the single region for our new table.
Table t = createTableAndWait(tableName, HConstants.CATALOG_FAMILY);
List<HRegion> regions = cluster.getRegions(tableName);
HRegionInfo hri = getAndCheckSingleTableRegion(regions);
int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
RegionStates regionStates = cluster.getMaster().getAssignmentManager().getRegionStates();
// Turn off balancer so it doesn't cut in and mess up our placements.
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it doesn't remove the parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
try {
// Add a bit of load to the table so it is splittable.
TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY, false);
// Get region pre-split.
HRegionServer server = cluster.getRegionServer(tableRegionIndex);
printOutRegions(server, "Initial regions: ");
int regionCount = ProtobufUtil.getOnlineRegions(server.getRSRpcServices()).size();
regionStates.updateRegionState(hri, RegionState.State.CLOSING);
// Now try splitting.... should fail. And each should successfully
// roll back.
this.admin.splitRegion(hri.getRegionName());
this.admin.splitRegion(hri.getRegionName());
this.admin.splitRegion(hri.getRegionName());
// Wait around a while and assert count of regions remains constant.
for (int i = 0; i < 10; i++) {
Thread.sleep(100);
assertEquals(regionCount, ProtobufUtil.getOnlineRegions(
server.getRSRpcServices()).size());
}
regionStates.updateRegionState(hri, State.OPEN);
// Now try splitting and it should work.
split(hri, server, regionCount);
// Get daughters
checkAndGetDaughters(tableName);
// OK, so split happened after we cleared the blocking node.
} finally {
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();
}
}
/**
* Test that if a daughter splits on us, we won't do the shutdown handler fixup
* just because we can't find the immediate daughter of an offlined parent.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testShutdownFixupWhenDaughterHasSplit()throws IOException, InterruptedException {
final TableName tableName = TableName.valueOf(name.getMethodName());
// Create table then get the single region for our new table.
Table t = createTableAndWait(tableName, HConstants.CATALOG_FAMILY);
List<HRegion> regions = cluster.getRegions(tableName);
HRegionInfo hri = getAndCheckSingleTableRegion(regions);
int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
// Turn off balancer so it doesn't cut in and mess up our placements.
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it doesn't remove the parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
try {
// Add a bit of load to the table so it is splittable.
TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
// Get region pre-split.
HRegionServer server = cluster.getRegionServer(tableRegionIndex);
printOutRegions(server, "Initial regions: ");
int regionCount = ProtobufUtil.getOnlineRegions(server.getRSRpcServices()).size();
// Now split.
split(hri, server, regionCount);
// Get daughters
List<HRegion> daughters = checkAndGetDaughters(tableName);
// Now split one of the daughters.
regionCount = ProtobufUtil.getOnlineRegions(server.getRSRpcServices()).size();
HRegionInfo daughter = daughters.get(0).getRegionInfo();
LOG.info("Daughter we are going to split: " + daughter);
// Compact first to ensure we have cleaned up references -- else the split
// will fail.
this.admin.compactRegion(daughter.getRegionName());
daughters = cluster.getRegions(tableName);
HRegion daughterRegion = null;
for (HRegion r: daughters) {
if (r.getRegionInfo().equals(daughter)) {
daughterRegion = r;
LOG.info("Found matching HRI: " + daughterRegion);
break;
}
}
assertTrue(daughterRegion != null);
for (int i=0; i<100; i++) {
if (!daughterRegion.hasReferences()) break;
Threads.sleep(100);
}
assertFalse("Waiting for reference to be compacted", daughterRegion.hasReferences());
LOG.info("Daughter hri before split (has been compacted): " + daughter);
split(daughter, server, regionCount);
// Get list of daughters
daughters = cluster.getRegions(tableName);
for (HRegion d: daughters) {
LOG.info("Regions before crash: " + d);
}
// Now crash the server
cluster.abortRegionServer(tableRegionIndex);
waitUntilRegionServerDead();
awaitDaughters(tableName, daughters.size());
// Assert daughters are online and ONLY the original daughters -- that
// fixup didn't insert one during server shutdown recovery.
regions = cluster.getRegions(tableName);
for (HRegion d: daughters) {
LOG.info("Regions after crash: " + d);
}
if (daughters.size() != regions.size()) {
LOG.info("Daughters=" + daughters.size() + ", regions=" + regions.size());
}
assertEquals(daughters.size(), regions.size());
for (HRegion r: regions) {
LOG.info("Regions post crash " + r + ", contains=" + daughters.contains(r));
assertTrue("Missing region post crash " + r, daughters.contains(r));
}
} finally {
LOG.info("EXITING");
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();
}
}
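/**
* Verifies that splitting a region whose earlier rows were all deleted and major-compacted
* away does not throw an NPE, and that the remaining rows are still readable after the split.
*/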
@Test
public void testSplitShouldNotThrowNPEEvenARegionHasEmptySplitFiles() throws Exception {
TableName userTableName = TableName.valueOf(name.getMethodName());
HTableDescriptor htd = new HTableDescriptor(userTableName);
HColumnDescriptor hcd = new HColumnDescriptor("col");
htd.addFamily(hcd);
admin.createTable(htd);
Table table = TESTING_UTIL.getConnection().getTable(userTableName);
try {
for (int i = 0; i <= 5; i++) {
String row = "row" + i;
Put p = new Put(row.getBytes());
String val = "Val" + i;
p.addColumn("col".getBytes(), "ql".getBytes(), val.getBytes());
table.put(p);
admin.flush(userTableName);
Delete d = new Delete(row.getBytes());
// Do a normal delete
table.delete(d);
admin.flush(userTableName);
}
admin.majorCompact(userTableName);
List<HRegionInfo> regionsOfTable =
cluster.getMaster().getAssignmentManager().getRegionStates()
.getRegionsOfTable(userTableName);
assertEquals(1, regionsOfTable.size());
HRegionInfo hRegionInfo = regionsOfTable.get(0);
Put p = new Put("row6".getBytes());
p.addColumn("col".getBytes(), "ql".getBytes(), "val".getBytes());
table.put(p);
p = new Put("row7".getBytes());
p.addColumn("col".getBytes(), "ql".getBytes(), "val".getBytes());
table.put(p);
p = new Put("row8".getBytes());
p.addColumn("col".getBytes(), "ql".getBytes(), "val".getBytes());
table.put(p);
admin.flush(userTableName);
admin.splitRegion(hRegionInfo.getRegionName(), "row7".getBytes());
regionsOfTable = cluster.getMaster()
.getAssignmentManager().getRegionStates()
.getRegionsOfTable(userTableName);
while (regionsOfTable.size() != 2) {
Thread.sleep(1000);
regionsOfTable = cluster.getMaster()
.getAssignmentManager().getRegionStates()
.getRegionsOfTable(userTableName);
LOG.debug("waiting 2 regions to be available, got " + regionsOfTable.size() +
": " + regionsOfTable);
}
Assert.assertEquals(2, regionsOfTable.size());
Scan s = new Scan();
ResultScanner scanner = table.getScanner(s);
int mainTableCount = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
mainTableCount++;
}
Assert.assertEquals(3, mainTableCount);
} finally {
table.close();
}
}
/**
* Verifies HBASE-5806. Here the case is that splitting is completed but, before the
* CatalogJanitor can remove the parent region, the master is killed and restarted.
* @throws IOException
* @throws InterruptedException
* @throws NodeExistsException
* @throws KeeperException
*/
@Test
public void testMasterRestartAtRegionSplitPendingCatalogJanitor()
throws IOException, InterruptedException, NodeExistsException,
KeeperException, ServiceException {
final TableName tableName = TableName.valueOf(name.getMethodName());
// Create table then get the single region for our new table.
Table t = createTableAndWait(tableName, HConstants.CATALOG_FAMILY);
List<HRegion> regions = cluster.getRegions(tableName);
HRegionInfo hri = getAndCheckSingleTableRegion(regions);
int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
// Turn off balancer so it doesn't cut in and mess up our placements.
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it doesn't remove the parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
try {
// Add a bit of load to the table so it is splittable.
TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY, false);
// Get region pre-split.
HRegionServer server = cluster.getRegionServer(tableRegionIndex);
printOutRegions(server, "Initial regions: ");
// Call split.
this.admin.splitRegion(hri.getRegionName());
List<HRegion> daughters = checkAndGetDaughters(tableName);
// Before cleanup, get a new master.
HMaster master = abortAndWaitForMaster();
// Now call compact on the daughters and clean up any references.
for (HRegion daughter: daughters) {
daughter.compact(true);
assertFalse(daughter.hasReferences());
}
// BUT calling compact on the daughters is not enough. The CatalogJanitor looks
// in the filesystem, and the filesystem content is not the same as what the Region
// is reading from. Compacted-away files are picked up later by the compacted
// file discharger process. It runs infrequently. Make it run so CatalogJanitor
// doesn't find any references.
for (RegionServerThread rst: cluster.getRegionServerThreads()) {
boolean oldSetting = rst.getRegionServer().compactedFileDischarger.setUseExecutor(false);
rst.getRegionServer().compactedFileDischarger.run();
rst.getRegionServer().compactedFileDischarger.setUseExecutor(oldSetting);
}
cluster.getMaster().setCatalogJanitorEnabled(true);
LOG.info("Starting run of CatalogJanitor");
cluster.getMaster().getCatalogJanitor().run();
ProcedureTestingUtility.waitAllProcedures(cluster.getMaster().getMasterProcedureExecutor());
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
ServerName regionServerOfRegion = regionStates.getRegionServerOfRegion(hri);
assertNull(regionServerOfRegion);
} finally {
TESTING_UTIL.getAdmin().setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();
}
}
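/**
* Splits a table that has region replication enabled, waits for both daughters and their
* replicas to come online, then checks STRONG and TIMELINE reads against the new regions.
*/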
@Test
public void testSplitWithRegionReplicas() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor htd = TESTING_UTIL.createTableDescriptor(name.getMethodName());
htd.setRegionReplication(2);
htd.addCoprocessor(SlowMeCopro.class.getName());
// Create table then get the single region for our new table.
Table t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")}, null);
List<HRegion> oldRegions;
do {
oldRegions = cluster.getRegions(tableName);
Thread.sleep(10);
} while (oldRegions.size() != 2);
for (HRegion h : oldRegions) LOG.debug("OLDREGION " + h.getRegionInfo());
try {
int regionServerIndex = cluster.getServerWith(oldRegions.get(0).getRegionInfo()
.getRegionName());
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
insertData(tableName, admin, t);
// Turn off balancer so it doesn't cut in and mess up our placements.
admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it doesn't remove the parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
boolean tableExists = MetaTableAccessor.tableExists(regionServer.getConnection(),
tableName);
assertEquals("The specified table should be present.", true, tableExists);
final HRegion region = findSplittableRegion(oldRegions);
assertTrue("not able to find a splittable region", region != null);
regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
regionServer = cluster.getRegionServer(regionServerIndex);
try {
requestSplitRegion(regionServer, region, Bytes.toBytes("row2"));
} catch (IOException e) {
e.printStackTrace();
fail("Split execution should have succeeded with no exceptions thrown " + e);
}
//TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
List<HRegion> newRegions;
do {
newRegions = cluster.getRegions(tableName);
for (HRegion h : newRegions) LOG.debug("NEWREGION " + h.getRegionInfo());
Thread.sleep(1000);
} while ((newRegions.contains(oldRegions.get(0)) || newRegions.contains(oldRegions.get(1)))
|| newRegions.size() != 4);
tableExists = MetaTableAccessor.tableExists(regionServer.getConnection(),
tableName);
assertEquals("The specified table should be present.", true, tableExists);
// exists works on stale and we see the put after the flush
byte[] b1 = "row1".getBytes();
Get g = new Get(b1);
g.setConsistency(Consistency.STRONG);
// The following GET will make a trip to the meta to get the new location of the 1st daughter
// In the process it will also get the location of the replica of the daughter (initially
// pointing to the parent's replica)
Result r = t.get(g);
Assert.assertFalse(r.isStale());
LOG.info("exists stale after flush done");
SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
// This will succeed because in the previous GET we get the location of the replica
r = t.get(g);
Assert.assertTrue(r.isStale());
SlowMeCopro.getCdl().get().countDown();
} finally {
SlowMeCopro.getCdl().get().countDown();
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();
}
}
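/** Puts rows row1 through row4 into cf:q1 and flushes the table. */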
private void insertData(final TableName tableName, Admin admin, Table t) throws IOException,
InterruptedException {
Put p = new Put(Bytes.toBytes("row1"));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
t.put(p);
p = new Put(Bytes.toBytes("row2"));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
t.put(p);
p = new Put(Bytes.toBytes("row3"));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
t.put(p);
p = new Put(Bytes.toBytes("row4"));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
t.put(p);
admin.flush(tableName);
}
/**
* A region that has no store files should split successfully into two regions that also
* have no store files.
*/
@Test
public void testSplitRegionWithNoStoreFiles()
throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
// Create table then get the single region for our new table.
createTableAndWait(tableName, HConstants.CATALOG_FAMILY);
List<HRegion> regions = cluster.getRegions(tableName);
HRegionInfo hri = getAndCheckSingleTableRegion(regions);
ensureTableRegionNotOnSameServerAsMeta(admin, hri);
int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionInfo()
.getRegionName());
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
// Turn off balancer so it doesn't cut in and mess up our placements.
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it doesn't remove the parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
try {
// Precondition: we created a table with no data, no store files.
printOutRegions(regionServer, "Initial regions: ");
Configuration conf = cluster.getConfiguration();
HBaseFsck.debugLsr(conf, new Path("/"));
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = TESTING_UTIL.getDFSCluster().getFileSystem();
Map<String, Path> storefiles =
FSUtils.getTableStoreFilePathMap(null, fs, rootDir, tableName);
assertEquals("Expected nothing but found " + storefiles.toString(), storefiles.size(), 0);
// find a splittable region. Refresh the regions list
regions = cluster.getRegions(tableName);
final HRegion region = findSplittableRegion(regions);
assertTrue("not able to find a splittable region", region != null);
// Now split.
try {
requestSplitRegion(regionServer, region, Bytes.toBytes("row2"));
} catch (IOException e) {
fail("Split execution should have succeeded with no exceptions thrown");
}
// Postcondition: split the table with no store files into two regions, but still have no
// store files
List<HRegion> daughters = cluster.getRegions(tableName);
assertEquals(2, daughters.size());
// check dirs
HBaseFsck.debugLsr(conf, new Path("/"));
Map<String, Path> storefilesAfter =
FSUtils.getTableStoreFilePathMap(null, fs, rootDir, tableName);
assertEquals("Expected nothing but found " + storefilesAfter.toString(),
storefilesAfter.size(), 0);
hri = region.getRegionInfo(); // split parent
AssignmentManager am = cluster.getMaster().getAssignmentManager();
RegionStates regionStates = am.getRegionStates();
long start = EnvironmentEdgeManager.currentTime();
while (!regionStates.isRegionInState(hri, State.SPLIT)) {
LOG.debug("Waiting for SPLIT state on: " + hri);
assertFalse("Timed out in waiting split parent to be in state SPLIT",
EnvironmentEdgeManager.currentTime() - start > 60000);
Thread.sleep(500);
}
assertTrue(regionStates.isRegionInState(daughters.get(0).getRegionInfo(), State.OPEN));
assertTrue(regionStates.isRegionInState(daughters.get(1).getRegionInfo(), State.OPEN));
// We should not be able to assign it again
am.assign(hri, true);
assertFalse("Split region can't be assigned",
regionStates.isRegionInTransition(hri));
assertTrue(regionStates.isRegionInState(hri, State.SPLIT));
// We should not be able to unassign it either
try {
am.unassign(hri);
fail("Should have thrown exception");
} catch (UnexpectedStateException e) {
// Expected
}
assertFalse("Split region can't be unassigned",
regionStates.isRegionInTransition(hri));
assertTrue(regionStates.isRegionInState(hri, State.SPLIT));
} finally {
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
}
}
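/**
* With a split policy that skips the store file range check for families starting with "i_",
* splitting a store file at a key outside its range should create a reference file for "i_f"
* but not for "f".
*/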
@Test
public void testStoreFileReferenceCreationWhenSplitPolicySaysToSkipRangeCheck()
throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
try {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("f"));
htd.addFamily(new HColumnDescriptor("i_f"));
htd.setRegionSplitPolicyClassName(CustomSplitPolicy.class.getName());
admin.createTable(htd);
List<HRegion> regions = awaitTableRegions(tableName);
HRegion region = regions.get(0);
for (int i = 3; i < 9; i++) {
Put p = new Put(Bytes.toBytes("row"+i));
p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value" + i));
p.addColumn(Bytes.toBytes("i_f"), Bytes.toBytes("q"), Bytes.toBytes("value" + i));
region.put(p);
}
region.flush(true);
Store store = region.getStore(Bytes.toBytes("f"));
Collection<StoreFile> storefiles = store.getStorefiles();
assertEquals(1, storefiles.size());
assertFalse(region.hasReferences());
Path referencePath =
region.getRegionFileSystem().splitStoreFile(region.getRegionInfo(), "f",
storefiles.iterator().next(), Bytes.toBytes("row1"), false, region.getSplitPolicy());
assertNull(referencePath);
referencePath =
region.getRegionFileSystem().splitStoreFile(region.getRegionInfo(), "i_f",
storefiles.iterator().next(), Bytes.toBytes("row1"), false, region.getSplitPolicy());
assertNotNull(referencePath);
} finally {
TESTING_UTIL.deleteTable(tableName);
}
}
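/**
* Returns the first primary-replica region in the list that reports itself splittable,
* retrying a few times; returns null if none is found.
*/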
private HRegion findSplittableRegion(final List<HRegion> regions) throws InterruptedException {
for (int i = 0; i < 5; ++i) {
for (HRegion r: regions) {
if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) {
return r;
}
}
Thread.sleep(100);
}
return null;
}
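/** Waits up to ~10 seconds for the table to have at least two regions and returns them. */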
private List<HRegion> checkAndGetDaughters(TableName tableName)
throws InterruptedException {
List<HRegion> daughters = null;
// try up to 10s
for (int i=0; i<100; i++) {
daughters = cluster.getRegions(tableName);
if (daughters.size() >= 2) break;
Thread.sleep(100);
}
assertTrue(daughters.size() >= 2);
return daughters;
}
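/** Aborts the current master, starts a new one, waits for it to become active, and returns it. */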
private HMaster abortAndWaitForMaster()
throws IOException, InterruptedException {
cluster.abortMaster(0);
cluster.waitOnMaster(0);
HMaster master = cluster.startMaster().getMaster();
cluster.waitForActiveAndReadyMaster();
return master;
}
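/** Requests a split of the given region and waits until the table's region count increases. */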
private void split(final HRegionInfo hri, final HRegionServer server, final int regionCount)
throws IOException, InterruptedException {
this.admin.splitRegion(hri.getRegionName());
for (int i = 0; this.cluster.getRegions(hri.getTable()).size() <= regionCount && i < 60; i++) {
LOG.debug("Waiting on region " + hri.getRegionNameAsString() + " to split");
Thread.sleep(2000);
}
assertFalse("Waited too long for split",
this.cluster.getRegions(hri.getTable()).size() <= regionCount);
}
/**
* Ensure single table region is not on same server as the single hbase:meta table
* region.
* @param admin
* @param hri
* @return Index of the server hosting the single table region
* @throws UnknownRegionException
* @throws MasterNotRunningException
* @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
* @throws InterruptedException
*/
private int ensureTableRegionNotOnSameServerAsMeta(final Admin admin,
final HRegionInfo hri)
throws IOException, MasterNotRunningException,
ZooKeeperConnectionException, InterruptedException {
// Now make sure that the table region is not on the same server as that hosting
// hbase:meta. We don't want hbase:meta replay polluting our test when we later crash
// the table region serving server.
int metaServerIndex = cluster.getServerWithMeta();
boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TESTING_UTIL.getConfiguration());
if (tablesOnMaster) {
// Need to check master is supposed to host meta... perhaps it is not.
throw new UnsupportedOperationException();
// TODO: assertTrue(metaServerIndex == -1); // meta is on master now
}
HRegionServer metaRegionServer = tablesOnMaster?
cluster.getMaster(): cluster.getRegionServer(metaServerIndex);
int tableRegionIndex = cluster.getServerWith(hri.getRegionName());
assertTrue(tableRegionIndex != -1);
HRegionServer tableRegionServer = cluster.getRegionServer(tableRegionIndex);
LOG.info("MetaRegionServer=" + metaRegionServer.getServerName() +
", other=" + tableRegionServer.getServerName());
if (metaRegionServer.getServerName().equals(tableRegionServer.getServerName())) {
HRegionServer hrs = getOtherRegionServer(cluster, metaRegionServer);
assertNotNull(hrs);
assertNotNull(hri);
LOG.info("Moving " + hri.getRegionNameAsString() + " from " +
metaRegionServer.getServerName() + " to " +
hrs.getServerName() + "; metaServerIndex=" + metaServerIndex);
admin.move(hri.getEncodedNameAsBytes(), Bytes.toBytes(hrs.getServerName().toString()));
}
// Wait till table region is up on the server that is NOT carrying hbase:meta.
for (int i = 0; i < 100; i++) {
tableRegionIndex = cluster.getServerWith(hri.getRegionName());
if (tableRegionIndex != -1 && tableRegionIndex != metaServerIndex) break;
LOG.debug("Waiting on region move off the hbase:meta server; current index " +
tableRegionIndex + " and metaServerIndex=" + metaServerIndex);
Thread.sleep(100);
}
assertTrue("Region not moved off hbase:meta server, tableRegionIndex=" + tableRegionIndex,
tableRegionIndex != -1 && tableRegionIndex != metaServerIndex);
// Verify for sure table region is not on same server as hbase:meta
tableRegionIndex = cluster.getServerWith(hri.getRegionName());
assertTrue(tableRegionIndex != -1);
assertNotSame(metaServerIndex, tableRegionIndex);
return tableRegionIndex;
}
/**
* Find regionserver other than the one passed.
* Can't rely on indexes into list of regionservers since crashed servers
* occupy an index.
* @param cluster
* @param notThisOne
* @return A regionserver that is not <code>notThisOne</code> or null if none
* found
*/
private HRegionServer getOtherRegionServer(final MiniHBaseCluster cluster,
final HRegionServer notThisOne) {
for (RegionServerThread rst: cluster.getRegionServerThreads()) {
HRegionServer hrs = rst.getRegionServer();
if (hrs.getServerName().equals(notThisOne.getServerName())) continue;
if (hrs.isStopping() || hrs.isStopped()) continue;
return hrs;
}
return null;
}
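/** Logs the regions currently online on the given region server, prefixed with the given string. */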
private void printOutRegions(final HRegionServer hrs, final String prefix)
throws IOException {
List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
for (HRegionInfo region: regions) {
LOG.info(prefix + region.getRegionNameAsString());
}
}
private void waitUntilRegionServerDead() throws InterruptedException, InterruptedIOException {
// Wait until the master processes the RS shutdown
for (int i=0; (cluster.getMaster().getClusterStatus().getServers().size() > NB_SERVERS
|| cluster.getLiveRegionServerThreads().size() > NB_SERVERS) && i<100; i++) {
LOG.info("Waiting on server to go down");
Thread.sleep(100);
}
assertFalse("Waited too long for RS to die",
cluster.getMaster().getClusterStatus().getServers().size() > NB_SERVERS
|| cluster.getLiveRegionServerThreads().size() > NB_SERVERS);
}
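/** Waits up to ~60 seconds for the table to have at least numDaughters regions online. */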
private void awaitDaughters(TableName tableName, int numDaughters) throws InterruptedException {
// Wait till regions are back online again.
for (int i = 0; cluster.getRegions(tableName).size() < numDaughters && i < 60; i++) {
LOG.info("Waiting for repair to happen");
Thread.sleep(1000);
}
if (cluster.getRegions(tableName).size() < numDaughters) {
fail("Waiting too long for daughter regions");
}
}
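/** Waits up to ~10 seconds for the table to have at least one region and returns the list. */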
private List<HRegion> awaitTableRegions(final TableName tableName) throws InterruptedException {
List<HRegion> regions = null;
for (int i = 0; i < 100; i++) {
regions = cluster.getRegions(tableName);
if (regions.size() > 0) break;
Thread.sleep(100);
}
return regions;
}
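/** Creates a table with the given family and waits until at least one of its regions is online. */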
private Table createTableAndWait(TableName tableName, byte[] cf) throws IOException,
InterruptedException {
Table t = TESTING_UTIL.createTable(tableName, cf);
awaitTableRegions(tableName);
assertTrue("Table not online: " + tableName,
cluster.getRegions(tableName).size() != 0);
return t;
}
// Make it public so that JVMClusterUtil can access it.
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new MyMasterRpcServices(this);
}
}
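/**
* Master RPC services that, when enabled, can intercept READY_TO_SPLIT region state
* transition reports (the interception body is currently a TODO).
*/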
static class MyMasterRpcServices extends MasterRpcServices {
static AtomicBoolean enabled = new AtomicBoolean(false);
private HMaster myMaster;
public MyMasterRpcServices(HMaster master) throws IOException {
super(master);
myMaster = master;
}
@Override
public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
ReportRegionStateTransitionRequest req) throws ServiceException {
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req);
if (enabled.get() && req.getTransition(0).getTransitionCode().equals(
TransitionCode.READY_TO_SPLIT) && !resp.hasErrorMessage()) {
RegionStates regionStates = myMaster.getAssignmentManager().getRegionStates();
for (RegionStates.RegionStateNode regionState:
regionStates.getRegionsInTransition()) {
/* TODO!!!!
// Find the merging_new region and remove it
if (regionState.isSplittingNew()) {
regionStates.deleteRegion(regionState.getRegion());
}
*/
}
}
return resp;
}
}
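/**
* Split policy that always reports the region as splittable and skips the store file range
* check for column families whose name starts with "i_".
*/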
static class CustomSplitPolicy extends RegionSplitPolicy {
@Override
protected boolean shouldSplit() {
return true;
}
@Override
public boolean skipStoreFileRangeCheck(String familyName) {
return familyName.startsWith("i_");
}
}
}