/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.accumulo.pig;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.google.common.collect.Lists;
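
/**
 * Reads and writes Accumulo tables from Pig: rows are surfaced as Tuples of the row id followed
 * by one or more maps of column name to value, and Tuples are written as one Mutation per row.
 * <p>
 * A minimal usage sketch in Pig Latin, assuming the {@code accumulo://} location URI handled by
 * {@code AbstractAccumuloStorage}; the table, instance, user, password, and zookeepers values
 * below are placeholders:
 *
 * <pre>
 * rows = LOAD 'accumulo://mytable?instance=inst&amp;user=root&amp;password=secret&amp;zookeepers=localhost'
 *     USING org.apache.accumulo.pig.AccumuloStorage();
 * STORE rows INTO 'accumulo://copytable?instance=inst&amp;user=root&amp;password=secret&amp;zookeepers=localhost'
 *     USING org.apache.accumulo.pig.AccumuloStorage('cf1');
 * </pre>
 */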
public class AccumuloStorage extends AbstractAccumuloStorage {
private static final Logger log = Logger.getLogger(AccumuloStorage.class);
private static final String COMMA = ",", COLON = ":";
private static final Text EMPTY_TEXT = new Text(new byte[0]);
public static final String METADATA_SUFFIX = "_metadata";
protected final List<String> columnSpecs;
protected final boolean aggregateColfams;
// Not sure if AccumuloStorage instances need to be thread-safe or not
final Text _cfHolder = new Text(), _cqHolder = new Text();
public AccumuloStorage() {
this("");
}
public AccumuloStorage(boolean aggregateColfams) {
this("", aggregateColfams);
}
public AccumuloStorage(String columns) {
this(columns, false);
}
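
  /**
   * A note on the expected arguments, inferred from how they are used in this class:
   * {@code columns} is a comma-separated list of column definitions applied positionally to the
   * fields of each stored Tuple, each definition being a column family with an optional
   * colon-delimited column qualifier (prefix), e.g. {@code "cf1,cf2:cq"}.
   *
   * @param columns
   *          comma-separated column definitions used when writing Tuples; may be blank
   * @param aggregateColfams
   *          when reading, if true each column family becomes its own map in the resulting
   *          Tuple; if false, the entire row is combined into a single map
   */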
public AccumuloStorage(String columns, boolean aggregateColfams) {
this.caster = new Utf8StorageConverter();
this.aggregateColfams = aggregateColfams;
    // TODO It would be nice to have some means other than enumerating
    // the column family for every column in the Tuples we're going to process
if (!StringUtils.isBlank(columns)) {
String[] columnArray = StringUtils.split(columns, COMMA);
columnSpecs = Lists.newArrayList(columnArray);
} else {
columnSpecs = Collections.emptyList();
}
}
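
  /**
   * Decodes a WholeRowIterator-encoded row into a Tuple of the row id followed by one or more
   * maps of column name to value. An illustrative example (names and values made up): a row "r1"
   * holding cf1:cq1=v1, cf1:cq2=v2, and cf2 (empty qualifier)=v3 decodes to the Tuple
   * ("r1", [cf1:cq1#v1, cf1:cq2#v2], [cf2#v3]) when aggregateColfams is true, and to
   * ("r1", [cf1:cq1#v1, cf1:cq2#v2, cf2#v3]) when it is false.
   */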
@Override
protected Tuple getTuple(Key key, Value value) throws IOException {
SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
List<Object> tupleEntries = Lists.newLinkedList();
Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
List<Entry<Key,Value>> aggregate = Lists.newLinkedList();
Entry<Key,Value> currentEntry = null;
while (iter.hasNext()) {
if (null == currentEntry) {
currentEntry = iter.next();
aggregate.add(currentEntry);
} else {
Entry<Key,Value> nextEntry = iter.next();
// If we're not aggregating colfams together, or we are and we have the same colfam
if (!aggregateColfams || currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
// Aggregate this entry into the map
aggregate.add(nextEntry);
} else {
currentEntry = nextEntry;
// Flush and start again
Map<String,Object> map = aggregate(aggregate);
tupleEntries.add(map);
aggregate = Lists.newLinkedList();
aggregate.add(currentEntry);
}
}
}
if (!aggregate.isEmpty()) {
tupleEntries.add(aggregate(aggregate));
}
// and wrap it in a tuple
Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
tuple.set(0, key.getRow().toString());
int i = 1;
for (Object obj : tupleEntries) {
tuple.set(i, obj);
i++;
}
return tuple;
}
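
  /**
   * Combines a run of column entries into a single map keyed by column name: the key is the
   * column family, with a colon and the qualifier appended when the qualifier is non-empty, and
   * the value is the raw cell bytes wrapped in a DataByteArray. For example, family "f" with
   * qualifier "q" is keyed as "f:q", while family "f" with an empty qualifier is keyed as "f".
   */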
protected Map<String,Object> aggregate(List<Entry<Key,Value>> columns) {
final Map<String,Object> map = new HashMap<String,Object>();
final StringBuilder sb = new StringBuilder(128);
for (Entry<Key,Value> column : columns) {
String cf = column.getKey().getColumnFamily().toString(), cq = column.getKey().getColumnQualifier().toString();
sb.append(cf);
if (!cq.isEmpty()) {
sb.append(COLON).append(cq);
}
map.put(sb.toString(), new DataByteArray(column.getValue().get()));
sb.setLength(0);
}
return map;
}
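
  /**
   * Stacks a WholeRowIterator onto the scan so each row arrives as a single Key-Value pair that
   * getTuple can decode in one call. Priority 50 places it after Accumulo's default iterators,
   * such as the VersioningIterator at priority 20.
   */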
@Override
protected void configureInputFormat(Job job) {
AccumuloInputFormat.addIterator(job, new IteratorSetting(50, WholeRowIterator.class));
}
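
  /**
   * Converts a Tuple into a single Mutation keyed by the Tuple's first field. An illustrative
   * example (names and values made up): with columns "cf1,cf2", the Tuple ("row1", "v1", [q1#v2])
   * yields a Mutation on "row1" writing cf1 (empty qualifier)=v1 and cf2:q1=v2. Tuples with no
   * fields beyond the row id produce no Mutations.
   */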
@Override
public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
Iterator<Object> tupleIter = tuple.iterator();
    // Need at least a row key plus one more field to produce a column
    if (1 >= tuple.size()) {
log.debug("Ignoring tuple of size " + tuple.size());
return Collections.emptyList();
}
Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
int columnOffset = 0;
int tupleOffset = 1;
while (tupleIter.hasNext()) {
Object o = tupleIter.next();
String family = null;
// Figure out if the user provided a specific columnfamily to use.
if (columnOffset < columnSpecs.size()) {
family = columnSpecs.get(columnOffset);
}
// Grab the type for this field
final byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
      // If we have a Map, we want to treat every Entry as a column in this record,
      // placing the entry's key in the column family unless this instance of
      // AccumuloStorage was given a specific column family to use, in which case
      // the entry's key becomes the column qualifier.
if (DataType.MAP == type) {
@SuppressWarnings("unchecked")
Map<String,Object> map = (Map<String,Object>) o;
for (Entry<String,Object> entry : map.entrySet()) {
Object entryObject = entry.getValue();
          // Treat a null value in the map as the absence of this column. The
          // input may have come from a structured source where the column could
          // not be omitted; we can simply skip the column here.
if (null != entryObject) {
byte entryType = DataType.findType(entryObject);
Value value = new Value(objToBytes(entryObject, entryType));
addColumn(mutation, family, entry.getKey(), value);
}
}
} else {
byte[] bytes = objToBytes(o, type);
if (null != bytes) {
Value value = new Value(bytes);
// We don't have any column name from non-Maps
addColumn(mutation, family, null, value);
}
}
columnOffset++;
tupleOffset++;
}
if (0 == mutation.size()) {
return Collections.emptyList();
}
return Collections.singletonList(mutation);
}
  /**
   * Adds a column and value to the given mutation. A column family and an optional column
   * qualifier (or column qualifier prefix) are pulled from {@code columnDef}, with the family and
   * qualifier delimited by a colon. If {@code columnName} is non-null, it is appended to the
   * qualifier.
   *
   * If both {@code columnDef} and {@code columnName} are null, nothing is added to the mutation.
   *
   * @param mutation
   *          the mutation to add the column and value to
   * @param columnDef
   *          column family and optional qualifier prefix, colon-delimited; may be null
   * @param columnName
   *          name to append to the column qualifier, typically a map key; may be null
   * @param columnValue
   *          the value of the column
   */
protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
if (null == columnDef && null == columnName) {
// TODO Emit a counter here somehow? org.apache.pig.tools.pigstats.PigStatusReporter
log.warn("Was provided no name or definition for column. Ignoring value");
return;
}
    if (null != columnDef) {
      // Use the provided columnDef to make a cf (with an optional cq prefix).
      // Split at the String level so the colon offset stays correct when the
      // definition contains multi-byte characters; Text.set(String) encodes as UTF-8.
      int index = columnDef.indexOf(COLON);
      if (-1 == index) {
        _cfHolder.set(columnDef);
        _cqHolder.clear();
      } else {
        _cfHolder.set(columnDef.substring(0, index));
        _cqHolder.set(columnDef.substring(index + 1));
      }
} else {
_cfHolder.clear();
_cqHolder.clear();
}
// If we have a column name (this came from a Map)
// append that name on the cq.
if (null != columnName) {
      byte[] cnBytes = columnName.getBytes(StandardCharsets.UTF_8);
      // CQ is either empty or has a prefix from the columnDef
      _cqHolder.append(cnBytes, 0, cnBytes.length);
}
mutation.put(_cfHolder, _cqHolder, columnValue);
}
}