blob: 1211980e9dee6268201900c0ee1c7d59bff357a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.pig.backend.hadoop.accumulo;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
/**
 * Basic Pig load/store implementation that uses Accumulo as the backing store.
 *
 * <p>
 * Reading and writing are both driven by the column specification given to the
 * constructor. It is handed to {@link AbstractAccumuloStorage}, which presumably
 * parses it into the inherited {@code columns} list. The first entry of a
 * {@link Tuple} is always the Accumulo row. Entry {@code n} ({@code n >= 1}) is
 * paired with the n'th column spec.
 * </p>
 *
 * <p>
 * When writing:
 * </p>
 * <ul>
 * <li>A literal column writes the entry's bytes to the configured column family
 * and qualifier. An empty qualifier is used when none is configured.</li>
 * <li>A column-family prefix expects a {@link Map}. Each map key is appended to
 * the configured column family and written with an empty qualifier.</li>
 * <li>A column-qualifier prefix expects a {@link Map}. Each map key is appended
 * to the configured column qualifier.</li>
 * </ul>
 * <p>
 * Tuples with fewer than two entries are ignored. The following are skipped:
 * null maps, and entries or map values that serialize to {@code null} bytes.
 * </p>
 *
 * <p>
 * When reading, each key/value is decoded as a whole row via
 * {@link WholeRowIterator}, and a tuple of {@code columns.size() + 1} entries
 * is produced:
 * </p>
 * <ul>
 * <li>The first entry is the row.</li>
 * <li>Each literal column yields its value, or an empty byte array if the row
 * lacks that column.</li>
 * <li>Each prefix column yields a {@link Map} from {@code "colfam"} or
 * {@code "colfam:colqual"} to value.</li>
 * </ul>
 */
public class AccumuloStorage extends AbstractAccumuloStorage {
    private static final Logger log = Logger.getLogger(AccumuloStorage.class);

    private static final String COLON = ":", EMPTY = "";
    private static final Text EMPTY_TEXT = new Text(new byte[0]);
    private static final DataByteArray EMPTY_DATA_BYTE_ARRAY = new DataByteArray(
            new byte[0]);

    // Scratch buffers reused by addColumn(). Every call overwrites them, so
    // addColumn() must not run concurrently on the same instance.
    final Text _cfHolder = new Text(), _cqHolder = new Text();

    /**
     * Creates an AccumuloStorage with an empty column specification and no
     * extra options; equivalent to {@code new AccumuloStorage("", "")}.
     *
     * @throws ParseException
     *             if the superclass rejects the (empty) options
     * @throws IOException
     *             if the superclass fails to initialize
     */
    public AccumuloStorage() throws ParseException, IOException {
        this(EMPTY, EMPTY);
    }

    /**
     * Creates an AccumuloStorage with the given column specification and no
     * extra options.
     *
     * @param columns
     *            comma-separated column specification. Entry n describes
     *            tuple entry n + 1 (tuple entry 0 is the row).
     * @throws ParseException
     *             if the superclass rejects the options
     * @throws IOException
     *             if the superclass fails to initialize
     */
    public AccumuloStorage(String columns) throws ParseException, IOException {
        this(columns, EMPTY);
    }

    /**
     * Creates an AccumuloStorage with the given column specification and
     * options string. Both are passed unchanged to
     * {@link AbstractAccumuloStorage}.
     *
     * @param columnStr
     *            comma-separated column specification
     * @param args
     *            additional options interpreted by the superclass
     * @throws ParseException
     *             if the superclass cannot parse {@code args}
     * @throws IOException
     *             if the superclass fails to initialize
     */
    public AccumuloStorage(String columnStr, String args)
            throws ParseException, IOException {
        super(columnStr, args);
    }

    /**
     * Converts one whole-row key/value (as produced by the
     * {@link WholeRowIterator} configured in {@link #configureInputFormat}) into a
     * tuple of {@code columns.size() + 1} entries. Entry 0 holds the row.
     */
    @Override
    protected Tuple getTuple(Key key, Value value) throws IOException {
        SortedMap<Key, Value> rowKVs = WholeRowIterator.decodeRow(key, value);
        Tuple tuple = TupleFactory.getInstance().newTuple(columns.size() + 1);

        // Scratch buffers shared by all columns of this row. Keys and Ranges
        // built from them copy the bytes, so later overwrites are harmless.
        final Text cfHolder = new Text();
        final Text cqHolder = new Text();

        final Text row = key.getRow();
        tuple.set(0, new DataByteArray(Text.decode(row.getBytes(), 0,
                row.getLength())));

        int tupleOffset = 0;
        for (Column column : this.columns) {
            tupleOffset++;
            switch (column.getType()) {
            case LITERAL:
                tuple.set(tupleOffset, readLiteralColumn(rowKVs, row, column,
                        cfHolder, cqHolder));
                break;
            case COLFAM_PREFIX: {
                cfHolder.set(column.getColumnFamily());
                Range prefixRange = Range.prefix(row, cfHolder);
                Key startKey = new Key(row, cfHolder);
                Object field = readPrefixedColumns(rowKVs, startKey,
                        prefixRange, cfHolder, cqHolder);
                if (null != field) {
                    tuple.set(tupleOffset, field);
                }
                break;
            }
            case COLQUAL_PREFIX: {
                cfHolder.set(column.getColumnFamily());
                cqHolder.set(column.getColumnQualifier());
                Range prefixRange = Range.prefix(row, cfHolder, cqHolder);
                Key startKey = new Key(row, cfHolder, cqHolder);
                Object field = readPrefixedColumns(rowKVs, startKey,
                        prefixRange, cfHolder, cqHolder);
                if (null != field) {
                    tuple.set(tupleOffset, field);
                }
                break;
            }
            default:
                // Unknown column types leave the tuple entry null.
                break;
            }
        }
        return tuple;
    }

    /**
     * Looks up the exact column (colfam, colqual) named by a literal column spec.
     * A missing qualifier means the empty qualifier.
     *
     * @return the value's bytes, or the shared empty byte array when the row
     *         has no such column
     */
    private static DataByteArray readLiteralColumn(SortedMap<Key, Value> rowKVs,
            Text row, Column column, Text cfHolder, Text cqHolder) {
        cfHolder.set(column.getColumnFamily());
        if (null != column.getColumnQualifier()) {
            cqHolder.set(column.getColumnQualifier());
        } else {
            cqHolder.set(EMPTY_TEXT);
        }

        Key literalKey = new Key(row, cfHolder, cqHolder);
        SortedMap<Key, Value> tail = rowKVs.tailMap(literalKey);
        if (tail.isEmpty()) {
            return EMPTY_DATA_BYTE_ARRAY;
        }

        // tailMap yields the first key >= literalKey, which may belong to a
        // different column; only accept an exact row/colfam/colqual match.
        Key actualKey = tail.firstKey();
        if (0 != literalKey.compareTo(actualKey, PartialKey.ROW_COLFAM_COLQUAL)) {
            return EMPTY_DATA_BYTE_ARRAY;
        }
        return new DataByteArray(tail.get(actualKey).get());
    }

    /**
     * Collects every entry of a decoded row that lies in {@code prefixRange}.
     * The scan starts at {@code startKey}, the first key of that range.
     * {@code cfHolder}/{@code cqHolder} are used as scratch and are overwritten.
     *
     * @return the shared empty byte array if no key sorts at or after
     *         {@code startKey}; a map of "colfam[:colqual]" to value if at least
     *         one entry matched; otherwise {@code null}
     */
    private static Object readPrefixedColumns(SortedMap<Key, Value> rowKVs,
            Key startKey, Range prefixRange, Text cfHolder, Text cqHolder) {
        SortedMap<Key, Value> tail = rowKVs.tailMap(startKey);
        if (tail.isEmpty()) {
            return EMPTY_DATA_BYTE_ARRAY;
        }

        Map<String, DataByteArray> matches = new HashMap<String, DataByteArray>();
        for (Entry<Key, Value> entry : tail.entrySet()) {
            Key entryKey = entry.getKey();
            // Keys are visited in sorted order, so the first key outside the
            // range ends the scan.
            if (!prefixRange.contains(entryKey)) {
                break;
            }
            entryKey.getColumnFamily(cfHolder);
            entryKey.getColumnQualifier(cqHolder);
            matches.put(columnName(cfHolder, cqHolder), new DataByteArray(entry
                    .getValue().get()));
        }
        return matches.isEmpty() ? null : matches;
    }

    /**
     * Formats a column as "colfam:colqual". When the qualifier is empty it
     * returns just "colfam", avoiding a trailing ':'.
     */
    private static String columnName(Text cf, Text cq) {
        if (0 == cq.getLength()) {
            return cf.toString();
        }
        return cf.toString() + COLON + cq.toString();
    }

    /**
     * Adds a {@link WholeRowIterator} at iterator priority 100 so each key/value
     * given to {@link #getTuple} encodes an entire row.
     */
    @Override
    protected void configureInputFormat(Job job) {
        AccumuloInputFormat.addIterator(job, new IteratorSetting(100,
                WholeRowIterator.class));
    }

    /**
     * Converts a tuple into at most one {@link Mutation}. Entry 0 is the row;
     * each later entry is written according to its column spec.
     *
     * @return an empty list if the tuple has fewer than two entries or nothing
     *         was written; otherwise a singleton list holding the mutation
     * @throws ClassCastException
     *             if a prefix column's entry is neither null nor a {@link Map}
     */
    @Override
    protected Collection<Mutation> getMutations(Tuple tuple)
            throws ExecException, IOException {
        final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null
                : schema.getFields();

        if (1 >= tuple.size()) {
            log.debug("Ignoring tuple of size " + tuple.size());
            return Collections.emptyList();
        }

        Iterator<Object> tupleIter = tuple.iterator();
        Mutation mutation = new Mutation(objectToText(tupleIter.next(),
                fieldSchemaAt(fieldSchemas, 0)));

        // Index of the current tuple entry and of its field schema. It must
        // advance once per consumed entry to keep the two aligned.
        int tupleOffset = 1;
        Iterator<Column> columnIter = columns.iterator();
        while (tupleIter.hasNext() && columnIter.hasNext()) {
            Object o = tupleIter.next();
            Column column = columnIter.next();

            switch (column.getType()) {
            case LITERAL: {
                final byte type = schemaToType(o,
                        fieldSchemaAt(fieldSchemas, tupleOffset));
                byte[] bytes = objToBytes(o, type);
                if (null != bytes) {
                    addColumn(mutation, column.getColumnFamily(),
                            column.getColumnQualifier(), new Value(bytes));
                }
                break;
            }
            case COLFAM_PREFIX:
            case COLQUAL_PREFIX:
                addMapColumns(mutation, column, o, tupleOffset);
                break;
            default:
                // Fall through to the increment below so later entries stay
                // aligned with their field schemas.
                log.info("Ignoring unhandled column type");
                break;
            }
            tupleOffset++;
        }

        if (0 == mutation.size()) {
            return Collections.emptyList();
        }
        return Collections.singletonList(mutation);
    }

    /**
     * Returns the field schema at {@code index}. Returns {@code null} when
     * there is no schema or it has no field at that index, which is the same
     * value the schema-less path passes.
     */
    private static ResourceFieldSchema fieldSchemaAt(
            ResourceFieldSchema[] fieldSchemas, int index) {
        if (null == fieldSchemas || index < 0 || index >= fieldSchemas.length) {
            return null;
        }
        return fieldSchemas[index];
    }

    /**
     * Writes each entry of a map-valued tuple entry as its own column.
     * <ul>
     * <li>For COLFAM_PREFIX, the map key is appended to the column family.</li>
     * <li>For COLQUAL_PREFIX, the map key is appended to the column qualifier.</li>
     * </ul>
     * A null map and map values that serialize to {@code null} are skipped.
     *
     * @throws ClassCastException
     *             if {@code o} is non-null and not a {@link Map}
     */
    private void addMapColumns(Mutation mutation, Column column, Object o,
            int tupleOffset) throws ExecException, IOException {
        if (null == o) {
            // A null map has no entries to write.
            return;
        }
        if (!(o instanceof Map)) {
            String msg = "Expected Map at tuple offset " + tupleOffset
                    + " but was " + o.getClass().getSimpleName();
            log.error(msg);
            throw new ClassCastException(msg);
        }
        // Pig map keys are Strings; the value types vary per entry.
        @SuppressWarnings("unchecked")
        Map<String, Object> map = (Map<String, Object>) o;

        for (Entry<String, Object> entry : map.entrySet()) {
            String key = entry.getKey();
            Object objValue = entry.getValue();
            byte[] mapValue = objToBytes(objValue, DataType.findType(objValue));
            if (null == mapValue) {
                // Mirror the literal path: no bytes, no column.
                continue;
            }
            if (Column.Type.COLFAM_PREFIX == column.getType()) {
                addColumn(mutation, column.getColumnFamily() + key, null,
                        new Value(mapValue));
            } else if (Column.Type.COLQUAL_PREFIX == column.getType()) {
                addColumn(mutation, column.getColumnFamily(),
                        column.getColumnQualifier() + key, new Value(mapValue));
            } else {
                throw new IOException("Unknown column type");
            }
        }
    }

    /**
     * Adds the given column family, column qualifier and value to the given
     * mutation. A {@code null} family or qualifier is written as empty.
     * Uses the instance scratch buffers {@code _cfHolder}/{@code _cqHolder},
     * so it is not safe to call concurrently on one instance.
     *
     * @param mutation
     *            mutation to add the column to
     * @param colfam
     *            column family, or {@code null} for empty
     * @param colqual
     *            column qualifier, or {@code null} for empty
     * @param columnValue
     *            value to store
     */
    protected void addColumn(Mutation mutation, String colfam, String colqual,
            Value columnValue) {
        if (null != colfam) {
            _cfHolder.set(colfam);
        } else {
            _cfHolder.clear();
        }

        if (null != colqual) {
            _cqHolder.set(colqual);
        } else {
            _cqHolder.clear();
        }

        mutation.put(_cfHolder, _cqHolder, columnValue);
    }
}