| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master; |
| |
| import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.NavigableSet; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.LongAdder; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.MiniHBaseCluster; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.SplitLogCounters; |
| import org.apache.hadoop.hbase.StartMiniClusterOption; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.Waiter; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.RegionInfoBuilder; |
| import org.apache.hadoop.hbase.client.RegionLocator; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; |
| import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; |
| import org.apache.hadoop.hbase.master.assignment.RegionStates; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; |
| import org.apache.hadoop.hbase.regionserver.Region; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.hbase.wal.WAL; |
| import org.apache.hadoop.hbase.wal.WALEdit; |
| import org.apache.hadoop.hbase.wal.WALFactory; |
| import org.apache.hadoop.hbase.wal.WALKeyImpl; |
| import org.apache.hadoop.hbase.zookeeper.ZKUtil; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| |
| /** |
| * Base class for testing distributed log splitting. |
| */ |
| public abstract class AbstractTestDLS { |
| private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); |
| |
| private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); |
| |
| // Start a cluster with 2 masters and 5 regionservers |
| private static final int NUM_MASTERS = 2; |
| private static final int NUM_RS = 5; |
| private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); |
| |
| @Rule |
| public TestName testName = new TestName(); |
| |
| private TableName tableName; |
| private MiniHBaseCluster cluster; |
| private HMaster master; |
| private Configuration conf; |
| |
| @Rule |
| public TestName name = new TestName(); |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| // Uncomment the following line if more verbosity is needed for |
| // debugging (see HBASE-12285 for details). |
| // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); |
| TEST_UTIL.startMiniZKCluster(); |
| TEST_UTIL.startMiniDFSCluster(3); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws Exception { |
| TEST_UTIL.shutdownMiniCluster(); |
| } |
| |
| protected abstract String getWalProvider(); |
| |
| private void startCluster(int numRS) throws Exception { |
| SplitLogCounters.resetCounters(); |
| LOG.info("Starting cluster"); |
| conf.setLong("hbase.splitlog.max.resubmit", 0); |
| // Make the failure test faster |
| conf.setInt("zookeeper.recovery.retry", 0); |
| conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); |
| conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing |
| conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); |
| conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); |
| conf.set("hbase.wal.provider", getWalProvider()); |
| StartMiniClusterOption option = StartMiniClusterOption.builder() |
| .numMasters(NUM_MASTERS).numRegionServers(numRS).build(); |
| TEST_UTIL.startMiniHBaseCluster(option); |
| cluster = TEST_UTIL.getHBaseCluster(); |
| LOG.info("Waiting for active/ready master"); |
| cluster.waitForActiveAndReadyMaster(); |
| master = cluster.getMaster(); |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return cluster.getLiveRegionServerThreads().size() >= numRS; |
| } |
| }); |
| } |
| |
| @Before |
| public void before() throws Exception { |
| conf = TEST_UTIL.getConfiguration(); |
| tableName = TableName.valueOf(testName.getMethodName()); |
| } |
| |
| @After |
| public void after() throws Exception { |
| TEST_UTIL.shutdownMiniHBaseCluster(); |
| TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()), |
| true); |
| ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); |
| } |
| |
| @Test |
| public void testMasterStartsUpWithLogSplittingWork() throws Exception { |
| conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); |
| startCluster(NUM_RS); |
| |
| int numRegionsToCreate = 40; |
| int numLogLines = 1000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| try (Table ht = installTable(numRegionsToCreate)) { |
| HRegionServer hrs = findRSToKill(false); |
| List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| makeWAL(hrs, regions, numLogLines, 100); |
| |
| // abort master |
| abortMaster(cluster); |
| |
| // abort RS |
| LOG.info("Aborting region server: " + hrs.getServerName()); |
| hrs.abort("testing"); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; |
| } |
| }); |
| |
| Thread.sleep(2000); |
| LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (HBaseTestingUtility.getAllOnlineRegions(cluster) |
| .size() >= (numRegionsToCreate + 1)); |
| } |
| }); |
| |
| LOG.info("Current Open Regions After Master Node Starts Up:" + |
| HBaseTestingUtility.getAllOnlineRegions(cluster).size()); |
| |
| assertEquals(numLogLines, TEST_UTIL.countRows(ht)); |
| } |
| } |
| |
| @Test |
| public void testThreeRSAbort() throws Exception { |
| LOG.info("testThreeRSAbort"); |
| int numRegionsToCreate = 40; |
| int numRowsPerRegion = 100; |
| |
| startCluster(NUM_RS); // NUM_RS=6. |
| |
| try (Table table = installTable(numRegionsToCreate)) { |
| populateDataInTable(numRowsPerRegion); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| assertEquals(NUM_RS, rsts.size()); |
| cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); |
| cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); |
| cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); |
| |
| TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { |
| |
| @Override |
| public boolean evaluate() throws Exception { |
| return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; |
| } |
| |
| @Override |
| public String explainFailure() throws Exception { |
| return "Timed out waiting for server aborts."; |
| } |
| }); |
| TEST_UTIL.waitUntilAllRegionsAssigned(tableName); |
| int rows; |
| try { |
| rows = TEST_UTIL.countRows(table); |
| } catch (Exception e) { |
| Threads.printThreadInfo(System.out, "Thread dump before fail"); |
| throw e; |
| } |
| assertEquals(numRegionsToCreate * numRowsPerRegion, rows); |
| } |
| } |
| |
| @Test |
| public void testDelayedDeleteOnFailure() throws Exception { |
| if (!this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, |
| HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { |
| // This test depends on zk coordination.... |
| return; |
| } |
| LOG.info("testDelayedDeleteOnFailure"); |
| startCluster(1); |
| final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); |
| final FileSystem fs = master.getMasterFileSystem().getFileSystem(); |
| final Path rootLogDir = |
| new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME); |
| final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString()); |
| fs.mkdirs(logDir); |
| ExecutorService executor = null; |
| try { |
| final Path corruptedLogFile = new Path(logDir, "x"); |
| FSDataOutputStream out; |
| out = fs.create(corruptedLogFile); |
| out.write(0); |
| out.write(Bytes.toBytes("corrupted bytes")); |
| out.close(); |
| ZKSplitLogManagerCoordination coordination = |
| (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager()) |
| .getSplitLogManagerCoordination(); |
| coordination.setIgnoreDeleteForTesting(true); |
| executor = Executors.newSingleThreadExecutor(); |
| Runnable runnable = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| // since the logDir is a fake, corrupted one, so the split log worker |
| // will finish it quickly with error, and this call will fail and throw |
| // an IOException. |
| slm.splitLogDistributed(logDir); |
| } catch (IOException ioe) { |
| try { |
| assertTrue(fs.exists(corruptedLogFile)); |
| // this call will block waiting for the task to be removed from the |
| // tasks map which is not going to happen since ignoreZKDeleteForTesting |
| // is set to true, until it is interrupted. |
| slm.splitLogDistributed(logDir); |
| } catch (IOException e) { |
| assertTrue(Thread.currentThread().isInterrupted()); |
| return; |
| } |
| fail("did not get the expected IOException from the 2nd call"); |
| } |
| fail("did not get the expected IOException from the 1st call"); |
| } |
| }; |
| Future<?> result = executor.submit(runnable); |
| try { |
| result.get(2000, TimeUnit.MILLISECONDS); |
| } catch (TimeoutException te) { |
| // it is ok, expected. |
| } |
| waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); |
| executor.shutdownNow(); |
| executor = null; |
| |
| // make sure the runnable is finished with no exception thrown. |
| result.get(); |
| } finally { |
| if (executor != null) { |
| // interrupt the thread in case the test fails in the middle. |
| // it has no effect if the thread is already terminated. |
| executor.shutdownNow(); |
| } |
| fs.delete(logDir, true); |
| } |
| } |
| |
| private Table installTable(int nrs) throws Exception { |
| return installTable(nrs, 0); |
| } |
| |
| private Table installTable(int nrs, int existingRegions) throws Exception { |
| // Create a table with regions |
| byte[] family = Bytes.toBytes("family"); |
| LOG.info("Creating table with " + nrs + " regions"); |
| Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); |
| int numRegions = -1; |
| try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { |
| numRegions = r.getStartKeys().length; |
| } |
| assertEquals(nrs, numRegions); |
| LOG.info("Waiting for no more RIT\n"); |
| blockUntilNoRIT(); |
| // disable-enable cycle to get rid of table's dead regions left behind |
| // by createMultiRegions |
| assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName)); |
| LOG.debug("Disabling table\n"); |
| TEST_UTIL.getAdmin().disableTable(tableName); |
| LOG.debug("Waiting for no more RIT\n"); |
| blockUntilNoRIT(); |
| NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster); |
| LOG.debug("Verifying only catalog region is assigned\n"); |
| if (regions.size() != 1) { |
| for (String oregion : regions) { |
| LOG.debug("Region still online: " + oregion); |
| } |
| } |
| assertEquals(1 + existingRegions, regions.size()); |
| LOG.debug("Enabling table\n"); |
| TEST_UTIL.getAdmin().enableTable(tableName); |
| LOG.debug("Waiting for no more RIT\n"); |
| blockUntilNoRIT(); |
| LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); |
| regions = HBaseTestingUtility.getAllOnlineRegions(cluster); |
| assertEquals(numRegions + 1 + existingRegions, regions.size()); |
| return table; |
| } |
| |
| void populateDataInTable(int nrows) throws Exception { |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| assertEquals(NUM_RS, rsts.size()); |
| |
| for (RegionServerThread rst : rsts) { |
| HRegionServer hrs = rst.getRegionServer(); |
| List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (RegionInfo hri : hris) { |
| if (hri.getTable().isSystemTable()) { |
| continue; |
| } |
| LOG.debug( |
| "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); |
| Region region = hrs.getOnlineRegion(hri.getRegionName()); |
| assertTrue(region != null); |
| putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); |
| } |
| } |
| |
| for (MasterThread mt : cluster.getLiveMasterThreads()) { |
| HRegionServer hrs = mt.getMaster(); |
| List<RegionInfo> hris; |
| try { |
| hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| } catch (ServerNotRunningYetException e) { |
| // It's ok: this master may be a backup. Ignored. |
| continue; |
| } |
| for (RegionInfo hri : hris) { |
| if (hri.getTable().isSystemTable()) { |
| continue; |
| } |
| LOG.debug( |
| "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString()); |
| Region region = hrs.getOnlineRegion(hri.getRegionName()); |
| assertTrue(region != null); |
| putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); |
| } |
| } |
| } |
| |
| public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size) |
| throws IOException { |
| makeWAL(hrs, regions, num_edits, edit_size, true); |
| } |
| |
| public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize, |
| boolean cleanShutdown) throws IOException { |
| // remove root and meta region |
| regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); |
| |
| for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) { |
| RegionInfo regionInfo = iter.next(); |
| if (regionInfo.getTable().isSystemTable()) { |
| iter.remove(); |
| } |
| } |
| byte[] value = new byte[editSize]; |
| |
| List<RegionInfo> hris = new ArrayList<>(); |
| for (RegionInfo region : regions) { |
| if (region.getTable() != tableName) { |
| continue; |
| } |
| hris.add(region); |
| } |
| LOG.info("Creating wal edits across " + hris.size() + " regions."); |
| for (int i = 0; i < editSize; i++) { |
| value[i] = (byte) ('a' + (i % 26)); |
| } |
| int n = hris.size(); |
| int[] counts = new int[n]; |
| // sync every ~30k to line up with desired wal rolls |
| final int syncEvery = 30 * 1024 / editSize; |
| MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); |
| if (n > 0) { |
| for (int i = 0; i < numEdits; i += 1) { |
| WALEdit e = new WALEdit(); |
| RegionInfo curRegionInfo = hris.get(i % n); |
| WAL log = hrs.getWAL(curRegionInfo); |
| byte[] startRow = curRegionInfo.getStartKey(); |
| if (startRow == null || startRow.length == 0) { |
| startRow = new byte[] { 0, 0, 0, 0, 1 }; |
| } |
| byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); |
| row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because |
| // HBaseTestingUtility.createMultiRegions use 5 bytes key |
| byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); |
| e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); |
| log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), |
| tableName, System.currentTimeMillis(), mvcc), e); |
| if (0 == i % syncEvery) { |
| log.sync(); |
| } |
| counts[i % n] += 1; |
| } |
| } |
| // done as two passes because the regions might share logs. shutdown is idempotent, but sync |
| // will cause errors if done after. |
| for (RegionInfo info : hris) { |
| WAL log = hrs.getWAL(info); |
| log.sync(); |
| } |
| if (cleanShutdown) { |
| for (RegionInfo info : hris) { |
| WAL log = hrs.getWAL(info); |
| log.shutdown(); |
| } |
| } |
| for (int i = 0; i < n; i++) { |
| LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); |
| } |
| return; |
| } |
| |
| private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException { |
| int count = 0; |
| try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) { |
| WAL.Entry e; |
| while ((e = in.next()) != null) { |
| if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { |
| count++; |
| } |
| } |
| } |
| return count; |
| } |
| |
| private void blockUntilNoRIT() throws Exception { |
| TEST_UTIL.waitUntilNoRegionsInTransition(60000); |
| } |
| |
| private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families) |
| throws IOException { |
| for (int i = 0; i < numRows; i++) { |
| Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); |
| for (byte[] family : families) { |
| put.addColumn(family, qf, null); |
| } |
| region.put(put); |
| } |
| } |
| |
| private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) |
| throws InterruptedException { |
| long curt = System.currentTimeMillis(); |
| long endt = curt + timems; |
| while (curt < endt) { |
| if (ctr.sum() == oldval) { |
| Thread.sleep(100); |
| curt = System.currentTimeMillis(); |
| } else { |
| assertEquals(newval, ctr.sum()); |
| return; |
| } |
| } |
| fail(); |
| } |
| |
| private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException { |
| for (MasterThread mt : cluster.getLiveMasterThreads()) { |
| if (mt.getMaster().isActiveMaster()) { |
| mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); |
| mt.join(); |
| break; |
| } |
| } |
| LOG.debug("Master is aborted"); |
| } |
| |
| /** |
| * Find a RS that has regions of a table. |
| * @param hasMetaRegion when true, the returned RS has hbase:meta region as well |
| */ |
| private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception { |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| List<RegionInfo> regions = null; |
| HRegionServer hrs = null; |
| |
| for (RegionServerThread rst : rsts) { |
| hrs = rst.getRegionServer(); |
| while (rst.isAlive() && !hrs.isOnline()) { |
| Thread.sleep(100); |
| } |
| if (!rst.isAlive()) { |
| continue; |
| } |
| boolean isCarryingMeta = false; |
| boolean foundTableRegion = false; |
| regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (RegionInfo region : regions) { |
| if (region.isMetaRegion()) { |
| isCarryingMeta = true; |
| } |
| if (region.getTable() == tableName) { |
| foundTableRegion = true; |
| } |
| if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) { |
| break; |
| } |
| } |
| if (isCarryingMeta && hasMetaRegion) { |
| // clients ask for a RS with META |
| if (!foundTableRegion) { |
| HRegionServer destRS = hrs; |
| // the RS doesn't have regions of the specified table so we need move one to this RS |
| List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName); |
| RegionInfo hri = tableRegions.get(0); |
| TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName()); |
| // wait for region move completes |
| RegionStates regionStates = |
| TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); |
| TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| ServerName sn = regionStates.getRegionServerOfRegion(hri); |
| return (sn != null && sn.equals(destRS.getServerName())); |
| } |
| }); |
| } |
| return hrs; |
| } else if (hasMetaRegion || isCarryingMeta) { |
| continue; |
| } |
| if (foundTableRegion) { |
| break; |
| } |
| } |
| |
| return hrs; |
| } |
| } |