blob: 7912cd0307d5bddeaa25789a75e22b6f62171f39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spark
import org.apache.ignite.Ignite
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
import org.apache.spark.sql.ignite.IgniteSparkSession
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import java.lang.{LongJLong}
import java.util.{DateJDate}
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.DAYS
/**
*/
@RunWith(classOf[JUnitRunner])
class IgniteOptimizationDateFuncSpec extends AbstractDataFrameSpec {
var igniteSession: IgniteSparkSession = _
val format = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss")
describe("Supported optimized date functions") {
it(" - CURRENT_TIMESTAMP") {
val df = igniteSession.sql("SELECT id, CURRENT_TIMESTAMP() FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = df.rdd.collect
assert(data(0).getAs[JLong]("id") == 1L)
val date: JDate = data(0).getAs[JDate]("current_timestamp()")
val millisDiff = new JDate().getTime - date.getTime
assert(millisDiff <= 30000)
}
it(" - CURRENT_DATE") {
val df = igniteSession.sql("SELECT id, CURRENT_DATE() FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = df.rdd.collect
assert(data(0).getAs[JLong]("id") == 1L)
val date: JDate = data(0).getAs[JDate]("current_date()")
val dayDiff = DAYS.convert(new JDate().getTime - date.getTime, TimeUnit.MILLISECONDS)
assert(dayDiff <= 1)
}
it(" - HOUR") {
val df = igniteSession.sql("SELECT HOUR(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(0)
checkQueryData(df, data)
}
it(" - MINUTE") {
val df = igniteSession.sql("SELECT MINUTE(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(0)
checkQueryData(df, data)
}
it(" - SECOND") {
val df = igniteSession.sql("SELECT SECOND(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(0)
checkQueryData(df, data)
}
it(" - MONTH") {
val df = igniteSession.sql("SELECT MONTH(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(0)
checkQueryData(df, data)
}
it(" - YEAR") {
val df = igniteSession.sql("SELECT YEAR(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(2017)
checkQueryData(df, data)
}
it(" - QUARTER") {
val df = igniteSession.sql("SELECT QUARTER(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(1)
checkQueryData(df, data)
}
it(" - WEEK") {
val df = igniteSession.sql("SELECT WEEKOFYEAR(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(1)
checkQueryData(df, data)
}
it(" - DAY_OF_MONTH") {
val df = igniteSession.sql("SELECT DAYOFMONTH(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(1)
checkQueryData(df, data)
}
it(" - DAY_OF_YEAR") {
val df = igniteSession.sql("SELECT DAYOFYEAR(val) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(1)
checkQueryData(df, data)
}
it(" - DATE_ADD") {
val df = igniteSession.sql("SELECT DATE_ADD(val, 2) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(format.parse("03.01.2017 00:00:00"))
checkQueryData(df, data)
}
it(" - DATEDIFF") {
val df = igniteSession.sql("SELECT " +
"DATEDIFF(val, TO_DATE('2017-01-02 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS')) FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1(1)
checkQueryData(df, data)
}
it(" - FORMATDATETIME") {
val df = igniteSession.sql("SELECT DATE_FORMAT(val, 'yyyy-MM-dd HH:mm:ss.SSS') FROM dates WHERE id = 1")
checkOptimizationResult(df)
val data = Tuple1("2017-01-01 00:00:00.000")
checkQueryData(df, data)
}
}
def createDateTable(client: Ignite, cacheName: String): Unit = {
val cache = client.cache(cacheName)
cache.query(new SqlFieldsQuery(
"""
| CREATE TABLE dates (
| id LONG,
| val DATE,
| PRIMARY KEY (id)) WITH "backups=1"
""".stripMargin)).getAll
val qry = new SqlFieldsQuery("INSERT INTO dates(id, val) values (?, ?)")
cache.query(qry.setArgs(1L.asInstanceOf[JLong], format.parse("01.01.2017 00:00:00"))).getAll
}
override protected def beforeAll(): Unit = {
super.beforeAll()
createDateTable(client, DEFAULT_CACHE)
val configProvider = enclose(null) (x ⇒ (){
val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
cfg.setClientMode(true)
cfg.setIgniteInstanceName("client-2")
cfg
})
igniteSession = IgniteSparkSession.builder()
.config(spark.sparkContext.getConf)
.igniteConfigProvider(configProvider)
.getOrCreate()
}
}