blob: f0de47d527733c04c7e88b698c8ab1607ec2cf66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.carbondata.spark.util
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.util.TableOptionConstant
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
 * Test cases for global dictionary generation from external column dictionary files,
 * including columns of complex types (struct and array).
 */
class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll {

  // Relations of the three test tables; resolved in buildRelation().
  var extComplexRelation: CarbonRelation = _
  var verticalDelimiteRelation: CarbonRelation = _
  var loadSqlRelation: CarbonRelation = _

  // Input CSV locations; assigned in buildTestData().
  var filePath: String = _
  // NOTE(review): `pwd` is neither assigned nor read anywhere in this class.
  var pwd: String = _
  var complexFilePath1: String = _
  var complexFilePath2: String = _

  // External dictionary specifications in the form "column:dictionaryFile[,column:dictionaryFile]".
  var extColDictFilePath1: String = _
  var extColDictFilePath2: String = _
  var extColDictFilePath3: String = _

  // CSV headers: `header` for the complex-type tables, `header2` for verticalDelimitedTable.
  var header: String = _
  var header2: String = _

  /**
   * Initializes every path, dictionary specification and CSV header used by the tests.
   * All files are resolved relative to `resourcesPath`.
   */
  def buildTestData(): Unit = {
    filePath = s"${resourcesPath}/sample.csv"
    complexFilePath1 = s"${resourcesPath}/complexdata2.csv"
    complexFilePath2 = s"${resourcesPath}/verticalDelimitedData.csv"
    extColDictFilePath1 = s"deviceInformationId:${resourcesPath}/deviceInformationId.csv," +
      s"mobile.imei:${resourcesPath}/mobileimei.csv," +
      s"mac:${resourcesPath}/mac.csv," +
      s"locationInfo.ActiveCountry:${resourcesPath}/locationInfoActiveCountry.csv"
    extColDictFilePath2 = s"deviceInformationId:${resourcesPath}/deviceInformationId2.csv"
    extColDictFilePath3 = s"channelsId:${resourcesPath}/channelsId.csv"
    header = "deviceInformationId,channelsId,ROMSize,purchasedate,mobile,MAC,locationinfo," +
      "proddate,gamePointId,contractNumber"
    header2 = "deviceInformationId,channelsId,contractNumber"
  }

  /**
   * Executes a CREATE TABLE statement on a best-effort basis: a non-fatal failure is logged
   * and swallowed so that the remaining tables are still created. Fatal errors (e.g.
   * OutOfMemoryError) are deliberately not caught.
   *
   * @param ddl the CREATE TABLE statement to run
   */
  private def createTableQuietly(ddl: String): Unit = {
    try {
      sql(ddl)
    } catch {
      case scala.util.control.NonFatal(ex) =>
        LOGGER.error(ex.getMessage + "\r\n" + ex.getStackTrace.mkString("\r\n"))
    }
  }

  /**
   * Creates the three carbon tables used by this suite. Creation failures are only logged
   * (see createTableQuietly).
   */
  def buildTable(): Unit = {
    createTableQuietly(
      """CREATE TABLE extComplextypes (deviceInformationId int,
        |channelsId string, ROMSize string, purchasedate string,
        |mobile struct<imei:string, imsi:string>, MAC array<string>,
        |locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string,
        |ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>,
        |proddate struct<productionDate:string,activeDeactivedate:array<string>>,
        |gamePointId double,contractNumber double)
        |STORED BY 'org.apache.carbondata.format'
        |TBLPROPERTIES('DICTIONARY_INCLUDE' = 'deviceInformationId, channelsId')
        |""".stripMargin)

    createTableQuietly(
      """CREATE TABLE verticalDelimitedTable (deviceInformationId int,
        |channelsId string,contractNumber double)
        |STORED BY 'org.apache.carbondata.format'
        |TBLPROPERTIES('DICTIONARY_INCLUDE' = 'deviceInformationId, channelsId')
        |""".stripMargin)

    createTableQuietly(
      """CREATE TABLE loadSqlTest (deviceInformationId int,
        |channelsId string, ROMSize string, purchasedate string,
        |mobile struct<imei:string, imsi:string>, MAC array<string>,
        |locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string,
        |ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>,
        |proddate struct<productionDate:string,activeDeactivedate:array<string>>,
        |gamePointId double,contractNumber double)
        |STORED BY 'org.apache.carbondata.format'
        |TBLPROPERTIES('DICTIONARY_INCLUDE' = 'deviceInformationId')
        |""".stripMargin)
  }

  /**
   * Looks up the three test tables in the default database of the carbon metastore and
   * stores the resulting relations in the corresponding fields.
   */
  def buildRelation(): Unit = {
    val catalog = CarbonEnv.get.carbonMetastore

    // Resolves one table of the default database and narrows it to CarbonRelation.
    def relationOf(tableName: String): CarbonRelation = {
      catalog
        .lookupRelation1(Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME), tableName)(
          sqlContext)
        .asInstanceOf[CarbonRelation]
    }

    extComplexRelation = relationOf("extComplextypes")
    verticalDelimiteRelation = relationOf("verticalDelimitedTable")
    loadSqlRelation = relationOf("loadSqlTest")
  }

  /**
   * Builds the load model handed to GlobalDictionaryUtil.generateGlobalDictionary.
   *
   * @param relation       relation of the target table; supplies database/table name and schema
   * @param filePath       path of the fact CSV file
   * @param header         comma separated CSV header
   * @param extColFilePath external dictionary specification ("column:file[,column:file]")
   * @param csvDelimiter   field delimiter of the CSV file, "," by default
   * @return the populated load model
   */
  def buildCarbonLoadModel(relation: CarbonRelation,
      filePath: String,
      header: String,
      extColFilePath: String,
      csvDelimiter: String = ","): CarbonLoadModel = {
    val table = relation.tableMeta.carbonTable
    val carbonLoadModel = new CarbonLoadModel
    // Database and table name are taken from the carbon table itself. The former extra pair
    // of setters (which passed the two names swapped) was always overwritten by these calls.
    carbonLoadModel.setDatabaseName(table.getDatabaseName)
    carbonLoadModel.setTableName(table.getFactTableName)
    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
    carbonLoadModel.setFactFilePath(filePath)
    carbonLoadModel.setCsvHeader(header)
    carbonLoadModel.setCsvDelimiter(csvDelimiter)
    carbonLoadModel.setComplexDelimiterLevel1("\\$")
    carbonLoadModel.setComplexDelimiterLevel2("\\:")
    carbonLoadModel.setColDictFilePath(extColFilePath)
    carbonLoadModel.setQuoteChar("\"")
    carbonLoadModel.setSerializationNullFormat(
      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
    carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
    carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
      CarbonCommonConstants.CARBON_DATE_FORMAT,
      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
    // Must run after the header has been set, because it receives the model built so far.
    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
    carbonLoadModel.setMaxColumns("100")
    carbonLoadModel
  }

  /** Prepares test data, tables and relations once before any test runs. */
  override def beforeAll(): Unit = {
    buildTestData()
    buildTable()
    buildRelation()
  }

  test("Generate global dictionary from external column file") {
    // load the first time
    val firstLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
      header, extColDictFilePath1)
    GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, firstLoadModel,
      extComplexRelation.tableMeta.storePath)
    // check whether the dictionary is generated
    DictionaryTestCaseUtil.checkDictionary(
      extComplexRelation, "deviceInformationId", "10086")

    // load the second time, with a different external dictionary file
    val secondLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
      header, extColDictFilePath2)
    GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, secondLoadModel,
      extComplexRelation.tableMeta.storePath)
    // check the old dictionary and whether the new distinct value is generated
    DictionaryTestCaseUtil.checkDictionary(
      extComplexRelation, "deviceInformationId", "10086")
    DictionaryTestCaseUtil.checkDictionary(
      extComplexRelation, "deviceInformationId", "10011")
  }

  test("When csv delimiter is not comma") {
    // when csv delimiter is comma
    // NOTE(review): the expected values below contain the *other* delimiter character,
    // presumably to prove it is not treated as a separator - confirm against channelsId.csv.
    val commaLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
      header, extColDictFilePath3)
    GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, commaLoadModel,
      extComplexRelation.tableMeta.storePath)
    // check whether the dictionary is generated
    DictionaryTestCaseUtil.checkDictionary(
      extComplexRelation, "channelsId", "1421|")

    // when csv delimiter is not comma
    val pipeLoadModel = buildCarbonLoadModel(verticalDelimiteRelation, complexFilePath2,
      header2, extColDictFilePath3, "|")
    GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, pipeLoadModel,
      verticalDelimiteRelation.tableMeta.storePath)
    // check whether the dictionary is generated
    DictionaryTestCaseUtil.checkDictionary(
      verticalDelimiteRelation, "channelsId", "1431,")
  }

  test("LOAD DML with COLUMNDICT option") {
    // Any exception thrown by the load propagates and fails the test with its real cause.
    sql(
      s"""
         |LOAD DATA LOCAL INPATH "$complexFilePath1" INTO TABLE loadSqlTest
         |OPTIONS('single_pass'='true','FILEHEADER'='$header', 'COLUMNDICT'='$extColDictFilePath1')
         |""".stripMargin)
    DictionaryTestCaseUtil.checkDictionary(
      loadSqlRelation, "deviceInformationId", "10086")
  }

  test("COLUMNDICT and ALL_DICTIONARY_PATH can not be used together") {
    // intercept fails the test if nothing, or an exception of another type, is thrown.
    val ex = intercept[MalformedCarbonCommandException] {
      sql(
        s"""
           |LOAD DATA LOCAL INPATH "$complexFilePath1" INTO TABLE loadSqlTest
           |OPTIONS('COLUMNDICT'='$extColDictFilePath1',"ALL_DICTIONARY_PATH"='$extColDictFilePath1')
           |""".stripMargin)
    }
    assertResult("Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together " +
      "in options")(ex.getMessage)
  }

  test("Measure can not use COLUMNDICT") {
    // intercept fails the test if nothing, or an exception of another type, is thrown.
    val ex = intercept[DataLoadingException] {
      sql(
        s"""
           |LOAD DATA LOCAL INPATH "$complexFilePath1" INTO TABLE loadSqlTest
           |OPTIONS('single_pass'='true','FILEHEADER'='$header', 'COLUMNDICT'='gamePointId:$filePath')
           |""".stripMargin)
    }
    assertResult("Column gamePointId is not a key column. Only key column can be part " +
      "of dictionary and used in COLUMNDICT option.")(ex.getMessage)
  }

  /**
   * Drops the test tables. IF EXISTS keeps the cleanup from failing when a table could not
   * be created in buildTable (creation failures are only logged there).
   */
  override def afterAll(): Unit = {
    sql("DROP TABLE IF EXISTS extComplextypes")
    sql("DROP TABLE IF EXISTS verticalDelimitedTable")
    sql("DROP TABLE IF EXISTS loadSqlTest")
  }
}