package org.apache.cassandra.hadoop.pig;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.UUID;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@Deprecated
public class StorageHelper
{
// system environment variables that can be set to configure connection info:
// alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
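// e.g. (illustrative values only): export PIG_INITIAL_ADDRESS=localhost; export PIG_RPC_PORT=9160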
public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
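/**
 * Copy connection settings (RPC ports, initial addresses, partitioners) from the
 * environment variables above into the Hadoop configuration via ConfigHelper.
 * The input/output-specific variables take precedence over the shared PIG_* variables.
 */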
protected static void setConnectionInformation(Configuration conf)
{
if (System.getenv(PIG_RPC_PORT) != null)
{
ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
}
if (System.getenv(PIG_INPUT_RPC_PORT) != null)
ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
if (System.getenv(PIG_INITIAL_ADDRESS) != null)
{
ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
}
if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
if (System.getenv(PIG_PARTITIONER) != null)
{
ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
}
if(System.getenv(PIG_INPUT_PARTITIONER) != null)
ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
}
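/**
 * Convert a raw Cassandra value into a Java object using its validator (AbstractType).
 * Decimal and inet values are rendered as strings, collections are deserialized with the
 * given native protocol version, and everything else goes through AbstractType.compose().
 */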
protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion)
{
if (validator instanceof DecimalType || validator instanceof InetAddressType)
return validator.getString(value);
if (validator instanceof CollectionType)
{
// For CollectionType, compose() assumes the v3 protocol format for collections, which
// is not correct here since we query through the CQL-over-thrift interface, which uses the pre-v3 format
return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
}
return validator.compose(value);
}
/** Set the given value at the given position of the tuple, converting Cassandra types to Pig-friendly types. */
protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException
{
if (value instanceof BigInteger)
pair.set(position, ((BigInteger) value).intValue());
else if (value instanceof ByteBuffer)
pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
else if (value instanceof UUID)
pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
else if (value instanceof Date)
pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
else
pair.set(position, value);
}
/** Get the Pig DataType constant corresponding to the given Cassandra data type. */
protected static byte getPigType(AbstractType type)
{
if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
return DataType.LONG;
else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
return DataType.INTEGER;
else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
return DataType.CHARARRAY;
else if (type instanceof FloatType)
return DataType.FLOAT;
else if (type instanceof DoubleType)
return DataType.DOUBLE;
else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
return DataType.TUPLE;
return DataType.BYTEARRAY;
}
}