| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master; |
| |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.NavigableSet; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.MiniHBaseCluster; |
| import org.apache.hadoop.hbase.NamespaceDescriptor; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.SplitLogCounters; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.Waiter; |
| import org.apache.hadoop.hbase.client.ClusterConnection; |
| import org.apache.hadoop.hbase.client.CompactionState; |
| import org.apache.hadoop.hbase.client.ConnectionUtils; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Increment; |
| import org.apache.hadoop.hbase.client.NonceGenerator; |
| import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.RegionLocator; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; |
| import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; |
| import org.apache.hadoop.hbase.exceptions.OperationConflictException; |
| import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; |
| import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; |
| import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; |
| import org.apache.hadoop.hbase.protobuf.ProtobufUtil; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.regionserver.Region; |
| import org.apache.hadoop.hbase.regionserver.wal.HLogKey; |
| import org.apache.hadoop.hbase.regionserver.wal.WALEdit; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.testclassification.MasterTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; |
| import org.apache.hadoop.hbase.wal.FSHLogProvider; |
| import org.apache.hadoop.hbase.wal.WAL; |
| import org.apache.hadoop.hbase.wal.WALFactory; |
| import org.apache.hadoop.hbase.wal.WALSplitter; |
| import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; |
| import org.apache.hadoop.hbase.zookeeper.ZKUtil; |
| import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| @Category({MasterTests.class, LargeTests.class}) |
| @SuppressWarnings("deprecation") |
| public class TestDistributedLogSplitting { |
| private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); |
| static { |
| // Uncomment the following line if more verbosity is needed for |
| // debugging (see HBASE-12285 for details). |
| //Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); |
| |
| // test ThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. this |
| // turns it off for this test. TODO: Figure out why scr breaks recovery. |
| System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); |
| |
| } |
| |
| // Start a cluster with 2 masters and 6 regionservers |
| static final int NUM_MASTERS = 2; |
| static final int NUM_RS = 5; |
| |
| MiniHBaseCluster cluster; |
| HMaster master; |
| Configuration conf; |
| static Configuration originalConf; |
| static HBaseTestingUtility TEST_UTIL; |
| static MiniDFSCluster dfsCluster; |
| static MiniZooKeeperCluster zkCluster; |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create()); |
| dfsCluster = TEST_UTIL.startMiniDFSCluster(1); |
| zkCluster = TEST_UTIL.startMiniZKCluster(); |
| originalConf = TEST_UTIL.getConfiguration(); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws IOException { |
| TEST_UTIL.shutdownMiniZKCluster(); |
| TEST_UTIL.shutdownMiniDFSCluster(); |
| TEST_UTIL.shutdownMiniHBaseCluster(); |
| } |
| |
| private void startCluster(int num_rs) throws Exception { |
| SplitLogCounters.resetCounters(); |
| LOG.info("Starting cluster"); |
| conf.getLong("hbase.splitlog.max.resubmit", 0); |
| // Make the failure test faster |
| conf.setInt("zookeeper.recovery.retry", 0); |
| conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); |
| conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing |
| conf.setInt("hbase.regionserver.wal.max.splitters", 3); |
| conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); |
| TEST_UTIL.shutdownMiniHBaseCluster(); |
| TEST_UTIL = new HBaseTestingUtility(conf); |
| TEST_UTIL.setDFSCluster(dfsCluster); |
| TEST_UTIL.setZkCluster(zkCluster); |
| TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs); |
| cluster = TEST_UTIL.getHBaseCluster(); |
| LOG.info("Waiting for active/ready master"); |
| cluster.waitForActiveAndReadyMaster(); |
| master = cluster.getMaster(); |
| while (cluster.getLiveRegionServerThreads().size() < num_rs) { |
| Threads.sleep(10); |
| } |
| } |
| |
| @Before |
| public void before() throws Exception { |
| // refresh configuration |
| conf = HBaseConfiguration.create(originalConf); |
| } |
| |
| @After |
| public void after() throws Exception { |
| try { |
| if (TEST_UTIL.getHBaseCluster() != null) { |
| for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) { |
| mt.getMaster().abort("closing...", null); |
| } |
| } |
| TEST_UTIL.shutdownMiniHBaseCluster(); |
| } finally { |
| TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true); |
| ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test (timeout=300000) |
| public void testRecoveredEdits() throws Exception { |
| LOG.info("testRecoveredEdits"); |
| conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); |
| startCluster(NUM_RS); |
| |
| final int NUM_LOG_LINES = 1000; |
| final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| FileSystem fs = master.getMasterFileSystem().getFileSystem(); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| |
| Path rootdir = FSUtils.getRootDir(conf); |
| |
| Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null), |
| "table", "family", 40); |
| try { |
| TableName table = t.getName(); |
| List<HRegionInfo> regions = null; |
| HRegionServer hrs = null; |
| for (int i = 0; i < NUM_RS; i++) { |
| boolean foundRs = false; |
| hrs = rsts.get(i).getRegionServer(); |
| regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo region : regions) { |
| if (region.getTable().getNameAsString().equalsIgnoreCase("table")) { |
| foundRs = true; |
| break; |
| } |
| } |
| if (foundRs) break; |
| } |
| final Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(hrs |
| .getServerName().toString())); |
| |
| LOG.info("#regions = " + regions.size()); |
| Iterator<HRegionInfo> it = regions.iterator(); |
| while (it.hasNext()) { |
| HRegionInfo region = it.next(); |
| if (region.getTable().getNamespaceAsString() |
| .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { |
| it.remove(); |
| } |
| } |
| |
| makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); |
| |
| slm.splitLogDistributed(logDir); |
| |
| int count = 0; |
| for (HRegionInfo hri : regions) { |
| |
| Path tdir = FSUtils.getTableDir(rootdir, table); |
| Path editsdir = |
| WALSplitter.getRegionDirRecoveredEditsDir( |
| HRegion.getRegionDir(tdir, hri.getEncodedName())); |
| LOG.debug("checking edits dir " + editsdir); |
| FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { |
| @Override |
| public boolean accept(Path p) { |
| if (WALSplitter.isSequenceIdFile(p)) { |
| return false; |
| } |
| return true; |
| } |
| }); |
| assertTrue( |
| "edits dir should have more than a single file in it. instead has " + files.length, |
| files.length > 1); |
| for (int i = 0; i < files.length; i++) { |
| int c = countWAL(files[i].getPath(), fs, conf); |
| count += c; |
| } |
| LOG.info(count + " edits in " + files.length + " recovered edits files."); |
| } |
| |
| // check that the log file is moved |
| assertFalse(fs.exists(logDir)); |
| |
| assertEquals(NUM_LOG_LINES, count); |
| } finally { |
| if (t != null) t.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testLogReplayWithNonMetaRSDown() throws Exception { |
| LOG.info("testLogReplayWithNonMetaRSDown"); |
| conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 1000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| HRegionServer hrs = findRSToKill(false, "table"); |
| List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); |
| |
| // wait for abort completes |
| this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator { |
| private boolean isDups = false; |
| private LinkedList<Long> nonces = new LinkedList<Long>(); |
| |
| public void startDups() { |
| isDups = true; |
| } |
| |
| @Override |
| public long newNonce() { |
| long nonce = isDups ? nonces.removeFirst() : super.newNonce(); |
| if (!isDups) { |
| nonces.add(nonce); |
| } |
| return nonce; |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testNonceRecovery() throws Exception { |
| LOG.info("testNonceRecovery"); |
| final String TABLE_NAME = "table"; |
| final String FAMILY_NAME = "family"; |
| final int NUM_REGIONS_TO_CREATE = 40; |
| |
| conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| master.balanceSwitch(false); |
| |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE); |
| NonceGeneratorWithDups ng = new NonceGeneratorWithDups(); |
| NonceGenerator oldNg = |
| ConnectionUtils.injectNonceGeneratorForTesting( |
| (ClusterConnection)TEST_UTIL.getConnection(), ng); |
| |
| try { |
| List<Increment> reqs = new ArrayList<Increment>(); |
| for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { |
| HRegionServer hrs = rst.getRegionServer(); |
| List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo hri : hris) { |
| if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) { |
| byte[] key = hri.getStartKey(); |
| if (key == null || key.length == 0) { |
| key = Bytes.copy(hri.getEndKey()); |
| --(key[key.length - 1]); |
| } |
| Increment incr = new Increment(key); |
| incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1); |
| ht.increment(incr); |
| reqs.add(incr); |
| } |
| } |
| } |
| |
| HRegionServer hrs = findRSToKill(false, "table"); |
| abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); |
| ng.startDups(); |
| for (Increment incr : reqs) { |
| try { |
| ht.increment(incr); |
| fail("should have thrown"); |
| } catch (OperationConflictException ope) { |
| LOG.debug("Caught as expected: " + ope.getMessage()); |
| } |
| } |
| } finally { |
| ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) |
| TEST_UTIL.getConnection(), oldNg); |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testLogReplayWithMetaRSDown() throws Exception { |
| LOG.info("testRecoveredEditsReplayWithMetaRSDown"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 1000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| HRegionServer hrs = findRSToKill(true, "table"); |
| List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); |
| |
| this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| private void abortRSAndVerifyRecovery(HRegionServer hrs, Table ht, final ZooKeeperWatcher zkw, |
| final int numRegions, final int numofLines) throws Exception { |
| |
| abortRSAndWaitForRecovery(hrs, zkw, numRegions); |
| assertEquals(numofLines, TEST_UTIL.countRows(ht)); |
| } |
| |
| private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw, |
| final int numRegions) throws Exception { |
| final MiniHBaseCluster tmpCluster = this.cluster; |
| |
| // abort RS |
| LOG.info("Aborting region server: " + hrs.getServerName()); |
| hrs.abort("testing"); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); |
| } |
| }); |
| |
| // wait for regions come online |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (HBaseTestingUtility.getAllOnlineRegions(tmpCluster).size() |
| >= (numRegions + 1)); |
| } |
| }); |
| |
| // wait for all regions are fully recovered |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( |
| zkw.recoveringRegionsZNode, false); |
| return (recoveringRegions != null && recoveringRegions.size() == 0); |
| } |
| }); |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testMasterStartsUpWithLogSplittingWork() throws Exception { |
| LOG.info("testMasterStartsUpWithLogSplittingWork"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); |
| conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); |
| startCluster(NUM_RS); |
| |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 1000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| HRegionServer hrs = findRSToKill(false, "table"); |
| List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); |
| |
| // abort master |
| abortMaster(cluster); |
| |
| // abort RS |
| LOG.info("Aborting region server: " + hrs.getServerName()); |
| hrs.abort("testing"); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); |
| } |
| }); |
| |
| Thread.sleep(2000); |
| LOG.info("Current Open Regions:" |
| + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() |
| >= (NUM_REGIONS_TO_CREATE + 1)); |
| } |
| }); |
| |
| LOG.info("Current Open Regions After Master Node Starts Up:" |
| + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); |
| |
| assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testMasterStartsUpWithLogReplayWork() throws Exception { |
| LOG.info("testMasterStartsUpWithLogReplayWork"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); |
| startCluster(NUM_RS); |
| |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 1000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| HRegionServer hrs = findRSToKill(false, "table"); |
| List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); |
| |
| // abort master |
| abortMaster(cluster); |
| |
| // abort RS |
| LOG.info("Aborting region server: " + hrs.getServerName()); |
| hrs.abort("testing"); |
| |
| // wait for the RS dies |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); |
| } |
| }); |
| |
| Thread.sleep(2000); |
| LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); |
| |
| // wait for all regions are fully recovered |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( |
| zkw.recoveringRegionsZNode, false); |
| boolean done = recoveringRegions != null && recoveringRegions.size() == 0; |
| if (!done) { |
| LOG.info("Recovering regions: " + recoveringRegions); |
| } |
| return done; |
| } |
| }); |
| |
| LOG.info("Current Open Regions After Master Node Starts Up:" |
| + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); |
| |
| assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testLogReplayTwoSequentialRSDown() throws Exception { |
| LOG.info("testRecoveredEditsReplayTwoSequentialRSDown"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 1000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| List<HRegionInfo> regions = null; |
| HRegionServer hrs1 = findRSToKill(false, "table"); |
| regions = ProtobufUtil.getOnlineRegions(hrs1.getRSRpcServices()); |
| |
| makeWAL(hrs1, regions, "table", "family", NUM_LOG_LINES, 100); |
| |
| // abort RS1 |
| LOG.info("Aborting region server: " + hrs1.getServerName()); |
| hrs1.abort("testing"); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); |
| } |
| }); |
| |
| // wait for regions come online |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() |
| >= (NUM_REGIONS_TO_CREATE + 1)); |
| } |
| }); |
| |
| // sleep a little bit in order to interrupt recovering in the middle |
| Thread.sleep(300); |
| // abort second region server |
| rsts = cluster.getLiveRegionServerThreads(); |
| HRegionServer hrs2 = rsts.get(0).getRegionServer(); |
| LOG.info("Aborting one more region server: " + hrs2.getServerName()); |
| hrs2.abort("testing"); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2)); |
| } |
| }); |
| |
| // wait for regions come online |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() |
| >= (NUM_REGIONS_TO_CREATE + 1)); |
| } |
| }); |
| |
| // wait for all regions are fully recovered |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( |
| zkw.recoveringRegionsZNode, false); |
| return (recoveringRegions != null && recoveringRegions.size() == 0); |
| } |
| }); |
| |
| assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testMarkRegionsRecoveringInZK() throws Exception { |
| LOG.info("testMarkRegionsRecoveringInZK"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| master.balanceSwitch(false); |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| final ZooKeeperWatcher zkw = master.getZooKeeper(); |
| Table ht = installTable(zkw, "table", "family", 40); |
| try { |
| final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); |
| |
| Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>(); |
| HRegionInfo region = null; |
| HRegionServer hrs = null; |
| ServerName firstFailedServer = null; |
| ServerName secondFailedServer = null; |
| for (int i = 0; i < NUM_RS; i++) { |
| hrs = rsts.get(i).getRegionServer(); |
| List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| if (regions.isEmpty()) continue; |
| region = regions.get(0); |
| regionSet.add(region); |
| firstFailedServer = hrs.getServerName(); |
| secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName(); |
| break; |
| } |
| |
| slm.markRegionsRecovering(firstFailedServer, regionSet); |
| slm.markRegionsRecovering(secondFailedServer, regionSet); |
| |
| List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw, |
| ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName())); |
| |
| assertEquals(recoveringRegions.size(), 2); |
| |
| // wait for splitLogWorker to mark them up because there is no WAL files recorded in ZK |
| final HRegionServer tmphrs = hrs; |
| TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (tmphrs.getRecoveringRegions().size() == 0); |
| } |
| }); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testReplayCmd() throws Exception { |
| LOG.info("testReplayCmd"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| List<HRegionInfo> regions = null; |
| HRegionServer hrs = null; |
| for (int i = 0; i < NUM_RS; i++) { |
| boolean isCarryingMeta = false; |
| hrs = rsts.get(i).getRegionServer(); |
| regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo region : regions) { |
| if (region.isMetaRegion()) { |
| isCarryingMeta = true; |
| break; |
| } |
| } |
| if (isCarryingMeta) { |
| continue; |
| } |
| if (regions.size() > 0) break; |
| } |
| |
| this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1")); |
| String originalCheckSum = TEST_UTIL.checksumRows(ht); |
| |
| // abort RA and trigger replay |
| abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); |
| |
| assertEquals("Data should remain after reopening of regions", originalCheckSum, |
| TEST_UTIL.checksumRows(ht)); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testLogReplayForDisablingTable() throws Exception { |
| LOG.info("testLogReplayForDisablingTable"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 1000; |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE); |
| try { |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| List<HRegionInfo> regions = null; |
| HRegionServer hrs = null; |
| boolean hasRegionsForBothTables = false; |
| String tableName = null; |
| for (int i = 0; i < NUM_RS; i++) { |
| tableName = null; |
| hasRegionsForBothTables = false; |
| boolean isCarryingSystem = false; |
| hrs = rsts.get(i).getRegionServer(); |
| regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo region : regions) { |
| if (region.getTable().isSystemTable()) { |
| isCarryingSystem = true; |
| break; |
| } |
| if (tableName != null && |
| !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) { |
| // make sure that we find a RS has online regions for both "table" and "disableTable" |
| hasRegionsForBothTables = true; |
| break; |
| } else if (tableName == null) { |
| tableName = region.getTable().getNameAsString(); |
| } |
| } |
| if (isCarryingSystem) { |
| continue; |
| } |
| if (hasRegionsForBothTables) { |
| break; |
| } |
| } |
| |
| // make sure we found a good RS |
| Assert.assertTrue(hasRegionsForBothTables); |
| |
| LOG.info("#regions = " + regions.size()); |
| Iterator<HRegionInfo> it = regions.iterator(); |
| while (it.hasNext()) { |
| HRegionInfo region = it.next(); |
| if (region.isMetaTable()) { |
| it.remove(); |
| } |
| } |
| makeWAL(hrs, regions, "disableTable", "family", NUM_LOG_LINES, 100, false); |
| makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); |
| |
| LOG.info("Disabling table\n"); |
| TEST_UTIL.getHBaseAdmin().disableTable(TableName.valueOf("disableTable")); |
| TEST_UTIL.waitTableDisabled(TableName.valueOf("disableTable").getName()); |
| |
| // abort RS |
| LOG.info("Aborting region server: " + hrs.getServerName()); |
| hrs.abort("testing"); |
| |
| // wait for abort completes |
| TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); |
| } |
| }); |
| |
| // wait for regions come online |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() |
| >= (NUM_REGIONS_TO_CREATE + 1)); |
| } |
| }); |
| |
| // wait for all regions are fully recovered |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( |
| zkw.recoveringRegionsZNode, false); |
| ServerManager serverManager = master.getServerManager(); |
| return (!serverManager.areDeadServersInProgress() && |
| recoveringRegions != null && recoveringRegions.size() == 0); |
| } |
| }); |
| |
| int count = 0; |
| FileSystem fs = master.getMasterFileSystem().getFileSystem(); |
| Path rootdir = FSUtils.getRootDir(conf); |
| Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable")); |
| for (HRegionInfo hri : regions) { |
| Path editsdir = |
| WALSplitter.getRegionDirRecoveredEditsDir( |
| HRegion.getRegionDir(tdir, hri.getEncodedName())); |
| LOG.debug("checking edits dir " + editsdir); |
| if(!fs.exists(editsdir)) continue; |
| FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { |
| @Override |
| public boolean accept(Path p) { |
| if (WALSplitter.isSequenceIdFile(p)) { |
| return false; |
| } |
| return true; |
| } |
| }); |
| if(files != null) { |
| for(FileStatus file : files) { |
| int c = countWAL(file.getPath(), fs, conf); |
| count += c; |
| LOG.info(c + " edits in " + file.getPath()); |
| } |
| } |
| } |
| |
| LOG.info("Verify edits in recovered.edits files"); |
| assertEquals(NUM_LOG_LINES, count); |
| LOG.info("Verify replayed edits"); |
| assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); |
| |
| // clean up |
| for (HRegionInfo hri : regions) { |
| Path editsdir = |
| WALSplitter.getRegionDirRecoveredEditsDir( |
| HRegion.getRegionDir(tdir, hri.getEncodedName())); |
| fs.delete(editsdir, true); |
| } |
| disablingHT.close(); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testDisallowWritesInRecovering() throws Exception { |
| LOG.info("testDisallowWritesInRecovering"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); |
| conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); |
| startCluster(NUM_RS); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); |
| |
| Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>(); |
| HRegionInfo region = null; |
| HRegionServer hrs = null; |
| HRegionServer dstRS = null; |
| for (int i = 0; i < NUM_RS; i++) { |
| hrs = rsts.get(i).getRegionServer(); |
| List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| if (regions.isEmpty()) continue; |
| region = regions.get(0); |
| regionSet.add(region); |
| dstRS = rsts.get((i+1) % NUM_RS).getRegionServer(); |
| break; |
| } |
| |
| slm.markRegionsRecovering(hrs.getServerName(), regionSet); |
| // move region in order for the region opened in recovering state |
| final HRegionInfo hri = region; |
| final HRegionServer tmpRS = dstRS; |
| TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(), |
| Bytes.toBytes(dstRS.getServerName().getServerName())); |
| // wait for region move completes |
| final RegionStates regionStates = |
| TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); |
| TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| ServerName sn = regionStates.getRegionServerOfRegion(hri); |
| return (sn != null && sn.equals(tmpRS.getServerName())); |
| } |
| }); |
| |
| try { |
| byte[] key = region.getStartKey(); |
| if (key == null || key.length == 0) { |
| key = new byte[] { 0, 0, 0, 0, 1 }; |
| } |
| Put put = new Put(key); |
| put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); |
| ht.put(put); |
| } catch (IOException ioe) { |
| Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException); |
| RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe; |
| boolean foundRegionInRecoveryException = false; |
| for (Throwable t : re.getCauses()) { |
| if (t instanceof RegionInRecoveryException) { |
| foundRegionInRecoveryException = true; |
| break; |
| } |
| } |
| Assert.assertTrue( |
| "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(), |
| foundRegionInRecoveryException); |
| } |
| } finally { |
| if (ht != null) ht.close(); |
| if (ht != null) zkw.close(); |
| } |
| } |
| |
| /** |
| * The original intention of this test was to force an abort of a region |
| * server and to make sure that the failure path in the region servers is |
| * properly evaluated. But it is difficult to ensure that the region server |
| * doesn't finish the log splitting before it aborts. Also now, there is |
| * this code path where the master will preempt the region server when master |
| * detects that the region server has aborted. |
| * @throws Exception |
| */ |
| @Ignore ("Disabled because flakey") @Test (timeout=300000) |
| public void testWorkerAbort() throws Exception { |
| LOG.info("testWorkerAbort"); |
| startCluster(3); |
| final int NUM_LOG_LINES = 10000; |
| final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); |
| FileSystem fs = master.getMasterFileSystem().getFileSystem(); |
| |
| final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| HRegionServer hrs = findRSToKill(false, "table"); |
| Path rootdir = FSUtils.getRootDir(conf); |
| final Path logDir = new Path(rootdir, |
| AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); |
| |
| Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null), |
| "table", "family", 40); |
| try { |
| makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), |
| "table", "family", NUM_LOG_LINES, 100); |
| |
| new Thread() { |
| @Override |
| public void run() { |
| waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); |
| for (RegionServerThread rst : rsts) { |
| rst.getRegionServer().abort("testing"); |
| break; |
| } |
| } |
| }.start(); |
| // slm.splitLogDistributed(logDir); |
| FileStatus[] logfiles = fs.listStatus(logDir); |
| TaskBatch batch = new TaskBatch(); |
| slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); |
| //waitForCounter but for one of the 2 counters |
| long curt = System.currentTimeMillis(); |
| long waitTime = 80000; |
| long endt = curt + waitTime; |
| while (curt < endt) { |
| if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + |
| tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + |
| tot_wkr_preempt_task.get()) == 0) { |
| Thread.yield(); |
| curt = System.currentTimeMillis(); |
| } else { |
| assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + |
| tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + |
| tot_wkr_preempt_task.get())); |
| return; |
| } |
| } |
| fail("none of the following counters went up in " + waitTime + |
| " milliseconds - " + |
| "tot_wkr_task_resigned, tot_wkr_task_err, " + |
| "tot_wkr_final_transition_failed, tot_wkr_task_done, " + |
| "tot_wkr_preempt_task"); |
| } finally { |
| if (t != null) t.close(); |
| } |
| } |
| |
| @Test (timeout=300000) |
| public void testThreeRSAbort() throws Exception { |
| LOG.info("testThreeRSAbort"); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_ROWS_PER_REGION = 100; |
| |
| startCluster(NUM_RS); // NUM_RS=6. |
| |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, |
| "distributed log splitting test", null); |
| |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| populateDataInTable(NUM_ROWS_PER_REGION, "family"); |
| |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| assertEquals(NUM_RS, rsts.size()); |
| rsts.get(0).getRegionServer().abort("testing"); |
| rsts.get(1).getRegionServer().abort("testing"); |
| rsts.get(2).getRegionServer().abort("testing"); |
| |
| long start = EnvironmentEdgeManager.currentTime(); |
| while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) { |
| if (EnvironmentEdgeManager.currentTime() - start > 60000) { |
| assertTrue(false); |
| } |
| Thread.sleep(200); |
| } |
| |
| start = EnvironmentEdgeManager.currentTime(); |
| while (HBaseTestingUtility.getAllOnlineRegions(cluster).size() |
| < (NUM_REGIONS_TO_CREATE + 1)) { |
| if (EnvironmentEdgeManager.currentTime() - start > 60000) { |
| assertTrue("Timedout", false); |
| } |
| Thread.sleep(200); |
| } |
| |
| // wait for all regions are fully recovered |
| TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( |
| zkw.recoveringRegionsZNode, false); |
| return (recoveringRegions != null && recoveringRegions.size() == 0); |
| } |
| }); |
| |
| assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION, |
| TEST_UTIL.countRows(ht)); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| |
| |
| @Test(timeout=30000) |
| public void testDelayedDeleteOnFailure() throws Exception { |
| LOG.info("testDelayedDeleteOnFailure"); |
| startCluster(1); |
| final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); |
| final FileSystem fs = master.getMasterFileSystem().getFileSystem(); |
| final Path logDir = new Path(FSUtils.getRootDir(conf), "x"); |
| fs.mkdirs(logDir); |
| ExecutorService executor = null; |
| try { |
| final Path corruptedLogFile = new Path(logDir, "x"); |
| FSDataOutputStream out; |
| out = fs.create(corruptedLogFile); |
| out.write(0); |
| out.write(Bytes.toBytes("corrupted bytes")); |
| out.close(); |
| ZKSplitLogManagerCoordination coordination = |
| (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master |
| .getCoordinatedStateManager()).getSplitLogManagerCoordination(); |
| coordination.setIgnoreDeleteForTesting(true); |
| executor = Executors.newSingleThreadExecutor(); |
| Runnable runnable = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| // since the logDir is a fake, corrupted one, so the split log worker |
| // will finish it quickly with error, and this call will fail and throw |
| // an IOException. |
| slm.splitLogDistributed(logDir); |
| } catch (IOException ioe) { |
| try { |
| assertTrue(fs.exists(corruptedLogFile)); |
| // this call will block waiting for the task to be removed from the |
| // tasks map which is not going to happen since ignoreZKDeleteForTesting |
| // is set to true, until it is interrupted. |
| slm.splitLogDistributed(logDir); |
| } catch (IOException e) { |
| assertTrue(Thread.currentThread().isInterrupted()); |
| return; |
| } |
| fail("did not get the expected IOException from the 2nd call"); |
| } |
| fail("did not get the expected IOException from the 1st call"); |
| } |
| }; |
| Future<?> result = executor.submit(runnable); |
| try { |
| result.get(2000, TimeUnit.MILLISECONDS); |
| } catch (TimeoutException te) { |
| // it is ok, expected. |
| } |
| waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); |
| executor.shutdownNow(); |
| executor = null; |
| |
| // make sure the runnable is finished with no exception thrown. |
| result.get(); |
| } finally { |
| if (executor != null) { |
| // interrupt the thread in case the test fails in the middle. |
| // it has no effect if the thread is already terminated. |
| executor.shutdownNow(); |
| } |
| fs.delete(logDir, true); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testMetaRecoveryInZK() throws Exception { |
| LOG.info("testMetaRecoveryInZK"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| |
| // only testing meta recovery in ZK operation |
| HRegionServer hrs = findRSToKill(true, null); |
| List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| |
| LOG.info("#regions = " + regions.size()); |
| Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>(); |
| tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO); |
| master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), tmpRegions); |
| Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>(); |
| userRegionSet.addAll(regions); |
| master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), userRegionSet); |
| boolean isMetaRegionInRecovery = false; |
| List<String> recoveringRegions = |
| zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); |
| for (String curEncodedRegionName : recoveringRegions) { |
| if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { |
| isMetaRegionInRecovery = true; |
| break; |
| } |
| } |
| assertTrue(isMetaRegionInRecovery); |
| |
| master.getMasterWalManager().splitMetaLog(hrs.getServerName()); |
| |
| isMetaRegionInRecovery = false; |
| recoveringRegions = |
| zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); |
| for (String curEncodedRegionName : recoveringRegions) { |
| if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { |
| isMetaRegionInRecovery = true; |
| break; |
| } |
| } |
| // meta region should be recovered |
| assertFalse(isMetaRegionInRecovery); |
| zkw.close(); |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testSameVersionUpdatesRecovery() throws Exception { |
| LOG.info("testSameVersionUpdatesRecovery"); |
| conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| startCluster(NUM_RS); |
| final AtomicLong sequenceId = new AtomicLong(100); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 1000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| List<HRegionInfo> regions = null; |
| HRegionServer hrs = null; |
| for (int i = 0; i < NUM_RS; i++) { |
| boolean isCarryingMeta = false; |
| hrs = rsts.get(i).getRegionServer(); |
| regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo region : regions) { |
| if (region.isMetaRegion()) { |
| isCarryingMeta = true; |
| break; |
| } |
| } |
| if (isCarryingMeta) { |
| continue; |
| } |
| break; |
| } |
| |
| LOG.info("#regions = " + regions.size()); |
| Iterator<HRegionInfo> it = regions.iterator(); |
| while (it.hasNext()) { |
| HRegionInfo region = it.next(); |
| if (region.isMetaTable() |
| || region.getEncodedName().equals( |
| HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { |
| it.remove(); |
| } |
| } |
| if (regions.size() == 0) return; |
| HRegionInfo curRegionInfo = regions.get(0); |
| byte[] startRow = curRegionInfo.getStartKey(); |
| if (startRow == null || startRow.length == 0) { |
| startRow = new byte[] { 0, 0, 0, 0, 1 }; |
| } |
| byte[] row = Bytes.incrementBytes(startRow, 1); |
| // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key |
| row = Arrays.copyOfRange(row, 3, 8); |
| long value = 0; |
| TableName tableName = TableName.valueOf("table"); |
| byte[] family = Bytes.toBytes("family"); |
| byte[] qualifier = Bytes.toBytes("c1"); |
| long timeStamp = System.currentTimeMillis(); |
| HTableDescriptor htd = new HTableDescriptor(tableName); |
| htd.addFamily(new HColumnDescriptor(family)); |
| final WAL wal = hrs.getWAL(curRegionInfo); |
| for (int i = 0; i < NUM_LOG_LINES; i += 1) { |
| WALEdit e = new WALEdit(); |
| value++; |
| e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); |
| wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, |
| System.currentTimeMillis(), null), e, true); |
| } |
| wal.sync(); |
| wal.shutdown(); |
| |
| // wait for abort completes |
| this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); |
| |
| // verify we got the last value |
| LOG.info("Verification Starts..."); |
| Get g = new Get(row); |
| Result r = ht.get(g); |
| long theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); |
| assertEquals(value, theStoredVal); |
| |
| // after flush |
| LOG.info("Verification after flush..."); |
| TEST_UTIL.getHBaseAdmin().flush(tableName); |
| r = ht.get(g); |
| theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); |
| assertEquals(value, theStoredVal); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) |
| public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception { |
| LOG.info("testSameVersionUpdatesRecoveryWithWrites"); |
| conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024); |
| conf.setInt("hbase.hstore.compactionThreshold", 3); |
| startCluster(NUM_RS); |
| final AtomicLong sequenceId = new AtomicLong(100); |
| final int NUM_REGIONS_TO_CREATE = 40; |
| final int NUM_LOG_LINES = 2000; |
| // turn off load balancing to prevent regions from moving around otherwise |
| // they will consume recovered.edits |
| master.balanceSwitch(false); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); |
| try { |
| List<HRegionInfo> regions = null; |
| HRegionServer hrs = null; |
| for (int i = 0; i < NUM_RS; i++) { |
| boolean isCarryingMeta = false; |
| hrs = rsts.get(i).getRegionServer(); |
| regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo region : regions) { |
| if (region.isMetaRegion()) { |
| isCarryingMeta = true; |
| break; |
| } |
| } |
| if (isCarryingMeta) { |
| continue; |
| } |
| break; |
| } |
| |
| LOG.info("#regions = " + regions.size()); |
| Iterator<HRegionInfo> it = regions.iterator(); |
| while (it.hasNext()) { |
| HRegionInfo region = it.next(); |
| if (region.isMetaTable() |
| || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { |
| it.remove(); |
| } |
| } |
| if (regions.size() == 0) return; |
| HRegionInfo curRegionInfo = regions.get(0); |
| byte[] startRow = curRegionInfo.getStartKey(); |
| if (startRow == null || startRow.length == 0) { |
| startRow = new byte[] { 0, 0, 0, 0, 1 }; |
| } |
| byte[] row = Bytes.incrementBytes(startRow, 1); |
| // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key |
| row = Arrays.copyOfRange(row, 3, 8); |
| long value = 0; |
| final TableName tableName = TableName.valueOf("table"); |
| byte[] family = Bytes.toBytes("family"); |
| byte[] qualifier = Bytes.toBytes("c1"); |
| long timeStamp = System.currentTimeMillis(); |
| HTableDescriptor htd = new HTableDescriptor(tableName); |
| htd.addFamily(new HColumnDescriptor(family)); |
| final WAL wal = hrs.getWAL(curRegionInfo); |
| for (int i = 0; i < NUM_LOG_LINES; i += 1) { |
| WALEdit e = new WALEdit(); |
| value++; |
| e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); |
| wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), |
| tableName, System.currentTimeMillis()), e, true); |
| } |
| wal.sync(); |
| wal.shutdown(); |
| |
| // wait for abort completes |
| this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); |
| |
| // verify we got the last value |
| LOG.info("Verification Starts..."); |
| Get g = new Get(row); |
| Result r = ht.get(g); |
| long theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); |
| assertEquals(value, theStoredVal); |
| |
| // after flush & compaction |
| LOG.info("Verification after flush..."); |
| TEST_UTIL.getHBaseAdmin().flush(tableName); |
| TEST_UTIL.getHBaseAdmin().compact(tableName); |
| |
| // wait for compaction completes |
| TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (TEST_UTIL.getHBaseAdmin() |
| .getCompactionState(tableName) == CompactionState.NONE); |
| } |
| }); |
| |
| r = ht.get(g); |
| theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); |
| assertEquals(value, theStoredVal); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| @Test(timeout = 300000) |
| public void testReadWriteSeqIdFiles() throws Exception { |
| LOG.info("testReadWriteSeqIdFiles"); |
| startCluster(2); |
| final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); |
| Table ht = installTable(zkw, "table", "family", 10); |
| try { |
| FileSystem fs = master.getMasterFileSystem().getFileSystem(); |
| Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table")); |
| List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir); |
| long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); |
| WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); |
| assertEquals(newSeqId + 2000, |
| WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); |
| |
| Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); |
| FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { |
| @Override |
| public boolean accept(Path p) { |
| return WALSplitter.isSequenceIdFile(p); |
| } |
| }); |
| // only one seqid file should exist |
| assertEquals(1, files.length); |
| |
| // verify all seqId files aren't treated as recovered.edits files |
| NavigableSet<Path> recoveredEdits = |
| WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0)); |
| assertEquals(0, recoveredEdits.size()); |
| } finally { |
| if (ht != null) ht.close(); |
| if (zkw != null) zkw.close(); |
| } |
| } |
| |
| Table installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception { |
| return installTable(zkw, tname, fname, nrs, 0); |
| } |
| |
| Table installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs, |
| int existingRegions) throws Exception { |
| // Create a table with regions |
| TableName table = TableName.valueOf(tname); |
| byte [] family = Bytes.toBytes(fname); |
| LOG.info("Creating table with " + nrs + " regions"); |
| Table ht = TEST_UTIL.createMultiRegionTable(table, family, nrs); |
| int numRegions = -1; |
| try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(table)) { |
| numRegions = r.getStartKeys().length; |
| } |
| assertEquals(nrs, numRegions); |
| LOG.info("Waiting for no more RIT\n"); |
| blockUntilNoRIT(zkw, master); |
| // disable-enable cycle to get rid of table's dead regions left behind |
| // by createMultiRegions |
| LOG.debug("Disabling table\n"); |
| TEST_UTIL.getHBaseAdmin().disableTable(table); |
| LOG.debug("Waiting for no more RIT\n"); |
| blockUntilNoRIT(zkw, master); |
| NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster); |
| LOG.debug("Verifying only catalog and namespace regions are assigned\n"); |
| if (regions.size() != 2) { |
| for (String oregion : regions) |
| LOG.debug("Region still online: " + oregion); |
| } |
| assertEquals(2 + existingRegions, regions.size()); |
| LOG.debug("Enabling table\n"); |
| TEST_UTIL.getHBaseAdmin().enableTable(table); |
| LOG.debug("Waiting for no more RIT\n"); |
| blockUntilNoRIT(zkw, master); |
| LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); |
| regions = HBaseTestingUtility.getAllOnlineRegions(cluster); |
| assertEquals(numRegions + 2 + existingRegions, regions.size()); |
| return ht; |
| } |
| |
| void populateDataInTable(int nrows, String fname) throws Exception { |
| byte [] family = Bytes.toBytes(fname); |
| |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| assertEquals(NUM_RS, rsts.size()); |
| |
| for (RegionServerThread rst : rsts) { |
| HRegionServer hrs = rst.getRegionServer(); |
| List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo hri : hris) { |
| if (hri.getTable().isSystemTable()) { |
| continue; |
| } |
| LOG.debug("adding data to rs = " + rst.getName() + |
| " region = "+ hri.getRegionNameAsString()); |
| Region region = hrs.getOnlineRegion(hri.getRegionName()); |
| assertTrue(region != null); |
| putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family); |
| } |
| } |
| |
| for (MasterThread mt : cluster.getLiveMasterThreads()) { |
| HRegionServer hrs = mt.getMaster(); |
| List<HRegionInfo> hris; |
| try { |
| hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| } catch (ServerNotRunningYetException e) { |
| // It's ok: this master may be a backup. Ignored. |
| continue; |
| } |
| for (HRegionInfo hri : hris) { |
| if (hri.getTable().isSystemTable()) { |
| continue; |
| } |
| LOG.debug("adding data to rs = " + mt.getName() + |
| " region = "+ hri.getRegionNameAsString()); |
| Region region = hrs.getOnlineRegion(hri.getRegionName()); |
| assertTrue(region != null); |
| putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family); |
| } |
| } |
| } |
| |
| public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname, |
| int num_edits, int edit_size) throws IOException { |
| makeWAL(hrs, regions, tname, fname, num_edits, edit_size, true); |
| } |
| |
| public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname, |
| int num_edits, int edit_size, boolean cleanShutdown) throws IOException { |
| TableName fullTName = TableName.valueOf(tname); |
| // remove root and meta region |
| regions.remove(HRegionInfo.FIRST_META_REGIONINFO); |
| // using one sequenceId for edits across all regions is ok. |
| final AtomicLong sequenceId = new AtomicLong(10); |
| |
| |
| for(Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) { |
| HRegionInfo regionInfo = iter.next(); |
| if(regionInfo.getTable().isSystemTable()) { |
| iter.remove(); |
| } |
| } |
| HTableDescriptor htd = new HTableDescriptor(fullTName); |
| byte[] family = Bytes.toBytes(fname); |
| htd.addFamily(new HColumnDescriptor(family)); |
| byte[] value = new byte[edit_size]; |
| |
| List<HRegionInfo> hris = new ArrayList<HRegionInfo>(); |
| for (HRegionInfo region : regions) { |
| if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) { |
| continue; |
| } |
| hris.add(region); |
| } |
| LOG.info("Creating wal edits across " + hris.size() + " regions."); |
| for (int i = 0; i < edit_size; i++) { |
| value[i] = (byte) ('a' + (i % 26)); |
| } |
| int n = hris.size(); |
| int[] counts = new int[n]; |
| // sync every ~30k to line up with desired wal rolls |
| final int syncEvery = 30 * 1024 / edit_size; |
| if (n > 0) { |
| for (int i = 0; i < num_edits; i += 1) { |
| WALEdit e = new WALEdit(); |
| HRegionInfo curRegionInfo = hris.get(i % n); |
| final WAL log = hrs.getWAL(curRegionInfo); |
| byte[] startRow = curRegionInfo.getStartKey(); |
| if (startRow == null || startRow.length == 0) { |
| startRow = new byte[] { 0, 0, 0, 0, 1 }; |
| } |
| byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); |
| row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because |
| // HBaseTestingUtility.createMultiRegions use 5 bytes |
| // key |
| byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); |
| e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value)); |
| log.append(curRegionInfo, |
| new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName, |
| System.currentTimeMillis()), e, true); |
| if (0 == i % syncEvery) { |
| log.sync(); |
| } |
| counts[i % n] += 1; |
| } |
| } |
| // done as two passes because the regions might share logs. shutdown is idempotent, but sync |
| // will cause errors if done after. |
| for (HRegionInfo info : hris) { |
| final WAL log = hrs.getWAL(info); |
| log.sync(); |
| } |
| if (cleanShutdown) { |
| for (HRegionInfo info : hris) { |
| final WAL log = hrs.getWAL(info); |
| log.shutdown(); |
| } |
| } |
| for (int i = 0; i < n; i++) { |
| LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); |
| } |
| return; |
| } |
| |
| private int countWAL(Path log, FileSystem fs, Configuration conf) |
| throws IOException { |
| int count = 0; |
| WAL.Reader in = WALFactory.createReader(fs, log, conf); |
| try { |
| WAL.Entry e; |
| while ((e = in.next()) != null) { |
| if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { |
| count++; |
| } |
| } |
| } finally { |
| try { |
| in.close(); |
| } catch (IOException exception) { |
| LOG.warn("Problem closing wal: " + exception.getMessage()); |
| LOG.debug("exception details.", exception); |
| } |
| } |
| return count; |
| } |
| |
| private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master) throws Exception { |
| TEST_UTIL.waitUntilNoRegionsInTransition(60000); |
| } |
| |
| private void putData(Region region, byte[] startRow, int numRows, byte [] qf, |
| byte [] ...families) |
| throws IOException { |
| for(int i = 0; i < numRows; i++) { |
| Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); |
| for(byte [] family : families) { |
| put.addColumn(family, qf, null); |
| } |
| region.put(put); |
| } |
| } |
| |
| /** |
| * Load table with puts and deletes with expected values so that we can verify later |
| */ |
| private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException { |
| byte[] k = new byte[3]; |
| |
| // add puts |
| List<Put> puts = new ArrayList<>(); |
| for (byte b1 = 'a'; b1 <= 'z'; b1++) { |
| for (byte b2 = 'a'; b2 <= 'z'; b2++) { |
| for (byte b3 = 'a'; b3 <= 'z'; b3++) { |
| k[0] = b1; |
| k[1] = b2; |
| k[2] = b3; |
| Put put = new Put(k); |
| put.addColumn(f, column, k); |
| puts.add(put); |
| } |
| } |
| } |
| t.put(puts); |
| // add deletes |
| for (byte b3 = 'a'; b3 <= 'z'; b3++) { |
| k[0] = 'a'; |
| k[1] = 'a'; |
| k[2] = b3; |
| Delete del = new Delete(k); |
| t.delete(del); |
| } |
| } |
| |
| private void waitForCounter(AtomicLong ctr, long oldval, long newval, |
| long timems) { |
| long curt = System.currentTimeMillis(); |
| long endt = curt + timems; |
| while (curt < endt) { |
| if (ctr.get() == oldval) { |
| Thread.yield(); |
| curt = System.currentTimeMillis(); |
| } else { |
| assertEquals(newval, ctr.get()); |
| return; |
| } |
| } |
| assertTrue(false); |
| } |
| |
| private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException { |
| for (MasterThread mt : cluster.getLiveMasterThreads()) { |
| if (mt.getMaster().isActiveMaster()) { |
| mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); |
| mt.join(); |
| break; |
| } |
| } |
| LOG.debug("Master is aborted"); |
| } |
| |
| /** |
| * Find a RS that has regions of a table. |
| * @param hasMetaRegion when true, the returned RS has hbase:meta region as well |
| * @param tableName |
| * @return |
| * @throws Exception |
| */ |
| private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception { |
| List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); |
| List<HRegionInfo> regions = null; |
| HRegionServer hrs = null; |
| |
| for (RegionServerThread rst: rsts) { |
| hrs = rst.getRegionServer(); |
| while (rst.isAlive() && !hrs.isOnline()) { |
| Thread.sleep(100); |
| } |
| if (!rst.isAlive()) { |
| continue; |
| } |
| boolean isCarryingMeta = false; |
| boolean foundTableRegion = false; |
| regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); |
| for (HRegionInfo region : regions) { |
| if (region.isMetaRegion()) { |
| isCarryingMeta = true; |
| } |
| if (tableName == null || region.getTable().getNameAsString().equals(tableName)) { |
| foundTableRegion = true; |
| } |
| if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) { |
| break; |
| } |
| } |
| if (isCarryingMeta && hasMetaRegion) { |
| // clients ask for a RS with META |
| if (!foundTableRegion) { |
| final HRegionServer destRS = hrs; |
| // the RS doesn't have regions of the specified table so we need move one to this RS |
| List<HRegionInfo> tableRegions = |
| TEST_UTIL.getHBaseAdmin().getTableRegions(TableName.valueOf(tableName)); |
| final HRegionInfo hri = tableRegions.get(0); |
| TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), |
| Bytes.toBytes(destRS.getServerName().getServerName())); |
| // wait for region move completes |
| final RegionStates regionStates = |
| TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); |
| TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| ServerName sn = regionStates.getRegionServerOfRegion(hri); |
| return (sn != null && sn.equals(destRS.getServerName())); |
| } |
| }); |
| } |
| return hrs; |
| } else if (hasMetaRegion || isCarryingMeta) { |
| continue; |
| } |
| if (foundTableRegion) break; |
| } |
| |
| return hrs; |
| } |
| } |