/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.functional

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
/**
 * Basic tests of the Spark datasource used as a Structured Streaming sink.
 */
class TestStructuredStreaming extends HoodieClientTestBase {
  private val log = LogManager.getLogger(getClass)
  var spark: SparkSession = _
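  // Write options shared by every streaming write in these tests: shuffle parallelism,
  // the record key / partition path / precombine fields of the test schema, and the table name.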
  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
  )

  @BeforeEach override def setUp(): Unit = {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initTestDataGenerator()
    initFileSystem()
  }

  @AfterEach override def tearDown(): Unit = {
    cleanupSparkContexts()
    cleanupTestDataGenerator()
    cleanupFileSystem()
  }

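  /**
   * End-to-end check of the streaming sink: one future runs a Structured Streaming query
   * that writes JSON source files into a Hudi table, while a second future feeds two
   * batches of input and asserts on snapshot and incremental reads of the result.
   */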
  @Test
  def testStructuredStreaming(): Unit = {
    fs.delete(new Path(basePath), true)
    val sourcePath = basePath + "/source"
    val destPath = basePath + "/dest"
    fs.mkdirs(new Path(sourcePath))

    // First chunk of data
    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))

    // Second chunk of data
    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()

    // Define the streaming source: JSON files arriving under sourcePath
    val streamingInput =
      spark.readStream
        .schema(inputDF1.schema)
        .json(sourcePath)

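    // Run the streaming query in its own future so the test can concurrently
    // feed input files and assert on the resulting Hudi table.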
    val f1 = Future {
      println("streaming starting")
      // 'writeStream' can only be called on a streaming Dataset/DataFrame
      streamingInput
        .writeStream
        .format("org.apache.hudi")
        .options(commonOpts)
        .trigger(new ProcessingTime(100))
        .option("checkpointLocation", basePath + "/checkpoint")
        .outputMode(OutputMode.Append)
        .start(destPath)
        .awaitTermination(10000)
      println("streaming ends")
    }

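    // Feed the two input batches into the source path and verify the commits,
    // snapshot counts and incremental pulls produced by the streaming sink.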
    val f2 = Future {
      inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
      // wait for spark streaming to process one microbatch
      val currNumCommits = waitTillAtLeastNCommits(fs, destPath, 1, 120, 5)
      assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
      val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
      // Read RO View
      val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
        .load(destPath + "/*/*/*/*")
      assert(hoodieROViewDF1.count() == 100)

      inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
      // wait for spark streaming to process one microbatch
      waitTillAtLeastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
      val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
      // Read RO View
      val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
        .load(destPath + "/*/*/*/*")
      assertEquals(100, hoodieROViewDF2.count()) // still 100, since the second batch only updated existing keys

      // Read Incremental View
      // we have 2 commits, try pulling the first commit (which is not the latest)
      val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
      val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
        .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
        .load(destPath)
      assertEquals(100, hoodieIncViewDF1.count())
      // 100 initial inserts must be pulled
      var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
      assertEquals(1, countsPerCommit.length)
      assertEquals(firstCommit, countsPerCommit(0).get(0))

      // pull the latest commit
      val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
        .load(destPath)

      assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // only the latest commit is pulled, one record per unique key updated
      countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
      assertEquals(1, countsPerCommit.length)
      assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
    }
    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
  }
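
  /**
   * Polls the completed commits/compactions on the Hudi timeline under `tablePath` until at
   * least `numCommits` instants are present or `timeoutSecs` elapses, sleeping
   * `sleepSecsAfterEachRun` seconds between polls. Returns the number of instants observed.
   */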
  @throws[InterruptedException]
  private def waitTillAtLeastNCommits(fs: FileSystem, tablePath: String,
                                      numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = {
    val beginTime = System.currentTimeMillis
    var currTime = beginTime
    val timeoutMsecs = timeoutSecs * 1000
    var numInstants = 0
    var success = false
    while (!success && (currTime - beginTime) < timeoutMsecs) {
      try {
        val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
        log.info("Timeline :" + timeline.getInstants.toArray)
        if (timeline.countInstants >= numCommits) {
          numInstants = timeline.countInstants
          success = true
        }
        // constructing the meta client throws TableNotFoundException if the table does not exist yet
        val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
      } catch {
        case _: TableNotFoundException =>
          log.info("Got table not found exception. Retrying")
      } finally {
        Thread.sleep(sleepSecsAfterEachRun * 1000)
        currTime = System.currentTimeMillis
      }
    }
    if (!success) throw new IllegalStateException("Timed out waiting for " + numCommits + " commits to appear in " + tablePath)
    numInstants
  }
}