| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master; |
| |
| import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned; |
| import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.ChoreService; |
| import org.apache.hadoop.hbase.CoordinatedStateManager; |
| import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.Server; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.SplitLogCounters; |
| import org.apache.hadoop.hbase.SplitLogTask; |
| import org.apache.hadoop.hbase.Stoppable; |
| import org.apache.hadoop.hbase.Waiter; |
| import org.apache.hadoop.hbase.client.ClusterConnection; |
| import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; |
| import org.apache.hadoop.hbase.master.SplitLogManager.Task; |
| import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; |
| import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; |
| import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; |
| import org.apache.hadoop.hbase.testclassification.MasterTests; |
| import org.apache.hadoop.hbase.testclassification.MediumTests; |
| import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; |
| import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; |
| import org.apache.hadoop.hbase.zookeeper.ZKUtil; |
| import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.Logger; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.ZooDefs.Ids; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.mockito.Mockito; |
| |
| @Category({MasterTests.class, MediumTests.class}) |
| public class TestSplitLogManager { |
| private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); |
| private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1"); |
| private final ServerManager sm = Mockito.mock(ServerManager.class); |
| private final MasterServices master = Mockito.mock(MasterServices.class); |
| |
| static { |
| Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); |
| } |
| |
| private ZooKeeperWatcher zkw; |
| private DummyServer ds; |
| private static boolean stopped = false; |
| private SplitLogManager slm; |
| private Configuration conf; |
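  // Split log manager task timeout, in ms (set and padded in setup()).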
| private int to; |
| private RecoveryMode mode; |
| |
| private static HBaseTestingUtility TEST_UTIL; |
| |
| class DummyServer implements Server { |
| private ZooKeeperWatcher zkw; |
| private Configuration conf; |
| private CoordinatedStateManager cm; |
| |
| public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { |
| this.zkw = zkw; |
| this.conf = conf; |
| cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); |
| cm.initialize(this); |
| } |
| |
| @Override |
| public void abort(String why, Throwable e) { |
| } |
| |
| @Override |
| public boolean isAborted() { |
| return false; |
| } |
| |
| @Override |
| public void stop(String why) { |
| } |
| |
| @Override |
| public boolean isStopped() { |
| return false; |
| } |
| |
| @Override |
| public Configuration getConfiguration() { |
| return conf; |
| } |
| |
| @Override |
| public ZooKeeperWatcher getZooKeeper() { |
| return zkw; |
| } |
| |
| @Override |
| public ServerName getServerName() { |
| return null; |
| } |
| |
| @Override |
| public CoordinatedStateManager getCoordinatedStateManager() { |
| return cm; |
| } |
| |
| @Override |
| public ClusterConnection getConnection() { |
| return null; |
| } |
| |
| @Override |
| public MetaTableLocator getMetaTableLocator() { |
| return null; |
| } |
| |
| @Override |
| public ChoreService getChoreService() { |
| return null; |
| } |
| |
| @Override |
| public ClusterConnection getClusterConnection() { |
| return null; |
| } |
| } |
| |
| static Stoppable stopper = new Stoppable() { |
| @Override |
| public void stop(String why) { |
| stopped = true; |
| } |
| |
| @Override |
| public boolean isStopped() { |
| return stopped; |
| } |
| }; |
| |
| @Before |
| public void setup() throws Exception { |
| TEST_UTIL = new HBaseTestingUtility(); |
| TEST_UTIL.startMiniZKCluster(); |
| conf = TEST_UTIL.getConfiguration(); |
    // Use a different ZK wrapper instance for each test.
| zkw = |
| new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); |
| ds = new DummyServer(zkw, conf); |
| |
| ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); |
| ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); |
| assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); |
| LOG.debug(zkw.baseZNode + " created"); |
| ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); |
| assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); |
| LOG.debug(zkw.splitLogZNode + " created"); |
| |
| stopped = false; |
| resetCounters(); |
| |
    // By default, we let the test manage the error as before, so the server
    // does not appear as dead from the master's point of view, only from the
    // split log manager's point of view.
| Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); |
| Mockito.when(master.getServerManager()).thenReturn(sm); |
| |
| to = 12000; |
| conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to); |
| conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); |
| |
| conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); |
| to = to + 16 * 100; |
| |
| this.mode = |
| (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY |
| : RecoveryMode.LOG_SPLITTING); |
| } |
| |
| @After |
| public void teardown() throws IOException, KeeperException { |
| stopper.stop(""); |
| if (slm != null) slm.stop(); |
| TEST_UTIL.shutdownMiniZKCluster(); |
| } |
| |
| private interface Expr { |
| long eval(); |
| } |
| |
| private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems) |
| throws Exception { |
| Expr e = new Expr() { |
| @Override |
| public long eval() { |
| return ctr.get(); |
| } |
| }; |
| waitForCounter(e, oldval, newval, timems); |
| } |
| |
| private void waitForCounter(final Expr e, final long oldval, long newval, long timems) |
| throws Exception { |
| |
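    // Wait for the counter to move off oldval, then assert it landed on newval.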
| TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return (e.eval() != oldval); |
| } |
| }); |
| |
| assertEquals(newval, e.eval()); |
| } |
| |
| private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, |
| InterruptedException { |
| String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); |
| NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); |
| zkw.registerListener(listener); |
| ZKUtil.watchAndCheckExists(zkw, tasknode); |
| |
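    // The watch was set before enqueueing, so the creation event cannot be missed.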
| slm.enqueueSplitTask(name, batch); |
| assertEquals(1, batch.installed); |
| assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); |
| assertEquals(1L, tot_mgr_node_create_queued.get()); |
| |
| LOG.debug("waiting for task node creation"); |
| listener.waitForCreation(); |
| LOG.debug("task created"); |
| return tasknode; |
| } |
| |
| /** |
| * Test whether the splitlog correctly creates a task in zookeeper |
| * @throws Exception |
| */ |
| @Test (timeout=180000) |
| public void testTaskCreation() throws Exception { |
| |
| LOG.info("TestTaskCreation - test the creation of a task in zk"); |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| TaskBatch batch = new TaskBatch(); |
| |
| String tasknode = submitTaskAndWait(batch, "foo/1"); |
| |
| byte[] data = ZKUtil.getData(zkw, tasknode); |
| SplitLogTask slt = SplitLogTask.parseFrom(data); |
| LOG.info("Task node created " + slt.toString()); |
| assertTrue(slt.isUnassigned(DUMMY_MASTER)); |
| } |
| |
| @Test (timeout=180000) |
| public void testOrphanTaskAcquisition() throws Exception { |
| LOG.info("TestOrphanTaskAcquisition"); |
| |
| String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); |
| SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode); |
| zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| |
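    // Starting the manager should detect and acquire the pre-existing orphan task.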
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); |
| Task task = slm.findOrCreateOrphanTask(tasknode); |
| assertTrue(task.isOrphan()); |
| waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); |
| assertFalse(task.isUnassigned()); |
| long curt = System.currentTimeMillis(); |
| assertTrue((task.last_update <= curt) && |
| (task.last_update > (curt - 1000))); |
| LOG.info("waiting for manager to resubmit the orphan task"); |
| waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); |
| assertTrue(task.isUnassigned()); |
| waitForCounter(tot_mgr_rescan, 0, 1, to + to/2); |
| } |
| |
| @Test (timeout=180000) |
| public void testUnassignedOrphan() throws Exception { |
| LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + |
| " startup"); |
| String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); |
| //create an unassigned orphan task |
| SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode); |
| zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| int version = ZKUtil.checkExists(zkw, tasknode); |
| |
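    // Starting the manager should acquire the unassigned orphan and resubmit it.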
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); |
| Task task = slm.findOrCreateOrphanTask(tasknode); |
| assertTrue(task.isOrphan()); |
| assertTrue(task.isUnassigned()); |
| // wait for RESCAN node to be created |
| waitForCounter(tot_mgr_rescan, 0, 1, to/2); |
| Task task2 = slm.findOrCreateOrphanTask(tasknode); |
| assertTrue(task == task2); |
| LOG.debug("task = " + task); |
| assertEquals(1L, tot_mgr_resubmit.get()); |
| assertEquals(1, task.incarnation.get()); |
| assertEquals(0, task.unforcedResubmits.get()); |
| assertTrue(task.isOrphan()); |
| assertTrue(task.isUnassigned()); |
| assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); |
| } |
| |
| @Test (timeout=180000) |
| public void testMultipleResubmits() throws Exception { |
| LOG.info("TestMultipleResbmits - no indefinite resubmissions"); |
| conf.setInt("hbase.splitlog.max.resubmit", 2); |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| TaskBatch batch = new TaskBatch(); |
| |
| String tasknode = submitTaskAndWait(batch, "foo/1"); |
| int version = ZKUtil.checkExists(zkw, tasknode); |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| final ServerName worker2 = ServerName.valueOf("worker2,1,1"); |
| final ServerName worker3 = ServerName.valueOf("worker3,1,1"); |
| SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); |
| waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); |
| int version1 = ZKUtil.checkExists(zkw, tasknode); |
| assertTrue(version1 > version); |
| slt = new SplitLogTask.Owned(worker2, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| waitForCounter(tot_mgr_heartbeat, 1, 2, to/2); |
| waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2); |
| int version2 = ZKUtil.checkExists(zkw, tasknode); |
| assertTrue(version2 > version1); |
| slt = new SplitLogTask.Owned(worker3, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| waitForCounter(tot_mgr_heartbeat, 2, 3, to/2); |
| waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2); |
| Thread.sleep(to + to/2); |
| assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get()); |
| } |
| |
| @Test (timeout=180000) |
| public void testRescanCleanup() throws Exception { |
| LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); |
| |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| TaskBatch batch = new TaskBatch(); |
| |
| String tasknode = submitTaskAndWait(batch, "foo/1"); |
| int version = ZKUtil.checkExists(zkw, tasknode); |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); |
| waitForCounter(new Expr() { |
| @Override |
| public long eval() { |
| return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); |
| } |
| }, 0, 1, 5*60000); // wait long enough |
| Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get()); |
| int version1 = ZKUtil.checkExists(zkw, tasknode); |
| assertTrue(version1 > version); |
| byte[] taskstate = ZKUtil.getData(zkw, tasknode); |
| slt = SplitLogTask.parseFrom(taskstate); |
| assertTrue(slt.isUnassigned(DUMMY_MASTER)); |
| |
| waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2); |
| } |
| |
| @Test (timeout=180000) |
| public void testTaskDone() throws Exception { |
| LOG.info("TestTaskDone - cleanup task node once in DONE state"); |
| |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| TaskBatch batch = new TaskBatch(); |
| String tasknode = submitTaskAndWait(batch, "foo/1"); |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| synchronized (batch) { |
| while (batch.installed != batch.done) { |
| batch.wait(); |
| } |
| } |
| waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); |
| assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); |
| } |
| |
| @Test (timeout=180000) |
| public void testTaskErr() throws Exception { |
| LOG.info("TestTaskErr - cleanup task node once in ERR state"); |
| |
| conf.setInt("hbase.splitlog.max.resubmit", 0); |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| TaskBatch batch = new TaskBatch(); |
| |
| String tasknode = submitTaskAndWait(batch, "foo/1"); |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| |
| synchronized (batch) { |
| while (batch.installed != batch.error) { |
| batch.wait(); |
| } |
| } |
| waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); |
| assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); |
| conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); |
| } |
| |
| @Test (timeout=180000) |
| public void testTaskResigned() throws Exception { |
| LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); |
    assertEquals(0, tot_mgr_resubmit.get());
    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
    assertEquals(0, tot_mgr_resubmit.get());
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    assertEquals(0, tot_mgr_resubmit.get());
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    assertEquals(0, tot_mgr_resubmit.get());
    SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
    assertEquals(0, tot_mgr_resubmit.get());
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    ZKUtil.checkExists(zkw, tasknode);
    // There could be a small race here.
    if (tot_mgr_resubmit.get() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
    }
    assertEquals(1, tot_mgr_resubmit.get());
| |
| byte[] taskstate = ZKUtil.getData(zkw, tasknode); |
| slt = SplitLogTask.parseFrom(taskstate); |
| assertTrue(slt.isUnassigned(DUMMY_MASTER)); |
| } |
| |
| @Test (timeout=180000) |
| public void testUnassignedTimeout() throws Exception { |
| LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" + |
| " resubmit"); |
| |
| // create an orphan task in OWNED state |
| String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1"); |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); |
| zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); |
| |
| // submit another task which will stay in unassigned mode |
| TaskBatch batch = new TaskBatch(); |
| submitTaskAndWait(batch, "foo/1"); |
| |
    // Keep heartbeating the owned node every 100ms for 3 * to ms so it is not
    // resubmitted prematurely.
    for (int i = 0; i < (3 * to) / 100; i++) {
      Thread.sleep(100);
      slt = new SplitLogTask.Owned(worker1, this.mode);
      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
    }
| |
    // Since we have stopped heartbeating the owned node, it should now get
    // resubmitted.
| LOG.info("waiting for manager to resubmit the orphan task"); |
| waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); |
| |
| // now all the nodes are unassigned. manager should post another rescan |
| waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2); |
| } |
| |
| @Test (timeout=180000) |
| public void testDeadWorker() throws Exception { |
| LOG.info("testDeadWorker"); |
| |
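    // Disable ordinary timeout resubmits; the dead-worker handling below
    // should still force a resubmit.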
| conf.setLong("hbase.splitlog.max.resubmit", 0); |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| TaskBatch batch = new TaskBatch(); |
| |
| String tasknode = submitTaskAndWait(batch, "foo/1"); |
| int version = ZKUtil.checkExists(zkw, tasknode); |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); |
| slm.handleDeadWorker(worker1); |
| if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2); |
| if (tot_mgr_resubmit_dead_server_task.get() == 0) { |
| waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2); |
| } |
| |
| int version1 = ZKUtil.checkExists(zkw, tasknode); |
| assertTrue(version1 > version); |
| byte[] taskstate = ZKUtil.getData(zkw, tasknode); |
| slt = SplitLogTask.parseFrom(taskstate); |
| assertTrue(slt.isUnassigned(DUMMY_MASTER)); |
| } |
| |
| @Test (timeout=180000) |
| public void testWorkerCrash() throws Exception { |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| TaskBatch batch = new TaskBatch(); |
| |
| String tasknode = submitTaskAndWait(batch, "foo/1"); |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| |
| SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); |
| ZKUtil.setData(zkw, tasknode, slt.toByteArray()); |
| if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); |
| |
| // Not yet resubmitted. |
| Assert.assertEquals(0, tot_mgr_resubmit.get()); |
| |
| // This server becomes dead |
| Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); |
| |
    Thread.sleep(1300); // The timeout check runs every 1000 ms (hardcoded).
| |
| // It has been resubmitted |
| Assert.assertEquals(1, tot_mgr_resubmit.get()); |
| } |
| |
| @Test (timeout=180000) |
| public void testEmptyLogDir() throws Exception { |
| LOG.info("testEmptyLogDir"); |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| FileSystem fs = TEST_UTIL.getTestFileSystem(); |
| Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), |
| UUID.randomUUID().toString()); |
| fs.mkdirs(emptyLogDirPath); |
| slm.splitLogDistributed(emptyLogDirPath); |
| assertFalse(fs.exists(emptyLogDirPath)); |
| } |
| |
| @Test (timeout = 60000) |
| public void testLogFilesAreArchived() throws Exception { |
| LOG.info("testLogFilesAreArchived"); |
| final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| FileSystem fs = TEST_UTIL.getTestFileSystem(); |
| Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived"); |
| conf.set(HConstants.HBASE_DIR, dir.toString()); |
| Path logDirPath = new Path(dir, UUID.randomUUID().toString()); |
| fs.mkdirs(logDirPath); |
| // create an empty log file |
| String logFile = ServerName.valueOf("foo", 1, 1).toString(); |
| fs.create(new Path(logDirPath, logFile)).close(); |
| |
| // spin up a thread mocking split done. |
| new Thread() { |
| @Override |
| public void run() { |
| boolean done = false; |
| while (!done) { |
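          // Spin until the manager installs the task, then mark it DONE in ZK.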
| for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) { |
| final ServerName worker1 = ServerName.valueOf("worker1,1,1"); |
| SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING); |
| boolean encounteredZKException = false; |
| try { |
| ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray()); |
| } catch (KeeperException e) { |
| LOG.warn(e); |
| encounteredZKException = true; |
| } |
| if (!encounteredZKException) { |
| done = true; |
| } |
| } |
| } |
| }; |
| }.start(); |
| |
| slm.splitLogDistributed(logDirPath); |
| |
| assertFalse(fs.exists(logDirPath)); |
| } |
| |
| /** |
| * The following test case is aiming to test the situation when distributedLogReplay is turned off |
| * and restart a cluster there should no recovery regions in ZK left. |
| * @throws Exception |
| */ |
| @Test(timeout = 300000) |
| public void testRecoveryRegionRemovedFromZK() throws Exception { |
| LOG.info("testRecoveryRegionRemovedFromZK"); |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); |
| String nodePath = |
| ZKUtil.joinZNode(zkw.recoveringRegionsZNode, |
| HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); |
| ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L)); |
| |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| slm.removeStaleRecoveringRegions(null); |
| |
| List<String> recoveringRegions = |
| zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); |
| |
| assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty()); |
| } |
| |
| @Ignore("DLR is broken by HBASE-12751") @Test(timeout=60000) |
| public void testGetPreviousRecoveryMode() throws Exception { |
| LOG.info("testGetPreviousRecoveryMode"); |
| SplitLogCounters.resetCounters(); |
| // Not actually enabling DLR for the cluster, just for the ZkCoordinatedStateManager to use. |
| // The test is just manipulating ZK manually anyways. |
| conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); |
| |
| zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), |
| new SplitLogTask.Unassigned( |
| ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), |
| Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); |
| |
| slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); |
| LOG.info("Mode1=" + slm.getRecoveryMode()); |
| assertTrue(slm.isLogSplitting()); |
| zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); |
| LOG.info("Mode2=" + slm.getRecoveryMode()); |
| slm.setRecoveryMode(false); |
| LOG.info("Mode3=" + slm.getRecoveryMode()); |
| assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying()); |
| } |
| } |