blob: 720f25b57d9b12e0d27d7fcb59edce821727a8bb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
/**
* This is the master side of a distributed complex procedure execution.
* <p>
* The {@link Procedure} is generic and subclassing or customization shouldn't be
* necessary -- any customization should happen just in {@link Subprocedure}s.
*/
@InterfaceAudience.Private
public class ProcedureCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureCoordinator.class);
final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
final static long TIMEOUT_MILLIS_DEFAULT = 60000;
final static long WAKE_MILLIS_DEFAULT = 500;
private final ProcedureCoordinatorRpcs rpcs;
private final ExecutorService pool;
private final long wakeTimeMillis;
private final long timeoutMillis;
// Running procedure table. Maps procedure name to running procedure reference
private final ConcurrentMap<String, Procedure> procedures =
new MapMaker().concurrencyLevel(4).weakValues().makeMap();
/**
* Create and start a ProcedureCoordinator.
*
* The rpc object registers the ProcedureCoordinator and starts any threads in this
* constructor.
*
* @param pool Used for executing procedures.
*/
// Only used in tests. SimpleMasterProcedureManager is a test class.
public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
}
/**
* Create and start a ProcedureCoordinator.
*
* The rpc object registers the ProcedureCoordinator and starts any threads in
* this constructor.
*
* @param pool Used for executing procedures.
*/
public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
long timeoutMillis, long wakeTimeMillis) {
this.timeoutMillis = timeoutMillis;
this.wakeTimeMillis = wakeTimeMillis;
this.rpcs = rpcs;
this.pool = pool;
this.rpcs.start(this);
}
/**
* Default thread pool for the procedure
*
* @param coordName
* @param opThreads the maximum number of threads to allow in the pool
*/
public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
}
/**
* Default thread pool for the procedure
*
* @param coordName
* @param opThreads the maximum number of threads to allow in the pool
* @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
*/
public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
long keepAliveMillis) {
return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}
/**
* Shutdown the thread pools and release rpc resources
* @throws IOException
*/
public void close() throws IOException {
// have to use shutdown now to break any latch waiting
pool.shutdownNow();
rpcs.close();
}
/**
* Submit an procedure to kick off its dependent subprocedures.
* @param proc Procedure to execute
* @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the
* procedure or any subprocedures could not be started. Failure could be due to
* submitting a procedure multiple times (or one with the same name), or some sort
* of IO problem. On errors, the procedure's monitor holds a reference to the exception
* that caused the failure.
*/
@SuppressWarnings("FutureReturnValueIgnored")
boolean submitProcedure(Procedure proc) {
// if the submitted procedure was null, then we don't want to run it
if (proc == null) {
return false;
}
String procName = proc.getName();
// make sure we aren't already running a procedure of that name
Procedure oldProc = procedures.get(procName);
if (oldProc != null) {
// procedures are always eventually completed on both successful and failed execution
try {
if (!oldProc.isCompleted()) {
LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
return false;
} else {
LOG.debug("Procedure " + procName
+ " was in running list but was completed. Accepting new attempt.");
if (!procedures.remove(procName, oldProc)) {
LOG.warn("Procedure " + procName
+ " has been resubmitted by another thread. Rejecting this request.");
return false;
}
}
} catch (ForeignException e) {
LOG.debug("Procedure " + procName
+ " was in running list but has exception. Accepting new attempt.");
if (!procedures.remove(procName, oldProc)) {
LOG.warn("Procedure " + procName
+ " has been resubmitted by another thread. Rejecting this request.");
return false;
}
}
}
// kick off the procedure's execution in a separate thread
try {
if (this.procedures.putIfAbsent(procName, proc) == null) {
LOG.debug("Submitting procedure " + procName);
this.pool.submit(proc);
return true;
} else {
LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
return false;
}
} catch (RejectedExecutionException e) {
LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e);
// Remove the procedure from the list since is not started
this.procedures.remove(procName, proc);
// the thread pool is full and we can't run the procedure
proc.receive(new ForeignException(procName, e));
}
return false;
}
/**
* The connection to the rest of the procedure group (members and coordinator) has been
* broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
* members since we cannot reach them anymore.
* @param message description of the error
* @param cause the actual cause of the failure
*/
void rpcConnectionFailure(final String message, final IOException cause) {
Collection<Procedure> toNotify = procedures.values();
boolean isTraceEnabled = LOG.isTraceEnabled();
LOG.debug("received connection failure: " + message, cause);
for (Procedure proc : toNotify) {
if (proc == null) {
continue;
}
// notify the elements, if they aren't null
if (isTraceEnabled) {
LOG.trace("connection failure - notify procedure: " + proc.getName());
}
proc.receive(new ForeignException(proc.getName(), cause));
}
}
/**
* Abort the procedure with the given name
* @param procName name of the procedure to abort
* @param reason serialized information about the abort
*/
public void abortProcedure(String procName, ForeignException reason) {
LOG.debug("abort procedure " + procName, reason);
// if we know about the Procedure, notify it
Procedure proc = procedures.get(procName);
if (proc == null) {
return;
}
proc.receive(reason);
}
/**
* Exposed for hooking with unit tests.
* @param procName
* @param procArgs
* @param expectedMembers
* @return the newly created procedure
*/
Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
List<String> expectedMembers) {
// build the procedure
return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
procName, procArgs, expectedMembers);
}
/**
* Kick off the named procedure
* Currently only one procedure with the same type and name is allowed to run at a time.
* @param procName name of the procedure to start
* @param procArgs arguments for the procedure
* @param expectedMembers expected members to start
* @return handle to the running procedure, if it was started correctly,
* <tt>null</tt> otherwise.
* Null could be due to submitting a procedure multiple times
* (or one with the same name), or runtime exception.
* Check the procedure's monitor that holds a reference to the exception
* that caused the failure.
*/
public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
List<String> expectedMembers) {
Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
if (!this.submitProcedure(proc)) {
LOG.error("Failed to submit procedure '" + procName + "'");
return null;
}
return proc;
}
/**
* Notification that the procedure had the specified member acquired its part of the barrier
* via {@link Subprocedure#acquireBarrier()}.
* @param procName name of the procedure that acquired
* @param member name of the member that acquired
*/
void memberAcquiredBarrier(String procName, final String member) {
Procedure proc = procedures.get(procName);
if (proc == null) {
LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Member '"+ member +"' acquired procedure '"+ procName +"'");
}
proc.barrierAcquiredByMember(member);
}
/**
* Notification that the procedure had another member finished executing its in-barrier subproc
* via {@link Subprocedure#insideBarrier()}.
* @param procName name of the subprocedure that finished
* @param member name of the member that executed and released its barrier
* @param dataFromMember the data that the member returned along with the notification
*/
void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
Procedure proc = procedures.get(procName);
if (proc == null) {
LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Member '"+ member +"' released procedure '"+ procName +"'");
}
proc.barrierReleasedByMember(member, dataFromMember);
}
/**
* @return the rpcs implementation for all current procedures
*/
ProcedureCoordinatorRpcs getRpcs() {
return rpcs;
}
/**
* Returns the procedure. This Procedure is a live instance so should not be modified but can
* be inspected.
* @param name Name of the procedure
* @return Procedure or null if not present any more
*/
public Procedure getProcedure(String name) {
return procedures.get(name);
}
/**
* @return Return set of all procedure names.
*/
public Set<String> getProcedureNames() {
return new HashSet<>(procedures.keySet());
}
}