// blob: 9ccee661586a4ceac9767aab0aa0f42433992071 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleRSProcedureManager extends RegionServerProcedureManager {
// Logger used by this manager and by its nested classes.
private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
// Services of the hosting region server; assigned in initialize(RegionServerServices).
private RegionServerServices rss;
// ZKProcedureMemberRpcs instance created in initialize(), started in start(), closed in stop().
private ProcedureMemberRpcs memberRpcs;
// Procedure member created in initialize() with a SimleSubprocedureBuilder; closed in stop().
private ProcedureMember member;
/**
 * Binds this manager to its hosting region server and builds the collaborators used by
 * {@link #start()} and {@link #stop(boolean)}: the ZooKeeper-based member RPCs for this
 * manager's procedure signature and the {@link ProcedureMember} that creates subprocedures
 * through {@link SimleSubprocedureBuilder}.
 * @param rss services of the hosting region server
 * @throws KeeperException declared for failures while setting up the ZooKeeper-based member
 *           RPCs (presumably raised by the ZKProcedureMemberRpcs constructor - confirm)
 */
@Override
public void initialize(RegionServerServices rss) throws KeeperException {
  this.rss = rss;
  ZKWatcher zkw = rss.getZooKeeper();
  this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
  // Resolve the server name once; it labels the member's thread pool and the log line below.
  String serverName = rss.getServerName().toString();
  // NOTE(review): the second argument is presumably the pool's thread count - confirm.
  ThreadPoolExecutor pool = ProcedureMember.defaultPool(serverName, 1);
  this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
  LOG.info("Initialized: {}", serverName);
}
/**
 * Starts the member RPCs, identifying this member by the hosting region server's name and
 * handing over the {@link ProcedureMember} built in {@code initialize}.
 */
@Override
public void start() {
  final String memberName = rss.getServerName().toString();
  memberRpcs.start(memberName, member);
  LOG.info("Started.");
}
/**
 * Closes the procedure member and then the member RPCs.
 * @param force only logged here; both resources are closed regardless of its value
 * @throws IOException if closing either resource fails
 */
@Override
public void stop(boolean force) throws IOException {
  LOG.info("stop: {}", force);
  try {
    this.member.close();
  } finally {
    // Always release the RPC side, even when closing the member throws.
    this.memberRpcs.close();
  }
}
/**
 * Returns the signature this manager registers under, which is the constant shared with
 * {@code SimpleMasterProcedureManager}.
 * @return {@code SimpleMasterProcedureManager.SIMPLE_SIGNATURE}
 */
@Override
public String getProcedureSignature() {
  return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
}
/**
 * Creates the subprocedure that handles the named procedure on this region server, together
 * with a fresh error dispatcher and a dedicated {@link SimpleSubprocedurePool}.
 * @param name name of the procedure to build a subprocedure for
 * @return Subprocedure to submit to the ProcedureMember.
 * @throws IllegalStateException if the region server is stopping or already stopped
 */
public Subprocedure buildSubprocedure(String name) {
  // Refuse new work once the hosting server has started (or finished) shutting down.
  final boolean shuttingDown = rss.isStopping() || rss.isStopped();
  if (shuttingDown) {
    throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
      + ", because stopping/stopped!");
  }

  LOG.info("Attempting to run a procedure.");
  final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
  final Configuration configuration = rss.getConfiguration();
  final String poolLabel = rss.getServerName().toString();
  final SimpleSubprocedurePool pool = new SimpleSubprocedurePool(poolLabel, configuration);
  return new SimpleSubprocedure(rss, member, dispatcher, pool, name);
}
/**
 * Factory handed to the {@link ProcedureMember}; it builds the subprocedure that does the
 * actual work by delegating to the enclosing manager.
 */
public class SimleSubprocedureBuilder implements SubprocedureFactory {
  /**
   * Builds the subprocedure for the named procedure.
   * @param name name of the procedure
   * @param data payload supplied with the request; ignored by this implementation
   * @return the subprocedure created by the enclosing manager's {@code buildSubprocedure(name)}
   */
  @Override
  public Subprocedure buildSubprocedure(String name, byte[] data) {
    LOG.info("Building procedure: {}", name);
    return SimpleRSProcedureManager.this.buildSubprocedure(name);
  }
}
/**
 * Task pool backed by a single daemon worker thread. Tasks are handed in through
 * {@link #submitTask(Callable)} and awaited with {@link #waitForOutstandingTasks()};
 * {@link #close()} shuts the worker down gracefully while
 * {@link #abort(String, Throwable)} shuts it down immediately.
 * <p>
 * NOTE(review): the list of outstanding futures is a plain {@link ArrayList}, so submitting and
 * waiting are presumably expected to happen on one thread - confirm against callers.
 */
public static class SimpleSubprocedurePool implements Closeable, Abortable {

  private final ExecutorCompletionService<Void> taskPool;
  private final ExecutorService executor;
  // Set by abort(); read by a waiter to tell an abort-triggered interrupt from any other.
  private volatile boolean aborted;
  // Futures submitted since the last waitForOutstandingTasks() call.
  private final List<Future<Void>> futures = new ArrayList<>();
  // Used in the worker thread name and as the source of wrapped ForeignExceptions.
  private final String name;

  /**
   * Creates the pool and its single daemon worker thread.
   * @param name label embedded in the worker thread name and used as the source of
   *          {@link ForeignException}s raised by this pool
   * @param conf currently unused; kept as part of the constructor signature
   */
  public SimpleSubprocedurePool(String name, Configuration conf) {
    this.name = name;
    executor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    taskPool = new ExecutorCompletionService<>(executor);
  }

  /**
   * Submit a task to the pool and remember its future so that
   * {@link #waitForOutstandingTasks()} can wait for it.
   * @param task work to run on the pool's worker thread
   */
  public void submitTask(final Callable<Void> task) {
    Future<Void> f = this.taskPool.submit(task);
    futures.add(f);
  }

  /**
   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
   * Whatever the outcome, tasks that are still unfinished are cancelled and the list of
   * outstanding tasks is emptied before this method returns.
   * @return {@code true} if every task completed normally, {@code false} if the wait was
   *         interrupted while the pool was not aborted (the interrupt status is restored)
   * @throws ForeignException if the wait was interrupted after {@link #abort(String, Throwable)}
   *           was called, or if a task failed; a task's own ForeignException is rethrown as is,
   *           any other failure cause is wrapped with this pool's name as the source
   */
  public boolean waitForOutstandingTasks() throws ForeignException {
    LOG.debug("Waiting for procedure to finish.");
    try {
      for (Future<Void> f : futures) {
        f.get();
      }
      return true;
    } catch (InterruptedException e) {
      if (aborted) {
        throw new ForeignException(
          "Interrupted and found to be aborted while waiting for tasks!", e);
      }
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof ForeignException) {
        throw (ForeignException) e.getCause();
      }
      throw new ForeignException(name, e.getCause());
    } finally {
      // close off remaining tasks
      for (Future<Void> f : futures) {
        if (!f.isDone()) {
          f.cancel(true);
        }
      }
      // Every future is now finished or cancelled. Forget them so a later wait does not call
      // get() on a cancelled future (unchecked CancellationException), does not re-report an
      // old failure, and the list does not grow without bound.
      futures.clear();
    }
    return false;
  }

  /**
   * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
   * finish
   */
  @Override
  public void close() {
    executor.shutdown();
  }

  /**
   * Marks the pool as aborted and shuts the executor down immediately. Calls made after the
   * pool is already marked aborted return without doing anything.
   * @param why reason for the abort, included in the warning log
   * @param e error that triggered the abort, logged alongside the reason
   */
  @Override
  public void abort(String why, Throwable e) {
    if (this.aborted) {
      return;
    }
    // Raise the flag before shutting down so an interrupted waiter observes the abort.
    this.aborted = true;
    LOG.warn("Aborting because: {}", why, e);
    this.executor.shutdownNow();
  }

  @Override
  public boolean isAborted() {
    return this.aborted;
  }
}
/**
 * Subprocedure that runs a single {@link RSSimpleTask} on its task pool during the
 * inside-barrier step and reports {@code SimpleMasterProcedureManager.SIMPLE_DATA} back.
 */
public class SimpleSubprocedure extends Subprocedure {
  private final RegionServerServices rss;
  private final SimpleSubprocedurePool taskManager;

  /**
   * @param rss services of the hosting region server
   * @param member procedure member this subprocedure belongs to
   * @param errorListener dispatcher passed on to the {@link Subprocedure} base class
   * @param taskManager pool that runs this subprocedure's task; it is shut down by this
   *          subprocedure when execution ends or when {@link #cleanup(Exception)} is called
   * @param name name of the procedure
   */
  public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
    ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
    // NOTE(review): 500 and 60000 are presumably the wake frequency and timeout in
    // milliseconds - confirm against the Subprocedure constructor.
    super(member, name, errorListener, 500, 60000);
    LOG.info("Constructing a SimpleSubprocedure.");
    this.rss = rss;
    this.taskManager = taskManager;
  }

  /**
   * Callable task that only logs the name of the server it runs on.
   * TODO. We don't need a thread pool to execute such a simple task. This can be simplified
   * with no use of subprocedurepool.
   */
  class RSSimpleTask implements Callable<Void> {
    RSSimpleTask() {}

    @Override
    public Void call() throws Exception {
      LOG.info("Execute subprocedure on {}", rss.getServerName());
      return null;
    }
  }

  /**
   * Submits the task, waits for it, and surfaces any error recorded on the monitor between
   * the steps. The task pool is shut down once this method finishes, whether it succeeds or
   * fails.
   * @throws ForeignException if the monitor holds an error, the task fails, or the wait for
   *           the task does not complete successfully
   */
  private void execute() throws ForeignException {
    try {
      monitor.rethrowException();
      // running a task (e.g., roll log, flush table) on region server
      taskManager.submitTask(new RSSimpleTask());
      monitor.rethrowException();
      // wait for everything to complete; a false result means the task did not finish, so it
      // must not be reported to the caller as a success.
      if (!taskManager.waitForOutstandingTasks()) {
        throw new ForeignException(rss.getServerName().toString(),
          new IllegalStateException("Outstanding subprocedure tasks did not complete"));
      }
      monitor.rethrowException();
    } finally {
      // cleanup() only shuts the pool down through abort(); close it here as well so the
      // worker thread is released when the subprocedure succeeds. Closing an executor that
      // has already been shut down is harmless.
      taskManager.close();
    }
  }

  @Override
  public void acquireBarrier() throws ForeignException {
    // do nothing, executing in inside barrier step.
  }

  /**
   * Runs the simple task and returns the payload reported for this member.
   * @return the bytes of {@code SimpleMasterProcedureManager.SIMPLE_DATA}
   * @throws ForeignException if executing the task fails
   */
  @Override
  public byte[] insideBarrier() throws ForeignException {
    execute();
    return Bytes.toBytes(SimpleMasterProcedureManager.SIMPLE_DATA);
  }

  /**
   * Cancel threads if they haven't finished.
   * @param e error that caused the cleanup; passed on to the pool's abort for logging
   */
  @Override
  public void cleanup(Exception e) {
    taskManager.abort("Aborting simple subprocedure tasks due to error", e);
  }
}
}