blob: 485821e645c8eb57271902e43cc06799cf5bbbaf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import com.google.common.collect.MapMaker;
/**
* Process to kick off and manage a running {@link Subprocedure} on a member. This is the
* specialized part of a {@link Procedure} that actually does procedure type-specific work
* and reports back to the coordinator as it completes each phase.
*/
@InterfaceAudience.Private
public class ProcedureMember implements Closeable {
private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
private final SubprocedureFactory builder;
private final ProcedureMemberRpcs rpcs;
private final ConcurrentMap<String,Subprocedure> subprocs =
new MapMaker().concurrencyLevel(4).weakValues().makeMap();
private final ExecutorService pool;
/**
* Instantiate a new ProcedureMember. This is a slave that executes subprocedures.
*
* @param rpcs controller used to send notifications to the procedure coordinator
* @param pool thread pool to submit subprocedures
* @param factory class that creates instances of a subprocedure.
*/
public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
SubprocedureFactory factory) {
this.pool = pool;
this.rpcs = rpcs;
this.builder = factory;
}
/**
* Default thread pool for the procedure
*
* @param memberName
* @param procThreads the maximum number of threads to allow in the pool
*/
public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
}
/**
* Default thread pool for the procedure
*
* @param memberName
* @param procThreads the maximum number of threads to allow in the pool
* @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
*/
public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
long keepAliveMillis) {
return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
}
/**
* Package exposed. Not for public use.
*
* @return reference to the Procedure member's rpcs object
*/
ProcedureMemberRpcs getRpcs() {
return rpcs;
}
/**
* This is separated from execution so that we can detect and handle the case where the
* subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
* sent here)
* @param opName
* @param data
* @return subprocedure
*/
public Subprocedure createSubprocedure(String opName, byte[] data) {
return builder.buildSubprocedure(opName, data);
}
/**
* Submit an subprocedure for execution. This starts the local acquire phase.
* @param subproc the subprocedure to execute.
* @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
* could not be started. In the latter case, the subprocedure holds a reference to
* the exception that caused the failure.
*/
public boolean submitSubprocedure(Subprocedure subproc) {
// if the submitted subprocedure was null, bail.
if (subproc == null) {
LOG.warn("Submitted null subprocedure, nothing to run here.");
return false;
}
String procName = subproc.getName();
if (procName == null || procName.length() == 0) {
LOG.error("Subproc name cannot be null or the empty string");
return false;
}
// make sure we aren't already running an subprocedure of that name
Subprocedure rsub = subprocs.get(procName);
if (rsub != null) {
if (!rsub.isComplete()) {
LOG.error("Subproc '" + procName + "' is already running. Bailing out");
return false;
}
LOG.warn("A completed old subproc " + procName + " is still present, removing");
if (!subprocs.remove(procName, rsub)) {
LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
return false;
}
}
LOG.debug("Submitting new Subprocedure:" + procName);
// kick off the subprocedure
try {
if (subprocs.putIfAbsent(procName, subproc) == null) {
this.pool.submit(subproc);
return true;
} else {
LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
return false;
}
} catch (RejectedExecutionException e) {
subprocs.remove(procName, subproc);
// the thread pool is full and we can't run the subprocedure
String msg = "Subprocedure pool is full!";
subproc.cancel(msg, e.getCause());
}
LOG.error("Failed to start subprocedure '" + procName + "'");
return false;
}
/**
* Notification that procedure coordinator has reached the global barrier
* @param procName name of the subprocedure that should start running the in-barrier phase
*/
public void receivedReachedGlobalBarrier(String procName) {
Subprocedure subproc = subprocs.get(procName);
if (subproc == null) {
LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'");
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'");
}
subproc.receiveReachedGlobalBarrier();
}
/**
* Best effort attempt to close the threadpool via Thread.interrupt.
*/
@Override
public void close() throws IOException {
// have to use shutdown now to break any latch waiting
pool.shutdownNow();
}
/**
* Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
* @param timeoutMs timeout limit in millis
* @return true if successfully, false if bailed due to timeout.
* @throws InterruptedException
*/
boolean closeAndWait(long timeoutMs) throws InterruptedException {
pool.shutdown();
return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
}
/**
* The connection to the rest of the procedure group (member and coordinator) has been
* broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
* other members since we cannot reach them anymore.
* @param message description of the error
* @param cause the actual cause of the failure
* @param procName the name of the procedure we'd cancel due to the error.
*/
public void controllerConnectionFailure(final String message, final Throwable cause,
final String procName) {
LOG.error(message, cause);
if (procName == null) {
return;
}
Subprocedure toNotify = subprocs.get(procName);
if (toNotify != null) {
toNotify.cancel(message, cause);
}
}
/**
* Send abort to the specified procedure
* @param procName name of the procedure to about
* @param ee exception information about the abort
*/
public void receiveAbortProcedure(String procName, ForeignException ee) {
LOG.debug("Request received to abort procedure " + procName, ee);
// if we know about the procedure, notify it
Subprocedure sub = subprocs.get(procName);
if (sub == null) {
LOG.info("Received abort on procedure with no local subprocedure " + procName +
", ignoring it.", ee);
return; // Procedure has already completed
}
String msg = "Propagating foreign exception to subprocedure " + sub.getName();
LOG.error(msg, ee);
sub.cancel(msg, ee);
}
}