blob: c7efdd9d399139048c3a38a5434b1dd413c8e617 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.pig.util;
import java.io.IOException;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
import org.apache.phoenix.schema.types.PArrayDataType;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PDecimal;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.types.PTime;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PUnsignedDate;
import org.apache.phoenix.schema.types.PUnsignedDouble;
import org.apache.phoenix.schema.types.PUnsignedFloat;
import org.apache.phoenix.schema.types.PUnsignedInt;
import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PUnsignedSmallint;
import org.apache.phoenix.schema.types.PUnsignedTime;
import org.apache.phoenix.schema.types.PUnsignedTimestamp;
import org.apache.phoenix.schema.types.PUnsignedTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.joda.time.DateTime;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
public final class TypeUtil {
private static final Logger LOG = LoggerFactory.getLogger(TypeUtil.class);
private static final HBaseBinaryConverter BINARY_CONVERTER = new HBaseBinaryConverter();
private static final ImmutableMap<PDataType, Byte> PHOENIX_TO_PIG_TYPE = init();
private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
private TypeUtil() {}
/**
* @return map of Phoenix to Pig data types.
*/
private static ImmutableMap<PDataType, Byte> init() {
final ImmutableMap.Builder<PDataType, Byte> builder = new Builder<PDataType, Byte>();
builder.put(PLong.INSTANCE, DataType.LONG);
builder.put(PVarbinary.INSTANCE, DataType.BYTEARRAY);
builder.put(PChar.INSTANCE, DataType.CHARARRAY);
builder.put(PVarchar.INSTANCE, DataType.CHARARRAY);
builder.put(PDouble.INSTANCE, DataType.DOUBLE);
builder.put(PFloat.INSTANCE, DataType.FLOAT);
builder.put(PInteger.INSTANCE, DataType.INTEGER);
builder.put(PTinyint.INSTANCE, DataType.INTEGER);
builder.put(PSmallint.INSTANCE, DataType.INTEGER);
builder.put(PDecimal.INSTANCE, DataType.BIGDECIMAL);
builder.put(PTime.INSTANCE, DataType.DATETIME);
builder.put(PTimestamp.INSTANCE, DataType.DATETIME);
builder.put(PBoolean.INSTANCE, DataType.BOOLEAN);
builder.put(PDate.INSTANCE, DataType.DATETIME);
builder.put(PUnsignedDate.INSTANCE, DataType.DATETIME);
builder.put(PUnsignedDouble.INSTANCE, DataType.DOUBLE);
builder.put(PUnsignedFloat.INSTANCE, DataType.FLOAT);
builder.put(PUnsignedInt.INSTANCE, DataType.INTEGER);
builder.put(PUnsignedLong.INSTANCE, DataType.LONG);
builder.put(PUnsignedSmallint.INSTANCE, DataType.INTEGER);
builder.put(PUnsignedTime.INSTANCE, DataType.DATETIME);
builder.put(PUnsignedTimestamp.INSTANCE, DataType.DATETIME);
builder.put(PUnsignedTinyint.INSTANCE, DataType.INTEGER);
return builder.build();
}
/**
* This method returns the most appropriate PDataType associated with the incoming Pig type. Note for Pig DataType
* DATETIME, returns DATE as inferredSqlType. This is later used to make a cast to targetPhoenixType accordingly.
* See {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
*
* @param obj
* @return PDataType
*/
public static PDataType getType(Object obj, byte type) {
if (obj == null) { return null; }
PDataType sqlType;
switch (type) {
case DataType.BYTEARRAY:
sqlType = PVarbinary.INSTANCE;
break;
case DataType.CHARARRAY:
sqlType = PVarchar.INSTANCE;
break;
case DataType.DOUBLE:
case DataType.BIGDECIMAL:
sqlType = PDouble.INSTANCE;
break;
case DataType.FLOAT:
sqlType = PFloat.INSTANCE;
break;
case DataType.INTEGER:
sqlType = PInteger.INSTANCE;
break;
case DataType.LONG:
case DataType.BIGINTEGER:
sqlType = PLong.INSTANCE;
break;
case DataType.BOOLEAN:
sqlType = PBoolean.INSTANCE;
break;
case DataType.DATETIME:
sqlType = PDate.INSTANCE;
break;
case DataType.BYTE:
sqlType = PTinyint.INSTANCE;
break;
default:
throw new RuntimeException("Unknown type " + obj.getClass().getName() + " passed to PhoenixHBaseStorage");
}
return sqlType;
}
/**
* This method encodes a value with Phoenix data type. It begins with checking whether an object is TUPLE. A {@link Tuple} is mapped
* to a {@link PArrayDataType} . It then checks if it is BINARY and makes
* a call to {@link #castBytes(Object, PDataType)} to convert bytes to targetPhoenixType. It returns a {@link RuntimeException}
* when object can not be coerced.
*
* @param o
* @param targetPhoenixType
* @return Object
* @throws SQLException
*/
public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) throws SQLException {
if(DataType.TUPLE == objectType) {
Tuple tuple = (Tuple)o;
List<Object> data = tuple.getAll();
return data.toArray();
}
PDataType inferredPType = getType(o, objectType);
if (inferredPType == null) { return null; }
if (inferredPType == PVarbinary.INSTANCE) {
try {
o = castBytes(o, targetPhoenixType);
if (targetPhoenixType != PVarbinary.INSTANCE && targetPhoenixType != PBinary.INSTANCE) {
inferredPType = getType(o, DataType.findType(o));
}
} catch (IOException e) {
throw new RuntimeException("Error while casting bytes for object " + o);
}
}
if (inferredPType == PDate.INSTANCE) {
int inferredSqlType = targetPhoenixType.getSqlType();
if (inferredSqlType == Types.DATE) { return new Date(((DateTime)o).getMillis()); }
if (inferredSqlType == Types.TIME) { return new Time(((DateTime)o).getMillis()); }
if (inferredSqlType == Types.TIMESTAMP) { return new Timestamp(((DateTime)o).getMillis()); }
}
if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) { return inferredPType
.toObject(o, targetPhoenixType); }
throw new RuntimeException(o.getClass().getName() + " cannot be coerced to " + targetPhoenixType.toString());
}
/**
* This method converts bytes to the target type required for Phoenix. It uses {@link HBaseBinaryConverter} for the
* conversion.
*
* @param o
* @param targetPhoenixType
* @return Object
* @throws IOException
*/
private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
byte[] bytes = ((DataByteArray)o).get();
if (PDataType.equalsAny(targetPhoenixType, PChar.INSTANCE, PVarchar.INSTANCE)) {
return BINARY_CONVERTER.bytesToCharArray(bytes);
} else if (PDataType.equalsAny(targetPhoenixType, PUnsignedSmallint.INSTANCE, PSmallint.INSTANCE)) {
return BINARY_CONVERTER.bytesToInteger(bytes).shortValue();
} else if (PDataType.equalsAny(targetPhoenixType, PUnsignedTinyint.INSTANCE, PTinyint.INSTANCE)) {
return BINARY_CONVERTER.bytesToInteger(bytes).byteValue();
} else if (PDataType.equalsAny(targetPhoenixType, PUnsignedInt.INSTANCE, PInteger.INSTANCE)) {
return BINARY_CONVERTER.bytesToInteger(bytes);
} else if (targetPhoenixType.equals(PBoolean.INSTANCE)) {
return BINARY_CONVERTER.bytesToBoolean(bytes);
} else if (PDataType.equalsAny(targetPhoenixType, PFloat.INSTANCE, PUnsignedFloat.INSTANCE)) {
return BINARY_CONVERTER.bytesToFloat(bytes);
} else if (PDataType.equalsAny(targetPhoenixType, PDouble.INSTANCE, PUnsignedDouble.INSTANCE)) {
return BINARY_CONVERTER.bytesToDouble(bytes);
} else if (PDataType.equalsAny(targetPhoenixType, PUnsignedLong.INSTANCE, PLong.INSTANCE)) {
return BINARY_CONVERTER.bytesToLong(bytes);
} else if (PDataType.equalsAny(targetPhoenixType, PVarbinary.INSTANCE, PBinary.INSTANCE)) {
return bytes;
} else {
return o;
}
}
/**
* Transforms the PhoenixRecord to Pig {@link Tuple}.
*
* @param record
* @param projectedColumns
* @return
* @throws IOException
*/
public static Tuple transformToTuple(final PhoenixRecordWritable record, final ResourceFieldSchema[] projectedColumns)
throws IOException {
Map<String, Object> columnValues = record.getResultMap();
if (columnValues == null || columnValues.size() == 0 || projectedColumns == null
|| projectedColumns.length != columnValues.size()) { return null; }
int numColumns = columnValues.size();
Tuple tuple = TUPLE_FACTORY.newTuple(numColumns);
try {
int i = 0;
for (Map.Entry<String,Object> entry : columnValues.entrySet()) {
final ResourceFieldSchema fieldSchema = projectedColumns[i];
Object object = entry.getValue();
if (object == null) {
tuple.set(i++, null);
continue;
}
switch (fieldSchema.getType()) {
case DataType.BYTEARRAY:
byte[] bytes = PDataType.fromTypeId(PBinary.INSTANCE.getSqlType()).toBytes(object);
tuple.set(i, new DataByteArray(bytes, 0, bytes.length));
break;
case DataType.CHARARRAY:
tuple.set(i, DataType.toString(object));
break;
case DataType.DOUBLE:
tuple.set(i, DataType.toDouble(object));
break;
case DataType.FLOAT:
tuple.set(i, DataType.toFloat(object));
break;
case DataType.INTEGER:
tuple.set(i, DataType.toInteger(object));
break;
case DataType.LONG:
tuple.set(i, DataType.toLong(object));
break;
case DataType.BOOLEAN:
tuple.set(i, DataType.toBoolean(object));
break;
case DataType.DATETIME:
if (object instanceof java.sql.Timestamp)
tuple.set(i,new DateTime(((java.sql.Timestamp)object).getTime()));
else
tuple.set(i,new DateTime(object));
break;
case DataType.BIGDECIMAL:
tuple.set(i, DataType.toBigDecimal(object));
break;
case DataType.BIGINTEGER:
tuple.set(i, DataType.toBigInteger(object));
break;
case DataType.TUPLE:
{
PhoenixArray array = (PhoenixArray)object;
Tuple t = TUPLE_FACTORY.newTuple(array.getDimensions());;
for(int j = 0 ; j < array.getDimensions() ; j++) {
t.set(j,array.getElement(j));
}
tuple.set(i, t);
break;
}
default:
throw new RuntimeException(String.format(" Not supported [%s] pig type", fieldSchema));
}
i++;
}
} catch (Exception ex) {
final String errorMsg = String.format(" Error transforming PhoenixRecord to Tuple [%s] ", ex.getMessage());
LOG.error(errorMsg);
throw new PigException(errorMsg);
}
return tuple;
}
/**
* Returns the mapping pig data type for a given phoenix data type.
*
* @param phoenixDataType
* @return
*/
public static Byte getPigDataTypeForPhoenixType(final PDataType phoenixDataType) {
Preconditions.checkNotNull(phoenixDataType);
if(phoenixDataType instanceof PArrayDataType) {
return DataType.TUPLE;
}
final Byte pigDataType = PHOENIX_TO_PIG_TYPE.get(phoenixDataType);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(" For PhoenixDataType [%s] , pigDataType is [%s] ",
phoenixDataType.getSqlTypeName(), DataType.findTypeName(pigDataType)));
}
return pigDataType;
}
}