/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbon.flink

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
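
/**
 * Tests for the Flink CarbonData writer: a Flink streaming job writes rows as stage
 * files under the table path, and `INSERT INTO ... STAGE` loads them into segments.
 */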
class TestCarbonWriter extends QueryTest with BeforeAndAfterAll {

  val tableName = "test_flink"
  val bucketTableName = "insert_bucket_table"
  val dataTempPath: String = targetTestClass + "/data/temp/"
test("Writing flink data to local carbon table") {
createTable
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
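      // The bulk-format StreamingFileSink only rolls and commits its files on
      // checkpoints, so checkpointing must be enabled for stage files to appear.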
      val environment = StreamExecutionEnvironment.getExecutionEnvironment
      environment.enableCheckpointing(2000L)
      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
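      // The rows are still in stage files at this point, so the table itself is empty.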
checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(0)))
// query with stage input
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT, "true")
checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(1000)))
sql(s"select * from $tableName limit 10").show
checkAnswer(sql(s"select max(intField) from $tableName"), Seq(Row(999)))
checkAnswer(sql(s"select count(intField) from $tableName where intField >= 900"), Seq(Row(100)))
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT, "false")
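      // Load the pending stage files into the table as a regular segment.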
sql(s"INSERT INTO $tableName STAGE")
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(1000)))
checkAnswer(sql(s"select count(intField) from $tableName where intField >= 900"), Seq(Row(100)))
checkIfStageFilesAreDeleted(tablePath)
}
}
test("test batch_file_count option") {
createTable
try {
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
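      // With a commit threshold of 100 records, the 1000-record source produces 10
      // stage files; loading 5 at a time ('batch_file_count' = '5') adds 500 rows per pass.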
      val environment = StreamExecutionEnvironment.getExecutionEnvironment
      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))
      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
    }
  }
test("test carbon writer of bucket table") {
sql(s"DROP TABLE IF EXISTS $tableName").collect()
sql(s"DROP TABLE IF EXISTS $bucketTableName").collect()
sql(
s"""
| CREATE TABLE $tableName (stringField string, intField int, shortField short, stringField1 string)
| STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField')
""".stripMargin
).collect()
sql(
s"""
| CREATE TABLE $bucketTableName (stringField string, intField int, shortField short, stringField1 string)
| STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField')
""".stripMargin
).collect()
try {
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
val environment = StreamExecutionEnvironment.getExecutionEnvironment
executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
val table = CarbonEnv.getCarbonTable(Option("default"), s"$tableName")(sqlContext.sparkSession)
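      // Each bucket is written as its own data file, so a segment of this
      // 10-bucket table should contain exactly 10 .carbondata files.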
      val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0")
      val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
        override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
      })
      assert(dataFiles.length == 10)
      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))
      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
      val segmentDir2 = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_1")
      val dataFiles2 = segmentDir2.listFiles(new CarbonFileFilter {
        override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
      })
      assert(dataFiles2.length == 10)
      checkAnswer(sql(s"SELECT count(*) FROM $tableName where stringField != 'AAA'"), Seq(Row(1000)))
      sql(s"insert into $bucketTableName select * from $tableName").collect()
      val plan = sql(
        s"""
           |select t1.*, t2.*
           |from $tableName t1, $bucketTableName t2
           |where t1.stringField = t2.stringField
        """.stripMargin).queryExecution.executedPlan
      var shuffleExists = false
      plan.collect {
        case s: Exchange if s.getClass.getName.equals(
            "org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
          s.getClass.getName.equals(
            "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec") =>
          shuffleExists = true
      }
      assert(!shuffleExists, "shuffle should not exist on bucket tables")
      checkAnswer(sql(
        s"""select count(*) from
           |(select t1.*, t2.*
           |from $tableName t1, $bucketTableName t2
           |where t1.stringField = t2.stringField) temp
        """.stripMargin), Row(1000))
    }
  }
test("test insert stage command with secondary index and bloomfilter") {
createTable
// create si and bloom index
sql(s"drop index if exists si_1 on $tableName")
sql(s"drop index if exists bloom_1 on $tableName")
sql(s"create index si_1 on $tableName(stringField1) as 'carbondata'")
sql(s"create index bloom_1 on $tableName(intField) as 'bloomfilter'")
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(2000L)
executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
// check count before insert stage
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(0)))
checkAnswer(sql("select count(*) from si_1"), Seq(Row(0)))
sql(s"INSERT INTO $tableName STAGE")
checkAnswer(sql("select count(*) from si_1"), Seq(Row(1000)))
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(1000)))
// check if query hits si
val df = sql(s"select stringField, intField from $tableName where stringField1 = 'si12'")
checkAnswer(df, Seq(Row("test12", 12)))
var isFilterHitSecondaryIndex = false
df.queryExecution.sparkPlan.transform {
case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
isFilterHitSecondaryIndex = true
broadCastSIFilterPushDown
}
assert(isFilterHitSecondaryIndex)
// check if query hits bloom filter
checkAnswer(sql(s"select intField,stringField1 from $tableName where intField = 99"), Seq(Row(99, "si99")))
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
      val explainBloom = sql(
        s"explain select intField,stringField1 from $tableName where intField = 99").collect()
      assert(explainBloom(0).getString(0).contains(
        """
          |Table Scan on test_flink
          | - total: 1 blocks, 1 blocklets
          | - filter: (intfield <> null and intfield = 99)
          | - pruned by Main Index
          | - skipped: 0 blocks, 0 blocklets
          | - pruned by CG Index
          | - name: bloom_1
          | - provider: bloomfilter
          | - skipped: 0 blocks, 0 blocklets""".stripMargin))
      checkIfStageFilesAreDeleted(tablePath)
    }
  }
test("test insert stage command with materilaized view") {
createTable
// create materialized view
sql(s"drop materialized view if exists mv_1")
sql(s"create materialized view mv_1 as select stringField, shortField from $tableName where intField=99 ")
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(2000L)
executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
// check count before insert stage
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(0)))
checkAnswer(sql(s"SELECT count(*) FROM mv_1"), Seq(Row(0)))
sql(s"INSERT INTO $tableName STAGE")
checkAnswer(sql(s"SELECT count(*) FROM mv_1"), Seq(Row(1)))
val df = sql(s"select stringField, shortField from $tableName where intField=99")
val tables = df.queryExecution.optimizedPlan collect {
case l: LogicalRelation => l.catalogTable.get
}
assert(tables.exists(_.identifier.table.equalsIgnoreCase("mv_1")))
checkAnswer(df, Seq(Row("test99",12345)))
checkIfStageFilesAreDeleted(tablePath)
}
}
test("Show segments with stage") {
createTable
try {
val tablePath = storeLocation + "/" + tableName + "/"
val stagePath = CarbonTablePath.getStageDir(tablePath)
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(2000L)
executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
// 1. Test "SHOW SEGMENT ON $tableanme WITH STAGE"
var rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE").collect()
var unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
assert(rows.length == unloadedStageCount)
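      // Unloaded stage entries have no segment id; status is "Unload" and the
      // create time (column 2) drives the descending-order check below.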
      for (index <- 0 until unloadedStageCount) {
        assert(rows(index).getString(0) == null)
        assert(rows(index).getString(1).equals("Unload"))
        assert(rows(index).getString(2) != null)
        assert(rows(index).getString(3) == null)
        assert(rows(index).getString(4).equals("NA"))
        assert(rows(index).getString(5) != null)
        assert(rows(index).getString(6) != null)
        assert(rows(index).getString(7) == null)
        assertShowStagesCreateTimeDesc(rows, index)
      }
      // 2. Test "SHOW SEGMENTS FOR TABLE $tableName WITH STAGE"
      val rowsfortable = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE").collect()
      assert(rowsfortable.length == rows.length)
      for (index <- 0 until unloadedStageCount) {
        assert(rows(index).toString() == rowsfortable(index).toString())
      }
      // 3. Test "SHOW SEGMENTS ON $tableName WITH STAGE AS (QUERY)"
      rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
        s"(SELECT * FROM $tableName" + "_segments)").collect()
      for (index <- 0 until unloadedStageCount) {
        val row = rows(index)
        assert(row.getString(0) == null)
        assert(row.getString(1).equals("Unload"))
        assert(row.getString(2) != null)
        assert(row.getLong(3) == -1)
        assert(row.get(4).toString.equals("WrappedArray(NA)"))
        assert(row.getLong(5) > 0)
        assert(row.getLong(6) > 0)
        assert(row.getString(7) == null)
        assert(row.getString(8) == null)
        assert(row.getString(9) == null)
        assert(row.getString(10) == null)
        assert(row.getString(11) == null)
        assertShowStagesCreateTimeDesc(rows, index)
      }
      // 4. Test "SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS (QUERY)"
      //    and "SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1"
      if (unloadedStageCount > 1) {
        sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '1')")
        unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1").collect()
        assert(rows.length == unloadedStageCount + 1)
        assert(rows(0).getString(1).equals("Unload"))
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 0").collect()
        assert(rows.length == unloadedStageCount)
        assert(rows(0).getString(1).equals("Unload"))
        rows = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE LIMIT 1").collect()
        assert(rows.length == unloadedStageCount + 1)
        assert(rows(0).getString(1).equals("Unload"))
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS " +
          s"(SELECT * FROM $tableName" + "_segments)").collect()
        assert(rows.length == unloadedStageCount + 1)
        assert(rows(0).getString(1).equals("Unload"))
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
          s"(SELECT * FROM $tableName" + "_segments where status = 'Unload')").collect()
        unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
        assert(rows.length == unloadedStageCount)
        assert(rows(0).getString(1).equals("Unload"))
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
          s"(SELECT * FROM $tableName" + "_segments where status = 'Success')").collect()
        assert(rows.length == 1)
        assert(rows(0).getString(1).equals("Success"))
      }
    }
  }

  private def executeFlinkStreamingEnvironment(
      environment: StreamExecutionEnvironment,
      writerProperties: Properties,
      carbonProperties: Properties): JobExecutionResult = {
    val tablePath = storeLocation + "/" + tableName + "/"
    environment.setParallelism(1)
    environment.setRestartStrategy(RestartStrategies.noRestart)
    val dataCount = 1000
    // Generate 1000 rows matching the table schema:
    // (stringField, intField, shortField, stringField1).
    val source = new TestSource(dataCount) {
      @throws[InterruptedException]
      override def get(index: Int): Array[AnyRef] = {
        Thread.sleep(1L)
        val data = new Array[AnyRef](4)
        data(0) = "test" + index
        data(1) = index.asInstanceOf[AnyRef]
        data(2) = 12345.asInstanceOf[AnyRef]
        data(3) = "si" + index
        data
      }

      @throws[InterruptedException]
      override def onFinish(): Unit = {
        Thread.sleep(5000L)
      }
    }
    val stream = environment.addSource(source)
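    // Build a Carbon writer for the "Local" filesystem target and wire it into a
    // bulk-format StreamingFileSink; files are committed when a checkpoint completes.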
    val factory = CarbonWriterFactory.builder("Local").build(
      "default",
      tableName,
      tablePath,
      new Properties,
      writerProperties,
      carbonProperties)
    val streamSink = StreamingFileSink
      .forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory)
      .build()
    stream.addSink(streamSink)

    try {
      environment.execute()
    } catch {
      case exception: Exception =>
        // TODO
        throw new UnsupportedOperationException(exception)
    }
  }

  private def checkIfStageFilesAreDeleted(tablePath: String): Unit = {
    // ensure the stage snapshot file and all stage files are deleted
    assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
    assertResult(true)(
      FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
  }

  private def createTable = {
    sql(s"DROP TABLE IF EXISTS $tableName")
    sql(
      s"""
         | CREATE TABLE $tableName (stringField string, intField int, shortField short, stringField1 string)
         | STORED AS carbondata
      """.stripMargin)
  }

  override def afterAll(): Unit = {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
      CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
    sql(s"DROP TABLE IF EXISTS $tableName")
    sql(s"DROP TABLE IF EXISTS $bucketTableName")
  }

  private def newWriterProperties(dataTempPath: String) = {
    val properties = new Properties
    properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
    properties
  }

  private def newCarbonProperties(storeLocation: String) = {
    val properties = new Properties
    properties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    properties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
    properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
    properties
  }

  private def assertShowStagesCreateTimeDesc(rows: Array[Row], index: Int): Unit = {
    // Rows are expected in descending create-time order: each row's create time
    // (column 2) must not be later than the previous row's.
    if (index > 0) {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val currentTime = format.parse(rows(index).getString(2)).getTime
      val previousTime = format.parse(rows(index - 1).getString(2)).getTime
      assert(currentTime <= previousTime)
    }
  }
}