/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.crunch.examples;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.HBaseSourceTarget;
import org.apache.crunch.io.hbase.HBaseTarget;
import org.apache.crunch.io.hbase.HBaseTypes;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * You need a running HBase instance to use this example. It also requires
 * the hbase artifact on the classpath, and the dependency version must match
 * your HBase installation:
 *
 * <pre>{@code
 * <dependency>
 *   <groupId>org.apache.hbase</groupId>
 *   <artifactId>hbase</artifactId>
 *   <version>...</version>
 * </dependency>
 * }</pre>
 */
@SuppressWarnings("serial")
public class WordAggregationHBase extends Configured implements Tool, Serializable {
private static final Log LOG = LogFactory.getLog(WordAggregationHBase.class);
// Configuration parameters, set up here for an HBase instance running
// locally.
private static final String HBASE_CONFIGURATION_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
private static final String HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT = "hbase.zookeeper.property.clientPort";
private static final String HBASE_ZOOKEEPER_QUORUM = "localhost";
private static final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181";
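// These values are applied to the Configuration in main() and assume a
// standalone HBase with ZooKeeper listening on localhost:2181.
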
// HBase parameters
private static final String TABLE_SOURCE = "list";
private static final String TABLE_TARGET = "aggregation";
private final byte[] COLUMN_FAMILY_SOURCE = Bytes.toBytes("content");
private final byte[] COLUMN_QUALIFIER_SOURCE_PLAY = Bytes.toBytes("play");
private final byte[] COLUMN_QUALIFIER_SOURCE_QUOTE = Bytes.toBytes("quote");
private final byte[] COLUMN_FAMILY_TARGET = Bytes.toBytes("aggregation");
private final byte[] COLUMN_QUALIFIER_TARGET_TEXT = Bytes.toBytes("text");
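// Source schema: rowkey = character, content:play = play title,
// content:quote = quote text. Target schema: rowkey = play title,
// aggregation:text = all quotes for the play, concatenated.
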
@Override
public int run(String[] args) throws Exception {
// We create the test rows first
String type1 = "romeo and juliet";
String type2 = "macbeth";
String quote1 = "That which we call a rose By any other word would smell as sweet";
String quote2 = "But, soft! what light through yonder window breaks? It is the east, and Juliet is the sun.";
String quote3 = "But first, let me tell ye, if you should lead her in a fool's paradise, as they say,";
String quote4 = "Fair is foul, and foul is fair";
String quote5 = "But screw your courage to the sticking-place, And we'll not fail.";
String[] character = { "juliet", "romeo", "nurse", "witch", "macbeth" };
String[] type = { type1, type1, type1, type2, type2 };
String[] quote = { quote1, quote2, quote3, quote4, quote5 };
List<Put> putList = createPuts(Arrays.asList(character), Arrays.asList(type), Arrays.asList(quote));
// We create the tables and fill the source
Configuration configuration = getConf();
createTable(configuration, TABLE_SOURCE, Bytes.toString(COLUMN_FAMILY_SOURCE));
createTable(configuration, TABLE_TARGET, Bytes.toString(COLUMN_FAMILY_TARGET));
putInHbase(putList, configuration);
// We create the pipeline which will handle most of the job.
Pipeline pipeline = new MRPipeline(WordAggregationHBase.class, getConf());
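// The class argument is used to locate the jar that contains the job
// classes when the pipeline runs as MapReduce jobs.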
// The scan that will retrieve the data from the source table in HBase.
Scan scan = new Scan();
scan.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY);
scan.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE);
// Our HBase source
HBaseSourceTarget source = new HBaseSourceTarget(TABLE_SOURCE, scan);
// Our source, in a format that can be used by Crunch
PTable<ImmutableBytesWritable, Result> rawText = pipeline.read(source);
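// Each entry pairs the rowkey (as an ImmutableBytesWritable) with the
// Result of scanning that row.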
// We extract the play/quote pairs from the source table, then concatenate
// all quotes that belong to the same play
PTable<String, String> textExtracted = extractText(rawText);
PTable<String, String> result = textExtracted.groupByKey()
.combineValues(Aggregators.STRING_CONCAT(" ", true));
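// For example, the two "macbeth" quotes are combined into a single value:
// ("macbeth", "Fair is foul, and foul is fair But screw your courage ...");
// the relative order of the concatenated quotes is not guaranteed.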
// We create the collection of puts from the concatenated data
PCollection<Put> resultPut = createPut(result);
// We write the puts to the target table in HBase
pipeline.write(resultPut, new HBaseTarget(TABLE_TARGET));
pipeline.done();
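// done() blocks until every job derived from the pipeline has finished.
// Afterwards the "aggregation" table should hold one row per play
// ("romeo and juliet" and "macbeth"), each with its quotes concatenated.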
return 0;
}

/**
 * Writes the puts to the source table in HBase.
 *
 * @param putList the puts to write
 * @param conf the HBase configuration
 * @throws IOException if the puts cannot be written
 */
private static void putInHbase(List<Put> putList, Configuration conf) throws IOException {
HTable htable = new HTable(conf, TABLE_SOURCE);
try {
htable.put(putList);
} finally {
htable.close();
}
}

/**
 * Creates the table if it doesn't already exist.
 *
 * @param conf the HBase configuration
 * @param htableName the table name
 * @param families the column family names
 * @throws MasterNotRunningException
 * @throws ZooKeeperConnectionException
 * @throws IOException
 */
private static void createTable(Configuration conf, String htableName, String... families) throws MasterNotRunningException, ZooKeeperConnectionException,
IOException {
HBaseAdmin hbase = new HBaseAdmin(conf);
try {
if (!hbase.tableExists(htableName)) {
HTableDescriptor desc = new HTableDescriptor(htableName);
for (String s : families) {
HColumnDescriptor meta = new HColumnDescriptor(s);
desc.addFamily(meta);
}
hbase.createTable(desc);
}
} finally {
hbase.close();
}
}

/**
 * Creates a list of puts.
 *
 * @param character the rowkeys
 * @param play the plays (stored in column COLUMN_QUALIFIER_SOURCE_PLAY)
 * @param quote the quotes (stored in column COLUMN_QUALIFIER_SOURCE_QUOTE)
 * @return the list of puts
 */
private List<Put> createPuts(List<String> character, List<String> play, List<String> quote) {
List<Put> list = Lists.newArrayList();
if (character.size() != play.size() || quote.size() != play.size()) {
LOG.error("Every list should have the same number of elements");
throw new IllegalArgumentException("Every list should have the same number of elements");
}
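// Put.add(family, qualifier, value) is the pre-1.0 HBase client API;
// later versions rename it to addColumn().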
for (int i = 0; i < character.size(); i++) {
Put put = new Put(Bytes.toBytes(character.get(i)));
put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY, Bytes.toBytes(play.get(i)));
put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE, Bytes.toBytes(quote.get(i)));
list.add(put);
}
return list;
}

/**
 * Extracts the play and quote from each row fetched from HBase.
 *
 * @param words the source rows from HBase
 * @return a {@code PTable} keyed by play name, with the quote text as the
 * value
 */
public PTable<String, String> extractText(PTable<ImmutableBytesWritable, Result> words) {
return words.parallelDo("Extract text", new DoFn<Pair<ImmutableBytesWritable, Result>, Pair<String, String>>() {
@Override
public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<Pair<String, String>> emitter) {
byte[] type = row.second().getValue(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY);
byte[] def = row.second().getValue(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE);
if (type != null && def != null) {
emitter.emit(new Pair<String, String>(Bytes.toString(type), Bytes.toString(def)));
}
}
}, Writables.tableOf(Writables.strings(), Writables.strings()));
}

/**
 * Creates puts in order to insert them into HBase.
 *
 * @param extractedText a PTable containing the data for the puts: the
 * PTable's keys become the rowkeys and its values become the cell values.
 * @return a PCollection of the puts.
 */
public PCollection<Put> createPut(PTable<String, String> extractedText) {
return extractedText.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() {
@Override
public void process(Pair<String, String> input, Emitter<Put> emitter) {
Put put = new Put(Bytes.toBytes(input.first()));
put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second()));
emitter.emit(put);
}
}, HBaseTypes.puts());
}
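
/**
 * Configures the connection to a local HBase instance and runs the job via
 * ToolRunner. A hypothetical invocation, assuming the examples job jar has
 * been built (the jar name below is illustrative):
 *
 * <pre>{@code
 * hadoop jar crunch-examples-job.jar org.apache.crunch.examples.WordAggregationHBase
 * }</pre>
 */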
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
// HBase configuration
conf.set(HBASE_CONFIGURATION_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_QUORUM);
conf.set(HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT, HBASE_ZOOKEEPER_CLIENT_PORT);
ToolRunner.run(conf, new WordAggregationHBase(), args);
}
}