| //// |
| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| //// |
| |
| [[cp]] |
| = Apache HBase Coprocessors |
| :doctype: book |
| :numbered: |
| :toc: left |
| :icons: font |
| :experimental: |
| |
HBase Coprocessors are modeled after Google BigTable's coprocessor implementation
(see pages 41-42 of link:http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf[the SOCC 2010 keynote slides]).
| |
| The coprocessor framework provides mechanisms for running your custom code directly on |
| the RegionServers managing your data. Efforts are ongoing to bridge gaps between HBase's |
| implementation and BigTable's architecture. For more information see |
| link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047]. |
| |
| The information in this chapter is primarily sourced and heavily reused from the following |
| resources: |
| |
| . Mingjie Lai's blog post |
| link:https://blogs.apache.org/hbase/entry/coprocessor_introduction[Coprocessor Introduction]. |
| . Gaurav Bhardwaj's blog post |
| link:http://www.3pillarglobal.com/insights/hbase-coprocessors[The How To Of HBase Coprocessors]. |
| |
| [WARNING] |
| .Use Coprocessors At Your Own Risk |
| ==== |
Coprocessors are an advanced feature of HBase and are intended to be used by system
developers only. Because coprocessor code runs directly on the RegionServer and has
direct access to your data, coprocessors introduce the risk of data corruption,
man-in-the-middle attacks, or other malicious data access. Currently, there is no
mechanism to prevent data corruption by coprocessors, though work is underway on
link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047].

In addition, there is no resource isolation, so a well-intentioned but misbehaving
coprocessor can severely degrade cluster performance and stability.
| ==== |
| |
| == Coprocessor Overview |
| |
In HBase, you fetch data using a `Get` or `Scan`, whereas in an RDBMS you use a SQL
query. In order to fetch only the relevant data, you filter it using an HBase
link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/Filter.html[Filter],
whereas in an RDBMS you use a `WHERE` predicate.
| |
| After fetching the data, you perform computations on it. This paradigm works well |
| for "small data" with a few thousand rows and several columns. However, when you scale |
| to billions of rows and millions of columns, moving large amounts of data across your |
| network will create bottlenecks at the network layer, and the client needs to be powerful |
| enough and have enough memory to handle the large amounts of data and the computations. |
| In addition, the client code can grow large and complex. |
| |
| In this scenario, coprocessors might make sense. You can put the business computation |
| code into a coprocessor which runs on the RegionServer, in the same location as the |
| data, and returns the result to the client. |
| |
| This is only one scenario where using coprocessors can provide benefit. Following |
| are some analogies which may help to explain some of the benefits of coprocessors. |
| |
| [[cp_analogies]] |
| === Coprocessor Analogies |
| |
Triggers and Stored Procedures::
An Observer coprocessor is similar to a trigger in an RDBMS in that it executes
your code either before or after a specific event (such as a `Get` or `Put`)
occurs. An Endpoint coprocessor is similar to a stored procedure in an RDBMS
because it allows you to perform custom computations on the data on the
RegionServer itself, rather than on the client.
| |
MapReduce::
MapReduce operates on the principle of moving the computation to the location of
the data. Coprocessors operate on the same principle.
| |
| AOP:: |
| If you are familiar with Aspect Oriented Programming (AOP), you can think of a coprocessor |
| as applying advice by intercepting a request and then running some custom code, |
| before passing the request on to its final destination (or even changing the destination). |
| |
| |
| === Coprocessor Implementation Overview |
| |
| . Your class should implement one of the Coprocessor interfaces - |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/Coprocessor.html[Coprocessor], |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver], |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/CoprocessorService.html[CoprocessorService] - to name a few. |
| |
| . Load the coprocessor, either statically (from the configuration) or dynamically, |
| using HBase Shell. For more details see <<cp_loading,Loading Coprocessors>>. |
| |
. Call the coprocessor from your client-side code. HBase handles Observer coprocessors
transparently, while Endpoint coprocessors must be invoked explicitly.
| |
| The framework API is provided in the |
| link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/package-summary.html[coprocessor] |
| package. |
| |
| == Types of Coprocessors |
| |
| === Observer Coprocessors |
| |
Observer coprocessors are triggered either before or after a specific event occurs.
Observers that run before an event override methods that start with a `pre` prefix,
such as link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#prePut-org.apache.hadoop.hbase.coprocessor.ObserverContext-org.apache.hadoop.hbase.client.Put-org.apache.hadoop.hbase.wal.WALEdit-org.apache.hadoop.hbase.client.Durability-[`prePut`]. Observers that run just after an event override methods that start
with a `post` prefix, such as link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#postPut-org.apache.hadoop.hbase.coprocessor.ObserverContext-org.apache.hadoop.hbase.client.Put-org.apache.hadoop.hbase.wal.WALEdit-org.apache.hadoop.hbase.client.Durability-[`postPut`].
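
Below is a minimal observer skeleton illustrating the two hook styles. It assumes the HBase 2.x
API, in which the class also implements `RegionCoprocessor` so that the framework can discover
the observer; the class name is illustrative.

[source,java]
----
import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.wal.WALEdit;

public class PrePostPutObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        // Expose this class to the framework as a RegionObserver.
        return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
            WALEdit edit, Durability durability) throws IOException {
        // Runs before the Put is applied to the region.
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
            WALEdit edit, Durability durability) throws IOException {
        // Runs after the Put has been applied to the region.
    }
}
----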
| |
| |
| ==== Use Cases for Observer Coprocessors |
| Security:: |
| Before performing a `Get` or `Put` operation, you can check for permission using |
| `preGet` or `prePut` methods. |
| |
Referential Integrity::
HBase does not directly support the RDBMS concept of referential integrity, also known
as foreign keys. You can use a coprocessor to enforce such integrity. For instance,
if you have a business rule that every insert to the `users` table must be followed
by a corresponding entry in the `user_daily_attendance` table, you could implement
a coprocessor to use the `prePut` method on `users` to insert a record into
`user_daily_attendance` (a sketch follows this list).
| |
| Secondary Indexes:: |
| You can use a coprocessor to maintain secondary indexes. For more information, see |
| link:https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+SecondaryIndexing[SecondaryIndexing]. |
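
As referenced above, the following is a minimal sketch of the referential-integrity use case,
assuming the HBase 2.x observer API and loading on the `users` table. The `att` column family
and cell contents are illustrative, and a production implementation would need to handle
failures and the cost of a cross-table write on every `Put`:

[source,java]
----
public class AttendanceObserver implements RegionCoprocessor, RegionObserver {

    private RegionCoprocessorEnvironment env;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        if (e instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) e;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
            WALEdit edit, Durability durability) throws IOException {
        // Mirror every insert into `users` with a row in `user_daily_attendance`.
        try (Table attendance = env.getConnection()
                .getTable(TableName.valueOf("user_daily_attendance"))) {
            Put record = new Put(put.getRow());
            record.addColumn(Bytes.toBytes("att"), Bytes.toBytes("present"),
                Bytes.toBytes("0"));
            attendance.put(record);
        }
    }
}
----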
| |
| |
| ==== Types of Observer Coprocessor |
| |
| RegionObserver:: |
| A RegionObserver coprocessor allows you to observe events on a region, such as `Get` |
| and `Put` operations. See |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver]. |
| |
| RegionServerObserver:: |
| A RegionServerObserver allows you to observe events related to the RegionServer's |
| operation, such as starting, stopping, or performing merges, commits, or rollbacks. |
| See |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.html[RegionServerObserver]. |
| |
| MasterObserver:: |
| A MasterObserver allows you to observe events related to the HBase Master, such |
| as table creation, deletion, or schema modification. See |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/MasterObserver.html[MasterObserver]. |
| |
WALObserver::
| A WalObserver allows you to observe events related to writes to the Write-Ahead |
| Log (WAL). See |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/WALObserver.html[WALObserver]. |
| |
| <<cp_example,Examples>> provides working examples of observer coprocessors. |
| |
| |
| |
| [[cpeps]] |
| === Endpoint Coprocessor |
| |
Endpoint coprocessors allow you to perform computation at the location of the data.
See <<cp_analogies, Coprocessor Analogies>>. One example use case is calculating a running
average or summation for an entire table which spans hundreds of regions.

In contrast to observer coprocessors, whose code runs transparently, endpoint
coprocessors must be explicitly invoked using the
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/AsyncTable.html#coprocessorService-java.util.function.Function-org.apache.hadoop.hbase.client.ServiceCaller-byte:A-[coprocessorService()]
method available in
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/AsyncTable.html[AsyncTable].
| |
| [WARNING] |
| .On using coprocessorService method with sync client |
| ==== |
| The coprocessorService method in link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Table.html[Table] |
| has been deprecated. |
| |
In link:https://issues.apache.org/jira/browse/HBASE-21512[HBASE-21512]
we reimplemented the sync client based on the async client. The coprocessorService
method defined in the `Table` interface directly references a method from protobuf's
`BlockingInterface`, which means we would need to use a separate thread pool to execute
the method in order to avoid blocking the async client (we want to avoid blocking calls in
our async implementation).

Since coprocessors are an advanced feature, we believe it is OK for coprocessor users to
instead switch over to `AsyncTable`. There is a lightweight
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Connection.html#toAsyncConnection--[toAsyncConnection]
method to get an `AsyncConnection` from a `Connection` if needed.
| ==== |
| |
Starting with HBase 0.96, endpoint coprocessors are implemented using Google Protocol
Buffers (protobuf). For more details on protobuf, see Google's
link:https://developers.google.com/protocol-buffers/docs/proto[Protocol Buffer Guide].
Endpoint Coprocessors written for version 0.94 are not compatible with version 0.96 or later
(see link:https://issues.apache.org/jira/browse/HBASE-5448[HBASE-5448]). To upgrade your
HBase cluster from 0.94 or earlier to 0.96 or later, you need to reimplement your
coprocessor.
| |
| In HBase 2.x, we made use of a shaded version of protobuf 3.x, but kept the |
| protobuf for coprocessors on 2.5.0. In HBase 3.0.0, we removed all dependencies on |
| non-shaded protobuf so you need to reimplement your coprocessor to make use of the |
| shaded protobuf version provided in hbase-thirdparty. Please see |
| the <<protobuf,protobuf>> section for more details. |
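
For example, a CPEP being migrated to HBase 3.0.0 would switch its protobuf imports from the
`com.google.protobuf` package to the relocated package provided by hbase-thirdparty (the
relocated package name below is an assumption to verify against the hbase-thirdparty release
you build against):

[source,java]
----
// Non-shaded protobuf, used by coprocessors up through HBase 2.x:
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;

// Shaded protobuf from hbase-thirdparty, used from HBase 3.0.0 onwards:
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
----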
| |
Coprocessor Endpoints should make no use of HBase internals and should rely
only on public APIs; ideally a CPEP should depend on Interfaces
and data structures only. This is not always possible, but be aware
that depending on internals makes the Endpoint brittle and liable to breakage as HBase
internals evolve. HBase internal APIs annotated as private or evolving
do not have to respect semantic versioning rules or general Java rules on
deprecation before removal. While generated protobuf files lack
the HBase audience annotations -- they are created by the
protobuf protoc tool, which knows nothing of how HBase works --
they should be considered `@InterfaceAudience.Private` and so are liable to
change.
| |
| <<cp_example,Examples>> provides working examples of endpoint coprocessors. |
| |
| [[cp_loading]] |
| == Loading Coprocessors |
| |
| To make your coprocessor available to HBase, it must be _loaded_, either statically |
| (through the HBase configuration) or dynamically (using HBase Shell or the Java API). |
| |
| === Static Loading |
| |
| Follow these steps to statically load your coprocessor. Keep in mind that you must |
| restart HBase to unload a coprocessor that has been loaded statically. |
| |
. Define the Coprocessor in _hbase-site.xml_ with a `<property>` element that has a `<name>`
and a `<value>` sub-element. The `<name>` should be one of the following:
+
- `hbase.coprocessor.region.classes` for RegionObservers and Endpoints.
- `hbase.coprocessor.wal.classes` for WALObservers.
- `hbase.coprocessor.master.classes` for MasterObservers.
+
`<value>` must contain the fully-qualified class name of your coprocessor's implementation
class.
+
For example, to load a Coprocessor (implemented in class SumEndPoint.java), you must create the
following entry in the RegionServer's _hbase-site.xml_ file (generally located in the 'conf' directory):
| + |
| [source,xml] |
| ---- |
| <property> |
| <name>hbase.coprocessor.region.classes</name> |
| <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value> |
| </property> |
| ---- |
| + |
| If multiple classes are specified for loading, the class names must be comma-separated. |
| The framework attempts to load all the configured classes using the default class loader. |
Therefore, the jar file must reside on the server-side HBase classpath.
+
Coprocessors which are loaded in this way will be active on all regions of all tables.
These are also called system Coprocessors.
The first listed Coprocessor will be assigned the priority `Coprocessor.Priority.SYSTEM`.
Each subsequent coprocessor in the list will have its priority value incremented by one (which
reduces its priority, because priorities have the natural sort order of Integers).
+
These priority values can be manually overridden in hbase-site.xml. This can be useful if you
want to guarantee that a coprocessor will execute after another. For example, in the following
configuration `SumEndPoint` would be guaranteed to go last, except in the case of a tie with
another coprocessor:
| + |
| [source,xml] |
| ---- |
| <property> |
| <name>hbase.coprocessor.region.classes</name> |
| <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint|2147483647</value> |
| </property> |
| ---- |
| |
| + |
| When calling out to registered observers, the framework executes their callbacks methods in the |
| sorted order of their priority. + |
| Ties are broken arbitrarily. |
| |
. Put your code on HBase's classpath. One easy way to do this is to drop the jar
(containing your code and all the dependencies) into the `lib/` directory in the
HBase installation.
| |
| . Restart HBase. |
| |
| |
| === Static Unloading |
| |
| . Delete the coprocessor's <property> element, including sub-elements, from `hbase-site.xml`. |
| . Restart HBase. |
| . Optionally, remove the coprocessor's JAR file from the classpath or HBase's `lib/` |
| directory. |
| |
| |
| === Dynamic Loading |
| |
You can also load a coprocessor dynamically, without restarting HBase. This may seem
preferable to static loading, but dynamically loaded coprocessors are loaded on a
per-table basis, and are only available to the table for which they were loaded. For
this reason, dynamically loaded coprocessors are sometimes called *Table Coprocessors*.
| |
| In addition, dynamically loading a coprocessor acts as a schema change on the table, |
| and the table must be taken offline to load the coprocessor. |
| |
There are three ways to dynamically load a Coprocessor.
| |
| [NOTE] |
| .Assumptions |
| ==== |
The instructions below make the following assumptions:
| |
| * A JAR called `coprocessor.jar` contains the Coprocessor implementation along with all of its |
| dependencies. |
| * The JAR is available in HDFS in some location like |
| `hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar`. |
| ==== |
| |
| [[load_coprocessor_in_shell]] |
| ==== Using HBase Shell |
| |
| . Load the Coprocessor, using a command like the following: |
| + |
| [source] |
| ---- |
hbase> alter 'users', METHOD => 'table_att', 'Coprocessor' =>
  'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|arg1=1,arg2=2'
| ---- |
| + |
| The Coprocessor framework will try to read the class information from the coprocessor table |
| attribute value. |
| The value contains four pieces of information which are separated by the pipe (`|`) character. |
| + |
| * File path: The jar file containing the Coprocessor implementation must be in a location where |
| all region servers can read it. + |
| You could copy the file onto the local disk on each region server, but it is recommended to store |
| it in HDFS. + |
https://issues.apache.org/jira/browse/HBASE-14548[HBASE-14548] allows a directory containing the jars
or some wildcards to be specified, such as: hdfs://<namenode>:<port>/user/<hadoop-user>/ or
hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar. Please note that if a directory is specified,
all jar files (.jar) in the directory are added. It does not search for files in sub-directories.
Do not use a wildcard if you would like to specify a directory. This enhancement applies to
usage via the Java API as well.
| * Class name: The full class name of the Coprocessor. |
| * Priority: An integer. The framework will determine the execution sequence of all configured |
| observers registered at the same hook using priorities. This field can be left blank. In that |
| case the framework will assign a default priority value. |
| * Arguments (Optional): This field is passed to the Coprocessor implementation. This is optional. |
| |
| . Verify that the coprocessor loaded: |
| + |
| ---- |
hbase(main):004:0> describe 'users'
| ---- |
| + |
| The coprocessor should be listed in the `TABLE_ATTRIBUTES`. |
| |
| ==== Using the Java API (all HBase versions) |
| |
| The following Java code shows how to use the `setValue()` method of `HTableDescriptor` |
| to load a coprocessor on the `users` table. |
| |
| [source,java] |
| ---- |
| TableName tableName = TableName.valueOf("users"); |
| String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; |
| Configuration conf = HBaseConfiguration.create(); |
| Connection connection = ConnectionFactory.createConnection(conf); |
| Admin admin = connection.getAdmin(); |
| HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); |
| HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); |
| columnFamily1.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily1); |
| HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); |
| columnFamily2.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily2); |
| hTableDescriptor.setValue("COPROCESSOR$1", path + "|" |
| + RegionObserverExample.class.getCanonicalName() + "|" |
| + Coprocessor.PRIORITY_USER); |
| admin.modifyTable(tableName, hTableDescriptor); |
| ---- |
| |
| ==== Using the Java API (HBase 0.96+ only) |
| |
| In HBase 0.96 and newer, the `addCoprocessor()` method of `HTableDescriptor` provides |
| an easier way to load a coprocessor dynamically. |
| |
| [source,java] |
| ---- |
| TableName tableName = TableName.valueOf("users"); |
| Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"); |
| Configuration conf = HBaseConfiguration.create(); |
| Connection connection = ConnectionFactory.createConnection(conf); |
| Admin admin = connection.getAdmin(); |
| HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); |
| HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); |
| columnFamily1.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily1); |
| HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); |
| columnFamily2.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily2); |
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path,
    Coprocessor.PRIORITY_USER, null);
| admin.modifyTable(tableName, hTableDescriptor); |
| ---- |
| |
| WARNING: There is no guarantee that the framework will load a given Coprocessor successfully. |
| For example, the shell command neither guarantees a jar file exists at a particular location nor |
| verifies whether the given class is actually contained in the jar file. |
| |
| |
| === Dynamic Unloading |
| |
| ==== Using HBase Shell |
| |
| . Alter the table to remove the coprocessor. |
| + |
| [source] |
| ---- |
| hbase> alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1' |
| ---- |
| |
| ==== Using the Java API |
| |
Reload the table definition without setting the coprocessor value, using either the
`setValue()` or `addCoprocessor()` method. This will remove any coprocessor
attached to the table.
| |
| [source,java] |
| ---- |
| TableName tableName = TableName.valueOf("users"); |
| String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; |
| Configuration conf = HBaseConfiguration.create(); |
| Connection connection = ConnectionFactory.createConnection(conf); |
| Admin admin = connection.getAdmin(); |
| HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); |
| HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); |
| columnFamily1.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily1); |
| HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); |
| columnFamily2.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily2); |
| admin.modifyTable(tableName, hTableDescriptor); |
| ---- |
| |
| In HBase 0.96 and newer, you can instead use the `removeCoprocessor()` method of the |
| `HTableDescriptor` class. |
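
For example (a hedged sketch; `RegionObserverExample` is the coprocessor loaded earlier):

[source,java]
----
TableName tableName = TableName.valueOf("users");
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
    HTableDescriptor hTableDescriptor = admin.getTableDescriptor(tableName);
    // Strip the coprocessor attribute from the table definition.
    hTableDescriptor.removeCoprocessor(RegionObserverExample.class.getCanonicalName());
    admin.modifyTable(tableName, hTableDescriptor);
}
----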
| |
| |
| [[cp_example]] |
| == Examples |
HBase ships examples for Observer Coprocessors.

A more detailed example is given below.
| |
| These examples assume a table called `users`, which has two column families `personalDet` |
| and `salaryDet`, containing personal and salary details. Below is the graphical representation |
| of the `users` table. |
| |
| .Users Table |
| [width="100%",cols="7",options="header,footer"] |
| |==================== |
| | 3+|personalDet 3+|salaryDet |
| |*rowkey* |*name* |*lastname* |*dob* |*gross* |*net* |*allowances* |
| |admin |Admin |Admin | 3+| |
| |cdickens |Charles |Dickens |02/07/1812 |10000 |8000 |2000 |
| |jverne |Jules |Verne |02/08/1828 |12000 |9000 |3000 |
| |==================== |
| |
| |
| === Observer Example |
| |
| The following Observer coprocessor prevents the details of the user `admin` from being |
| returned in a `Get` or `Scan` of the `users` table. |
| |
. Write a class that implements the
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionCoprocessor.html[RegionCoprocessor]
and
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver]
interfaces.
| |
| . Override the `preGetOp()` method (the `preGet()` method is deprecated) to check |
| whether the client has queried for the rowkey with value `admin`. If so, return an |
| empty result. Otherwise, process the request as normal. |
| |
| . Put your code and dependencies in a JAR file. |
| |
| . Place the JAR in HDFS where HBase can locate it. |
| |
| . Load the Coprocessor. |
| |
| . Write a simple program to test it. |
| |
The following code implements the steps above:
| |
| [source,java] |
| ---- |
public class RegionObserverExample implements RegionCoprocessor, RegionObserver {

    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results)
            throws IOException {
        if (Bytes.equals(get.getRow(), ADMIN)) {
            // Return a placeholder cell instead of the real admin details.
            Cell c = CellUtil.createCell(get.getRow(), COLUMN_FAMILY, COLUMN,
                System.currentTimeMillis(), (byte) 4, VALUE);
            results.add(c);
            // Skip the default Get processing for this row.
            e.bypass();
        }
    }
}
| ---- |
| |
| Overriding the `preGetOp()` will only work for `Get` operations. You also need to override |
| the `preScannerOpen()` method to filter the `admin` row from scan results. |
| |
| [source,java] |
| ---- |
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
        final RegionScanner s) throws IOException {
    // Exclude the admin row from every scan on this region.
    Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
    scan.setFilter(filter);
    return s;
}
| ---- |
| |
| This method works but there is a _side effect_. If the client has used a filter in |
| its scan, that filter will be replaced by this filter. Instead, you can explicitly |
| remove any `admin` results from the scan: |
| |
| [source,java] |
| ---- |
@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
        final List<Result> results, final int limit, final boolean hasMore) throws IOException {
    Result result = null;
    Iterator<Result> iterator = results.iterator();
    while (iterator.hasNext()) {
        result = iterator.next();
        // Drop the admin row from the batch before it is returned to the client.
        if (Bytes.equals(result.getRow(), ADMIN)) {
            iterator.remove();
            break;
        }
    }
    return hasMore;
}
| ---- |
| |
| === Endpoint Example |
| |
Still using the `users` table, this example implements an endpoint coprocessor to calculate
the sum of all employee salaries.
| |
| . Create a '.proto' file defining your service. |
| + |
| [source] |
| ---- |
option java_package = "org.myname.hbase.coprocessor.autogenerated";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message SumRequest {
    required string family = 1;
    required string column = 2;
}

message SumResponse {
    required int64 sum = 1 [default = 0];
}

service SumService {
    rpc getSum(SumRequest)
        returns (SumResponse);
}
| ---- |
| |
. Execute the `protoc` command to generate the Java code from the above '.proto' file.
| + |
| [source] |
| ---- |
| $ mkdir src |
| $ protoc --java_out=src ./sum.proto |
| ---- |
| + |
This will generate a file called `Sum.java` containing the generated service classes.
| |
. Write a class that extends the generated service class, implement the `Coprocessor`
and `CoprocessorService` interfaces, and override the service method.
| + |
| WARNING: If you load a coprocessor from `hbase-site.xml` and then load the same coprocessor |
| again using HBase Shell, it will be loaded a second time. The same class will |
| exist twice, and the second instance will have a higher ID (and thus a lower priority). |
| The effect is that the duplicate coprocessor is effectively ignored. |
| + |
| [source, java] |
| ---- |
public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // do nothing
    }

    @Override
    public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));

        Sum.SumResponse response = null;
        InternalScanner scanner = null;

        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<>();
            boolean hasMore = false;
            long sum = 0L;

            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
                }
                results.clear();
            } while (hasMore);

            response = Sum.SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {}
            }
        }

        done.run(response);
    }
}
| ---- |
| + |
| [source, java] |
| ---- |
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);

final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
try {
    Map<byte[], Long> results = table.coprocessorService(
        Sum.SumService.class,
        null,  /* start key */
        null,  /* end key */
        new Batch.Call<Sum.SumService, Long>() {
            @Override
            public Long call(Sum.SumService aggregate) throws IOException {
                BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
                aggregate.getSum(null, request, rpcCallback);
                Sum.SumResponse response = rpcCallback.get();

                return response.hasSum() ? response.getSum() : 0L;
            }
        }
    );

    for (Long sum : results.values()) {
        System.out.println("Sum = " + sum);
    }
} catch (ServiceException e) {
    e.printStackTrace();
} catch (Throwable e) {
    e.printStackTrace();
}
| ---- |
| |
| . Load the Coprocessor. |
| |
. Write client code to call the Coprocessor, such as the example above. An `AsyncTable` variant is sketched below.
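
Because `Table.coprocessorService` is deprecated (see the warning in <<cpeps>>), the same call
can be made through `AsyncTable`. The following hedged sketch invokes the endpoint on the single
region containing the given row; summing across all regions requires the range-based
`coprocessorService` overload:

[source,java]
----
Configuration conf = HBaseConfiguration.create();
try (AsyncConnection connection =
        ConnectionFactory.createAsyncConnection(conf).get()) {
    AsyncTable<?> table = connection.getTable(TableName.valueOf("users"));
    final Sum.SumRequest request = Sum.SumRequest.newBuilder()
        .setFamily("salaryDet").setColumn("gross").build();
    // Blocks on the returned CompletableFuture; exception handling omitted.
    Sum.SumResponse response = table.<Sum.SumService.Stub, Sum.SumResponse> coprocessorService(
        Sum.SumService::newStub,
        (stub, controller, callback) -> stub.getSum(controller, request, callback),
        Bytes.toBytes("cdickens")).get();
    System.out.println("Sum = " + response.getSum());
}
----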
| |
| |
| == Guidelines For Deploying A Coprocessor |
| |
| Bundling Coprocessors:: |
| You can bundle all classes for a coprocessor into a |
| single JAR on the RegionServer's classpath, for easy deployment. Otherwise, |
| place all dependencies on the RegionServer's classpath so that they can be |
| loaded during RegionServer start-up. The classpath for a RegionServer is set |
| in the RegionServer's `hbase-env.sh` file. |
| Automating Deployment:: |
| You can use a tool such as Puppet, Chef, or |
| Ansible to ship the JAR for the coprocessor to the required location on your |
| RegionServers' filesystems and restart each RegionServer, to automate |
| coprocessor deployment. Details for such set-ups are out of scope of this |
| document. |
| Updating a Coprocessor:: |
| Deploying a new version of a given coprocessor is not as simple as disabling it, |
| replacing the JAR, and re-enabling the coprocessor. This is because you cannot |
| reload a class in a JVM unless you delete all the current references to it. |
| Since the current JVM has reference to the existing coprocessor, you must restart |
| the JVM, by restarting the RegionServer, in order to replace it. This behavior |
| is not expected to change. |
Coprocessor Logging::
The Coprocessor framework does not provide an API for logging beyond standard Java
logging; a coprocessor can simply use slf4j (which HBase itself uses), as in the sketch below.
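
A hedged sketch (class name illustrative):

[source,java]
----
public class LoggingObserver implements RegionCoprocessor, RegionObserver {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingObserver.class);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // Messages end up in the hosting RegionServer's log files.
        LOG.info("LoggingObserver started");
    }
}
----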
| Coprocessor Configuration:: |
| If you do not want to load coprocessors from the HBase Shell, you can add their configuration |
| properties to `hbase-site.xml`. In <<load_coprocessor_in_shell>>, two arguments are |
| set: `arg1=1,arg2=2`. These could have been added to `hbase-site.xml` as follows: |
| [source,xml] |
| ---- |
| <property> |
| <name>arg1</name> |
| <value>1</value> |
| </property> |
| <property> |
| <name>arg2</name> |
| <value>2</value> |
| </property> |
| ---- |
The coprocessor can then read these properties from its environment's configuration, as in the sketch below.
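
A minimal sketch of reading them inside the coprocessor (the default values are illustrative):

[source,java]
----
@Override
public void start(CoprocessorEnvironment env) throws IOException {
    Configuration conf = env.getConfiguration();
    // Values set in hbase-site.xml, as configured above.
    int arg1 = conf.getInt("arg1", 0);
    int arg2 = conf.getInt("arg2", 0);
}
----

Client code like the following can then exercise the table and observe the coprocessor's effect: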
| [source,java] |
| ---- |
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);

Get get = new Get(Bytes.toBytes("admin"));
Result result = table.get(get);
for (Cell c : result.rawCells()) {
    System.out.println(Bytes.toString(CellUtil.cloneRow(c))
        + "==> " + Bytes.toString(CellUtil.cloneFamily(c))
        + "{" + Bytes.toString(CellUtil.cloneQualifier(c))
        + ":" + Bytes.toLong(CellUtil.cloneValue(c)) + "}");
}
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
for (Result res : scanner) {
    for (Cell c : res.rawCells()) {
        System.out.println(Bytes.toString(CellUtil.cloneRow(c))
            + " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
            + " {" + Bytes.toString(CellUtil.cloneQualifier(c))
            + ":" + Bytes.toLong(CellUtil.cloneValue(c))
            + "}");
    }
}
| ---- |
| |
| == Restricting Coprocessor Usage |
| |
Restricting arbitrary user coprocessors can be a big concern in multitenant environments. HBase provides a continuum of options for ensuring only expected coprocessors are running (an example configuration follows this list):

* `hbase.coprocessor.enabled`: Enables or disables all coprocessors. This will limit the functionality of HBase, as disabling all coprocessors will disable some security providers. An example coprocessor so affected is `org.apache.hadoop.hbase.security.access.AccessController`.
| * `hbase.coprocessor.user.enabled`: Enables or disables loading coprocessors on tables (i.e. user coprocessors). |
| * One can statically load coprocessors, and optionally tune their priorities, via the following tunables in `hbase-site.xml`: |
| ** `hbase.coprocessor.regionserver.classes`: A comma-separated list of coprocessors that are loaded by region servers |
| ** `hbase.coprocessor.region.classes`: A comma-separated list of RegionObserver and Endpoint coprocessors |
| ** `hbase.coprocessor.user.region.classes`: A comma-separated list of coprocessors that are loaded by all regions |
| ** `hbase.coprocessor.master.classes`: A comma-separated list of coprocessors that are loaded by the master (MasterObserver coprocessors) |
| ** `hbase.coprocessor.wal.classes`: A comma-separated list of WALObserver coprocessors to load |
* `hbase.coprocessor.abortonerror`: Whether to abort the daemon which has loaded the coprocessor if the coprocessor throws an error other than `IOError`. If this is set to false and an access controller coprocessor has a fatal error, the coprocessor will be circumvented; as such, in secure installations this is advised to be `true`. However, one may override this on a per-table basis for user coprocessors, to ensure they do not abort their running region server and are instead unloaded on error.
* `hbase.coprocessor.region.whitelist.paths`: A comma-separated list, available when loading `org.apache.hadoop.hbase.security.access.CoprocessorWhitelistMasterObserver`, of the paths from which coprocessors may be loaded.
| ** Coprocessors on the classpath are implicitly white-listed |
| ** `*` to wildcard all coprocessor paths |
| ** An entire filesystem (e.g. `hdfs://my-cluster/`) |
| ** A wildcard path to be evaluated by link:https://commons.apache.org/proper/commons-io/javadocs/api-release/org/apache/commons/io/FilenameUtils.html[FilenameUtils.wildcardMatch] |
| ** Note: Path can specify scheme or not (e.g. `file:///usr/hbase/lib/coprocessors` or for all filesystems `/usr/hbase/lib/coprocessors`) |
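
For example, a multitenant deployment might allow user coprocessors only from a trusted
location and abort on coprocessor failure. The following `hbase-site.xml` fragment is
illustrative; the whitelist only takes effect once `CoprocessorWhitelistMasterObserver` is
loaded:

[source,xml]
----
<!-- Abort a daemon whose coprocessor fails, rather than silently circumventing it. -->
<property>
  <name>hbase.coprocessor.abortonerror</name>
  <value>true</value>
</property>
<!-- Load the whitelist observer on the master. -->
<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>org.apache.hadoop.hbase.security.access.CoprocessorWhitelistMasterObserver</value>
</property>
<!-- Only accept coprocessor jars from a trusted filesystem location. -->
<property>
  <name>hbase.coprocessor.region.whitelist.paths</name>
  <value>hdfs://my-cluster/trusted/coprocessors/*.jar</value>
</property>
----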