blob: 226cf5313f5d04ca9a5008556ebebe4d55320ec0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
/**
 * Basic tests on the spark datasource for the structured streaming sink.
 *
 * The test streams JSON files from a source directory into a Hudi table and then
 * verifies the table through snapshot and incremental reads after each micro-batch.
 */
class TestStructuredStreaming extends HoodieClientTestBase {
  private val log = LogManager.getLogger(getClass)

  // Assigned in setUp() from the SQL context created by the base class.
  var spark: SparkSession = null

  // Write options shared by every Hudi write issued from this test.
  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
  )

  /** Initializes the base path, Spark contexts, data generator and file system before each test. */
  @BeforeEach override def setUp(): Unit = {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initTestDataGenerator()
    initFileSystem()
  }

  /** Releases the Spark contexts, data generator and file system after each test. */
  @AfterEach override def tearDown(): Unit = {
    cleanupSparkContexts()
    cleanupTestDataGenerator()
    cleanupFileSystem()
  }

  /**
   * Streams two batches (100 inserts, then 100 updates) through the Hudi streaming sink and
   * checks commit counts, snapshot reads and incremental reads after each batch.
   */
  @Test
  def testStructuredStreaming(): Unit = {
    // `ProcessingTime` as a class is deprecated; the `Trigger` factory is its replacement.
    import org.apache.spark.sql.streaming.Trigger

    fs.delete(new Path(basePath), true)
    val sourcePath = basePath + "/source"
    val destPath = basePath + "/dest"
    fs.mkdirs(new Path(sourcePath))

    // First chunk of data
    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))

    // Second chunk of data
    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()

    // define the source of streaming
    val streamingInput =
      spark.readStream
        .schema(inputDF1.schema)
        .json(sourcePath)

    println("streaming starting")
    // 'writeStream' can be called only on streaming Dataset/DataFrame.
    // start() does not block, so the handle is kept here in order to stop the query
    // once the test is over.
    val query = streamingInput
      .writeStream
      .format("org.apache.hudi")
      .options(commonOpts)
      .trigger(Trigger.ProcessingTime(100))
      .option("checkpointLocation", basePath + "/checkpoint")
      .outputMode(OutputMode.Append)
      .start(destPath)

    try {
      val f1 = Future {
        // Surfaces a failure of the query during its first 10 seconds. The timeout only
        // bounds the wait; the query itself keeps running until it is stopped below.
        query.awaitTermination(10000)
        println("streaming ends")
      }

      val f2 = Future {
        inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
        // wait for spark streaming to process one microbatch
        val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
        assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
        val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, destPath)

        // Read RO View
        val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
          .load(destPath + "/*/*/*/*")
        assertEquals(100, hoodieROViewDF1.count())

        inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
        // wait for spark streaming to process one microbatch
        waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
        val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
        assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())

        // Read RO View
        val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
          .load(destPath + "/*/*/*/*")
        assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated

        // Read Incremental View
        // we have 2 commits, try pulling the first commit (which is not the latest)
        val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
        val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
          .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
          .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
          .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
          .load(destPath)
        // 100 initial inserts must be pulled
        assertEquals(100, hoodieIncViewDF1.count())
        val firstCommitCounts = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
        assertEquals(1, firstCommitCounts.length)
        assertEquals(firstCommit, firstCommitCounts(0).get(0))

        // pull the latest commit
        val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
          .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
          .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
          .load(destPath)
        // every distinct key of the update batch must be pulled
        assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count())
        val latestCommitCounts = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
        assertEquals(1, latestCommitCounts.length)
        assertEquals(commitInstantTime2, latestCommitCounts(0).get(0))
      }

      Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
    } finally {
      // Do not leave the streaming query running past the end of the test.
      query.stop()
    }
  }

  /**
   * Polls the table's completed commit/compaction timeline until it holds at least
   * `numCommits` instants or the timeout elapses.
   *
   * @param fs                    file system that hosts the table
   * @param tablePath             base path of the Hudi table
   * @param numCommits            minimum number of completed instants to wait for
   * @param timeoutSecs           overall time budget, in seconds
   * @param sleepSecsAfterEachRun pause between two polls, in seconds
   * @return the number of completed instants seen by the successful poll
   * @throws IllegalStateException if the instants do not appear within the timeout
   */
  @throws[InterruptedException]
  private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
                                      numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = {
    val beginTime = System.currentTimeMillis
    // Long arithmetic so that large second counts cannot overflow Int.
    val timeoutMsecs = timeoutSecs * 1000L
    var currTime = beginTime
    var numInstants = 0
    var success = false
    while (!success && (currTime - beginTime) < timeoutMsecs) {
      try {
        val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
        // mkString prints the instants themselves rather than the array's identity hash.
        log.info("Timeline :" + timeline.getInstants.toArray.mkString(","))
        if (timeline.countInstants >= numCommits) {
          numInstants = timeline.countInstants
          success = true
        }
      } catch {
        case _: TableNotFoundException =>
          // The table has not been created yet; poll again.
          log.info("Got table not found exception. Retrying")
      }
      // Only pause when another poll is needed; there is nothing to wait for after success.
      if (!success) {
        Thread.sleep(sleepSecsAfterEachRun * 1000L)
      }
      currTime = System.currentTimeMillis
    }
    if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
    numInstants
  }
}