/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.carbondata

import java.io.{File, PrintWriter}
import java.math.BigDecimal
import java.net.{BindException, ServerSocket}
import java.sql.{Date, Timestamp}
import java.util.concurrent.Executors

import scala.collection.mutable

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
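/**
 * Integration tests for CarbonData streaming tables: the 'streaming' table property,
 * socket and file source ingest, filter and aggregate queries, bad record handling,
 * segment deletion, handoff to columnar segments, and closing/reopening streaming tables.
 */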
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
private val spark = sqlContext.sparkSession
private val dataFilePath = s"$resourcesPath/streamSample.csv"
override def beforeAll {
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
sql("DROP DATABASE IF EXISTS streaming CASCADE")
sql("CREATE DATABASE streaming")
sql("USE streaming")
sql(
"""
| CREATE TABLE source(
| c1 string,
| c2 int,
| c3 string,
| c5 string
| ) STORED BY 'org.apache.carbondata.format'
| TBLPROPERTIES ('streaming' = 'true')
""".stripMargin)
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
dropTable()
// 1. a normal table does not support streaming ingest
createTable(tableName = "batch_table", streaming = false, withBatchLoad = true)
// 2. streaming table with different input source
// file source
createTable(tableName = "stream_table_file", streaming = true, withBatchLoad = true)
// 3. streaming table with bad records
createTable(tableName = "bad_record_fail", streaming = true, withBatchLoad = true)
// 4. streaming frequency check
createTable(tableName = "stream_table_1s", streaming = true, withBatchLoad = true)
// 5. execute batch loading on a streaming table
// 6. detail query
// 8. compaction
// full scan + filter scan + aggregate query
createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true)
createTableWithComplexType(
tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
// 10. fault tolerance
createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
// 11. table for delete segment test
createTable(tableName = "stream_table_delete_id", streaming = true, withBatchLoad = false)
createTable(tableName = "stream_table_delete_date", streaming = true, withBatchLoad = false)
// 12. reject alter streaming properties
// 13. handoff streaming segment and finish streaming
createTable(tableName = "stream_table_handoff", streaming = false, withBatchLoad = false)
// 15. auto handoff streaming segment
// 16. close streaming table
// 17. reopen streaming table after close
// 9. create new stream segment if current stream segment is full
createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false)
// 18. block drop table while streaming is in progress
createTable(tableName = "stream_table_drop", streaming = true, withBatchLoad = false)
// 19. block streaming on 'preaggregate' main table
createTable(tableName = "agg_table_block", streaming = false, withBatchLoad = false)
}
test("validate streaming property") {
sql(
"""
| CREATE TABLE correct(
| c1 string
| ) STORED BY 'org.apache.carbondata.format'
| TBLPROPERTIES ('streaming' = 'true')
""".stripMargin)
sql("DROP TABLE correct")
sql(
"""
| CREATE TABLE correct(
| c1 string
| ) STORED BY 'org.apache.carbondata.format'
| TBLPROPERTIES ('streaming' = 'false')
""".stripMargin)
sql("DROP TABLE correct")
intercept[MalformedCarbonCommandException] {
sql(
"""
| create table wrong(
| c1 string
| ) STORED BY 'org.apache.carbondata.format'
| TBLPROPERTIES ('streaming' = 'invalid')
""".stripMargin)
}
}
test("test blocking update and delete operation on streaming table") {
intercept[MalformedCarbonCommandException] {
sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""DELETE FROM source WHERE d.c1 = 'a'""").show()
}
}
test("test blocking alter table operation on streaming table") {
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source DROP COLUMNS (c1)""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source RENAME to t""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source CHANGE c1 c1 int""").show()
}
}
override def afterAll {
dropTable()
sql("USE default")
sql("DROP DATABASE IF EXISTS streaming CASCADE")
}
def dropTable(): Unit = {
sql("drop table if exists streaming.batch_table")
sql("drop table if exists streaming.stream_table_file")
sql("drop table if exists streaming.bad_record_fail")
sql("drop table if exists streaming.stream_table_1s")
sql("drop table if exists streaming.stream_table_filter ")
sql("drop table if exists streaming.stream_table_filter_complex")
sql("drop table if exists streaming.stream_table_tolerant")
sql("drop table if exists streaming.stream_table_delete_id")
sql("drop table if exists streaming.stream_table_delete_date")
sql("drop table if exists streaming.stream_table_handoff")
sql("drop table if exists streaming.stream_table_reopen")
sql("drop table if exists streaming.stream_table_drop")
sql("drop table if exists streaming.agg_table_block")
sql("drop table if exists streaming.agg_table_block_agg0")
}
// a normal table does not support streaming ingest
test("normal table does not support streaming ingest and altering its streaming property") {
// alter normal table's streaming property
val msg = intercept[MalformedCarbonCommandException](sql("alter table streaming.batch_table set tblproperties('streaming'='false')"))
assertResult("Streaming property value is incorrect")(msg.getMessage)
val identifier = new TableIdentifier("batch_table", Option("streaming"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
.asInstanceOf[CarbonRelation].metaData.carbonTable
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
var server: ServerSocket = null
try {
server = getServerSocket
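// one thread writes rows to the socket source while another runs the streaming sink;
// ingest should fail because 'batch_table' is not a streaming table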
val thread1 = createWriteSocketThread(server, 2, 10, 1)
thread1.start()
// use a thread pool so the exception thrown by the sink thread can be captured from the Future
val pool = Executors.newSingleThreadExecutor()
val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier)
val future = pool.submit(thread2)
Thread.sleep(1000)
thread1.interrupt()
val msg = intercept[Exception] {
future.get()
}
assert(msg.getMessage.contains("is not a streaming table"))
} finally {
if (server != null) {
server.close()
}
}
}
// input source: file
test("streaming ingest from file source") {
val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
.asInstanceOf[CarbonRelation].metaData.carbonTable
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val csvDataDir = new File("target/csvdata").getCanonicalPath
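// write CSV batches into a local directory and consume them with a file source streaming query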
// streaming ingest 10 rows
generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
identifier)
thread.start()
Thread.sleep(2000)
generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
Thread.sleep(5000)
thread.interrupt()
checkAnswer(
sql("select count(*) from streaming.stream_table_file"),
Seq(Row(25))
)
val row = sql("select * from streaming.stream_table_file order by id").head()
val expectedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))
assertResult(expectedRow)(row)
}
// bad records
test("streaming table with bad records action: fail") {
executeStreamingIngest(
tableName = "bad_record_fail",
batchNums = 2,
rowNumsEachBatch = 10,
intervalOfSource = 1,
intervalOfIngest = 1,
continueSeconds = 8,
generateBadRecords = true,
badRecordAction = "fail",
autoHandoff = false
)
val result = sql("select count(*) from streaming.bad_record_fail").collect()
assert(result(0).getLong(0) < 10 + 5)
}
// ingest with different interval
test("1 row per 1 second interval") {
executeStreamingIngest(
tableName = "stream_table_1s",
batchNums = 3,
rowNumsEachBatch = 1,
intervalOfSource = 1,
intervalOfIngest = 1,
continueSeconds = 6,
generateBadRecords = false,
badRecordAction = "force",
autoHandoff = false
)
val result = sql("select count(*) from streaming.stream_table_1s").collect()
// the ingest window is too short to load every row, so only check that some data arrived
assert(result(0).getLong(0) > 5)
}
test("query on stream table with dictionary, sort_columns") {
executeStreamingIngest(
tableName = "stream_table_filter",
batchNums = 2,
rowNumsEachBatch = 25,
intervalOfSource = 5,
intervalOfIngest = 5,
continueSeconds = 20,
generateBadRecords = true,
badRecordAction = "force",
autoHandoff = false
)
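// expect 2 x 25 streamed rows plus the 5 rows batch-loaded in beforeAll, 55 in total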
// non-filter
val result = sql("select * from streaming.stream_table_filter order by id, name").collect()
assert(result != null)
assert(result.length == 55)
// check one row of streaming data
assert(result(1).isNullAt(0))
assert(result(1).getString(1) == "name_6")
// check one row of batch loading
assert(result(50).getInt(0) == 100000001)
assert(result(50).getString(1) == "batch_1")
// filter
checkAnswer(
sql("select * from stream_table_filter where id = 1"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id > 49 and id < 100000002"),
Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id between 50 and 100000001"),
Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where name = 'name_3'"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where name like '%me_3%' and id < 30"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where name >= 'name_3' and id < 4"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where city = 'city_1'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where salary = 90000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where salary between 80001 and 90000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where tax = 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where tax >= 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where percent = 80.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where percent >= 80.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where birthday = '1990-01-04'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where id is null order by name"),
Seq(Row(null, "", "", null, null, null, null, null, null),
Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where name = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and name <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where city = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and city <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where salary is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and salary is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where tax is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and tax is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where percent is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and percent is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where birthday is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and birthday is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where register is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and register is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where updated is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
checkAnswer(
sql("select * from stream_table_filter where id is null and updated is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
// agg
checkAnswer(
sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
"from stream_table_filter where id >= 2 and id <= 100000004"),
Seq(Row(51, 100000004, "batch_1", 7843162, 400001276)))
checkAnswer(
sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
"max(salary), min(salary) " +
"from stream_table_filter " +
"where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
"and city <> '' " +
"group by city " +
"order by city"),
Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
// batch loading
for(_ <- 0 to 2) {
executeBatchLoad("stream_table_filter")
}
checkAnswer(
sql("select count(*) from streaming.stream_table_filter"),
Seq(Row(25 * 2 + 5 + 5 * 3)))
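// minor compaction should merge the columnar segments and leave the streaming segment (id 1) untouched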
sql("alter table streaming.stream_table_filter compact 'minor'")
Thread.sleep(5000)
val result1 = sql("show segments for table streaming.stream_table_filter").collect()
result1.foreach { row =>
if (row.getString(0).equals("1")) {
assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
} else if (row.getString(0).equals("0.1")) {
assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
} else {
assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
}
}
}
test("query on stream table with dictionary, sort_columns and complex column") {
executeStreamingIngest(
tableName = "stream_table_filter_complex",
batchNums = 2,
rowNumsEachBatch = 25,
intervalOfSource = 5,
intervalOfIngest = 5,
continueSeconds = 20,
generateBadRecords = true,
badRecordAction = "force",
autoHandoff = false
)
// non-filter
val result = sql("select * from streaming.stream_table_filter_complex order by id, name").collect()
assert(result != null)
assert(result.length == 55)
// check one row of streaming data
assert(result(0).isNullAt(0))
assert(result(0).getString(1) == "")
assert(result(0).getStruct(9).isNullAt(1))
// check one row of batch loading
assert(result(50).getInt(0) == 100000001)
assert(result(50).getString(1) == "batch_1")
assert(result(50).getStruct(9).getInt(1) == 20)
// filter
checkAnswer(
sql("select * from stream_table_filter_complex where id = 1"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1))))
checkAnswer(
sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
checkAnswer(
sql("select * from stream_table_filter_complex where id between 50 and 100000001"),
Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
checkAnswer(
sql("select * from stream_table_filter_complex where name = 'name_3'"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
checkAnswer(
sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
checkAnswer(
sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
checkAnswer(
sql("select * from stream_table_filter_complex where city = 'city_1'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
checkAnswer(
sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
checkAnswer(
sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
checkAnswer(
sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
checkAnswer(
sql("select * from stream_table_filter_complex where salary = 90000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", "school_1010")), 10))))
checkAnswer(
sql("select * from stream_table_filter_complex where salary between 80001 and 90000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where tax >= 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
checkAnswer(
sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
checkAnswer(
sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
checkAnswer(
sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
checkAnswer(
sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
checkAnswer(
sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
checkAnswer(
sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
checkAnswer(
sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")),50)),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
checkAnswer(
sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
checkAnswer(
sql("select * from stream_table_filter_complex where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
checkAnswer(
sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null order by name"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where name = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and name <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where city = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and city <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where salary is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and salary is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where tax is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and tax is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where percent is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and salary is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where birthday is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where register is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and register is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where updated is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and updated is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
// agg
checkAnswer(
sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
"from stream_table_filter_complex where id >= 2 and id <= 100000004"),
Seq(Row(51, 100000004, "batch_1", 27, 1406)))
checkAnswer(
sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
"max(salary), min(salary) " +
"from stream_table_filter_complex " +
"where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
"and city <> '' " +
"group by city " +
"order by city"),
Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1),
Row("city_2", 1, 100000002, 30, 0.2, 0.2),
Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
}
test("test deleting streaming segment by ID while ingesting") {
executeStreamingIngest(
tableName = "stream_table_delete_id",
batchNums = 3,
rowNumsEachBatch = 100,
intervalOfSource = 5,
intervalOfIngest = 5,
continueSeconds = 18,
generateBadRecords = false,
badRecordAction = "force",
handoffSize = 1,
autoHandoff = false
)
val beforeDelete = sql("show segments for table streaming.stream_table_delete_id").collect()
val segmentIds1 = beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",")
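// deleting the active streaming segment by ID is not allowed and should raise an exception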
val msg = intercept[Exception] {
sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds1) ")
}
assertResult(s"Delete segment by Id is failed. Invalid ID is: ${beforeDelete.length -1}")(msg.getMessage)
val segmentIds2 = beforeDelete.filter(_.getString(1).equals("Streaming Finish"))
.map(_.getString(0)).mkString(",")
sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds2) ")
val afterDelete = sql("show segments for table streaming.stream_table_delete_id").collect()
afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
}
}
test("test deleting streaming segment by date while ingesting") {
executeStreamingIngest(
tableName = "stream_table_delete_date",
batchNums = 3,
rowNumsEachBatch = 100,
intervalOfSource = 5,
intervalOfIngest = 5,
continueSeconds = 18,
generateBadRecords = false,
badRecordAction = "force",
handoffSize = 1,
autoHandoff = false
)
val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before " +
s"'2999-10-01 01:00:00'")
val segmentIds = beforeDelete.filter(_.getString(1).equals("Streaming"))
assertResult(1)(segmentIds.length)
val afterDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
}
}
test("reject alter streaming properties and handoff 'streaming finish' segment to columnar segment") {
try {
sql("ALTER TABLE streaming.stream_table_handoff UNSET TBLPROPERTIES IF EXISTS ('streaming')")
assert(false, "unsupport to unset streaming property")
} catch {
case _ =>
assert(true)
}
try {
sql("ALTER TABLE streaming.stream_table_handoff SET TBLPROPERTIES('streaming'='true')")
executeStreamingIngest(
tableName = "stream_table_handoff",
batchNums = 2,
rowNumsEachBatch = 100,
intervalOfSource = 5,
intervalOfIngest = 5,
continueSeconds = 20,
generateBadRecords = false,
badRecordAction = "force",
handoffSize = 1L,
autoHandoff = false
)
} catch {
case _ =>
assert(false, "should support set table to streaming")
}
// alter streaming table's streaming property
val msg = intercept[MalformedCarbonCommandException](sql("alter table streaming.stream_table_handoff set tblproperties('streaming'='false')"))
assertResult("Streaming property can not be changed once it is 'true'")(msg.getMessage)
val segments = sql("show segments for table streaming.stream_table_handoff").collect()
assert(segments.length == 2 || segments.length == 3)
assertResult("Streaming")(segments(0).getString(1))
assertResult("Streaming Finish")(segments(1).getString(1))
checkAnswer(
sql("select count(*) from streaming.stream_table_handoff"),
Seq(Row(2 * 100))
)
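// hand off the 'Streaming Finish' segment to a columnar segment and verify the data is unchanged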
val resultBeforeHandoff = sql("select * from streaming.stream_table_handoff order by id, name").collect()
sql("alter table streaming.stream_table_handoff compact 'streaming'")
Thread.sleep(5000)
val resultAfterHandoff = sql("select * from streaming.stream_table_handoff order by id, name").collect()
assertResult(resultBeforeHandoff)(resultAfterHandoff)
val newSegments = sql("show segments for table streaming.stream_table_handoff").collect()
assert(newSegments.length == 3 || newSegments.length == 5)
assertResult("Streaming")(newSegments((newSegments.length - 1) / 2).getString(1))
(0 until (newSegments.length - 1) / 2).foreach{ i =>
assertResult("Success")(newSegments(i).getString(1))
}
((newSegments.length + 1) / 2 until newSegments.length).foreach{ i =>
assertResult("Compacted")(newSegments(i).getString(1))
}
sql("alter table streaming.stream_table_handoff finish streaming")
val newSegments1 = sql("show segments for table streaming.stream_table_handoff").collect()
assertResult("Streaming Finish")(newSegments1((newSegments.length - 1) / 2).getString(1))
checkAnswer(
sql("select count(*) from streaming.stream_table_handoff"),
Seq(Row(2 * 100))
)
try {
sql("ALTER TABLE stream_table_handoff SET TBLPROPERTIES('streaming'='false')")
assert(false, "unsupport disable streaming properties")
} catch {
case _ =>
assert(true)
}
}
test("auto hand off, close and reopen streaming table") {
executeStreamingIngest(
tableName = "stream_table_reopen",
batchNums = 2,
rowNumsEachBatch = 100,
intervalOfSource = 5,
intervalOfIngest = 5,
continueSeconds = 20,
generateBadRecords = false,
badRecordAction = "force",
handoffSize = 1L,
autoHandoff = false
)
val table1 =
CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
assertResult(true)(table1.isStreamingTable)
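// 'close_streaming' compaction hands off the streaming segments and turns off the streaming property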
sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
val segments =
sql("show segments for table streaming.stream_table_reopen").collect()
assert(segments.length == 4 || segments.length == 6)
assertResult(segments.length / 2)(segments.filter(_.getString(1).equals("Success")).length)
assertResult(segments.length / 2)(segments.filter(_.getString(1).equals("Compacted")).length)
checkAnswer(
sql("select count(*) from streaming.stream_table_reopen"),
Seq(Row(2 * 100))
)
val table2 =
CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
assertResult(false)(table2.isStreamingTable)
sql("ALTER TABLE streaming.stream_table_reopen SET TBLPROPERTIES('streaming'='true')")
val table3 =
CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
assertResult(true)(table3.isStreamingTable)
executeStreamingIngest(
tableName = "stream_table_reopen",
batchNums = 2,
rowNumsEachBatch = 100,
intervalOfSource = 5,
intervalOfIngest = 5,
continueSeconds = 20,
generateBadRecords = false,
badRecordAction = "force",
handoffSize = 1L,
autoHandoff = true
)
Thread.sleep(10000)
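// wait for auto handoff of the streaming segments to finish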
val newSegments1 =
sql("show segments for table streaming.stream_table_reopen").collect()
assert(newSegments1.length == 7 || newSegments1.length == 9 || newSegments1.length == 11)
assertResult(1)(newSegments1.filter(_.getString(1).equals("Streaming")).length)
assertResult((newSegments1.length - 1) / 2)(newSegments1.filter(_.getString(1).equals("Success")).length)
assertResult((newSegments1.length - 1) / 2)(newSegments1.filter(_.getString(1).equals("Compacted")).length)
sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
val newSegments =
sql("show segments for table streaming.stream_table_reopen").collect()
assert(newSegments.length == 8 || newSegments.length == 10 || newSegments.length == 12)
assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length)
assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length)
// verify the merged-to entry (column 4) of each compacted segment points to its target segment
newSegments.filter(_.getString(1).equals("Compacted")).foreach { rw =>
assertResult("Compacted")(rw.getString(1))
assertResult((Integer.parseInt(rw.getString(0)) + 2).toString)(rw.getString(4))
}
checkAnswer(
sql("select count(*) from streaming.stream_table_reopen"),
Seq(Row(2 * 100 * 2))
)
}
test("block drop streaming table while streaming is in progress") {
val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
.asInstanceOf[CarbonRelation].metaData.carbonTable
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
var server: ServerSocket = null
try {
server = getServerSocket
val thread1 = createWriteSocketThread(server, 2, 10, 3)
val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier,
badRecordAction = "force", intervalSecond = 5, handoffSize = 1024L * 200, autoHandoff = false)
thread1.start()
thread2.start()
Thread.sleep(1000)
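// the running streaming ingest holds the table lock, so dropping the table should fail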
val msg = intercept[ProcessMetaDataException] {
sql(s"drop table streaming.stream_table_drop")
}
assert(msg.getMessage.contains("Dropping table streaming.stream_table_drop failed: Acquire table lock failed after retry, please try after some time"))
thread1.interrupt()
thread2.interrupt()
} finally {
if (server != null) {
server.close()
}
}
}
test("do not support creating datamap on streaming table") {
assert(
intercept[MalformedCarbonCommandException](
sql("CREATE DATAMAP datamap ON TABLE source " +
"USING 'preaggregate'" +
" AS SELECT c1, sum(c2) FROM source GROUP BY c1")
).getMessage.contains("Streaming table does not support creating datamap"))
}
test("check streaming property of table") {
checkExistence(sql("DESC FORMATTED batch_table"), true, "Streaming")
val result =
sql("DESC FORMATTED batch_table").collect().filter(_.getString(0).trim.equals("Streaming"))
assertResult(1)(result.length)
assertResult("false")(result(0).getString(1).trim)
checkExistence(sql("DESC FORMATTED stream_table_file"), true, "Streaming")
val resultStreaming = sql("DESC FORMATTED stream_table_file").collect()
.filter(_.getString(0).trim.equals("Streaming"))
assertResult(1)(resultStreaming.length)
assertResult("true")(resultStreaming(0).getString(1).trim)
}
test("block streaming for 'preaggregate' table") {
sql("create datamap agg_table_block_agg0 on table streaming.agg_table_block using 'preaggregate' as select city, count(name) from streaming.agg_table_block group by city")
val msg = intercept[MalformedCarbonCommandException](sql("ALTER TABLE streaming.agg_table_block SET TBLPROPERTIES('streaming'='true')"))
assertResult("The table has 'preaggregate' DataMap, it doesn't support streaming")(msg.getMessage)
}
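/**
* Start a thread which accepts one client connection on the given ServerSocket and writes
* `rowNums` comma-separated rows per batch, for `writeNums` batches, sleeping `intervalSecond`
* seconds between batches. When `badRecords` is true, a null-value row and an illegal-number
* row are injected at fixed row indexes.
*/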
def createWriteSocketThread(
serverSocket: ServerSocket,
writeNums: Int,
rowNums: Int,
intervalSecond: Int,
badRecords: Boolean = false): Thread = {
new Thread() {
override def run(): Unit = {
// wait for the client's connection request and accept it
val clientSocket = serverSocket.accept()
val socketWriter = new PrintWriter(clientSocket.getOutputStream())
var index = 0
var timeRow = true
for (_ <- 1 to writeNums) {
// write rowNums records per iteration
val stringBuilder = new StringBuilder()
for (_ <- 1 to rowNums) {
index = index + 1
if (badRecords) {
if (index == 2) {
// null value
stringBuilder.append(",,,,,,,,,")
} else if (index == 6) {
// illegal number
stringBuilder.append(index.toString + "abc,name_" + index
+ ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
",school_" + index + ":school_" + index + index + "$" + index)
} else {
if (index == 9 && timeRow) {
timeRow = false
stringBuilder.append(index.toString + ",name_" + index
+ ",city_" + index + "," + (10000.00 * index).toString + ",0.04,80.04" +
",1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01" +
",school_" + index + ":school_" + index + index + "$" + index)
} else {
stringBuilder.append(index.toString + ",name_" + index
+ ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
",school_" + index + ":school_" + index + index + "$" + index)
}
}
} else {
stringBuilder.append(index.toString + ",name_" + index
+ ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
",school_" + index + ":school_" + index + index + "$" + index)
}
stringBuilder.append("\n")
}
socketWriter.append(stringBuilder.toString())
socketWriter.flush()
Thread.sleep(1000 * intervalSecond)
}
socketWriter.close()
}
}
}
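/**
* Start a thread which runs a Structured Streaming query reading from the local socket source
* on the given port and writing to the carbondata table identified by `tableIdentifier`,
* using the configured bad-record action, trigger interval, handoff size and auto-handoff flag.
*/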
def createSocketStreamingThread(
spark: SparkSession,
port: Int,
tablePath: CarbonTablePath,
tableIdentifier: TableIdentifier,
badRecordAction: String = "force",
intervalSecond: Int = 2,
handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
): Thread = {
new Thread() {
override def run(): Unit = {
var qry: StreamingQuery = null
try {
val readSocketDF = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", port)
.load()
// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime(s"$intervalSecond seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("bad_records_action", badRecordAction)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
.option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
.option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
.start()
qry.awaitTermination()
} catch {
case ex: Exception =>
throw new Exception(ex.getMessage, ex)
} finally {
if (null != qry) {
qry.stop()
}
}
}
}
}
/**
* Start the ingestion threads: write `rowNumsEachBatch` rows repeatedly, for `batchNums` batches.
*/
def executeStreamingIngest(
tableName: String,
batchNums: Int,
rowNumsEachBatch: Int,
intervalOfSource: Int,
intervalOfIngest: Int,
continueSeconds: Int,
generateBadRecords: Boolean,
badRecordAction: String,
handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
): Unit = {
val identifier = new TableIdentifier(tableName, Option("streaming"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
.asInstanceOf[CarbonRelation].metaData.carbonTable
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
var server: ServerSocket = null
try {
server = getServerSocket()
val thread1 = createWriteSocketThread(
serverSocket = server,
writeNums = batchNums,
rowNums = rowNumsEachBatch,
intervalSecond = intervalOfSource,
badRecords = generateBadRecords)
val thread2 = createSocketStreamingThread(
spark = spark,
port = server.getLocalPort,
tablePath = tablePath,
tableIdentifier = identifier,
badRecordAction = badRecordAction,
intervalSecond = intervalOfIngest,
handoffSize = handoffSize,
autoHandoff = autoHandoff)
thread1.start()
thread2.start()
Thread.sleep(continueSeconds * 1000)
thread2.interrupt()
thread1.interrupt()
} finally {
if (null != server) {
server.close()
}
}
}
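/**
* Generate `rowNums` rows of sample data starting from `idStart` and write them as CSV files
* (without header) into `csvDirPath`, overwriting any existing content.
*/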
def generateCSVDataFile(
spark: SparkSession,
idStart: Int,
rowNums: Int,
csvDirPath: String): Unit = {
// create a DataFrame of sample rows and write it out as CSV files
val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
.map { id =>
(id,
"name_" + id,
"city_" + id,
10000.00 * id,
BigDecimal.valueOf(0.01),
80.01,
"1990-01-01",
"2010-01-01 10:01:01",
"2010-01-01 10:01:01",
"school_" + id + ":school_" + id + id + "$" + id)
}
val csvDataDF = spark.createDataFrame(csvRDD).toDF(
"id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "file")
csvDataDF.write
.option("header", "false")
.mode(SaveMode.Overwrite)
.csv(csvDirPath)
}
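/**
* Start a thread which runs a Structured Streaming query reading text files from `csvDataDir`
* and writing them to the carbondata table identified by `tableIdentifier`, triggering every
* `intervalSecond` seconds.
*/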
def createFileStreamingThread(
spark: SparkSession,
tablePath: CarbonTablePath,
csvDataDir: String,
intervalSecond: Int,
tableIdentifier: TableIdentifier): Thread = {
new Thread() {
override def run(): Unit = {
var qry: StreamingQuery = null
try {
val readCsvDF = spark.readStream.text(csvDataDir)
// write data from the file stream to the carbondata table
qry = readCsvDF.writeStream
.format("carbondata")
.trigger(ProcessingTime(s"$intervalSecond seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
.option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.start()
qry.awaitTermination()
} catch {
case _: InterruptedException =>
println("Done reading and writing streaming data")
} finally {
if (qry != null) {
qry.stop()
}
}
}
}
}
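/**
* Create table streaming.`tableName` with the standard test schema, optionally with the
* 'streaming' table property, and optionally batch-load the sample CSV into it.
*/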
def createTable(tableName: String, streaming: Boolean, withBatchLoad: Boolean): Unit = {
sql(
s"""
| CREATE TABLE streaming.$tableName(
| id INT,
| name STRING,
| city STRING,
| salary FLOAT,
| tax DECIMAL(8,2),
| percent double,
| birthday DATE,
| register TIMESTAMP,
| updated TIMESTAMP
| )
| STORED BY 'carbondata'
| TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
| 'sort_columns'='name', 'dictionary_include'='city,register')
| """.stripMargin)
if (withBatchLoad) {
// batch loading 5 rows
executeBatchLoad(tableName)
}
}
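/**
* Same as `createTable`, but with an additional complex column
* `file struct<school:array<string>, age:int>`.
*/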
def createTableWithComplexType(
tableName: String,
streaming: Boolean,
withBatchLoad: Boolean): Unit = {
sql(
s"""
| CREATE TABLE streaming.$tableName(
| id INT,
| name STRING,
| city STRING,
| salary FLOAT,
| tax DECIMAL(8,2),
| percent double,
| birthday DATE,
| register TIMESTAMP,
| updated TIMESTAMP,
| file struct<school:array<string>, age:int>
| )
| STORED BY 'carbondata'
| TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
| 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated')
| """.stripMargin)
if (withBatchLoad) {
// batch loading 5 rows
executeBatchLoad(tableName)
}
}
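/**
* Batch-load the sample CSV file into table streaming.`tableName`.
*/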
def executeBatchLoad(tableName: String): Unit = {
sql(
s"""
| LOAD DATA LOCAL INPATH '$dataFilePath'
| INTO TABLE streaming.$tableName
| OPTIONS('HEADER'='true')
""".stripMargin)
}
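/**
* Wrap an Array[String] in a mutable.WrappedArray so it matches Spark's array column
* representation when comparing Row results.
*/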
def wrap(array: Array[String]): mutable.WrappedArray[String] = {
new mutable.WrappedArray.ofRef(array)
}
/**
* Get a ServerSocket.
* If the port is already in use, retry with a new port number.
*
* @return ServerSocket
*/
def getServerSocket(): ServerSocket = {
var port = 7071
var serverSocket: ServerSocket = null
var retry = false
do {
try {
retry = false
serverSocket = new ServerSocket(port)
} catch {
case ex: BindException =>
retry = true
port = port + 2
if (port >= 65535) {
throw ex
}
}
} while (retry)
serverSocket
}
}