| --- |
| title: Storm HBase Integration |
| layout: documentation |
| documentation: true |
| --- |
| |
| Storm/Trident integration for [Apache HBase](https://hbase.apache.org) |
| |
| ## Usage |
| The main API for interacting with HBase is the `org.apache.storm.hbase.bolt.mapper.HBaseMapper` |
| interface: |
| |
| ```java |
| public interface HBaseMapper extends Serializable { |
| byte[] rowKey(Tuple tuple); |
| |
| ColumnList columns(Tuple tuple); |
| } |
| ``` |
| |
| The `rowKey()` method is straightforward: given a Storm tuple, return a byte array representing the |
| row key. |
| |
The `columns()` method defines what will be written to an HBase row. The `ColumnList` class allows you
to add both standard HBase columns and HBase counter columns.
| |
| To add a standard column, use one of the `addColumn()` methods: |
| |
| ```java |
| ColumnList cols = new ColumnList(); |
| cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field))); |
| ``` |
| |
| To add a counter column, use one of the `addCounter()` methods: |
| |
| ```java |
| ColumnList cols = new ColumnList(); |
| cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field))); |
| ``` |
| |
When the remote HBase cluster is security enabled, a Kerberos keytab and the corresponding principal name need to be
provided to the storm-hbase connector. Specifically, the `Config` object passed into the topology should contain
`{("storm.keytab.file", "$keytab"), ("storm.kerberos.principal", "$principal")}`. Example:
| |
| ```java |
| Config config = new Config(); |
| ... |
| config.put("storm.keytab.file", "$keytab"); |
| config.put("storm.kerberos.principal", "$principal"); |
| StormSubmitter.submitTopology("$topologyName", config, builder.createTopology()); |
| ``` |
| |
## Working with Secure HBase Using Delegation Tokens
If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase.
The approach described above requires that all potential worker hosts have "storm.keytab.file" on them. If you have
multiple topologies on a cluster, each with a different HBase user, you will have to create multiple keytabs and distribute
them to all workers. Instead of doing that you can use the following approach:
| |
Your administrator can configure Nimbus to automatically fetch delegation tokens on behalf of the topology submitter user. Nimbus should be started with the following configuration:
| |
| ``` |
| nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] |
| nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] |
hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of an HBase superuser that can impersonate other users.)
hbase.kerberos.principal: "superuser@EXAMPLE.com"
nimbus.credential.renewers.freq.secs : 518400 (6 days. HBase tokens by default expire every 7 days and cannot be renewed. If you have a custom setting for hbase.auth.token.max.lifetime in hbase-site.xml, ensure this value is at least 1 hour less than that.)
| ``` |
| |
| Your topology configuration should have: |
| |
| ``` |
topology.auto-credentials : ["org.apache.storm.hbase.security.AutoHBase"]
| ``` |
| |
If Nimbus does not already have the above configuration, you need to add it and then restart Nimbus. Ensure that the HBase configuration
files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are present on Nimbus's classpath.
| |
As an alternative to adding the configuration files (core-site.xml, hdfs-site.xml and hbase-site.xml) to the classpath, you can specify the configurations as part of the topology configuration, e.g. in your custom storm.yaml (or via the -c option while submitting the topology):
| |
| ``` |
hbaseCredentialsConfigKeys : ["cluster1", "cluster2"] (the HBase clusters you want to fetch the tokens from)
"cluster1": {"config1": "value1", "config2": "value2", ... } (a map of config key-values specific to cluster1)
"cluster2": {"config1": "value1", "hbase.keytab.file": "/path/to/keytab/for/cluster2/on/nimbus", "hbase.kerberos.principal": "cluster2user@EXAMPLE.com"} (here, along with other configs, we have a custom keytab and principal for "cluster2" which will override the keytab/principal specified at the topology level)
| ``` |
| |
Instead of specifying key-value pairs, you may also directly specify the resource files, for example:
| |
| ``` |
| "cluster1": {"resources": ["/path/to/core-site1.xml", "/path/to/hbase-site1.xml"]} |
| "cluster2": {"resources": ["/path/to/core-site2.xml", "/path/to/hbase-site2.xml"]} |
| ``` |
| |
Storm will download the tokens separately for each of the clusters, populate them into the subject, and renew the tokens periodically.
This way it is possible to run multiple bolts connecting to separate HBase clusters within the same topology.
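
If you prefer to set this up programmatically, the same structure can be put on the topology `Config`. This is a sketch, assuming the key names and the resource-file form shown above (the paths are placeholders):

```java
Config config = new Config();
// Clusters to fetch delegation tokens from.
config.put("hbaseCredentialsConfigKeys", Arrays.asList("cluster1", "cluster2"));

// Per-cluster configuration, here pointing at resource files.
Map<String, Object> cluster1 = new HashMap<>();
cluster1.put("resources", Arrays.asList("/path/to/core-site1.xml", "/path/to/hbase-site1.xml"));
config.put("cluster1", cluster1);

Map<String, Object> cluster2 = new HashMap<>();
cluster2.put("resources", Arrays.asList("/path/to/core-site2.xml", "/path/to/hbase-site2.xml"));
config.put("cluster2", cluster2);
```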
| |
Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then on, for every
topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
topology submitter user. If the topology was started with `topology.auto-credentials` set to `AutoHBase`, Nimbus will push the
delegation tokens to all the workers for your topology, and the HBase bolt/state will authenticate with these tokens.
| |
As Nimbus is impersonating the topology submitter user, you need to ensure the user specified in `hbase.kerberos.principal`
has permission to acquire tokens on behalf of other users. To achieve this, follow the configuration directions
listed at

http://hbase.apache.org/book/security.html#security.rest.gateway
| |
You can read about setting up secure HBase here: http://hbase.apache.org/book/security.html.
| |
| ### SimpleHBaseMapper |
`storm-hbase` includes a general purpose `HBaseMapper` implementation called `SimpleHBaseMapper` that can map Storm
tuples to both regular HBase columns and counter columns.
| |
| To use `SimpleHBaseMapper`, you simply tell it which fields to map to which types of columns. |
| |
The following code creates a `SimpleHBaseMapper` instance that:
| |
| 1. Uses the `word` tuple value as a row key. |
| 2. Adds a standard HBase column for the tuple field `word`. |
| 3. Adds an HBase counter column for the tuple field `count`. |
| 4. Writes values to the `cf` column family. |
| |
| ```java |
| SimpleHBaseMapper mapper = new SimpleHBaseMapper() |
| .withRowKeyField("word") |
| .withColumnFields(new Fields("word")) |
| .withCounterFields(new Fields("count")) |
| .withColumnFamily("cf"); |
| ``` |
| ### HBaseBolt |
To use the `HBaseBolt`, construct it with the name of the table to write to and an `HBaseMapper` implementation:
| |
| ```java |
| HBaseBolt hbase = new HBaseBolt("WordCount", mapper); |
| ``` |
| |
`HBaseBolt` parameters:

|Arg |Description | Type | Default |
|--- |--- |--- |--- |
|writeToWAL | Whether to write to the HBase write-ahead log: sets durability to SYNC_WAL (true) or SKIP_WAL (false) | Boolean (Optional) | true |
|configKey | Topology config key under which HBase-related configs are stored | String (Optional) | |
|batchSize | Maximum number of tuples batched together in a single write to HBase | Int (Optional) | 15000 |
|flushIntervalSecs| If > 0, the bolt will periodically flush tuple batches at this interval (in seconds). Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up. | Int (Optional) | 0 |
| |
| |
| The `HBaseBolt` will delegate to the `mapper` instance to figure out how to persist tuple data to HBase. |
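
The batching and durability behavior in the table above can be tuned via the bolt's builder-style setters. A sketch, with the setter names taken to mirror the parameter names in the table:

```java
HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
        .withConfigKey("hbase.conf")   // topology config key holding HBase settings
        .withBatchSize(1000)           // write up to 1000 tuples per batch
        .withFlushIntervalSecs(2);     // flush partial batches every 2 seconds
```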
| |
### HBaseValueMapper
This class allows you to transform the HBase lookup result into Storm `Values` that will be emitted by the `HBaseLookupBolt`.
| |
| ```java |
public interface HBaseValueMapper extends Serializable {
    List<Values> toTuples(Result result) throws Exception;

    void declareOutputFields(OutputFieldsDeclarer declarer);
}
| ``` |
| |
The `toTuples` method takes an HBase `Result` instance and returns a list of `Values` instances.
Each of the values returned by this method will be emitted by the `HBaseLookupBolt`.

The `declareOutputFields` method should be used to declare the output fields of the `HBaseLookupBolt`.
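
As an illustration, a minimal mapper might emit one tuple per cell in the `Result`. The following is a sketch; the output field names `columnName` and `columnValue` are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class CellValueMapper implements HBaseValueMapper {
    @Override
    public List<Values> toTuples(Result result) throws Exception {
        List<Values> values = new ArrayList<>();
        if (result.listCells() != null) {
            // Emit one tuple per cell: (qualifier, value).
            for (Cell cell : result.listCells()) {
                values.add(new Values(
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell))));
            }
        }
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("columnName", "columnValue"));
    }
}
```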
| |
There is an example implementation in the `examples/storm-hbase-examples/src/main/java` directory.
| |
### HBaseProjectionCriteria
This class allows you to specify the projection criteria for your HBase `Get` operation. This is an optional parameter
for the lookup bolt; if you do not specify it, all columns will be returned by the `HBaseLookupBolt`.
| |
| ```java |
public class HBaseProjectionCriteria implements Serializable {
    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
}
```
`addColumnFamily` takes a column family name. Adding a family means all columns for that family will be included
in the projection.

`addColumn` takes a `ColumnMetaData` instance. Adding a column means only that column from the column family
will be part of your projection.

The following code creates a projection criteria that:
| |
1. Includes the `count` column from column family `cf`.
2. Includes all columns from column family `cf2`.
| |
| ```java |
| HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria() |
| .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count")) |
| .addColumnFamily("cf2"); |
| ``` |
| |
### HBaseLookupBolt
To use the `HBaseLookupBolt`, construct it with the name of the table to read from, an implementation of `HBaseMapper`
and an implementation of `HBaseValueMapper`. You can optionally specify an `HBaseProjectionCriteria`.
| |
The `HBaseLookupBolt` will use the mapper to get the row key to look up. It will use the `HBaseProjectionCriteria` to
figure out which columns to include in the result, and it will leverage the `HBaseValueMapper` to get the
values to be emitted by the bolt.
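
Putting the pieces together, construction looks like the following sketch, reusing `mapper` and `projectionCriteria` from the sections above; `valueMapper` stands for any `HBaseValueMapper` implementation, such as the one sketched earlier:

```java
HBaseLookupBolt lookupBolt = new HBaseLookupBolt("WordCount", mapper, valueMapper)
        .withConfigKey("hbase.conf")
        .withProjectionCriteria(projectionCriteria);
```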
| |
In addition, the `HBaseLookupBolt` supports bolt-side caching of HBase results in an in-memory LRU cache backed by Caffeine. The cache is controlled by the following configuration keys:

* `hbase.cache.enable` - enables caching (default: false)
* `hbase.cache.ttl.seconds` - time to live for cache entries, in seconds (default: 300)
* `hbase.cache.size` - maximum size of the cache (default: 1000)
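
For example, enabling the cache from the topology configuration might look like the following sketch (assuming these keys are read from the topology config):

```java
Config config = new Config();
config.put("hbase.cache.enable", true);
config.put("hbase.cache.ttl.seconds", 600);  // keep entries for 10 minutes
config.put("hbase.cache.size", 2000);        // hold up to 2000 entries
```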
| |
You can look at an example topology, `LookupWordCount.java`, under `examples/storm-hbase-examples/src/main/java`.

## Example: Persistent Word Count
| A runnable example can be found in the `examples/storm-hbase-examples/src/main/java` directory. |
| |
| ### Setup |
| The following steps assume you are running HBase locally, or there is an `hbase-site.xml` on the |
| classpath pointing to your HBase cluster. |
| |
| Use the `hbase shell` command to create the schema: |
| |
| ``` |
| > create 'WordCount', 'cf' |
| ``` |
| |
| ### Execution |
Run the `org.apache.storm.hbase.topology.PersistentWordCount` class (it will run the topology for 10 seconds, then exit).
| |
| After (or while) the word count topology is running, run the `org.apache.storm.hbase.topology.WordCountClient` class |
to view the counter values stored in HBase. You should see something like the following:
| |
| ``` |
| Word: 'apple', Count: 6867 |
| Word: 'orange', Count: 6645 |
| Word: 'pineapple', Count: 6954 |
| Word: 'banana', Count: 6787 |
| Word: 'watermelon', Count: 6806 |
| ``` |
| |
| For reference, the sample topology is listed below: |
| |
| ```java |
| public class PersistentWordCount { |
| private static final String WORD_SPOUT = "WORD_SPOUT"; |
| private static final String COUNT_BOLT = "COUNT_BOLT"; |
| private static final String HBASE_BOLT = "HBASE_BOLT"; |
| |
| |
| public static void main(String[] args) throws Exception { |
| Config config = new Config(); |
| |
| Map<String, Object> hbConf = new HashMap<String, Object>(); |
| if(args.length > 0){ |
| hbConf.put("hbase.rootdir", args[0]); |
| } |
| config.put("hbase.conf", hbConf); |
| |
| WordSpout spout = new WordSpout(); |
| WordCounter bolt = new WordCounter(); |
| |
| SimpleHBaseMapper mapper = new SimpleHBaseMapper() |
| .withRowKeyField("word") |
| .withColumnFields(new Fields("word")) |
| .withCounterFields(new Fields("count")) |
| .withColumnFamily("cf"); |
| |
| HBaseBolt hbase = new HBaseBolt("WordCount", mapper) |
| .withConfigKey("hbase.conf"); |
| |
| |
| // wordSpout ==> countBolt ==> HBaseBolt |
| TopologyBuilder builder = new TopologyBuilder(); |
| |
| builder.setSpout(WORD_SPOUT, spout, 1); |
| builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); |
| builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); |
| |
| String topoName = "test"; |
| if (args.length > 1) { |
| topoName = args[1]; |
| } |
| if (args.length == 4) { |
| System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] + |
                ", principal name: " + args[3] + ", topology name: " + topoName);
| hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]); |
| hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]); |
| } else if (args.length == 3 || args.length > 4) { |
| System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]"); |
| return; |
| } |
| config.setNumWorkers(3); |
| StormSubmitter.submitTopology(topoName, config, builder.createTopology()); |
| } |
| } |
| ``` |
| |