| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.regex.Matcher; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import com.google.protobuf.Message; |
| import com.google.protobuf.Service; |
| |
| import org.apache.commons.collections.map.AbstractReferenceMap; |
| import org.apache.commons.collections.map.ReferenceMap; |
| import org.apache.commons.lang.ClassUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.hbase.classification.InterfaceAudience; |
| import org.apache.hadoop.hbase.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.Coprocessor; |
| import org.apache.hadoop.hbase.CoprocessorEnvironment; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HBaseInterfaceAudience; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.client.Append; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Durability; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Increment; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; |
| import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; |
| import org.apache.hadoop.hbase.coprocessor.CoprocessorService; |
| import org.apache.hadoop.hbase.coprocessor.EndpointObserver; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.hadoop.hbase.coprocessor.RegionObserver; |
| import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; |
| import org.apache.hadoop.hbase.filter.ByteArrayComparable; |
| import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; |
| import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; |
| import org.apache.hadoop.hbase.io.Reference; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.ipc.RpcServer; |
| import org.apache.hadoop.hbase.regionserver.Region.Operation; |
| import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; |
| import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; |
| import org.apache.hadoop.hbase.regionserver.wal.HLogKey; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.wal.WALKey; |
| import org.apache.hadoop.hbase.regionserver.wal.WALEdit; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CoprocessorClassLoader; |
| import org.apache.hadoop.hbase.util.Pair; |
| |
| /** |
| * Implements the coprocessor environment and runtime support for coprocessors |
| * loaded within a {@link Region}. |
| */ |
| @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) |
| @InterfaceStability.Evolving |
| public class RegionCoprocessorHost |
| extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> { |
| |
| private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class); |
| // The shared data map |
| private static ReferenceMap sharedDataMap = |
| new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK); |
| |
| // optimization: no need to call postScannerFilterRow, if no coprocessor implements it |
| private final boolean hasCustomPostScannerFilterRow; |
| |
| /** |
| * |
| * Encapsulation of the environment of each coprocessor |
| */ |
| static class RegionEnvironment extends CoprocessorHost.Environment |
| implements RegionCoprocessorEnvironment { |
| |
| private Region region; |
| private RegionServerServices rsServices; |
| ConcurrentMap<String, Object> sharedData; |
| private final boolean useLegacyPre; |
| private final boolean useLegacyPost; |
| |
| /** |
| * Constructor |
| * @param impl the coprocessor instance |
| * @param priority chaining priority |
| */ |
| public RegionEnvironment(final Coprocessor impl, final int priority, |
| final int seq, final Configuration conf, final Region region, |
| final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { |
| super(impl, priority, seq, conf); |
| this.region = region; |
| this.rsServices = services; |
| this.sharedData = sharedData; |
| // Pick which version of the WAL related events we'll call. |
| // This way we avoid calling the new version on older RegionObservers so |
| // we can maintain binary compatibility. |
| // See notes in javadoc for RegionObserver |
| useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class, |
| HRegionInfo.class, WALKey.class, WALEdit.class); |
| useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class, |
| HRegionInfo.class, WALKey.class, WALEdit.class); |
| } |
| |
| /** @return the region */ |
| @Override |
| public Region getRegion() { |
| return region; |
| } |
| |
| /** @return reference to the region server services */ |
| @Override |
| public RegionServerServices getRegionServerServices() { |
| return rsServices; |
| } |
| |
| public void shutdown() { |
| super.shutdown(); |
| } |
| |
| @Override |
| public ConcurrentMap<String, Object> getSharedData() { |
| return sharedData; |
| } |
| |
| @Override |
| public HRegionInfo getRegionInfo() { |
| return region.getRegionInfo(); |
| } |
| |
| } |
| |
| static class TableCoprocessorAttribute { |
| private Path path; |
| private String className; |
| private int priority; |
| private Configuration conf; |
| |
| public TableCoprocessorAttribute(Path path, String className, int priority, |
| Configuration conf) { |
| this.path = path; |
| this.className = className; |
| this.priority = priority; |
| this.conf = conf; |
| } |
| |
| public Path getPath() { |
| return path; |
| } |
| |
| public String getClassName() { |
| return className; |
| } |
| |
| public int getPriority() { |
| return priority; |
| } |
| |
| public Configuration getConf() { |
| return conf; |
| } |
| } |
| |
| /** The region server services */ |
| RegionServerServices rsServices; |
| /** The region */ |
| Region region; |
| |
| /** |
| * Constructor |
| * @param region the region |
| * @param rsServices interface to available region server functionality |
| * @param conf the configuration |
| */ |
| public RegionCoprocessorHost(final Region region, |
| final RegionServerServices rsServices, final Configuration conf) { |
| super(rsServices); |
| this.conf = conf; |
| this.rsServices = rsServices; |
| this.region = region; |
| this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); |
| |
| // load system default cp's from configuration. |
| loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); |
| |
| // load system default cp's for user tables from configuration. |
| if (!region.getRegionInfo().getTable().isSystemTable()) { |
| loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); |
| } |
| |
| // load Coprocessor From HDFS |
| loadTableCoprocessors(conf); |
| |
| // now check whether any coprocessor implements postScannerFilterRow |
| boolean hasCustomPostScannerFilterRow = false; |
| out: for (RegionEnvironment env: coprocessors) { |
| if (env.getInstance() instanceof RegionObserver) { |
| Class<?> clazz = env.getInstance().getClass(); |
| for(;;) { |
| if (clazz == null) { |
| // we must have directly implemented RegionObserver |
| hasCustomPostScannerFilterRow = true; |
| break out; |
| } |
| if (clazz == BaseRegionObserver.class) { |
| // we reached BaseRegionObserver, try next coprocessor |
| break; |
| } |
| try { |
| clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, |
| InternalScanner.class, Cell.class, boolean.class); |
| // this coprocessor has a custom version of postScannerFilterRow |
| hasCustomPostScannerFilterRow = true; |
| break out; |
| } catch (NoSuchMethodException ignore) { |
| } |
| // the deprecated signature still exists |
| try { |
| clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, |
| InternalScanner.class, byte[].class, int.class, short.class, boolean.class); |
| // this coprocessor has a custom version of postScannerFilterRow |
| hasCustomPostScannerFilterRow = true; |
| break out; |
| } catch (NoSuchMethodException ignore) { |
| } |
| clazz = clazz.getSuperclass(); |
| } |
| } |
| } |
| this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; |
| } |
| |
| static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, |
| HTableDescriptor htd) { |
| List<TableCoprocessorAttribute> result = Lists.newArrayList(); |
| for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) { |
| String key = Bytes.toString(e.getKey().get()).trim(); |
| if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) { |
| String spec = Bytes.toString(e.getValue().get()).trim(); |
| // found one |
| try { |
| Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec); |
| if (matcher.matches()) { |
| // jar file path can be empty if the cp class can be loaded |
| // from class loader. |
| Path path = matcher.group(1).trim().isEmpty() ? |
| null : new Path(matcher.group(1).trim()); |
| String className = matcher.group(2).trim(); |
| if (className.isEmpty()) { |
| LOG.error("Malformed table coprocessor specification: key=" + |
| key + ", spec: " + spec); |
| continue; |
| } |
| String priorityStr = matcher.group(3).trim(); |
| int priority = priorityStr.isEmpty() ? |
| Coprocessor.PRIORITY_USER : Integer.parseInt(priorityStr); |
| String cfgSpec = null; |
| try { |
| cfgSpec = matcher.group(4); |
| } catch (IndexOutOfBoundsException ex) { |
| // ignore |
| } |
| Configuration ourConf; |
| if (cfgSpec != null && !cfgSpec.trim().equals("|")) { |
| cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1); |
| // do an explicit deep copy of the passed configuration |
| ourConf = new Configuration(false); |
| HBaseConfiguration.merge(ourConf, conf); |
| Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec); |
| while (m.find()) { |
| ourConf.set(m.group(1), m.group(2)); |
| } |
| } else { |
| ourConf = conf; |
| } |
| result.add(new TableCoprocessorAttribute(path, className, priority, ourConf)); |
| } else { |
| LOG.error("Malformed table coprocessor specification: key=" + key + |
| ", spec: " + spec); |
| } |
| } catch (Exception ioe) { |
| LOG.error("Malformed table coprocessor specification: key=" + key + |
| ", spec: " + spec); |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Sanity check the table coprocessor attributes of the supplied schema. Will |
| * throw an exception if there is a problem. |
| * @param conf |
| * @param htd |
| * @throws IOException |
| */ |
| public static void testTableCoprocessorAttrs(final Configuration conf, |
| final HTableDescriptor htd) throws IOException { |
| String pathPrefix = UUID.randomUUID().toString(); |
| for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { |
| if (attr.getPriority() < 0) { |
| throw new IOException("Priority for coprocessor " + attr.getClassName() + |
| " cannot be less than 0"); |
| } |
| ClassLoader old = Thread.currentThread().getContextClassLoader(); |
| try { |
| ClassLoader cl; |
| if (attr.getPath() != null) { |
| cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), |
| CoprocessorHost.class.getClassLoader(), pathPrefix, conf); |
| } else { |
| cl = CoprocessorHost.class.getClassLoader(); |
| } |
| Thread.currentThread().setContextClassLoader(cl); |
| cl.loadClass(attr.getClassName()); |
| } catch (ClassNotFoundException e) { |
| throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); |
| } finally { |
| Thread.currentThread().setContextClassLoader(old); |
| } |
| } |
| } |
| |
| void loadTableCoprocessors(final Configuration conf) { |
| boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, |
| DEFAULT_COPROCESSORS_ENABLED); |
| boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, |
| DEFAULT_USER_COPROCESSORS_ENABLED); |
| if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { |
| return; |
| } |
| |
| // scan the table attributes for coprocessor load specifications |
| // initialize the coprocessors |
| List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>(); |
| for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, |
| region.getTableDesc())) { |
| // Load encompasses classloading and coprocessor initialization |
| try { |
| RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(), |
| attr.getConf()); |
| configured.add(env); |
| LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + |
| region.getTableDesc().getTableName().getNameAsString() + " successfully."); |
| } catch (Throwable t) { |
| // Coprocessor failed to load, do we abort on error? |
| if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { |
| abortServer(attr.getClassName(), t); |
| } else { |
| LOG.error("Failed to load coprocessor " + attr.getClassName(), t); |
| } |
| } |
| } |
| // add together to coprocessor set for COW efficiency |
| coprocessors.addAll(configured); |
| } |
| |
| @Override |
| public RegionEnvironment createEnvironment(Class<?> implClass, |
| Coprocessor instance, int priority, int seq, Configuration conf) { |
| // Check if it's an Endpoint. |
| // Due to current dynamic protocol design, Endpoint |
| // uses a different way to be registered and executed. |
| // It uses a visitor pattern to invoke registered Endpoint |
| // method. |
| for (Object itf : ClassUtils.getAllInterfaces(implClass)) { |
| Class<?> c = (Class<?>) itf; |
| if (CoprocessorService.class.isAssignableFrom(c)) { |
| region.registerService( ((CoprocessorService)instance).getService() ); |
| } |
| } |
| ConcurrentMap<String, Object> classData; |
| // make sure only one thread can add maps |
| synchronized (sharedDataMap) { |
| // as long as at least one RegionEnvironment holds on to its classData it will |
| // remain in this map |
| classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName()); |
| if (classData == null) { |
| classData = new ConcurrentHashMap<String, Object>(); |
| sharedDataMap.put(implClass.getName(), classData); |
| } |
| } |
| return new RegionEnvironment(instance, priority, seq, conf, region, |
| rsServices, classData); |
| } |
| |
| /** |
| * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions. |
| * |
| * For example, {@link |
| * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and |
| * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks. |
| * |
| * See also |
| * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable( |
| * CoprocessorEnvironment, Throwable)} |
| * @param env The coprocessor that threw the exception. |
| * @param e The exception that was thrown. |
| */ |
| private void handleCoprocessorThrowableNoRethrow( |
| final CoprocessorEnvironment env, final Throwable e) { |
| try { |
| handleCoprocessorThrowable(env,e); |
| } catch (IOException ioe) { |
| // We cannot throw exceptions from the caller hook, so ignore. |
| LOG.warn( |
| "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " + |
| e + ". Ignoring.",e); |
| } |
| } |
| |
| /** |
| * Invoked before a region open. |
| * |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public void preOpen() throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preOpen(ctx); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked after a region open |
| */ |
| public void postOpen() { |
| try { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postOpen(ctx); |
| } |
| }); |
| } catch (IOException e) { |
| LOG.warn(e); |
| } |
| } |
| |
| /** |
| * Invoked after log replay on region |
| */ |
| public void postLogReplay() { |
| try { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postLogReplay(ctx); |
| } |
| }); |
| } catch (IOException e) { |
| LOG.warn(e); |
| } |
| } |
| |
| /** |
| * Invoked before a region is closed |
| * @param abortRequested true if the server is aborting |
| */ |
| public void preClose(final boolean abortRequested) throws IOException { |
| execOperation(false, new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preClose(ctx, abortRequested); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked after a region is closed |
| * @param abortRequested true if the server is aborting |
| */ |
| public void postClose(final boolean abortRequested) { |
| try { |
| execOperation(false, new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postClose(ctx, abortRequested); |
| } |
| public void postEnvCall(RegionEnvironment env) { |
| shutdown(env); |
| } |
| }); |
| } catch (IOException e) { |
| LOG.warn(e); |
| } |
| } |
| |
| /** |
| * See |
| * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} |
| */ |
| public InternalScanner preCompactScannerOpen(final Store store, |
| final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs, |
| final CompactionRequest request, final User user) throws IOException { |
| return execOperationWithResult(null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, |
| earliestPutTs, getResult(), request)); |
| } |
| }); |
| } |
| |
| /** |
| * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently |
| * available candidates. |
| * @param store The store where compaction is being requested |
| * @param candidates The currently available store files |
| * @param request custom compaction request |
| * @return If {@code true}, skip the normal selection process and use the current list |
| * @throws IOException |
| */ |
| public boolean preCompactSelection(final Store store, final List<StoreFile> candidates, |
| final CompactionRequest request, final User user) throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preCompactSelection(ctx, store, candidates, request); |
| } |
| }); |
| } |
| |
| /** |
| * Called after the {@link StoreFile}s to be compacted have been selected from the available |
| * candidates. |
| * @param store The store where compaction is being requested |
| * @param selected The store files selected to compact |
| * @param request custom compaction |
| */ |
| public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected, |
| final CompactionRequest request, final User user) { |
| try { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postCompactSelection(ctx, store, selected, request); |
| } |
| }); |
| } catch (IOException e) { |
| LOG.warn(e); |
| } |
| } |
| |
| /** |
| * Called prior to rewriting the store files selected for compaction |
| * @param store the store being compacted |
| * @param scanner the scanner used to read store data during compaction |
| * @param scanType type of Scan |
| * @param request the compaction that will be executed |
| * @throws IOException |
| */ |
| public InternalScanner preCompact(final Store store, final InternalScanner scanner, |
| final ScanType scanType, final CompactionRequest request, final User user) |
| throws IOException { |
| return execOperationWithResult(false, scanner, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preCompact(ctx, store, getResult(), scanType, request)); |
| } |
| }); |
| } |
| |
| /** |
| * Called after the store compaction has completed. |
| * @param store the store being compacted |
| * @param resultFile the new store file written during compaction |
| * @param request the compaction that is being executed |
| * @throws IOException |
| */ |
| public void postCompact(final Store store, final StoreFile resultFile, |
| final CompactionRequest request, final User user) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postCompact(ctx, store, resultFile, request); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked before a memstore flush |
| * @throws IOException |
| */ |
| public InternalScanner preFlush(final Store store, final InternalScanner scanner) |
| throws IOException { |
| return execOperationWithResult(false, scanner, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preFlush(ctx, store, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked before a memstore flush |
| * @throws IOException |
| */ |
| public void preFlush() throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preFlush(ctx); |
| } |
| }); |
| } |
| |
| /** |
| * See |
| * {@link RegionObserver#preFlushScannerOpen(ObserverContext, |
| * Store, KeyValueScanner, InternalScanner)} |
| */ |
| public InternalScanner preFlushScannerOpen(final Store store, |
| final KeyValueScanner memstoreScanner) throws IOException { |
| return execOperationWithResult(null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked after a memstore flush |
| * @throws IOException |
| */ |
| public void postFlush() throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postFlush(ctx); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked after a memstore flush |
| * @throws IOException |
| */ |
| public void postFlush(final Store store, final StoreFile storeFile) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postFlush(ctx, store, storeFile); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked just before a split |
| * @throws IOException |
| */ |
| // TODO: Deprecate this |
| public void preSplit(final User user) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preSplit(ctx); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked just before a split |
| * @throws IOException |
| */ |
| public void preSplit(final byte[] splitRow, final User user) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preSplit(ctx, splitRow); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked just after a split |
| * @param l the new left-hand daughter region |
| * @param r the new right-hand daughter region |
| * @throws IOException |
| */ |
| public void postSplit(final Region l, final Region r, final User user) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postSplit(ctx, l, r); |
| } |
| }); |
| } |
| |
| public boolean preSplitBeforePONR(final byte[] splitKey, |
| final List<Mutation> metaEntries, final User user) throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preSplitBeforePONR(ctx, splitKey, metaEntries); |
| } |
| }); |
| } |
| |
| public void preSplitAfterPONR(final User user) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preSplitAfterPONR(ctx); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked just before the rollback of a failed split is started |
| * @throws IOException |
| */ |
| public void preRollBackSplit(final User user) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preRollBackSplit(ctx); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked just after the rollback of a failed split is done |
| * @throws IOException |
| */ |
| public void postRollBackSplit(final User user) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postRollBackSplit(ctx); |
| } |
| }); |
| } |
| |
| /** |
| * Invoked after a split is completed irrespective of a failure or success. |
| * @throws IOException |
| */ |
| public void postCompleteSplit() throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postCompleteSplit(ctx); |
| } |
| }); |
| } |
| |
| // RegionObserver support |
| |
| /** |
| * @param get the Get request |
| * @return true if default processing should be bypassed |
| * @exception IOException Exception |
| */ |
| public boolean preGet(final Get get, final List<Cell> results) |
| throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preGetOp(ctx, get, results); |
| } |
| }); |
| } |
| |
| /** |
| * @param get the Get request |
| * @param results the result sett |
| * @exception IOException Exception |
| */ |
| public void postGet(final Get get, final List<Cell> results) |
| throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postGetOp(ctx, get, results); |
| } |
| }); |
| } |
| |
| /** |
| * @param get the Get request |
| * @return true or false to return to client if bypassing normal operation, |
| * or null otherwise |
| * @exception IOException Exception |
| */ |
| public Boolean preExists(final Get get) throws IOException { |
| return execOperationWithResult(true, false, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preExists(ctx, get, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param get the Get request |
| * @param exists the result returned by the region server |
| * @return the result to return to the client |
| * @exception IOException Exception |
| */ |
| public boolean postExists(final Get get, boolean exists) |
| throws IOException { |
| return execOperationWithResult(exists, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postExists(ctx, get, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param put The Put object |
| * @param edit The WALEdit object. |
| * @param durability The durability used |
| * @return true if default processing should be bypassed |
| * @exception IOException Exception |
| */ |
| public boolean prePut(final Put put, final WALEdit edit, final Durability durability) |
| throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.prePut(ctx, put, edit, durability); |
| } |
| }); |
| } |
| |
| /** |
| * @param mutation - the current mutation |
| * @param kv - the current cell |
| * @param byteNow - current timestamp in bytes |
| * @param get - the get that could be used |
| * Note that the get only does not specify the family and qualifier that should be used |
| * @return true if default processing should be bypassed |
| * @exception IOException |
| * Exception |
| */ |
| public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, |
| final Cell kv, final byte[] byteNow, final Get get) throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get); |
| } |
| }); |
| } |
| |
| /** |
| * @param put The Put object |
| * @param edit The WALEdit object. |
| * @param durability The durability used |
| * @exception IOException Exception |
| */ |
| public void postPut(final Put put, final WALEdit edit, final Durability durability) |
| throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postPut(ctx, put, edit, durability); |
| } |
| }); |
| } |
| |
| /** |
| * @param delete The Delete object |
| * @param edit The WALEdit object. |
| * @param durability The durability used |
| * @return true if default processing should be bypassed |
| * @exception IOException Exception |
| */ |
| public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) |
| throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preDelete(ctx, delete, edit, durability); |
| } |
| }); |
| } |
| |
| /** |
| * @param delete The Delete object |
| * @param edit The WALEdit object. |
| * @param durability The durability used |
| * @exception IOException Exception |
| */ |
| public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) |
| throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postDelete(ctx, delete, edit, durability); |
| } |
| }); |
| } |
| |
| /** |
| * @param miniBatchOp |
| * @return true if default processing should be bypassed |
| * @throws IOException |
| */ |
| public boolean preBatchMutate( |
| final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preBatchMutate(ctx, miniBatchOp); |
| } |
| }); |
| } |
| |
| /** |
| * @param miniBatchOp |
| * @throws IOException |
| */ |
| public void postBatchMutate( |
| final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postBatchMutate(ctx, miniBatchOp); |
| } |
| }); |
| } |
| |
| public void postBatchMutateIndispensably( |
| final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) |
| throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success); |
| } |
| }); |
| } |
| |
| /** |
| * @param row row to check |
| * @param family column family |
| * @param qualifier column qualifier |
| * @param compareOp the comparison operation |
| * @param comparator the comparator |
| * @param put data to put if check succeeds |
| * @return true or false to return to client if default processing should |
| * be bypassed, or null otherwise |
| * @throws IOException e |
| */ |
| public Boolean preCheckAndPut(final byte [] row, final byte [] family, |
| final byte [] qualifier, final CompareOp compareOp, |
| final ByteArrayComparable comparator, final Put put) |
| throws IOException { |
| return execOperationWithResult(true, false, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preCheckAndPut(ctx, row, family, qualifier, |
| compareOp, comparator, put, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param row row to check |
| * @param family column family |
| * @param qualifier column qualifier |
| * @param compareOp the comparison operation |
| * @param comparator the comparator |
| * @param put data to put if check succeeds |
| * @return true or false to return to client if default processing should |
| * be bypassed, or null otherwise |
| * @throws IOException e |
| */ |
| public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family, |
| final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, |
| final Put put) throws IOException { |
| return execOperationWithResult(true, false, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier, |
| compareOp, comparator, put, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param row row to check |
| * @param family column family |
| * @param qualifier column qualifier |
| * @param compareOp the comparison operation |
| * @param comparator the comparator |
| * @param put data to put if check succeeds |
| * @throws IOException e |
| */ |
| public boolean postCheckAndPut(final byte [] row, final byte [] family, |
| final byte [] qualifier, final CompareOp compareOp, |
| final ByteArrayComparable comparator, final Put put, |
| boolean result) throws IOException { |
| return execOperationWithResult(result, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postCheckAndPut(ctx, row, family, qualifier, |
| compareOp, comparator, put, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param row row to check |
| * @param family column family |
| * @param qualifier column qualifier |
| * @param compareOp the comparison operation |
| * @param comparator the comparator |
| * @param delete delete to commit if check succeeds |
| * @return true or false to return to client if default processing should |
| * be bypassed, or null otherwise |
| * @throws IOException e |
| */ |
| public Boolean preCheckAndDelete(final byte [] row, final byte [] family, |
| final byte [] qualifier, final CompareOp compareOp, |
| final ByteArrayComparable comparator, final Delete delete) |
| throws IOException { |
| return execOperationWithResult(true, false, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preCheckAndDelete(ctx, row, family, |
| qualifier, compareOp, comparator, delete, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param row row to check |
| * @param family column family |
| * @param qualifier column qualifier |
| * @param compareOp the comparison operation |
| * @param comparator the comparator |
| * @param delete delete to commit if check succeeds |
| * @return true or false to return to client if default processing should |
| * be bypassed, or null otherwise |
| * @throws IOException e |
| */ |
| public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, |
| final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, |
| final Delete delete) throws IOException { |
| return execOperationWithResult(true, false, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row, |
| family, qualifier, compareOp, comparator, delete, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param row row to check |
| * @param family column family |
| * @param qualifier column qualifier |
| * @param compareOp the comparison operation |
| * @param comparator the comparator |
| * @param delete delete to commit if check succeeds |
| * @throws IOException e |
| */ |
| public boolean postCheckAndDelete(final byte [] row, final byte [] family, |
| final byte [] qualifier, final CompareOp compareOp, |
| final ByteArrayComparable comparator, final Delete delete, |
| boolean result) throws IOException { |
| return execOperationWithResult(result, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postCheckAndDelete(ctx, row, family, |
| qualifier, compareOp, comparator, delete, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param append append object |
| * @return result to return to client if default operation should be |
| * bypassed, null otherwise |
| * @throws IOException if an error occurred on the coprocessor |
| */ |
| public Result preAppend(final Append append) throws IOException { |
| return execOperationWithResult(true, null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preAppend(ctx, append)); |
| } |
| }); |
| } |
| |
| /** |
| * @param append append object |
| * @return result to return to client if default operation should be |
| * bypassed, null otherwise |
| * @throws IOException if an error occurred on the coprocessor |
| */ |
| public Result preAppendAfterRowLock(final Append append) throws IOException { |
| return execOperationWithResult(true, null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preAppendAfterRowLock(ctx, append)); |
| } |
| }); |
| } |
| |
| /** |
| * @param increment increment object |
| * @return result to return to client if default operation should be |
| * bypassed, null otherwise |
| * @throws IOException if an error occurred on the coprocessor |
| */ |
| public Result preIncrement(final Increment increment) throws IOException { |
| return execOperationWithResult(true, null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preIncrement(ctx, increment)); |
| } |
| }); |
| } |
| |
| /** |
| * @param increment increment object |
| * @return result to return to client if default operation should be |
| * bypassed, null otherwise |
| * @throws IOException if an error occurred on the coprocessor |
| */ |
| public Result preIncrementAfterRowLock(final Increment increment) throws IOException { |
| return execOperationWithResult(true, null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preIncrementAfterRowLock(ctx, increment)); |
| } |
| }); |
| } |
| |
| /** |
| * @param append Append object |
| * @param result the result returned by the append |
| * @throws IOException if an error occurred on the coprocessor |
| */ |
| public void postAppend(final Append append, final Result result) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postAppend(ctx, append, result); |
| } |
| }); |
| } |
| |
| /** |
| * @param increment increment object |
| * @param result the result returned by postIncrement |
| * @throws IOException if an error occurred on the coprocessor |
| */ |
| public Result postIncrement(final Increment increment, Result result) throws IOException { |
| return execOperationWithResult(result, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postIncrement(ctx, increment, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param scan the Scan specification |
| * @return scanner id to return to client if default operation should be |
| * bypassed, false otherwise |
| * @exception IOException Exception |
| */ |
| public RegionScanner preScannerOpen(final Scan scan) throws IOException { |
| return execOperationWithResult(true, null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preScannerOpen(ctx, scan, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * See |
| * {@link RegionObserver#preStoreScannerOpen(ObserverContext, |
| * Store, Scan, NavigableSet, KeyValueScanner)} |
| */ |
| public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan, |
| final NavigableSet<byte[]> targetCols, final long readPt) throws IOException { |
| return execOperationWithResult(null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt)); |
| } |
| }); |
| } |
| |
| /** |
| * @param scan the Scan specification |
| * @param s the scanner |
| * @return the scanner instance to use |
| * @exception IOException Exception |
| */ |
| public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { |
| return execOperationWithResult(s, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postScannerOpen(ctx, scan, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param s the scanner |
| * @param results the result set returned by the region server |
| * @param limit the maximum number of results to return |
| * @return 'has next' indication to client if bypassing default behavior, or |
| * null otherwise |
| * @exception IOException Exception |
| */ |
| public Boolean preScannerNext(final InternalScanner s, |
| final List<Result> results, final int limit) throws IOException { |
| return execOperationWithResult(true, false, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preScannerNext(ctx, s, results, limit, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param s the scanner |
| * @param results the result set returned by the region server |
| * @param limit the maximum number of results to return |
| * @param hasMore |
| * @return 'has more' indication to give to client |
| * @exception IOException Exception |
| */ |
| public boolean postScannerNext(final InternalScanner s, |
| final List<Result> results, final int limit, boolean hasMore) |
| throws IOException { |
| return execOperationWithResult(hasMore, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postScannerNext(ctx, s, results, limit, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * This will be called by the scan flow when the current scanned row is being filtered out by the |
| * filter. |
| * @param s the scanner |
| * @param curRowCell The cell in the current row which got filtered out |
| * @return whether more rows are available for the scanner or not |
| * @throws IOException |
| */ |
| public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) |
| throws IOException { |
| // short circuit for performance |
| if (!hasCustomPostScannerFilterRow) return true; |
| return execOperationWithResult(true, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postScannerFilterRow(ctx, s, curRowCell, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param s the scanner |
| * @return true if default behavior should be bypassed, false otherwise |
| * @exception IOException Exception |
| */ |
| public boolean preScannerClose(final InternalScanner s) throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preScannerClose(ctx, s); |
| } |
| }); |
| } |
| |
| /** |
| * @exception IOException Exception |
| */ |
| public void postScannerClose(final InternalScanner s) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postScannerClose(ctx, s); |
| } |
| }); |
| } |
| |
| /** |
| * @param info the RegionInfo for this region |
| * @param edits the file of recovered edits |
| * @throws IOException Exception |
| */ |
| public void preReplayWALs(final HRegionInfo info, final Path edits) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preReplayWALs(ctx, info, edits); |
| } |
| }); |
| } |
| |
| /** |
| * @param info the RegionInfo for this region |
| * @param edits the file of recovered edits |
| * @throws IOException Exception |
| */ |
| public void postReplayWALs(final HRegionInfo info, final Path edits) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postReplayWALs(ctx, info, edits); |
| } |
| }); |
| } |
| |
| /** |
| * @param info |
| * @param logKey |
| * @param logEdit |
| * @return true if default behavior should be bypassed, false otherwise |
| * @throws IOException |
| */ |
| public boolean preWALRestore(final HRegionInfo info, final WALKey logKey, |
| final WALEdit logEdit) throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| // Once we don't need to support the legacy call, replace RegionOperation with a version |
| // that's ObserverContext<RegionEnvironment> and avoid this cast. |
| final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment(); |
| if (env.useLegacyPre) { |
| if (logKey instanceof HLogKey) { |
| oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit); |
| } else { |
| legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey."); |
| } |
| } else { |
| oserver.preWALRestore(ctx, info, logKey, logEdit); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @return true if default behavior should be bypassed, false otherwise |
| * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}; as of 2.0, remove in 3.0 |
| */ |
| @Deprecated |
| public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey, |
| final WALEdit logEdit) throws IOException { |
| return preWALRestore(info, (WALKey)logKey, logEdit); |
| } |
| |
| /** |
| * @param info |
| * @param logKey |
| * @param logEdit |
| * @throws IOException |
| */ |
| public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) |
| throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| // Once we don't need to support the legacy call, replace RegionOperation with a version |
| // that's ObserverContext<RegionEnvironment> and avoid this cast. |
| final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment(); |
| if (env.useLegacyPost) { |
| if (logKey instanceof HLogKey) { |
| oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit); |
| } else { |
| legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey."); |
| } |
| } else { |
| oserver.postWALRestore(ctx, info, logKey, logEdit); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}; as of 2.0, remove in 3.0 |
| */ |
| @Deprecated |
| public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) |
| throws IOException { |
| postWALRestore(info, (WALKey)logKey, logEdit); |
| } |
| |
| /** |
| * @param familyPaths pairs of { CF, file path } submitted for bulk load |
| * @return true if the default operation should be bypassed |
| * @throws IOException |
| */ |
| public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { |
| return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.preBulkLoadHFile(ctx, familyPaths); |
| } |
| }); |
| } |
| |
| /** |
| * @param familyPaths pairs of { CF, file path } submitted for bulk load |
| * @param hasLoaded whether load was successful or not |
| * @return the possibly modified value of hasLoaded |
| * @throws IOException |
| */ |
| public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, |
| boolean hasLoaded) throws IOException { |
| return execOperationWithResult(hasLoaded, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult())); |
| } |
| }); |
| } |
| |
| public void postStartRegionOperation(final Operation op) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postStartRegionOperation(ctx, op); |
| } |
| }); |
| } |
| |
| public void postCloseRegionOperation(final Operation op) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postCloseRegionOperation(ctx, op); |
| } |
| }); |
| } |
| |
| /** |
| * @param fs fileystem to read from |
| * @param p path to the file |
| * @param in {@link FSDataInputStreamWrapper} |
| * @param size Full size of the file |
| * @param cacheConf |
| * @param r original reference file. This will be not null only when reading a split file. |
| * @return a Reader instance to use instead of the base reader if overriding |
| * default behavior, null otherwise |
| * @throws IOException |
| */ |
| public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, |
| final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, |
| final Reference r) throws IOException { |
| return execOperationWithResult(null, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult())); |
| } |
| }); |
| } |
| |
| /** |
| * @param fs fileystem to read from |
| * @param p path to the file |
| * @param in {@link FSDataInputStreamWrapper} |
| * @param size Full size of the file |
| * @param cacheConf |
| * @param r original reference file. This will be not null only when reading a split file. |
| * @param reader the base reader instance |
| * @return The reader to use |
| * @throws IOException |
| */ |
| public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, |
| final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, |
| final Reference r, final StoreFileReader reader) throws IOException { |
| return execOperationWithResult(reader, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult())); |
| } |
| }); |
| } |
| |
| public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, |
| final Cell oldCell, Cell newCell) throws IOException { |
| return execOperationWithResult(newCell, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult())); |
| } |
| }); |
| } |
| |
| public Message preEndpointInvocation(final Service service, final String methodName, |
| Message request) throws IOException { |
| return execOperationWithResult(request, |
| coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() { |
| @Override |
| public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult())); |
| } |
| }); |
| } |
| |
| public void postEndpointInvocation(final Service service, final String methodName, |
| final Message request, final Message.Builder responseBuilder) throws IOException { |
| execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() { |
| @Override |
| public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder); |
| } |
| }); |
| } |
| |
| public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { |
| return execOperationWithResult(tracker, |
| coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() { |
| @Override |
| public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| setResult(oserver.postInstantiateDeleteTracker(ctx, getResult())); |
| } |
| }); |
| } |
| |
| private static abstract class CoprocessorOperation |
| extends ObserverContext<RegionCoprocessorEnvironment> { |
| public CoprocessorOperation() { |
| this(RpcServer.getRequestUser()); |
| } |
| |
| public CoprocessorOperation(User user) { |
| super(user); |
| } |
| |
| public abstract void call(Coprocessor observer, |
| ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException; |
| public abstract boolean hasCall(Coprocessor observer); |
| public void postEnvCall(RegionEnvironment env) { } |
| } |
| |
| private static abstract class RegionOperation extends CoprocessorOperation { |
| public RegionOperation() { |
| } |
| |
| public RegionOperation(User user) { |
| super(user); |
| } |
| |
| public abstract void call(RegionObserver observer, |
| ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException; |
| |
| public boolean hasCall(Coprocessor observer) { |
| return observer instanceof RegionObserver; |
| } |
| |
| public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| call((RegionObserver)observer, ctx); |
| } |
| } |
| |
| private static abstract class RegionOperationWithResult<T> extends RegionOperation { |
| public RegionOperationWithResult() { |
| } |
| |
| public RegionOperationWithResult(User user) { |
| super (user); |
| } |
| |
| private T result = null; |
| public void setResult(final T result) { this.result = result; } |
| public T getResult() { return this.result; } |
| } |
| |
| private static abstract class EndpointOperation extends CoprocessorOperation { |
| public abstract void call(EndpointObserver observer, |
| ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException; |
| |
| public boolean hasCall(Coprocessor observer) { |
| return observer instanceof EndpointObserver; |
| } |
| |
| public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx) |
| throws IOException { |
| call((EndpointObserver)observer, ctx); |
| } |
| } |
| |
| private static abstract class EndpointOperationWithResult<T> extends EndpointOperation { |
| private T result = null; |
| public void setResult(final T result) { this.result = result; } |
| public T getResult() { return this.result; } |
| } |
| |
| private boolean execOperation(final CoprocessorOperation ctx) |
| throws IOException { |
| return execOperation(true, ctx); |
| } |
| |
| private <T> T execOperationWithResult(final T defaultValue, |
| final RegionOperationWithResult<T> ctx) throws IOException { |
| if (ctx == null) return defaultValue; |
| ctx.setResult(defaultValue); |
| execOperation(true, ctx); |
| return ctx.getResult(); |
| } |
| |
| private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue, |
| final RegionOperationWithResult<T> ctx) throws IOException { |
| boolean bypass = false; |
| T result = defaultValue; |
| if (ctx != null) { |
| ctx.setResult(defaultValue); |
| bypass = execOperation(true, ctx); |
| result = ctx.getResult(); |
| } |
| return bypass == ifBypass ? result : null; |
| } |
| |
| private <T> T execOperationWithResult(final T defaultValue, |
| final EndpointOperationWithResult<T> ctx) throws IOException { |
| if (ctx == null) return defaultValue; |
| ctx.setResult(defaultValue); |
| execOperation(true, ctx); |
| return ctx.getResult(); |
| } |
| |
| private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx) |
| throws IOException { |
| boolean bypass = false; |
| List<RegionEnvironment> envs = coprocessors.get(); |
| for (int i = 0; i < envs.size(); i++) { |
| RegionEnvironment env = envs.get(i); |
| Coprocessor observer = env.getInstance(); |
| if (ctx.hasCall(observer)) { |
| ctx.prepare(env); |
| Thread currentThread = Thread.currentThread(); |
| ClassLoader cl = currentThread.getContextClassLoader(); |
| try { |
| currentThread.setContextClassLoader(env.getClassLoader()); |
| ctx.call(observer, ctx); |
| } catch (Throwable e) { |
| handleCoprocessorThrowable(env, e); |
| } finally { |
| currentThread.setContextClassLoader(cl); |
| } |
| bypass |= ctx.shouldBypass(); |
| if (earlyExit && ctx.shouldComplete()) { |
| break; |
| } |
| } |
| |
| ctx.postEnvCall(env); |
| } |
| return bypass; |
| } |
| } |