blob: 2cb41ae34b73c45ee48755f387b7e3cb1e6003c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spark
import org.apache.ignite.Ignite
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
import org.apache.spark.sql.ignite.IgniteSparkSession
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import java.lang.{DoubleJDouble, LongJLong}
/**
*/
@RunWith(classOf[JUnitRunner])
class IgniteOptimizationMathFuncSpec extends AbstractDataFrameSpec {
var igniteSession: IgniteSparkSession = _
describe("Supported optimized string functions") {
it("ABS") {
val df = igniteSession.sql("SELECT ABS(val) FROM numbers WHERE id = 6")
checkOptimizationResult(df, "SELECT ABS(val) FROM numbers WHERE id = 6")
val data = Tuple1(.5)
checkQueryData(df, data)
}
it("ACOS") {
val df = igniteSession.sql("SELECT ACOS(val) FROM numbers WHERE id = 7")
checkOptimizationResult(df, "SELECT ACOS(val) FROM numbers WHERE id = 7")
val data = Tuple1(Math.PI)
checkQueryData(df, data)
}
it("ASIN") {
val df = igniteSession.sql("SELECT ASIN(val) FROM numbers WHERE id = 7")
checkOptimizationResult(df, "SELECT ASIN(val) FROM numbers WHERE id = 7")
val data = Tuple1(-Math.PI/2)
checkQueryData(df, data)
}
it("ATAN") {
val df = igniteSession.sql("SELECT ATAN(val) FROM numbers WHERE id = 7")
checkOptimizationResult(df, "SELECT ATAN(val) FROM numbers WHERE id = 7")
val data = Tuple1(-Math.PI/4)
checkQueryData(df, data)
}
it("COS") {
val df = igniteSession.sql("SELECT COS(val) FROM numbers WHERE id = 1")
checkOptimizationResult(df, "SELECT COS(val) FROM numbers WHERE id = 1")
val data = Tuple1(1.0)
checkQueryData(df, data)
}
it("SIN") {
val df = igniteSession.sql("SELECT SIN(val) FROM numbers WHERE id = 1")
checkOptimizationResult(df, "SELECT SIN(val) FROM numbers WHERE id = 1")
val data = Tuple1(.0)
checkQueryData(df, data)
}
it("TAN") {
val df = igniteSession.sql("SELECT TAN(val) FROM numbers WHERE id = 1")
checkOptimizationResult(df, "SELECT TAN(val) FROM numbers WHERE id = 1")
val data = Tuple1(.0)
checkQueryData(df, data)
}
it("COSH") {
val df = igniteSession.sql("SELECT COSH(val) FROM numbers WHERE id = 1")
checkOptimizationResult(df, "SELECT COSH(val) FROM numbers WHERE id = 1")
val data = Tuple1(1.0)
checkQueryData(df, data)
}
it("SINH") {
val df = igniteSession.sql("SELECT SINH(val) FROM numbers WHERE id = 1")
checkOptimizationResult(df, "SELECT SINH(val) FROM numbers WHERE id = 1")
val data = Tuple1(.0)
checkQueryData(df, data)
}
it("TANH") {
val df = igniteSession.sql("SELECT TANH(val) FROM numbers WHERE id = 1")
checkOptimizationResult(df, "SELECT TANH(val) FROM numbers WHERE id = 1")
val data = Tuple1(.0)
checkQueryData(df, data)
}
it("ATAN2") {
val df = igniteSession.sql("SELECT ATAN2(val, 0.0) FROM numbers WHERE id = 1")
checkOptimizationResult(df, "SELECT ATAN2(val, 0.0) AS \"ATAN2(val, CAST(0.0 AS DOUBLE))\" " +
"FROM numbers WHERE id = 1")
val data = Tuple1(.0)
checkQueryData(df, data)
}
it("MOD") {
val df = igniteSession.sql("SELECT val % 9 FROM numbers WHERE id = 8")
checkOptimizationResult(df, "SELECT val % 9.0 as \"(val % CAST(9 AS DOUBLE))\" " +
"FROM numbers WHERE id = 8")
val data = Tuple1(6.0)
checkQueryData(df, data)
}
it("CEIL") {
val df = igniteSession.sql("SELECT CEIL(val) FROM numbers WHERE id = 2")
checkOptimizationResult(df, "SELECT CAST(CEIL(val) AS LONG) as \"CEIL(val)\" " +
"FROM numbers WHERE id = 2")
val data = Tuple1(1)
checkQueryData(df, data)
}
it("ROUND") {
val df = igniteSession.sql("SELECT id, ROUND(val) FROM numbers WHERE id IN (2, 9, 10)")
checkOptimizationResult(df, "SELECT id, ROUND(val, 0) FROM numbers WHERE id IN (2, 9, 10)")
val data = (
(2, 1.0),
(9, 1.0),
(10, 0.0))
checkQueryData(df, data)
}
it("FLOOR") {
val df = igniteSession.sql("SELECT FLOOR(val) FROM numbers WHERE id = 2")
checkOptimizationResult(df, "SELECT CAST(FLOOR(val) AS LONG) as \"FLOOR(val)\" FROM numbers " +
"WHERE id = 2")
val data = Tuple1(0)
checkQueryData(df, data)
}
it("POWER") {
val df = igniteSession.sql("SELECT POWER(val, 3) FROM numbers WHERE id = 4")
checkOptimizationResult(df, "SELECT POWER(val, 3.0) as \"POWER(val, CAST(3 AS DOUBLE))\" FROM numbers " +
"WHERE id = 4")
val data = Tuple1(8.0)
checkQueryData(df, data)
}
it("EXP") {
val df = igniteSession.sql("SELECT id, EXP(val) FROM numbers WHERE id IN (1, 3)")
checkOptimizationResult(df, "SELECT id, EXP(val) FROM numbers WHERE id IN (1, 3)")
val data = (
(1, 1),
(3, Math.E))
checkQueryData(df, data)
}
it("LOG") {
val df = igniteSession.sql("SELECT LOG(val) FROM numbers WHERE id = 12")
checkOptimizationResult(df, "SELECT LOG(val) as \"LOG(E(), val)\" FROM numbers " +
"WHERE id = 12")
val data = Tuple1(2.0)
checkQueryData(df, data)
}
it("LOG10") {
val df = igniteSession.sql("SELECT LOG10(val) FROM numbers WHERE id = 11")
checkOptimizationResult(df, "SELECT LOG10(val) FROM numbers WHERE id = 11")
val data = Tuple1(2.0)
checkQueryData(df, data)
}
it("DEGREES") {
val df = igniteSession.sql("SELECT DEGREES(val) FROM numbers WHERE id = 13")
checkOptimizationResult(df, "SELECT DEGREES(val) FROM numbers WHERE id = 13")
val data = Tuple1(180.0)
checkQueryData(df, data)
}
it("RADIANS") {
val df = igniteSession.sql("SELECT RADIANS(val) FROM numbers WHERE id = 14")
checkOptimizationResult(df, "SELECT RADIANS(val) FROM numbers WHERE id = 14")
val data = Tuple1(Math.PI)
checkQueryData(df, data)
}
it("BITAND") {
val df = igniteSession.sql("SELECT int_val&1 FROM numbers WHERE id = 15")
checkOptimizationResult(df, "SELECT BITAND(int_val, 1) as \"(int_val & CAST(1 AS BIGINT))\" FROM numbers " +
"WHERE id = 15")
val data = Tuple1(1)
checkQueryData(df, data)
}
it("BITOR") {
val df = igniteSession.sql("SELECT int_val|1 FROM numbers WHERE id = 16")
checkOptimizationResult(df, "SELECT BITOR(int_val, 1) as \"(int_val | CAST(1 AS BIGINt))\" FROM numbers " +
"WHERE id = 16")
val data = Tuple1(3)
checkQueryData(df, data)
}
it("BITXOR") {
val df = igniteSession.sql("SELECT int_val^1 FROM numbers WHERE id = 17")
checkOptimizationResult(df, "SELECT BITXOR(int_val, 1) AS \"(int_val ^ CAST(1 AS BIGINT))\" FROM numbers " +
"WHERE id = 17")
val data = Tuple1(2)
checkQueryData(df, data)
}
it("RAND") {
val df = igniteSession.sql("SELECT id, RAND(1) FROM numbers WHERE id = 17")
checkOptimizationResult(df, "SELECT id, RAND(1) FROM numbers WHERE id = 17")
val data = df.rdd.collect
assert(data(0).getAs[JLong]("id") == 17L)
assert(data(0).getAs[JDouble]("rand(1)") != null)
}
}
def createNumberTable(client: Ignite, cacheName: String): Unit = {
val cache = client.cache(cacheName)
cache.query(new SqlFieldsQuery(
"""
| CREATE TABLE numbers (
| id LONG,
| val DOUBLE,
| int_val LONG,
| PRIMARY KEY (id)) WITH "backups=1"
""".stripMargin)).getAll
var qry = new SqlFieldsQuery("INSERT INTO numbers (id, val) values (?, ?)")
cache.query(qry.setArgs(1L.asInstanceOf[JLong], .0.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(2L.asInstanceOf[JLong], .5.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(3L.asInstanceOf[JLong], 1.0.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(4L.asInstanceOf[JLong], 2.0.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(5L.asInstanceOf[JLong], 4.0.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(6L.asInstanceOf[JLong], -0.5.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(7L.asInstanceOf[JLong], -1.0.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(8L.asInstanceOf[JLong], 42.0.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(9L.asInstanceOf[JLong], .51.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(10L.asInstanceOf[JLong], .49.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(11L.asInstanceOf[JLong], 100.0.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(12L.asInstanceOf[JLong], (Math.E*Math.E).asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(13L.asInstanceOf[JLong], Math.PI.asInstanceOf[JDouble])).getAll
cache.query(qry.setArgs(14L.asInstanceOf[JLong], 180.0.asInstanceOf[JDouble])).getAll
qry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val) values (?, ?)")
cache.query(qry.setArgs(15L.asInstanceOf[JLong], 1L.asInstanceOf[JLong])).getAll
cache.query(qry.setArgs(16L.asInstanceOf[JLong], 2L.asInstanceOf[JLong])).getAll
cache.query(qry.setArgs(17L.asInstanceOf[JLong], 3L.asInstanceOf[JLong])).getAll
}
override protected def beforeAll(): Unit = {
super.beforeAll()
createNumberTable(client, DEFAULT_CACHE)
val configProvider = enclose(null) (x ⇒ (){
val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
cfg.setClientMode(true)
cfg.setIgniteInstanceName("client-2")
cfg
})
igniteSession = IgniteSparkSession.builder()
.config(spark.sparkContext.getConf)
.igniteConfigProvider(configProvider)
.getOrCreate()
}
}