/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spark

import com.google.common.collect.Iterators
import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.spark.sql.DataFrame
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

/**
 * Tests that check all kinds of SQL queries issued from the Spark SQL engine against an Ignite SQL table.
 */
@RunWith(classOf[JUnitRunner])
class IgniteSQLDataFrameSpec extends AbstractDataFrameSpec {
    var personDataFrame: DataFrame = _

    describe("DataFrame for an Ignite SQL table") {
it("Should correct filter with EqualTo Clause") {
val res = spark.sqlContext.sql("SELECT name FROM person WHERE id = 2").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with EqualToNullSafe Clause") {
val res = spark.sqlContext.sql("SELECT id FROM person WHERE name = 'Jane Roe'").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[Long]("id") should equal(2)
}
it("Should correct filter with GreaterThen Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE id > 3").rdd
res.count should equal(2)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[String]("name") should equal("Richard Miles")
persons(1).getAs[String]("name") should equal(null)
}
it("Should correct filter with GreaterThenOrEqual Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE id >= 3").rdd
res.count should equal(3)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[String]("name") should equal("Mary Major")
persons(1).getAs[String]("name") should equal("Richard Miles")
persons(2).getAs[String]("name") should equal(null)
}
it("Should correct filter with LessThan Clause") {
val res = spark.sqlContext.sql("SELECT name FROM person WHERE id < 2").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[String]("name") should equal("John Doe")
}
it("Should correct filter with LessThanOrEqual Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE id <= 2").rdd
res.count should equal(2)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[String]("name") should equal("John Doe")
persons(1).getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with In Clause") {
val res = spark.sqlContext.sql(
"SELECT id FROM person WHERE name in ('Jane Roe', 'Richard Miles', 'Unknown Person')").rdd
res.count should equal(2)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[Long]("id") should equal(2L)
persons(1).getAs[Long]("id") should equal(4L)
}
it("Should correct filter with IsNull Clause") {
val res = spark.sqlContext.sql(
"SELECT id FROM person WHERE name IS NULL").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[Long]("id") should equal(5L)
}
it("Should correct filter with IsNotNull Clause") {
val res = spark.sqlContext.sql(
"SELECT id FROM person WHERE name IS NOT NULL").rdd
res.count should equal(4)
res.collect.map(r ⇒ r.getAs[Long]("id")).sorted should equal(Array(1, 2, 3, 4))
}
it("Should correct filter with And Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE id <= 4 AND name = 'Jane Roe'").rdd
res.count should equal(1)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[Long]("id") should equal(2)
persons(0).getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with Or Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE id = 2 OR name = 'John Doe'").rdd
res.count should equal(2)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[Long]("id") should equal(1)
persons(0).getAs[String]("name") should equal("John Doe")
persons(1).getAs[Long]("id") should equal(2)
persons(1).getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with Not Clause") {
val res = spark.sqlContext.sql("SELECT id FROM person WHERE NOT(name is null)").rdd
res.count should equal(4)
res.collect.map(r ⇒ r.getAs[Long]("id")).sorted should equal(Array(1, 2, 3, 4))
}
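        // Spark's optimizer simplifies LIKE patterns of the form 'x%', '%x' and '%x%' into
        // StartsWith/EndsWith/Contains expressions, which should reach the data source as
        // StringStartsWith, StringEndsWith and StringContains filters respectively.
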
it("Should correct filter with StringStartsWith Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE name LIKE 'J%'").rdd
res.count should equal(2)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[Long]("id") should equal(1)
persons(0).getAs[String]("name") should equal("John Doe")
persons(1).getAs[Long]("id") should equal(2)
persons(1).getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with StringEndsWith Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE name LIKE '%e'").rdd
res.count should equal(2)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[Long]("id") should equal(1)
persons(0).getAs[String]("name") should equal("John Doe")
persons(1).getAs[Long]("id") should equal(2)
persons(1).getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with StringContains Clause") {
val res = spark.sqlContext.sql("SELECT id, name FROM person WHERE name LIKE '%M%'").rdd
res.count should equal(2)
val persons = res.collect.sortBy(_.getAs[Long]("id"))
persons(0).getAs[Long]("id") should equal(3)
persons(0).getAs[String]("name") should equal("Mary Major")
persons(1).getAs[Long]("id") should equal(4)
persons(1).getAs[String]("name") should equal("Richard Miles")
}
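        // In the aggregate queries below, Spark names the result column after the expression
        // itself, e.g. `max(id)`, and COUNT(*) is normalized to `count(1)`; getAs relies on
        // those generated names.
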
it("Should correct calculate MAX aggregate function") {
val res = spark.sqlContext.sql("SELECT max(id) FROM person").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[Long]("max(id)") should equal(5)
}
it("Should correct calculate MIN aggregate function") {
val res = spark.sqlContext.sql("SELECT min(id) FROM person").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[Long]("min(id)") should equal(1)
}
it("Should correct calculate AVG aggregate function") {
val res = spark.sqlContext.sql("SELECT avg(id) FROM person WHERE id = 1 OR id = 2").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[Double]("avg(id)") should equal(1.5D)
}
it("Should correct calculate COUNT(*) aggregate function") {
val res = spark.sqlContext.sql("SELECT count(*) FROM person").rdd
res.count should equal(1)
val persons = res.collect
persons(0).getAs[Long]("count(1)") should equal(5)
}
it("Should correct execute GROUP BY query") {
val res = spark.sqlContext.sql("SELECT count(1), city_id FROM person GROUP BY city_id").rdd
res.count should equal(3)
val persons = res.collect.sortBy(_.getAs[Long]("city_id"))
persons(0).getAs[Long]("city_id") should equal(1)
persons(0).getAs[Long]("count(1)") should equal(1)
persons(1).getAs[Long]("city_id") should equal(2)
persons(1).getAs[Long]("count(1)") should equal(3)
persons(2).getAs[Long]("city_id") should equal(3)
persons(2).getAs[Long]("count(1)") should equal(1)
}
it("Should correct execute GROUP BY with HAVING query") {
val res = spark.sqlContext.sql("SELECT count(1), city_id FROM person GROUP BY city_id HAVING count(1) > 1").rdd
res.count should equal(1)
val persons = res.collect.sortBy(_.getAs[Long]("city_id"))
persons(0).getAs[Long]("city_id") should equal(2)
persons(0).getAs[Long]("count(1)") should equal(3)
}
it("should use the schema name where one is specified") {
// `employeeCache1` is created in the schema matching the name of the cache, ie. `employeeCache1`.
createEmployeeCache(client, "employeeCache1")
spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
.option(OPTION_TABLE, "employee")
.option(OPTION_SCHEMA, "employeeCache1")
.load()
.createOrReplaceTempView("employeeWithSchema")
// `employeeCache2` is created with a custom schema of `employeeSchema`.
createEmployeeCache(client, "employeeCache2", Some("employeeSchema"))
Iterators.size(client.cache("employeeCache2").iterator()) should equal(3)
// Remove a value from `employeeCache2` so that we know whether the select statement picks up the
// correct cache, ie. it should now have 2 values compared to 3 in `employeeCache1`.
client.cache("employeeCache2").remove("key1") shouldBe true
spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
.option(OPTION_TABLE, "employee")
.option(OPTION_SCHEMA, "employeeSchema")
.load()
.createOrReplaceTempView("employeeWithSchema2")
val res = spark.sqlContext.sql("SELECT id FROM employeeWithSchema").rdd
res.count should equal(3)
val res2 = spark.sqlContext.sql("SELECT id FROM employeeWithSchema2").rdd
res2.count should equal(2)
}
    }

    override protected def beforeAll(): Unit = {
        super.beforeAll()

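        // Fixture assumed by the assertions above (inferred from this spec, not from createPersonTable
        // itself): five rows in the `person` table, ids 1..5 with names "John Doe", "Jane Roe",
        // "Mary Major", "Richard Miles" and a null name, spread across city_id 1 (one row),
        // 2 (three rows) and 3 (one row).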
        createPersonTable(client, "cache1")

        personDataFrame = spark.read
            .format(FORMAT_IGNITE)
            .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
            .option(OPTION_TABLE, "person")
            .load()

        personDataFrame.createOrReplaceTempView("person")
    }
}