blob: 3f21a72261bc8e2a29876495022d4285be1d8ce3 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.security.User;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class RegionServerCoprocessorHost extends
CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
private RegionServerServices rsServices;
public RegionServerCoprocessorHost(RegionServerServices rsServices,
Configuration conf) {
super(rsServices);
this.rsServices = rsServices;
this.conf = conf;
// Log the state of coprocessor loading here; should appear only once or
// twice in the daemon log, depending on HBase version, because there is
// only one RegionServerCoprocessorHost instance in the RS process
boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
DEFAULT_COPROCESSORS_ENABLED);
boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
DEFAULT_USER_COPROCESSORS_ENABLED);
LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
LOG.info("Table coprocessor loading is " +
((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
}
@Override
public RegionServerEnvironment createEnvironment(Class<?> implClass,
Coprocessor instance, int priority, int sequence, Configuration conf) {
return new RegionServerEnvironment(implClass, instance, priority,
sequence, conf, this.rsServices);
}
public void preStop(String message) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preStopRegionServer(ctx);
}
@Override
public void postEnvCall(RegionServerEnvironment env) {
// invoke coprocessor stop method
shutdown(env);
}
});
}
public boolean preMerge(final HRegion regionA, final HRegion regionB, final User user) throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preMerge(ctx, regionA, regionB);
}
});
}
public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion,
final User user)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postMerge(ctx, regionA, regionB, mergedRegion);
}
});
}
public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
final @MetaMutationAnnotation List<Mutation> metaEntries, final User user)
throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
}
});
}
public void postMergeCommit(final HRegion regionA, final HRegion regionB,
final HRegion mergedRegion, final User user) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
}
});
}
public void preRollBackMerge(final HRegion regionA, final HRegion regionB, final User user)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preRollBackMerge(ctx, regionA, regionB);
}
});
}
public void postRollBackMerge(final HRegion regionA, final HRegion regionB, final User user)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postRollBackMerge(ctx, regionA, regionB);
}
});
}
public void preRollWALWriterRequest() throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preRollWALWriterRequest(ctx);
}
});
}
public void postRollWALWriterRequest() throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postRollWALWriterRequest(ctx);
}
});
}
public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preReplicateLogEntries(ctx, entries, cells);
}
});
}
public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postReplicateLogEntries(ctx, entries, cells);
}
});
}
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException {
return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
: new CoprocessOperationWithResult<ReplicationEndpoint>() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
}
});
}
private <T> T execOperationWithResult(final T defaultValue,
final CoprocessOperationWithResult<T> ctx) throws IOException {
if (ctx == null)
return defaultValue;
ctx.setResult(defaultValue);
execOperation(ctx);
return ctx.getResult();
}
private static abstract class CoprocessorOperation
extends ObserverContext<RegionServerCoprocessorEnvironment> {
public CoprocessorOperation() {
this(RpcServer.getRequestUser());
}
public CoprocessorOperation(User user) {
super(user);
}
public abstract void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
public void postEnvCall(RegionServerEnvironment env) {
}
}
private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
private T result = null;
public void setResult(final T result) {
this.result = result;
}
public T getResult() {
return this.result;
}
}
private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
if (ctx == null) return false;
boolean bypass = false;
List<RegionServerEnvironment> envs = coprocessors.get();
for (int i = 0; i < envs.size(); i++) {
RegionServerEnvironment env = envs.get(i);
if (env.getInstance() instanceof RegionServerObserver) {
ctx.prepare(env);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
ctx.call((RegionServerObserver)env.getInstance(), ctx);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
ctx.postEnvCall(env);
}
return bypass;
}
/**
* Coprocessor environment extension providing access to region server
* related services.
*/
static class RegionServerEnvironment extends CoprocessorHost.Environment
implements RegionServerCoprocessorEnvironment {
private RegionServerServices regionServerServices;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
justification="Intentional; FB has trouble detecting isAssignableFrom")
public RegionServerEnvironment(final Class<?> implClass,
final Coprocessor impl, final int priority, final int seq,
final Configuration conf, final RegionServerServices services) {
super(impl, priority, seq, conf);
this.regionServerServices = services;
for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
Class<?> c = (Class<?>) itf;
if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST
this.regionServerServices.registerService(
((SingletonCoprocessorService) impl).getService());
break;
}
}
}
@Override
public RegionServerServices getRegionServerServices() {
return regionServerServices;
}
}
/**
* Environment priority comparator. Coprocessors are chained in sorted
* order.
*/
static class EnvironmentPriorityComparator implements
Comparator<CoprocessorEnvironment> {
public int compare(final CoprocessorEnvironment env1,
final CoprocessorEnvironment env2) {
if (env1.getPriority() < env2.getPriority()) {
return -1;
} else if (env1.getPriority() > env2.getPriority()) {
return 1;
}
if (env1.getLoadSequence() < env2.getLoadSequence()) {
return -1;
} else if (env1.getLoadSequence() > env2.getLoadSequence()) {
return 1;
}
return 0;
}
}
}