blob: a9ec20a7e375f81f79d6bb19093e26f06a56bb1c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.hbase;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import com.google.common.base.Joiner;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
/**
 * Configuration for Storm {@link Tuple} to HBase serialization.
 * <p>
 * Holds the name of the tuple field used as the HBase row key, an optional
 * tuple field used as the cell timestamp, the {@link Durability} applied to
 * generated mutations, and (through the inherited {@code columnFamilies} map)
 * the column family to column qualifier layout that is extracted from each
 * tuple.
 */
@SuppressWarnings("serial")
public class TupleTableConfig extends TableConfig implements Serializable {

  private static final Logger LOG = Logger.getLogger(TupleTableConfig.class);
  static final long serialVersionUID = -1L;

  /** Default amount by which a counter is incremented. */
  public static final long DEFAULT_INCREMENT = 1L;

  /** Name of the tuple field whose string value becomes the row key. */
  protected String tupleRowKeyField;

  /**
   * Name of the tuple field holding the cell timestamp; empty (or
   * {@code null}) means no explicit timestamp is set on generated puts.
   */
  protected String tupleTimestampField;

  /** Durability applied to every {@link Put} and {@link Increment} created here. */
  protected Durability durability = Durability.USE_DEFAULT;

  private String fields;

  /**
   * Initialize configuration without a timestamp field.
   *
   * @param table
   *          The HBase table name
   * @param rowKeyField
   *          The {@link Tuple} field used to set the rowKey
   */
  public TupleTableConfig(final String table, final String rowKeyField) {
    this(table, rowKeyField, "");
  }

  /**
   * Initialize configuration
   *
   * @param table
   *          The HBase table name
   * @param rowKeyField
   *          The {@link Tuple} field used to set the rowKey
   * @param timestampField
   *          The {@link Tuple} field used to set the timestamp
   */
  public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
    super(table);
    this.tupleRowKeyField = rowKeyField;
    this.tupleTimestampField = timestampField;
    this.columnFamilies = new HashMap<String, Set<String>>();
  }

  /**
   * Initialize an empty configuration with no table name and no row key field;
   * use the {@code with...} methods to populate it.
   */
  public TupleTableConfig() {
    super(null);
    // Match the other constructors: "no timestamp field" is the empty string,
    // so a config built this way can be used without calling
    // withTimestampField first.
    this.tupleTimestampField = "";
    this.columnFamilies = new HashMap<String, Set<String>>();
  }

  /**
   * Sets the tuple field used as the row key.
   *
   * @param rowKeyField
   *          The {@link Tuple} field used to set the rowKey
   * @return this configuration, for chaining
   */
  public TupleTableConfig withRowKeyField(String rowKeyField) {
    this.tupleRowKeyField = rowKeyField;
    return this;
  }

  /**
   * Sets the tuple field used as the cell timestamp.
   *
   * @param timestampField
   *          The {@link Tuple} field used to set the timestamp; empty or
   *          {@code null} disables explicit timestamps
   * @return this configuration, for chaining
   */
  public TupleTableConfig withTimestampField(String timestampField) {
    this.tupleTimestampField = timestampField;
    return this;
  }

  /**
   * Stores the raw {@code fields} string. This class only stores and returns
   * the value; it does not interpret it.
   *
   * @param fields
   *          The value to store
   * @return this configuration, for chaining
   */
  public TupleTableConfig withFields(String fields) {
    this.fields = fields;
    return this;
  }

  /**
   * @return the value previously supplied to {@link #withFields(String)}, or
   *         {@code null} if none was set
   */
  public String getFields() {
    return fields;
  }

  /**
   * Add column family and column qualifier to be extracted from tuple
   *
   * @param columnFamily
   *          The column family name
   * @param columnQualifier
   *          The column qualifier name
   */
  public void addColumn(final String columnFamily, final String columnQualifier) {
    Set<String> qualifiers = this.columnFamilies.get(columnFamily);
    if (qualifiers == null) {
      qualifiers = new HashSet<String>();
      this.columnFamilies.put(columnFamily, qualifiers);
    }
    qualifiers.add(columnQualifier);
  }

  /**
   * Creates a HBase {@link Put} from a Storm {@link Tuple}
   * <p>
   * The row key is the string value of the configured row key field. For every
   * configured column qualifier the binary value of the tuple field with the
   * same name is written. When a timestamp field is configured and its value
   * is greater than zero, that value is used as the cell timestamp.
   *
   * @param tuple
   *          The {@link Tuple}
   * @return {@link Put}
   * @throws IOException
   *           if the row key field cannot be retrieved from the tuple
   */
  public Put getPutFromTuple(final Tuple tuple) throws IOException {
    final byte[] rowKey;
    try {
      rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
    } catch (IllegalArgumentException iae) {
      throw new IOException("Unable to retrieve " + tupleRowKeyField + " from " + tuple + " [ " + Joiner.on(',').join(tuple.getFields()) + " ]", iae);
    }

    // Null-safe: the timestamp field may never have been set (or been set to
    // null) when the config was assembled through the builder-style methods.
    long ts = 0;
    if (tupleTimestampField != null && !tupleTimestampField.isEmpty()) {
      ts = tuple.getLongByField(tupleTimestampField);
    }

    final Put put = new Put(rowKey);
    put.setDurability(durability);

    for (Map.Entry<String, Set<String>> family : columnFamilies.entrySet()) {
      final byte[] cfBytes = Bytes.toBytes(family.getKey());
      for (String cq : family.getValue()) {
        final byte[] cqBytes = Bytes.toBytes(cq);
        final byte[] val = tuple.getBinaryByField(cq);
        if (ts > 0) {
          put.add(cfBytes, cqBytes, ts, val);
        } else {
          // No usable timestamp: use the overload without an explicit one.
          put.add(cfBytes, cqBytes, val);
        }
      }
    }
    return put;
  }

  /**
   * Creates a HBase {@link Increment} from a Storm {@link Tuple}
   * <p>
   * For every configured column qualifier name, the counter column is named
   * after the string <em>value</em> of the tuple field with that name; if the
   * tuple has no such field, the configured qualifier name itself is used.
   *
   * @param tuple
   *          The {@link Tuple}
   * @param increment
   *          The amount to increment the counter by
   * @return {@link Increment}
   */
  public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
    final byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
    final Increment inc = new Increment(rowKey);
    inc.setDurability(durability);

    for (Map.Entry<String, Set<String>> family : columnFamilies.entrySet()) {
      final byte[] cfBytes = Bytes.toBytes(family.getKey());
      for (String cq : family.getValue()) {
        byte[] counterQualifier;
        try {
          counterQualifier = Bytes.toBytes(tuple.getStringByField(cq));
        } catch (IllegalArgumentException ignored) {
          // if cq isn't a tuple field, use cq for counter instead of tuple
          // value
          counterQualifier = Bytes.toBytes(cq);
        }
        inc.addColumn(cfBytes, counterQualifier, increment);
      }
    }
    return inc;
  }

  /**
   * Increment the counter for the given family and column by the specified
   * amount
   * <p>
   * If the family and column already exist in the Increment, the existing
   * amount and the specified amount are summed and stored as a single entry
   * for that column.
   * <p>
   * The update is applied to the Increment's own cell map.
   * {@link Increment#getFamilyMapOfLongs()} is deliberately not used for the
   * write because it returns a newly built snapshot, so changes made to it do
   * not reach the Increment.
   *
   * @param inc
   *          The {@link Increment} to update
   * @param family
   *          The column family
   * @param qualifier
   *          The column qualifier
   * @param amount
   *          The amount to increment the counter by
   */
  public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
    long total = amount;

    // Fold any amounts already queued for this column into the total and drop
    // those cells, so exactly one cell for the column remains afterwards.
    final List<Cell> cells = inc.getFamilyCellMap().get(family);
    if (cells != null) {
      for (Iterator<Cell> it = cells.iterator(); it.hasNext();) {
        final Cell cell = it.next();
        if (CellUtil.matchingQualifier(cell, qualifier)) {
          total += Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
          it.remove();
        }
      }
    }
    inc.addColumn(family, qualifier, total);
  }

  /**
   * Sets the {@link Durability} applied to every {@link Put} and
   * {@link Increment} created by this configuration.
   * <p>
   * Defaults to {@link Durability#USE_DEFAULT}.
   *
   * @param durability
   *          The durability to set on generated mutations
   */
  public void setDurability(Durability durability) {
    this.durability = durability;
  }

  /**
   * @return the {@link Durability} applied to generated mutations
   */
  public Durability getDurability() {
    return durability;
  }

  /**
   * @return the tupleRowKeyField
   */
  public String getTupleRowKeyField() {
    return tupleRowKeyField;
  }
}