blob: 14342d469d3fb7e6021da0f7ccfb650f5510a773 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@Category({ MasterTests.class, MediumTests.class })
public class TestServerRemoteProcedure {
private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
@Rule
public TestName name = new TestName();
@Rule
public final ExpectedException exception = ExpectedException.none();
protected HBaseTestingUtility util;
protected MockRSProcedureDispatcher rsDispatcher;
protected MockMasterServices master;
protected AssignmentManager am;
protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
new ConcurrentSkipListMap<>();
// Simple executor to run some simple tasks.
protected ScheduledExecutorService executor;
/**
 * Creates the per-test fixture: a testing utility, a single-threaded scheduled executor, and a
 * {@link MockMasterServices} wired to a {@link MockRSProcedureDispatcher} whose remote calls are
 * answered by a {@link NoopRSExecutor}.
 *
 * @throws Exception if the mock master fails to start
 */
@Before
public void setUp() throws Exception {
  this.util = new HBaseTestingUtility();

  // Threads of the helper executor only log uncaught exceptions instead of dying silently.
  this.executor = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactoryBuilder()
      .setUncaughtExceptionHandler((thread, error) -> LOG.warn("Uncaught: ", error))
      .build());

  this.master = new MockMasterServices(this.util.getConfiguration(), this.regionsToRegionServers);
  this.rsDispatcher = new MockRSProcedureDispatcher(this.master);
  this.rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
  // The tests below read the online servers at index 0 and 1.
  this.master.start(2, this.rsDispatcher);
  this.am = this.master.getAssignmentManager();

  // Make sure the assignment manager's region states know about every online server.
  for (ServerName onlineServer : this.master.getServerManager().getOnlineServersList()) {
    this.am.getRegionStates().getOrCreateServer(onlineServer);
  }
}
/**
 * Stops the mock master and shuts down the helper executor after each test.
 *
 * <p>The executor is shut down in a {@code finally} block so that its thread is released even
 * when stopping the master throws. Both fields are null-checked because JUnit 4 still invokes
 * {@code @After} methods when {@code @Before} failed part-way through, and an NPE here would
 * only obscure the original failure.
 *
 * @throws Exception if stopping the mock master fails
 */
@After
public void tearDown() throws Exception {
  try {
    if (master != null) {
      master.stop("tearDown");
    }
  } finally {
    if (this.executor != null) {
      this.executor.shutdownNow();
    }
  }
}
/**
 * Submits a {@link SplitWALRemoteProcedure} targeting the first online server, expires that
 * server, and then expects the procedure to finish within five seconds and report success.
 */
@Test
public void testSplitWALAndCrashBeforeResponse() throws Exception {
  final ServerName worker = master.getServerManager().getOnlineServersList().get(0);
  final ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);

  final ServerRemoteProcedure splitProcedure =
    new SplitWALRemoteProcedure(worker, crashedWorker, "test");
  final Future<byte[]> result = submitProcedure(splitProcedure);

  // Pause before expiring the worker (presumably so the procedure has already been dispatched).
  TimeUnit.SECONDS.sleep(2);
  master.getServerManager().expireServer(worker);

  // Once remoteCallFailed has been invoked for this procedure, it should be finished.
  result.get(5, TimeUnit.SECONDS);
  Assert.assertTrue(splitProcedure.isSuccess());
}
/**
 * Races {@code remoteOperationDone} against {@code remoteCallFailed} on the same
 * {@link NoopServerRemoteProcedure} from two threads and expects the procedure to finish
 * within two seconds and report success.
 *
 * <p>The two-thread pool used for the race is shut down in a {@code finally} block; without
 * that, its non-daemon worker threads would outlive the test.
 */
@Test
public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
  ServerName worker = master.getServerManager().getOnlineServersList().get(0);
  ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
  Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
  Thread.sleep(2000);
  // complete the process and fail the process at the same time
  ExecutorService threadPool = Executors.newFixedThreadPool(2);
  try {
    threadPool.execute(() -> noopServerRemoteProcedure
      .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
      master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
    future.get(2000, TimeUnit.MILLISECONDS);
  } finally {
    // shutdown() rather than shutdownNow(): let an already-submitted task run to completion
    // instead of interrupting it, while still allowing the worker threads to exit.
    threadPool.shutdown();
  }
  Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
}
/**
 * Submits an {@link OpenRegionProcedure} for a freshly built region, removes the target server's
 * node from the dispatcher, and then expects waiting on the procedure to time out with the
 * procedure still unfinished.
 */
@Test
public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
  final TableName table = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
  final RegionInfo region = RegionInfoBuilder.newBuilder(table)
    .setStartKey(Bytes.toBytes(1))
    .setEndKey(Bytes.toBytes(2))
    .setSplit(false)
    .setRegionId(0)
    .build();

  final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
  env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(region);
  final TransitRegionStateProcedure parent = TransitRegionStateProcedure.assign(env, region, null);

  final ServerName worker = master.getServerManager().getOnlineServersList().get(0);
  final OpenRegionProcedure openProcedure = new OpenRegionProcedure(parent, region, worker);
  final Future<byte[]> result = submitProcedure(openProcedure);

  TimeUnit.SECONDS.sleep(2);
  rsDispatcher.removeNode(worker);

  // Waiting on the procedure must time out; a normal return is a test failure.
  boolean timedOut = false;
  try {
    result.get(2, TimeUnit.SECONDS);
  } catch (TimeoutException expected) {
    timedOut = true;
    LOG.info("timeout is expected");
  }
  if (!timedOut) {
    fail();
  }
  Assert.assertFalse(openProcedure.isFinished());
}
/**
 * Hands {@code proc} to the mock master's procedure executor via {@link ProcedureSyncWait}.
 *
 * @param proc the procedure to submit
 * @return the future returned by {@code ProcedureSyncWait.submitProcedure} for {@code proc}
 */
private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
  final Future<byte[]> pending =
    ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  return pending;
}
/**
 * Minimal {@link ServerRemoteProcedure} used by the tests in this class. It carries no state to
 * persist, cannot be aborted, and marks itself successful whenever {@link #complete} is called,
 * whether or not an error was supplied.
 */
private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
  implements ServerProcedureInterface {

  /**
   * @param targetServer the server this procedure is addressed to
   */
  public NoopServerRemoteProcedure(ServerName targetServer) {
    this.targetServer = targetServer;
  }

  /** Nothing to undo. */
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    // intentionally empty
  }

  /** Abort is not supported; always returns {@code false}. */
  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }

  /** No state is written. */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    // intentionally empty
  }

  /** No state is read. */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    // intentionally empty
  }

  /**
   * Builds a {@code ServerOperation} with a null procedure, id 0, this class as the
   * procedure class, and an empty payload.
   */
  @Override
  public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(
    MasterProcedureEnv env, ServerName serverName) {
    RemoteProcedureDispatcher.RemoteOperation operation =
      new RSProcedureDispatcher.ServerOperation(null, 0L, getClass(), new byte[0]);
    return Optional.of(operation);
  }

  /** Forwards to {@link #complete} with no error. */
  @Override
  public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
    complete(env, null);
  }

  /** Forwards to {@link #complete} with the reported error. */
  @Override
  public synchronized void remoteOperationFailed(MasterProcedureEnv env,
    RemoteProcedureException error) {
    complete(env, error);
  }

  /** Marks the procedure as successful; {@code error} is ignored. */
  @Override
  public void complete(MasterProcedureEnv env, Throwable error) {
    this.succ = true;
  }

  /** @return the target server given at construction */
  @Override
  public ServerName getServerName() {
    return this.targetServer;
  }

  /** @return always {@code false} */
  @Override
  public boolean hasMetaTableRegion() {
    return false;
  }

  /** @return always {@code SWITCH_RPC_THROTTLE} */
  @Override
  public ServerOperationType getServerOperationType() {
    return SWITCH_RPC_THROTTLE;
  }
}
/**
 * Pluggable stand-in for the region-server side of an execute-procedures call. The
 * {@code MockRemoteCall} in {@code MockRSProcedureDispatcher} delegates its
 * {@code sendRequest} to an implementation of this interface.
 */
protected interface MockRSExecutor {
/**
 * Produces the response for an execute-procedures request addressed to {@code server}.
 *
 * @param server the server the request is addressed to
 * @param req the request being sent
 * @return the response to hand back to the caller
 * @throws IOException if the implementation chooses to fail the call
 */
AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
AdminProtos.ExecuteProceduresRequest req) throws IOException;
}
/**
 * {@link MockRSExecutor} that answers every request with the default (empty) response. For each
 * region-open entry in the request it calls {@link #execOpenRegion}, which subclasses may
 * override; the value returned by that hook is not used here.
 */
protected static class NoopRSExecutor implements MockRSExecutor {

  /**
   * Walks all region-open entries of {@code req}, invoking {@link #execOpenRegion} for each,
   * then returns the default response instance.
   *
   * @throws IOException if {@link #execOpenRegion} throws
   */
  @Override
  public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
    AdminProtos.ExecuteProceduresRequest req) throws IOException {
    final AdminProtos.ExecuteProceduresResponse emptyResponse =
      AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
    if (req.getOpenRegionCount() <= 0) {
      return emptyResponse;
    }
    for (AdminProtos.OpenRegionRequest openRequest : req.getOpenRegionList()) {
      for (AdminProtos.OpenRegionRequest.RegionOpenInfo openInfo : openRequest
        .getOpenInfoList()) {
        execOpenRegion(server, openInfo);
      }
    }
    return emptyResponse;
  }

  /**
   * Hook invoked once per region-open entry. This base implementation does nothing.
   *
   * @param server the server the request was addressed to
   * @param regionInfo the region-open entry being processed
   * @return always {@code null} in this base implementation
   * @throws IOException never thrown here; declared for overriding subclasses
   */
  protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
    AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
    return null;
  }
}
/**
 * {@link RSProcedureDispatcher} whose remote calls are answered by a configurable
 * {@link MockRSExecutor}. A mock executor must be installed with
 * {@link #setMockRsExecutor} before any remote call is dispatched.
 */
protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
  private MockRSExecutor mockRsExec;

  /**
   * @param master the master services passed to the parent dispatcher
   */
  public MockRSProcedureDispatcher(final MasterServices master) {
    super(master);
  }

  /**
   * Installs the executor that will answer subsequent remote calls.
   *
   * @param mockRsExec the executor to delegate to
   */
  public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
    this.mockRsExec = mockRsExec;
  }

  /** Submits a {@link MockRemoteCall} for the given server and procedures. */
  @Override
  protected void remoteDispatch(ServerName serverName,
    @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
    final MockRemoteCall call = new MockRemoteCall(serverName, remoteProcedures);
    submitTask(call);
  }

  /**
   * Remote call whose request is handed to the enclosing dispatcher's {@link MockRSExecutor}.
   */
  private class MockRemoteCall extends ExecuteProceduresRemoteCall {

    public MockRemoteCall(final ServerName serverName,
      @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
      super(serverName, operations);
    }

    /** Delegates to the mock executor currently installed on the enclosing dispatcher. */
    @Override
    protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
      final AdminProtos.ExecuteProceduresRequest request) throws IOException {
      return MockRSProcedureDispatcher.this.mockRsExec.sendRequest(serverName, request);
    }
  }
}
}