package org.apache.carbondata.spark.testsuite.sdk

import java.io.{BufferedWriter, File, FileWriter}

import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.file.{ArrowCarbonReader, CarbonReader, CarbonSchemaReader}
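
/**
 * Verifies that the SDK [[CarbonReader]] can read a transactional CarbonData
 * table created and loaded through Spark SQL, both via the Arrow reader and
 * via the plain row reader.
 */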
class TestSDKWithTransactionalTable extends QueryTest with BeforeAndAfterAll {

  var filePath: String = _
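
  // Generates a small CSV file (header row plus ten data rows) that the
  // tests below load into CarbonData tables.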
  def buildTestData(): Unit = {
    filePath = s"$integrationPath/spark/target/big.csv"
    val file = new File(filePath)
    val writer = new BufferedWriter(new FileWriter(file))
    writer.write("c1, c2, c3, c4, c5, c6, c7, c8, c9, c10")
    writer.newLine()
    for (i <- 0 until 10) {
      writer.write("a" + i % 1000 + "," +
                   "b" + i % 1000 + "," +
                   "c" + i % 1000 + "," +
                   "d" + i % 1000 + "," +
                   "e" + i % 1000 + "," +
                   "f" + i % 1000 + "," +
                   i % 1000 + "," +
                   i % 1000 + "," +
                   i % 1000 + "," +
                   i % 1000 + "\n")
      // Flush periodically so memory use stays bounded for larger row counts.
      if (i % 10000 == 0) {
        writer.flush()
      }
    }
    writer.close()
  }

  def dropTable(): Unit = {
    sql("DROP TABLE IF EXISTS carbon_load1")
    sql("DROP TABLE IF EXISTS train")
    sql("DROP TABLE IF EXISTS test")
  }

  override def beforeAll(): Unit = {
    dropTable()
    buildTestData()
  }
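
  // Loads the same CSV three times to create multiple segments, then checks
  // that the SDK ArrowCarbonReader returns exactly as many rows as a SQL
  // count(*) over the table.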
test("test sdk with transactional table, read as arrow") {
sql(
"""
| CREATE TABLE carbon_load1(
| c1 string, c2 string, c3 string, c4 string, c5 string,
| c6 string, c7 int, c8 int, c9 int, c10 int)
| STORED AS carbondata
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
val table = CarbonEnv.getCarbonTable(None, "carbon_load1")(sqlContext.sparkSession)
val reader:ArrowCarbonReader[Array[Object]] =
CarbonReader.builder(table.getTablePath, table.getTableName).buildArrowReader()
var count = 0
while(reader.hasNext) {
reader.readNextRow()
count += 1
}
reader.close()
checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(count)))
sql("DROP TABLE carbon_load1")
}
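
  // Same check as above, but using the row-based CarbonReader; each call to
  // readNextRow returns one row as an Array[AnyRef] of column values.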
test("test sdk with transactional table, read as row") {
sql(
"""
| CREATE TABLE carbon_load1(
| c1 string, c2 string, c3 string, c4 string, c5 string,
| c6 string, c7 int, c8 int, c9 int, c10 int)
| STORED AS carbondata
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
val table = CarbonEnv.getCarbonTable(None, "carbon_load1")(sqlContext.sparkSession)
val reader = CarbonReader.builder(table.getTablePath, table.getTableName).build()
var count = 0
while ( { reader.hasNext }) {
var row = reader.readNextRow.asInstanceOf[Array[AnyRef]]
count += 1
}
reader.close()
checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(count)))
sql("DROP TABLE carbon_load1")
}
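
  // Clean up: delete the generated CSV and reset the load sort scope to its
  // default.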
  override def afterAll(): Unit = {
    new File(filePath).delete()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
  }
}