blob: e942dde3769f33b40adc4462313b51d6f1c6514c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.presto.integrationtest
import java.io.File
import java.sql.SQLException
import java.util
import org.apache.commons.io.FileUtils
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.presto.server.PrestoServer
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
/**
 * Integration tests that write carbon files with the SDK [[CarbonWriter]] into
 * `writerPath` and then query them as table `sdk_output.files` through the
 * [[PrestoServer]] started in `beforeAll`.
 *
 * Every test prepares its own data and removes the writer output afterwards, whether the
 * test passes or fails, so one failing test does not leave files behind for the next one.
 */
class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAfterAll {

  import scala.util.control.NonFatal

  private val logger = LogServiceFactory
    .getLogService(classOf[PrestoTestNonTransactionalTableFiles].getCanonicalName)

  // Resolved relative to the test classpath root; presumably the project root - TODO confirm.
  private val rootPath = new File(this.getClass.getResource("/").getPath
                                  + "../../../..").getCanonicalPath
  private val storePath = s"$rootPath/integration/presto/target/store"
  private val systemPath = s"$rootPath/integration/presto/target/system"
  // Directory the SDK writer writes to; it is the location of table sdk_output.files.
  private val writerPath = storePath + "/sdk_output/files"
  private val prestoServer = new PrestoServer

  /**
   * Registers the carbon system folder and the "written by" application name, then starts
   * the Presto server on `storePath` with a file based hive metastore.
   */
  override def beforeAll(): Unit = {
    val properties = CarbonProperties.getInstance()
    properties.addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, systemPath)
    properties.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "Presto")
    val map = new util.HashMap[String, String]()
    map.put("hive.metastore", "file")
    map.put("hive.metastore.catalog.dir", s"file://$storePath")
    prestoServer.startServer(storePath, "sdk_output", map)
  }

  /**
   * Stops the Presto server and deletes the whole store directory. The store is deleted
   * even when stopping the server throws.
   */
  override def afterAll(): Unit = {
    try {
      prestoServer.stopServer()
    } finally {
      CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
    }
  }

  /** Recreates the table and writes 3 rows without load options. */
  def buildTestDataSingleFile(): Any = {
    FileUtils.deleteDirectory(new File(writerPath))
    createTable()
    buildTestData(3, null)
  }

  /**
   * Recreates the table and writes 1000000 rows without load options.
   * NOTE(review): presumably enough rows to spill into several carbondata/index files with
   * the block size used by the writer - confirm.
   */
  def buildTestDataMultipleFiles(): Any = {
    FileUtils.deleteDirectory(new File(writerPath))
    createTable()
    buildTestData(1000000, null)
  }

  /** Drops and recreates schema `sdk_output` and its CARBON formatted table `files`. */
  private def createTable(): Unit = {
    prestoServer.execute("drop table if exists sdk_output.files")
    prestoServer.execute("drop schema if exists sdk_output")
    prestoServer.execute("create schema sdk_output")
    prestoServer
      .execute(
        "create table sdk_output.files(name varchar, age int, id tinyint, height double, salary " +
        "real) with" +
        "(format='CARBON') ")
  }

  /** Same as the three argument overload, sorting on column "name". */
  def buildTestData(rows: Int, options: util.Map[String, String]): Any = {
    buildTestData(rows, options, List("name"))
  }

  /**
   * Prepares SDK writer output in `writerPath`.
   *
   * When `options` is non-null it is passed to the writer as load options, the first three
   * rows are written as bad records and one extra valid row is appended after the loop.
   *
   * @param rows        number of rows written by the main loop
   * @param options     load options for the writer, or null for none
   * @param sortColumns columns the writer sorts by
   * @throws RuntimeException wrapping any non-fatal failure of the builder or the writer
   */
  def buildTestData(rows: Int,
      options: util.Map[String, String],
      sortColumns: List[String]): Any = {
    val schema = new StringBuilder()
      .append("[ \n")
      .append(" {\"NaMe\":\"string\"},\n")
      .append(" {\"age\":\"int\"},\n")
      .append(" {\"id\":\"byte\"},\n")
      .append(" {\"height\":\"double\"},\n")
      .append(" {\"salary\":\"float\"}\n")
      .append("]")
      .toString()
    val withLoadOptions = options != null
    try {
      val baseBuilder = CarbonWriter.builder()
        .outputPath(writerPath)
        .sortBy(sortColumns.toArray)
        .uniqueIdentifier(System.currentTimeMillis)
        .withBlockSize(2)
      // Load options are applied at the same point of the chain as before, only when given.
      val configuredBuilder =
        if (withLoadOptions) baseBuilder.withLoadOptions(options) else baseBuilder
      val writer = configuredBuilder
        .withCsvInput(Schema.parseJson(schema))
        .writtenBy("TestNonTransactionalCarbonTable")
        .build()
      withWriter(writer) { out =>
        (0 until rows).foreach { i =>
          val record =
            if (withLoadOptions && i < 3) {
              badRecord(i)
            } else {
              // id is a byte column, so keep the value within 0..127
              validRecord(i, i % 128)
            }
          out.write(record)
        }
        if (withLoadOptions) {
          // Keep one valid record, else the carbon data file will not be generated.
          // Uses the index following the last loop row (0 when the loop did not run).
          val next = math.max(rows, 0)
          out.write(validRecord(next, next))
        }
      }
    } catch {
      case NonFatal(ex) => throw new RuntimeException(ex)
    }
  }

  /**
   * Prepares SDK writer output in `writerPath` with a different schema: same column names,
   * but `name` is declared as boolean.
   *
   * @param rows        number of rows to write
   * @param sortColumns columns the writer sorts by, handed to the builder as is
   * @throws RuntimeException wrapping any non-fatal failure of the builder or the writer
   */
  def buildTestDataOtherDataType(rows: Int, sortColumns: Array[String]): Any = {
    // same column name, but name as boolean type
    val fields: Array[Field] = Array(
      new Field("name", DataTypes.BOOLEAN),
      new Field("age", DataTypes.INT),
      new Field("id", DataTypes.BYTE),
      new Field("height", DataTypes.DOUBLE),
      new Field("salary", DataTypes.FLOAT))
    try {
      val writer = CarbonWriter.builder()
        .outputPath(writerPath)
        .uniqueIdentifier(System.currentTimeMillis())
        .withBlockSize(2)
        .sortBy(sortColumns)
        .withCsvInput(new Schema(fields))
        .writtenBy("TestNonTransactionalCarbonTable")
        .build()
      withWriter(writer) { out =>
        (0 until rows).foreach { i =>
          out.write(Array[String]("true",
            String.valueOf(i),
            String.valueOf(i),
            String.valueOf(i.toDouble / 2),
            String.valueOf(i.toFloat / 2)))
        }
      }
    } catch {
      case NonFatal(ex) => throw new RuntimeException(ex)
    }
  }

  /** Builds a row matching the (name, age, id, height, salary) schema. */
  private def validRecord(index: Int, id: Int): Array[String] = {
    Array[String]("robot" + index,
      String.valueOf(index),
      String.valueOf(id),
      String.valueOf(index.toDouble / 2),
      String.valueOf(index.toFloat / 2))
  }

  /** Builds a bad record: a decimal string in the id column and text in the height column. */
  private def badRecord(index: Int): Array[String] = {
    Array[String]("robot" + index,
      String.valueOf(index),
      String.valueOf(index.toDouble / 2),
      "robot",
      String.valueOf(index.toFloat / 2))
  }

  /**
   * Runs `writeRows` against `writer` and always closes the writer afterwards.
   *
   * If writing fails, the writer is still closed and a non-fatal failure of that close is
   * attached to the original error as suppressed, so the original cause is what propagates.
   */
  private def withWriter(writer: CarbonWriter)(writeRows: CarbonWriter => Unit): Unit = {
    try {
      writeRows(writer)
    } catch {
      case primary: Throwable =>
        try {
          writer.close()
        } catch {
          case NonFatal(secondary) => primary.addSuppressed(secondary)
        }
        throw primary
    }
    writer.close()
  }

  /** Deletes the SDK writer output directory. */
  def cleanTestData(): Unit = {
    FileUtils.deleteDirectory(new File(writerPath))
  }

  /**
   * Recursively deletes every non-directory entry under `path` whose name ends with
   * `extension`.
   */
  def deleteFile(path: String, extension: String): Unit = {
    val file: CarbonFile = FileFactory
      .getCarbonFile(path, FileFactory.getFileType(path))
    for (entry <- file.listFiles) {
      if (entry.isDirectory) {
        deleteFile(entry.getPath, extension)
      } else if (entry.getName.endsWith(extension)) {
        CarbonUtil.deleteFoldersAndFilesSilent(entry)
      }
    }
  }

  /** Runs a test body and removes the writer output afterwards, even when the body fails. */
  private def withCleanup(testBody: => Any): Unit = {
    try {
      testBody
    } finally {
      cleanTestData()
    }
  }

  test("test show schemas") {
    withCleanup {
      buildTestDataSingleFile()
      val actualResult: List[Map[String, Any]] = prestoServer
        .executeQuery("show schemas ")
      assert(actualResult ==
             List(Map("Schema" -> "information_schema"), Map("Schema" -> "sdk_output")))
    }
  }

  test("test show tables") {
    withCleanup {
      buildTestDataSingleFile()
      val actualResult: List[Map[String, Any]] = prestoServer
        .executeQuery("show tables")
      assert(actualResult == List(Map("Table" -> "files")))
    }
  }

  test("test read sdk output files") {
    withCleanup {
      buildTestDataSingleFile()
      val actualResult: List[Map[String, Any]] = prestoServer
        .executeQuery("SELECT COUNT(*) AS RESULT FROM files ")
      val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 3))
      assert(actualResult == expectedResult)
    }
  }

  test("test read multiple carbondata and index files") {
    withCleanup {
      buildTestDataMultipleFiles()
      val actualResult: List[Map[String, Any]] = prestoServer
        .executeQuery("SELECT COUNT(*) AS RESULT FROM files ")
      val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 1000000))
      assert(actualResult == expectedResult)
    }
  }

  test("test reading different schema") {
    withCleanup {
      buildTestDataSingleFile()
      // Second batch in the same location, written with a conflicting schema.
      buildTestDataOtherDataType(3, null)
      val exception = intercept[SQLException] {
        prestoServer.executeQuery("select count(*) as RESULT from files ")
      }
      assert(exception.getMessage()
        .contains("All the files doesn't have same schema"))
    }
  }

  test("test reading without carbon index file") {
    withCleanup {
      buildTestDataSingleFile()
      // NOTE(review): UPDATE_INDEX_FILE_EXT is presumably the carbon index file extension
      // - confirm against CarbonCommonConstants.
      deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
      val exception = intercept[SQLException] {
        prestoServer.executeQuery("select * from files ")
      }
      assert(exception.getMessage()
        .contains("No Index files are present in the table location"))
    }
  }

  test("test select all columns") {
    withCleanup {
      buildTestDataSingleFile()
      val actualResult: List[Map[String, Any]] = prestoServer
        .executeQuery("select * from files ")
      val expectedResult: List[Map[String, Any]] = List(Map(
        "name" -> "robot0",
        "height" -> 0.0,
        "age" -> 0,
        "salary" -> 0.0,
        "id" -> 0),
        Map("name" -> "robot1",
          "height" -> 0.5,
          "age" -> 1,
          "salary" -> 0.5,
          "id" -> 1),
        Map("name" -> "robot2",
          "height" -> 1.0,
          "age" -> 2,
          "salary" -> 1.0,
          "id" -> 2))
      // Compared through toString rather than value equality; presumably the values
      // returned by Presto are boxed differently from the literals above - TODO confirm.
      assert(actualResult.toString() == expectedResult.toString())
    }
  }
}