| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.zetasql; |
| |
| import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseDate; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseDateToValue; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTime; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimeToValue; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithTZToValue; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithTimeZone; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithUTCTimeZone; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.AGGREGATE_TABLE_ONE; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.AGGREGATE_TABLE_TWO; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.BASIC_TABLE_ONE; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.BASIC_TABLE_THREE; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.BASIC_TABLE_TWO; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_ALL_NULL; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_ALL_TYPES; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_ALL_TYPES_2; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_EMPTY; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_FOR_CASE_WHEN; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_ARRAY; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_ARRAY_FOR_UNNEST; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_MAP; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_STRUCT; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_STRUCT_TIMESTAMP_STRING; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_STRUCT_TWO; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIMESTAMP_TABLE_ONE; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIMESTAMP_TABLE_TWO; |
| import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIME_TABLE; |
| import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME; |
| |
| import com.google.protobuf.ByteString; |
| import com.google.zetasql.SqlException; |
| import com.google.zetasql.StructType.StructField; |
| import com.google.zetasql.TypeFactory; |
| import com.google.zetasql.Value; |
| import com.google.zetasql.ZetaSQLType.TypeKind; |
| import com.google.zetasql.ZetaSQLValue.ValueProto; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Map; |
| import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection; |
| import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver; |
| import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets; |
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; |
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; |
| import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.schemas.Schema; |
| import org.apache.beam.sdk.schemas.Schema.Field; |
| import org.apache.beam.sdk.schemas.Schema.FieldType; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.Row; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Context; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Contexts; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.ConventionTraitDef; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitDef; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; |
| import org.joda.time.DateTime; |
| import org.joda.time.Duration; |
| import org.joda.time.chrono.ISOChronology; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** ZetaSQLDialectSpecTest. */ |
| @RunWith(JUnit4.class) |
| public class ZetaSQLDialectSpecTest { |
| private static final Long PIPELINE_EXECUTION_WAITTIME_MINUTES = 2L; |
| |
| private FrameworkConfig config; |
| |
| private TableProvider tableProvider; |
| |
| @Rule public transient TestPipeline pipeline = TestPipeline.create(); |
| @Rule public ExpectedException thrown = ExpectedException.none(); |
| |
  /** Rebuilds the in-memory table provider and Calcite planner config before each test. */
  @Before
  public void setUp() {
    // NOTE(review): provider is initialized first — presumably the Calcite environment
    // consumes `tableProvider` when building `config`; confirm before reordering.
    initializeBeamTableProvider();
    initializeCalciteEnvironment();
  }
| |
| @Test |
| public void testSimpleSelect() { |
| String sql = |
| "SELECT CAST (1243 as INT64), " |
| + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), " |
| + "CAST ('string' as STRING);"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addDateTimeField("field2") |
| .addStringField("field3") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 1243L, |
| new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), |
| "string") |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testEQ1() { |
| String sql = "SELECT @p0 = @p1 AS ColA"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_BOOL)) |
| .put("p1", Value.createBoolValue(true)) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore( |
| "Does not support inf/-inf/nan in double/float literals because double/float literals are" |
| + " converted to BigDecimal in Calcite codegen.") |
| public void testEQ2() { |
| String sql = "SELECT @p0 = @p1 AS ColA"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createDoubleValue(0)) |
| .put("p1", Value.createDoubleValue(Double.POSITIVE_INFINITY)) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addBooleanField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testEQ3() { |
| String sql = "SELECT @p0 = @p1 AS ColA"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_DOUBLE)) |
| .put("p1", Value.createDoubleValue(3.14)) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testEQ4() { |
| String sql = "SELECT @p0 = @p1 AS ColA"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createBytesValue(ByteString.copyFromUtf8("hello"))) |
| .put("p1", Value.createBytesValue(ByteString.copyFromUtf8("hello"))) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testEQ5() { |
| String sql = "SELECT b'hello' = b'hello' AS ColA"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIsNotNull1() { |
| String sql = "SELECT @p0 IS NOT NULL AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIsNotNull2() { |
| String sql = "SELECT @p0 IS NOT NULL AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createNullValue( |
| TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)))); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Does not support struct literal.") |
| public void testIsNotNull3() { |
| String sql = "SELECT @p0 IS NOT NULL AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createNullValue( |
| TypeFactory.createStructType( |
| Arrays.asList( |
| new StructField( |
| "a", TypeFactory.createSimpleType(TypeKind.TYPE_STRING)))))); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIfBasic() { |
| String sql = "SELECT IF(@p0, @p1, @p2) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createBoolValue(true), |
| "p1", |
| Value.createInt64Value(1), |
| "p2", |
| Value.createInt64Value(2)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.INT64).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(1L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCoalesceBasic() { |
| String sql = "SELECT COALESCE(@p0, @p1, @p2) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p1", |
| Value.createStringValue("yay"), |
| "p2", |
| Value.createStringValue("nay")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("yay").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCoalesceSingleArgument() { |
| String sql = "SELECT COALESCE(@p0) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createSimpleNullValue(TypeKind.TYPE_INT64)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder().addNullableField("field1", FieldType.array(FieldType.INT64)).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCoalesceNullArray() { |
| String sql = "SELECT COALESCE(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createNullValue( |
| TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64))), |
| "p1", |
| Value.createNullValue( |
| TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)))); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder().addNullableField("field1", FieldType.array(FieldType.INT64)).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testNullIfCoercion() { |
| String sql = "SELECT NULLIF(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createInt64Value(3L), |
| "p1", |
| Value.createSimpleNullValue(TypeKind.TYPE_DOUBLE)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.DOUBLE).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(3.0).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Struct literals are not currently supported") |
| public void testCoalesceNullStruct() { |
| String sql = "SELECT COALESCE(NULL, STRUCT(\"a\" AS s, -33 AS i))"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema innerSchema = |
| Schema.of(Field.of("s", FieldType.STRING), Field.of("i", FieldType.INT64)); |
| final Schema schema = |
| Schema.builder().addNullableField("field1", FieldType.row(innerSchema)).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValue(Row.withSchema(innerSchema).addValues("a", -33).build()) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIfTimestamp() { |
| String sql = "SELECT IF(@p0, @p1, @p2) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createBoolValue(false), |
| "p1", |
| Value.createTimestampValueFromUnixMicros(0), |
| "p2", |
| Value.createTimestampValueFromUnixMicros( |
| DateTime.parse("2019-01-01T00:00:00Z").getMillis() * 1000)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", DATETIME).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(DateTime.parse("2019-01-01T00:00:00Z")).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("$make_array is not implemented") |
| public void testMakeArray() { |
| String sql = "SELECT [s3, s1, s2] FROM (SELECT \"foo\" AS s1, \"bar\" AS s2, \"baz\" AS s3);"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder().addNullableField("field1", FieldType.array(FieldType.STRING)).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValue(ImmutableList.of("baz", "foo", "bar")).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testNullIfPositive() { |
| String sql = "SELECT NULLIF(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("null"), "p1", Value.createStringValue("null")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testNullIfNegative() { |
| String sql = "SELECT NULLIF(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("foo"), "p1", Value.createStringValue("null")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("foo").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIfNullPositive() { |
| String sql = "SELECT IFNULL(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("foo"), "p1", Value.createStringValue("default")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("foo").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIfNullNegative() { |
| String sql = "SELECT IFNULL(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p1", |
| Value.createStringValue("yay")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("yay").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Throws IndexOutOfBoundsException") |
| public void testConstructEmptyArrayLiteral() { |
| String sql = "SELECT @p0 AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createArrayValue( |
| TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)), |
| ImmutableList.of())); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addArrayField("field1", FieldType.INT64).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValue(ImmutableList.of()).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("") |
| public void testLike1() { |
| String sql = "SELECT @p0 LIKE @p1 AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("ab%"), "p1", Value.createStringValue("ab\\%")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testLikeNullPattern() { |
| String sql = "SELECT @p0 LIKE @p1 AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createStringValue("ab%"), |
| "p1", |
| Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Object) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("") |
| public void testLikeAllowsEscapingNonSpecialCharacter() { |
| String sql = "SELECT @p0 LIKE @p1 AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createStringValue("ab"), "p1", Value.createStringValue("\\ab")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("") |
| public void testLikeAllowsEscapingBackslash() { |
| String sql = "SELECT @p0 LIKE @p1 AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("a\\c"), "p1", Value.createStringValue("a\\\\c")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
  /**
   * Planning should reject a STRING parameter whose bytes are not valid UTF-8.
   *
   * <p>The invalid value is built by deserializing raw bytes directly into a STRING-typed
   * {@link Value}, bypassing the UTF-8 validation that {@code createStringValue} would apply.
   * Ignored because non-UTF-8 values are currently coerced to UTF-8 instead of rejected.
   */
  @Test
  @Ignore("Currently non UTF-8 values are coerced to UTF-8")
  public void testThrowsErrorForNonUTF8() {
    String sql = "SELECT @p0 LIKE @p1 AS ColA";
    // 0xE8 0xB0 is a truncated multi-byte sequence — not decodable as UTF-8.
    byte[] bytes = {(byte) 0xe8, (byte) 0xb0};
    Value bad =
        Value.deserialize(
            TypeFactory.createSimpleType(TypeKind.TYPE_STRING),
            ValueProto.newBuilder().setStringValueBytes(ByteString.copyFrom(bytes)).build());
    ImmutableMap<String, Value> params =
        ImmutableMap.of("p0", Value.createStringValue("abc"), "p1", bad);

    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
    // Expectations must be registered before the call that should throw.
    thrown.expect(RuntimeException.class);
    // TODO: message should be a constant on ZetaSQLPlannerImpl
    thrown.expectMessage("invalid UTF-8");
    zetaSQLQueryPlanner.convertToBeamRel(sql, params);
  }
| |
| @Test |
| @Ignore("Does not support BYTES for like") |
| public void testLikeBytes() { |
| String sql = "SELECT @p0 LIKE @p1 AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createBytesValue(ByteString.copyFromUtf8("abcd")), |
| "p1", |
| Value.createBytesValue(ByteString.copyFromUtf8("__%"))); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testMod() { |
| String sql = "SELECT MOD(4, 2)"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(0L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSimpleUnionAll() { |
| String sql = |
| "SELECT CAST (1243 as INT64), " |
| + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), " |
| + "CAST ('string' as STRING) " |
| + " UNION ALL " |
| + " SELECT CAST (1243 as INT64), " |
| + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), " |
| + "CAST ('string' as STRING);"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addDateTimeField("field2") |
| .addStringField("field3") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 1243L, |
| new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), |
| "string") |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 1243L, |
| new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), |
| "string") |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testThreeWayUnionAll() { |
| String sql = "SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3)"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L).build(), |
| Row.withSchema(schema).addValues(2L).build(), |
| Row.withSchema(schema).addValues(3L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSimpleUnionDISTINCT() { |
| String sql = |
| "SELECT CAST (1243 as INT64), " |
| + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), " |
| + "CAST ('string' as STRING) " |
| + " UNION DISTINCT " |
| + " SELECT CAST (1243 as INT64), " |
| + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), " |
| + "CAST ('string' as STRING);"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addDateTimeField("field2") |
| .addStringField("field3") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 1243L, |
| new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), |
| "string") |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLInnerJoin() { |
| String sql = |
| "SELECT t1.Key " |
| + "FROM KeyValue AS t1" |
| + " INNER JOIN BigTable AS t2" |
| + " on " |
| + " t1.Key = t2.RowKey AND t1.ts = t2.ts"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addInt64Field("field1").build()) |
| .addValues(15L) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| // JOIN USING(col) is equivalent to JOIN on left.col = right.col. |
| public void testZetaSQLInnerJoinWithUsing() { |
| String sql = "SELECT t1.Key " + "FROM KeyValue AS t1" + " INNER JOIN BigTable AS t2 USING(ts)"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addInt64Field("field1").build()) |
| .addValues(15L) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| // testing ordering of the JOIN conditions. |
| public void testZetaSQLInnerJoinTwo() { |
| String sql = |
| "SELECT t2.RowKey " |
| + "FROM KeyValue AS t1" |
| + " INNER JOIN BigTable AS t2" |
| + " on " |
| + " t2.RowKey = t1.Key AND t2.ts = t1.ts"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addInt64Field("field1").build()) |
| .addValues(15L) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLLeftOuterJoin() { |
| String sql = |
| "SELECT * " |
| + "FROM KeyValue AS t1" |
| + " LEFT JOIN BigTable AS t2" |
| + " on " |
| + " t1.Key = t2.RowKey"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schemaOne = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addDateTimeField("field3") |
| .addNullableField("field4", FieldType.INT64) |
| .addNullableField("field5", FieldType.STRING) |
| .addNullableField("field6", DATETIME) |
| .build(); |
| |
| final Schema schemaTwo = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addDateTimeField("field3") |
| .addInt64Field("field4") |
| .addStringField("field5") |
| .addDateTimeField("field6") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schemaOne) |
| .addValues( |
| 14L, |
| "KeyValue234", |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), |
| null, |
| null, |
| null) |
| .build(), |
| Row.withSchema(schemaTwo) |
| .addValues( |
| 15L, |
| "KeyValue235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), |
| 15L, |
| "BigTable235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLRightOuterJoin() { |
| String sql = |
| "SELECT * " |
| + "FROM KeyValue AS t1" |
| + " RIGHT JOIN BigTable AS t2" |
| + " on " |
| + " t1.Key = t2.RowKey"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schemaOne = |
| Schema.builder() |
| .addNullableField("field1", FieldType.INT64) |
| .addNullableField("field2", FieldType.STRING) |
| .addNullableField("field3", DATETIME) |
| .addInt64Field("field4") |
| .addStringField("field5") |
| .addDateTimeField("field6") |
| .build(); |
| |
| final Schema schemaTwo = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addDateTimeField("field3") |
| .addInt64Field("field4") |
| .addStringField("field5") |
| .addDateTimeField("field6") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schemaOne) |
| .addValues( |
| null, |
| null, |
| null, |
| 16L, |
| "BigTable236", |
| new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schemaTwo) |
| .addValues( |
| 15L, |
| "KeyValue235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), |
| 15L, |
| "BigTable235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLFullOuterJoin() { |
| String sql = |
| "SELECT * " |
| + "FROM KeyValue AS t1" |
| + " FULL JOIN BigTable AS t2" |
| + " on " |
| + " t1.Key = t2.RowKey"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schemaOne = |
| Schema.builder() |
| .addNullableField("field1", FieldType.INT64) |
| .addNullableField("field2", FieldType.STRING) |
| .addNullableField("field3", DATETIME) |
| .addInt64Field("field4") |
| .addStringField("field5") |
| .addDateTimeField("field6") |
| .build(); |
| |
| final Schema schemaTwo = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addDateTimeField("field3") |
| .addInt64Field("field4") |
| .addStringField("field5") |
| .addDateTimeField("field6") |
| .build(); |
| |
| final Schema schemaThree = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addDateTimeField("field3") |
| .addNullableField("field4", FieldType.INT64) |
| .addNullableField("field5", FieldType.STRING) |
| .addNullableField("field6", DATETIME) |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schemaOne) |
| .addValues( |
| null, |
| null, |
| null, |
| 16L, |
| "BigTable236", |
| new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schemaTwo) |
| .addValues( |
| 15L, |
| "KeyValue235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), |
| 15L, |
| "BigTable235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schemaThree) |
| .addValues( |
| 14L, |
| "KeyValue234", |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), |
| null, |
| null, |
| null) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("BeamSQL only supports equal join") |
| public void testZetaSQLFullOuterJoinTwo() { |
| String sql = |
| "SELECT * " |
| + "FROM KeyValue AS t1" |
| + " FULL JOIN BigTable AS t2" |
| + " on " |
| + " t1.Key + t2.RowKey = 30"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLThreeWayInnerJoin() { |
| String sql = |
| "SELECT t3.Value, t2.Value, t1.Value, t1.Key, t3.ColId FROM KeyValue as t1 " |
| + "JOIN BigTable as t2 " |
| + "ON (t1.Key = t2.RowKey) " |
| + "JOIN Spanner as t3 " |
| + "ON (t3.ColId = t1.Key)"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema( |
| Schema.builder() |
| .addStringField("t3.Value") |
| .addStringField("t2.Value") |
| .addStringField("t1.Value") |
| .addInt64Field("t1.Key") |
| .addInt64Field("t3.ColId") |
| .build()) |
| .addValues("Spanner235", "BigTable235", "KeyValue235", 15L, 15L) |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLTableJoinOnItselfWithFiltering() { |
| String sql = |
| "SELECT * FROM Spanner as t1 " |
| + "JOIN Spanner as t2 " |
| + "ON (t1.ColId = t2.ColId) WHERE t1.ColId = 17"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema( |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addInt64Field("field3") |
| .addStringField("field4") |
| .build()) |
| .addValues(17L, "Spanner237", 17L, "Spanner237") |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromSelect() { |
| String sql = "SELECT * FROM (SELECT \"apple\" AS fruit, \"carrot\" AS vegetable);"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder().addStringField("field1").addStringField("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues("apple", "carrot").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| |
| Schema outputSchema = stream.getSchema(); |
| Assert.assertEquals(2, outputSchema.getFieldCount()); |
| Assert.assertEquals("fruit", outputSchema.getField(0).getName()); |
| Assert.assertEquals("vegetable", outputSchema.getField(1).getName()); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromTable() { |
| String sql = "SELECT Key, Value FROM KeyValue;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, "KeyValue234").build(), |
| Row.withSchema(schema).addValues(15L, "KeyValue235").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromTableLimit() { |
| String sql = "SELECT Key, Value FROM KeyValue LIMIT 2;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, "KeyValue234").build(), |
| Row.withSchema(schema).addValues(15L, "KeyValue235").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromTableLimit0() { |
| String sql = "SELECT Key, Value FROM KeyValue LIMIT 0;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| PAssert.that(stream).containsInAnyOrder(); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromTableLimitOffset() { |
| String sql = |
| "SELECT COUNT(a) FROM (\n" |
| + "SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3) LIMIT 3 OFFSET 1);"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addInt64Field("field1").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(2L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| // There is really no order for a PCollection, so this query does not test |
| // ORDER BY but just a test to see if ORDER BY LIMIT can work. |
| @Test |
| public void testZetaSQLSelectFromTableOrderByLimit() { |
| String sql = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC LIMIT 2;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, "KeyValue234").build(), |
| Row.withSchema(schema).addValues(15L, "KeyValue235").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromTableOrderBy() { |
| String sql = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| thrown.expect(RuntimeException.class); |
| thrown.expectMessage("ORDER BY without a LIMIT is not supported."); |
| zetaSQLQueryPlanner.convertToBeamRel(sql); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromTableWithStructType2() { |
| String sql = |
| "SELECT table_with_struct.struct_col.struct_col_str FROM table_with_struct WHERE id = 1;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue("row_one").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLStructFieldAccessInFilter() { |
| String sql = |
| "SELECT table_with_struct.id FROM table_with_struct WHERE" |
| + " table_with_struct.struct_col.struct_col_str = 'row_one';"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addInt64Field("field").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(1L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLStructFieldAccessInCast() { |
| String sql = |
| "SELECT CAST(table_with_struct.id AS STRING) FROM table_with_struct WHERE" |
| + " table_with_struct.struct_col.struct_col_str = 'row_one';"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue("1").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLStructFieldAccessInCast2() { |
| String sql = |
| "SELECT CAST(A.struct_col.struct_col_str AS TIMESTAMP) FROM table_with_struct_ts_string AS" |
| + " A"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addDateTimeField("field").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValue(parseTimestampWithUTCTimeZone("2019-01-15 13:21:03")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLStructFieldAccessInTumble() { |
| String sql = |
| "SELECT TUMBLE_START('INTERVAL 1 MINUTE') FROM table_with_struct_ts_string AS A GROUP BY " |
| + "TUMBLE(CAST(A.struct_col.struct_col_str AS TIMESTAMP), 'INTERVAL 1 MINUTE')"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addDateTimeField("field").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValue(parseTimestampWithUTCTimeZone("2019-01-15 13:21:00")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLStructFieldAccessInGroupBy() { |
| String sql = "SELECT rowCol.row_id, COUNT(*) FROM table_with_struct_two GROUP BY rowCol.row_id"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 1L).build(), |
| Row.withSchema(schema).addValues(2L, 1L).build(), |
| Row.withSchema(schema).addValues(3L, 2L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLStructFieldAccessInGroupBy2() { |
| String sql = |
| "SELECT rowCol.data, MAX(rowCol.row_id), MIN(rowCol.row_id) FROM table_with_struct_two" |
| + " GROUP BY rowCol.data"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = |
| Schema.builder() |
| .addStringField("field1") |
| .addInt64Field("field2") |
| .addInt64Field("field3") |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("data1", 1L, 1L).build(), |
| Row.withSchema(schema).addValues("data2", 3L, 2L).build(), |
| Row.withSchema(schema).addValues("data3", 3L, 3L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLSelectFromTableWithArrayType() { |
| String sql = "SELECT array_col FROM table_with_array;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addArrayField("field", FieldType.STRING).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValue(Arrays.asList("1", "2", "3")).build(), |
| Row.withSchema(schema).addValue(ImmutableList.of()).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLSelectStarFromTable() { |
| String sql = "SELECT * FROM BigTable;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addDateTimeField("field3") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 15L, |
| "BigTable235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 16L, |
| "BigTable236", |
| new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())) |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicFiltering() { |
| String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema( |
| Schema.builder().addInt64Field("field1").addStringField("field2").build()) |
| .addValues(14L, "KeyValue234") |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicFilteringTwo() { |
| String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14 AND Value = 'non-existing';"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| PAssert.that(stream).containsInAnyOrder(); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicFilteringThree() { |
| String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, "KeyValue234").build(), |
| Row.withSchema(schema).addValues(15L, "KeyValue235").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLCountOnAColumn() { |
| String sql = "SELECT COUNT(Key) FROM KeyValue"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(2L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLAggDistinct() { |
| String sql = "SELECT Key, COUNT(DISTINCT Value) FROM KeyValue GROUP BY Key"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| thrown.expect(RuntimeException.class); |
| thrown.expectMessage("Does not support COUNT DISTINCT"); |
| zetaSQLQueryPlanner.convertToBeamRel(sql); |
| } |
| |
| @Test |
| public void testZetaSQLBasicAgg() { |
| String sql = "SELECT Key, COUNT(*) FROM KeyValue GROUP BY Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, 1L).build(), |
| Row.withSchema(schema).addValues(15L, 1L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLColumnAlias1() { |
| String sql = "SELECT Key, COUNT(*) AS count_col FROM KeyValue GROUP BY Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| |
| Schema outputSchema = stream.getSchema(); |
| Assert.assertEquals(2, outputSchema.getFieldCount()); |
| Assert.assertEquals("Key", outputSchema.getField(0).getName()); |
| Assert.assertEquals("count_col", outputSchema.getField(1).getName()); |
| } |
| |
| @Test |
| public void testZetaSQLColumnAlias2() { |
| String sql = |
| "SELECT Key AS k1, (count_col + 1) AS k2 FROM (SELECT Key, COUNT(*) AS count_col FROM" |
| + " KeyValue GROUP BY Key)"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| |
| Schema outputSchema = stream.getSchema(); |
| Assert.assertEquals(2, outputSchema.getFieldCount()); |
| Assert.assertEquals("k1", outputSchema.getField(0).getName()); |
| Assert.assertEquals("k2", outputSchema.getField(1).getName()); |
| } |
| |
| @Test |
| public void testZetaSQLColumnAlias3() { |
| String sql = "SELECT Key AS v1, Value AS v2, ts AS v3 FROM KeyValue"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| |
| Schema outputSchema = stream.getSchema(); |
| Assert.assertEquals(3, outputSchema.getFieldCount()); |
| Assert.assertEquals("v1", outputSchema.getField(0).getName()); |
| Assert.assertEquals("v2", outputSchema.getField(1).getName()); |
| Assert.assertEquals("v3", outputSchema.getField(2).getName()); |
| } |
| |
| @Test |
| public void testZetaSQLColumnAlias4() { |
| String sql = "SELECT CAST(123 AS INT64) AS cast_col"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| |
| Schema outputSchema = stream.getSchema(); |
| Assert.assertEquals(1, outputSchema.getFieldCount()); |
| Assert.assertEquals("cast_col", outputSchema.getField(0).getName()); |
| } |
| |
| @Test |
| public void testZetaSQLAmbiguousAlias() { |
| String sql = "SELECT row_id as ID, int64_col as ID FROM table_all_types GROUP BY ID;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| |
| thrown.expectMessage( |
| "Name ID in GROUP BY clause is ambiguous; it may refer to multiple columns in the" |
| + " SELECT-list [at 1:68]"); |
| zetaSQLQueryPlanner.convertToBeamRel(sql); |
| } |
| |
| @Test |
| public void testZetaSQLAggWithOrdinalReference() { |
| String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY 1"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 2L).build(), |
| Row.withSchema(schema).addValues(2L, 3L).build(), |
| Row.withSchema(schema).addValues(3L, 2L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLAggWithAliasReference() { |
| String sql = "SELECT Key AS K, COUNT(*) FROM aggregate_test_table GROUP BY K"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 2L).build(), |
| Row.withSchema(schema).addValues(2L, 3L).build(), |
| Row.withSchema(schema).addValues(3L, 2L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicAgg2() { |
| String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 2L).build(), |
| Row.withSchema(schema).addValues(2L, 3L).build(), |
| Row.withSchema(schema).addValues(3L, 2L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicAgg3() { |
| String sql = "SELECT Key, Key2, COUNT(*) FROM aggregate_test_table GROUP BY Key2, Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addInt64Field("field3") |
| .addInt64Field("field2") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 10L, 1L).build(), |
| Row.withSchema(schema).addValues(1L, 11L, 1L).build(), |
| Row.withSchema(schema).addValues(2L, 11L, 2L).build(), |
| Row.withSchema(schema).addValues(2L, 12L, 1L).build(), |
| Row.withSchema(schema).addValues(3L, 13L, 2L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicAgg4() { |
| String sql = |
| "SELECT Key, Key2, MAX(f_int_1), MIN(f_int_1), SUM(f_int_1), SUM(f_double_1) " |
| + "FROM aggregate_test_table GROUP BY Key2, Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addInt64Field("field3") |
| .addInt64Field("field2") |
| .addInt64Field("field4") |
| .addInt64Field("field5") |
| .addDoubleField("field6") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 10L, 1L, 1L, 1L, 1.0).build(), |
| Row.withSchema(schema).addValues(1L, 11L, 2L, 2L, 2L, 2.0).build(), |
| Row.withSchema(schema).addValues(2L, 11L, 4L, 3L, 7L, 7.0).build(), |
| Row.withSchema(schema).addValues(2L, 12L, 5L, 5L, 5L, 5.0).build(), |
| Row.withSchema(schema).addValues(3L, 13L, 7L, 6L, 13L, 13.0).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicAgg5() { |
| String sql = |
| "SELECT Key, Key2, AVG(CAST(f_int_1 AS FLOAT64)), AVG(f_double_1) " |
| + "FROM aggregate_test_table GROUP BY Key2, Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addInt64Field("field2") |
| .addDoubleField("field3") |
| .addDoubleField("field4") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 10L, 1.0, 1.0).build(), |
| Row.withSchema(schema).addValues(1L, 11L, 2.0, 2.0).build(), |
| Row.withSchema(schema).addValues(2L, 11L, 3.5, 3.5).build(), |
| Row.withSchema(schema).addValues(2L, 12L, 5.0, 5.0).build(), |
| Row.withSchema(schema).addValues(3L, 13L, 6.5, 6.5).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore( |
| "Calcite infers return type of AVG(int64) as BIGINT while ZetaSQL requires it as either" |
| + " NUMERIC or DOUBLE/FLOAT64") |
| public void testZetaSQLTestAVG() { |
| String sql = "SELECT Key, AVG(f_int_1)" + "FROM aggregate_test_table GROUP BY Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addInt64Field("field2") |
| .addInt64Field("field3") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 10L, 1L).build(), |
| Row.withSchema(schema).addValues(1L, 11L, 6L).build(), |
| Row.withSchema(schema).addValues(2L, 11L, 6L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLGroupByExprInSelect() { |
| String sql = "SELECT int64_col + 1 FROM table_all_types GROUP BY int64_col + 1;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValue(0L).build(), |
| Row.withSchema(schema).addValue(-1L).build(), |
| Row.withSchema(schema).addValue(-2L).build(), |
| Row.withSchema(schema).addValue(-3L).build(), |
| Row.withSchema(schema).addValue(-4L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLGroupByAndFiltering() { |
| String sql = "SELECT int64_col FROM table_all_types WHERE int64_col = 1 GROUP BY int64_col;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| PAssert.that(stream).containsInAnyOrder(); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLGroupByAndFilteringOnNonGroupByColumn() { |
| String sql = "SELECT int64_col FROM table_all_types WHERE double_col = 0.5 GROUP BY int64_col;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addInt64Field("field").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValue(-5L).build(), |
| Row.withSchema(schema).addValue(-4L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicHaving() { |
| String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key HAVING COUNT(*) > 2"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(2L, 3L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicFixedWindowing() { |
| String sql = |
| "SELECT " |
| + "COUNT(*) as field_count, " |
| + "TUMBLE_START(\"INTERVAL 1 SECOND\") as window_start, " |
| + "TUMBLE_END(\"INTERVAL 1 SECOND\") as window_end " |
| + "FROM KeyValue " |
| + "GROUP BY TUMBLE(ts, \"INTERVAL 1 SECOND\");"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("count_start") |
| .addDateTimeField("field1") |
| .addDateTimeField("field2") |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 1L, |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 1L, |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicSlidingWindowing() { |
| String sql = |
| "SELECT " |
| + "COUNT(*) as field_count, " |
| + "HOP_START(\"INTERVAL 1 SECOND\", \"INTERVAL 2 SECOND\") as window_start, " |
| + "HOP_END(\"INTERVAL 1 SECOND\", \"INTERVAL 2 SECOND\") as window_end " |
| + "FROM window_test_table " |
| + "GROUP BY HOP(ts, \"INTERVAL 1 SECOND\", \"INTERVAL 2 SECOND\");"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("count_star") |
| .addDateTimeField("field1") |
| .addDateTimeField("field2") |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 2L, |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 9, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 1L, |
| new DateTime(2018, 7, 1, 21, 26, 5, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 2L, |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 2L, |
| new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 10, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 1L, |
| new DateTime(2018, 7, 1, 21, 26, 9, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 11, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testZetaSQLBasicSessionWindowing() { |
| String sql = |
| "SELECT " |
| + "COUNT(*) as field_count, " |
| + "SESSION_START(\"INTERVAL 3 SECOND\") as window_start, " |
| + "SESSION_END(\"INTERVAL 3 SECOND\") as window_end " |
| + "FROM window_test_table_two " |
| + "GROUP BY SESSION(ts, \"INTERVAL 3 SECOND\");"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("count_star") |
| .addDateTimeField("field1") |
| .addDateTimeField("field2") |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 2L, |
| new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 2L, |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| // Test nested selection |
| @Test |
| public void testZetaSQLNestedQueryOne() { |
| String sql = |
| "SELECT a.Value, a.Key FROM (SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15)" |
| + " as a;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("KeyValue234", 14L).build(), |
| Row.withSchema(schema).addValues("KeyValue235", 15L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| // Test selection, filtering and aggregation combined query. |
| @Test |
| public void testZetaSQLNestedQueryTwo() { |
| String sql = |
| "SELECT a.Key, a.Key2, COUNT(*) FROM " |
| + " (SELECT * FROM aggregate_test_table WHERE Key != 10) as a " |
| + " GROUP BY a.Key2, a.Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addInt64Field("field3") |
| .addInt64Field("field2") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L, 10L, 1L).build(), |
| Row.withSchema(schema).addValues(1L, 11L, 1L).build(), |
| Row.withSchema(schema).addValues(2L, 11L, 2L).build(), |
| Row.withSchema(schema).addValues(2L, 12L, 1L).build(), |
| Row.withSchema(schema).addValues(3L, 13L, 2L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| // test selection and join combined query |
| @Test |
| public void testZetaSQLNestedQueryThree() { |
| String sql = |
| "SELECT * FROM (SELECT * FROM KeyValue) AS t1 INNER JOIN (SELECT * FROM BigTable) AS t2 on" |
| + " t1.Key = t2.RowKey"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema( |
| Schema.builder() |
| .addInt64Field("Key") |
| .addStringField("Value") |
| .addDateTimeField("ts") |
| .addInt64Field("RowKey") |
| .addStringField("Value2") |
| .addDateTimeField("ts2") |
| .build()) |
| .addValues( |
| 15L, |
| "KeyValue235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), |
| 15L, |
| "BigTable235", |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
  /**
   * Join plus windowed aggregation: KeyValue joined to BigTable on Key = RowKey, grouped by
   * t1.Value and a 1-second tumbling window over t2.ts.
   */
  @Test
  public void testZetaSQLNestedQueryFour() {
    String sql =
        "SELECT t1.Value, TUMBLE_START('INTERVAL 1 SECOND') AS period_start, MIN(t2.Value) as"
            + " min_v FROM KeyValue AS t1 INNER JOIN BigTable AS t2 on t1.Key = t2.RowKey GROUP BY"
            + " t1.Value, TUMBLE(t2.ts, 'INTERVAL 1 SECOND')";

    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

    PAssert.that(stream)
        .containsInAnyOrder(
            // NOTE(review): the field names here look swapped relative to the SELECT list — the
            // query aliases the window start as "period_start" (a timestamp) and MIN(t2.Value) as
            // "min_v" (a string), but this schema binds "min_v" to the DATETIME field and
            // "period_start" to the STRING field. The positional values below do match the
            // declared types, so only the names are suspect — confirm against the planner's
            // actual output schema before changing.
            Row.withSchema(
                    Schema.builder()
                        .addStringField("value")
                        .addDateTimeField("min_v")
                        .addStringField("period_start")
                        .build())
                .addValues(
                    "KeyValue235",
                    new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()),
                    "BigTable235")
                .build());
    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
  }
| |
| // Test nested select with out of order columns. |
| @Test |
| public void testZetaSQLNestedQueryFive() { |
| String sql = |
| "SELECT a.Value, a.Key FROM (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15)" |
| + " as a;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("KeyValue234", 14L).build(), |
| Row.withSchema(schema).addValues("KeyValue235", 15L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Does not support DATE_ADD and TIME_ADD.") |
| public void testDateAndTimeAddSub() { |
| String sql = |
| "SELECT " |
| + "DATE_ADD(DATE '2008-12-25', INTERVAL 5 DAY), " |
| + "TIME_ADD(TIME '13:24:30', INTERVAL 3 HOUR)"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema( |
| Schema.builder() |
| .addDateTimeField("f_date_plus") |
| .addDateTimeField("f_time_plus") |
| .build()) |
| .addValues(parseDate("2008-12-30"), parseTime("16:24:30")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTimestampAddSub() { |
| String sql = |
| "SELECT " |
| + "TIMESTAMP_ADD(TIMESTAMP '2008-12-25 15:30:00 UTC', INTERVAL 10 MINUTE), " |
| + "TIMESTAMP_ADD(TIMESTAMP '2008-12-25 15:30:00+07:30', INTERVAL 10 MINUTE)"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema( |
| Schema.builder() |
| .addDateTimeField("f_timestamp_plus") |
| .addDateTimeField("f_timestamp_with_time_zone_plus") |
| .build()) |
| .addValues( |
| DateTimeUtils.parseTimestampWithUTCTimeZone("2008-12-25 15:40:00"), |
| parseTimestampWithTimeZone("2008-12-25 15:40:00+0730")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTimeZone() { |
| String sql = "SELECT TIMESTAMP '2018-12-10 10:38:59-10:00'"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addDateTimeField("f_timestamp_with_time_zone").build()) |
| .addValues(parseTimestampWithTimeZone("2018-12-10 10:38:59-1000")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testDistinct() { |
| String sql = "SELECT DISTINCT Key2 FROM aggregate_test_table"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema schema = Schema.builder().addInt64Field("Key2").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(10L).build(), |
| Row.withSchema(schema).addValues(11L).build(), |
| Row.withSchema(schema).addValues(12L).build(), |
| Row.withSchema(schema).addValues(13L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
  /** DISTINCT over a column whose values are all NULL should collapse to a single NULL row. */
  @Test
  public void testDistinctOnNull() {
    String sql = "SELECT DISTINCT str_val FROM all_null_table";
    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

    // NOTE(review): FieldType.DOUBLE for a field named "str_val" looks inconsistent — presumably
    // this should be FieldType.STRING. Verify against the all_null_table schema in TestInput
    // before changing.
    Schema schema = Schema.builder().addNullableField("str_val", FieldType.DOUBLE).build();
    PAssert.that(stream)
        .containsInAnyOrder(Row.withSchema(schema).addValues((Object) null).build());
    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
  }
| |
| @Test |
| @Ignore("BeamSQL does not support ANY_VALUE") |
| public void testAnyValue() { |
| String sql = "SELECT ANY_VALUE(double_val) FROM all_null_table"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema schema = Schema.builder().addNullableField("double_val", FieldType.DOUBLE).build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Object) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectNULL() { |
| String sql = "SELECT NULL"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema schema = Schema.builder().addNullableField("long_val", FieldType.INT64).build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Object) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testWithQueryOne() { |
| String sql = |
| "With T1 AS (SELECT * FROM KeyValue), T2 AS (SELECT * FROM BigTable) SELECT T2.RowKey FROM" |
| + " T1 INNER JOIN T2 on T1.Key = T2.RowKey;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addInt64Field("field1").build()) |
| .addValues(15L) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testWithQueryTwo() { |
| String sql = |
| "WITH T1 AS (SELECT Key, COUNT(*) as value FROM KeyValue GROUP BY Key) SELECT T1.Key," |
| + " T1.value FROM T1"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, 1L).build(), |
| Row.withSchema(schema).addValues(15L, 1L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testWithQueryThree() { |
| String sql = |
| "WITH T1 as (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15) SELECT T1.Value," |
| + " T1.Key FROM T1;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("KeyValue234", 14L).build(), |
| Row.withSchema(schema).addValues("KeyValue235", 15L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testWithQueryFour() { |
| String sql = |
| "WITH T1 as (SELECT Value, Key FROM KeyValue) SELECT T1.Value, T1.Key FROM T1 WHERE T1.Key" |
| + " = 14 OR T1.Key = 15;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("KeyValue234", 14L).build(), |
| Row.withSchema(schema).addValues("KeyValue235", 15L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testWithQueryFive() { |
| String sql = |
| "WITH T1 AS (SELECT * FROM KeyValue) SELECT T1.Key, COUNT(*) FROM T1 GROUP BY T1.Key"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, 1L).build(), |
| Row.withSchema(schema).addValues(15L, 1L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testWithQuerySix() { |
| String sql = |
| "WITH T1 AS (SELECT * FROM window_test_table_two) SELECT " |
| + "COUNT(*) as field_count, " |
| + "SESSION_START(\"INTERVAL 3 SECOND\") as window_start, " |
| + "SESSION_END(\"INTERVAL 3 SECOND\") as window_end " |
| + "FROM T1 " |
| + "GROUP BY SESSION(ts, \"INTERVAL 3 SECOND\");"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("count_star") |
| .addDateTimeField("field1") |
| .addDateTimeField("field2") |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 2L, |
| new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 2L, |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testWithQuerySeven() { |
| String sql = |
| "WITH T1 AS (SELECT * FROM KeyValue) SELECT " |
| + "COUNT(*) as field_count, " |
| + "TUMBLE_START(\"INTERVAL 1 SECOND\") as window_start, " |
| + "TUMBLE_END(\"INTERVAL 1 SECOND\") as window_end " |
| + "FROM T1 " |
| + "GROUP BY TUMBLE(ts, \"INTERVAL 1 SECOND\");"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("count_start") |
| .addDateTimeField("field1") |
| .addDateTimeField("field2") |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| 1L, |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())) |
| .build(), |
| Row.withSchema(schema) |
| .addValues( |
| 1L, |
| new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), |
| new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testUNNESTLiteral() { |
| String sql = "SELECT * FROM UNNEST(ARRAY<STRING>['foo', 'bar']);"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| Schema schema = Schema.builder().addStringField("str_field").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("foo").build(), |
| Row.withSchema(schema).addValues("bar").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testNamedUNNESTLiteral() { |
| String sql = "SELECT *, T1 FROM UNNEST(ARRAY<STRING>['foo', 'bar']) AS T1"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| Schema schema = |
| Schema.builder().addStringField("str_field").addStringField("str2_field").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("foo", "foo").build(), |
| Row.withSchema(schema).addValues("bar", "bar").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Seeing exception in Beam, need further investigation on the cause of this failed query.") |
| public void testNamedUNNESTJoin() { |
| String sql = |
| "SELECT * " |
| + "FROM table_with_array_for_unnest AS t1" |
| + " LEFT JOIN UNNEST(t1.int_array_col) AS t2" |
| + " on " |
| + " t1.int_col = t2"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream).containsInAnyOrder(); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCaseNoValue() { |
| String sql = "SELECT CASE WHEN 1 > 2 THEN 'not possible' ELSE 'seems right' END"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addStringField("str_field").build()) |
| .addValue("seems right") |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCaseWithValue() { |
| String sql = "SELECT CASE 1 WHEN 2 THEN 'not possible' ELSE 'seems right' END"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addStringField("str_field").build()) |
| .addValue("seems right") |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCaseWithValueMultipleCases() { |
| String sql = |
| "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' ELSE 'also not" |
| + " possible' END"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addStringField("str_field").build()) |
| .addValue("seems right") |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCaseWithValueNoElse() { |
| String sql = "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' END"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addStringField("str_field").build()) |
| .addValue("seems right") |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCaseNoValueNoElseNoMatch() { |
| String sql = "SELECT CASE WHEN 'abc' = '123' THEN 'not possible' END"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addNullableField("str_field", FieldType.STRING).build()) |
| .addValue(null) |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCaseWithValueNoElseNoMatch() { |
| String sql = "SELECT CASE 2 WHEN 1 THEN 'not possible' END"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addNullableField("str_field", FieldType.STRING).build()) |
| .addValue(null) |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore( |
| "Codegen generates code that Janino cannot compile, need further investigation on root" |
| + " cause.") |
| public void testCastToDateWithCase() { |
| String sql = |
| "SELECT f_int, \n" |
| + "CASE WHEN CHAR_LENGTH(TRIM(f_string)) = 8 \n" |
| + " THEN CAST (CONCAT(\n" |
| + " SUBSTR(TRIM(f_string), 0, 4) \n" |
| + " , '-' \n" |
| + " , SUBSTR(TRIM(f_string), 4, 2) \n" |
| + " , '-' \n" |
| + " , SUBSTR(TRIM(f_string), 6, 2)) AS DATE)\n" |
| + " ELSE NULL\n" |
| + "END \n" |
| + "FROM table_for_case_when"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema resultType = |
| Schema.builder().addInt32Field("f_int").addNullableField("f_date", DATETIME).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(resultType).addValues(1, parseDate("2018-10-18")).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIntersectAll() { |
| String sql = |
| "SELECT Key FROM aggregate_test_table " |
| + "INTERSECT ALL " |
| + "SELECT Key FROM aggregate_test_table_two"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema resultType = Schema.builder().addInt64Field("field").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(resultType).addValues(1L).build(), |
| Row.withSchema(resultType).addValues(2L).build(), |
| Row.withSchema(resultType).addValues(2L).build(), |
| Row.withSchema(resultType).addValues(2L).build(), |
| Row.withSchema(resultType).addValues(3L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testIntersectDistinct() { |
| String sql = |
| "SELECT Key FROM aggregate_test_table " |
| + "INTERSECT DISTINCT " |
| + "SELECT Key FROM aggregate_test_table_two"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema resultType = Schema.builder().addInt64Field("field").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(resultType).addValues(1L).build(), |
| Row.withSchema(resultType).addValues(2L).build(), |
| Row.withSchema(resultType).addValues(3L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testExceptAll() { |
| String sql = |
| "SELECT Key FROM aggregate_test_table " |
| + "EXCEPT ALL " |
| + "SELECT Key FROM aggregate_test_table_two"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema resultType = Schema.builder().addInt64Field("field").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(resultType).addValues(1L).build(), |
| Row.withSchema(resultType).addValues(3L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectFromEmptyTable() { |
| String sql = "SELECT * FROM table_empty;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| PAssert.that(stream).containsInAnyOrder(); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testStartsWithString() { |
| String sql = "SELECT STARTS_WITH('string1', 'stri')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testStartsWithString2() { |
| String sql = "SELECT STARTS_WITH(@p0, @p1)"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) |
| .put("p1", Value.createStringValue("")) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testStartsWithString3() { |
| String sql = "SELECT STARTS_WITH(@p0, @p1)"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) |
| .put("p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testEndsWithString() { |
| String sql = "SELECT STARTS_WITH('string1', 'ng0')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testEndsWithString2() { |
| String sql = "SELECT STARTS_WITH(@p0, @p1)"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) |
| .put("p1", Value.createStringValue("")) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testEndsWithString3() { |
| String sql = "SELECT STARTS_WITH(@p0, @p1)"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) |
| .put("p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Does not support DateTime literal.") |
| public void testDateTimeLiteral() { |
| String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| thrown.expect(RuntimeException.class); |
| thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME"); |
| zetaSQLQueryPlanner.convertToBeamRel(sql); |
| } |
| |
| @Test |
| public void testTimeStampLiteral() { |
| String sql = "SELECT TIMESTAMP '2016-12-25 05:30:00+00'"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addDateTimeField("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues(parseTimestampWithUTCTimeZone("2016-12-25 05:30:00")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTimeStampLiteralWithoutTimeZone() { |
| String sql = "SELECT TIMESTAMP '2016-12-25 05:30:00'"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addDateTimeField("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues(parseTimestampWithUTCTimeZone("2016-12-25 05:30:00")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTimeStampLiteralWithNonUTCTimeZone() { |
| String sql = "SELECT TIMESTAMP '2016-12-25 05:30:00+05'"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addDateTimeField("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues(parseTimestampWithTimeZone("2016-12-25 05:30:00+05")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatWithOneParameters() { |
| String sql = "SELECT concat('abc')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abc").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatWithTwoParameters() { |
| String sql = "SELECT concat('abc', 'def')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abcdef").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatWithThreeParameters() { |
| String sql = "SELECT concat('abc', 'def', 'xyz')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abcdefxyz").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatWithFourParameters() { |
| String sql = "SELECT concat('abc', 'def', ' ', 'xyz')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues("abcdef xyz").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatWithFiveParameters() { |
| String sql = "SELECT concat('abc', 'def', ' ', 'xyz', 'kkk')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues("abcdef xyzkkk").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore( |
| "Calcite codegen does not support UDF with ... args." |
| + " See:https://jira.apache.org/jira/browse/CALCITE-2889") |
| public void testConcatWithSixParameters() { |
| String sql = "SELECT concat('abc', 'def', ' ', 'xyz', 'kkk', 'ttt')"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues("abcdef xyzkkkttt").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatWithNull1() { |
| String sql = "SELECT CONCAT(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createStringValue(""), |
| "p1", |
| Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatWithNull2() { |
| String sql = "SELECT CONCAT(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", |
| Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p1", |
| Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testConcatParameterQuery() { |
| String sql = "SELECT CONCAT(@p0, @p1) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createStringValue(""), "p1", Value.createStringValue("A")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("A").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testReplace1() { |
| String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue(""), |
| "p1", Value.createStringValue(""), |
| "p2", Value.createStringValue("a")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testReplace2() { |
| String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("abc"), |
| "p1", Value.createStringValue(""), |
| "p2", Value.createStringValue("xyz")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abc").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testReplace3() { |
| String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue(""), |
| "p1", Value.createStringValue(""), |
| "p2", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testReplace4() { |
| String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p2", Value.createStringValue("")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTrim1() { |
| String sql = "SELECT trim(@p0)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createStringValue(" a b c ")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("a b c").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTrim2() { |
| String sql = "SELECT trim(@p0, @p1)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("xyz").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTrim3() { |
| String sql = "SELECT trim(@p0, @p1)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testLTrim1() { |
| String sql = "SELECT ltrim(@p0)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createStringValue(" a b c ")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("a b c ").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testLTrim2() { |
| String sql = "SELECT ltrim(@p0, @p1)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("xyzab").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testLTrim3() { |
| String sql = "SELECT ltrim(@p0, @p1)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testRTrim1() { |
| String sql = "SELECT rtrim(@p0)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createStringValue(" a b c ")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(" a b c").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testRTrim2() { |
| String sql = "SELECT rtrim(@p0, @p1)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abxyz").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testRTrim3() { |
| String sql = "SELECT rtrim(@p0, @p1)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING), |
| "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("") |
| public void testCastBytesToString1() { |
| String sql = "SELECT CAST(@p0 AS STRING)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createBytesValue(ByteString.copyFromUtf8("`"))); |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("`").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCastBytesToString2() { |
| String sql = "SELECT CAST(b'b' AS STRING)"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("b").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("") |
| public void testCastBytesToStringFromTable() { |
| String sql = "SELECT CAST(bytes_col AS STRING) FROM table_all_types"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("1").build(), |
| Row.withSchema(schema).addValues("2").build(), |
| Row.withSchema(schema).addValues("3").build(), |
| Row.withSchema(schema).addValues("4").build(), |
| Row.withSchema(schema).addValues("5").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCastStringToTS() { |
| String sql = "SELECT CAST('2019-01-15 13:21:03' AS TIMESTAMP)"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addDateTimeField("field_1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues(parseTimestampWithUTCTimeZone("2019-01-15 13:21:03")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCastStringToString() { |
| String sql = "SELECT CAST(@p0 AS STRING)"; |
| ImmutableMap<String, Value> params = ImmutableMap.of("p0", Value.createStringValue("")); |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCastStringToInt64() { |
| String sql = "SELECT CAST(@p0 AS INT64)"; |
| ImmutableMap<String, Value> params = ImmutableMap.of("p0", Value.createStringValue("123")); |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(123L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectConstant() { |
| String sql = "SELECT 'hi'"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("hi").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Does not support DATE_ADD.") |
| public void testDateAddWithParameter() { |
| String sql = |
| "SELECT " |
| + "DATE_ADD(@p0, INTERVAL @p1 DAY), " |
| + "DATE_ADD(@p2, INTERVAL @p3 DAY), " |
| + "DATE_ADD(@p4, INTERVAL @p5 YEAR), " |
| + "DATE_ADD(@p6, INTERVAL @p7 DAY), " |
| + "DATE_ADD(@p8, INTERVAL @p9 MONTH)"; |
| // Value |
| ImmutableMap<String, Value> params = |
| ImmutableMap.<String, Value>builder() |
| .put("p0", Value.createDateValue(0)) // 1970-01-01 |
| .put("p1", Value.createInt64Value(2L)) |
| .put("p2", parseDateToValue("2019-01-01")) |
| .put("p3", Value.createInt64Value(2L)) |
| .put("p4", Value.createSimpleNullValue(TypeKind.TYPE_DATE)) |
| .put("p5", Value.createInt64Value(1L)) |
| .put("p6", parseDateToValue("2000-02-29")) |
| .put("p7", Value.createInt64Value(-365L)) |
| .put("p8", parseDateToValue("1999-03-31")) |
| .put("p9", Value.createInt64Value(-1L)) |
| .build(); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addDateTimeField("field1") |
| .addDateTimeField("field2") |
| .addNullableField("field3", DATETIME) |
| .addDateTimeField("field4") |
| .addDateTimeField("field5") |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| parseDate("1970-01-03"), |
| parseDate("2019-01-03"), |
| null, |
| parseDate("1999-03-01"), |
| parseDate("1999-02-28")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Does not support TIME_ADD.") |
| public void testTimeAddWithParameter() { |
| String sql = "SELECT TIME_ADD(@p0, INTERVAL @p1 SECOND)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", parseTimeToValue("12:13:14.123"), |
| "p1", Value.createInt64Value(1L)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addDateTimeField("field1").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues(parseTime("12:13:15.123")).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("") |
| public void testTimestampAddWithParameter() { |
| String sql = "SELECT TIMESTAMP_ADD(@p0, INTERVAL @p1 MILLISECOND)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", parseTimestampWithTZToValue("2001-01-01 00:00:00+00"), |
| "p1", Value.createInt64Value(1L)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addDateTimeField("field1").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues(parseTimestampWithTimeZone("2001-01-01 00:00:00.001+00")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testTimeStampAddWithParameter() { |
| String sql = "SELECT TIMESTAMP_ADD(@p0, INTERVAL @p1 MINUTE)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", parseTimestampWithTZToValue("2008-12-25 15:30:00+07:30"), |
| "p1", Value.createInt64Value(10L)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addDateTimeField("field1").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues(parseTimestampWithTimeZone("2008-12-25 15:40:00+07:30")) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectFromTableWithMap() { |
| String sql = "SELECT row_field FROM table_with_map"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| Schema rowSchema = Schema.builder().addInt64Field("row_id").addStringField("data").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(Schema.builder().addRowField("row_field", rowSchema).build()) |
| .addValues(Row.withSchema(rowSchema).addValues(1L, "data1").build()) |
| .build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSubQuery() { |
| String sql = "select sum(Key) from KeyValue\n" + "group by (select Key)"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("Does not support sub-queries"); |
| zetaSQLQueryPlanner.convertToBeamRel(sql); |
| } |
| |
| @Test |
| public void testSubstr() { |
| String sql = "SELECT substr(@p0, @p1, @p2)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("abc"), |
| "p1", Value.createInt64Value(-2L), |
| "p2", Value.createInt64Value(1L)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field1").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("b").build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSubstrWithLargeValueExpectException() { |
| String sql = "SELECT substr(@p0, @p1, @p2)"; |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of( |
| "p0", Value.createStringValue("abc"), |
| "p1", Value.createInt64Value(Integer.MAX_VALUE + 1L), |
| "p2", Value.createInt64Value(Integer.MIN_VALUE - 1L)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| thrown.expect(RuntimeException.class); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectAll() { |
| String sql = "SELECT ALL Key, Value FROM KeyValue;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(14L, "KeyValue234").build(), |
| Row.withSchema(schema).addValues(15L, "KeyValue235").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectDistinct() { |
| String sql = "SELECT DISTINCT Key FROM aggregate_test_table;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(1L).build(), |
| Row.withSchema(schema).addValues(2L).build(), |
| Row.withSchema(schema).addValues(3L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("Bytes cannot be in UNION ALL") |
| public void testSelectDistinct2() { |
| String sql = |
| "SELECT DISTINCT val.BYTES\n" |
| + "from (select b\"BYTES\" BYTES union all\n" |
| + " select b\"bytes\" union all\n" |
| + " select b\"ByTeS\") val"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addByteArrayField("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("BYTES".getBytes(StandardCharsets.UTF_8)).build(), |
| Row.withSchema(schema).addValues("ByTeS".getBytes(StandardCharsets.UTF_8)).build(), |
| Row.withSchema(schema).addValues("bytes".getBytes(StandardCharsets.UTF_8)).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectBytes() { |
| String sql = "SELECT b\"ByTes\""; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addByteArrayField("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("ByTes".getBytes(StandardCharsets.UTF_8)).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectExcept() { |
| String sql = "SELECT * EXCEPT (Key, ts) FROM KeyValue;"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field2").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues("KeyValue234").build(), |
| Row.withSchema(schema).addValues("KeyValue235").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSelectReplace() { |
| String sql = |
| "WITH orders AS\n" |
| + " (SELECT 5 as order_id,\n" |
| + " \"sprocket\" as item_name,\n" |
| + " 200 as quantity)\n" |
| + "SELECT * REPLACE (\"widget\" AS item_name)\n" |
| + "FROM orders"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addInt64Field("field1") |
| .addStringField("field2") |
| .addInt64Field("field3") |
| .build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues(5L, "widget", 200L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testUnionAllBasic() { |
| String sql = |
| "SELECT row_id FROM table_all_types UNION ALL SELECT row_id FROM table_all_types_2"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field1").build(); |
| |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValue(1L).build(), |
| Row.withSchema(schema).addValue(2L).build(), |
| Row.withSchema(schema).addValue(3L).build(), |
| Row.withSchema(schema).addValue(4L).build(), |
| Row.withSchema(schema).addValue(5L).build(), |
| Row.withSchema(schema).addValue(6L).build(), |
| Row.withSchema(schema).addValue(7L).build(), |
| Row.withSchema(schema).addValue(8L).build(), |
| Row.withSchema(schema).addValue(9L).build(), |
| Row.withSchema(schema).addValue(10L).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testAVGWithLongInput() { |
| String sql = "SELECT AVG(f_int_1) FROM aggregate_test_table;"; |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| thrown.expect(RuntimeException.class); |
| thrown.expectMessage( |
| "AVG(LONG) is not supported. You might want to use AVG(CAST(expression AS DOUBLE)."); |
| zetaSQLQueryPlanner.convertToBeamRel(sql); |
| } |
| |
| @Test |
| public void testReverseString() { |
| String sql = "SELECT REVERSE('abc');"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addStringField("field2").build(); |
| |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("cba").build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCharLength() { |
| String sql = "SELECT CHAR_LENGTH('abc');"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addInt64Field("field").build(); |
| PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(3L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testCharLengthNull() { |
| String sql = "SELECT CHAR_LENGTH(@p0);"; |
| |
| ImmutableMap<String, Value> params = |
| ImmutableMap.of("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = Schema.builder().addNullableField("field", FieldType.INT64).build(); |
| PAssert.that(stream) |
| .containsInAnyOrder(Row.withSchema(schema).addValues((Object) null).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testExtractTimestampThrowsOnMicrosecondNotSupported() { |
| String sql = |
| "WITH Timestamps AS (\n" |
| + " SELECT TIMESTAMP '2000-01-01 00:11:22.345678+00' as timestamp\n" |
| + ")\n" |
| + "SELECT\n" |
| + " timestamp,\n" |
| + " EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n" |
| + " EXTRACT(YEAR FROM timestamp) AS year,\n" |
| + " EXTRACT(ISOWEEK FROM timestamp) AS week,\n" |
| + " EXTRACT(MINUTE FROM timestamp) AS minute\n" |
| + "FROM Timestamps\n"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| ImmutableMap<String, Value> params = ImmutableMap.of(); |
| thrown.expect(IllegalArgumentException.class); |
| zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| } |
| |
| /** Only sample scenarios are covered here. Excessive testing is done via Compliance tests. */ |
| @Test |
| @Ignore("ZetaSQL does not support EnumType to IdentifierLiteral") |
| public void testExtractTimestamp() { |
| String sql = |
| "WITH Timestamps AS (\n" |
| + " SELECT TIMESTAMP '2005-01-03 12:34:56' AS timestamp UNION ALL\n" |
| + " SELECT TIMESTAMP '2017-05-26'\n" |
| + ")\n" |
| + "SELECT\n" |
| + " timestamp,\n" |
| + " EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n" |
| + " EXTRACT(YEAR FROM timestamp) AS year,\n" |
| + " EXTRACT(ISOWEEK FROM timestamp) AS week,\n" |
| + " EXTRACT(MINUTE FROM timestamp) AS minute\n" |
| + "FROM Timestamps\n"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| ImmutableMap<String, Value> params = ImmutableMap.of(); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addDateTimeField("ts") |
| .addField("isoyear", FieldType.INT64) |
| .addField("year", FieldType.INT64) |
| .addField("week", FieldType.INT64) |
| .addField("minute", FieldType.INT64) |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema) |
| .addValues( |
| DateTimeUtils.parseTimestampWithUTCTimeZone("2005-01-03 12:34:56"), |
| 2005L, |
| 2005L, |
| 1L, |
| 34L) |
| .build(), |
| Row.withSchema(schema) |
| .addValues(parseDate("2017-05-26"), 2017L, 2017L, 21L, 0L) |
| .build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| @Ignore("ZetaSQL does not support EnumType to IdentifierLiteral") |
| public void testExtractTimestampAtTimeZoneThrowsBecauseNotSupported() { |
| String sql = |
| "WITH Timestamps AS (\n" |
| + " SELECT TIMESTAMP '2017-05-26' AS timestamp\n" |
| + ")\n" |
| + "SELECT\n" |
| + " timestamp,\n" |
| + " EXTRACT(HOUR FROM timestamp AT TIME ZONE 'America/Vancouver') AS hour,\n" |
| + " EXTRACT(DAY FROM timestamp AT TIME ZONE 'America/Vancouver') AS day\n" |
| + "FROM Timestamps\n"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| ImmutableMap<String, Value> params = ImmutableMap.of(); |
| thrown.expect(IllegalArgumentException.class); |
| zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| } |
| |
| @Test |
| @Ignore("") |
| public void testExtractDateFromTimestampThrowsBecauseNotSupported() { |
| String sql = |
| "WITH Timestamps AS (\n" |
| + " SELECT TIMESTAMP '2017-05-26' AS ts\n" |
| + ")\n" |
| + "SELECT\n" |
| + " ts,\n" |
| + " EXTRACT(DATE FROM ts) AS dt\n" |
| + "FROM Timestamps\n"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| ImmutableMap<String, Value> params = ImmutableMap.of(); |
| thrown.expect(SqlException.class); |
| zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| } |
| |
| @Test |
| public void testIsNullTrueFalse() { |
| String sql = |
| "WITH Src AS (\n" |
| + " SELECT NULL as data UNION ALL\n" |
| + " SELECT TRUE UNION ALL\n" |
| + " SELECT FALSE\n" |
| + ")\n" |
| + "SELECT\n" |
| + " data IS NULL as isnull,\n" |
| + " data IS NOT NULL as isnotnull,\n" |
| + " data IS TRUE as istrue,\n" |
| + " data IS NOT TRUE as isnottrue,\n" |
| + " data IS FALSE as isfalse,\n" |
| + " data IS NOT FALSE as isnotfalse\n" |
| + "FROM Src\n"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| ImmutableMap<String, Value> params = ImmutableMap.of(); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| final Schema schema = |
| Schema.builder() |
| .addField("isnull", FieldType.BOOLEAN) |
| .addField("isnotnull", FieldType.BOOLEAN) |
| .addField("istrue", FieldType.BOOLEAN) |
| .addField("isnottrue", FieldType.BOOLEAN) |
| .addField("isfalse", FieldType.BOOLEAN) |
| .addField("isnotfalse", FieldType.BOOLEAN) |
| .build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(schema).addValues(true, false, false, true, false, true).build(), |
| Row.withSchema(schema).addValues(false, true, true, false, false, true).build(), |
| Row.withSchema(schema).addValues(false, true, false, true, true, false).build()); |
| |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| @Test |
| public void testSimpleTableName() { |
| String sql = "SELECT Key FROM KeyValue"; |
| |
| ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); |
| BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); |
| PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); |
| |
| Schema singleField = Schema.builder().addInt64Field("field1").build(); |
| PAssert.that(stream) |
| .containsInAnyOrder( |
| Row.withSchema(singleField).addValues(14L).build(), |
| Row.withSchema(singleField).addValues(15L).build()); |
| pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); |
| } |
| |
| private void initializeCalciteEnvironment() { |
| initializeCalciteEnvironmentWithContext(); |
| } |
| |
| private void initializeCalciteEnvironmentWithContext(Context... extraContext) { |
| JdbcConnection jdbcConnection = |
| JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); |
| SchemaPlus defaultSchemaPlus = jdbcConnection.getCurrentSchemaPlus(); |
| final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE); |
| |
| Object[] contexts = |
| ImmutableList.<Context>builder() |
| .add(Contexts.of(jdbcConnection.config())) |
| .add(extraContext) |
| .build() |
| .toArray(); |
| |
| this.config = |
| Frameworks.newConfigBuilder() |
| .defaultSchema(defaultSchemaPlus) |
| .traitDefs(traitDefs) |
| .context(Contexts.of(contexts)) |
| .ruleSets(BeamRuleSets.getRuleSets()) |
| .costFactory(null) |
| .typeSystem(jdbcConnection.getTypeFactory().getTypeSystem()) |
| .build(); |
| } |
| |
| private void initializeBeamTableProvider() { |
| Map<String, BeamSqlTable> testBoundedTableMap = new HashMap<>(); |
| testBoundedTableMap.put("KeyValue", BASIC_TABLE_ONE); |
| testBoundedTableMap.put("BigTable", BASIC_TABLE_TWO); |
| testBoundedTableMap.put("Spanner", BASIC_TABLE_THREE); |
| testBoundedTableMap.put("aggregate_test_table", AGGREGATE_TABLE_ONE); |
| testBoundedTableMap.put("window_test_table", TIMESTAMP_TABLE_ONE); |
| testBoundedTableMap.put("window_test_table_two", TIMESTAMP_TABLE_TWO); |
| testBoundedTableMap.put("time_test_table", TIME_TABLE); |
| testBoundedTableMap.put("all_null_table", TABLE_ALL_NULL); |
| testBoundedTableMap.put("table_with_struct", TABLE_WITH_STRUCT); |
| testBoundedTableMap.put("table_with_struct_two", TABLE_WITH_STRUCT_TWO); |
| testBoundedTableMap.put("table_with_array", TABLE_WITH_ARRAY); |
| testBoundedTableMap.put("table_with_array_for_unnest", TABLE_WITH_ARRAY_FOR_UNNEST); |
| testBoundedTableMap.put("table_for_case_when", TABLE_FOR_CASE_WHEN); |
| testBoundedTableMap.put("aggregate_test_table_two", AGGREGATE_TABLE_TWO); |
| testBoundedTableMap.put("table_empty", TABLE_EMPTY); |
| testBoundedTableMap.put("table_all_types", TABLE_ALL_TYPES); |
| testBoundedTableMap.put("table_all_types_2", TABLE_ALL_TYPES_2); |
| testBoundedTableMap.put("table_with_map", TABLE_WITH_MAP); |
| testBoundedTableMap.put("table_with_struct_ts_string", TABLE_WITH_STRUCT_TIMESTAMP_STRING); |
| |
| tableProvider = new ReadOnlyTableProvider("test_table_provider", testBoundedTableMap); |
| } |
| } |