/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hadoop.pig;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.PasswordAuthenticator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
/**
* A LoadStoreFunc for retrieving data from and storing data to Cassandra
*
* A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
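*
* Example Pig statements (keyspace and column family names are illustrative):
*   rows = LOAD 'cassandra://MyKeyspace/MyColumnFamily' USING org.apache.cassandra.hadoop.pig.CassandraStorage();
*   STORE rows INTO 'cassandra://MyKeyspace/MyColumnFamily' USING org.apache.cassandra.hadoop.pig.CassandraStorage();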
*/
@Deprecated
public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Logger logger = LoggerFactory.getLogger(CassandraStorage.class);
private ByteBuffer slice_start = BOUND;
private ByteBuffer slice_end = BOUND;
private boolean slice_reverse = false;
private boolean allow_deletes = false;
private RecordReader<ByteBuffer, Map<ByteBuffer, ColumnFamilyRecordReader.Column>> reader;
private RecordWriter<ByteBuffer, List<Mutation>> writer;
private boolean widerows = false;
private int limit;
protected String DEFAULT_INPUT_FORMAT;
protected String DEFAULT_OUTPUT_FORMAT;
protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
protected String username;
protected String password;
protected String keyspace;
protected String column_family;
protected String loadSignature;
protected String storeSignature;
protected Configuration conf;
protected String inputFormatClass;
protected String outputFormatClass;
protected int splitSize = 64 * 1024;
protected String partitionerClass;
protected boolean usePartitionFilter = false;
protected String initHostAddress;
protected String rpcPort;
protected int nativeProtocolVersion = 1;
// wide row hacks
private ByteBuffer lastKey;
private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow;
private boolean hasNext = true;
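/** default constructor; fetches up to 1024 columns per slice */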
public CassandraStorage()
{
this(1024);
}
/** @param limit number of columns to fetch in a slice */
public CassandraStorage(int limit)
{
super();
this.limit = limit;
DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
}
public int getLimit()
{
return limit;
}
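/** LoadFunc: keep a reference to the RecordReader so getNext()/getNextWide() can pull rows from it */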
public void prepareToRead(RecordReader reader, PigSplit split)
{
this.reader = reader;
}
/** read wide row */
public Tuple getNextWide() throws IOException
{
CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = null;
Tuple tuple = null;
DefaultDataBag bag = new DefaultDataBag();
try
{
while(true)
{
hasNext = reader.nextKeyValue();
if (!hasNext)
{
if (tuple == null)
tuple = TupleFactory.getInstance().newTuple();
if (lastRow != null)
{
if (tuple.size() == 0) // lastRow is a new one
{
key = reader.getCurrentKey();
tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
}
for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
tuple.append(bag);
return tuple;
}
else
{
if (tuple.size() == 1) // rare case of just one wide row, key already set
{
tuple.append(bag);
return tuple;
}
else
return null;
}
}
if (key != null && !(reader.getCurrentKey()).equals(key)) // key changed
{
// read too much, hold on to it for next time
lastKey = reader.getCurrentKey();
lastRow = reader.getCurrentValue();
// but return what we have so far
tuple.append(bag);
return tuple;
}
if (key == null) // only set the key on the first iteration
{
key = reader.getCurrentKey();
if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
{
if (tuple == null)
tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
lastRow = reader.getCurrentValue();
return tuple;
}
if (tuple == null)
tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
}
SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> row =
(SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
{
for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : row.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
}
}
catch (InterruptedException e)
{
throw new IOException(e.getMessage());
}
}
/** read next row */
public Tuple getNext() throws IOException
{
if (widerows)
return getNextWide();
try
{
// load the next pair
if (!reader.nextKeyValue())
return null;
CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
Map<ByteBuffer, ColumnFamilyRecordReader.Column> cf = reader.getCurrentValue();
assert key != null && cf != null;
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
// NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
DefaultDataBag bag = new DefaultDataBag();
// we must add all the indexed columns first to match the schema
Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>(cfDef.column_metadata.size());
// take care to iterate these in the same order as the schema does
for (ColumnDef cdef : cfDef.column_metadata)
{
boolean hasColumn = false;
boolean cql3Table = false;
try
{
hasColumn = cf.containsKey(cdef.name);
}
catch (Exception e)
{
cql3Table = true;
}
if (hasColumn)
{
tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
}
else if (!cql3Table)
{ // otherwise, we need to add an empty tuple to take its place
tuple.append(TupleFactory.getInstance().newTuple());
}
added.put(cdef.name, true);
}
// now add all the other columns
for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
// finally, special top-level indexes if needed
if (usePartitionFilter)
{
for (ColumnDef cdef : getIndexes())
{
Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
tuple.append(throwaway.get(1));
}
}
return tuple;
}
catch (InterruptedException e)
{
throw new IOException(e.getMessage());
}
}
/** write next row */
public void putNext(Tuple t) throws IOException
{
/*
We support two cases for output:
First, the original output:
(key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
For supers, we only accept the original output.
*/
if (t.size() < 1)
{
// simply nothing here, we can't even delete without a key
logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
return;
}
ByteBuffer key = objToBB(t.get(0));
if (t.getType(1) == DataType.TUPLE)
writeColumnsFromTuple(key, t, 1);
else if (t.getType(1) == DataType.BAG)
{
if (t.size() > 2)
throw new IOException("No arguments allowed after bag");
writeColumnsFromBag(key, (DataBag) t.get(1));
}
else
throw new IOException("Second argument in output must be a tuple or bag");
}
/** set hadoop cassandra connection settings */
protected void setConnectionInformation() throws IOException
{
StorageHelper.setConnectionInformation(conf);
if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
else
inputFormatClass = DEFAULT_INPUT_FORMAT;
if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
else
outputFormatClass = DEFAULT_OUTPUT_FORMAT;
if (System.getenv(PIG_ALLOW_DELETES) != null)
allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES));
}
/** get the full class name */
protected String getFullyQualifiedClassName(String classname)
{
return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
}
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
conf = HadoopCompat.getConfiguration(job);
setLocationFromUri(location);
if (ConfigHelper.getInputSlicePredicate(conf) == null)
{
SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
ConfigHelper.setInputSlicePredicate(conf, predicate);
}
if (System.getenv(PIG_WIDEROW_INPUT) != null)
widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT));
if (System.getenv(PIG_USE_SECONDARY) != null)
usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
{
try
{
ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
}
catch (NumberFormatException e)
{
throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
}
}
if (usePartitionFilter && getIndexExpressions() != null)
ConfigHelper.setInputRange(conf, getIndexExpressions());
if (username != null && password != null)
ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
if (splitSize > 0)
ConfigHelper.setInputSplitSize(conf, splitSize);
if (partitionerClass != null)
ConfigHelper.setInputPartitioner(conf, partitionerClass);
if (rpcPort != null)
ConfigHelper.setInputRpcPort(conf, rpcPort);
if (initHostAddress != null)
ConfigHelper.setInputInitialAddress(conf, initHostAddress);
ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
setConnectionInformation();
if (ConfigHelper.getInputRpcPort(conf) == 0)
throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
if (ConfigHelper.getInputInitialAddress(conf) == null)
throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
if (ConfigHelper.getInputPartitioner(conf) == null)
throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
if (loadSignature == null)
loadSignature = location;
initSchema(loadSignature);
}
/** set store configuration settings */
public void setStoreLocation(String location, Job job) throws IOException
{
conf = HadoopCompat.getConfiguration(job);
// don't let Pig combine splits into a single mapper per node
conf.setBoolean("pig.noSplitCombination", true);
setLocationFromUri(location);
if (username != null && password != null)
ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
if (splitSize > 0)
ConfigHelper.setInputSplitSize(conf, splitSize);
if (partitionerClass != null)
ConfigHelper.setOutputPartitioner(conf, partitionerClass);
if (rpcPort != null)
{
ConfigHelper.setOutputRpcPort(conf, rpcPort);
ConfigHelper.setInputRpcPort(conf, rpcPort);
}
if (initHostAddress != null)
{
ConfigHelper.setOutputInitialAddress(conf, initHostAddress);
ConfigHelper.setInputInitialAddress(conf, initHostAddress);
}
ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
if (ConfigHelper.getOutputRpcPort(conf) == 0)
throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
if (ConfigHelper.getOutputInitialAddress(conf) == null)
throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
if (ConfigHelper.getOutputPartitioner(conf) == null)
throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
// we have to do this again here for the check in writeColumnsFromTuple
if (System.getenv(PIG_USE_SECONDARY) != null)
usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
initSchema(storeSignature);
}
/** Methods to get the column family schema from Cassandra */
protected void initSchema(String signature) throws IOException
{
Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
// Only get the schema if we haven't already gotten it
if (!properties.containsKey(signature))
{
try
{
Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
client.set_keyspace(keyspace);
if (username != null && password != null)
{
Map<String, String> credentials = new HashMap<String, String>(2);
credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
try
{
client.login(new AuthenticationRequest(credentials));
}
catch (AuthenticationException e)
{
logger.error("Authentication exception: invalid username and/or password");
throw new IOException(e);
}
}
// compose the CfDef for the column family
CfDef cfDef = getCfDef(client);
if (cfDef != null)
{
StringBuilder sb = new StringBuilder();
sb.append(cfdefToString(cfDef));
properties.setProperty(signature, sb.toString());
}
else
throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
column_family,
keyspace));
}
catch (Exception e)
{
throw new IOException(e);
}
}
}
public void checkSchema(ResourceSchema schema) throws IOException
{
// we don't care about types, they all get cast to ByteBuffers
}
/** define the schema */
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
CfDef cfDef = getCfDef(loadSignature);
if (cfDef.column_type.equals("Super"))
return null;
/*
Our returned schema should look like this:
(key, index1:(name, value), index2:(name, value), columns:{(name, value)})
Which is to say, columns that have metadata will be returned as named tuples, but unknown columns will go into a bag.
This way, wide rows can still be handled by the bag, but known columns can easily be referenced.
*/
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
// get default marshallers and validators
Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
// add key
ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
keyFieldSchema.setName("key");
keyFieldSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
ResourceSchema bagSchema = new ResourceSchema();
ResourceFieldSchema bagField = new ResourceFieldSchema();
bagField.setType(DataType.BAG);
bagField.setName("columns");
// inside the bag, place one tuple with the default comparator/validator schema
ResourceSchema bagTupleSchema = new ResourceSchema();
ResourceFieldSchema bagTupleField = new ResourceFieldSchema();
bagTupleField.setType(DataType.TUPLE);
ResourceFieldSchema bagcolSchema = new ResourceFieldSchema();
ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
bagcolSchema.setName("name");
bagvalSchema.setName("value");
bagcolSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
bagvalSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
bagTupleField.setSchema(bagTupleSchema);
bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
bagField.setSchema(bagSchema);
// will contain all fields for this schema
List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
// add the key first, then the indexed columns, and finally the bag
allSchemaFields.add(keyFieldSchema);
if (!widerows)
{
// defined validators/indexes
for (ColumnDef cdef : cfDef.column_metadata)
{
// make a new tuple for each col/val pair
ResourceSchema innerTupleSchema = new ResourceSchema();
ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
innerTupleField.setType(DataType.TUPLE);
innerTupleField.setSchema(innerTupleSchema);
innerTupleField.setName(new String(cdef.getName()));
ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
idxColSchema.setName("name");
idxColSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
ResourceFieldSchema valSchema = new ResourceFieldSchema();
AbstractType validator = validators.get(cdef.name);
if (validator == null)
validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
valSchema.setName("value");
valSchema.setType(StorageHelper.getPigType(validator));
innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
allSchemaFields.add(innerTupleField);
}
}
// bag at the end for unknown columns
allSchemaFields.add(bagField);
// add top-level index elements if needed
if (usePartitionFilter)
{
for (ColumnDef cdef : getIndexes())
{
ResourceFieldSchema idxSchema = new ResourceFieldSchema();
idxSchema.setName("index_" + new String(cdef.getName()));
AbstractType validator = validators.get(cdef.name);
if (validator == null)
validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
idxSchema.setType(StorageHelper.getPigType(validator));
allSchemaFields.add(idxSchema);
}
}
// top level schema contains everything
schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
return schema;
}
/** set partition filter */
public void setPartitionFilter(Expression partitionFilter) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
}
/** prepare writer */
public void prepareToWrite(RecordWriter writer)
{
this.writer = writer;
}
/** convert object to ByteBuffer */
protected ByteBuffer objToBB(Object o)
{
if (o == null)
return nullToBB();
if (o instanceof java.lang.String)
return ByteBuffer.wrap(new DataByteArray((String)o).get());
if (o instanceof Integer)
return Int32Type.instance.decompose((Integer)o);
if (o instanceof Long)
return LongType.instance.decompose((Long)o);
if (o instanceof Float)
return FloatType.instance.decompose((Float)o);
if (o instanceof Double)
return DoubleType.instance.decompose((Double)o);
if (o instanceof UUID)
return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
if (o instanceof Tuple)
{
List<Object> objects = ((Tuple)o).getAll();
// collections: a tuple whose first element is "set", "list" or "map" is serialized as that collection type
if (objects.size() > 0 && objects.get(0) instanceof String)
{
String collectionType = (String) objects.get(0);
if ("set".equalsIgnoreCase(collectionType) ||
"list".equalsIgnoreCase(collectionType))
return objToListOrSetBB(objects.subList(1, objects.size()));
else if ("map".equalsIgnoreCase(collectionType))
return objToMapBB(objects.subList(1, objects.size()));
}
return objToCompositeBB(objects);
}
return ByteBuffer.wrap(((DataByteArray) o).get());
}
private ByteBuffer objToListOrSetBB(List<Object> objects)
{
List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
for(Object sub : objects)
{
ByteBuffer buffer = objToBB(sub);
serialized.add(buffer);
}
// NOTE: using protocol v1 serialization format for collections so as to not break
// compatibility. Not sure if that's the right thing.
return CollectionSerializer.pack(serialized, objects.size(), 1);
}
private ByteBuffer objToMapBB(List<Object> objects)
{
List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
for(Object sub : objects)
{
List<Object> keyValue = ((Tuple)sub).getAll();
for (Object entry: keyValue)
{
ByteBuffer buffer = objToBB(entry);
serialized.add(buffer);
}
}
// NOTE: using protocol v1 serialization format for collections so as to not break
// compatibility. Not sure if that's the right thing.
return CollectionSerializer.pack(serialized, objects.size(), 1);
}
private ByteBuffer objToCompositeBB(List<Object> objects)
{
List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
int totalLength = 0;
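// composite encoding: each component is written as a two-byte length, the component bytes, then a zero end-of-component byte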
for(Object sub : objects)
{
ByteBuffer buffer = objToBB(sub);
serialized.add(buffer);
totalLength += 2 + buffer.remaining() + 1;
}
ByteBuffer out = ByteBuffer.allocate(totalLength);
for (ByteBuffer bb : serialized)
{
int length = bb.remaining();
out.put((byte) ((length >> 8) & 0xFF));
out.put((byte) (length & 0xFF));
out.put(bb);
out.put((byte) 0);
}
out.flip();
return out;
}
/** write tuple data to cassandra */
private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException
{
ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
for (int i = offset; i < t.size(); i++)
{
if (t.getType(i) == DataType.BAG)
writeColumnsFromBag(key, (DataBag) t.get(i));
else if (t.getType(i) == DataType.TUPLE)
{
Tuple inner = (Tuple) t.get(i);
if (inner.size() > 0) // may be empty, for an indexed column that wasn't present
mutationList.add(mutationFromTuple(inner));
}
else if (!usePartitionFilter)
{
throw new IOException("Output type was not a bag or a tuple");
}
}
if (mutationList.size() > 0)
writeMutations(key, mutationList);
}
/** compose Cassandra mutation from tuple */
private Mutation mutationFromTuple(Tuple t) throws IOException
{
Mutation mutation = new Mutation();
if (t.get(1) == null)
{
if (allow_deletes)
{
mutation.deletion = new Deletion();
mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
}
else
throw new IOException("null found but deletes are disabled, set " + PIG_ALLOW_DELETES +
"=true in environment or allow_deletes=true in URL to enable");
}
else
{
org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
column.setName(objToBB(t.get(0)));
column.setValue(objToBB(t.get(1)));
column.setTimestamp(FBUtilities.timestampMicros());
mutation.column_or_supercolumn = new ColumnOrSuperColumn();
mutation.column_or_supercolumn.column = column;
}
return mutation;
}
/** write bag data to Cassandra */
private void writeColumnsFromBag(ByteBuffer key, DataBag bag) throws IOException
{
List<Mutation> mutationList = new ArrayList<Mutation>();
for (Tuple pair : bag)
{
Mutation mutation = new Mutation();
if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
{
SuperColumn sc = new SuperColumn();
sc.setName(objToBB(pair.get(0)));
List<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
for (Tuple subcol : (DataBag) pair.get(1))
{
org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
column.setName(objToBB(subcol.get(0)));
column.setValue(objToBB(subcol.get(1)));
column.setTimestamp(FBUtilities.timestampMicros());
columns.add(column);
}
if (columns.isEmpty())
{
if (allow_deletes)
{
mutation.deletion = new Deletion();
mutation.deletion.super_column = objToBB(pair.get(0));
mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
}
else
throw new IOException("SuperColumn deletion attempted with empty bag, but deletes are disabled, set " +
PIG_ALLOW_DELETES + "=true in environment or allow_deletes=true in URL to enable");
}
else
{
sc.columns = columns;
mutation.column_or_supercolumn = new ColumnOrSuperColumn();
mutation.column_or_supercolumn.super_column = sc;
}
}
else
mutation = mutationFromTuple(pair);
mutationList.add(mutation);
// for wide rows, we need to limit the amount of mutations we write at once
if (mutationList.size() >= 10) // arbitrary; ColumnFamilyOutputFormat will re-batch this, and BulkOutputFormat won't care
{
writeMutations(key, mutationList);
mutationList.clear();
}
}
// write the last batch
if (mutationList.size() > 0)
writeMutations(key, mutationList);
}
/** write mutation to Cassandra */
private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException
{
try
{
writer.write(key, mutations);
}
catch (InterruptedException e)
{
throw new IOException(e);
}
}
/** get a list of columns with a defined index */
protected List<ColumnDef> getIndexes() throws IOException
{
CfDef cfdef = getCfDef(loadSignature);
List<ColumnDef> indexes = new ArrayList<ColumnDef>();
for (ColumnDef cdef : cfdef.column_metadata)
{
if (cdef.index_type != null)
indexes.add(cdef);
}
return indexes;
}
/** get a list of Cassandra IndexExpression from Pig expression */
private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException
{
List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
Expression.BinaryExpression be = (Expression.BinaryExpression)expression;
ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes());
ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
switch (expression.getOpType())
{
case OP_EQ:
indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
break;
case OP_GE:
indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
break;
case OP_GT:
indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
break;
case OP_LE:
indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
break;
case OP_LT:
indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
break;
case OP_AND:
indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
break;
default:
throw new IOException("Unsupported expression type: " + expression.getOpType().name());
}
return indexExpressions;
}
/** convert a list of index expressions to a string */
private static String indexExpressionsToString(List<IndexExpression> indexExpressions) throws IOException
{
assert indexExpressions != null;
// oh, you thought cfdefToString was awful?
IndexClause indexClause = new IndexClause();
indexClause.setExpressions(indexExpressions);
indexClause.setStart_key("".getBytes());
TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
try
{
return Hex.bytesToHex(serializer.serialize(indexClause));
}
catch (TException e)
{
throw new IOException(e);
}
}
/** convert a string back to a list of index expressions */
private static List<IndexExpression> indexExpressionsFromString(String ie) throws IOException
{
assert ie != null;
TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
IndexClause indexClause = new IndexClause();
try
{
deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
}
catch (TException e)
{
throw new IOException(e);
}
return indexClause.getExpressions();
}
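/** LoadMetadata: no statistics are computed for Cassandra input */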
public ResourceStatistics getStatistics(String location, Job job)
{
return null;
}
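/** StoreFuncInterface: nothing to clean up on failure */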
public void cleanupOnFailure(String failure, Job job)
{
}
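/** StoreFuncInterface: nothing to clean up on success */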
public void cleanupOnSuccess(String location, Job job) throws IOException
{
}
/** StoreFunc methods */
public void setStoreFuncUDFContextSignature(String signature)
{
this.storeSignature = signature;
}
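/** resolve the store location against the current directory via LoadFunc.relativeToAbsolutePath() */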
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
{
return relativeToAbsolutePath(location, curDir);
}
/** output format */
public OutputFormat getOutputFormat() throws IOException
{
try
{
return FBUtilities.construct(outputFormatClass, "outputformat");
}
catch (ConfigurationException e)
{
throw new IOException(e);
}
}
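/** input format */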
@Override
public InputFormat getInputFormat() throws IOException
{
try
{
return FBUtilities.construct(inputFormatClass, "inputformat");
}
catch (ConfigurationException e)
{
throw new IOException(e);
}
}
/** get the list of index expressions */
private List<IndexExpression> getIndexExpressions() throws IOException
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null)
return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE));
else
return null;
}
/** get a list of columns for the column family */
protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
{
return getColumnMeta(client, true, true);
}
/** get column metadata */
protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
throws org.apache.cassandra.thrift.InvalidRequestException,
UnavailableException,
TimedOutException,
SchemaDisagreementException,
TException,
CharacterCodingException,
org.apache.cassandra.exceptions.InvalidRequestException,
ConfigurationException,
NotFoundException
{
String query = String.format("SELECT column_name, validator, index_type, type " +
"FROM %s.%s " +
"WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
SystemKeyspace.NAME,
LegacySchemaTables.COLUMNS,
keyspace,
column_family);
CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
List<CqlRow> rows = result.rows;
List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
if (rows == null || rows.isEmpty())
{
// if CassandraStorage, just return the empty list
if (cassandraStorage)
return columnDefs;
// otherwise for CqlNativeStorage, check metadata for classic thrift tables
CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
for (ColumnDefinition def : cfm.regularAndStaticColumns())
{
ColumnDef cDef = new ColumnDef();
String columnName = def.name.toString();
String type = def.type.toString();
logger.trace("name: {}, type: {} ", columnName, type);
cDef.name = ByteBufferUtil.bytes(columnName);
cDef.validation_class = type;
columnDefs.add(cDef);
}
// we may not need to include the value column for compact tables as we
// could have already processed it as schema_columnfamilies.value_alias
if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
{
ColumnDefinition def = cfm.compactValueColumn();
if ("value".equals(def.name.toString()))
{
ColumnDef cDef = new ColumnDef();
cDef.name = def.name.bytes;
cDef.validation_class = def.type.toString();
columnDefs.add(cDef);
}
}
return columnDefs;
}
Iterator<CqlRow> iterator = rows.iterator();
while (iterator.hasNext())
{
CqlRow row = iterator.next();
ColumnDef cDef = new ColumnDef();
String type = ByteBufferUtil.string(row.getColumns().get(3).value);
if (!type.equals("regular"))
continue;
cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
ByteBuffer indexType = row.getColumns().get(2).value;
if (indexType != null)
cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
columnDefs.add(cDef);
}
return columnDefs;
}
/** get CFMetaData of a column family */
protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
throws NotFoundException,
org.apache.cassandra.thrift.InvalidRequestException,
TException,
org.apache.cassandra.exceptions.InvalidRequestException,
ConfigurationException
{
KsDef ksDef = client.describe_keyspace(ks);
for (CfDef cfDef : ksDef.cf_defs)
{
if (cfDef.name.equalsIgnoreCase(cf))
return ThriftConversion.fromThrift(cfDef);
}
return null;
}
/** get index type from string */
protected IndexType getIndexType(String type)
{
type = type.toLowerCase();
if ("keys".equals(type))
return IndexType.KEYS;
else if("custom".equals(type))
return IndexType.CUSTOM;
else if("composites".equals(type))
return IndexType.COMPOSITES;
else
return null;
}
/** return partition keys */
public String[] getPartitionKeys(String location, Job job) throws IOException
{
if (!usePartitionFilter)
return null;
List<ColumnDef> indexes = getIndexes();
String[] partitionKeys = new String[indexes.size()];
for (int i = 0; i < indexes.size(); i++)
{
partitionKeys[i] = new String(indexes.get(i).getName());
}
return partitionKeys;
}
/** convert key to a tuple */
private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple tuple = TupleFactory.getInstance().newTuple(1);
addKeyToTuple(tuple, key, cfDef, comparator);
return tuple;
}
/** add key to a tuple */
private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
{
if (comparator instanceof AbstractCompositeType)
{
StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key));
}
else
{
StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion));
}
}
/** Deconstructs a composite type to a Tuple. */
protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
{
List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name);
Tuple t = TupleFactory.getInstance().newTuple(result.size());
for (int i=0; i<result.size(); i++)
StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion));
return t;
}
/** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
* [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true]
* [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]
*/
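// Example URI (credentials, keyspace and column family are illustrative):
//   cassandra://pig_user:secret@MyKeyspace/MyColumnFamily?limit=1024&widerows=true&allow_deletes=true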
private void setLocationFromUri(String location) throws IOException
{
try
{
if (!location.startsWith("cassandra://"))
throw new Exception("Bad scheme." + location);
String[] urlParts = location.split("\\?");
if (urlParts.length > 1)
{
Map<String, String> urlQuery = getQueryMap(urlParts[1]);
AbstractType comparator = BytesType.instance;
if (urlQuery.containsKey("comparator"))
comparator = TypeParser.parse(urlQuery.get("comparator"));
if (urlQuery.containsKey("slice_start"))
slice_start = comparator.fromString(urlQuery.get("slice_start"));
if (urlQuery.containsKey("slice_end"))
slice_end = comparator.fromString(urlQuery.get("slice_end"));
if (urlQuery.containsKey("reversed"))
slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
if (urlQuery.containsKey("limit"))
limit = Integer.parseInt(urlQuery.get("limit"));
if (urlQuery.containsKey("allow_deletes"))
allow_deletes = Boolean.parseBoolean(urlQuery.get("allow_deletes"));
if (urlQuery.containsKey("widerows"))
widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
if (urlQuery.containsKey("use_secondary"))
usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
if (urlQuery.containsKey("split_size"))
splitSize = Integer.parseInt(urlQuery.get("split_size"));
if (urlQuery.containsKey("partitioner"))
partitionerClass = urlQuery.get("partitioner");
if (urlQuery.containsKey("init_address"))
initHostAddress = urlQuery.get("init_address");
if (urlQuery.containsKey("rpc_port"))
rpcPort = urlQuery.get("rpc_port");
}
String[] parts = urlParts[0].split("/+");
String[] credentialsAndKeyspace = parts[1].split("@");
if (credentialsAndKeyspace.length > 1)
{
String[] credentials = credentialsAndKeyspace[0].split(":");
username = credentials[0];
password = credentials[1];
keyspace = credentialsAndKeyspace[1];
}
else
{
keyspace = parts[1];
}
column_family = parts[2];
}
catch (Exception e)
{
throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<table>" +
"[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]" +
"[&allow_deletes=true][&widerows=true][&use_secondary=true]" +
"[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]" +
"[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
}
}
/** decompose the query string into a map of parameters */
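// e.g. "reversed=true&limit=1024" yields {reversed=true, limit=1024}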
public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
{
String[] params = query.split("&");
Map<String, String> map = new HashMap<String, String>(params.length);
for (String param : params)
{
String[] keyValue = param.split("=");
map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
}
return map;
}
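/** ByteBuffer used to represent a null value; this implementation returns null */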
public ByteBuffer nullToBB()
{
return null;
}
/** return the CfDef for the column family */
protected CfDef getCfDef(Cassandra.Client client)
throws org.apache.cassandra.thrift.InvalidRequestException,
UnavailableException,
TimedOutException,
SchemaDisagreementException,
TException,
NotFoundException,
org.apache.cassandra.exceptions.InvalidRequestException,
ConfigurationException,
IOException
{
// get CF meta data
String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator " +
"FROM %s.%s " +
"WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
SystemKeyspace.NAME,
LegacySchemaTables.COLUMNFAMILIES,
keyspace,
column_family);
CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
if (result == null || result.rows == null || result.rows.isEmpty())
return null;
Iterator<CqlRow> iteraRow = result.rows.iterator();
CfDef cfDef = new CfDef();
cfDef.keyspace = keyspace;
cfDef.name = column_family;
if (iteraRow.hasNext())
{
CqlRow cqlRow = iteraRow.next();
cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
ByteBuffer subComparator = cqlRow.columns.get(2).value;
if (subComparator != null)
cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
}
cfDef.column_metadata = getColumnMetadata(client);
return cfDef;
}
/** get the columnfamily definition for the signature */
protected CfDef getCfDef(String signature) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
String prop = property.getProperty(signature);
return cfdefFromString(prop);
}
/** convert string back to CfDef */
protected static CfDef cfdefFromString(String st) throws IOException
{
assert st != null;
TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
CfDef cfDef = new CfDef();
try
{
deserializer.deserialize(cfDef, Hex.hexToBytes(st));
}
catch (TException e)
{
throw new IOException(e);
}
return cfDef;
}
/** convert CfDef to string */
protected static String cfdefToString(CfDef cfDef) throws IOException
{
assert cfDef != null;
// this is so awful it's kind of cool!
TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
try
{
return Hex.bytesToHex(serializer.serialize(cfDef));
}
catch (TException e)
{
throw new IOException(e);
}
}
/** parse a string into a Cassandra data type */
protected AbstractType parseType(String type) throws IOException
{
try
{
// always treat counters like longs, specifically CCT.compose is not what we need
if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
return LongType.instance;
return TypeParser.parse(type);
}
catch (ConfigurationException e)
{
throw new IOException(e);
}
catch (SyntaxException e)
{
throw new IOException(e);
}
}
/** convert a column to a tuple */
protected Tuple columnToTuple(ColumnFamilyRecordReader.Column column, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
// name
if(comparator instanceof AbstractCompositeType)
StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, column.name));
else
StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, column.name, nativeProtocolVersion));
// value
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
if (validators.get(column.name) == null)
{
Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), column.value, nativeProtocolVersion));
}
else
StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(column.name), column.value, nativeProtocolVersion));
return pair;
}
/** construct a map from marshaller type to Cassandra data type */
protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
{
Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
AbstractType comparator;
AbstractType subcomparator;
AbstractType default_validator;
AbstractType key_validator;
comparator = parseType(cfDef.getComparator_type());
subcomparator = parseType(cfDef.getSubcomparator_type());
default_validator = parseType(cfDef.getDefault_validation_class());
key_validator = parseType(cfDef.getKey_validation_class());
marshallers.put(MarshallerType.COMPARATOR, comparator);
marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
return marshallers;
}
/** get the validators */
protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
{
Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
for (ColumnDef cd : cfDef.getColumn_metadata())
{
if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
{
AbstractType validator = null;
try
{
validator = TypeParser.parse(cd.getValidation_class());
if (validator instanceof CounterColumnType)
validator = LongType.instance;
validators.put(cd.name, validator);
}
catch (ConfigurationException e)
{
throw new IOException(e);
}
catch (SyntaxException e)
{
throw new IOException(e);
}
}
}
return validators;
}
}