// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.kudu.spark.kudu

import java.nio.charset.StandardCharsets
import java.util

import scala.collection.JavaConverters._
import scala.collection.immutable.IndexedSeq

import org.apache.spark.SparkException
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.test.RandomUtils
import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
import org.junit.Before
import org.junit.Test
import org.scalatest.matchers.should.Matchers
import scala.util.Random

class DefaultSourceTest extends KuduTestSuite with Matchers {
val rowCount = 10
var sqlContext: SQLContext = _
var rows: IndexedSeq[(Int, Int, String, Long)] = _
var kuduOptions: Map[String, String] = _
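
// Seed the default table with rowCount rows before each test and register it
// as a temp view so the SQL-style filters below can query it by name.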
@Before
def setUp(): Unit = {
rows = insertRows(table, rowCount)
sqlContext = ss.sqlContext
kuduOptions =
Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
sqlContext.read
.options(kuduOptions)
.format("kudu")
.load()
.createOrReplaceTempView(tableName)
}
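
// A minimal helper sketch (not part of the original suite): most tests below
// repeat the same read chain, which could be factored out as shown here;
// `options` defaults to the shared kuduOptions fixture.
private def readKudu(options: Map[String, String] = kuduOptions) =
sqlContext.read.options(options).format("kudu").load
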
/**
* A simple test to verify the legacy package reader/writer
* syntax still works. This should be removed when the
* deprecated `kudu` methods are removed.
*/
@Test @deprecated("Marked as deprecated to suppress warning", "")
def testPackageReaderAndWriter(): Unit = {
val df = sqlContext.read.options(kuduOptions).kudu
val baseDF = df.limit(1) // Filter down to just the first row.
// Change the c2 string to abc and update.
val updateDF = baseDF.withColumn("c2_s", lit("abc"))
updateDF.write.options(kuduOptions).mode("append").kudu
val newDf = sqlContext.read.options(kuduOptions).kudu
assertFalse(newDf.collect().isEmpty)
}

@Test
def testTableCreation() {
val tableName = "testcreatetable"
if (kuduContext.tableExists(tableName)) {
kuduContext.deleteTable(tableName)
}
val df = sqlContext.read.options(kuduOptions).format("kudu").load
kuduContext.createTable(
tableName,
df.schema,
Seq("key"),
new CreateTableOptions()
.setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1))
val insertsBefore = kuduContext.numInserts.value.get(tableName)
kuduContext.insertRows(df, tableName)
assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName))
// Now use new options to refer to the new table name.
val newOptions: Map[String, String] =
Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
val checkDf = sqlContext.read.options(newOptions).format("kudu").load
assert(checkDf.schema === df.schema)
assertTrue(kuduContext.tableExists(tableName))
assert(checkDf.count == 10)
kuduContext.deleteTable(tableName)
assertFalse(kuduContext.tableExists(tableName))
}

@Test
def testTableCreationWithPartitioning() {
val tableName = "testcreatepartitionedtable"
if (kuduContext.tableExists(tableName)) {
kuduContext.deleteTable(tableName)
}
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val kuduSchema = kuduContext.createSchema(df.schema, Seq("key"))
val lower = kuduSchema.newPartialRow()
lower.addInt("key", 0)
val upper = kuduSchema.newPartialRow()
upper.addInt("key", Integer.MAX_VALUE)
kuduContext.createTable(
tableName,
kuduSchema,
new CreateTableOptions()
.addHashPartitions(List("key").asJava, 2)
.setRangePartitionColumns(List("key").asJava)
.addRangePartition(lower, upper)
.setNumReplicas(1)
)
val insertsBefore = kuduContext.numInserts.value.get(tableName)
kuduContext.insertRows(df, tableName)
assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName))
// Now use new options to refer to the new table name.
val newOptions: Map[String, String] =
Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
val checkDf = sqlContext.read.options(newOptions).format("kudu").load
assert(checkDf.schema === df.schema)
assertTrue(kuduContext.tableExists(tableName))
assert(checkDf.count == 10)
kuduContext.deleteTable(tableName)
assertFalse(kuduContext.tableExists(tableName))
}

@Test
def testInsertion() {
val insertsBefore = kuduContext.numInserts.value.get(tableName)
println(s"insertsBefore: $insertsBefore")
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val changedDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("abc"))
kuduContext.insertRows(changedDF, tableName)
assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value.get(tableName))
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collected = newDF.filter("key = 100").collect()
assertEquals("abc", collected(0).getAs[String]("c2_s"))
deleteRow(100)
}

@Test
def testInsertionMultiple() {
val insertsBefore = kuduContext.numInserts.value.get(tableName)
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val changedDF = df
.limit(2)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("abc"))
kuduContext.insertRows(changedDF, tableName)
assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value.get(tableName))
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collected = newDF.filter("key = 100").collect()
assertEquals("abc", collected(0).getAs[String]("c2_s"))
val collectedTwo = newDF.filter("key = 101").collect()
assertEquals("abc", collectedTwo(0).getAs[String]("c2_s"))
deleteRow(100)
deleteRow(101)
}

@Test
def testInsertionIgnoreRows() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val baseDF = df.limit(1) // Filter down to just the first row.
// Change the c2 string to abc and insert.
val updateDF = baseDF.withColumn("c2_s", lit("abc"))
val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
// Change the key and insert.
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
kuduContext.insertRows(insertDF, tableName, kuduWriteOptions)
// Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
// Restore the original state of the table.
deleteRow(100)
}

@Test
def testInsertIgnoreRowsUsingDefaultSource() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val baseDF = df.limit(1) // Filter down to just the first row.
// Change the c2 string to abc and insert.
val updateDF = baseDF.withColumn("c2_s", lit("abc"))
val newOptions: Map[String, String] = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.operation" -> "insert",
"kudu.ignoreDuplicateRowErrors" -> "true")
updateDF.write.options(newOptions).mode("append").format("kudu").save
// Change the key and insert.
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
insertDF.write.options(newOptions).mode("append").format("kudu").save
// Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
// Restore the original state of the table.
deleteRow(100)
}

@Test
def testInsertIgnoreRowsWriteOption() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val baseDF = df.limit(1) // Filter down to just the first row.
// Change the c2 string to abc and insert.
val updateDF = baseDF.withColumn("c2_s", lit("abc"))
val newOptions: Map[String, String] = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.operation" -> "insert-ignore")
updateDF.write.options(newOptions).mode("append").format("kudu").save
// Change the key and insert.
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
insertDF.write.options(newOptions).mode("append").format("kudu").save
// Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
// Restore the original state of the table.
deleteRow(100)
}

@Test @deprecated("Marked as deprecated to suppress warning", "")
def testInsertIgnoreRowsMethod() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val baseDF = df.limit(1) // Filter down to just the first row.
// Change the c2 string to abc and insert.
val updateDF = baseDF.withColumn("c2_s", lit("abc"))
kuduContext.insertIgnoreRows(updateDF, tableName)
// Change the key and insert.
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
kuduContext.insertIgnoreRows(insertDF, tableName)
// Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
// Restore the original state of the table.
deleteRow(100)
}

@Test
def testUpsertRows() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val baseDF = df.limit(1) // Filter down to just the first row.
// Change the c2 string to abc and update.
val upsertDF = baseDF.withColumn("c2_s", lit("abc"))
kuduContext.upsertRows(upsertDF, tableName)
// Change the key and insert.
val upsertsBefore = kuduContext.numUpserts.value.get(tableName)
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
kuduContext.upsertRows(insertDF, tableName)
assertEquals(upsertsBefore + insertDF.count(), kuduContext.numUpserts.value.get(tableName))
// Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
// Restore the original state of the table, and test the numUpdates metric.
val updatesBefore = kuduContext.numUpdates.value.get(tableName)
val updateDF = baseDF.filter("key = 0").withColumn("c2_s", lit("0"))
val updatesApplied = updateDF.count()
kuduContext.updateRows(updateDF, tableName)
assertEquals(updatesBefore + updatesApplied, kuduContext.numUpdates.value.get(tableName))
deleteRow(100)
}

@Test
def testMultipleTableOperationCounts() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val tableUpsertsBefore = kuduContext.numUpserts.value.get(tableName)
val simpleTableUpsertsBefore = kuduContext.numUpserts.value.get(simpleTableName)
// Change the key and insert.
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
kuduContext.upsertRows(insertDF, tableName)
// Insert a new row into the simple table.
val insertSimpleDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
kuduContext.upsertRows(insertSimpleDF, simpleTableName)
assertEquals(tableUpsertsBefore + insertDF.count(), kuduContext.numUpserts.value.get(tableName))
assertEquals(
simpleTableUpsertsBefore + insertSimpleDF.count(),
kuduContext.numUpserts.value.get(simpleTableName))
// Restore the original state of the tables, and test the numDeletes metric.
val deletesBefore = kuduContext.numDeletes.value.get(tableName)
val simpleDeletesBefore = kuduContext.numDeletes.value.get(simpleTableName)
kuduContext.deleteRows(insertDF, tableName)
kuduContext.deleteRows(insertSimpleDF, simpleTableName)
assertEquals(deletesBefore + insertDF.count(), kuduContext.numDeletes.value.get(tableName))
assertEquals(
simpleDeletesBefore + insertSimpleDF.count(),
kuduContext.numDeletes.value.get(simpleTableName))
}

@Test
def testWriteWithSink() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val baseDF = df.limit(1) // Filter down to just the first row.
// Change the c2 string to abc and upsert.
val upsertDF = baseDF.withColumn("c2_s", lit("abc"))
upsertDF.write
.format("kudu")
.option("kudu.master", harness.getMasterAddressesAsString)
.option("kudu.table", tableName)
// Default kudu.operation is upsert.
.mode(SaveMode.Append)
.save()
// Change the key and insert.
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
insertDF.write
.format("kudu")
.option("kudu.master", harness.getMasterAddressesAsString)
.option("kudu.table", tableName)
.option("kudu.operation", "insert")
.mode(SaveMode.Append)
.save()
// Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
}

@Test
def testUpsertRowsIgnoreNulls() {
val nonNullDF =
sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
kuduContext.insertRows(nonNullDF, simpleTableName)
val dataDF = sqlContext.read
.options(
Map("kudu.master" -> harness.getMasterAddressesAsString, "kudu.table" -> simpleTableName))
.format("kudu")
.load
val nullDF = sqlContext
.createDataFrame(Seq((0, null.asInstanceOf[String])))
.toDF("key", "val")
val ignoreNullOptions = KuduWriteOptions(ignoreNull = true)
kuduContext.upsertRows(nullDF, simpleTableName, ignoreNullOptions)
assert(dataDF.collect.toList === nonNullDF.collect.toList)
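// With ignoreNull = false the null should overwrite the value: restore "foo"
// first, then upsert the null row again.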
val respectNullOptions = KuduWriteOptions(ignoreNull = false)
kuduContext.updateRows(nonNullDF, simpleTableName)
kuduContext.upsertRows(nullDF, simpleTableName, respectNullOptions)
assert(dataDF.collect.toList === nullDF.collect.toList)
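// The default write options should behave the same as ignoreNull = false.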
kuduContext.updateRows(nonNullDF, simpleTableName)
kuduContext.upsertRows(nullDF, simpleTableName)
assert(dataDF.collect.toList === nullDF.collect.toList)
val deleteDF = dataDF.filter("key = 0").select("key")
kuduContext.deleteRows(deleteDF, simpleTableName)
}

@Test
def testUpsertRowsIgnoreNullsUsingDefaultSource() {
val nonNullDF =
sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
kuduContext.insertRows(nonNullDF, simpleTableName)
val dataDF = sqlContext.read
.options(
Map("kudu.master" -> harness.getMasterAddressesAsString, "kudu.table" -> simpleTableName))
.format("kudu")
.load
val nullDF = sqlContext
.createDataFrame(Seq((0, null.asInstanceOf[String])))
.toDF("key", "val")
val options_0: Map[String, String] = Map(
"kudu.table" -> simpleTableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.ignoreNull" -> "true")
nullDF.write.options(options_0).mode("append").format("kudu").save
assert(dataDF.collect.toList === nonNullDF.collect.toList)
kuduContext.updateRows(nonNullDF, simpleTableName)
val options_1: Map[String, String] =
Map("kudu.table" -> simpleTableName, "kudu.master" -> harness.getMasterAddressesAsString)
nullDF.write.options(options_1).mode("append").format("kudu").save
assert(dataDF.collect.toList === nullDF.collect.toList)
val deleteDF = dataDF.filter("key = 0").select("key")
kuduContext.deleteRows(deleteDF, simpleTableName)
}

@Test
def testRepartition(): Unit = {
runRepartitionTest(false)
}

@Test
def testRepartitionAndSort(): Unit = {
runRepartitionTest(true)
}

def runRepartitionTest(repartitionSort: Boolean): Unit = {
// Create a simple table with 2 range partitions split on the value 100.
val tableName = "testRepartition"
val splitValue = 100
val split = simpleSchema.newPartialRow()
split.addInt("key", splitValue)
val options = new CreateTableOptions()
options.setRangePartitionColumns(List("key").asJava)
options.addSplitRow(split)
val table = kuduClient.createTable(tableName, simpleSchema, options)
val random = Random.javaRandomToRandom(RandomUtils.getRandom)
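// Shuffle the input rows so the insert order gives no accidental
// partition locality.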
val data = random.shuffle(
Seq(
Row.fromSeq(Seq(0, "0")),
Row.fromSeq(Seq(25, "25")),
Row.fromSeq(Seq(50, "50")),
Row.fromSeq(Seq(75, "75")),
Row.fromSeq(Seq(99, "99")),
Row.fromSeq(Seq(100, "100")),
Row.fromSeq(Seq(101, "101")),
Row.fromSeq(Seq(125, "125")),
Row.fromSeq(Seq(150, "150")),
Row.fromSeq(Seq(175, "175")),
Row.fromSeq(Seq(199, "199"))
))
val dataRDD = ss.sparkContext.parallelize(data, numSlices = 2)
val schema = SparkUtil.sparkSchema(table.getSchema)
val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)
// Capture the rows so we can validate the insert order.
kuduContext.captureRows = true
// Count the number of tasks that end.
val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
kuduContext.insertRows(
dataDF,
tableName,
KuduWriteOptions(repartition = true, repartitionSort = repartitionSort))
}
// 2 tasks from the parallelize call, and 2 from the repartitioning.
assertEquals(4, actualNumTasks)
val rows = kuduContext.rowsAccumulator.value.asScala
assertEquals(data.size, rows.size)
assertEquals(data.map(_.getInt(0)).sorted, rows.map(_.getInt(0)).sorted)
// If repartitionSort is true, verify the rows were sorted while repartitioning.
if (repartitionSort) {
def isSorted(rows: Seq[Int]): Boolean = {
rows.sliding(2).forall(p => (p.size == 1) || p.head < p.tail.head)
}
val (bottomRows, topRows) = rows.map(_.getInt(0)).partition(_ < splitValue)
assertTrue(isSorted(bottomRows))
assertTrue(isSorted(topRows))
}
}

@Test
def testDeleteRows() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val deleteDF = df.filter("key = 0").select("key")
val deletesBefore = kuduContext.numDeletes.value.get(tableName)
val deletesApplied = deleteDF.count()
kuduContext.deleteRows(deleteDF, tableName)
assertEquals(deletesBefore + deletesApplied, kuduContext.numDeletes.value.get(tableName))
// Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedDelete = newDF.filter("key = 0").collect()
assertEquals(0, collectedDelete.length)
// Restore the original state of the table.
val insertDF = df.limit(1).filter("key = 0")
kuduContext.insertRows(insertDF, tableName)
}

@Test
def testOutOfOrderSelection() {
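// Project columns in an order different from the table schema and verify
// that the first selected column ("c2_s") comes back first.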
val df =
sqlContext.read.options(kuduOptions).format("kudu").load.select("c2_s", "c1_i", "key")
val collected = df.collect()
assert(collected(0).getString(0).equals("0"))
}

@Test
def testWriteUsingDefaultSource() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val newTable = "testwritedatasourcetable"
kuduContext.createTable(
newTable,
df.schema,
Seq("key"),
new CreateTableOptions()
.setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1))
val newOptions: Map[String, String] =
Map("kudu.table" -> newTable, "kudu.master" -> harness.getMasterAddressesAsString)
df.write.options(newOptions).mode("append").format("kudu").save
val checkDf = sqlContext.read.options(newOptions).format("kudu").load
assert(checkDf.schema === df.schema)
assertTrue(kuduContext.tableExists(newTable))
assert(checkDf.count == 10)
}

@Test
def testSchemaDrift() {
val nonNullDF =
sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
kuduContext.insertRows(nonNullDF, simpleTableName)
val tableOptions = Map(
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.table" -> simpleTableName
)
val df = sqlContext.read.options(tableOptions).format("kudu").load
assertEquals(2, df.schema.fields.length)
// Add a column not in the table schema by duplicating the val column.
val newDf = df.withColumn("val2", col("val"))
// Insert with handleSchemaDrift = false. Note that a new column was not created.
kuduContext.upsertRows(newDf, simpleTableName, KuduWriteOptions(handleSchemaDrift = false))
assertEquals(2, harness.getClient.openTable(simpleTableName).getSchema.getColumns.size())
// Insert with handleSchemaDrift = true. Note that a new column was created.
kuduContext.upsertRows(newDf, simpleTableName, KuduWriteOptions(handleSchemaDrift = true))
assertEquals(3, harness.getClient.openTable(simpleTableName).getSchema.getColumns.size())
val afterDf = sqlContext.read.options(tableOptions).format("kudu").load
assertEquals(3, afterDf.schema.fields.length)
assertEquals("val2", afterDf.schema.fieldNames.last)
assertTrue(afterDf.collect().forall(r => r.getString(1) == r.getString(2)))
}

@Test
def testInsertWrongType() {
val nonNullDF =
sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
kuduContext.insertRows(nonNullDF, simpleTableName)
val tableOptions = Map(
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.table" -> simpleTableName
)
val df = sqlContext.read.options(tableOptions).format("kudu").load
// Convert the val column to bytes instead of a string.
val toBytes = udf[Array[Byte], String](_.getBytes(StandardCharsets.UTF_8))
val newDf = df
.withColumn("valTmp", toBytes(col("val")))
.drop("val")
.withColumnRenamed("valTmp", "val")
try {
kuduContext.insertRows(newDf, simpleTableName, KuduWriteOptions())
fail("Expected the insert to fail with a column type mismatch")
} catch {
case e: SparkException =>
assertTrue(e.getMessage.contains("val isn't [Type: binary], it's string"))
}
}

@Test
def testCreateRelationWithSchema() {
// A user-supplied schema that is compatible with the actual schema, but with
// the key at the end.
val userSchema: StructType = StructType(
List(
StructField("c4_long", DataTypes.LongType),
StructField("key", DataTypes.IntegerType)
))
val dfDefaultSchema = sqlContext.read.options(kuduOptions).format("kudu").load
assertEquals(16, dfDefaultSchema.schema.fields.length)
val dfWithUserSchema =
sqlContext.read.options(kuduOptions).schema(userSchema).format("kudu").load
assertEquals(2, dfWithUserSchema.schema.fields.length)
dfWithUserSchema.limit(10).collect()
assertTrue(dfWithUserSchema.columns.deep == Array("c4_long", "key").deep)
}

@Test
def testCreateRelationWithInvalidSchema() {
// A user-supplied schema that is NOT compatible with the actual schema.
val userSchema: StructType = StructType(
List(
StructField("foo", DataTypes.LongType),
StructField("bar", DataTypes.IntegerType)
))
intercept[IllegalArgumentException] {
sqlContext.read.options(kuduOptions).schema(userSchema).format("kudu").load
}.getMessage should include("Unknown column: foo")
}

// Verify that the propagated timestamp is properly updated inside
// the same client.
@Test
def testTimestampPropagation() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("abc"))
// Initiate a write via KuduContext and verify that the client now has a
// propagated timestamp.
kuduContext.insertRows(insertDF, tableName)
assert(kuduContext.syncClient.getLastPropagatedTimestamp > 0)
var prevTimestamp = kuduContext.syncClient.getLastPropagatedTimestamp
// Initiate a read via DataFrame and verify that it moves the client's
// propagated timestamp forward.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collected = newDF.filter("key = 100").collect()
assertEquals("abc", collected(0).getAs[String]("c2_s"))
assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
prevTimestamp = kuduContext.syncClient.getLastPropagatedTimestamp
// Initiate a read via KuduContext and verify that it moves the propagated
// timestamp forward.
val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
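// rowCount (10) original rows plus the one inserted above.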
assert(rdd.collect.length == 11)
assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
prevTimestamp = kuduContext.syncClient.getLastPropagatedTimestamp
// Initiate another write via KuduContext and verify that it moves the
// propagated timestamp forward.
val updateDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
}

/**
* Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the
* DefaultSource and makes it into the KuduRelation as a configuration
* parameter.
*/
@Test
def testScanRequestTimeoutPropagation() {
kuduOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.scanRequestTimeoutMs" -> "66666")
val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load
val kuduRelation = kuduRelationFromDataFrame(dataFrame)
assert(kuduRelation.readOptions.scanRequestTimeoutMs.contains(66666))
}

@Test
def testSnapshotTimestampMsPropagation() {
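// A relation created with kudu.snapshotTimestampMs should carry the value in
// its read options, while one created without it should not.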
kuduOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.snapshotTimestampMs" -> "86400000000")
val dataFrameSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
val kuduRelationSnapshotTimestamp = kuduRelationFromDataFrame(dataFrameSnapshotTimestamp)
kuduOptions =
Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
val dataFrameNoneSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
val kuduRelationNoneSnapshotTimestamp = kuduRelationFromDataFrame(
dataFrameNoneSnapshotTimestamp)
assert(kuduRelationSnapshotTimestamp.readOptions.snapshotTimestampMs.contains(86400000000L))
assert(kuduRelationNoneSnapshotTimestamp.readOptions.snapshotTimestampMs.isEmpty)
}

@Test
def testReadDataFrameAtSnapshot() {
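// Rows inserted after the captured timestamp should not be visible to the
// snapshot read.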
insertRows(table, 100, 1)
val timestamp = getLastPropagatedTimestampMs()
insertRows(table, 100, 100)
kuduOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.snapshotTimestampMs" -> s"$timestamp")
val dataFrameWithSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
kuduOptions =
Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
val dataFrameWithoutSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
assertEquals(100, dataFrameWithSnapshotTimestamp.collect().length)
assertEquals(200, dataFrameWithoutSnapshotTimestamp.collect().length)
}

@Test
def testSnapshotTimestampBeyondMaxAge(): Unit = {
val extraConfigs = new util.HashMap[String, String]()
val tableName = "snapshot_test"
extraConfigs.put("kudu.table.history_max_age_sec", "1")
kuduClient.createTable(
tableName,
schema,
tableOptions.setExtraConfigs(extraConfigs)
)
val timestamp = getLastPropagatedTimestampMs()
kuduOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.snapshotTimestampMs" -> s"$timestamp")
insertRows(table, 100, 1)
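// Sleep past the one-second history_max_age so the captured timestamp falls
// behind the ancient history mark.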
Thread.sleep(2000)
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val exception = intercept[Exception] {
df.count()
}
assertTrue(
exception.getMessage.contains(
"snapshot scan end timestamp is earlier than the ancient history mark")
)
}

@Test
def testSnapshotTimestampBeyondCurrentTimestamp(): Unit = {
val timestamp = getLastPropagatedTimestampMs() + 100000
kuduOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
"kudu.snapshotTimestampMs" -> s"$timestamp")
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val exception = intercept[Exception] {
df.count()
}
assertTrue(exception.getMessage.contains("cannot verify timestamp"))
}
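
// The master is configured (via the annotation below) to report mocked table
// metrics, so the relation should report the mocked on-disk size.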
@Test
@MasterServerConfig(
flags = Array(
"--mock_table_metrics_for_testing=true",
"--on_disk_size_for_testing=1024",
"--live_row_count_for_testing=100"
))
def testGetTableStatistics(): Unit = {
val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load
val kuduRelation = kuduRelationFromDataFrame(dataFrame)
assert(kuduRelation.sizeInBytes == 1024)
}
}