/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.spark

import org.apache.ignite.Ignite
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.spark.sql.ignite.IgniteSparkSession
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import java.lang.{Double ⇒ JDouble, Long ⇒ JLong}
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
/**
  * Tests for optimization of supported SQL system functions
  * (COALESCE, GREATEST, LEAST, IFNULL, NULLIF, NVL2) into Ignite SQL queries.
  */
@RunWith(classOf[JUnitRunner])
class IgniteOptimizationSystemFuncSpec extends AbstractDataFrameSpec {
    var igniteSession: IgniteSparkSession = _

    describe("Supported optimized system functions") {
        it("COALESCE") {
            val df = igniteSession.sql("SELECT COALESCE(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)")

            checkOptimizationResult(df, "SELECT COALESCE(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)")

            val data = (1, 2, 3)

            checkQueryData(df, data)
        }

        it("GREATEST") {
            val df = igniteSession.sql("SELECT GREATEST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")

            checkOptimizationResult(df, "SELECT GREATEST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")

            val data = (4, 6)

            checkQueryData(df, data)
        }

        it("LEAST") {
            val df = igniteSession.sql("SELECT LEAST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")

            checkOptimizationResult(df, "SELECT LEAST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")

            val data = (3, 5)

            checkQueryData(df, data)
        }

        it("IFNULL") {
            val df = igniteSession.sql("SELECT IFNULL(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)")

            checkOptimizationResult(df, "SELECT COALESCE(int_val1, int_val2) as \"ifnull(default.numbers.`int_val1`, default.numbers.`int_val2`)\" FROM numbers WHERE id IN (1, 2, 3)")

            val data = (1, 2, 3)

            checkQueryData(df, data)
        }

        it("NULLIF") {
            val df = igniteSession.sql("SELECT id, NULLIF(int_val1, int_val2) FROM numbers WHERE id IN (6, 7)")

            checkOptimizationResult(df)

            val data = (
                (6, null),
                (7, 8))

            checkQueryData(df, data)
        }

        it("NVL2") {
            val df = igniteSession.sql("SELECT id, NVL2(int_val1, 'not null', 'null') FROM numbers WHERE id IN (1, 2, 3)")

            checkOptimizationResult(df)

            val data = (
                (1, "not null"),
                (2, "null"),
                (3, "not null"))

            checkQueryData(df, data)
        }
    }
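
    /**
      * Creates the `numbers` test table and fills it with the NULL/non-NULL combinations
      * of `int_val1` and `int_val2` exercised by the tests above.
      */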
    def createNumberTable(client: Ignite, cacheName: String): Unit = {
        val cache = client.cache(cacheName)

        cache.query(new SqlFieldsQuery(
            """
              | CREATE TABLE numbers (
              | id LONG,
              | int_val1 LONG,
              | int_val2 LONG,
              | PRIMARY KEY (id)) WITH "backups=1"
            """.stripMargin)).getAll

        val qry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val1, int_val2) values (?, ?, ?)")

        cache.query(qry.setArgs(1L.asInstanceOf[JLong], 1L.asInstanceOf[JLong], null)).getAll
        cache.query(qry.setArgs(2L.asInstanceOf[JLong], null, 2L.asInstanceOf[JLong])).getAll
        cache.query(qry.setArgs(3L.asInstanceOf[JLong], 3L.asInstanceOf[JLong], null)).getAll
        cache.query(qry.setArgs(4L.asInstanceOf[JLong], 3L.asInstanceOf[JLong], 4L.asInstanceOf[JLong])).getAll
        cache.query(qry.setArgs(5L.asInstanceOf[JLong], 6L.asInstanceOf[JLong], 5L.asInstanceOf[JLong])).getAll
        cache.query(qry.setArgs(6L.asInstanceOf[JLong], 7L.asInstanceOf[JLong], 7L.asInstanceOf[JLong])).getAll
        cache.query(qry.setArgs(7L.asInstanceOf[JLong], 8L.asInstanceOf[JLong], 9L.asInstanceOf[JLong])).getAll
    }
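
    /**
      * Creates the test data and builds an IgniteSparkSession on top of a separate
      * client node configuration ("client-2") loaded from TEST_CONFIG_FILE.
      */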
    override protected def beforeAll(): Unit = {
        super.beforeAll()

        createNumberTable(client, DEFAULT_CACHE)

        val configProvider = enclose(null) (x ⇒ () ⇒ {
            val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()

            cfg.setClientMode(true)

            cfg.setIgniteInstanceName("client-2")

            cfg
        })

        igniteSession = IgniteSparkSession.builder()
            .config(spark.sparkContext.getConf)
            .igniteConfigProvider(configProvider)
            .getOrCreate()
    }
}