/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbon.flink

import java.text.SimpleDateFormat
import java.util.{Base64, Properties}
import java.util.concurrent.Executors

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath
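
/**
 * Tests writing Flink streaming data into a partitioned carbon table as stage
 * files, then loading those stage files with INSERT INTO ... STAGE.
 */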
class TestCarbonPartitionWriter extends QueryTest with BeforeAndAfterAll {
val tableName = "test_flink_partition"
val dataTempPath = targetTestClass + "/data/temp/"
test("Writing flink data to local partition carbon table") {
createPartitionTable
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(6)
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
val dataCount = 1000
val source = getTestSource(dataCount)
executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
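      // INSERT ... STAGE picks up the stage files written by the Flink job
      // and loads them into the table as regular segments.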
sql(s"INSERT INTO $tableName STAGE")
checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
}
}
test("Show segments with stage") {
createPartitionTable
try {
val tablePath = storeLocation + "/" + tableName + "/"
val tableStagePath = CarbonTablePath.getStageDir(tablePath)
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(2000L)
val dataCount = 1000
val source = getTestSource(dataCount)
executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
      // 1. Test "SHOW SEGMENTS ON $tableName WITH STAGE"
var rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE").collect()
var unloadedStageCount = CarbonStore.listStageFiles(tableStagePath)._1.length
assert(rows.length == unloadedStageCount)
for (index <- 0 until unloadedStageCount) {
assert(rows(index).getString(0) == null)
assert(rows(index).getString(1).equals("Unload"))
assert(rows(index).getString(2) != null)
assert(rows(index).getString(3) == null)
assert(!rows(index).getString(4).equals("NA"))
assert(rows(index).getString(5) != null)
assert(rows(index).getString(6) != null)
assert(rows(index).getString(7) == null)
assertShowStagesCreateTimeDesc(rows, index)
}
      // 2. Test "SHOW SEGMENTS FOR TABLE $tableName WITH STAGE"
      val rowsForTable = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE").collect()
      assert(rowsForTable.length == rows.length)
      for (index <- 0 until unloadedStageCount) {
        assert(rows(index).toString() == rowsForTable(index).toString())
      }
      // 3. Test "SHOW SEGMENTS ON $tableName WITH STAGE AS (QUERY)"
rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
s"(SELECT * FROM $tableName" + "_segments)").collect()
      for (index <- 0 until unloadedStageCount) {
        val row = rows(index)
        assert(row.getString(0) == null)
        assert(row.getString(1).equals("Unload"))
        assert(row.getString(2) != null)
        assert(row.getLong(3) == -1)
        assert(!row.get(4).toString.equals("WrappedArray(NA)"))
        assert(row.getLong(5) > 0)
        assert(row.getLong(6) > 0)
        assert(row.getString(7) == null)
        assert(row.getString(8) == null)
        assert(row.getString(9) == null)
        assert(row.getString(10) == null)
        assert(row.getString(11) == null)
        assertShowStagesCreateTimeDesc(rows, index)
      }
      // 4. Test "SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS (QUERY)"
      //    and "SHOW SEGMENTS ON $tableName LIMIT 1 AS (QUERY)"
if (unloadedStageCount > 1) {
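        // Load one stage file so that 'Success' and 'Unload' stages coexist
        // for the LIMIT and status-filter queries below.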
sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '1')")
unloadedStageCount = CarbonStore.listStageFiles(tableStagePath)._1.length
rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1").collect()
assert(rows.length == unloadedStageCount + 1)
assert(rows(0).getString(1).equals("Unload"))
rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 0").collect()
assert(rows.length == unloadedStageCount)
assert(rows(0).getString(1).equals("Unload"))
rows = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE LIMIT 1").collect()
assert(rows.length == unloadedStageCount + 1)
assert(rows(0).getString(1).equals("Unload"))
rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS " +
s"(SELECT * FROM $tableName" + "_segments)").collect()
assert(rows.length == unloadedStageCount + 1)
assert(rows(0).getString(1).equals("Unload"))
unloadedStageCount = CarbonStore.listStageFiles(tableStagePath)._1.length
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
          s"(SELECT * FROM ${tableName}_segments where status = 'Unload')").collect()
assert(rows.length == unloadedStageCount)
assert(rows(0).getString(1).equals("Unload"))
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
          s"(SELECT * FROM ${tableName}_segments where status = 'Success')").collect()
assert(rows.length >= 1)
assert(rows(0).getString(1).equals("Success"))
        // Create a fake 'Loading' stage so the status filter below matches exactly one row.
createFakeLoadingStage(CarbonTablePath.getStageDir(tablePath))
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
          s"(SELECT * FROM ${tableName}_segments where status = 'Loading')").collect()
assert(rows.length == 1)
assert(rows(0).getString(1).equals("Loading"))
        val (unloadedFiles, loadingFiles) = CarbonStore.listStageFiles(tableStagePath)
        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
          s"(SELECT * FROM ${tableName}_segments " +
          "where status = 'Unload' or status = 'Loading')").collect()
assert(rows.length == unloadedFiles.length + loadingFiles.length)
}
}
}
test("test concurrent insertstage") {
createPartitionTable
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(6)
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
val dataCount = 1000
val source = getTestSource(dataCount)
executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
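      // Give the Flink job's sink time to commit its stage files before the
      // concurrent loads start.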
Thread.sleep(5000)
      val executorService = Executors.newFixedThreadPool(10)
      // Submit all INSERT ... STAGE calls first so they actually run
      // concurrently, then wait for every one of them to finish.
      val futures = (1 to 10).map { _ =>
        executorService.submit(new Runnable {
          override def run(): Unit = {
            sql(s"INSERT INTO $tableName STAGE OPTIONS('batch_file_count'='5')")
          }
        })
      }
      futures.foreach(_.get())
      executorService.shutdown()
checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
}
}
test("Test complex type") {
sql(s"DROP TABLE IF EXISTS $tableName").collect()
sql(
s"""
| CREATE TABLE $tableName (stringField string, intField int, shortField short,
| structField struct<value1:string,value2:int,value3:int>, binaryField struct<value1:binary>)
| STORED AS carbondata
| PARTITIONED BY (hour_ string, date_ string)
| TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
""".stripMargin
).collect()
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(6)
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
val dataCount = 1000
val source = new TestSource(dataCount) {
@throws[InterruptedException]
override def get(index: Int): Array[AnyRef] = {
val data = new Array[AnyRef](7)
data(0) = "test" + index
data(1) = index.asInstanceOf[AnyRef]
data(2) = 12345.asInstanceOf[AnyRef]
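          // '\001' is the struct-field delimiter here, so this string encodes
          // the struct value (value1 = "test", value2 = 1, value3 = 2).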
data(3) = "test\0011\0012"
data(4) = Base64.getEncoder.encodeToString(Array[Byte](2, 3, 4))
data(5) = Integer.toString(TestSource.randomCache.get().nextInt(24))
data(6) = "20191218"
data
}
@throws[InterruptedException]
override def onFinish(): Unit = {
Thread.sleep(5000L)
}
}
executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
sql(s"INSERT INTO $tableName STAGE")
checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
checkAnswer(sql(s"SELECT stringField FROM $tableName order by stringField limit 2"),
Seq(Row("test0"), Row("test1")))
val rows = sql(s"SELECT * FROM $tableName limit 1").collect()
assertResult(1)(rows.length)
      assertResult(Array[Byte](2, 3, 4))(
        rows(0).get(rows(0).fieldIndex("binaryfield"))
          .asInstanceOf[GenericRowWithSchema](0))
}
}
test("test insert stage into partition carbon table with secondary index") {
createPartitionTable
// create si index
sql(s"drop index if exists si_1 on $tableName")
sql(s"create index si_1 on $tableName(stringField1) as 'carbondata'")
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(6)
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
val dataCount = 10
val source = getTestSource(dataCount)
executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
sql(s"INSERT INTO $tableName STAGE")
checkAnswer(sql("select count(*) from si_1"), Seq(Row(10)) )
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(10)))
// check if query hits si
val df = sql(s"select stringField, intField from $tableName where stringField1 = 'si1'")
checkAnswer(df, Seq(Row("test1", 1)))
var isFilterHitSecondaryIndex = false
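      // A BroadCastSIFilterPushJoin node in the physical plan means the
      // filter was served by the secondary index.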
df.queryExecution.sparkPlan.transform {
case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
isFilterHitSecondaryIndex = true
broadCastSIFilterPushDown
}
assert(isFilterHitSecondaryIndex)
}
}
test("test insert stage into partition carbon table with materialized view") {
createPartitionTable
// create materialized view
sql(s"drop materialized view if exists mv_1")
sql(s"create materialized view mv_1 as select stringField, shortField from $tableName where intField=9")
try {
val tablePath = storeLocation + "/" + tableName + "/"
val writerProperties = newWriterProperties(dataTempPath)
val carbonProperties = newCarbonProperties(storeLocation)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(6)
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
val dataCount = 10
val source = getTestSource(dataCount)
executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
sql(s"INSERT INTO $tableName STAGE")
checkAnswer(sql(s"SELECT count(*) FROM mv_1"), Seq(Row(1)))
val df = sql(s"select stringField, shortField from $tableName where intField=9")
val tables = df.queryExecution.optimizedPlan collect {
case l: LogicalRelation => l.catalogTable.get
}
assert(tables.exists(_.identifier.table.equalsIgnoreCase("mv_1")))
checkAnswer(df, Seq(Row("test9",12345)))
}
}
private def getTestSource(dataCount: Int): TestSource = {
new TestSource(dataCount) {
@throws[InterruptedException]
override def get(index: Int): Array[AnyRef] = {
val data = new Array[AnyRef](7)
data(0) = "test" + index
data(1) = index.asInstanceOf[AnyRef]
data(2) = 12345.asInstanceOf[AnyRef]
data(3) = "si" + index
data(4) = Integer.toString(TestSource.randomCache.get().nextInt(24))
data
}
@throws[InterruptedException]
override def onFinish(): Unit = {
Thread.sleep(5000L)
}
}
}
private def createPartitionTable = {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(
s"""
| CREATE TABLE $tableName (stringField string, intField int, shortField short, stringField1 string)
| STORED AS carbondata
| PARTITIONED BY (hour_ string)
| TBLPROPERTIES ('SORT_COLUMNS'='hour_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
""".stripMargin)
}
override def afterAll(): Unit = {
sql(s"DROP TABLE IF EXISTS $tableName")
}
private def executeStreamingEnvironment(tablePath: String,
writerProperties: Properties,
carbonProperties: Properties,
environment: StreamExecutionEnvironment,
source: TestSource): Unit = {
val stream = environment.addSource(source)
val factory = CarbonWriterFactory.builder("Local").build(
"default",
tableName,
tablePath,
new Properties,
writerProperties,
carbonProperties)
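    // Presumably the ProxyFileSystem URI lets the Flink file sink hand its
    // output to the carbon writer factory instead of a real filesystem.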
val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
      // Key by the fourth field to spread records across the sink subtasks.
      override def getKey(value: Array[AnyRef]): AnyRef = value(3)
}).addSink(streamSink)
    try {
      environment.execute()
    } catch {
      case exception: Exception =>
        // Fail the test if the Flink job cannot run to completion.
        throw new UnsupportedOperationException(exception)
    }
    // The job must have produced at least one stage file.
    assert(FileFactory
      .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().nonEmpty)
}
  private def newWriterProperties(dataTempPath: String): Properties = {
val properties = new Properties
properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
properties
}
  private def newCarbonProperties(storeLocation: String): Properties = {
val properties = new Properties
properties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
properties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
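    // Binary columns are fed in as base64 strings (see the complex type
    // test), so configure the base64 binary decoder.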
properties.setProperty("binary_decoder", "base64")
properties
}
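  // SHOW SEGMENTS ... WITH STAGE lists stages by create time in descending
  // order; each row's create time must not exceed the previous row's.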
  private def assertShowStagesCreateTimeDesc(rows: Array[Row], index: Int): Unit = {
    if (index > 0) {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val currentTime = format.parse(rows(index).getString(2)).getTime
      val previousTime = format.parse(rows(index - 1).getString(2)).getTime
      assert(currentTime <= previousTime)
    }
  }
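  // Simulates an in-progress load by putting the loading-suffix marker file
  // next to an existing unloaded stage file.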
  private def createFakeLoadingStage(stagePath: String): Unit = {
    val (unloadedFiles, loadingFiles) = CarbonStore.listStageFiles(stagePath)
    assert(unloadedFiles.nonEmpty)
    val loadingFilesCountBefore = loadingFiles.length
    FileFactory.getCarbonFile(unloadedFiles(0).getAbsolutePath +
      CarbonTablePath.LOADING_FILE_SUFFIX).createNewFile()
    val loadingFilesCountAfter = CarbonStore.listStageFiles(stagePath)._2.length
    assert(loadingFilesCountAfter == loadingFilesCountBefore + 1)
  }
}