| //// |
| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| //// |
| |
| [[cp]] |
| = Apache HBase Coprocessors |
| :doctype: book |
| :numbered: |
| :toc: left |
| :icons: font |
| :experimental: |
| |
| HBase Coprocessors are modeled after Google BigTable's coprocessor implementation |
| (http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf pages 41-42.). |
| |
| The coprocessor framework provides mechanisms for running your custom code directly on |
| the RegionServers managing your data. Efforts are ongoing to bridge gaps between HBase's |
| implementation and BigTable's architecture. For more information see |
| link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047]. |
| |
| The information in this chapter is primarily sourced and heavily reused from the following |
| resources: |
| |
| . Mingjie Lai's blog post |
| link:https://blogs.apache.org/hbase/entry/coprocessor_introduction[Coprocessor Introduction]. |
| . Gaurav Bhardwaj's blog post |
| link:http://www.3pillarglobal.com/insights/hbase-coprocessors[The How To Of HBase Coprocessors]. |
| |
| [WARNING] |
| .Use Coprocessors At Your Own Risk |
| ==== |
| Coprocessors are an advanced feature of HBase and are intended to be used by system |
| developers only. Because coprocessor code runs directly on the RegionServer and has |
| direct access to your data, they introduce the risk of data corruption, man-in-the-middle |
| attacks, or other malicious data access. Currently, there is no mechanism to prevent |
| data corruption by coprocessors, though work is underway on |
| link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047]. |
| + |
| In addition, there is no resource isolation, so a well-intentioned but misbehaving |
| coprocessor can severely degrade cluster performance and stability. |
| ==== |
| |
| == Coprocessor Overview |
| |
| In HBase, you fetch data using a `Get` or `Scan`, whereas in an RDBMS you use a SQL |
| query. In order to fetch only the relevant data, you filter it using a HBase |
| link:http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/Filter.html[Filter] |
| , whereas in an RDBMS you use a `WHERE` predicate. |
| |
| After fetching the data, you perform computations on it. This paradigm works well |
| for "small data" with a few thousand rows and several columns. However, when you scale |
| to billions of rows and millions of columns, moving large amounts of data across your |
| network will create bottlenecks at the network layer, and the client needs to be powerful |
| enough and have enough memory to handle the large amounts of data and the computations. |
| In addition, the client code can grow large and complex. |
| |
| In this scenario, coprocessors might make sense. You can put the business computation |
| code into a coprocessor which runs on the RegionServer, in the same location as the |
| data, and returns the result to the client. |
| |
| This is only one scenario where using coprocessors can provide benefit. Following |
| are some analogies which may help to explain some of the benefits of coprocessors. |
| |
| [[cp_analogies]] |
| === Coprocessor Analogies |
| |
| Triggers and Stored Procedure:: |
| An Observer coprocessor is similar to a trigger in a RDBMS in that it executes |
| your code either before or after a specific event (such as a `Get` or `Put`) |
| occurs. An endpoint coprocessor is similar to a stored procedure in a RDBMS |
| because it allows you to perform custom computations on the data on the |
| RegionServer itself, rather than on the client. |
| |
| MapReduce:: |
| MapReduce operates on the principle of moving the computation to the location of |
| the data. Coprocessors operate on the same principal. |
| |
| AOP:: |
| If you are familiar with Aspect Oriented Programming (AOP), you can think of a coprocessor |
| as applying advice by intercepting a request and then running some custom code, |
| before passing the request on to its final destination (or even changing the destination). |
| |
| |
| === Coprocessor Implementation Overview |
| |
| . Either your class should extend one of the Coprocessor classes, such as |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html[BaseRegionObserver], |
| or it should implement the link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/Coprocessor.html[Coprocessor] |
| or |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/CoprocessorService.html[CoprocessorService] |
| interface. |
| |
| . Load the coprocessor, either statically (from the configuration) or dynamically, |
| using HBase Shell. For more details see <<cp_loading,Loading Coprocessors>>. |
| |
| . Call the coprocessor from your client-side code. HBase handles the coprocessor |
| transparently. |
| |
| The framework API is provided in the |
| link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/package-summary.html[coprocessor] |
| package. |
| |
| == Types of Coprocessors |
| |
| === Observer Coprocessors |
| |
| Observer coprocessors are triggered either before or after a specific event occurs. |
| Observers that happen before an event use methods that start with a `pre` prefix, |
| such as link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#prePut%28org.apache.hadoop.hbase.coprocessor.ObserverContext,%20org.apache.hadoop.hbase.client.Put,%20org.apache.hadoop.hbase.regionserver.wal.WALEdit,%20org.apache.hadoop.hbase.client.Durability%29[`prePut`]. Observers that happen just after an event override methods that start |
| with a `post` prefix, such as link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#postPut%28org.apache.hadoop.hbase.coprocessor.ObserverContext,%20org.apache.hadoop.hbase.client.Put,%20org.apache.hadoop.hbase.regionserver.wal.WALEdit,%20org.apache.hadoop.hbase.client.Durability%29[`postPut`]. |
| |
| |
| ==== Use Cases for Observer Coprocessors |
| Security:: |
| Before performing a `Get` or `Put` operation, you can check for permission using |
| `preGet` or `prePut` methods. |
| |
| Referential Integrity:: |
| HBase does not directly support the RDBMS concept of refential integrity, also known |
| as foreign keys. You can use a coprocessor to enforce such integrity. For instance, |
| if you have a business rule that every insert to the `users` table must be followed |
| by a corresponding entry in the `user_daily_attendance` table, you could implement |
| a coprocessor to use the `prePut` method on `user` to insert a record into `user_daily_attendance`. |
| |
| Secondary Indexes:: |
| You can use a coprocessor to maintain secondary indexes. For more information, see |
| link:http://wiki.apache.org/hadoop/Hbase/SecondaryIndexing[SecondaryIndexing]. |
| |
| |
| ==== Types of Observer Coprocessor |
| |
| RegionObserver:: |
| A RegionObserver coprocessor allows you to observe events on a region, such as `Get` |
| and `Put` operations. See |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver]. |
| Consider overriding the convenience class |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html[BaseRegionObserver], |
| which implements the `RegionObserver` interface and will not break if new methods are added. |
| |
| RegionServerObserver:: |
| A RegionServerObserver allows you to observe events related to the RegionServer's |
| operation, such as starting, stopping, or performing merges, commits, or rollbacks. |
| See |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.html[RegionServerObserver]. |
| Consider overriding the convenience class |
| https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.html[BaseMasterAndRegionObserver] |
| which implements both `MasterObserver` and `RegionServerObserver` interfaces and |
| will not break if new methods are added. |
| |
| MasterOvserver:: |
| A MasterObserver allows you to observe events related to the HBase Master, such |
| as table creation, deletion, or schema modification. See |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/MasterObserver.html[MasterObserver]. |
| Consider overriding the convenience class |
| https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.html[BaseMasterAndRegionObserver], |
| which implements both `MasterObserver` and `RegionServerObserver` interfaces and |
| will not break if new methods are added. |
| |
| WalObserver:: |
| A WalObserver allows you to observe events related to writes to the Write-Ahead |
| Log (WAL). See |
| link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/WALObserver.html[WALObserver]. |
| Consider overriding the convenience class |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.html[BaseWALObserver], |
| which implements the `WalObserver` interface and will not break if new methods are added. |
| |
| <<cp_example,Examples>> provides working examples of observer coprocessors. |
| |
| |
| === Endpoint Coprocessor |
| |
| Endpoint processors allow you to perform computation at the location of the data. |
| See <<cp_analogies, Coprocessor Analogy>>. An example is the need to calculate a running |
| average or summation for an entire table which spans hundreds of regions. |
| |
| In contract to observer coprocessors, where your code is run transparently, endpoint |
| coprocessors must be explicitly invoked using the |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Table.html#coprocessorService%28java.lang.Class,%20byte%5B%5D,%20byte%5B%5D,%20org.apache.hadoop.hbase.client.coprocessor.Batch.Call%29[CoprocessorService()] |
| method available in |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Table.html[Table], |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/HTableInterface.html[HTableInterface], |
| or |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/HTable.html[HTable]. |
| |
| Starting with HBase 0.96, endpoint coprocessors are implemented using Google Protocol |
| Buffers (protobuf). For more details on protobuf, see Google's |
| link:https://developers.google.com/protocol-buffers/docs/proto[Protocol Buffer Guide]. |
| Endpoints Coprocessor written in version 0.94 are not compatible with version 0.96 or later. |
| See |
| link:https://issues.apache.org/jira/browse/HBASE-5448[HBASE-5448]). To upgrade your |
| HBase cluster from 0.94 or earlier to 0.96 or later, you need to reimplement your |
| coprocessor. |
| |
| <<cp_example,Examples>> provides working examples of endpoint coprocessors. |
| |
| [[cp_loading]] |
| == Loading Coprocessors |
| |
| To make your coprocessor available to HBase, it must be _loaded_, either statically |
| (through the HBase configuration) or dynamically (using HBase Shell or the Java API). |
| |
| === Static Loading |
| |
| Follow these steps to statically load your coprocessor. Keep in mind that you must |
| restart HBase to unload a coprocessor that has been loaded statically. |
| |
| . Define the Coprocessor in _hbase-site.xml_, with a <property> element with a <name> |
| and a <value> sub-element. The <name> should be one of the following: |
| + |
| - `hbase.coprocessor.region.classes` for RegionObservers and Endpoints. |
| - `hbase.coprocessor.wal.classes` for WALObservers. |
| - `hbase.coprocessor.master.classes` for MasterObservers. |
| + |
| <value> must contain the fully-qualified class name of your coprocessor's implementation |
| class. |
| + |
| For example to load a Coprocessor (implemented in class SumEndPoint.java) you have to create |
| following entry in RegionServer's 'hbase-site.xml' file (generally located under 'conf' directory): |
| + |
| [source,xml] |
| ---- |
| <property> |
| <name>hbase.coprocessor.region.classes</name> |
| <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value> |
| </property> |
| ---- |
| + |
| If multiple classes are specified for loading, the class names must be comma-separated. |
| The framework attempts to load all the configured classes using the default class loader. |
| Therefore, the jar file must reside on the server-side HBase classpath. |
| + |
| Coprocessors which are loaded in this way will be active on all regions of all tables. |
| These are also called system Coprocessor. |
| The first listed Coprocessors will be assigned the priority `Coprocessor.Priority.SYSTEM`. |
| Each subsequent coprocessor in the list will have its priority value incremented by one (which |
| reduces its priority, because priorities have the natural sort order of Integers). |
| + |
| When calling out to registered observers, the framework executes their callbacks methods in the |
| sorted order of their priority. + |
| Ties are broken arbitrarily. |
| |
| . Put your code HBase's classpath. One easy way to do this is to drop the jar |
| (containing you code and all the dependencies) into the `lib/` directory in the |
| HBase installation. |
| |
| . Restart HBase. |
| |
| |
| === Static Unloading |
| |
| . Delete the coprocessor's <property> element, including sub-elements, from `hbase-site.xml`. |
| . Restart HBase. |
| . Optionally, remove the coprocessor's JAR file from the classpath or HBase's `lib/` |
| directory. |
| |
| |
| === Dynamic Loading |
| |
| You can also load a coprocessor dynamically, without restarting HBase. This may seem |
| preferable to static loading, but dynamically loaded coprocessors are loaded on a |
| per-table basis, and are only available to the table for which they were loaded. For |
| this reason, dynamically loaded tables are sometimes called *Table Coprocessor*. |
| |
| In addition, dynamically loading a coprocessor acts as a schema change on the table, |
| and the table must be taken offline to load the coprocessor. |
| |
| There are three ways to dynamically load Coprocessor. |
| |
| [NOTE] |
| .Assumptions |
| ==== |
| The below mentioned instructions makes the following assumptions: |
| |
| * A JAR called `coprocessor.jar` contains the Coprocessor implementation along with all of its |
| dependencies. |
| * The JAR is available in HDFS in some location like |
| `hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar`. |
| ==== |
| |
| [[load_coprocessor_in_shell]] |
| ==== Using HBase Shell |
| |
| . Disable the table using HBase Shell: |
| + |
| [source] |
| ---- |
| hbase> disable 'users' |
| ---- |
| |
| . Load the Coprocessor, using a command like the following: |
| + |
| [source] |
| ---- |
| hbase alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/ |
| user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823| |
| arg1=1,arg2=2' |
| ---- |
| + |
| The Coprocessor framework will try to read the class information from the coprocessor table |
| attribute value. |
| The value contains four pieces of information which are separated by the pipe (`|`) character. |
| + |
| * File path: The jar file containing the Coprocessor implementation must be in a location where |
| all region servers can read it. + |
| You could copy the file onto the local disk on each region server, but it is recommended to store |
| it in HDFS. + |
| https://issues.apache.org/jira/browse/HBASE-14548[HBASE-14548] allows a directory containing the jars |
| or some wildcards to be specified, such as: hdfs://<namenode>:<port>/user/<hadoop-user>/ or |
| hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar. Please note that if a directory is specified, |
| all jar files(.jar) directly in the directory are added, |
| but it does not search files in the subtree rooted in the directory. |
| And do not contain any wildcard if you would like to specify a directory. |
| This enhancement applies to the ways of using the JAVA API as well. |
| * Class name: The full class name of the Coprocessor. |
| * Priority: An integer. The framework will determine the execution sequence of all configured |
| observers registered at the same hook using priorities. This field can be left blank. In that |
| case the framework will assign a default priority value. |
| * Arguments (Optional): This field is passed to the Coprocessor implementation. This is optional. |
| |
| . Enable the table. |
| + |
| ---- |
| hbase(main):003:0> enable 'users' |
| ---- |
| |
| . Verify that the coprocessor loaded: |
| + |
| ---- |
| hbase(main):04:0> describe 'users' |
| ---- |
| + |
| The coprocessor should be listed in the `TABLE_ATTRIBUTES`. |
| |
| ==== Using the Java API (all HBase versions) |
| |
| The following Java code shows how to use the `setValue()` method of `HTableDescriptor` |
| to load a coprocessor on the `users` table. |
| |
| [source,java] |
| ---- |
| TableName tableName = TableName.valueOf("users"); |
| String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; |
| Configuration conf = HBaseConfiguration.create(); |
| Connection connection = ConnectionFactory.createConnection(conf); |
| Admin admin = connection.getAdmin(); |
| admin.disableTable(tableName); |
| HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); |
| HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); |
| columnFamily1.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily1); |
| HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); |
| columnFamily2.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily2); |
| hTableDescriptor.setValue("COPROCESSOR$1", path + "|" |
| + RegionObserverExample.class.getCanonicalName() + "|" |
| + Coprocessor.PRIORITY_USER); |
| admin.modifyTable(tableName, hTableDescriptor); |
| admin.enableTable(tableName); |
| ---- |
| |
| ==== Using the Java API (HBase 0.96+ only) |
| |
| In HBase 0.96 and newer, the `addCoprocessor()` method of `HTableDescriptor` provides |
| an easier way to load a coprocessor dynamically. |
| |
| [source,java] |
| ---- |
| TableName tableName = TableName.valueOf("users"); |
| Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"); |
| Configuration conf = HBaseConfiguration.create(); |
| Connection connection = ConnectionFactory.createConnection(conf); |
| Admin admin = connection.getAdmin(); |
| admin.disableTable(tableName); |
| HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); |
| HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); |
| columnFamily1.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily1); |
| HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); |
| columnFamily2.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily2); |
| hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path, |
| Coprocessor.PRIORITY_USER, null); |
| admin.modifyTable(tableName, hTableDescriptor); |
| admin.enableTable(tableName); |
| ---- |
| |
| WARNING: There is no guarantee that the framework will load a given Coprocessor successfully. |
| For example, the shell command neither guarantees a jar file exists at a particular location nor |
| verifies whether the given class is actually contained in the jar file. |
| |
| |
| === Dynamic Unloading |
| |
| ==== Using HBase Shell |
| |
| . Disable the table. |
| + |
| [source] |
| ---- |
| hbase> disable 'users' |
| ---- |
| |
| . Alter the table to remove the coprocessor. |
| + |
| [source] |
| ---- |
| hbase> alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1' |
| ---- |
| |
| . Enable the table. |
| + |
| [source] |
| ---- |
| hbase> enable 'users' |
| ---- |
| |
| ==== Using the Java API |
| |
| Reload the table definition without setting the value of the coprocessor either by |
| using `setValue()` or `addCoprocessor()` methods. This will remove any coprocessor |
| attached to the table. |
| |
| [source,java] |
| ---- |
| TableName tableName = TableName.valueOf("users"); |
| String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; |
| Configuration conf = HBaseConfiguration.create(); |
| Connection connection = ConnectionFactory.createConnection(conf); |
| Admin admin = connection.getAdmin(); |
| admin.disableTable(tableName); |
| HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); |
| HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); |
| columnFamily1.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily1); |
| HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); |
| columnFamily2.setMaxVersions(3); |
| hTableDescriptor.addFamily(columnFamily2); |
| admin.modifyTable(tableName, hTableDescriptor); |
| admin.enableTable(tableName); |
| ---- |
| |
| In HBase 0.96 and newer, you can instead use the `removeCoprocessor()` method of the |
| `HTableDescriptor` class. |
| |
| |
| [[cp_example]] |
| == Examples |
| HBase ships examples for Observer Coprocessor in |
| link:http://hbase.apache.org/xref/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.html[ZooKeeperScanPolicyObserver] |
| and for Endpoint Coprocessor in |
| link:http://hbase.apache.org/xref/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.html[RowCountEndpoint] |
| |
| A more detailed example is given below. |
| |
| These examples assume a table called `users`, which has two column families `personalDet` |
| and `salaryDet`, containing personal and salary details. Below is the graphical representation |
| of the `users` table. |
| |
| .Users Table |
| [width="100%",cols="7",options="header,footer"] |
| |==================== |
| | 3+|personalDet 3+|salaryDet |
| |*rowkey* |*name* |*lastname* |*dob* |*gross* |*net* |*allowances* |
| |admin |Admin |Admin | 3+| |
| |cdickens |Charles |Dickens |02/07/1812 |10000 |8000 |2000 |
| |jverne |Jules |Verne |02/08/1828 |12000 |9000 |3000 |
| |==================== |
| |
| |
| === Observer Example |
| |
| The following Observer coprocessor prevents the details of the user `admin` from being |
| returned in a `Get` or `Scan` of the `users` table. |
| |
| . Write a class that extends the |
| link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html[BaseRegionObserver] |
| class. |
| |
| . Override the `preGetOp()` method (the `preGet()` method is deprecated) to check |
| whether the client has queried for the rowkey with value `admin`. If so, return an |
| empty result. Otherwise, process the request as normal. |
| |
| . Put your code and dependencies in a JAR file. |
| |
| . Place the JAR in HDFS where HBase can locate it. |
| |
| . Load the Coprocessor. |
| |
| . Write a simple program to test it. |
| |
| Following are the implementation of the above steps: |
| |
| |
| [source,java] |
| ---- |
| public class RegionObserverExample extends BaseRegionObserver { |
| |
| private static final byte[] ADMIN = Bytes.toBytes("admin"); |
| private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details"); |
| private static final byte[] COLUMN = Bytes.toBytes("Admin_det"); |
| private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details"); |
| |
| @Override |
| public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results) |
| throws IOException { |
| |
| if (Bytes.equals(get.getRow(),ADMIN)) { |
| Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN, |
| System.currentTimeMillis(), (byte)4, VALUE); |
| results.add(c); |
| e.bypass(); |
| } |
| } |
| } |
| ---- |
| |
| Overriding the `preGetOp()` will only work for `Get` operations. You also need to override |
| the `preScannerOpen()` method to filter the `admin` row from scan results. |
| |
| [source,java] |
| ---- |
| @Override |
| public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan, |
| final RegionScanner s) throws IOException { |
| |
| Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN)); |
| scan.setFilter(filter); |
| return s; |
| } |
| ---- |
| |
| This method works but there is a _side effect_. If the client has used a filter in |
| its scan, that filter will be replaced by this filter. Instead, you can explicitly |
| remove any `admin` results from the scan: |
| |
| [source,java] |
| ---- |
| @Override |
| public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s, |
| final List<Result> results, final int limit, final boolean hasMore) throws IOException { |
| Result result = null; |
| Iterator<Result> iterator = results.iterator(); |
| while (iterator.hasNext()) { |
| result = iterator.next(); |
| if (Bytes.equals(result.getRow(), ROWKEY)) { |
| iterator.remove(); |
| break; |
| } |
| } |
| return hasMore; |
| } |
| ---- |
| |
| === Endpoint Example |
| |
| Still using the `users` table, this example implements a coprocessor to calculate |
| the sum of all employee salaries, using an endpoint coprocessor. |
| |
| . Create a '.proto' file defining your service. |
| + |
| [source] |
| ---- |
| option java_package = "org.myname.hbase.coprocessor.autogenerated"; |
| option java_outer_classname = "Sum"; |
| option java_generic_services = true; |
| option java_generate_equals_and_hash = true; |
| option optimize_for = SPEED; |
| message SumRequest { |
| required string family = 1; |
| required string column = 2; |
| } |
| |
| message SumResponse { |
| required int64 sum = 1 [default = 0]; |
| } |
| |
| service SumService { |
| rpc getSum(SumRequest) |
| returns (SumResponse); |
| } |
| ---- |
| |
| . Execute the `protoc` command to generate the Java code from the above .proto' file. |
| + |
| [source] |
| ---- |
| $ mkdir src |
| $ protoc --java_out=src ./sum.proto |
| ---- |
| + |
| This will generate a class call `Sum.java`. |
| |
| . Write a class that extends the generated service class, implement the `Coprocessor` |
| and `CoprocessorService` classes, and override the service method. |
| + |
| WARNING: If you load a coprocessor from `hbase-site.xml` and then load the same coprocessor |
| again using HBase Shell, it will be loaded a second time. The same class will |
| exist twice, and the second instance will have a higher ID (and thus a lower priority). |
| The effect is that the duplicate coprocessor is effectively ignored. |
| + |
| [source, java] |
| ---- |
| public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService { |
| |
| private RegionCoprocessorEnvironment env; |
| |
| @Override |
| public Service getService() { |
| return this; |
| } |
| |
| @Override |
| public void start(CoprocessorEnvironment env) throws IOException { |
| if (env instanceof RegionCoprocessorEnvironment) { |
| this.env = (RegionCoprocessorEnvironment)env; |
| } else { |
| throw new CoprocessorException("Must be loaded on a table region!"); |
| } |
| } |
| |
| @Override |
| public void stop(CoprocessorEnvironment env) throws IOException { |
| // do mothing |
| } |
| |
| @Override |
| public void getSum(RpcController controller, SumRequest request, RpcCallback done) { |
| Scan scan = new Scan(); |
| scan.addFamily(Bytes.toBytes(request.getFamily())); |
| scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn())); |
| SumResponse response = null; |
| InternalScanner scanner = null; |
| try { |
| scanner = env.getRegion().getScanner(scan); |
| List results = new ArrayList(); |
| boolean hasMore = false; |
| long sum = 0L; |
| do { |
| hasMore = scanner.next(results); |
| for (Cell cell : results) { |
| sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); |
| } |
| results.clear(); |
| } while (hasMore); |
| |
| response = SumResponse.newBuilder().setSum(sum).build(); |
| |
| } catch (IOException ioe) { |
| ResponseConverter.setControllerException(controller, ioe); |
| } finally { |
| if (scanner != null) { |
| try { |
| scanner.close(); |
| } catch (IOException ignored) {} |
| } |
| } |
| done.run(response); |
| } |
| } |
| ---- |
| + |
| [source, java] |
| ---- |
| Configuration conf = HBaseConfiguration.create(); |
| // Use below code for HBase version 1.x.x or above. |
| Connection connection = ConnectionFactory.createConnection(conf); |
| TableName tableName = TableName.valueOf("users"); |
| Table table = connection.getTable(tableName); |
| |
| //Use below code HBase version 0.98.xx or below. |
| //HConnection connection = HConnectionManager.createConnection(conf); |
| //HTableInterface table = connection.getTable("users"); |
| |
| final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross") |
| .build(); |
| try { |
| Map<byte[], Long> results = table.CoprocessorService (SumService.class, null, null, |
| new Batch.Call<SumService, Long>() { |
| @Override |
| public Long call(SumService aggregate) throws IOException { |
| BlockingRpcCallback rpcCallback = new BlockingRpcCallback(); |
| aggregate.getSum(null, request, rpcCallback); |
| SumResponse response = rpcCallback.get(); |
| return response.hasSum() ? response.getSum() : 0L; |
| } |
| }); |
| for (Long sum : results.values()) { |
| System.out.println("Sum = " + sum); |
| } |
| } catch (ServiceException e) { |
| e.printStackTrace(); |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| } |
| ---- |
| |
| . Load the Coprocessor. |
| |
| . Write a client code to call the Coprocessor. |
| |
| |
| == Guidelines For Deploying A Coprocessor |
| |
| Bundling Coprocessors:: |
| You can bundle all classes for a coprocessor into a |
| single JAR on the RegionServer's classpath, for easy deployment. Otherwise, |
| place all dependencies on the RegionServer's classpath so that they can be |
| loaded during RegionServer start-up. The classpath for a RegionServer is set |
| in the RegionServer's `hbase-env.sh` file. |
| Automating Deployment:: |
| You can use a tool such as Puppet, Chef, or |
| Ansible to ship the JAR for the coprocessor to the required location on your |
| RegionServers' filesystems and restart each RegionServer, to automate |
| coprocessor deployment. Details for such set-ups are out of scope of this |
| document. |
| Updating a Coprocessor:: |
| Deploying a new version of a given coprocessor is not as simple as disabling it, |
| replacing the JAR, and re-enabling the coprocessor. This is because you cannot |
| reload a class in a JVM unless you delete all the current references to it. |
| Since the current JVM has reference to the existing coprocessor, you must restart |
| the JVM, by restarting the RegionServer, in order to replace it. This behavior |
| is not expected to change. |
| Coprocessor Logging:: |
| The Coprocessor framework does not provide an API for logging beyond standard Java |
| logging. |
| Coprocessor Configuration:: |
| If you do not want to load coprocessors from the HBase Shell, you can add their configuration |
| properties to `hbase-site.xml`. In <<load_coprocessor_in_shell>>, two arguments are |
| set: `arg1=1,arg2=2`. These could have been added to `hbase-site.xml` as follows: |
| [source,xml] |
| ---- |
| <property> |
| <name>arg1</name> |
| <value>1</value> |
| </property> |
| <property> |
| <name>arg2</name> |
| <value>2</value> |
| </property> |
| ---- |
| Then you can read the configuration using code like the following: |
| [source,java] |
| ---- |
| Configuration conf = HBaseConfiguration.create(); |
| // Use below code for HBase version 1.x.x or above. |
| Connection connection = ConnectionFactory.createConnection(conf); |
| TableName tableName = TableName.valueOf("users"); |
| Table table = connection.getTable(tableName); |
| |
| //Use below code HBase version 0.98.xx or below. |
| //HConnection connection = HConnectionManager.createConnection(conf); |
| //HTableInterface table = connection.getTable("users"); |
| |
| Get get = new Get(Bytes.toBytes("admin")); |
| Result result = table.get(get); |
| for (Cell c : result.rawCells()) { |
| System.out.println(Bytes.toString(CellUtil.cloneRow(c)) |
| + "==> " + Bytes.toString(CellUtil.cloneFamily(c)) |
| + "{" + Bytes.toString(CellUtil.cloneQualifier(c)) |
| + ":" + Bytes.toLong(CellUtil.cloneValue(c)) + "}"); |
| } |
| Scan scan = new Scan(); |
| ResultScanner scanner = table.getScanner(scan); |
| for (Result res : scanner) { |
| for (Cell c : res.rawCells()) { |
| System.out.println(Bytes.toString(CellUtil.cloneRow(c)) |
| + " ==> " + Bytes.toString(CellUtil.cloneFamily(c)) |
| + " {" + Bytes.toString(CellUtil.cloneQualifier(c)) |
| + ":" + Bytes.toLong(CellUtil.cloneValue(c)) |
| + "}"); |
| } |
| } |
| ---- |
| |
| |
| |
| |
| == Monitor Time Spent in Coprocessors |
| |
| HBase 0.98.5 introduced the ability to monitor some statistics relating to the amount of time |
| spent executing a given Coprocessor. |
| You can see these statistics via the HBase Metrics framework (see <<hbase_metrics>> or the Web UI |
| for a given Region Server, via the _Coprocessor Metrics_ tab. |
| These statistics are valuable for debugging and benchmarking the performance impact of a given |
| Coprocessor on your cluster. |
| Tracked statistics include min, max, average, and 90th, 95th, and 99th percentile. |
| All times are shown in milliseconds. |
| The statistics are calculated over Coprocessor execution samples recorded during the reporting |
| interval, which is 10 seconds by default. |
| The metrics sampling rate as described in <<hbase_metrics>>. |
| |
| .Coprocessor Metrics UI |
| image::coprocessor_stats.png[] |