blob: f98ec3bc673879bb9cf1c68a5d05216608c74d2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.examples.util
import java.io.File
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{CarbonContext, SaveMode}
import org.apache.carbondata.core.util.CarbonProperties
// scalastyle:off println
object ExampleUtils {
def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
.getCanonicalPath
val storeLocation = currentPath + "/target/store"
def createCarbonContext(appName: String): CarbonContext = {
val sc = new SparkContext(new SparkConf()
.setAppName(appName)
.setMaster("local[2]"))
sc.setLogLevel("ERROR")
println(s"Starting $appName using spark version ${sc.version}")
val cc = new CarbonContext(sc, storeLocation, currentPath + "/target/carbonmetastore")
CarbonProperties.getInstance()
.addProperty("carbon.storelocation", storeLocation)
cc
}
/**
* This func will write a sample CarbonData file containing following schema:
* c1: String, c2: String, c3: Double
* Returns table path
*/
def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
cc.sql(s"DROP TABLE IF EXISTS $tableName")
writeDataframe(cc, tableName, numRows, SaveMode.Overwrite)
s"$storeLocation/default/$tableName"
}
/**
* This func will append data to the CarbonData file
* Returns table path
*/
def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
writeDataframe(cc, tableName, numRows, SaveMode.Append)
s"$storeLocation/default/$tableName"
}
/**
* create a new dataframe and write to CarbonData file, based on save mode
*/
private def writeDataframe(
cc: CarbonContext, tableName: String, numRows: Int, mode: SaveMode): Unit = {
// use CarbonContext to write CarbonData files
import cc.implicits._
val sc = cc.sparkContext
val df = sc.parallelize(1 to numRows, 2)
.map(x => ("a", "b", x))
.toDF("c1", "c2", "c3")
// save dataframe directl to carbon file without tempCSV
df.write
.format("carbondata")
.option("tableName", tableName)
.option("compress", "true")
.option("tempCSV", "false")
.mode(mode)
.save()
}
def cleanSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = {
cc.sql(s"DROP TABLE IF EXISTS $tableName")
}
}
// scalastyle:on println