blob: 9593c8ebd887552bcbc0fb09e2a6d4cf39525509 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.spark
import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.spark.sql.DataFrame
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
/**
 * Tests that check all kinds of SQL queries sent from the Spark SQL engine to an Ignite SQL table.
 */
class IgniteSQLDataFrameSpec extends AbstractDataFrameSpec {
// DataFrame over the Ignite `person` table. It is left default-initialized here and
// assigned in `beforeAll`, which is why it must be a `var`.
var personDataFrame: DataFrame = _
describe("DataFrame for a Ignite SQL table") {
it("Should correct filter with EqualTo Clause") {
    // Filtering on `id = 2` must return exactly one row, whose name is Jane Roe.
    val result = spark.sqlContext.sql("SELECT name FROM person WHERE id = 2").rdd

    result.count should equal(1)

    val rows = result.collect
    val firstName = rows(0).getAs[String]("name")

    firstName should equal("Jane Roe")
}
it("Should correct filter with EqualToNullSafe Clause") {
    // `<=>` is Spark SQL's null-safe equality operator, so this query actually exercises the
    // null-safe comparison the test name refers to; a plain `=` would only test ordinary equality.
    val res = spark.sqlContext.sql("SELECT id FROM person WHERE name <=> 'Jane Roe'").rdd

    res.count should equal(1)

    val persons = res.collect

    persons(0).getAs[Long]("id") should equal(2)
}
it("Should correct filter with GreaterThen Clause") {
    val rows = spark.sqlContext.sql("SELECT id, name FROM person WHERE id > 3").rdd

    rows.count should equal(2)

    // Sort by id so the positional checks are deterministic; the last person has no name.
    val names = rows.collect
        .sortBy(row ⇒ row.getAs[Long]("id"))
        .map(row ⇒ row.getAs[String]("name"))

    names(0) should equal("Richard Miles")
    names(1) should equal(null)
}
it("Should correct filter with GreaterThenOrEqual Clause") {
    val result = spark.sqlContext.sql("SELECT id, name FROM person WHERE id >= 3").rdd

    result.count should equal(3)

    // Rows in ascending id order; `nameAt(i)` reads the name of the i-th row.
    val byId = result.collect.sortBy(r ⇒ r.getAs[Long]("id"))
    val nameAt = (i: Int) ⇒ byId(i).getAs[String]("name")

    nameAt(0) should equal("Mary Major")
    nameAt(1) should equal("Richard Miles")
    nameAt(2) should equal(null)
}
it("Should correct filter with LessThan Clause") {
    // Only John Doe satisfies `id < 2`.
    val filtered = spark.sqlContext.sql("SELECT name FROM person WHERE id < 2").rdd

    filtered.count should equal(1)

    val collected = filtered.collect
    val onlyRow = collected(0)

    onlyRow.getAs[String]("name") should equal("John Doe")
}
it("Should correct filter with LessThanOrEqual Clause") {
    // `id <= 2` must return John Doe and Jane Roe, checked in ascending id order.
    val matched = spark.sqlContext.sql("SELECT id, name FROM person WHERE id <= 2").rdd

    matched.count should equal(2)

    val ordered = matched.collect.sortBy(row ⇒ row.getAs[Long]("id"))
    val expectedNames = Seq("John Doe", "Jane Roe")

    for (i <- expectedNames.indices)
        ordered(i).getAs[String]("name") should equal(expectedNames(i))
}
it("Should correct filter with In Clause") {
    // 'Unknown Person' matches nobody, so only ids 2 and 4 are expected.
    val matched = spark.sqlContext.sql(
        "SELECT id FROM person WHERE name in ('Jane Roe', 'Richard Miles', 'Unknown Person')").rdd

    matched.count should equal(2)

    val ids = matched.collect.map(row ⇒ row.getAs[Long]("id")).sorted

    ids(0) should equal(2L)
    ids(1) should equal(4L)
}
it("Should correct filter with IsNull Clause") {
    // Exactly one person (id 5) has no name.
    val nullNames = spark.sqlContext.sql(
        "SELECT id FROM person WHERE name IS NULL").rdd

    nullNames.count should equal(1)

    val found = nullNames.collect
    val foundId = found(0).getAs[Long]("id")

    foundId should equal(5L)
}
it("Should correct filter with IsNotNull Clause") {
    val res = spark.sqlContext.sql(
        "SELECT id FROM person WHERE name IS NOT NULL").rdd

    res.count should equal(4)

    // Every person except id 5 (the only one without a name) must be returned.
    // Long literals keep the expected array the same element type as the collected ids.
    res.collect.map(r ⇒ r.getAs[Long]("id")).sorted should equal(Array(1L, 2L, 3L, 4L))
}
it("Should correct filter with And Clause") {
    // Both predicates hold only for Jane Roe (id 2).
    val both = spark.sqlContext.sql("SELECT id, name FROM person WHERE id <= 4 AND name = 'Jane Roe'").rdd

    both.count should equal(1)

    val first = both.collect.sortBy(r ⇒ r.getAs[Long]("id")).apply(0)

    first.getAs[Long]("id") should equal(2)
    first.getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with Or Clause") {
    // Either predicate is enough: `name = 'John Doe'` yields id 1 and `id = 2` yields Jane Roe.
    val either = spark.sqlContext.sql("SELECT id, name FROM person WHERE id = 2 OR name = 'John Doe'").rdd

    either.count should equal(2)

    val sorted = either.collect.sortBy(r ⇒ r.getAs[Long]("id"))

    val first = sorted(0)
    first.getAs[Long]("id") should equal(1)
    first.getAs[String]("name") should equal("John Doe")

    val second = sorted(1)
    second.getAs[Long]("id") should equal(2)
    second.getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with Not Clause") {
    val res = spark.sqlContext.sql("SELECT id FROM person WHERE NOT(name is null)").rdd

    res.count should equal(4)

    // Negating `name is null` must return everyone except id 5, the only person without a name.
    // Long literals keep the expected array the same element type as the collected ids.
    res.collect.map(r ⇒ r.getAs[Long]("id")).sorted should equal(Array(1L, 2L, 3L, 4L))
}
it("Should correct filter with StringStartsWith Clause") {
    // `LIKE 'J%'` keeps only names starting with "J".
    val matched = spark.sqlContext.sql("SELECT id, name FROM person WHERE name LIKE 'J%'").rdd

    matched.count should equal(2)

    val sorted = matched.collect.sortBy(r ⇒ r.getAs[Long]("id"))
    val expected = Seq(1 -> "John Doe", 2 -> "Jane Roe")

    for (((expectedId, expectedName), i) <- expected.zipWithIndex) {
        sorted(i).getAs[Long]("id") should equal(expectedId)
        sorted(i).getAs[String]("name") should equal(expectedName)
    }
}
it("Should correct filter with StringEndsWith Clause") {
    // `LIKE '%e'` keeps only names ending with "e".
    val matched = spark.sqlContext.sql("SELECT id, name FROM person WHERE name LIKE '%e'").rdd

    matched.count should equal(2)

    val ordered = matched.collect.sortBy(row ⇒ row.getAs[Long]("id"))

    val john = ordered(0)
    john.getAs[Long]("id") should equal(1)
    john.getAs[String]("name") should equal("John Doe")

    val jane = ordered(1)
    jane.getAs[Long]("id") should equal(2)
    jane.getAs[String]("name") should equal("Jane Roe")
}
it("Should correct filter with StringContains Clause") {
    // `LIKE '%M%'` keeps names containing an upper-case "M".
    val containing = spark.sqlContext.sql("SELECT id, name FROM person WHERE name LIKE '%M%'").rdd

    containing.count should equal(2)

    val rows = containing.collect.sortBy(r ⇒ r.getAs[Long]("id"))
    val idOf = (i: Int) ⇒ rows(i).getAs[Long]("id")
    val nameOf = (i: Int) ⇒ rows(i).getAs[String]("name")

    idOf(0) should equal(3)
    nameOf(0) should equal("Mary Major")
    idOf(1) should equal(4)
    nameOf(1) should equal("Richard Miles")
}
it("Should correct calculate MAX aggregate function") {
    // A global aggregate yields a single row; the largest person id is 5.
    val aggregated = spark.sqlContext.sql("SELECT max(id) FROM person").rdd

    aggregated.count should equal(1)

    val rows = aggregated.collect
    val maxId = rows(0).getAs[Long]("max(id)")

    maxId should equal(5)
}
it("Should correct calculate MIN aggregate function") {
    // A global aggregate yields a single row; the smallest person id is 1.
    val aggregated = spark.sqlContext.sql("SELECT min(id) FROM person").rdd

    aggregated.count should equal(1)

    val rows = aggregated.collect
    val minId = rows(0).getAs[Long]("min(id)")

    minId should equal(1)
}
it("Should correct calculate AVG aggregate function") {
    // The mean of ids 1 and 2 is 1.5.
    val aggregated = spark.sqlContext.sql("SELECT avg(id) FROM person WHERE id = 1 OR id = 2").rdd

    aggregated.count should equal(1)

    val rows = aggregated.collect
    val average = rows(0).getAs[Double]("avg(id)")

    average should equal(1.5D)
}
it("Should correct calculate COUNT(*) aggregate function") {
    // Spark labels the `count(*)` column as `count(1)`; the table holds five persons.
    val aggregated = spark.sqlContext.sql("SELECT count(*) FROM person").rdd

    aggregated.count should equal(1)

    val rows = aggregated.collect
    val total = rows(0).getAs[Long]("count(1)")

    total should equal(5)
}
it("Should correct execute GROUP BY query") {
    val grouped = spark.sqlContext.sql("SELECT count(1), city_id FROM person GROUP BY city_id").rdd

    grouped.count should equal(3)

    val byCity = grouped.collect.sortBy(r ⇒ r.getAs[Long]("city_id"))

    // Expected (city_id, number of persons in that city), in ascending city_id order.
    val expected = Seq(1 -> 1, 2 -> 3, 3 -> 1)

    for (((cityId, personCount), idx) <- expected.zipWithIndex) {
        byCity(idx).getAs[Long]("city_id") should equal(cityId)
        byCity(idx).getAs[Long]("count(1)") should equal(personCount)
    }
}
it("Should correct execute GROUP BY with HAVING query") {
    // Only one city has more than one person: city 2, with three.
    val crowded = spark.sqlContext.sql("SELECT count(1), city_id FROM person GROUP BY city_id HAVING count(1) > 1").rdd

    crowded.count should equal(1)

    val cityRow = crowded.collect.sortBy(r ⇒ r.getAs[Long]("city_id")).apply(0)

    cityRow.getAs[Long]("city_id") should equal(2)
    cityRow.getAs[Long]("count(1)") should equal(3)
}
it("should use the schema name where one is specified") {
    // `employeeCache1` is created in the schema matching the name of the cache, ie. `employeeCache1`.
    createEmployeeCache(client, "employeeCache1")

    // Expose the `employee` table from schema `employeeCache1` to Spark as `employeeWithSchema`.
    spark.read
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
        .option(OPTION_TABLE, "employee")
        .option(OPTION_SCHEMA, "employeeCache1")
        .load()
        .createOrReplaceTempView("employeeWithSchema")

    // `employeeCache2` is created with a custom schema of `employeeSchema`.
    createEmployeeCache(client, "employeeCache2", Some("employeeSchema"))

    // NOTE(review): `Iterators` is presumably Guava's `com.google.common.collect.Iterators`;
    // its import is not among the visible imports — confirm it is present in the file.
    Iterators.size(client.cache("employeeCache2").iterator()) should equal(3)

    // Remove a value from `employeeCache2` so that we know whether the select statement picks up the
    // correct cache, ie. it should now have 2 values compared to 3 in `employeeCache1`.
    client.cache("employeeCache2").remove("key1") shouldBe true

    // Expose the same table name, but from schema `employeeSchema`, as `employeeWithSchema2`.
    spark.read
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
        .option(OPTION_TABLE, "employee")
        .option(OPTION_SCHEMA, "employeeSchema")
        .load()
        .createOrReplaceTempView("employeeWithSchema2")

    val res = spark.sqlContext.sql("SELECT id FROM employeeWithSchema").rdd

    res.count should equal(3)

    val res2 = spark.sqlContext.sql("SELECT id FROM employeeWithSchema2").rdd

    res2.count should equal(2)
}
override protected def beforeAll(): Unit = {
createPersonTable(client, "cache1")
personDataFrame =
.option(OPTION_TABLE, "person")