blob: 69248d6aa807bf69dfbca2fba4630c2be0ed8e4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.carbondata.spark.util
import org.apache.spark.sql.common.util.Spark2QueryTest
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.TableOptionConstant
/**
* test case for external column dictionary generation
* also support complicated type
*/
class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
var extComplexRelation: CarbonRelation = _
var verticalDelimiteRelation: CarbonRelation = _
var loadSqlRelation: CarbonRelation = _
var filePath: String = _
var pwd: String = _
var complexFilePath1: String = _
var complexFilePath2: String = _
var extColDictFilePath1: String = _
var extColDictFilePath2: String = _
var extColDictFilePath3: String = _
var header: String = _
var header2: String = _
def buildTestData() = {
filePath = s"${ resourcesPath }/sample.csv"
complexFilePath1 = s"${ resourcesPath }/complexdata2.csv"
complexFilePath2 = s"${ resourcesPath }/verticalDelimitedData.csv"
extColDictFilePath1 = s"deviceInformationId:${ resourcesPath }/deviceInformationId.csv," +
s"mobile.imei:${ resourcesPath }/mobileimei.csv," +
s"mac:${ resourcesPath }/mac.csv," +
s"locationInfo.ActiveCountry:${ resourcesPath
}/locationInfoActiveCountry.csv"
extColDictFilePath2 = s"deviceInformationId:${ resourcesPath }/deviceInformationId2.csv"
extColDictFilePath3 = s"channelsId:${ resourcesPath }/channelsId.csv"
header = "deviceInformationId,channelsId,ROMSize,purchasedate,mobile,MAC," +
"locationinfo,proddate,gamePointId,contractNumber"
header2 = "deviceInformationId,channelsId,contractNumber"
}
def buildTable() = {
try {
sql(
"""CREATE TABLE extComplextypes (deviceInformationId int,
channelsId string, ROMSize string, purchasedate string,
mobile struct<imei:string, imsi:string>, MAC array<string>,
locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string,
ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>,
proddate struct<productionDate:string,activeDeactivedate:array<string>>,
gamePointId double,contractNumber double)
STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('DICTIONARY_INCLUDE' = 'deviceInformationId, channelsId')
""")
} catch {
case ex: Throwable => LOGGER.error(ex.getMessage + "\r\n" + ex.getStackTraceString)
}
try {
sql(
"""CREATE TABLE verticalDelimitedTable (deviceInformationId int,
channelsId string,contractNumber double)
STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('DICTIONARY_INCLUDE' = 'deviceInformationId, channelsId')
""")
} catch {
case ex: Throwable => LOGGER.error(ex.getMessage + "\r\n" + ex.getStackTraceString)
}
try {
sql(
"""CREATE TABLE loadSqlTest (deviceInformationId int,
channelsId string, ROMSize string, purchasedate string,
mobile struct<imei:string, imsi:string>, MAC array<string>,
locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string,
ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>,
proddate struct<productionDate:string,activeDeactivedate:array<string>>,
gamePointId double,contractNumber double)
STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('DICTIONARY_INCLUDE' = 'deviceInformationId')
""")
} catch {
case ex: Throwable => LOGGER.error(ex.getMessage + "\r\n" + ex.getStackTraceString)
}
}
def buildRelation() = {
val warehouse = s"$resourcesPath/target/warehouse"
val storeLocation = s"$resourcesPath/target/store"
val metastoredb = s"$resourcesPath/target"
CarbonProperties.getInstance()
.addProperty("carbon.custom.distribution", "true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,"FORCE")
import org.apache.spark.sql.CarbonSession._
val spark = SparkSession
.builder()
.master("local")
.appName("CarbonSessionExample")
.config("spark.sql.warehouse.dir", warehouse)
.config("spark.network.timeout", "600s")
.config("spark.executor.heartbeatInterval", "600s")
.config("carbon.enable.vector.reader","false")
.getOrCreateCarbonSession(storeLocation, metastoredb)
val catalog = CarbonEnv.getInstance(spark).carbonMetastore
extComplexRelation = catalog
.lookupRelation(Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
"extComplextypes")(spark)
.asInstanceOf[CarbonRelation]
verticalDelimiteRelation = catalog
.lookupRelation(Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
"verticalDelimitedTable")(spark)
.asInstanceOf[CarbonRelation]
loadSqlRelation = catalog.lookupRelation(Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
"loadSqlTest")(spark)
.asInstanceOf[CarbonRelation]
}
def buildCarbonLoadModel(
relation: CarbonRelation,
filePath: String,
header: String,
extColFilePath: String,
csvDelimiter: String = ","): CarbonLoadModel = {
val carbonLoadModel = new CarbonLoadModel
carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
val table = relation.carbonTable
val carbonSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
carbonLoadModel.setTableName(table.getTableName)
carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
carbonLoadModel.setFactFilePath(filePath)
carbonLoadModel.setCsvHeader(header)
carbonLoadModel.setCsvDelimiter(csvDelimiter)
carbonLoadModel.setComplexDelimiter("$")
carbonLoadModel.setComplexDelimiter(":")
carbonLoadModel.setColDictFilePath(extColFilePath)
carbonLoadModel.setQuoteChar("\"");
carbonLoadModel.setSerializationNullFormat(
TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
carbonLoadModel.setCsvHeaderColumns(
LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
carbonLoadModel.setMaxColumns("100")
// Create table and metadata folders if not exist
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)
}
import scala.collection.JavaConverters._
val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.COMPRESSOR,
CompressorFactory.getInstance().getCompressor.getName)
carbonLoadModel.setColumnCompressor(columnCompressor)
carbonLoadModel
}
override def beforeAll {
cleanAllTables
buildTestData
buildTable
buildRelation
}
test("Generate global dictionary from external column file") {
// load the first time
var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath1)
GlobalDictionaryUtil.generateGlobalDictionary(
sqlContext,
carbonLoadModel,
FileFactory.getConfiguration)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "deviceInformationId", "10086")
// load the second time
carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath2)
GlobalDictionaryUtil.generateGlobalDictionary(
sqlContext,
carbonLoadModel,
FileFactory.getConfiguration)
// check the old dictionary and whether the new distinct value is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "deviceInformationId", "10086")
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "deviceInformationId", "10011")
}
test("When csv delimiter is not comma") {
// when csv delimiter is comma
var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath3)
GlobalDictionaryUtil.generateGlobalDictionary(
sqlContext,
carbonLoadModel,
FileFactory.getConfiguration)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "channelsId", "1421|")
// when csv delimiter is not comma
carbonLoadModel = buildCarbonLoadModel(verticalDelimiteRelation, complexFilePath2,
header2, extColDictFilePath3, "|")
GlobalDictionaryUtil.generateGlobalDictionary(
sqlContext,
carbonLoadModel,
FileFactory.getConfiguration)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
verticalDelimiteRelation, "channelsId", "1431,")
}
test("LOAD DML with COLUMNDICT option") {
try {
sql(
s"""
LOAD DATA LOCAL INPATH "$complexFilePath1" INTO TABLE loadSqlTest
OPTIONS('FILEHEADER'='$header', 'COLUMNDICT'='$extColDictFilePath1', 'single_pass'='true')
""")
} catch {
case ex: Exception =>
LOGGER.error(ex.getMessage + "\r\n" + ex.getStackTraceString)
assert(false)
}
DictionaryTestCaseUtil.checkDictionary(
loadSqlRelation, "deviceInformationId", "10086")
}
test("COLUMNDICT and ALL_DICTIONARY_PATH can not be used together") {
val ex = intercept[MalformedCarbonCommandException] {
sql(
s"""
LOAD DATA LOCAL INPATH "$complexFilePath1" INTO TABLE loadSqlTest
OPTIONS('COLUMNDICT'='$extColDictFilePath1',"ALL_DICTIONARY_PATH"='$extColDictFilePath1')
""")
}
assertResult(ex.getMessage)(
"Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together " +
"in options")
}
test("Measure can not use COLUMNDICT") {
val ex = intercept[DataLoadingException] {
sql(
s"""
LOAD DATA LOCAL INPATH "$complexFilePath1" INTO TABLE loadSqlTest
OPTIONS('single_pass'='true','FILEHEADER'='$header', 'COLUMNDICT'='gamePointId:$filePath')
""")
}
assertResult(ex.getMessage)(
"Column gamePointId is not a key column. Only key column can be part " +
"of dictionary and used in COLUMNDICT option.")
}
def cleanAllTables: Unit = {
sql("DROP TABLE IF EXISTS extComplextypes")
sql("DROP TABLE IF EXISTS verticalDelimitedTable")
sql("DROP TABLE IF EXISTS loadSqlTest")
}
override def afterAll: Unit = {
cleanAllTables
}
}