/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spark

import org.apache.ignite.IgniteException
import org.apache.ignite.internal.util.IgniteUtils.gridClassLoader
import org.apache.ignite.spark.AbstractDataFrameSpec.{PERSON_TBL_NAME, PERSON_TBL_NAME_2, TEST_CONFIG_FILE}
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.ignite.spark.impl.sqlTableInfo
import org.apache.spark.sql.SaveMode.{Append, Ignore, Overwrite}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

/**
 * Tests for writing Spark DataFrames into Ignite SQL tables.
 */
@RunWith(classOf[JUnitRunner])
class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
    var personDataFrame: DataFrame = _

    describe("Write DataFrame into an Ignite SQL table") {
        it("Save data frame as a new table") {
            val rowsCnt = personDataFrame.count()

            personDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, "new_persons")
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                .save()

            assert(rowsCnt == rowsCount("new_persons"), "Data should be saved into 'new_persons' table")
        }
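
        // OPTION_CREATE_TABLE_PARAMETERS is forwarded to the CREATE TABLE WITH clause:
        // backups=1 keeps one backup copy, affinityKey=city_id collocates persons by city.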
        it("Save data frame to existing table") {
            val rowsCnt = personDataFrame.count()

            personDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id")
                .mode(Overwrite)
                .save()

            assert(rowsCnt == rowsCount(PERSON_TBL_NAME_2), s"Data should be saved into $PERSON_TBL_NAME_2 table")
        }
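
        // The OPTION_STREAMER_* options tune the underlying IgniteDataStreamer:
        // per-node parallel operations, per-node buffer size and auto-flush frequency (ms).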
        it("Save data frame to existing table with streamer options") {
            val rowsCnt = personDataFrame.count()

            personDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id")
                .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 3)
                .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1)
                .option(OPTION_STREAMER_FLUSH_FREQUENCY, 10000)
                .mode(Overwrite)
                .save()

            assert(rowsCnt == rowsCount(PERSON_TBL_NAME_2), s"Data should be saved into $PERSON_TBL_NAME_2 table")
        }
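
        // SaveMode.Ignore: the write is a no-op when the target table already exists.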
        it("Ignore save operation if table exists") {
            // Count of records before saving.
            val person2RowsCntBeforeSave = rowsCount(PERSON_TBL_NAME_2)

            personDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id")
                .mode(Ignore)
                .save()

            assert(rowsCount(PERSON_TBL_NAME_2) == person2RowsCntBeforeSave, "Save operation should be ignored.")
        }
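
        // SaveMode.Append inserts into the existing table; ids are shifted by the current
        // row count so appended rows do not collide with existing primary keys.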
        it("Append data frame data to existing table") {
            // Count of records before appending.
            val person2RowsCnt = rowsCount(PERSON_TBL_NAME_2)

            // Count of appended records.
            val personRowsCnt = personDataFrame.count()

            personDataFrame
                .withColumn("id", col("id") + person2RowsCnt) // Shift id column to prevent duplication.
                .write.format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id")
                .mode(Append)
                .save()

            assert(rowsCount(PERSON_TBL_NAME_2) == person2RowsCnt + personRowsCnt,
                s"Table $PERSON_TBL_NAME_2 should contain data from $PERSON_TBL_NAME")
        }

        it("Save another data source data as an Ignite table") {
            val citiesDataFrame = spark.read.json(
                gridClassLoader().getResource("cities.json").getFile)

            citiesDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, "json_city")
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
                .save()

            assert(rowsCount("json_city") == citiesDataFrame.count(),
                "Table json_city should contain data from json file.")
        }
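
        // When OPTION_TABLE is omitted, the path argument of save() is used as the table name.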
        it("Save data frame as a new table with save('table_name')") {
            val rowsCnt = personDataFrame.count()

            personDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                .save("saved_persons")

            assert(rowsCnt == rowsCount("saved_persons"), "Data should be saved into 'saved_persons' table")
        }
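
        // streamerAllowOverwrite=false: the streamer skips keys that already exist,
        // so for duplicate ids the first row written wins.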
        it("Should keep first row if allowOverwrite is false") {
            val nonUniqueCitiesDataFrame = spark.read.json(
                gridClassLoader().getResource("cities_non_unique.json").getFile)

            nonUniqueCitiesDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, "first_row_json_city")
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
                .option(OPTION_STREAMER_ALLOW_OVERWRITE, false)
                .save()

            val cities = readTable("first_row_json_city").collect().sortBy(_.getAs[Long]("ID"))

            assert(cities(0).getAs[String]("NAME") == "Forest Hill")
            assert(cities(1).getAs[String]("NAME") == "Denver")
            assert(cities(2).getAs[String]("NAME") == "St. Petersburg")
        }
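
        // streamerAllowOverwrite=true: later entries overwrite earlier ones,
        // so for duplicate ids the last row written wins.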
        it("Should keep last row if allowOverwrite is true") {
            val nonUniqueCitiesDataFrame = spark.read.json(
                gridClassLoader().getResource("cities_non_unique.json").getFile)

            nonUniqueCitiesDataFrame.write
                .format(FORMAT_IGNITE)
                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                .option(OPTION_TABLE, "last_row_json_city")
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
                .option(OPTION_STREAMER_ALLOW_OVERWRITE, true)
                .save()

            val cities = readTable("last_row_json_city").collect().sortBy(_.getAs[Long]("ID"))

            assert(cities(0).getAs[String]("NAME") == "Paris")
            assert(cities(1).getAs[String]("NAME") == "New York")
            assert(cities(2).getAs[String]("NAME") == "Moscow")
        }
    }

    describe("Wrong DataFrame Write Options") {
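
        // SaveMode.ErrorIfExists (the Spark default) must fail rather than touch an existing table.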
        it("Should throw exception with ErrorIfExists for an existing table") {
            intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                    .mode(SaveMode.ErrorIfExists)
                    .save()
            }
        }
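
        // Ignite SQL tables cannot be created without a primary key, so
        // OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS is mandatory for new tables.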
        it("Should throw exception if primary key fields not specified") {
            intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, "persons_no_pk")
                    .save()
            }
        }

        it("Should throw exception if primary key fields not specified for existing table") {
            intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .mode(Overwrite)
                    .save()
            }

            val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)

            assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exist.")
        }

        it("Should throw exception for wrong pk field") {
            intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "unknown_field")
                    .mode(Overwrite)
                    .save()
            }

            val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)

            assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exist.")
        }

        it("Should throw exception for wrong pk field - 2") {
            intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id,unknown_field")
                    .mode(Overwrite)
                    .save()
            }

            val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)

            assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exist.")
        }

        it("Should throw exception for wrong WITH clause") {
            intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, "person_unsupported_with")
                    .option(OPTION_CREATE_TABLE_PARAMETERS, "unsupported_with_clause")
                    .mode(Overwrite)
                    .save()
            }
        }

        it("Should throw exception for wrong table name") {
            intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, "wrong-table-name")
                    .option(OPTION_CREATE_TABLE_PARAMETERS, "unsupported_with_clause")
                    .mode(Overwrite)
                    .save()
            }
        }
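
        // The numeric streamer options are parsed as numbers, so a non-numeric value
        // must surface as NumberFormatException instead of being silently ignored.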
        it("Should throw exception if streamingFlushFrequency is not a number") {
            intercept[NumberFormatException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                    .option(OPTION_STREAMER_FLUSH_FREQUENCY, "not_a_number")
                    .mode(Overwrite)
                    .save()
            }
        }

        it("Should throw exception if streamingPerNodeBufferSize is not a number") {
            intercept[NumberFormatException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                    .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, "not_a_number")
                    .mode(Overwrite)
                    .save()
            }
        }

        it("Should throw exception if streamingPerNodeParallelOperations is not a number") {
            intercept[NumberFormatException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                    .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, "not_a_number")
                    .mode(Overwrite)
                    .save()
            }
        }

        it("Should throw exception if streamerAllowOverwrite is not a boolean") {
            intercept[IllegalArgumentException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, PERSON_TBL_NAME)
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                    .option(OPTION_STREAMER_ALLOW_OVERWRITE, "not_a_boolean")
                    .mode(Overwrite)
                    .save()
            }
        }
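
        // New tables can only be created in the default PUBLIC schema; any other
        // OPTION_SCHEMA value must be rejected.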
        it("Should throw exception if saving data frame as a new table with non-PUBLIC schema") {
            val ex = intercept[IgniteException] {
                personDataFrame.write
                    .format(FORMAT_IGNITE)
                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
                    .option(OPTION_TABLE, "nonexistent-table-name")
                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
                    .option(OPTION_SCHEMA, "mySchema")
                    .save()
            }

            assertEquals(
                "Creating new tables in schema mySchema is not valid, tables must only be created in " +
                    org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA,
                ex.getMessage)
        }
    }
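
    /** Creates the Person and City test tables and initializes `personDataFrame`. */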
    override protected def beforeAll(): Unit = {
        super.beforeAll()

        createPersonTable(client, "cache1")
        createPersonTable2(client, "cache1")
        createCityTable(client, "cache1")

        personDataFrame = spark.read
            .format(FORMAT_IGNITE)
            .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
            .option(OPTION_TABLE, PERSON_TBL_NAME)
            .load()

        personDataFrame.createOrReplaceTempView("person")
    }

    /**
     * @param tbl Table name.
     * @return Count of rows in table.
     */
    protected def rowsCount(tbl: String): Long = readTable(tbl).count()

    /**
     * @param tbl Table name.
     * @return Ignite Table DataFrame.
     */
    protected def readTable(tbl: String): DataFrame =
        spark.read
            .format(FORMAT_IGNITE)
            .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
            .option(OPTION_TABLE, tbl)
            .load()
}