| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.procedure2; |
| |
| import java.io.IOException; |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.DelayQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; |
| import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; |
| import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; |
| import org.apache.hadoop.hbase.procedure2.util.StringUtils; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; |
| |
| /** |
| * A procedure dispatcher that aggregates and sends after elapsed time or after we hit |
| * count threshold. Creates its own threadpool to run RPCs with timeout. |
| * <ul> |
| * <li>Each server queue has a dispatch buffer</li> |
| * <li>Once the dispatch buffer reaches a threshold-size/time we send</li> |
| * </ul> |
| * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, |
| * call {@link #stop()}. |
| */ |
| @InterfaceAudience.Private |
| public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> { |
| private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class); |
| |
| public static final String THREAD_POOL_SIZE_CONF_KEY = |
| "hbase.procedure.remote.dispatcher.threadpool.size"; |
| private static final int DEFAULT_THREAD_POOL_SIZE = 128; |
| |
| public static final String DISPATCH_DELAY_CONF_KEY = |
| "hbase.procedure.remote.dispatcher.delay.msec"; |
| private static final int DEFAULT_DISPATCH_DELAY = 150; |
| |
| public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = |
| "hbase.procedure.remote.dispatcher.max.queue.size"; |
| private static final int DEFAULT_MAX_QUEUE_SIZE = 32; |
| |
| private final AtomicBoolean running = new AtomicBoolean(false); |
| private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = |
| new ConcurrentHashMap<TRemote, BufferNode>(); |
| |
| private final int operationDelay; |
| private final int queueMaxSize; |
| private final int corePoolSize; |
| |
| private TimeoutExecutorThread timeoutExecutor; |
| private ThreadPoolExecutor threadPool; |
| |
/**
 * Captures the dispatcher tunables (pool size, dispatch delay, max buffered operations) from
 * the given configuration, using the built-in defaults for any key that is not set. No thread
 * is created here; see {@link #start()}.
 * @param conf configuration to read the tunables from
 */
protected RemoteProcedureDispatcher(Configuration conf) {
  corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
  operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
  queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
}
| |
| public boolean start() { |
| if (running.getAndSet(true)) { |
| LOG.warn("Already running"); |
| return false; |
| } |
| |
| LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " + |
| "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay); |
| |
| // Create the timeout executor |
| timeoutExecutor = new TimeoutExecutorThread(); |
| timeoutExecutor.start(); |
| |
| // Create the thread pool that will execute RPCs |
| threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, |
| Threads.newDaemonThreadFactory(this.getClass().getSimpleName(), |
| getUncaughtExceptionHandler())); |
| return true; |
| } |
| |
| public boolean stop() { |
| if (!running.getAndSet(false)) { |
| return false; |
| } |
| |
| LOG.info("Stopping procedure remote dispatcher"); |
| |
| // send stop signals |
| timeoutExecutor.sendStopSignal(); |
| threadPool.shutdownNow(); |
| return true; |
| } |
| |
| public void join() { |
| assert !running.get() : "expected not running"; |
| |
| // wait the timeout executor |
| timeoutExecutor.awaitTermination(); |
| timeoutExecutor = null; |
| |
| // wait for the thread pool to terminate |
| threadPool.shutdownNow(); |
| try { |
| while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { |
| LOG.warn("Waiting for thread-pool to terminate"); |
| } |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while waiting for thread-pool termination", e); |
| } |
| } |
| |
| protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler(); |
| |
| // ============================================================================================ |
| // Node Helpers |
| // ============================================================================================ |
| /** |
| * Add a node that will be able to execute remote procedures |
| * @param key the node identifier |
| */ |
| public void addNode(final TRemote key) { |
| assert key != null: "Tried to add a node with a null key"; |
| nodeMap.computeIfAbsent(key, k -> new BufferNode(k)); |
| } |
| |
| /** |
| * Add a remote rpc. |
| * @param key the node identifier |
| */ |
| public void addOperationToNode(final TRemote key, RemoteProcedure rp) |
| throws NullTargetServerDispatchException, NoServerDispatchException, |
| NoNodeDispatchException { |
| if (key == null) { |
| throw new NullTargetServerDispatchException(rp.toString()); |
| } |
| BufferNode node = nodeMap.get(key); |
| if (node == null) { |
| // If null here, it means node has been removed because it crashed. This happens when server |
| // is expired in ServerManager. ServerCrashProcedure may or may not have run. |
| throw new NoServerDispatchException(key.toString() + "; " + rp.toString()); |
| } |
| node.add(rp); |
| // Check our node still in the map; could have been removed by #removeNode. |
| if (!nodeMap.containsValue(node)) { |
| throw new NoNodeDispatchException(key.toString() + "; " + rp.toString()); |
| } |
| } |
| |
| public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) { |
| BufferNode node = nodeMap.get(key); |
| if (node == null) { |
| LOG.warn("since no node for this key {}, we can't removed the finished remote procedure", |
| key); |
| return; |
| } |
| node.operationCompleted(rp); |
| } |
| |
| /** |
| * Remove a remote node |
| * @param key the node identifier |
| */ |
| public boolean removeNode(final TRemote key) { |
| final BufferNode node = nodeMap.remove(key); |
| if (node == null) { |
| return false; |
| } |
| |
| node.abortOperationsInQueue(); |
| return true; |
| } |
| |
| // ============================================================================================ |
| // Task Helpers |
| // ============================================================================================ |
| protected final void submitTask(Runnable task) { |
| threadPool.execute(task); |
| } |
| |
| protected final void submitTask(Runnable task, long delay, TimeUnit unit) { |
| timeoutExecutor.add(new DelayedTask(task, delay, unit)); |
| } |
| |
| protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations); |
| protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations); |
| |
| /** |
| * Data structure with reference to remote operation. |
| */ |
| public static abstract class RemoteOperation { |
| private final RemoteProcedure remoteProcedure; |
| |
| protected RemoteOperation(final RemoteProcedure remoteProcedure) { |
| this.remoteProcedure = remoteProcedure; |
| } |
| |
| public RemoteProcedure getRemoteProcedure() { |
| return remoteProcedure; |
| } |
| } |
| |
| /** |
| * Remote procedure reference. |
| */ |
| public interface RemoteProcedure<TEnv, TRemote> { |
| /** |
| * For building the remote operation. |
| * May be empty if no need to send remote call. Usually, this means the RemoteProcedure has been |
| * finished already. This is possible, as we may have already sent the procedure to RS but then |
| * the rpc connection is broken so the executeProcedures call fails, but the RS does receive the |
| * procedure and execute it and then report back, before we retry again. |
| */ |
| Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote); |
| |
| /** |
| * Called when the executeProcedure call is failed. |
| */ |
| void remoteCallFailed(TEnv env, TRemote remote, IOException exception); |
| |
| /** |
| * Called when RS tells the remote procedure is succeeded through the |
| * {@code reportProcedureDone} method. |
| */ |
| void remoteOperationCompleted(TEnv env); |
| |
| /** |
| * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone} |
| * method. |
| */ |
| void remoteOperationFailed(TEnv env, RemoteProcedureException error); |
| |
| /** |
| * Whether store this remote procedure in dispatched queue |
| * only OpenRegionProcedure and CloseRegionProcedure return false since they are |
| * not fully controlled by dispatcher |
| */ |
| default boolean storeInDispatchedQueue() { |
| return true; |
| } |
| } |
| |
| /** |
| * Account of what procedures are running on remote node. |
| */ |
| public interface RemoteNode<TEnv, TRemote> { |
| TRemote getKey(); |
| |
| void add(RemoteProcedure<TEnv, TRemote> operation); |
| |
| void dispatch(); |
| } |
| |
| protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env, |
| final TRemote remote, final Set<RemoteProcedure> remoteProcedures) { |
| final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create(); |
| for (RemoteProcedure proc : remoteProcedures) { |
| Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote); |
| operation.ifPresent(op -> requestByType.put(op.getClass(), op)); |
| } |
| return requestByType; |
| } |
| |
| protected <T extends RemoteOperation> List<T> fetchType( |
| final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) { |
| return (List<T>)requestByType.removeAll(type); |
| } |
| |
| // ============================================================================================ |
| // Timeout Helpers |
| // ============================================================================================ |
| private final class TimeoutExecutorThread extends Thread { |
| private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>(); |
| |
| public TimeoutExecutorThread() { |
| super("ProcedureDispatcherTimeoutThread"); |
| } |
| |
| @Override |
| public void run() { |
| while (running.get()) { |
| final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); |
| if (task == null || task == DelayedUtil.DELAYED_POISON) { |
| // the executor may be shutting down, and the task is just the shutdown request |
| continue; |
| } |
| if (task instanceof DelayedTask) { |
| threadPool.execute(((DelayedTask) task).getObject()); |
| } else { |
| ((BufferNode) task).dispatch(); |
| } |
| } |
| } |
| |
| public void add(final DelayedWithTimeout delayed) { |
| queue.add(delayed); |
| } |
| |
| public void remove(final DelayedWithTimeout delayed) { |
| queue.remove(delayed); |
| } |
| |
| public void sendStopSignal() { |
| queue.add(DelayedUtil.DELAYED_POISON); |
| } |
| |
| public void awaitTermination() { |
| try { |
| final long startTime = EnvironmentEdgeManager.currentTime(); |
| for (int i = 0; isAlive(); ++i) { |
| sendStopSignal(); |
| join(250); |
| if (i > 0 && (i % 8) == 0) { |
| LOG.warn("Waiting termination of thread " + getName() + ", " + |
| StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); |
| } |
| } |
| } catch (InterruptedException e) { |
| LOG.warn(getName() + " join wait got interrupted", e); |
| } |
| } |
| } |
| |
| // ============================================================================================ |
| // Internals Helpers |
| // ============================================================================================ |
| |
| /** |
| * Node that contains a set of RemoteProcedures |
| */ |
| protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote> |
| implements RemoteNode<TEnv, TRemote> { |
| private Set<RemoteProcedure> operations; |
| private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>(); |
| |
| protected BufferNode(final TRemote key) { |
| super(key, 0); |
| } |
| |
| @Override |
| public TRemote getKey() { |
| return getObject(); |
| } |
| |
| @Override |
| public synchronized void add(final RemoteProcedure operation) { |
| if (this.operations == null) { |
| this.operations = new HashSet<>(); |
| setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); |
| timeoutExecutor.add(this); |
| } |
| this.operations.add(operation); |
| if (this.operations.size() > queueMaxSize) { |
| timeoutExecutor.remove(this); |
| dispatch(); |
| } |
| } |
| |
| @Override |
| public synchronized void dispatch() { |
| if (operations != null) { |
| remoteDispatch(getKey(), operations); |
| operations.stream().filter(operation -> operation.storeInDispatchedQueue()) |
| .forEach(operation -> dispatchedOperations.add(operation)); |
| this.operations = null; |
| } |
| } |
| |
| public synchronized void abortOperationsInQueue() { |
| if (operations != null) { |
| abortPendingOperations(getKey(), operations); |
| this.operations = null; |
| } |
| abortPendingOperations(getKey(), dispatchedOperations); |
| this.dispatchedOperations.clear(); |
| } |
| |
| public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){ |
| this.dispatchedOperations.remove(remoteProcedure); |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + ", operations=" + this.operations; |
| } |
| } |
| |
| /** |
| * Delayed object that holds a FutureTask. |
| * <p/> |
| * used to submit something later to the thread-pool. |
| */ |
| private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> { |
| public DelayedTask(Runnable task, long delay, TimeUnit unit) { |
| super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); |
| } |
| } |
| } |