/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.spark

import org.apache.phoenix.query.QueryConstants
import org.apache.phoenix.schema.types.{PBinary, PBinaryArray, PBoolean, PBooleanArray, PChar,
  PCharArray, PDate, PDateArray, PDecimal, PDecimalArray, PDouble, PDoubleArray, PFloat, PFloatArray,
  PInteger, PIntegerArray, PLong, PLongArray, PSmallint, PSmallintArray, PTime, PTimeArray,
  PTimestamp, PTimestampArray, PTinyint, PTinyintArray, PUnsignedDate, PUnsignedDateArray,
  PUnsignedDouble, PUnsignedDoubleArray, PUnsignedFloat, PUnsignedFloatArray, PUnsignedInt,
  PUnsignedIntArray, PUnsignedLong, PUnsignedLongArray, PUnsignedSmallint, PUnsignedSmallintArray,
  PUnsignedTime, PUnsignedTimeArray, PUnsignedTimestamp, PUnsignedTimestampArray, PUnsignedTinyint,
  PUnsignedTinyintArray, PVarbinary, PVarbinaryArray, PVarchar, PVarcharArray}
import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType,
  DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField,
  StructType, TimestampType}
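
/**
 * Helper for translating Phoenix column metadata into a Spark SQL schema.
 * The conversions below map Phoenix column names and PDataTypes onto Catalyst
 * field names and DataTypes; see phoenixTypeToCatalystType for the full lookup table.
 */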
object SparkSchemaUtil {
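  /**
   * Converts a list of Phoenix ColumnInfo objects into a Spark Catalyst StructType.
   * When dateAsTimestamp is true, Phoenix DATE columns are exposed as Spark TimestampType
   * rather than DateType.
   *
   * Illustrative sketch only, assuming the two-argument ColumnInfo(name, sqlType)
   * constructor and a hypothetical two-column table:
   * {{{
   *   val cols = Seq(
   *     new ColumnInfo("ID", PLong.INSTANCE.getSqlType),
   *     new ColumnInfo("NAME", PVarchar.INSTANCE.getSqlType))
   *   val schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(cols)
   *   // expected: StructType(StructField(ID,LongType,true), StructField(NAME,StringType,true))
   * }}}
   */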
  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false): StructType = {
    val structFields = columnList.map(ci => {
      val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
      StructField(normalizeColumnName(ci.getColumnName), structType)
    })
    new StructType(structFields.toArray)
  }
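
  /**
   * Strips Phoenix escaping from a column name and, for columns in the default
   * column family ("0"), drops the family prefix so the Spark field name is just
   * the column name. Illustrative values: "0"."ID" normalizes to ID, while
   * "CF1"."NAME" keeps its family and stays CF1.NAME.
   */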
  def normalizeColumnName(columnName: String): String = {
    val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
    var normalizedColumnName = ""
    if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
      normalizedColumnName = unescapedColumnName
    } else {
      // split by separator to get the column family and column name
      val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
      // "0" is Phoenix's default column family; drop it so only the column name remains
      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
    }
    normalizedColumnName
  }
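
  // Note: the pattern match below has no catch-all case, so a Phoenix type without
  // a mapping here surfaces as a scala.MatchError at runtime.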
  // Lookup table for Phoenix types to Spark catalyst types
  def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
    case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
    case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
    case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
    case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
    case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
    case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
    case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
    // Use Spark system default precision for now (explicit to work with < 1.5)
    case t if t.isInstanceOf[PDecimal] =>
      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18)
      else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
    case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
    case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
    case t if t.isInstanceOf[PBoolean] => BooleanType
    case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
    case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
    case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
    case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
    case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
    case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(ShortType, containsNull = true)
    case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
    case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
    case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
    case t if t.isInstanceOf[PDecimalArray] => ArrayType(
      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18)
      else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
    case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
    case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
    case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
  }
}