blob: 2c2c6e187a6f3b9aa51f0eab17b9119d4eea50a6 [file] [log] [blame]
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.phoenix.spark
import java.sql.DriverManager
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.jdbc.PhoenixDriver
import org.apache.phoenix.mapreduce.PhoenixInputFormat
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.schema.types._
import org.apache.phoenix.util.ColumnInfo
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.JavaConverters._
class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
predicate: Option[String] = None,
zkUrl: Option[String] = None,
@transient conf: Configuration, dateAsTimestamp: Boolean = false,
tenantId: Option[String] = None
)
extends RDD[PhoenixRecordWritable](sc, Nil) {
// Make sure to register the Phoenix driver
DriverManager.registerDriver(new PhoenixDriver)
@transient lazy val phoenixConf = {
getPhoenixConfiguration
}
val phoenixRDD = sc.newAPIHadoopRDD(phoenixConf,
classOf[PhoenixInputFormat[PhoenixRecordWritable]],
classOf[NullWritable],
classOf[PhoenixRecordWritable])
override protected def getPartitions: Array[Partition] = {
phoenixRDD.partitions
}
override protected def getPreferredLocations(split: Partition): Seq[String] = {
phoenixRDD.preferredLocations(split)
}
@DeveloperApi
override def compute(split: Partition, context: TaskContext) = {
phoenixRDD.compute(split, context).map(r => r._2)
}
def printPhoenixConfig(conf: Configuration): Unit = {
for (mapEntry <- conf.iterator().asScala) {
val k = mapEntry.getKey
val v = mapEntry.getValue
if (k.startsWith("phoenix")) {
println(s"$k = $v")
}
}
}
def getPhoenixConfiguration: Configuration = {
// This is just simply not serializable, so don't try, but clone it because
// PhoenixConfigurationUtil mutates it.
val config = HBaseConfiguration.create(conf)
PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
PhoenixConfigurationUtil.setInputTableName(config, table)
if(!columns.isEmpty) {
PhoenixConfigurationUtil.setSelectColumnNames(config, columns.toArray)
}
if(predicate.isDefined) {
PhoenixConfigurationUtil.setInputTableConditions(config, predicate.get)
}
// Override the Zookeeper URL if present. Throw exception if no address given.
zkUrl match {
case Some(url) => ConfigurationUtil.setZookeeperURL(config, url)
case _ => {
if(ConfigurationUtil.getZookeeperURL(config).isEmpty) {
throw new UnsupportedOperationException(
s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
)
}
}
}
tenantId match {
case Some(tid) => ConfigurationUtil.setTenantId(config, tid)
case _ =>
}
config
}
// Convert our PhoenixRDD to a DataFrame
def toDataFrame(sqlContext: SQLContext): DataFrame = {
val columnInfoList = PhoenixConfigurationUtil
.getSelectColumnMetadataList(new Configuration(phoenixConf))
.asScala
// Keep track of the sql type and column names.
val columns: Seq[(String, Int)] = columnInfoList.map(ci => {
(ci.getDisplayName, ci.getSqlType)
})
// Lookup the Spark catalyst types from the Phoenix schema
val structFields = phoenixSchemaToCatalystSchema(columnInfoList).toArray
// Create the data frame from the converted Spark schema
sqlContext.createDataFrame(map(pr => {
// Create a sequence of column data
val rowSeq = columns.map { case (name, sqlType) =>
val res = pr.resultMap(name)
// Special handling for data types
if (dateAsTimestamp && (sqlType == 91 || sqlType == 19) && res!=null) { // 91 is the defined type for Date and 19 for UNSIGNED_DATE
new java.sql.Timestamp(res.asInstanceOf[java.sql.Date].getTime)
} else if ((sqlType == 92 || sqlType == 18) && res!=null) { // 92 is the defined type for Time and 18 for UNSIGNED_TIME
new java.sql.Timestamp(res.asInstanceOf[java.sql.Time].getTime)
} else {
res
}
}
// Create a Spark Row from the sequence
Row.fromSeq(rowSeq)
}), new StructType(structFields))
}
def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = {
columnList.map(ci => {
val structType = phoenixTypeToCatalystType(ci)
StructField(ci.getDisplayName, structType)
})
}
// Lookup table for Phoenix types to Spark catalyst types
def phoenixTypeToCatalystType(columnInfo: ColumnInfo): DataType = columnInfo.getPDataType match {
case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
// Use Spark system default precision for now (explicit to work with < 1.5)
case t if t.isInstanceOf[PDecimal] =>
if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
case t if t.isInstanceOf[PBoolean] => BooleanType
case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
case t if t.isInstanceOf[PDecimalArray] => ArrayType(
if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
}
}