/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.spark

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql._
import org.apache.spark.sql.execution.command.LoadTable
import org.apache.spark.sql.types._

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
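
/**
 * Writes a DataFrame into a CarbonData table, either creating the table from the
 * DataFrame's schema (saveAsCarbonFile) or appending a new load (appendToCarbonFile).
 *
 * A minimal usage sketch, assuming a CarbonContext-backed DataFrame `df` and an
 * illustrative table name (the option keys mirror CarbonOption):
 * {{{
 *   df.write
 *     .format("carbondata")
 *     .option("tableName", "my_table")
 *     .mode(SaveMode.Overwrite)
 *     .save()
 * }}}
 */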
class CarbonDataFrameWriter(val dataFrame: DataFrame) {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
    checkContext()
    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)

    // create a new table using the DataFrame's schema and write its content into the table
    cc.sql(makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
    writeToCarbonFile(parameters)
  }

  def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
    // append the data as a new load
    checkContext()
    writeToCarbonFile(parameters)
  }

  private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
    val options = new CarbonOption(parameters)
    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
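    // options.tempCSV picks the load path: write temporary CSV files first,
    // or hand the DataFrame straight to the LoadTable command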
    if (options.tempCSV) {
      loadTempCSV(options, cc)
    } else {
      loadDataFrame(options, cc)
    }
  }

  /**
   * First saves the DataFrame to temporary CSV files, then loads those CSV
   * files into the Carbon table.
   * @param options carbon options describing the target table
   * @param cc CarbonContext used to run the load statement
   */
  private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
    // temporary solution: write to CSV files, then load the CSV files into carbon
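    // the temporary folder is <storePath>/tempCSV_<dbName>_<tableName>_<nanoTime>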
    val storePath = CarbonEnv.get.carbonMetastore.storePath
    val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
      .append("tempCSV")
      .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
      .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
      .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
    writeToTempCSVFile(tempCSVFolder, options)

    val tempCSVPath = new Path(tempCSVFolder)
    val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
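
    // sum the size in bytes of the part files Spark wrote (used only for logging)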
    def countSize(): Double = {
      var size: Double = 0
      val itor = fs.listFiles(tempCSVPath, true)
      while (itor.hasNext) {
        val f = itor.next()
        if (f.getPath.getName.startsWith("part-")) {
          size += f.getLen
        }
      }
      size
    }
    LOGGER.info(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")

    try {
      cc.sql(makeLoadString(tempCSVFolder, options))
    } finally {
      fs.delete(tempCSVPath, true)
    }
  }

  private def checkContext(): Unit = {
    // to avoid a Derby metastore problem, the DataFrame needs to be written and read
    // using the same CarbonContext
    require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
      "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
    )
  }

  private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
    val strRDD = dataFrame.rdd.mapPartitions { case iter =>
      new Iterator[String] {
        override def hasNext = iter.hasNext
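
        // builds one CSV line; null values become empty fields, so a row like
        // Seq(1, null, "a") renders as "1,,a" (no quoting or escaping is applied)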
        def convertToCSVString(seq: Seq[Any]): String = {
          val build = new java.lang.StringBuilder()
          if (seq.head != null) {
            build.append(seq.head.toString)
          }
          val itemIter = seq.tail.iterator
          while (itemIter.hasNext) {
            build.append(CarbonCommonConstants.COMMA)
            val value = itemIter.next()
            if (value != null) {
              build.append(value.toString)
            }
          }
          build.toString
        }

        override def next: String = {
          convertToCSVString(iter.next.toSeq)
        }
      }
    }
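
    // write the CSV files, gzip-compressed if options.compress is set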
    if (options.compress) {
      strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
    } else {
      strRDD.saveAsTextFile(tempCSVFolder)
    }
  }

  /**
   * Loads the DataFrame directly, without first saving it to CSV files.
   * @param options carbon options describing the target table
   * @param cc CarbonContext used to run the load command
   */
  private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = {
    val header = dataFrame.columns.mkString(",")
    LoadTable(
      Some(options.dbName),
      options.tableName,
      null,
      Seq(),
      Map("fileheader" -> header) ++ options.toMap,
      isOverwriteExist = false,
      null,
      Some(dataFrame),
      None).run(cc)
  }
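
  // maps a Spark SQL type to the Carbon DDL type name, e.g. ShortType -> "smallint"
  // and LongType -> "bigint"; note that FloatType maps to Carbon's double type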
  private def convertToCarbonType(sparkType: DataType): String = {
    sparkType match {
      case StringType => CarbonType.STRING.getName
      case IntegerType => CarbonType.INT.getName
      case ShortType => "smallint"
      case LongType => "bigint"
      case FloatType => CarbonType.DOUBLE.getName
      case DoubleType => CarbonType.DOUBLE.getName
      case TimestampType => CarbonType.TIMESTAMP.getName
      case DateType => CarbonType.DATE.getName
      case decimal: DecimalType =>
        s"${CarbonType.DECIMAL.getName} (${decimal.precision}, ${decimal.scale})"
      case other => sys.error(s"unsupported type: $other")
    }
  }
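
  // Builds the CREATE TABLE DDL from the DataFrame schema. As an illustrative
  // sketch (table and column names are made up), a schema of (id: Long, qty: Short)
  // for table default.orders produces roughly:
  //   CREATE TABLE IF NOT EXISTS default.orders (id bigint, qty smallint)
  //   STORED BY '<CarbonContext.datasourceName>'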
  private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
    val properties = Map(
      "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
      "DICTIONARY_EXCLUDE" -> options.dictionaryExclude
    ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
    val carbonSchema = schema.map { field =>
      s"${ field.name } ${ convertToCarbonType(field.dataType) }"
    }
    s"""
       CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
       (${ carbonSchema.mkString(", ") })
       STORED BY '${ CarbonContext.datasourceName }'
       ${ if (properties.nonEmpty) " TBLPROPERTIES (" + properties + ")" else ""}
      """
  }
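
  // Builds the LOAD DATA statement run by loadTempCSV. With illustrative values
  // (folder '/store/tempCSV_default_t1_42', table default.t1, columns id and name)
  // it produces roughly:
  //   LOAD DATA INPATH '/store/tempCSV_default_t1_42'
  //   INTO TABLE default.t1
  //   OPTIONS ('FILEHEADER' = 'id,name', 'SINGLE_PASS' = 'false')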
  private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
    s"""
       LOAD DATA INPATH '$csvFolder'
       INTO TABLE ${options.dbName}.${options.tableName}
       OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
       'SINGLE_PASS' = '${options.singlePass}')
      """
  }
}