blob: 398c596b63f8b914276db0c0e343e75fc40a16c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RawCellBuilder;
import org.apache.hadoop.hbase.RawCellBuilderFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
/**
* Implements the coprocessor environment and runtime support for coprocessors loaded within a
* {@link Region}.
*/
@InterfaceAudience.Private
public class RegionCoprocessorHost
extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
// The shared data map
private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
AbstractReferenceMap.ReferenceStrength.WEAK);
// optimization: no need to call postScannerFilterRow, if no coprocessor implements it
private final boolean hasCustomPostScannerFilterRow;
/*
* Whether any configured CPs override postScannerFilterRow hook
*/
public boolean hasCustomPostScannerFilterRow() {
return hasCustomPostScannerFilterRow;
}
/**
* Encapsulation of the environment of each coprocessor
*/
private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
implements RegionCoprocessorEnvironment {
private Region region;
ConcurrentMap<String, Object> sharedData;
private final MetricRegistry metricRegistry;
private final RegionServerServices services;
/**
* Constructor
* @param impl the coprocessor instance
* @param priority chaining priority
*/
public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq,
final Configuration conf, final Region region, final RegionServerServices services,
final ConcurrentMap<String, Object> sharedData) {
super(impl, priority, seq, conf);
this.region = region;
this.sharedData = sharedData;
this.services = services;
this.metricRegistry =
MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
}
/** Returns the region */
@Override
public Region getRegion() {
return region;
}
@Override
public OnlineRegions getOnlineRegions() {
return this.services;
}
@Override
public Connection getConnection() {
// Mocks may have services as null at test time.
return services != null ? new SharedConnection(services.getConnection()) : null;
}
@Override
public Connection createConnection(Configuration conf) throws IOException {
return services != null ? this.services.createConnection(conf) : null;
}
@Override
public ServerName getServerName() {
return services != null ? services.getServerName() : null;
}
@Override
public void shutdown() {
super.shutdown();
MetricsCoprocessor.removeRegistry(this.metricRegistry);
}
@Override
public ConcurrentMap<String, Object> getSharedData() {
return sharedData;
}
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
}
@Override
public MetricRegistry getMetricRegistryForRegionServer() {
return metricRegistry;
}
@Override
public RawCellBuilder getCellBuilder() {
// We always do a DEEP_COPY only
return RawCellBuilderFactory.create();
}
}
/**
* Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors
* only. Temporary hack until Core Coprocessors are integrated into Core.
*/
private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment
implements HasRegionServerServices {
private final RegionServerServices rsServices;
public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
final int seq, final Configuration conf, final Region region,
final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
super(impl, priority, seq, conf, region, services, sharedData);
this.rsServices = services;
}
/**
* @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
* consumption.
*/
@Override
public RegionServerServices getRegionServerServices() {
return this.rsServices;
}
}
static class TableCoprocessorAttribute {
private Path path;
private String className;
private int priority;
private Configuration conf;
public TableCoprocessorAttribute(Path path, String className, int priority,
Configuration conf) {
this.path = path;
this.className = className;
this.priority = priority;
this.conf = conf;
}
public Path getPath() {
return path;
}
public String getClassName() {
return className;
}
public int getPriority() {
return priority;
}
public Configuration getConf() {
return conf;
}
}
/** The region server services */
RegionServerServices rsServices;
/** The region */
HRegion region;
/**
* Constructor
* @param region the region
* @param rsServices interface to available region server functionality
* @param conf the configuration
*/
@SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization
public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices,
final Configuration conf) {
super(rsServices);
this.conf = conf;
this.rsServices = rsServices;
this.region = region;
this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
// load system default cp's from configuration.
loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
// load system default cp's for user tables from configuration.
if (!region.getRegionInfo().getTable().isSystemTable()) {
loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
}
// load Coprocessor From HDFS
loadTableCoprocessors(conf);
// now check whether any coprocessor implements postScannerFilterRow
boolean hasCustomPostScannerFilterRow = false;
out: for (RegionCoprocessorEnvironment env : coprocEnvironments) {
if (env.getInstance() instanceof RegionObserver) {
Class<?> clazz = env.getInstance().getClass();
for (;;) {
if (clazz == Object.class) {
// we dont need to look postScannerFilterRow into Object class
break; // break the inner loop
}
try {
clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
InternalScanner.class, Cell.class, boolean.class);
// this coprocessor has a custom version of postScannerFilterRow
hasCustomPostScannerFilterRow = true;
break out;
} catch (NoSuchMethodException ignore) {
}
// the deprecated signature still exists
try {
clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
// this coprocessor has a custom version of postScannerFilterRow
hasCustomPostScannerFilterRow = true;
break out;
} catch (NoSuchMethodException ignore) {
}
clazz = clazz.getSuperclass();
}
}
}
this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
}
static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
TableDescriptor htd) {
return htd.getCoprocessorDescriptors().stream().map(cp -> {
Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
Configuration ourConf;
if (!cp.getProperties().isEmpty()) {
// do an explicit deep copy of the passed configuration
ourConf = new Configuration(false);
HBaseConfiguration.merge(ourConf, conf);
cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
} else {
ourConf = conf;
}
return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
}).collect(Collectors.toList());
}
/**
* Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception
* if there is a problem.
*/
public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd)
throws IOException {
String pathPrefix = UUID.randomUUID().toString();
for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) {
if (attr.getPriority() < 0) {
throw new IOException(
"Priority for coprocessor " + attr.getClassName() + " cannot be less than 0");
}
ClassLoader old = Thread.currentThread().getContextClassLoader();
try {
ClassLoader cl;
if (attr.getPath() != null) {
cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
} else {
cl = CoprocessorHost.class.getClassLoader();
}
Thread.currentThread().setContextClassLoader(cl);
if (cl instanceof CoprocessorClassLoader) {
String[] includedClassPrefixes = null;
if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
includedClassPrefixes = prefixes.split(";");
}
((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes);
} else {
cl.loadClass(attr.getClassName());
}
} catch (ClassNotFoundException e) {
throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
} finally {
Thread.currentThread().setContextClassLoader(old);
}
}
}
void loadTableCoprocessors(final Configuration conf) {
boolean coprocessorsEnabled =
conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
boolean tableCoprocessorsEnabled =
conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
return;
}
// scan the table attributes for coprocessor load specifications
// initialize the coprocessors
List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf,
region.getTableDescriptor())) {
// Load encompasses classloading and coprocessor initialization
try {
RegionCoprocessorEnvironment env =
load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf());
if (env == null) {
continue;
}
configured.add(env);
LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of "
+ region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
} catch (Throwable t) {
// Coprocessor failed to load, do we abort on error?
if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
abortServer(attr.getClassName(), t);
} else {
LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
}
}
}
// add together to coprocessor set for COW efficiency
coprocEnvironments.addAll(configured);
}
@Override
public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
Configuration conf) {
// If coprocessor exposes any services, register them.
for (Service service : instance.getServices()) {
region.registerService(service);
}
ConcurrentMap<String, Object> classData;
// make sure only one thread can add maps
synchronized (SHARED_DATA_MAP) {
// as long as at least one RegionEnvironment holds on to its classData it will
// remain in this map
classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
k -> new ConcurrentHashMap<>());
}
// If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices,
classData)
: new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
}
@Override
public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
throws InstantiationException, IllegalAccessException {
try {
if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
} else {
LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
return null;
}
} catch (NoSuchMethodException | InvocationTargetException e) {
throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
}
}
private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
RegionCoprocessor::getRegionObserver;
private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
RegionCoprocessor::getEndpointObserver;
abstract class RegionObserverOperationWithoutResult
extends ObserverOperationWithoutResult<RegionObserver> {
public RegionObserverOperationWithoutResult() {
super(regionObserverGetter);
}
public RegionObserverOperationWithoutResult(User user) {
super(regionObserverGetter, user);
}
public RegionObserverOperationWithoutResult(boolean bypassable) {
super(regionObserverGetter, null, bypassable);
}
public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
super(regionObserverGetter, user, bypassable);
}
}
abstract class BulkLoadObserverOperation
extends ObserverOperationWithoutResult<BulkLoadObserver> {
public BulkLoadObserverOperation(User user) {
super(RegionCoprocessor::getBulkLoadObserver, user);
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////
// Observer operations
//////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////
// Observer operations
//////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Invoked before a region open.
* @throws IOException Signals that an I/O exception has occurred.
*/
public void preOpen() throws IOException {
if (coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preOpen(this);
}
});
}
/**
* Invoked after a region open
*/
public void postOpen() {
if (coprocEnvironments.isEmpty()) {
return;
}
try {
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postOpen(this);
}
});
} catch (IOException e) {
LOG.warn(e.toString(), e);
}
}
/**
* Invoked before a region is closed
* @param abortRequested true if the server is aborting
*/
public void preClose(final boolean abortRequested) throws IOException {
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preClose(this, abortRequested);
}
});
}
/**
* Invoked after a region is closed
* @param abortRequested true if the server is aborting
*/
public void postClose(final boolean abortRequested) {
try {
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postClose(this, abortRequested);
}
@Override
public void postEnvCall() {
shutdown(this.getEnvironment());
}
});
} catch (IOException e) {
LOG.warn(e.toString(), e);
}
}
/**
* Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
* available candidates.
* <p>
* Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the
* passed in <code>candidates</code>.
* @param store The store where compaction is being requested
* @param candidates The currently available store files
* @param tracker used to track the life cycle of a compaction
* @param user the user
*/
public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
final CompactionLifeCycleTracker tracker, final User user) throws IOException {
if (coprocEnvironments.isEmpty()) {
return false;
}
boolean bypassable = true;
return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preCompactSelection(this, store, candidates, tracker);
}
});
}
/**
* Called after the {@link HStoreFile}s to be compacted have been selected from the available
* candidates.
* @param store The store where compaction is being requested
* @param selected The store files selected to compact
* @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
*/
public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
throws IOException {
if (coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult(user) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postCompactSelection(this, store, selected, tracker, request);
}
});
}
/**
* Called prior to opening store scanner for compaction.
*/
public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
if (coprocEnvironments.isEmpty()) {
return store.getScanInfo();
}
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
execOperation(new RegionObserverOperationWithoutResult(user) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
}
});
return builder.build();
}
/**
* Called prior to rewriting the store files selected for compaction
* @param store the store being compacted
* @param scanner the scanner used to read store data during compaction
* @param scanType type of Scan
* @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
* @return Scanner to use (cannot be null!)
*/
public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
final ScanType scanType, final CompactionLifeCycleTracker tracker,
final CompactionRequest request, final User user) throws IOException {
InternalScanner defaultResult = scanner;
if (coprocEnvironments.isEmpty()) {
return defaultResult;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, defaultResult, user) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
InternalScanner scanner =
observer.preCompact(this, store, getResult(), scanType, tracker, request);
if (scanner == null) {
throw new CoprocessorException("Null Scanner return disallowed!");
}
return scanner;
}
});
}
/**
* Called after the store compaction has completed.
* @param store the store being compacted
* @param resultFile the new store file written during compaction
* @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
*/
public void postCompact(final HStore store, final HStoreFile resultFile,
final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
throws IOException {
execOperation(
coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postCompact(this, store, resultFile, tracker, request);
}
});
}
/**
* Invoked before create StoreScanner for flush.
*/
public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
throws IOException {
if (coprocEnvironments.isEmpty()) {
return store.getScanInfo();
}
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preFlushScannerOpen(this, store, builder, tracker);
}
});
return builder.build();
}
/**
* Invoked before a memstore flush
* @return Scanner to use (cannot be null!)
*/
public InternalScanner preFlush(HStore store, InternalScanner scanner,
FlushLifeCycleTracker tracker) throws IOException {
if (coprocEnvironments.isEmpty()) {
return scanner;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, scanner) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
if (scanner == null) {
throw new CoprocessorException("Null Scanner return disallowed!");
}
return scanner;
}
});
}
/**
* Invoked before a memstore flush
*/
public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preFlush(this, tracker);
}
});
}
/**
* Invoked after a memstore flush
*/
public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postFlush(this, tracker);
}
});
}
/**
* Invoked before in memory compaction.
*/
public void preMemStoreCompaction(HStore store) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preMemStoreCompaction(this, store);
}
});
}
/**
* Invoked before create StoreScanner for in memory compaction.
*/
public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
}
});
return builder.build();
}
/**
* Invoked before compacting memstore.
*/
public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
throws IOException {
if (coprocEnvironments.isEmpty()) {
return scanner;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, scanner) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preMemStoreCompactionCompact(this, store, getResult());
}
});
}
/**
* Invoked after in memory compaction.
*/
public void postMemStoreCompaction(HStore store) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postMemStoreCompaction(this, store);
}
});
}
/**
* Invoked after a memstore flush
*/
public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
throws IOException {
if (coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postFlush(this, store, storeFile, tracker);
}
});
}
// RegionObserver support
/**
* Supports Coprocessor 'bypass'.
* @param get the Get request
* @param results What to return if return is true/'bypass'.
* @return true if default processing should be bypassed.
* @exception IOException Exception
*/
public boolean preGet(final Get get, final List<Cell> results) throws IOException {
if (coprocEnvironments.isEmpty()) {
return false;
}
boolean bypassable = true;
return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preGetOp(this, get, results);
}
});
}
/**
* @param get the Get request
* @param results the result set
* @exception IOException Exception
*/
public void postGet(final Get get, final List<Cell> results) throws IOException {
if (coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postGetOp(this, get, results);
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param get the Get request
* @return true or false to return to client if bypassing normal operation, or null otherwise
* @exception IOException Exception
*/
public Boolean preExists(final Get get) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preExists(this, get, getResult());
}
});
}
/**
* @param get the Get request
* @param result the result returned by the region server
* @return the result to return to the client
* @exception IOException Exception
*/
public boolean postExists(final Get get, boolean result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postExists(this, get, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param put The Put object
* @param edit The WALEdit object.
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public boolean prePut(final Put put, final WALEdit edit) throws IOException {
if (coprocEnvironments.isEmpty()) {
return false;
}
boolean bypassable = true;
return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.prePut(this, put, edit);
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param mutation - the current mutation
* @param kv - the current cell
* @param byteNow - current timestamp in bytes
* @param get - the get that could be used Note that the get only does not specify the family
* and qualifier that should be used
* @return true if default processing should be bypassed
* @deprecated In hbase-2.0.0. Will be removed in hbase-4.0.0. Added explicitly for a single
* Coprocessor for its needs only. Will be removed. VisibilityController still needs
* this, need to change the logic there first.
*/
@Deprecated
public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
final byte[] byteNow, final Get get) throws IOException {
if (coprocEnvironments.isEmpty()) {
return false;
}
boolean bypassable = true;
return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
}
});
}
/**
* @param put The Put object
* @param edit The WALEdit object.
* @exception IOException Exception
*/
public void postPut(final Put put, final WALEdit edit) throws IOException {
if (coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postPut(this, put, edit);
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param delete The Delete object
* @param edit The WALEdit object.
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return false;
}
boolean bypassable = true;
return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preDelete(this, delete, edit);
}
});
}
/**
* @param delete The Delete object
* @param edit The WALEdit object.
* @exception IOException Exception
*/
public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postDelete(this, delete, edit);
}
});
}
public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preBatchMutate(this, miniBatchOp);
}
});
}
public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postBatchMutate(this, miniBatchOp);
}
});
}
public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
final boolean success) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postBatchMutateIndispensably(this, miniBatchOp, success);
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param checkAndMutate the CheckAndMutate object
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException {
boolean bypassable = true;
CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
return observer.preCheckAndMutate(this, checkAndMutate, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param checkAndMutate the CheckAndMutate object
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
throws IOException {
boolean bypassable = true;
CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
}
});
}
/**
* @param checkAndMutate the CheckAndMutate object
* @param result the result returned by the checkAndMutate
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
CheckAndMutateResult result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
result) {
@Override
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
return observer.postCheckAndMutate(this, checkAndMutate, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param append append object
* @param edit The WALEdit object.
* @return result to return to client if default operation should be bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppend(final Append append, final WALEdit edit) throws IOException {
boolean bypassable = true;
Result defaultResult = null;
if (this.coprocEnvironments.isEmpty()) {
return defaultResult;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.preAppend(this, append, edit);
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param append append object
* @return result to return to client if default operation should be bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppendAfterRowLock(final Append append) throws IOException {
boolean bypassable = true;
Result defaultResult = null;
if (this.coprocEnvironments.isEmpty()) {
return defaultResult;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.preAppendAfterRowLock(this, append);
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param increment increment object
* @param edit The WALEdit object.
* @return result to return to client if default operation should be bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
boolean bypassable = true;
Result defaultResult = null;
if (coprocEnvironments.isEmpty()) {
return defaultResult;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.preIncrement(this, increment, edit);
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param increment increment object
* @return result to return to client if default operation should be bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
boolean bypassable = true;
Result defaultResult = null;
if (coprocEnvironments.isEmpty()) {
return defaultResult;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.preIncrementAfterRowLock(this, increment);
}
});
}
/**
* @param append Append object
* @param result the result returned by the append
* @param edit The WALEdit object.
* @throws IOException if an error occurred on the coprocessor
*/
public Result postAppend(final Append append, final Result result, final WALEdit edit)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.postAppend(this, append, result, edit);
}
});
}
/**
* @param increment increment object
* @param result the result returned by postIncrement
* @param edit The WALEdit object.
* @throws IOException if an error occurred on the coprocessor
*/
public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.postIncrement(this, increment, getResult(), edit);
}
});
}
/**
* @param scan the Scan specification
* @exception IOException Exception
*/
public void preScannerOpen(final Scan scan) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preScannerOpen(this, scan);
}
});
}
/**
* @param scan the Scan specification
* @param s the scanner
* @return the scanner instance to use
* @exception IOException Exception
*/
public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return s;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
@Override
public RegionScanner call(RegionObserver observer) throws IOException {
return observer.postScannerOpen(this, scan, getResult());
}
});
}
/**
* @param s the scanner
* @param results the result set returned by the region server
* @param limit the maximum number of results to return
* @return 'has next' indication to client if bypassing default behavior, or null otherwise
* @exception IOException Exception
*/
public Boolean preScannerNext(final InternalScanner s, final List<Result> results,
final int limit) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preScannerNext(this, s, results, limit, getResult());
}
});
}
/**
* @param s the scanner
* @param results the result set returned by the region server
* @param limit the maximum number of results to return
* @return 'has more' indication to give to client
* @exception IOException Exception
*/
public boolean postScannerNext(final InternalScanner s, final List<Result> results,
final int limit, boolean hasMore) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return hasMore;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postScannerNext(this, s, results, limit, getResult());
}
});
}
/**
* This will be called by the scan flow when the current scanned row is being filtered out by the
* filter.
* @param s the scanner
* @param curRowCell The cell in the current row which got filtered out
* @return whether more rows are available for the scanner or not
*/
public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
throws IOException {
// short circuit for performance
boolean defaultResult = true;
if (!hasCustomPostScannerFilterRow) {
return defaultResult;
}
if (this.coprocEnvironments.isEmpty()) {
return defaultResult;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
regionObserverGetter, defaultResult) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postScannerFilterRow(this, s, curRowCell, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param s the scanner
* @return true if default behavior should be bypassed, false otherwise
* @exception IOException Exception
*/
// Should this be bypassable?
public boolean preScannerClose(final InternalScanner s) throws IOException {
return execOperation(
coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preScannerClose(this, s);
}
});
}
/**
* @exception IOException Exception
*/
public void postScannerClose(final InternalScanner s) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postScannerClose(this, s);
}
});
}
/**
* Called before open store scanner for user scan.
*/
public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
if (coprocEnvironments.isEmpty()) return store.getScanInfo();
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preStoreScannerOpen(this, store, builder);
}
});
return builder.build();
}
/**
* @param info the RegionInfo for this region
* @param edits the file of recovered edits
*/
public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
execOperation(
coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preReplayWALs(this, info, edits);
}
});
}
/**
* @param info the RegionInfo for this region
* @param edits the file of recovered edits
* @throws IOException Exception
*/
public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postReplayWALs(this, info, edits);
}
});
}
/**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
*/
public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preBulkLoadHFile(this, familyPaths);
}
});
}
public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
throws IOException {
return execOperation(
coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preCommitStoreFile(this, family, pairs);
}
});
}
public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postCommitStoreFile(this, family, srcPath, dstPath);
}
});
}
/**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
* @param map Map of CF to List of file paths for the final loaded files
*/
public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
Map<byte[], List<Path>> map) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return;
}
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postBulkLoadHFile(this, familyPaths, map);
}
});
}
public void postStartRegionOperation(final Operation op) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postStartRegionOperation(this, op);
}
});
}
public void postCloseRegionOperation(final Operation op) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postCloseRegionOperation(this, op);
}
});
}
/**
* @param fs fileystem to read from
* @param p path to the file
* @param in {@link FSDataInputStreamWrapper}
* @param size Full size of the file
* @param r original reference file. This will be not null only when reading a split file.
* @return a Reader instance to use instead of the base reader if overriding default behavior,
* null otherwise
*/
public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r) throws IOException {
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
@Override
public StoreFileReader call(RegionObserver observer) throws IOException {
return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
}
});
}
/**
* @param fs fileystem to read from
* @param p path to the file
* @param in {@link FSDataInputStreamWrapper}
* @param size Full size of the file
* @param r original reference file. This will be not null only when reading a split file.
* @param reader the base reader instance
* @return The reader to use
*/
public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r, final StoreFileReader reader) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return reader;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>(
regionObserverGetter, reader) {
@Override
public StoreFileReader call(RegionObserver observer) throws IOException {
return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
}
});
}
public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
final List<Pair<Cell, Cell>> cellPairs) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return cellPairs;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
cellPairs) {
@Override
public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
return observer.postIncrementBeforeWAL(this, mutation, getResult());
}
});
}
public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
final List<Pair<Cell, Cell>> cellPairs) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return cellPairs;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
cellPairs) {
@Override
public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
return observer.postAppendBeforeWAL(this, mutation, getResult());
}
});
}
public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preWALAppend(this, key, edit);
}
});
}
public Message preEndpointInvocation(final Service service, final String methodName,
Message request) throws IOException {
if (coprocEnvironments.isEmpty()) {
return request;
}
return execOperationWithResult(
new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) {
@Override
public Message call(EndpointObserver observer) throws IOException {
return observer.preEndpointInvocation(this, service, methodName, getResult());
}
});
}
public void postEndpointInvocation(final Service service, final String methodName,
final Message request, final Message.Builder responseBuilder) throws IOException {
execOperation(coprocEnvironments.isEmpty()
? null
: new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
@Override
public void call(EndpointObserver observer) throws IOException {
observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
}
});
}
/**
* @deprecated Since 2.0 with out any replacement and will be removed in 3.0
*/
@Deprecated
public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) {
@Override
public DeleteTracker call(RegionObserver observer) throws IOException {
return observer.postInstantiateDeleteTracker(this, getResult());
}
});
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// BulkLoadObserver hooks
/////////////////////////////////////////////////////////////////////////////////////////////////
public void prePrepareBulkLoad(User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
@Override
protected void call(BulkLoadObserver observer) throws IOException {
observer.prePrepareBulkLoad(this);
}
});
}
public void preCleanupBulkLoad(User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
@Override
protected void call(BulkLoadObserver observer) throws IOException {
observer.preCleanupBulkLoad(this);
}
});
}
}