blob: 92e509c00d304fda77292bef380dbe7fd4d172a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for ZetaSQL Math functions (on INT64, DOUBLE, NUMERIC types). */
@RunWith(JUnit4.class)
public class ZetaSqlMathFunctionsTest extends ZetaSqlTestBase {
@Rule public transient TestPipeline pipeline = TestPipeline.create();
@Rule public ExpectedException thrown = ExpectedException.none();
@Before
public void setUp() {
initialize();
}
/////////////////////////////////////////////////////////////////////////////
// INT64 type tests
/////////////////////////////////////////////////////////////////////////////
@Test
public void testArithmeticOperatorsInt64() {
String sql = "SELECT -1, 1 + 2, 1 - 2, 1 * 2, 1 / 2";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addInt64Field("f_int64_1")
.addInt64Field("f_int64_2")
.addInt64Field("f_int64_3")
.addInt64Field("f_int64_4")
.addDoubleField("f_double")
.build())
.addValues(-1L, 3L, -1L, 2L, 0.5)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testAbsInt64() {
String sql = "SELECT ABS(1), ABS(-1)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder().addInt64Field("f_int64_1").addInt64Field("f_int64_2").build())
.addValues(1L, 1L)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSignInt64() {
String sql = "SELECT SIGN(0), SIGN(5), SIGN(-5)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addInt64Field("f_int64_1")
.addInt64Field("f_int64_2")
.addInt64Field("f_int64_3")
.build())
.addValues(0L, 1L, -1L)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testModInt64() {
String sql = "SELECT MOD(4, 2)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addInt64Field("f_int64").build())
.addValues(0L)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testDivInt64() {
String sql = "SELECT DIV(1, 2), DIV(2, 1)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder().addInt64Field("f_int64_1").addInt64Field("f_int64_2").build())
.addValues(0L, 2L)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSafeArithmeticFunctionsInt64() {
String sql =
"SELECT SAFE_ADD(9223372036854775807, 1), "
+ "SAFE_SUBTRACT(-9223372036854775808, 1), "
+ "SAFE_MULTIPLY(9223372036854775807, 2), "
+ "SAFE_DIVIDE(1, 0), "
+ "SAFE_NEGATE(-9223372036854775808)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addNullableField("f_int64_1", Schema.FieldType.INT64)
.addNullableField("f_int64_2", Schema.FieldType.INT64)
.addNullableField("f_int64_3", Schema.FieldType.INT64)
.addNullableField("f_int64_4", Schema.FieldType.INT64)
.addNullableField("f_int64_5", Schema.FieldType.INT64)
.build())
.addValues(null, null, null, null, null)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/////////////////////////////////////////////////////////////////////////////
// DOUBLE (FLOAT64) type tests
/////////////////////////////////////////////////////////////////////////////
@Test
public void testDoubleLiteral() {
String sql =
"SELECT 3.0, CAST('+inf' AS FLOAT64), CAST('-inf' AS FLOAT64), CAST('NaN' AS FLOAT64)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.addDoubleField("f_double3")
.addDoubleField("f_double4")
.build())
.addValues(3.0, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, Double.NaN)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testArithmeticOperatorsDouble() {
String sql = "SELECT -1.5, 1.5 + 2.5, 1.5 - 2.5, 1.5 * 2.5, 1.5 / 2.5";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.addDoubleField("f_double3")
.addDoubleField("f_double4")
.addDoubleField("f_double5")
.build())
.addValues(-1.5, 4.0, -1.0, 3.75, 0.6)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testEqualsInf() {
String sql =
"SELECT CAST('+inf' AS FLOAT64) = CAST('+inf' AS FLOAT64), "
+ "CAST('+inf' AS FLOAT64) = CAST('-inf' AS FLOAT64)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addBooleanField("f_boolean1")
.addBooleanField("f_boolean2")
.build())
.addValues(true, false)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testEqualsNaN() {
String sql = "SELECT CAST('NaN' AS FLOAT64) = CAST('NaN' AS FLOAT64)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addBooleanField("f_boolean").build())
.addValues(false)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testAbsDouble() {
String sql = "SELECT ABS(1.5), ABS(-1.0), ABS(CAST('NaN' AS FLOAT64))";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.addDoubleField("f_double3")
.build())
.addValues(1.5, 1.0, Double.NaN)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSignDouble() {
String sql = "SELECT SIGN(-0.0), SIGN(1.5), SIGN(-1.5), SIGN(CAST('NaN' AS FLOAT64))";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.addDoubleField("f_double3")
.addDoubleField("f_double4")
.build())
.addValues(0.0, 1.0, -1.0, Double.NaN)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testRoundDouble() {
String sql = "SELECT ROUND(1.23), ROUND(-1.27, 1)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.build())
.addValues(1.0, -1.3)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testTruncDouble() {
String sql = "SELECT TRUNC(1.23), TRUNC(-1.27, 1)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.build())
.addValues(1.0, -1.2)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testCeilDouble() {
String sql = "SELECT CEIL(1.2), CEIL(-1.2)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.build())
.addValues(2.0, -1.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testFloorDouble() {
String sql = "SELECT FLOOR(1.2), FLOOR(-1.2)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.build())
.addValues(1.0, -2.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testIsInf() {
String sql =
"SELECT IS_INF(CAST('+inf' AS FLOAT64)), IS_INF(CAST('-inf' AS FLOAT64)), IS_INF(3.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addBooleanField("f_boolean1")
.addBooleanField("f_boolean2")
.addBooleanField("f_boolean3")
.build())
.addValues(true, true, false)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testIsNaN() {
String sql = "SELECT IS_NAN(CAST('NaN' AS FLOAT64)), IS_NAN(3.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addBooleanField("f_boolean1")
.addBooleanField("f_boolean2")
.build())
.addValues(true, false)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testIeeeDivide() {
String sql = "SELECT IEEE_DIVIDE(1.0, 0.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDoubleField("f_double").build())
.addValues(Double.POSITIVE_INFINITY)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSafeDivide() {
String sql = "SELECT SAFE_DIVIDE(1.0, 0.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder().addNullableField("f_double", Schema.FieldType.DOUBLE).build())
.addValue(null)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSqrtDouble() {
String sql = "SELECT SQRT(4.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDoubleField("f_double").build())
.addValues(2.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testPowDouble() {
String sql = "SELECT POW(2.0, 3.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDoubleField("f_double").build())
.addValues(8.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testExpDouble() {
String sql = "SELECT EXP(2.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDoubleField("f_double").build())
.addValues(7.38905609893065)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testLnDouble() {
String sql = "SELECT LN(7.38905609893065)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDoubleField("f_double").build())
.addValues(2.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testLog10Double() {
String sql = "SELECT LOG10(100.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDoubleField("f_double").build())
.addValues(2.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testLogDouble() {
String sql = "SELECT LOG(2.25, 1.5)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDoubleField("f_double").build())
.addValues(2.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testTrigonometricFunctions() {
String sql =
"SELECT COS(0.0), COSH(0.0), ACOS(1.0), ACOSH(1.0), "
+ "SIN(0.0), SINH(0.0), ASIN(0.0), ASINH(0.0), "
+ "TAN(0.0), TANH(0.0), ATAN(0.0), ATANH(0.0), ATAN2(0.0, 0.0)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDoubleField("f_double1")
.addDoubleField("f_double2")
.addDoubleField("f_double3")
.addDoubleField("f_double4")
.addDoubleField("f_double5")
.addDoubleField("f_double6")
.addDoubleField("f_double7")
.addDoubleField("f_double8")
.addDoubleField("f_double9")
.addDoubleField("f_double10")
.addDoubleField("f_double11")
.addDoubleField("f_double12")
.addDoubleField("f_double13")
.build())
.addValues(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/////////////////////////////////////////////////////////////////////////////
// NUMERIC type tests
/////////////////////////////////////////////////////////////////////////////
@Test
public void testNumericLiteral() {
String sql =
"SELECT NUMERIC '0', "
+ "NUMERIC '123456', "
+ "NUMERIC '-3.14', "
+ "NUMERIC '-0.54321', "
+ "NUMERIC '1.23456e05', "
+ "NUMERIC '-9.876e-3', "
// min value for ZetaSQL NUMERIC type
+ "NUMERIC '-99999999999999999999999999999.999999999', "
// max value for ZetaSQL NUMERIC type
+ "NUMERIC '99999999999999999999999999999.999999999'";
;
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.addDecimalField("f_numeric3")
.addDecimalField("f_numeric4")
.addDecimalField("f_numeric5")
.addDecimalField("f_numeric6")
.addDecimalField("f_numeric7")
.addDecimalField("f_numeric8")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("0"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-3.14"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.54321"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.009876"),
ZetaSqlCalciteTranslationUtils.ZETASQL_NUMERIC_MIN_VALUE,
ZetaSqlCalciteTranslationUtils.ZETASQL_NUMERIC_MAX_VALUE)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testNumericColumn() {
String sql = "SELECT numeric_field FROM table_with_numeric";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
final Schema schema = Schema.builder().addDecimalField("f_numeric").build();
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(schema)
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"))
.build(),
Row.withSchema(schema)
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"))
.build(),
Row.withSchema(schema)
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testArithmeticOperatorsNumeric() {
String sql =
"SELECT - NUMERIC '1.23456e05', "
+ "NUMERIC '1.23456e05' + NUMERIC '9.876e-3', "
+ "NUMERIC '1.23456e05' - NUMERIC '-9.876e-3', "
+ "NUMERIC '1.23e02' * NUMERIC '-1.001e-3', "
+ "NUMERIC '-1.23123e-1' / NUMERIC '-1.001e-3', ";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.addDecimalField("f_numeric3")
.addDecimalField("f_numeric4")
.addDecimalField("f_numeric5")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("-123456"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.123123"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("123"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testAbsNumeric() {
String sql = "SELECT ABS(NUMERIC '1.23456e04'), ABS(NUMERIC '-1.23456e04')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("12345.6"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("12345.6"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSignNumeric() {
String sql = "SELECT SIGN(NUMERIC '0'), SIGN(NUMERIC '1.23e01'), SIGN(NUMERIC '-1.23e01')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.addDecimalField("f_numeric3")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("0"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("1"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-1"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testRoundNumeric() {
String sql = "SELECT ROUND(NUMERIC '1.23456e04'), ROUND(NUMERIC '-1.234567e04', 1)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345.7"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testTruncNumeric() {
String sql = "SELECT TRUNC(NUMERIC '1.23456e04'), TRUNC(NUMERIC '-1.234567e04', 1)";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345.6"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testCeilNumeric() {
String sql = "SELECT CEIL(NUMERIC '1.23456e04'), CEIL(NUMERIC '-1.23456e04')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testFloorNumeric() {
String sql = "SELECT FLOOR(NUMERIC '1.23456e04'), FLOOR(NUMERIC '-1.23456e04')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addDecimalField("f_numeric1")
.addDecimalField("f_numeric2")
.build())
.addValues(
ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"),
ZetaSqlTypesUtils.bigDecimalAsNumeric("-12346"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testModNumeric() {
String sql = "SELECT MOD(NUMERIC '1.23456e05', NUMERIC '5')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("1"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testDivNumeric() {
String sql = "SELECT DIV(NUMERIC '1.23456e05', NUMERIC '5')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("24691"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSafeArithmeticFunctionsNumeric() {
String sql =
"SELECT SAFE_ADD(NUMERIC '99999999999999999999999999999.999999999', NUMERIC '1'), "
+ "SAFE_SUBTRACT(NUMERIC '-99999999999999999999999999999.999999999', NUMERIC '1'), "
+ "SAFE_MULTIPLY(NUMERIC '99999999999999999999999999999.999999999', NUMERIC '2'), "
+ "SAFE_DIVIDE(NUMERIC '1.23456e05', NUMERIC '0'), "
+ "SAFE_NEGATE(NUMERIC '99999999999999999999999999999.999999999')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder()
.addNullableField("f_numeric1", Schema.FieldType.DECIMAL)
.addNullableField("f_numeric2", Schema.FieldType.DECIMAL)
.addNullableField("f_numeric3", Schema.FieldType.DECIMAL)
.addNullableField("f_numeric4", Schema.FieldType.DECIMAL)
.addNullableField("f_numeric5", Schema.FieldType.DECIMAL)
.build())
.addValues(
null,
null,
null,
null,
ZetaSqlCalciteTranslationUtils.ZETASQL_NUMERIC_MIN_VALUE)
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSqrtNumeric() {
String sql = "SELECT SQRT(NUMERIC '4')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testPowNumeric() {
String sql = "SELECT POW(NUMERIC '2', NUMERIC '3')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("8"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testExpNumeric() {
String sql = "SELECT EXP(NUMERIC '2')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("7.389056099"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testLnNumeric() {
String sql = "SELECT LN(NUMERIC '7.389056099')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testLog10Numeric() {
String sql = "SELECT LOG10(NUMERIC '100')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testLogNumeric() {
String sql = "SELECT LOG(NUMERIC '2.25', NUMERIC '1.5')";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testSumNumeric() {
String sql = "SELECT SUM(numeric_field) FROM table_with_numeric";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("333.3333"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
@Test
public void testAvgNumeric() {
String sql = "SELECT AVG(numeric_field) FROM table_with_numeric";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("111.1111"))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
}