// blob: fd4ac35cbec40042aa7cfd04478ea68473c9f33a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.presto.integrationtest
import java.io.{BufferedInputStream, File, FileInputStream}
import java.sql.SQLException
import java.util
import org.apache.commons.codec.binary.Hex
import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.{DataTypes, Field}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.presto.server.PrestoServer
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAfterAll {
// Logger registered under this suite's canonical class name.
private val logger = LogServiceFactory.getLogService(
  classOf[PrestoTestNonTransactionalTableFiles].getCanonicalName)
// Canonical path four directory levels above the classpath resource root ("/").
private val rootPath = {
  val resourceRoot = this.getClass.getResource("/").getPath
  new File(resourceRoot + "../../../..").getCanonicalPath
}
// Store directory handed to the server as the metastore catalog dir; removed in afterAll.
private val storePath = rootPath + "/integration/presto/target/store"
// Output directory of the SDK writer backing table sdk_output.files.
private val writerPath = s"$storePath/sdk_output/files"
// Output directory of the SDK writer backing table sdk_output.files1 (binary test).
private val writerPathBinary = s"$storePath/sdk_output/files1"
private val prestoServer = new PrestoServer
// Last value written to the "address" column by buildTestData; read by the assertions.
private var varcharString: String = ""
/**
 * Suite set-up: records "Presto" under CARBON_WRITTEN_BY_APPNAME and starts the
 * PrestoServer with "sdk_output" and a file-based hive metastore rooted at `storePath`.
 */
override def beforeAll: Unit = {
  val metastoreProperties = new util.HashMap[String, String]()
  metastoreProperties.put("hive.metastore", "file")
  metastoreProperties.put("hive.metastore.catalog.dir", "file://" + storePath)

  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "Presto")
  prestoServer.startServer("sdk_output", metastoreProperties)
}
/**
 * Suite tear-down: stops the Presto server and removes the whole store directory.
 *
 * The store is deleted in a `finally` so that a failure while stopping the server
 * does not leave test data behind on disk.
 */
override def afterAll(): Unit = {
  try {
    prestoServer.stopServer()
  } finally {
    CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
  }
}
/**
 * Resets the writer output directory, recreates table sdk_output.files and writes
 * three rows whose "address" column holds a generated 32001-character value.
 */
def buildTestDataSingleFile(): Any = {
  val outputDir = new File(writerPath)
  FileUtils.deleteDirectory(outputDir)
  createTable
  buildTestData(rows = 3, options = null, varcharDataGen = true)
}
/**
 * Resets the writer output directory, recreates table sdk_output.files and writes
 * one million rows whose "address" column holds the short value "a".
 */
def buildTestDataMultipleFiles(): Any = {
  val outputDir = new File(writerPath)
  FileUtils.deleteDirectory(outputDir)
  createTable
  buildTestData(rows = 1000000, options = null, varcharDataGen = false)
}
// Drops and recreates schema sdk_output, then creates the CARBON-format table
// sdk_output.files with the six columns written by buildTestData.
private def createTable = {
  Seq(
    "drop table if exists sdk_output.files",
    "drop schema if exists sdk_output",
    "create schema sdk_output"
  ).foreach(statement => prestoServer.execute(statement))

  val createTableSql =
    "create table sdk_output.files(name varchar, age int, id tinyint, height double, salary " +
    "real, address varchar) with" +
    "(format='CARBON') "
  prestoServer.execute(createTableSql)
}
// Drops and recreates schema sdk_output, then creates the CARBON-format table
// sdk_output.files1 whose "name" is boolean and whose "id" is varbinary.
private def createTableBinary = {
  Seq(
    "drop table if exists sdk_output.files1",
    "drop schema if exists sdk_output",
    "create schema sdk_output"
  ).foreach(statement => prestoServer.execute(statement))

  val createTableSql =
    "create table sdk_output.files1(name boolean, age int, id varbinary, height double, salary " +
    "real) with" +
    "(format='CARBON') "
  prestoServer.execute(createTableSql)
}
/**
 * Convenience overload of the four-argument buildTestData that sorts by the
 * single column "name".
 */
def buildTestData(rows: Int, options: util.Map[String, String], varcharDataGen: Boolean): Any = {
  val defaultSortColumns = List("name")
  buildTestData(rows, options, defaultSortColumns, varcharDataGen)
}
/**
 * Prepares SDK writer output under `writerPath`.
 *
 * Writes `rows` records of (name, age, id, height, salary, address) through a
 * CSV-input CarbonWriter. As a side effect the value used for the "address"
 * column is stored in `varcharString` so tests can compare against it.
 *
 * @param rows           number of records written by the main loop
 * @param options        load options passed to the writer; may be null. When non-null,
 *                       the first three records are deliberately malformed (bad records)
 *                       and one extra valid record is appended after the loop
 * @param sortColumns    columns passed to the writer's sortBy
 * @param varcharDataGen when true the "address" value is a random 32001-character
 *                       alphabetic string, otherwise it is "a"
 * @throws RuntimeException wrapping any non-fatal failure raised while writing
 */
def buildTestData(rows: Int,
    options: util.Map[String, String],
    sortColumns: List[String],
    varcharDataGen: Boolean): Any = {
  val schema = new StringBuilder()
    .append("[ \n")
    .append(" {\"NaMe\":\"string\"},\n")
    .append(" {\"age\":\"int\"},\n")
    .append(" {\"id\":\"byte\"},\n")
    .append(" {\"height\":\"double\"},\n")
    .append(" {\"salary\":\"float\"},\n")
    .append(" {\"address\":\"varchar\"}\n")
    .append("]")
    .toString()
  // Build Varchar Column data
  val varcharValue: String =
    if (varcharDataGen) RandomStringUtils.randomAlphabetic(32001) else "a"
  varcharString = varcharValue
  val hasLoadOptions = options != null

  // A well-formed record; "id" is kept below 128 because the column is declared as byte.
  def validRecord(i: Int): Array[String] =
    Array[String]("robot" + i,
      String.valueOf(i),
      String.valueOf(i % 128),
      String.valueOf(i.toDouble / 2),
      String.valueOf(i.toFloat / 2),
      varcharValue)

  // A malformed record: a fractional value in the byte column and text in the double column.
  def badRecord(i: Int): Array[String] =
    Array[String]("robot" + i,
      String.valueOf(i),
      String.valueOf(i.toDouble / 2),
      "robot",
      String.valueOf(i.toFloat / 2),
      varcharValue)

  try {
    val baseBuilder = CarbonWriter.builder()
      .outputPath(writerPath)
      .sortBy(sortColumns.toArray)
      .uniqueIdentifier(System.currentTimeMillis())
      .withBlockSize(2)
    // Load options are only applied when the caller supplied them.
    val configuredBuilder =
      if (hasLoadOptions) baseBuilder.withLoadOptions(options) else baseBuilder
    val writer = configuredBuilder
      .withCsvInput(Schema.parseJson(schema))
      .writtenBy("TestNonTransactionalCarbonTable")
      .build()
    try {
      var i = 0
      while (i < rows) {
        if (hasLoadOptions && i < 3) {
          // writing a bad record
          writer.write(badRecord(i))
        } else {
          writer.write(validRecord(i))
        }
        i += 1
      }
      if (hasLoadOptions) {
        // Keep one valid record, else the carbondata file will not be generated
        writer.write(validRecord(i))
      }
    } finally {
      // Always release the writer, even when a write fails part-way through.
      writer.close()
    }
  } catch {
    case scala.util.control.NonFatal(ex) => throw new RuntimeException(ex)
  }
}
/**
 * Prepares SDK writer output with a different schema: "name" is BOOLEAN and "id"
 * is BINARY, while age/height/salary keep the types used by buildTestData.
 *
 * Every record carries the hex-encoded content of the carbondatalogo.jpg test
 * image in its "id" column.
 *
 * @param rows        number of records to write
 * @param sortColumns columns passed to the writer's sortBy (callers in this suite pass null)
 * @param path        output directory for the writer
 * @throws RuntimeException wrapping any non-fatal failure raised while reading the
 *                          image or writing the records
 */
def buildTestDataOtherDataType(rows: Int, sortColumns: Array[String], path : String): Any = {
  // same column name, but name as boolean type
  val fields: Array[Field] = Array[Field](
    new Field("name", DataTypes.BOOLEAN),
    new Field("age", DataTypes.INT),
    new Field("id", DataTypes.BINARY),
    new Field("height", DataTypes.DOUBLE),
    new Field("salary", DataTypes.FLOAT))
  val imagePath = rootPath + "/sdk/sdk/src/test/resources/image/carbondatalogo.jpg"
  try {
    // Read the whole image in one call: no reliance on InputStream.available(), no
    // partially filled buffer, and the file handle is released even on failure.
    val imageBytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(imagePath))
    val binaryValue = String.valueOf(Hex.encodeHex(imageBytes))
    val writer = CarbonWriter.builder()
      .outputPath(path)
      .uniqueIdentifier(System.currentTimeMillis())
      .withBlockSize(2)
      .sortBy(sortColumns)
      .withCsvInput(new Schema(fields))
      .writtenBy("TestNonTransactionalCarbonTable")
      .build()
    try {
      var i = 0
      while (i < rows) {
        writer.write(Array[String]("true",
          String.valueOf(i),
          binaryValue,
          String.valueOf(i.toDouble / 2),
          String.valueOf(i.toFloat / 2)))
        i += 1
      }
    } finally {
      // Always release the writer, even when a write fails part-way through.
      writer.close()
    }
  } catch {
    case scala.util.control.NonFatal(ex) => throw new RuntimeException(ex)
  }
}
/** Removes the SDK writer output directory (`writerPath`) and everything below it. */
def cleanTestData(): Unit = {
  val outputDir = new File(writerPath)
  FileUtils.deleteDirectory(outputDir)
}
/**
 * Recursively walks `path` and silently deletes every non-directory entry whose
 * name ends with `extension`.
 *
 * @param path      directory to scan
 * @param extension file-name suffix selecting the files to delete
 */
def deleteFile(path: String, extension: String): Unit = {
  val root: CarbonFile = FileFactory.getCarbonFile(path)
  root.listFiles().foreach { entry =>
    if (entry.isDirectory) {
      // Descend into sub-directories.
      deleteFile(entry.getPath, extension)
    } else if (entry.getName.endsWith(extension)) {
      CarbonUtil.deleteFoldersAndFilesSilent(entry)
    }
  }
}
// "show schemas" must return exactly information_schema followed by sdk_output.
test("test show schemas") {
  buildTestDataSingleFile()
  val schemas: List[Map[String, Any]] = prestoServer.executeQuery("show schemas ")
  val expectedSchemas = List(
    Map("Schema" -> "information_schema"),
    Map("Schema" -> "sdk_output"))
  assert(schemas == expectedSchemas)
  cleanTestData()
}
// "show tables" must list only the table "files".
test("test show tables") {
  buildTestDataSingleFile()
  val tables: List[Map[String, Any]] = prestoServer.executeQuery("show tables")
  val expectedTables = List(Map("Table" -> "files"))
  assert(tables == expectedTables)
  cleanTestData()
}
// The three rows written by the SDK writer must all be counted by Presto.
test("test read sdk output files") {
  buildTestDataSingleFile()
  val expectedCount: List[Map[String, Any]] = List(Map("RESULT" -> 3))
  val actualCount: List[Map[String, Any]] =
    prestoServer.executeQuery("SELECT COUNT(*) AS RESULT FROM files ")
  assert(actualCount == expectedCount)
  cleanTestData()
}
// All one million rows written by buildTestDataMultipleFiles must be counted.
test("test read multiple carbondata and index files") {
  buildTestDataMultipleFiles()
  val expectedCount: List[Map[String, Any]] = List(Map("RESULT" -> 1000000))
  val actualCount: List[Map[String, Any]] =
    prestoServer.executeQuery("SELECT COUNT(*) AS RESULT FROM files ")
  assert(actualCount == expectedCount)
  cleanTestData()
}
// Files written with two different schemas into the same location must make the
// query fail with a datatype-mismatch message.
test("test reading different schema") {
  buildTestDataSingleFile()
  buildTestDataOtherDataType(3, null, writerPath)
  val failure = intercept[SQLException] {
    prestoServer.executeQuery("select count(*) as RESULT from files ")
  }
  val message = failure.getMessage()
  assert(message.contains("All common columns present in the files doesn't have same datatype"))
  cleanTestData()
}
// Reads back the BINARY "id" column of sdk_output.files1 and checks its size and
// leading bytes.
test("test reading binary") {
  val binaryOutputDir = new File(writerPathBinary)
  FileUtils.deleteDirectory(binaryOutputDir)
  createTableBinary
  buildTestDataOtherDataType(3, null, writerPathBinary)
  val rows: List[Map[String, Any]] = prestoServer.executeQuery("select id from files1 ")
  assert(rows.size == 3)
  val firstId = rows.head("id").asInstanceOf[Array[Byte]]
  // check the binary byte Array size, as original hex encoded image byte array size is 118198
  assert(firstId.length == 118198)
  // validate some initial bytes
  assert(firstId(0) == 56)
  assert(firstId(1) == 57)
  assert(firstId(2) == 53)
  assert(firstId(3) == 48)
  FileUtils.deleteDirectory(binaryOutputDir)
}
// After removing every file ending with UPDATE_INDEX_FILE_EXT from the writer
// output, a query must fail with the "No Index files" message.
test("test reading without carbon index file") {
  buildTestDataSingleFile()
  deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
  val failure = intercept[SQLException] {
    prestoServer.executeQuery("select * from files ")
  }
  val message = failure.getMessage()
  assert(message.contains("No Index files are present in the table location"))
  cleanTestData()
}
// "select *" must return the three written rows; results are compared through
// their string rendering.
test("test select all columns") {
  buildTestDataSingleFile()

  // Expected row; "address" is whatever buildTestData last stored in varcharString.
  def expectedRow(name: String,
      height: Double,
      age: Int,
      salary: Double,
      id: Int): Map[String, Any] = {
    Map(
      "name" -> name,
      "height" -> height,
      "age" -> age,
      "salary" -> salary,
      "id" -> id,
      "address" -> varcharString)
  }

  val rows: List[Map[String, Any]] = prestoServer.executeQuery("select * from files ")
  val expectedRows: List[Map[String, Any]] = List(
    expectedRow("robot0", 0.0, 0, 0.0, 0),
    expectedRow("robot1", 0.5, 1, 0.5, 1),
    expectedRow("robot2", 1.0, 2, 1.0, 2))
  assert(rows.toString() == expectedRows.toString())
}
// The longest "address" value must have the 32001 characters generated by the writer;
// results are compared through their string rendering.
test("Test for query on Varchar columns") {
  buildTestDataSingleFile()
  val expectedLength: List[Map[String, Any]] = List(Map("_col0" -> 32001))
  val actualLength: List[Map[String, Any]] =
    prestoServer.executeQuery("select max(length(address)) from files")
  assert(actualLength.toString() == expectedLength.toString())
}
}