blob: 2f990cb0b064f7e1946ca5fee991aff8745542fe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
/**
* A remote procedure dispatcher for regionservers.
*/
@InterfaceAudience.Private
public class RSProcedureDispatcher
extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
implements ServerListener {
// Logger used by the dispatcher and by its nested remote-call classes.
private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);
// Configuration key read once in the constructor to initialise rsStartupWaitTime.
public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
"hbase.regionserver.rpc.startup.waittime";
// Fallback used when the key above is not configured. The value is added to
// EnvironmentEdgeManager.currentTime() and reported with an "ms" suffix in log messages.
private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
// Master this dispatcher belongs to; source of configuration, ServerManager and connections.
protected final MasterServices master;
// How long a call may keep retrying on ServerNotRunningYetException before giving up.
private final long rsStartupWaitTime;
// Assigned in start() from the master's ProcedureExecutor; null until start() gets that far.
private MasterProcedureEnv procedureEnv;
/**
 * Creates a dispatcher for the given master.
 * <p>
 * The master's configuration is handed to the parent dispatcher and is also used to read
 * {@link #RS_RPC_STARTUP_WAIT_TIME_CONF_KEY}.
 * @param master the master services this dispatcher works for
 */
public RSProcedureDispatcher(final MasterServices master) {
  super(master.getConfiguration());
  this.master = master;
  this.rsStartupWaitTime = master.getConfiguration()
    .getLong(RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
}
/**
 * Supplies the handler invoked for exceptions that escape a dispatcher thread.
 * The handler only logs the error; it does not rethrow or recover.
 * @return a handler that logs the throwable at ERROR level
 */
@Override
protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
  return (thread, error) ->
    LOG.error("Unexpected error caught, this may cause the procedure to hang forever", error);
}
/**
 * Starts the dispatcher and seeds it with the currently online regionservers.
 * <p>
 * After the parent dispatcher has started, this validates that the master is not stopped and
 * that its ServerManager, ProcedureExecutor and procedure environment are available. Only then
 * is this dispatcher registered as a {@link ServerListener}, so that an aborted start does not
 * leave a registered listener behind and so that listener callbacks never observe a null
 * {@code procedureEnv}. Registration still happens before the online server list is read.
 * @return {@code true} if the dispatcher started and all online servers were added as nodes;
 *   {@code false} otherwise
 */
@Override
public boolean start() {
  if (!super.start()) {
    return false;
  }
  if (master.isStopped()) {
    LOG.debug("Stopped");
    return false;
  }
  // Around startup, if failed, some of the below may be set back to null so NPE is possible.
  ServerManager sm = master.getServerManager();
  if (sm == null) {
    LOG.debug("ServerManager is null");
    return false;
  }
  ProcedureExecutor<MasterProcedureEnv> pe = master.getMasterProcedureExecutor();
  if (pe == null) {
    LOG.debug("ProcedureExecutor is null");
    return false;
  }
  this.procedureEnv = pe.getEnvironment();
  if (this.procedureEnv == null) {
    LOG.debug("ProcedureEnv is null; stopping={}", master.isStopping());
    return false;
  }
  // Register only once every precondition holds, but before listing the online servers so
  // that a server joining in between is reported through serverAdded().
  sm.registerListener(this);
  try {
    for (ServerName serverName : sm.getOnlineServersList()) {
      addNode(serverName);
    }
  } catch (Exception e) {
    LOG.info("Failed start", e);
    // Do not keep receiving server events for a dispatcher that reported a failed start.
    sm.unregisterListener(this);
    return false;
  }
  return true;
}
/**
 * Stops the dispatcher and unregisters it from the master's ServerManager.
 * <p>
 * The ServerManager may be null around a failed startup (see {@code start()}), in which case
 * there is nothing to unregister from and the call is skipped instead of throwing.
 * @return {@code false} if the parent dispatcher reported it did not stop, {@code true}
 *   otherwise
 */
@Override
public boolean stop() {
  if (!super.stop()) {
    return false;
  }
  final ServerManager sm = master.getServerManager();
  if (sm == null) {
    LOG.debug("ServerManager is null");
    return true;
  }
  sm.unregisterListener(this);
  return true;
}
/**
 * Submits a task that delivers the given procedures to the given server.
 * If the ServerManager does not consider the server online, a {@link DeadRSRemoteCall} is
 * submitted instead so that the procedures are failed without attempting an RPC.
 * @param serverName target regionserver
 * @param remoteProcedures procedures to deliver to that server
 */
@Override
protected void remoteDispatch(final ServerName serverName,
    final Set<RemoteProcedure> remoteProcedures) {
  final boolean online = master.getServerManager().isServerOnline(serverName);
  // An offline target fails fast through DeadRSRemoteCall.
  final ExecuteProceduresRemoteCall call = online
    ? new ExecuteProceduresRemoteCall(serverName, remoteProcedures)
    : new DeadRSRemoteCall(serverName, remoteProcedures);
  submitTask(call);
}
/**
 * Fails every pending operation for a server with a {@link DoNotRetryIOException}.
 * The same exception instance is passed to each procedure's {@code remoteCallFailed}.
 * @param serverName server the operations were queued for
 * @param operations pending procedures to fail
 */
@Override
protected void abortPendingOperations(final ServerName serverName,
    final Set<RemoteProcedure> operations) {
  // TODO: Replace with a ServerNotOnlineException()
  final IOException cause = new DoNotRetryIOException("server not online " + serverName);
  operations.forEach(pending -> pending.remoteCallFailed(procedureEnv, serverName, cause));
}
/**
 * {@link ServerListener} callback: adds the new server as a dispatcher node.
 * @param serverName the server that was added
 */
@Override
public void serverAdded(final ServerName serverName) {
  addNode(serverName);
}
/**
 * {@link ServerListener} callback: removes the dispatcher node of the given server.
 * @param serverName the server that was removed
 */
@Override
public void serverRemoved(final ServerName serverName) {
  removeNode(serverName);
}
/**
 * Receiver for the operation groups produced by {@code splitAndResolveOperation}. Each method
 * is invoked at most once per split, and only when its list is non-empty.
 */
private interface RemoteProcedureResolver {
// Handles all region-open operations of one split.
void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
// Handles all region-close operations of one split.
void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
// Handles all server-level operations of one split.
void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
}
/**
 * Builds the remote operations for the given procedures, groups them by operation class and
 * hands each non-empty group to {@code resolver}.
 * <p>
 * Groups are dispatched in a fixed order: {@link RegionOpenOperation}s, then
 * {@link RegionCloseOperation}s, then {@link ServerOperation}s. Anything left in the grouping
 * afterwards is of an unhandled type and is only logged at WARN.
 * @param serverName RegionServer to which the remote operations are sent
 * @param operations Remote procedures which are dispatched to the given server
 * @param resolver Used to dispatch remote procedures to given server.
 */
public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
    RemoteProcedureResolver resolver) {
  final MasterProcedureEnv procEnv = master.getMasterProcedureExecutor().getEnvironment();
  final ArrayListMultimap<Class<?>, RemoteOperation> grouped =
    buildAndGroupRequestByType(procEnv, serverName, operations);

  final List<RegionOpenOperation> opens = fetchType(grouped, RegionOpenOperation.class);
  if (!opens.isEmpty()) {
    resolver.dispatchOpenRequests(procEnv, opens);
  }

  final List<RegionCloseOperation> closes = fetchType(grouped, RegionCloseOperation.class);
  if (!closes.isEmpty()) {
    resolver.dispatchCloseRequests(procEnv, closes);
  }

  final List<ServerOperation> serverOps = fetchType(grouped, ServerOperation.class);
  if (!serverOps.isEmpty()) {
    resolver.dispatchServerOperations(procEnv, serverOps);
  }

  if (grouped.isEmpty()) {
    return;
  }
  LOG.warn("unknown request type in the queue: " + grouped);
}
/**
 * Remote call used for a server that is not online: running it sends nothing and immediately
 * fails all of its procedures with a {@link RegionServerStoppedException}.
 */
private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
  public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
    super(serverName, remoteProcedures);
  }

  /** Fails every procedure of this call without contacting the server. */
  @Override
  public void run() {
    final IOException notOnline =
      new RegionServerStoppedException("Server " + getServerName() + " is not online");
    remoteCallFailed(procedureEnv, notOnline);
  }
}
// ==========================================================================
// Compatibility calls
// ==========================================================================
protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
// Regionserver this call targets.
private final ServerName serverName;
// Procedures carried by this call; all of them are failed together by remoteCallFailed().
private final Set<RemoteProcedure> remoteProcedures;
// Incremented every time scheduleForRetry() re-submits this call.
private int numberOfAttemptsSoFar = 0;
// Absolute deadline for ServerNotRunningYetException retries; -1 until getMaxWaitTime()
// computes it on first use.
private long maxWaitTime = -1;
// Base retry delay passed to submitTask() with TimeUnit.MILLISECONDS.
private final long rsRpcRetryInterval;
// Configuration key for rsRpcRetryInterval.
private static final String RS_RPC_RETRY_INTERVAL_CONF_KEY =
"hbase.regionserver.rpc.retry.interval";
// Used when the key above is not configured.
private static final int DEFAULT_RS_RPC_RETRY_INTERVAL = 100;
// Request under construction; replaced with a fresh builder at the start of every run().
private ExecuteProceduresRequest.Builder request = null;
/**
 * Creates a call that will deliver the given procedures to the given server.
 * The retry interval is read from the master configuration at construction time.
 * @param serverName target regionserver
 * @param remoteProcedures procedures to deliver
 */
public ExecuteProceduresRemoteCall(final ServerName serverName,
    final Set<RemoteProcedure> remoteProcedures) {
  this.serverName = serverName;
  this.remoteProcedures = remoteProcedures;
  this.rsRpcRetryInterval = master.getConfiguration()
    .getLong(RS_RPC_RETRY_INTERVAL_CONF_KEY, DEFAULT_RS_RPC_RETRY_INTERVAL);
}
/**
 * Looks up the admin stub for this call's target server from the master's async cluster
 * connection.
 * @return the admin for {@code serverName}
 * @throws IOException declared for callers; propagated from the lookup if it fails
 */
private AsyncRegionServerAdmin getRsAdmin() throws IOException {
  return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
}
/**
 * @return the regionserver this call targets
 */
protected final ServerName getServerName() {
  return this.serverName;
}
/**
 * Decides whether a failed request should be retried and, if so, re-submits this call.
 * <p>
 * Outcomes, checked in this order:
 * <ul>
 * <li>{@link ServerNotRunningYetException}: retry every {@code rsRpcRetryInterval} until the
 * deadline from {@code getMaxWaitTime()} passes, then give up.</li>
 * <li>{@link DoNotRetryIOException}: give up.</li>
 * <li>{@link CallQueueTooBigException} on the very first attempt: give up.</li>
 * <li>Target server no longer online: give up.</li>
 * <li>Anything else: retry with a quadratic back-off capped at 10 seconds.</li>
 * </ul>
 * @param e the failure of the last attempt
 * @return {@code true} if this call was re-submitted, {@code false} if the caller should fail
 *   the procedures
 */
private boolean scheduleForRetry(IOException e) {
  LOG.debug("Request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
  // Should we wait a little before retrying? If the server is starting it's yes.
  if (e instanceof ServerNotRunningYetException) {
    long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
    if (remainingTime > 0) {
      LOG.warn("Waiting a little before retrying {}, try={}, can wait up to {}ms",
        serverName, numberOfAttemptsSoFar, remainingTime);
      numberOfAttemptsSoFar++;
      // Retry every rsRpcRetryInterval millis up to maximum wait time.
      submitTask(this, rsRpcRetryInterval, TimeUnit.MILLISECONDS);
      return true;
    }
    // Log the configured wait budget: getMaxWaitTime() is an absolute deadline, not a
    // duration, so it must not be printed with an "ms" suffix.
    LOG.warn("{} is throwing ServerNotRunningYetException for {}ms; trying another server",
      serverName, rsStartupWaitTime);
    return false;
  }
  if (e instanceof DoNotRetryIOException) {
    LOG.warn("{} tells us DoNotRetry due to {}, try={}, give up", serverName,
      e.toString(), numberOfAttemptsSoFar);
    return false;
  }
  // This exception is thrown in the rpc framework, where we can make sure that the call has not
  // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
  // better choose another region server.
  // Notice that, it is safe to quit only if this is the first time we send request to region
  // server. Maybe the region server has accepted our request the first time, and then there is
  // a network error which prevents we receive the response, and the second time we hit a
  // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
  // double assign...
  if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
    LOG.warn("request to {} failed due to {}, try={}, this usually because" +
      " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
    return false;
  }
  // Always retry for other exception types if the region server is not dead yet.
  if (!master.getServerManager().isServerOnline(serverName)) {
    LOG.warn("Request to {} failed due to {}, try={} and the server is not online, give up",
      serverName, e.toString(), numberOfAttemptsSoFar);
    return false;
  }
  if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
    // A better way is to return true here to let the upper layer quit, and then schedule a
    // background task to check whether the region server is dead. And if it is dead, call
    // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect
    // result, but waste some resources.
    LOG.warn("{} is aborted or stopped, for safety we still need to" +
      " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
  } else {
    LOG.warn("request to {} failed due to {}, try={}, retrying...", serverName,
      e.toString(), numberOfAttemptsSoFar);
  }
  numberOfAttemptsSoFar++;
  // Add some backoff here as the attempts rise otherwise if a stuck condition, will fill logs
  // with failed attempts. None of our backoff classes -- RetryCounter or ClientBackoffPolicy
  // -- fit here nicely so just do something simple; increment by rsRpcRetryInterval millis *
  // retry^2 on each try
  // up to max of 10 seconds (don't want to back off too much in case of situation change).
  submitTask(this, computeBackoffMillis(), TimeUnit.MILLISECONDS);
  return true;
}

/**
 * Computes {@code min(rsRpcRetryInterval * attempts^2, 10s)} without integer overflow.
 * <p>
 * Squaring the {@code int} attempt counter directly wraps to a negative value once the counter
 * exceeds 46340, which would turn the back-off into a negative delay. The square is therefore
 * taken in {@code long}, and the multiplication is skipped when it would exceed the cap.
 * @return the delay in milliseconds before the next attempt
 */
private long computeBackoffMillis() {
  final long maxBackoffMillis = 10 * 1000L;
  final long attempts = this.numberOfAttemptsSoFar;
  // The square of any int value fits in a long.
  final long squared = attempts * attempts;
  if (rsRpcRetryInterval <= 0) {
    // Non-positive interval: same arithmetic as before; the result never exceeds the cap.
    return Math.min(rsRpcRetryInterval * squared, maxBackoffMillis);
  }
  if (squared > maxBackoffMillis / rsRpcRetryInterval) {
    // The product would exceed the cap, and could overflow, so saturate instead.
    return maxBackoffMillis;
  }
  return rsRpcRetryInterval * squared;
}
/**
 * Returns the absolute deadline used for {@link ServerNotRunningYetException} retries.
 * The deadline is computed on the first call as current time plus {@code rsStartupWaitTime}
 * and returned unchanged afterwards.
 * @return the deadline, on the same clock as {@code EnvironmentEdgeManager.currentTime()}
 */
private long getMaxWaitTime() {
  if (this.maxWaitTime >= 0) {
    return this.maxWaitTime;
  }
  // First use: fix the deadline relative to now.
  this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
  return this.maxWaitTime;
}
/**
 * Unwraps a {@link RemoteException} via {@code unwrapRemoteException()}; any other exception
 * is returned as is.
 * @param e exception to inspect
 * @return the unwrapped exception, or {@code e} itself
 */
private IOException unwrapException(IOException e) {
  return e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
}
/**
 * Builds a fresh request from the call's procedures and sends it to the target server.
 * On an {@link IOException} the failure is unwrapped and passed to {@code scheduleForRetry};
 * if no retry was scheduled, all procedures of this call are failed with it.
 */
@Override
public void run() {
  request = ExecuteProceduresRequest.newBuilder();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Building request with operations count=" + remoteProcedures.size());
  }
  // Fills the request through the dispatch* callbacks implemented by this class.
  splitAndResolveOperation(getServerName(), remoteProcedures, this);
  try {
    sendRequest(getServerName(), request.build());
  } catch (IOException ioe) {
    final IOException cause = unwrapException(ioe);
    // TODO: In the future some operation may want to bail out early.
    // TODO: How many times should we retry (use numberOfAttemptsSoFar)
    if (scheduleForRetry(cause)) {
      return;
    }
    remoteCallFailed(procedureEnv, cause);
  }
}
/**
 * Adds a single open-region request covering all given operations to the pending request.
 * @param env procedure environment used to build the per-region open info
 * @param operations region-open operations to include
 */
@Override
public void dispatchOpenRequests(final MasterProcedureEnv env,
    final List<RegionOpenOperation> operations) {
  final OpenRegionRequest openRequest =
    buildOpenRegionRequest(env, getServerName(), operations);
  request.addOpenRegion(openRequest);
}
/**
 * Adds one close-region request per operation to the pending request.
 * @param env procedure environment (not used here)
 * @param operations region-close operations to include
 */
@Override
public void dispatchCloseRequests(final MasterProcedureEnv env,
    final List<RegionCloseOperation> operations) {
  operations.forEach(
    closeOp -> request.addCloseRegion(closeOp.buildCloseRegionRequest(getServerName())));
}
/**
 * Adds one remote-procedure entry per server operation to the pending request, in list order.
 * @param env procedure environment (not used here)
 * @param operations server operations to include
 */
@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
  for (ServerOperation serverOp : operations) {
    request.addProc(serverOp.buildRequest());
  }
}
/**
 * Sends the request through the regionserver admin and waits for the response.
 * Will be overridden in test.
 * <p>
 * Note that the admin is obtained for this call's own target server; the {@code serverName}
 * argument is not consulted by this implementation.
 * @param serverName target server as seen by the caller
 * @param request the request to send
 * @return the server's response
 * @throws IOException if obtaining the admin or executing the request fails
 */
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
    final ExecuteProceduresRequest request) throws IOException {
  return FutureUtils.get(getRsAdmin().executeProcedures(request));
}
/**
 * Reports the given failure to every procedure carried by this call.
 * @param env procedure environment passed through to each procedure
 * @param e the failure to report
 */
protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
  remoteProcedures.forEach(failed -> failed.remoteCallFailed(env, getServerName(), e));
}
}
/**
 * Builds one {@link OpenRegionRequest} for all given operations.
 * The request carries the target server's start code, the current time from
 * {@code EnvironmentEdgeManager} as the master system time, and one open-info entry per
 * operation in list order.
 * @param env procedure environment used to build each open-info entry
 * @param serverName target server, source of the start code
 * @param operations region-open operations to include
 * @return the built request
 */
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
    final ServerName serverName, final List<RegionOpenOperation> operations) {
  final OpenRegionRequest.Builder openRequest = OpenRegionRequest.newBuilder()
    .setServerStartCode(serverName.getStartcode())
    .setMasterSystemTime(EnvironmentEdgeManager.currentTime());
  operations.forEach(openOp -> openRequest.addOpenInfo(openOp.buildRegionOpenInfoRequest(env)));
  return openRequest.build();
}
// ==========================================================================
// RPC Messages
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
// - RegionOperation: open, close, flush, snapshot, ...
// ==========================================================================
public static final class ServerOperation extends RemoteOperation {
private final long procId;
private final Class<?> rsProcClass;
private final byte[] rsProcData;
public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
byte[] rsProcData) {
super(remoteProcedure);
this.procId = procId;
this.rsProcClass = rsProcClass;
this.rsProcData = rsProcData;
}
public RemoteProcedureRequest buildRequest() {
return RemoteProcedureRequest.newBuilder().setProcId(procId)
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
}
}
/**
 * Base class for remote operations that target a single region.
 */
public abstract static class RegionOperation extends RemoteOperation {
  /** Region the operation applies to. */
  protected final RegionInfo regionInfo;
  /** Procedure id that subclasses place in the requests they build. */
  protected final long procId;

  /**
   * @param remoteProcedure procedure this operation belongs to
   * @param regionInfo region the operation applies to
   * @param procId procedure id for the operation
   */
  protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
    super(remoteProcedure);
    this.procId = procId;
    this.regionInfo = regionInfo;
  }
}
/**
 * Remote operation asking a regionserver to open a region.
 */
public static class RegionOpenOperation extends RegionOperation {
  /**
   * @param remoteProcedure procedure this operation belongs to
   * @param regionInfo region to open
   * @param procId procedure id for the operation
   */
  public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
      long procId) {
    super(remoteProcedure, regionInfo, procId);
  }

  /**
   * Builds the open-info entry for this region, including the favored nodes reported by the
   * assignment manager of {@code env}.
   * @param env procedure environment giving access to the assignment manager
   * @return the open-info entry for this operation
   */
  public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
      final MasterProcedureEnv env) {
    return RequestConverter.buildRegionOpenInfo(
      regionInfo,
      env.getAssignmentManager().getFavoredNodes(regionInfo),
      procId);
  }
}
/**
 * Remote operation asking a regionserver to close a region, together with the destination
 * server that is placed in the close request.
 */
public static class RegionCloseOperation extends RegionOperation {
  private final ServerName destinationServer;

  /**
   * @param remoteProcedure procedure this operation belongs to
   * @param regionInfo region to close
   * @param procId procedure id for the operation
   * @param destinationServer destination server recorded for the close
   */
  public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
      ServerName destinationServer) {
    super(remoteProcedure, regionInfo, procId);
    this.destinationServer = destinationServer;
  }

  /**
   * @return the destination server given at construction
   */
  public ServerName getDestinationServer() {
    return this.destinationServer;
  }

  /**
   * Builds the close request for this region.
   * @param serverName server the request is addressed to
   * @return a request carrying the region name, the destination server and the procedure id
   */
  public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
    final byte[] regionName = regionInfo.getRegionName();
    return ProtobufUtil.buildCloseRegionRequest(serverName, regionName,
      getDestinationServer(), procId);
  }
}
}