| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.procedure2; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseCommonTestingUtility; |
| import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; |
| import org.apache.hadoop.hbase.testclassification.MasterTests; |
| import org.apache.hadoop.hbase.testclassification.MediumTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; |
| |
| @Category({MasterTests.class, MediumTests.class}) |
| public class TestProcedureRecovery { |
| // Class-level JUnit rule obtained from HBaseClassTestRule.forClass() for this test class. |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestProcedureRecovery.class); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class); |
| |
| // Passed to both procStore.start() and initAndStartWorkers() in setUp(). |
| // NOTE(review): presumably a single slot is required so the kill-before-store-update hooks |
| // act deterministically -- confirm against ProcedureTestingUtility. |
| private static final int PROCEDURE_EXECUTOR_SLOTS = 1; |
| |
| // Static so that the nested static procedure classes can reach procExecutor and |
| // procSleepInterval from their execute()/rollback() bodies. All four are re-assigned by |
| // setUp() before every test. |
| private static TestProcEnv procEnv; |
| private static ProcedureExecutor<TestProcEnv> procExecutor; |
| private static ProcedureStore procStore; |
| // Value handed to Threads.sleepWithoutInterrupt() by the step procedures: 0 after setUp(), |
| // raised to 2500 by the rollback tests right before they call abort(). |
| private static int procSleepInterval; |
| |
| // Per-test fixtures created in setUp(); logDir is the "proc-logs" child of testDir on fs. |
| private HBaseCommonTestingUtility htu; |
| private FileSystem fs; |
| private Path testDir; |
| private Path logDir; |
| |
| /** |
|  * Builds a fresh store and executor for each test: creates the procedure store under |
|  * {@code <testDir>/proc-logs}, attaches a {@link ProcedureExecutor.Testing} hook object to the |
|  * executor, starts the store and the workers with {@link #PROCEDURE_EXECUTOR_SLOTS}, and |
|  * resets {@code procSleepInterval} to 0. |
|  * |
|  * @throws IOException declared for the file-system lookup and the store/executor start-up |
|  */ |
| @Before |
| public void setUp() throws IOException { |
| htu = new HBaseCommonTestingUtility(); |
| testDir = htu.getDataTestDir(); |
| fs = testDir.getFileSystem(htu.getConfiguration()); |
| // Sanity check on the test directory depth. |
| // NOTE(review): presumably guards the recursive delete in tearDown() against a shallow |
| // path -- confirm. |
| assertTrue(testDir.depth() > 1); |
| |
| logDir = new Path(testDir, "proc-logs"); |
| procEnv = new TestProcEnv(); |
| procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); |
| procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); |
| // Testing hooks: tests flip testing.killBeforeStoreUpdate directly or through the |
| // ProcedureTestingUtility setters/toggles. |
| procExecutor.testing = new ProcedureExecutor.Testing(); |
| procStore.start(PROCEDURE_EXECUTOR_SLOTS); |
| // NOTE(review): the meaning of the trailing 'true' is defined by |
| // ProcedureTestingUtility.initAndStartWorkers -- confirm there. |
| ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); |
| procSleepInterval = 0; |
| } |
| |
| /** |
|  * Stops the executor, stops the store and recursively deletes the procedure log directory. |
|  * <p> |
|  * The steps are chained with {@code try/finally} so that a failure while stopping the |
|  * executor or the store does not skip the remaining cleanup and leave a started store or |
|  * stale log files behind for the next test. |
|  * |
|  * @throws IOException if deleting the log directory fails |
|  */ |
| @After |
| public void tearDown() throws IOException { |
|   try { |
|     procExecutor.stop(); |
|   } finally { |
|     try { |
|       procStore.stop(false); |
|     } finally { |
|       fs.delete(logDir, true); |
|     } |
|   } |
| } |
| |
| /** |
|  * Restarts the shared executor through {@code ProcedureTestingUtility.restart}, logging the |
|  * content of the procedure log directory immediately before and after the restart. |
|  * |
|  * @throws Exception whatever {@code ProcedureTestingUtility.restart} propagates |
|  */ |
| private void restart() throws Exception { |
| dumpLogDirState(); |
| ProcedureTestingUtility.restart(procExecutor); |
| dumpLogDirState(); |
| } |
| |
| /** |
|  * Procedure that finishes in a single {@code execute()} call: it waits on the environment |
|  * latch (if one is set), bumps its step counter and publishes the counter as its result. |
|  * Rollback is a no-op and abort requests are always accepted. |
|  */ |
| public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { |
|   private int step = 0; |
|  |
|   public TestSingleStepProcedure() { } |
|  |
|   /** |
|    * Runs the only step. |
|    * |
|    * @param env test environment whose latch, when present, gates the step |
|    * @return always {@code null}, i.e. no child procedures |
|    * @throws InterruptedException if interrupted while waiting on the latch |
|    */ |
|   @Override |
|   protected Procedure[] execute(TestProcEnv env) throws InterruptedException { |
|     env.waitOnLatch(); |
|     // Parameterized logging: the message is only formatted when debug is enabled. |
|     LOG.debug("execute procedure {} step={}", this, step); |
|     step++; |
|     // The result is the step counter after the increment (1 on a single execution). |
|     setResult(Bytes.toBytes(step)); |
|     return null; |
|   } |
|  |
|   /** Nothing to undo. */ |
|   @Override |
|   protected void rollback(TestProcEnv env) { } |
|  |
|   /** |
|    * @return always {@code true} |
|    */ |
|   @Override |
|   protected boolean abort(TestProcEnv env) { |
|     return true; |
|   } |
| } |
| |
| public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> { |
| private AtomicBoolean abort = new AtomicBoolean(false); |
| private int step = 0; |
| |
| @Override |
| protected Procedure[] execute(TestProcEnv env) throws InterruptedException { |
| env.waitOnLatch(); |
| LOG.debug("execute procedure " + this + " step=" + step); |
| ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); |
| step++; |
| Threads.sleepWithoutInterrupt(procSleepInterval); |
| if (isAborted()) { |
| setFailure(new RemoteProcedureException(getClass().getName(), |
| new ProcedureAbortedException( |
| "got an abort at " + getClass().getName() + " step=" + step))); |
| return null; |
| } |
| return null; |
| } |
| |
| @Override |
| protected void rollback(TestProcEnv env) { |
| LOG.debug("rollback procedure " + this + " step=" + step); |
| ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); |
| step++; |
| } |
| |
| @Override |
| protected boolean abort(TestProcEnv env) { |
| abort.set(true); |
| return true; |
| } |
| |
| private boolean isAborted() { |
| boolean aborted = abort.get(); |
| BaseTestStepProcedure proc = this; |
| while (proc.hasParent() && !aborted) { |
| proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId()); |
| aborted = proc.isAborted(); |
| } |
| return aborted; |
| } |
| } |
| |
| /** |
|  * Three-level chain built on {@link BaseTestStepProcedure}: the root spawns |
|  * {@link Step1Procedure}, which spawns {@link Step2Procedure}, which spawns nothing. A level |
|  * that ends up failed after running the base step spawns no child. |
|  */ |
| public static class TestMultiStepProcedure extends BaseTestStepProcedure { |
|   public TestMultiStepProcedure() { } |
|  |
|   /** |
|    * Runs the base step, then hands over to {@link Step1Procedure} unless the step failed. |
|    */ |
|   @Override |
|   public Procedure[] execute(TestProcEnv env) throws InterruptedException { |
|     super.execute(env); |
|     if (isFailed()) { |
|       return null; |
|     } |
|     return new Procedure[] { new Step1Procedure() }; |
|   } |
|  |
|   /** Middle link of the chain. */ |
|   public static class Step1Procedure extends BaseTestStepProcedure { |
|     public Step1Procedure() { } |
|  |
|     /** |
|      * Runs the base step, then hands over to {@link Step2Procedure} unless the step failed. |
|      */ |
|     @Override |
|     protected Procedure[] execute(TestProcEnv env) throws InterruptedException { |
|       super.execute(env); |
|       if (isFailed()) { |
|         return null; |
|       } |
|       return new Procedure[] { new Step2Procedure() }; |
|     } |
|   } |
|  |
|   /** Last link of the chain; inherits the base step unchanged. */ |
|   public static class Step2Procedure extends BaseTestStepProcedure { |
|     public Step2Procedure() { } |
|   } |
| } |
| |
| /** |
|  * Restarts the executor without having submitted any procedure; the test passes as long as |
|  * {@code restart()} does not throw. |
|  */ |
| @Test |
| public void testNoopLoad() throws Exception { |
| restart(); |
| } |
| |
| /** |
|  * Submits a {@link TestSingleStepProcedure} with the kill-before-store-update hook armed, |
|  * then checks that the procedure is re-run after a restart and that its result is still |
|  * available, unchanged, after a further restart. |
|  */ |
| @Test |
| public void testSingleStepProcRecovery() throws Exception { |
|   // Arm the kill hook for the first attempt only. |
|   procExecutor.testing.killBeforeStoreUpdate = true; |
|   final long procId = |
|     ProcedureTestingUtility.submitAndWait(procExecutor, new TestSingleStepProcedure()); |
|   assertFalse(procExecutor.isRunning()); |
|   procExecutor.testing.killBeforeStoreUpdate = false; |
|  |
|   // After the restart the procedure must run again, so its last update is newer than the |
|   // timestamp taken just before restarting. |
|   final long beforeRestart = EnvironmentEdgeManager.currentTime(); |
|   restart(); |
|   waitProcedure(procId); |
|   final Procedure<?> firstResult = procExecutor.getResult(procId); |
|   assertTrue(firstResult.getLastUpdate() > beforeRestart); |
|   ProcedureTestingUtility.assertProcNotFailed(firstResult); |
|   assertEquals(1, Bytes.toInt(firstResult.getResult())); |
|   final long completedAt = firstResult.getLastUpdate(); |
|  |
|   // A second restart must reload the very same completed result. |
|   restart(); |
|   final Procedure<?> reloaded = procExecutor.getResult(procId); |
|   ProcedureTestingUtility.assertProcNotFailed(reloaded); |
|   assertEquals(completedAt, reloaded.getLastUpdate()); |
|   assertEquals(1, Bytes.toInt(reloaded.getResult())); |
| } |
| |
| /** |
|  * Drives a {@link TestMultiStepProcedure} to completion across executor restarts: the |
|  * executor is expected to be stopped after the submission and after each of the next two |
|  * restarts, and to stay running once the last step has executed. |
|  */ |
| @Test |
| public void testMultiStepProcRecovery() throws Exception { |
|   // Step 0 - kill |
|   final long procId = |
|     ProcedureTestingUtility.submitAndWait(procExecutor, new TestMultiStepProcedure()); |
|   assertFalse(procExecutor.isRunning()); |
|  |
|   // Round 0: step 0 exec && step 1 - kill |
|   // Round 1: step 1 exec && step 2 - kill |
|   for (int round = 0; round < 2; round++) { |
|     restart(); |
|     waitProcedure(procId); |
|     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|     assertFalse(procExecutor.isRunning()); |
|   } |
|  |
|   // Step 2 exec: nothing left to kill, the executor keeps running. |
|   restart(); |
|   waitProcedure(procId); |
|   assertTrue(procExecutor.isRunning()); |
|  |
|   // The procedure is completed |
|   final Procedure<?> finished = procExecutor.getResult(procId); |
|   ProcedureTestingUtility.assertProcNotFailed(finished); |
| } |
| |
| /** |
|  * Same chain as the multi-step recovery test, but the procedure is aborted while its last |
|  * step sleeps; the rollback is then carried across further restarts until the result holds |
|  * an abort exception. |
|  */ |
| @Test |
| public void testMultiStepRollbackRecovery() throws Exception { |
|   // Step 0 - kill |
|   final long procId = |
|     ProcedureTestingUtility.submitAndWait(procExecutor, new TestMultiStepProcedure()); |
|   assertFalse(procExecutor.isRunning()); |
|  |
|   // Round 0: step 0 exec && step 1 - kill |
|   // Round 1: step 1 exec && step 2 - kill |
|   for (int round = 0; round < 2; round++) { |
|     restart(); |
|     waitProcedure(procId); |
|     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|     assertFalse(procExecutor.isRunning()); |
|   } |
|  |
|   // Step 2 exec - rollback - kill. The sleep leaves time for abort() to land mid-step. |
|   procSleepInterval = 2500; |
|   restart(); |
|   assertTrue(procExecutor.abort(procId)); |
|   waitProcedure(procId); |
|   assertFalse(procExecutor.isRunning()); |
|  |
|   // Two rollback rounds, each ending with the executor stopped and the procedure pending. |
|   for (int round = 0; round < 2; round++) { |
|     restart(); |
|     waitProcedure(procId); |
|     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|     assertFalse(procExecutor.isRunning()); |
|   } |
|  |
|   // Restart the executor and get the result |
|   restart(); |
|   waitProcedure(procId); |
|  |
|   // The procedure is completed |
|   final Procedure<?> outcome = procExecutor.getResult(procId); |
|   ProcedureTestingUtility.assertIsAbortException(outcome); |
| } |
| |
| public static class TestStateMachineProcedure |
| extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { |
| enum State { STATE_1, STATE_2, STATE_3, DONE } |
| |
| public TestStateMachineProcedure() {} |
| |
| public TestStateMachineProcedure(final boolean testSubmitChildProc) { |
| this.submitChildProc = testSubmitChildProc; |
| } |
| |
| private AtomicBoolean aborted = new AtomicBoolean(false); |
| private int iResult = 0; |
| private boolean submitChildProc = false; |
| |
| @Override |
| protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { |
| switch (state) { |
| case STATE_1: |
| LOG.info("execute step 1 " + this); |
| setNextState(State.STATE_2); |
| iResult += 3; |
| break; |
| case STATE_2: |
| LOG.info("execute step 2 " + this); |
| if (submitChildProc) { |
| addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure()); |
| setNextState(State.DONE); |
| } else { |
| setNextState(State.STATE_3); |
| } |
| iResult += 5; |
| break; |
| case STATE_3: |
| LOG.info("execute step 3 " + this); |
| Threads.sleepWithoutInterrupt(procSleepInterval); |
| if (aborted.get()) { |
| LOG.info("aborted step 3 " + this); |
| setAbortFailure("test", "aborted"); |
| break; |
| } |
| setNextState(State.DONE); |
| iResult += 7; |
| break; |
| case DONE: |
| if (submitChildProc) { |
| addChildProcedure(new TestStateMachineProcedure()); |
| } |
| iResult += 11; |
| setResult(Bytes.toBytes(iResult)); |
| return Flow.NO_MORE_STATE; |
| default: |
| throw new UnsupportedOperationException(); |
| } |
| return Flow.HAS_MORE_STATE; |
| } |
| |
| @Override |
| protected void rollbackState(TestProcEnv env, final State state) { |
| switch (state) { |
| case STATE_1: |
| LOG.info("rollback step 1 " + this); |
| break; |
| case STATE_2: |
| LOG.info("rollback step 2 " + this); |
| break; |
| case STATE_3: |
| LOG.info("rollback step 3 " + this); |
| break; |
| default: |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| @Override |
| protected State getState(final int stateId) { |
| return State.values()[stateId]; |
| } |
| |
| @Override |
| protected int getStateId(final State state) { |
| return state.ordinal(); |
| } |
| |
| @Override |
| protected State getInitialState() { |
| return State.STATE_1; |
| } |
| |
| @Override |
| protected boolean abort(TestProcEnv env) { |
| aborted.set(true); |
| return true; |
| } |
| |
| @Override |
| protected void serializeStateData(ProcedureStateSerializer serializer) |
| throws IOException { |
| super.serializeStateData(serializer); |
| Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult); |
| serializer.serialize(builder.build()); |
| } |
| |
| @Override |
| protected void deserializeStateData(ProcedureStateSerializer serializer) |
| throws IOException { |
| super.deserializeStateData(serializer); |
| Int32Value value = serializer.deserialize(Int32Value.class); |
| iResult = value.getValue(); |
| } |
| } |
| |
| /** |
|  * Runs a child-submitting {@link TestStateMachineProcedure} without any kill hook and checks |
|  * the root result (3 + 5 + 11 = 19) and the last assigned procedure id (root, two children |
|  * added in STATE_2, one child added in DONE = 4). |
|  */ |
| @Test |
| public void testStateMachineMultipleLevel() throws Exception { |
|   final TestStateMachineProcedure root = new TestStateMachineProcedure(true); |
|   final long rootId = procExecutor.submitProcedure(root); |
|  |
|   // Block until the root has finished. |
|   ProcedureTestingUtility.waitProcedure(procExecutor, rootId); |
|  |
|   final Procedure<?> finished = procExecutor.getResult(rootId); |
|   ProcedureTestingUtility.assertProcNotFailed(finished); |
|   assertEquals(19, Bytes.toInt(finished.getResult())); |
|   assertEquals(4, procExecutor.getLastProcId()); |
| } |
| |
| /** |
|  * Walks a plain {@link TestStateMachineProcedure} through all of its states with the |
|  * kill-before-store-update hook armed and toggling, restarting the executor after every |
|  * kill, and checks the final result (3 + 5 + 7 + 11 = 26). |
|  */ |
| @Test |
| public void testStateMachineRecovery() throws Exception { |
|   ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); |
|   ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); |
|  |
|   // Step 1 - kill |
|   final long procId = |
|     ProcedureTestingUtility.submitAndWait(procExecutor, new TestStateMachineProcedure()); |
|   assertFalse(procExecutor.isRunning()); |
|  |
|   // Round 0: step 1 exec && step 2 - kill |
|   // Round 1: step 2 exec && step 3 - kill |
|   // Round 2: step 3 exec |
|   for (int round = 0; round < 3; round++) { |
|     restart(); |
|     waitProcedure(procId); |
|     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|     assertFalse(procExecutor.isRunning()); |
|   } |
|  |
|   // Final restart: the procedure finishes and the executor stays up. |
|   restart(); |
|   waitProcedure(procId); |
|   assertTrue(procExecutor.isRunning()); |
|  |
|   // The procedure is completed |
|   final Procedure<?> finished = procExecutor.getResult(procId); |
|   ProcedureTestingUtility.assertProcNotFailed(finished); |
|   assertEquals(26, Bytes.toInt(finished.getResult())); |
| } |
| |
| /** |
|  * Advances a plain {@link TestStateMachineProcedure} to STATE_3 across restarts, aborts it |
|  * while that state sleeps, then follows the rollback across further restarts until the |
|  * result holds an abort exception. |
|  */ |
| @Test |
| public void testStateMachineRollbackRecovery() throws Exception { |
|   ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); |
|   ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); |
|  |
|   // Step 1 - kill |
|   final long procId = |
|     ProcedureTestingUtility.submitAndWait(procExecutor, new TestStateMachineProcedure()); |
|   ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|   assertFalse(procExecutor.isRunning()); |
|  |
|   // Round 0: step 1 exec && step 2 - kill |
|   // Round 1: step 2 exec && step 3 - kill |
|   for (int round = 0; round < 2; round++) { |
|     restart(); |
|     waitProcedure(procId); |
|     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|     assertFalse(procExecutor.isRunning()); |
|   } |
|  |
|   // Step 3 exec - rollback step 3 - kill. The sleep leaves time for abort() to land. |
|   procSleepInterval = 2500; |
|   restart(); |
|   assertTrue(procExecutor.abort(procId)); |
|   waitProcedure(procId); |
|   ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|   assertFalse(procExecutor.isRunning()); |
|  |
|   // Round 0: rollback step 3 - rollback step 2 - kill |
|   // Round 1: rollback step 2 - step 1 - kill |
|   for (int round = 0; round < 2; round++) { |
|     restart(); |
|     waitProcedure(procId); |
|     assertFalse(procExecutor.isRunning()); |
|     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); |
|   } |
|  |
|   // Rollback step 1 - complete |
|   restart(); |
|   waitProcedure(procId); |
|   assertTrue(procExecutor.isRunning()); |
|  |
|   // The procedure is completed |
|   final Procedure<?> outcome = procExecutor.getResult(procId); |
|   ProcedureTestingUtility.assertIsAbortException(outcome); |
| } |
| |
| /** |
|  * Waits on the given procedure through {@code ProcedureTestingUtility.waitProcedure} and then |
|  * logs the content of the procedure log directory. |
|  * <p> |
|  * NOTE(review): the tests in this class call this and then assert that the procedure is not |
|  * yet completed while the executor is stopped, so the wait apparently also returns when the |
|  * executor stops -- confirm against ProcedureTestingUtility. |
|  * |
|  * @param procId id of the procedure to wait for |
|  */ |
| private void waitProcedure(final long procId) { |
| ProcedureTestingUtility.waitProcedure(procExecutor, procId); |
| dumpLogDirState(); |
| } |
| |
| /** |
|  * Logs, at debug level, the path and length of every entry under the procedure log |
|  * directory and asserts that each entry is a regular file. A failure to list the directory |
|  * is logged as a warning and otherwise ignored, since this is a diagnostic aid only. |
|  */ |
| private void dumpLogDirState() { |
|   try { |
|     FileStatus[] files = fs.listStatus(logDir); |
|     if (files == null || files.length == 0) { |
|       LOG.debug("no files under: {}", logDir); |
|       return; |
|     } |
|     for (FileStatus file : files) { |
|       assertTrue(file.toString(), file.isFile()); |
|       LOG.debug("log file {} size={}", file.getPath(), file.getLen()); |
|     } |
|   } catch (IOException e) { |
|     // Trailing Throwable argument is logged as the exception, not as a placeholder value. |
|     LOG.warn("Unable to dump {}", logDir, e); |
|   } |
| } |
| |
| /** |
|  * Procedure environment for these tests; its only feature is an optional latch that the |
|  * procedures' {@code execute()} steps wait on. |
|  */ |
| private static class TestProcEnv { |
|   // volatile: the latch is installed by one thread and awaited by the threads that run the |
|   // procedures, so the write must be visible to them. |
|   private volatile CountDownLatch latch = null; |
|  |
|   /** |
|    * set/unset a latch. every procedure execute() step will wait on the latch if any. |
|    * |
|    * @param latch the latch to wait on, or {@code null} to stop waiting |
|    */ |
|   public void setWaitLatch(CountDownLatch latch) { |
|     this.latch = latch; |
|   } |
|  |
|   /** |
|    * Blocks until the currently installed latch reaches zero; returns immediately when no |
|    * latch is installed. |
|    * |
|    * @throws InterruptedException if interrupted while waiting |
|    */ |
|   public void waitOnLatch() throws InterruptedException { |
|     // Read the field once: a concurrent setWaitLatch(null) between the null check and the |
|     // await() would otherwise cause a NullPointerException. |
|     final CountDownLatch current = latch; |
|     if (current != null) { |
|       current.await(); |
|     } |
|   } |
| } |
| } |