| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| == Analytics |
| |
| Accumulo supports more advanced data processing than simply keeping keys |
| sorted and performing efficient lookups. Analytics can be developed by using |
| MapReduce and Iterators in conjunction with Accumulo tables. |
| |
| === MapReduce |
| |
| Accumulo tables can be used as the source and destination of MapReduce jobs. To |
| use an Accumulo table with a MapReduce job (specifically with the new Hadoop API |
| as of version 0.20), configure the job parameters to use the AccumuloInputFormat |
| and AccumuloOutputFormat. Accumulo-specific parameters can be set via these |
| two format classes to do the following: |
| |
| * Authenticate and provide user credentials for the input |
| * Restrict the scan to a range of rows |
| * Restrict the input to a subset of available columns |
| |
| ==== Mapper and Reducer classes |
| |
| To read from an Accumulo table, create a Mapper with the following class |
| parameterization and be sure to configure the AccumuloInputFormat. |
| |
| [source,java] |
| class MyMapper extends Mapper<Key,Value,WritableComparable,Writable> { |
|   public void map(Key k, Value v, Context c) { |
|     // transform key and value data here |
|   } |
| } |
| |
| To write to an Accumulo table, create a Reducer with the following class |
| parameterization and be sure to configure the AccumuloOutputFormat. The key |
| emitted from the Reducer identifies the table to which the mutation is sent. This |
| allows a single Reducer to write to more than one table if desired. A default table |
| can be configured using the AccumuloOutputFormat, in which case the output table |
| name does not have to be passed to the Context object within the Reducer. |
| |
| [source,java] |
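| class MyReducer extends Reducer<WritableComparable, Writable, Text, Mutation> { |
|   public void reduce(WritableComparable key, Iterable<Writable> values, Context c) |
|       throws IOException, InterruptedException { |
|     // create the mutation based on input key and values; the row ID used |
|     // here is only a placeholder, and m.put(...) calls must add the updates |
|     Mutation m = new Mutation(new Text(key.toString())); |
|     c.write(new Text("output-table"), m); |
|   } |
| } |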
| |
| The Text object passed as the output should contain the name of the table to which |
| this mutation should be applied. The Text can be null, in which case the mutation |
| will be applied to the default table name specified in the AccumuloOutputFormat |
| options. |
| |
| ==== AccumuloInputFormat options |
| |
| [source,java] |
| ---- |
| Job job = new Job(getConf()); |
| AccumuloInputFormat.setInputInfo(job, |
|         "user", |
|         "passwd".getBytes(), |
|         "table", |
|         new Authorizations()); |
| |
| AccumuloInputFormat.setZooKeeperInstance(job, "myinstance", |
|         "zooserver-one,zooserver-two"); |
| ---- |
| |
| *Optional Settings:* |
| |
| To restrict Accumulo to a set of row ranges: |
| |
| [source,java] |
| ArrayList<Range> ranges = new ArrayList<Range>(); |
| // populate array list of row ranges ... |
| AccumuloInputFormat.setRanges(job, ranges); |
| |
| To restrict Accumulo to a list of columns: |
| |
| [source,java] |
| ArrayList<Pair<Text,Text>> columns = new ArrayList<Pair<Text,Text>>(); |
| // populate list of columns |
| AccumuloInputFormat.fetchColumns(job, columns); |
| |
| To use a regular expression to match row IDs: |
| |
| [source,java] |
| IteratorSetting is = new IteratorSetting(30, RegExFilter.class); |
| RegExFilter.setRegexs(is, ".*suffix", null, null, null, true); |
| AccumuloInputFormat.addIterator(job, is); |
| |
| ==== AccumuloMultiTableInputFormat options |
| |
| The AccumuloMultiTableInputFormat allows scanning over multiple tables |
| in a single MapReduce job. Separate ranges, columns, and iterators can be |
| used for each table. |
| |
| [source,java] |
| InputTableConfig tableOneConfig = new InputTableConfig(); |
| InputTableConfig tableTwoConfig = new InputTableConfig(); |
| |
| To set the configuration objects on the job: |
| |
| [source,java] |
| Map<String, InputTableConfig> configs = new HashMap<String,InputTableConfig>(); |
| configs.put("table1", tableOneConfig); |
| configs.put("table2", tableTwoConfig); |
| AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs); |
| |
| *Optional settings:* |
| |
| To restrict to a set of ranges: |
| |
| [source,java] |
| ArrayList<Range> tableOneRanges = new ArrayList<Range>(); |
| ArrayList<Range> tableTwoRanges = new ArrayList<Range>(); |
| // populate array lists of row ranges for tables... |
| tableOneConfig.setRanges(tableOneRanges); |
| tableTwoConfig.setRanges(tableTwoRanges); |
| |
| To restrict Accumulo to a list of columns: |
| |
| [source,java] |
| ArrayList<Pair<Text,Text>> tableOneColumns = new ArrayList<Pair<Text,Text>>(); |
| ArrayList<Pair<Text,Text>> tableTwoColumns = new ArrayList<Pair<Text,Text>>(); |
| // populate lists of columns for each of the tables ... |
| tableOneConfig.fetchColumns(tableOneColumns); |
| tableTwoConfig.fetchColumns(tableTwoColumns); |
| |
| To set scan iterators: |
| |
| [source,java] |
| List<IteratorSetting> tableOneIterators = new ArrayList<IteratorSetting>(); |
| List<IteratorSetting> tableTwoIterators = new ArrayList<IteratorSetting>(); |
| // populate the lists of iterator settings for each of the tables ... |
| tableOneConfig.setIterators(tableOneIterators); |
| tableTwoConfig.setIterators(tableTwoIterators); |
| |
| |
| The name of the table can be retrieved from the input split: |
| |
| [source,java] |
| class MyMapper extends Mapper<Key,Value,WritableComparable,Writable> { |
|   public void map(Key k, Value v, Context c) { |
|     RangeInputSplit split = (RangeInputSplit)c.getInputSplit(); |
|     String tableName = split.getTableName(); |
|     // do something with table name |
|   } |
| } |
| |
| |
| ==== AccumuloOutputFormat options |
| |
| [source,java] |
| ---- |
| boolean createTables = true; |
| String defaultTable = "mytable"; |
| |
| AccumuloOutputFormat.setOutputInfo(job, |
|         "user", |
|         "passwd".getBytes(), |
|         createTables, |
|         defaultTable); |
| |
| AccumuloOutputFormat.setZooKeeperInstance(job, "myinstance", |
|         "zooserver-one,zooserver-two"); |
| ---- |
| |
| *Optional Settings:* |
| |
| [source,java] |
| AccumuloOutputFormat.setMaxLatency(job, 300000); // milliseconds |
| AccumuloOutputFormat.setMaxMutationBufferSize(job, 50000000); // bytes |
| |
| An example of using MapReduce with Accumulo can be found at |
| +accumulo/docs/examples/README.mapred+. |
| |
| === Combiners |
| |
| Many applications can benefit from the ability to aggregate values across common |
| keys. This can be done via Combiner iterators and is similar to the Reduce step in |
| MapReduce. This provides the ability to define online, incrementally updated |
| analytics without the overhead or latency associated with batch-oriented |
| MapReduce jobs. |
| |
| All that is needed to aggregate values of a table is to identify the fields over which |
| values will be grouped, insert mutations with those fields as the key, and configure |
| the table with a combining iterator that supports the summarizing operation |
| desired. |
| |
| The only restriction on a combining iterator is that the combiner developer |
| should not assume that all values for a given key have been seen, since new |
| mutations can be inserted at any time. This precludes aggregations that depend |
| on the total number of values, such as computing an average directly. |
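
The restriction above can be illustrated with a small plain-Java sketch. It does not use Accumulo's Combiner API; the class `PartialCombineSketch` and its methods are hypothetical stand-ins that mimic a combiner being applied to whatever subset of values happens to be present. A sum gives the same result however the values are batched, while a naive average does not. One common workaround, shown here as an assumption rather than as anything this manual prescribes, is to store a (sum, count) pair as the value and divide only when reading.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a combining iterator; this is NOT Accumulo's Combiner API.
final class PartialCombineSketch {

    // Summing is safe to apply to partial batches: combining earlier results
    // with later values yields the same total as combining everything at once.
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Naive average over only the values that are currently visible.
    static double average(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.size();
    }

    // Each value is a {sum, count} pair; a single observation x is stored as {x, 1}.
    // Pairs combine component-wise, so partial results can be merged safely.
    static long[] combinePairs(List<long[]> pairs) {
        long sum = 0;
        long count = 0;
        for (long[] p : pairs) {
            sum += p[0];
            count += p[1];
        }
        return new long[] {sum, count};
    }

    public static void main(String[] args) {
        // The values 2, 4 and 12 arrive for one key; 2 and 4 are combined first.
        long partialSum = sum(Arrays.asList(2L, 4L));
        System.out.println(sum(Arrays.asList(partialSum, 12L))); // 18

        // Averaging a partial average with a late value gives the wrong answer.
        double partialAvg = average(Arrays.asList(2.0, 4.0));
        System.out.println(average(Arrays.asList(partialAvg, 12.0))); // 7.5, not 6.0

        // Carrying the count alongside the sum keeps the result correct.
        long[] partialPair = combinePairs(Arrays.asList(new long[] {2, 1}, new long[] {4, 1}));
        long[] pair = combinePairs(Arrays.asList(partialPair, new long[] {12, 1}));
        System.out.println(pair[0] / (double) pair[1]); // 6.0
    }
}
```

In this sketch the division happens on the client side after the pair is read, so the combining step itself never needs to know how many values exist in total.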
| |
| ==== Feature Vectors |
| |
| An interesting use of combining iterators within an Accumulo table is to store |
| feature vectors for use in machine learning algorithms. For example, many |
| algorithms such as k-means clustering, support vector machines, anomaly detection, |
| etc. use the concept of a feature vector and the calculation of distance metrics to |
| learn a particular model. The columns in an Accumulo table can be used to efficiently |
| store sparse features and their weights, which can then be incrementally updated by a |
| combining iterator. |
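
As a rough illustration of this layout, the following plain-Java sketch models a table in memory: each row ID maps to a sorted set of columns (features) whose values are weights, and every update is summed into the existing cell the way a summing combiner would. The class `FeatureVectorSketch`, its method names, and the choice of squared Euclidean distance are all assumptions made for the example; none of them come from Accumulo.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a table configured with a summing combiner.
final class FeatureVectorSketch {

    // row ID -> (feature name -> weight), kept sorted like keys in a table
    private final Map<String, Map<String, Long>> rows = new TreeMap<>();

    // Mimics inserting a mutation whose value is added to the existing cell.
    void update(String row, String feature, long delta) {
        rows.computeIfAbsent(row, r -> new TreeMap<>()).merge(feature, delta, Long::sum);
    }

    // Only features that were actually written are stored (sparse representation).
    Map<String, Long> vector(String row) {
        Map<String, Long> v = rows.get(row);
        return v == null ? Collections.<String, Long>emptyMap() : v;
    }

    // Squared Euclidean distance; a feature missing from a vector counts as 0.
    static long squaredDistance(Map<String, Long> a, Map<String, Long> b) {
        long total = 0;
        for (Map.Entry<String, Long> e : a.entrySet()) {
            long diff = e.getValue() - b.getOrDefault(e.getKey(), 0L);
            total += diff * diff;
        }
        for (Map.Entry<String, Long> e : b.entrySet()) {
            if (!a.containsKey(e.getKey())) {
                total += e.getValue() * e.getValue();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        FeatureVectorSketch table = new FeatureVectorSketch();
        table.update("docA", "apple", 1);
        table.update("docA", "apple", 2); // summed into the existing weight
        table.update("docA", "pear", 1);
        table.update("docB", "apple", 1);
        table.update("docB", "plum", 2);
        System.out.println(table.vector("docA")); // {apple=3, pear=1}
        System.out.println(squaredDistance(table.vector("docA"), table.vector("docB"))); // 9
    }
}
```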
| |
| === Statistical Modeling |
| |
| Statistical models that need to be updated by many machines in parallel could be |
| similarly stored within an Accumulo table. For example, a MapReduce job that is |
| iteratively updating a global statistical model could have each map or reduce worker |
| reference the parts of the model to be read and updated through an embedded |
| Accumulo client. |
| |
| Using Accumulo this way enables efficient and fast lookups and updates of small |
| pieces of information in a random access pattern, which is complementary to |
| MapReduce's sequential access model. |
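
The access pattern described above can be sketched without a cluster. In the following plain-Java example, a `ConcurrentSkipListMap` stands in for the Accumulo table and a thread pool stands in for the map or reduce workers; both substitutions, along with the class name `SharedModelSketch` and the parameter names, are assumptions made purely for illustration. Each worker touches only the few model entries it needs, in random-access fashion, rather than scanning the whole model.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: a sorted concurrent map plays the role of the table,
// and pool threads play the role of parallel map or reduce workers.
final class SharedModelSketch {

    static ConcurrentSkipListMap<String, Long> run(int workers, int updatesPerWorker) {
        ConcurrentSkipListMap<String, Long> model = new ConcurrentSkipListMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final String ownKey = "param-" + w;
            pool.submit(() -> {
                for (int i = 0; i < updatesPerWorker; i++) {
                    // Small point updates: one entry shared by all workers,
                    // and one entry that only this worker modifies.
                    model.merge("shared", 1L, Long::sum);
                    model.merge(ownKey, 1L, Long::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return model;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<String, Long> model = run(4, 1000);
        System.out.println(model.get("shared"));  // 4000
        System.out.println(model.get("param-0")); // 1000
    }
}
```

With a real table, the summing would more plausibly be delegated to a combining iterator so that workers only send increments; the map's `merge` call plays that role in this sketch.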