// blob: 9a67da6327d94cd4482e41e9a77c1fb50c0c37a3
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithUTCTimeZone;
import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
import static org.junit.Assert.assertTrue;
import com.google.protobuf.ByteString;
import com.google.zetasql.SqlException;
import com.google.zetasql.StructType;
import com.google.zetasql.StructType.StructField;
import com.google.zetasql.TypeFactory;
import com.google.zetasql.Value;
import com.google.zetasql.ZetaSQLType.TypeKind;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for various operations and functions defined by the ZetaSQL dialect. */
@RunWith(JUnit4.class)
public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
// JUnit rule supplying the test pipeline; the tests below attach their query output and
// assertions to it and then run it.
@Rule public transient TestPipeline pipeline = TestPipeline.create();
// JUnit rule for declaring expected exceptions; initialised to expect none.
@Rule public ExpectedException thrown = ExpectedException.none();
/** Runs before each test and delegates to {@code initialize()}. */
@Before
public void setUp() {
  this.initialize();
}
/**
 * Casts literals to INT64, TIMESTAMP and STRING in a single SELECT and expects one row holding
 * the converted values.
 */
@Test
public void testSimpleSelect() {
  String query =
      "SELECT CAST (1243 as INT64), "
          + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), "
          + "CAST ('string' as STRING);";

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.of(
          Field.of("field1", FieldType.INT64),
          Field.of("field2", FieldType.DATETIME),
          Field.of("field3", FieldType.STRING));
  DateTime expectedTimestamp =
      new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC());
  Row expectedRow =
      Row.withSchema(expectedSchema).addValues(1243L, expectedTimestamp, "string").build();

  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Verifies that the query planner can be selected via {@code withQueryPlannerClass}. */
@Test
public void testWithQueryPlannerClass() {
  String query =
      "SELECT CAST (1243 as INT64), "
          + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), "
          + "CAST ('string' as STRING);";

  SqlTransform transform =
      SqlTransform.query(query).withQueryPlannerClass(ZetaSQLQueryPlanner.class);
  PCollection<Row> output = pipeline.apply(transform);

  Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addDateTimeField("field2")
          .addStringField("field3")
          .build();
  DateTime expectedTimestamp =
      new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC());
  Row expectedRow =
      Row.withSchema(expectedSchema).addValues(1243L, expectedTimestamp, "string").build();

  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Verifies that the query planner can be selected through the planner-name pipeline option. */
@Test
public void testPlannerNamePipelineOption() {
  BeamSqlPipelineOptions sqlOptions = pipeline.getOptions().as(BeamSqlPipelineOptions.class);
  sqlOptions.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");

  String query =
      "SELECT CAST (1243 as INT64), "
          + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), "
          + "CAST ('string' as STRING);";
  PCollection<Row> output = pipeline.apply(SqlTransform.query(query));

  Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addDateTimeField("field2")
          .addStringField("field3")
          .build();
  DateTime expectedTimestamp =
      new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC());
  Row expectedRow =
      Row.withSchema(expectedSchema).addValues(1243L, expectedTimestamp, "string").build();

  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Selects the bytes literal {@code b'abc'} and expects the three bytes 'a', 'b', 'c'. */
@Test
public void testByteLiterals() {
  String query = "SELECT b'abc'";
  byte[] expectedBytes = {'a', 'b', 'c'};

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.builder().addNullableField("ColA", FieldType.BYTES).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(expectedBytes).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Binds a non-null BYTES value to {@code @p0} and expects {@code @p0 IS NULL} to be false. */
@Test
public void testByteString() {
  String query = "SELECT @p0 IS NULL AS ColA";
  ByteString singleByte = ByteString.copyFrom(new byte[] {0x62});
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", Value.createBytesValue(singleByte));

  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  PCollection<Row> output =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query, namedParams));

  Schema expectedSchema = Schema.builder().addNullableField("ColA", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(false).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Selects the literal {@code 3.0} and expects it back as a DOUBLE. */
@Test
public void testFloat() {
  String query = "SELECT 3.0";

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.of(Field.nullable("ColA", FieldType.DOUBLE));
  Row expectedRow = Row.withSchema(expectedSchema).addValues(3.0).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Selects a string literal containing double quotes and an escaped newline, and expects the
 * quotes preserved and the escape turned into a real newline character.
 */
@Test
public void testStringLiterals() {
  String query = "SELECT '\"America/Los_Angeles\"\\n'";

  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  PCollection<Row> output =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  Schema expectedSchema = Schema.builder().addNullableField("ColA", FieldType.STRING).build();
  Row expectedRow =
      Row.withSchema(expectedSchema).addValues("\"America/Los_Angeles\"\n").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Binds a STRING to the positional parameter {@code ?} and expects it returned unchanged. */
@Test
public void testParameterString() {
  String query = "SELECT ?";
  ImmutableList<Value> positionalParams = ImmutableList.of(Value.createStringValue("abc\n"));

  BeamRelNode relNode =
      new ZetaSQLQueryPlanner(config).convertToBeamRel(query, positionalParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.builder().addNullableField("ColA", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("abc\n").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Compares a NULL BOOL parameter with TRUE using {@code =} and expects a NULL result. */
@Test
public void testEQ1() {
  String query = "SELECT @p0 = @p1 AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of(
          "p0", Value.createSimpleNullValue(TypeKind.TYPE_BOOL),
          "p1", Value.createBoolValue(true));

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((Boolean) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Compares the DOUBLE values 0 and positive infinity using {@code =} and expects false. */
@Test
public void testEQ2() {
  String query = "SELECT @p0 = @p1 AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of(
          "p0", Value.createDoubleValue(0),
          "p1", Value.createDoubleValue(Double.POSITIVE_INFINITY));

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.of(Field.of("field1", FieldType.BOOLEAN));
  Row expectedRow = Row.withSchema(expectedSchema).addValues(false).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Compares a NULL DOUBLE parameter with 3.14 using {@code =} and expects a NULL result. */
@Test
public void testEQ3() {
  String query = "SELECT @p0 = @p1 AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of(
          "p0", Value.createSimpleNullValue(TypeKind.TYPE_DOUBLE),
          "p1", Value.createDoubleValue(3.14));

  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  PCollection<Row> output =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query, namedParams));

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((Boolean) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Compares two identical BYTES parameters using {@code =} and expects true. */
@Test
public void testEQ4() {
  String query = "SELECT @p0 = @p1 AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of(
          "p0", Value.createBytesValue(ByteString.copyFromUtf8("hello")),
          "p1", Value.createBytesValue(ByteString.copyFromUtf8("hello")));

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(true).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Compares two identical bytes literals using {@code =} and expects true. */
@Test
public void testEQ5() {
  String query = "SELECT b'hello' = b'hello' AS ColA";

  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  PCollection<Row> output =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  Schema expectedSchema = Schema.of(Field.nullable("field1", FieldType.BOOLEAN));
  Row expectedRow = Row.withSchema(expectedSchema).addValues(true).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Compares the positional INT64 parameters 4 and 5 using {@code =} and expects false. */
@Test
public void testEQ6() {
  String query = "SELECT ? = ? AS ColA";
  Value left = Value.createInt64Value(4L);
  Value right = Value.createInt64Value(5L);
  ImmutableList<Value> positionalParams = ImmutableList.of(left, right);

  BeamRelNode relNode =
      new ZetaSQLQueryPlanner(config).convertToBeamRel(query, positionalParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(false).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code IS NOT NULL} to be false for a NULL STRING parameter. */
@Test
public void testIsNotNull1() {
  String query = "SELECT @p0 IS NOT NULL AS ColA";
  Value nullString = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", nullString);

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(false).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code IS NOT NULL} to be false for a NULL ARRAY of INT64 parameter. */
@Test
public void testIsNotNull2() {
  String query = "SELECT @p0 IS NOT NULL AS ColA";
  Value nullInt64Array =
      Value.createNullValue(
          TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)));
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", nullInt64Array);

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(false).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Expects {@code IS NOT NULL} to be false for a NULL STRUCT parameter whose type has a single
 * STRING field named "a".
 */
@Test
public void testIsNotNull3() {
  String query = "SELECT @p0 IS NOT NULL AS ColA";
  StructField onlyField =
      new StructField("a", TypeFactory.createSimpleType(TypeKind.TYPE_STRING));
  Value nullStruct =
      Value.createNullValue(TypeFactory.createStructType(Arrays.asList(onlyField)));
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", nullStruct);

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(false).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Evaluates {@code IF(TRUE, 1, 2)} with named parameters and expects the first branch, 1. */
@Test
public void testIfBasic() {
  String query = "SELECT IF(@p0, @p1, @p2) AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createBoolValue(true))
          .put("p1", Value.createInt64Value(1))
          .put("p2", Value.createInt64Value(2))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.INT64).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(1L).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Evaluates {@code IF(TRUE, 1, 2)} with positional parameters and expects 1. */
@Test
public void testIfPositional() {
  String query = "SELECT IF(?, ?, ?) AS ColA";
  Value condition = Value.createBoolValue(true);
  Value whenTrue = Value.createInt64Value(1);
  Value whenFalse = Value.createInt64Value(2);
  ImmutableList<Value> positionalParams = ImmutableList.of(condition, whenTrue, whenFalse);

  BeamRelNode relNode =
      new ZetaSQLQueryPlanner(config).convertToBeamRel(query, positionalParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.INT64).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(1L).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code COALESCE(NULL, "yay", "nay")} to return the first non-null value, "yay". */
@Test
public void testCoalesceBasic() {
  String query = "SELECT COALESCE(@p0, @p1, @p2) AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING))
          .put("p1", Value.createStringValue("yay"))
          .put("p2", Value.createStringValue("nay"))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("yay").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Expects {@code COALESCE} with a single NULL INT64 argument to return NULL.
 *
 * <p>The expected schema declares the column as a nullable INT64 so that it matches the type of
 * the bound parameter. It was previously declared as an array of INT64, which does not
 * correspond to the INT64 parameter and looks like a copy of the NULL-array COALESCE test.
 */
@Test
public void testCoalesceSingleArgument() {
  String sql = "SELECT COALESCE(@p0) AS ColA";
  ImmutableMap<String, Value> params =
      ImmutableMap.of("p0", Value.createSimpleNullValue(TypeKind.TYPE_INT64));

  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  // The parameter is a scalar INT64, so the expected column is INT64 rather than ARRAY<INT64>.
  final Schema schema = Schema.builder().addNullableField("field1", FieldType.INT64).build();
  PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(null).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Expects {@code COALESCE} over two NULL ARRAY of INT64 parameters to return NULL. */
@Test
public void testCoalesceNullArray() {
  String query = "SELECT COALESCE(@p0, @p1) AS ColA";
  Value firstNullArray =
      Value.createNullValue(
          TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)));
  Value secondNullArray =
      Value.createNullValue(
          TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)));
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", firstNullArray, "p1", secondNullArray);

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.array(FieldType.INT64)).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValue(null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Calls {@code NULLIF} with an INT64 value 3 and a NULL DOUBLE, and expects the result to be
 * the DOUBLE 3.0.
 */
@Test
public void testNullIfCoercion() {
  String query = "SELECT NULLIF(@p0, @p1) AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createInt64Value(3L))
          .put("p1", Value.createSimpleNullValue(TypeKind.TYPE_DOUBLE))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.DOUBLE).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValue(3.0).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code COALESCE(NULL, STRUCT(...))} to return the struct with fields s and i. */
@Test
public void testCoalesceNullStruct() {
  String query = "SELECT COALESCE(NULL, STRUCT(\"a\" AS s, -33 AS i))";

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema structSchema =
      Schema.of(Field.of("s", FieldType.STRING), Field.of("i", FieldType.INT64));
  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.row(structSchema)).build();
  Row expectedStruct = Row.withSchema(structSchema).addValues("a", -33L).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValue(expectedStruct).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Evaluates {@code IF} with a false condition over two TIMESTAMP parameters and expects the
 * second one (2019-01-01T00:00:00Z).
 */
@Test
public void testIfTimestamp() {
  String query = "SELECT IF(@p0, @p1, @p2) AS ColA";
  // Timestamp parameters are given in microseconds, so the Joda millis are scaled by 1000.
  long elseBranchMicros = DateTime.parse("2019-01-01T00:00:00Z").getMillis() * 1000;
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createBoolValue(false))
          .put("p1", Value.createTimestampValueFromUnixMicros(0))
          .put("p2", Value.createTimestampValueFromUnixMicros(elseBranchMicros))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.builder().addNullableField("field1", DATETIME).build();
  Row expectedRow =
      Row.withSchema(expectedSchema).addValues(DateTime.parse("2019-01-01T00:00:00Z")).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Builds an array from three subquery columns in the order s3, s1, s2 and expects
 * ["baz", "foo", "bar"]. Currently ignored.
 */
@Test
@Ignore("$make_array is not implemented")
public void testMakeArray() {
  String query =
      "SELECT [s3, s1, s2] FROM (SELECT \"foo\" AS s1, \"bar\" AS s2, \"baz\" AS s3);";

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.array(FieldType.STRING)).build();
  Row expectedRow =
      Row.withSchema(expectedSchema).addValue(ImmutableList.of("baz", "foo", "bar")).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code NULLIF} to return NULL when both STRING arguments are equal. */
@Test
public void testNullIfPositive() {
  String query = "SELECT NULLIF(@p0, @p1) AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createStringValue("null"))
          .put("p1", Value.createStringValue("null"))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValue(null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code NULLIF} to return its first argument when the two STRING arguments differ. */
@Test
public void testNullIfNegative() {
  String query = "SELECT NULLIF(@p0, @p1) AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createStringValue("foo"))
          .put("p1", Value.createStringValue("null"))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("foo").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code IFNULL} to return its first argument when that argument is not NULL. */
@Test
public void testIfNullPositive() {
  String query = "SELECT IFNULL(@p0, @p1) AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createStringValue("foo"))
          .put("p1", Value.createStringValue("default"))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("foo").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code IFNULL} to return its second argument when the first one is NULL. */
@Test
public void testIfNullNegative() {
  String query = "SELECT IFNULL(@p0, @p1) AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING))
          .put("p1", Value.createStringValue("yay"))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("yay").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Binds an empty ARRAY of INT64 to {@code @p0} and expects an empty array back. */
@Test
public void testEmptyArrayParameter() {
  String query = "SELECT @p0 AS ColA";
  Value emptyInt64Array =
      Value.createArrayValue(
          TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)),
          ImmutableList.of());
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", emptyInt64Array);

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.of(Field.of("field1", FieldType.array(FieldType.INT64)));
  Row expectedRow = Row.withSchema(expectedSchema).addValue(ImmutableList.of()).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Selects the literal {@code ARRAY<STRING>[]} and expects an empty array. */
@Test
public void testEmptyArrayLiteral() {
  String query = "SELECT ARRAY<STRING>[];";

  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  PCollection<Row> output =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  Schema expectedSchema = Schema.of(Field.of("field1", FieldType.array(FieldType.STRING)));
  Row expectedRow = Row.withSchema(expectedSchema).addValue(ImmutableList.of()).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects the value "ab%" to match a LIKE pattern in which the percent sign is escaped. */
@Test
public void testLike1() {
  String query = "SELECT @p0 LIKE @p1 AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createStringValue("ab%"))
          .put("p1", Value.createStringValue("ab\\%"))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(true).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects LIKE to yield NULL when the pattern parameter is a NULL STRING. */
@Test
public void testLikeNullPattern() {
  String query = "SELECT @p0 LIKE @p1 AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createStringValue("ab%"))
          .put("p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((Object) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Expects "ab" to match a LIKE pattern in which the ordinary character 'a' is preceded by a
 * backslash.
 */
@Test
public void testLikeAllowsEscapingNonSpecialCharacter() {
  String query = "SELECT @p0 LIKE @p1 AS ColA";
  Value text = Value.createStringValue("ab");
  Value pattern = Value.createStringValue("\\ab");
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", text, "p1", pattern);

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(true).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Expects a value containing a single backslash to match a LIKE pattern in which that
 * backslash is written as an escaped (doubled) backslash.
 */
@Test
public void testLikeAllowsEscapingBackslash() {
  String query = "SELECT @p0 LIKE @p1 AS ColA";
  Value text = Value.createStringValue("a\\c");
  Value pattern = Value.createStringValue("a\\\\c");
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", text, "p1", pattern);

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(true).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects LIKE over BYTES operands to work: "abcd" matches the pattern "__%". */
@Test
public void testLikeBytes() {
  String query = "SELECT @p0 LIKE @p1 AS ColA";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createBytesValue(ByteString.copyFromUtf8("abcd")))
          .put("p1", Value.createBytesValue(ByteString.copyFromUtf8("__%")))
          .build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(true).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects {@code MOD(4, 2)} to evaluate to 0. */
@Test
public void testMod() {
  String query = "SELECT MOD(4, 2)";

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema = Schema.of(Field.of("field1", FieldType.INT64));
  Row expectedRow = Row.withSchema(expectedSchema).addValues(0L).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Expects UNION ALL of two identical single-row SELECTs to produce that row twice. */
@Test
public void testSimpleUnionAll() {
  String query =
      "SELECT CAST (1243 as INT64), "
          + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), "
          + "CAST ('string' as STRING) "
          + " UNION ALL "
          + " SELECT CAST (1243 as INT64), "
          + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), "
          + "CAST ('string' as STRING);";

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addDateTimeField("field2")
          .addStringField("field3")
          .build();
  // Both branches of the union yield the same values, so two equal rows are expected.
  Row firstExpected =
      Row.withSchema(expectedSchema)
          .addValues(
              1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string")
          .build();
  Row secondExpected =
      Row.withSchema(expectedSchema)
          .addValues(
              1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string")
          .build();
  PAssert.that(output).containsInAnyOrder(firstExpected, secondExpected);

  Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Checks that three chained {@code UNION ALL} branches yield the rows 1, 2 and 3. */
@Test
public void testThreeWayUnionAll() {
  final String query = "SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row first = Row.withSchema(expectedSchema).addValues(1L).build();
  final Row second = Row.withSchema(expectedSchema).addValues(2L).build();
  final Row third = Row.withSchema(expectedSchema).addValues(3L).build();
  PAssert.that(result).containsInAnyOrder(first, second, third);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks that {@code UNION DISTINCT} removes duplicates: two identical single-row SELECTs are
 * expected to collapse into one row.
 */
@Test
public void testSimpleUnionDISTINCT() {
  final String query =
      "SELECT CAST (1243 as INT64), "
          + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), "
          + "CAST ('string' as STRING) "
          + " UNION DISTINCT "
          + " SELECT CAST (1243 as INT64), "
          + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), "
          + "CAST ('string' as STRING);";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addDateTimeField("field2")
          .addStringField("field3")
          .build();
  final DateTime expectedTimestamp =
      new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC());
  final Row expectedRow =
      Row.withSchema(expectedSchema).addValues(1243L, expectedTimestamp, "string").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Inner-joins KeyValue with BigTable on both the key and the timestamp columns and expects the
 * single key 15 in the output.
 */
@Test
public void testZetaSQLInnerJoin() {
  final String query =
      "SELECT t1.Key "
          + "FROM KeyValue AS t1"
          + " INNER JOIN BigTable AS t2"
          + " on "
          + " t1.Key = t2.RowKey AND t1.ts = t2.ts";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(15L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Inner-joins KeyValue with BigTable via {@code USING(ts)}.
 *
 * <p>JOIN USING(col) is equivalent to JOIN on left.col = right.col.
 */
@Test
public void testZetaSQLInnerJoinWithUsing() {
  final String query =
      "SELECT t1.Key " + "FROM KeyValue AS t1" + " INNER JOIN BigTable AS t2 USING(ts)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(15L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Same join as the key-and-timestamp inner join, but with the operands of each join condition
 * swapped, to test the ordering of the JOIN conditions.
 */
@Test
public void testZetaSQLInnerJoinTwo() {
  final String query =
      "SELECT t2.RowKey "
          + "FROM KeyValue AS t1"
          + " INNER JOIN BigTable AS t2"
          + " on "
          + " t2.RowKey = t1.Key AND t2.ts = t1.ts";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(15L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Left-joins KeyValue with BigTable on the key column. Expects one matched row (key 15) and one
 * KeyValue row (key 14) whose BigTable columns are all null.
 */
@Test
public void testZetaSQLLeftOuterJoin() {
  final String query =
      "SELECT * "
          + "FROM KeyValue AS t1"
          + " LEFT JOIN BigTable AS t2"
          + " on "
          + " t1.Key = t2.RowKey";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  // Schema for the row without a right-hand match: right-side columns are nullable.
  final Schema rightNullableSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addDateTimeField("field3")
          .addNullableField("field4", FieldType.INT64)
          .addNullableField("field5", FieldType.STRING)
          .addNullableField("field6", DATETIME)
          .build();
  // Schema for the fully matched row.
  final Schema matchedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addDateTimeField("field3")
          .addInt64Field("field4")
          .addStringField("field5")
          .addDateTimeField("field6")
          .build();

  final Row unmatchedLeftRow =
      Row.withSchema(rightNullableSchema)
          .addValues(
              14L,
              "KeyValue234",
              new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()),
              null,
              null,
              null)
          .build();
  final Row matchedRow =
      Row.withSchema(matchedSchema)
          .addValues(
              15L,
              "KeyValue235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()),
              15L,
              "BigTable235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()))
          .build();
  PAssert.that(result).containsInAnyOrder(unmatchedLeftRow, matchedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Right-joins KeyValue with BigTable on the key column. Expects one matched row (key 15) and one
 * BigTable row (key 16) whose KeyValue columns are all null.
 */
@Test
public void testZetaSQLRightOuterJoin() {
  final String query =
      "SELECT * "
          + "FROM KeyValue AS t1"
          + " RIGHT JOIN BigTable AS t2"
          + " on "
          + " t1.Key = t2.RowKey";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  // Schema for the row without a left-hand match: left-side columns are nullable.
  final Schema leftNullableSchema =
      Schema.builder()
          .addNullableField("field1", FieldType.INT64)
          .addNullableField("field2", FieldType.STRING)
          .addNullableField("field3", DATETIME)
          .addInt64Field("field4")
          .addStringField("field5")
          .addDateTimeField("field6")
          .build();
  // Schema for the fully matched row.
  final Schema matchedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addDateTimeField("field3")
          .addInt64Field("field4")
          .addStringField("field5")
          .addDateTimeField("field6")
          .build();

  final Row unmatchedRightRow =
      Row.withSchema(leftNullableSchema)
          .addValues(
              null,
              null,
              null,
              16L,
              "BigTable236",
              new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC()))
          .build();
  final Row matchedRow =
      Row.withSchema(matchedSchema)
          .addValues(
              15L,
              "KeyValue235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()),
              15L,
              "BigTable235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()))
          .build();
  PAssert.that(result).containsInAnyOrder(unmatchedRightRow, matchedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Full-outer-joins KeyValue with BigTable on the key column. Expects three rows: a matched row
 * (key 15), a BigTable-only row (key 16) and a KeyValue-only row (key 14).
 */
@Test
public void testZetaSQLFullOuterJoin() {
  final String query =
      "SELECT * "
          + "FROM KeyValue AS t1"
          + " FULL JOIN BigTable AS t2"
          + " on "
          + " t1.Key = t2.RowKey";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  // Left-side columns nullable: used for the row that exists only in BigTable.
  final Schema leftNullableSchema =
      Schema.builder()
          .addNullableField("field1", FieldType.INT64)
          .addNullableField("field2", FieldType.STRING)
          .addNullableField("field3", DATETIME)
          .addInt64Field("field4")
          .addStringField("field5")
          .addDateTimeField("field6")
          .build();
  // No nullable columns: used for the row present on both sides.
  final Schema matchedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addDateTimeField("field3")
          .addInt64Field("field4")
          .addStringField("field5")
          .addDateTimeField("field6")
          .build();
  // Right-side columns nullable: used for the row that exists only in KeyValue.
  final Schema rightNullableSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addDateTimeField("field3")
          .addNullableField("field4", FieldType.INT64)
          .addNullableField("field5", FieldType.STRING)
          .addNullableField("field6", DATETIME)
          .build();

  final Row rightOnlyRow =
      Row.withSchema(leftNullableSchema)
          .addValues(
              null,
              null,
              null,
              16L,
              "BigTable236",
              new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC()))
          .build();
  final Row matchedRow =
      Row.withSchema(matchedSchema)
          .addValues(
              15L,
              "KeyValue235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()),
              15L,
              "BigTable235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()))
          .build();
  final Row leftOnlyRow =
      Row.withSchema(rightNullableSchema)
          .addValues(
              14L,
              "KeyValue234",
              new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()),
              null,
              null,
              null)
          .build();
  PAssert.that(result).containsInAnyOrder(rightOnlyRow, matchedRow, leftOnlyRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Full outer join with a non-equality condition ({@code t1.Key + t2.RowKey = 30}). Currently
 * ignored; the test only builds and runs the pipeline without asserting on its output.
 */
@Test
@Ignore("BeamSQL only supports equal join")
public void testZetaSQLFullOuterJoinTwo() {
  final String query =
      "SELECT * "
          + "FROM KeyValue AS t1"
          + " FULL JOIN BigTable AS t2"
          + " on "
          + " t1.Key + t2.RowKey = 30";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);

  BeamSqlRelUtils.toPCollection(pipeline, relNode);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Full outer join with the constant condition {@code ON false}: planning is expected to succeed,
 * while expanding the plan into a PCollection must throw
 * {@link UnsupportedOperationException}.
 */
@Test
public void testZetaSQLFullOuterJoinFalse() {
  final String query = "SELECT * FROM KeyValue AS t1 FULL JOIN BigTable AS t2 ON false";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);

  // The expectation is registered only after planning, so a planning failure is not masked.
  thrown.expect(UnsupportedOperationException.class);
  BeamSqlRelUtils.toPCollection(pipeline, relNode);
}
/**
 * Chains two inner joins (KeyValue, BigTable, Spanner) on the key columns and expects the single
 * combined row for key 15.
 */
@Test
public void testZetaSQLThreeWayInnerJoin() {
  final String query =
      "SELECT t3.Value, t2.Value, t1.Value, t1.Key, t3.ColId FROM KeyValue as t1 "
          + "JOIN BigTable as t2 "
          + "ON (t1.Key = t2.RowKey) "
          + "JOIN Spanner as t3 "
          + "ON (t3.ColId = t1.Key)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder()
          .addStringField("t3.Value")
          .addStringField("t2.Value")
          .addStringField("t1.Value")
          .addInt64Field("t1.Key")
          .addInt64Field("t3.ColId")
          .build();
  final Row expectedRow =
      Row.withSchema(expectedSchema)
          .addValues("Spanner235", "BigTable235", "KeyValue235", 15L, 15L)
          .build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Self-joins the Spanner table on {@code ColId} and filters on {@code ColId = 17}; expects one
 * row carrying the same values from both sides.
 */
@Test
public void testZetaSQLTableJoinOnItselfWithFiltering() {
  final String query =
      "SELECT * FROM Spanner as t1 "
          + "JOIN Spanner as t2 "
          + "ON (t1.ColId = t2.ColId) WHERE t1.ColId = 17";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addInt64Field("field3")
          .addStringField("field4")
          .build();
  final Row expectedRow =
      Row.withSchema(expectedSchema).addValues(17L, "Spanner237", 17L, "Spanner237").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Selects from an inline sub-select and checks both the produced row and that the output schema
 * keeps the aliases {@code fruit} and {@code vegetable} as field names.
 */
@Test
public void testZetaSQLSelectFromSelect() {
  final String query = "SELECT * FROM (SELECT \"apple\" AS fruit, \"carrot\" AS vegetable);";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addStringField("field1").addStringField("field2").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues("apple", "carrot").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));

  // Verify the column names of the actual output schema.
  final Schema actualSchema = result.getSchema();
  Assert.assertEquals(2, actualSchema.getFieldCount());
  Assert.assertEquals("fruit", actualSchema.getField(0).getName());
  Assert.assertEquals("vegetable", actualSchema.getField(1).getName());
}
/** Projects {@code Key} and {@code Value} from KeyValue and expects the rows for keys 14, 15. */
@Test
public void testZetaSQLSelectFromTable() {
  final String query = "SELECT Key, Value FROM KeyValue;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addStringField("field2").build();
  final Row row14 = Row.withSchema(expectedSchema).addValues(14L, "KeyValue234").build();
  final Row row15 = Row.withSchema(expectedSchema).addValues(15L, "KeyValue235").build();
  PAssert.that(result).containsInAnyOrder(row14, row15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Applies {@code LIMIT 2} to a KeyValue projection and expects the rows for keys 14 and 15. */
@Test
public void testZetaSQLSelectFromTableLimit() {
  final String query = "SELECT Key, Value FROM KeyValue LIMIT 2;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addStringField("field2").build();
  final Row row14 = Row.withSchema(expectedSchema).addValues(14L, "KeyValue234").build();
  final Row row15 = Row.withSchema(expectedSchema).addValues(15L, "KeyValue235").build();
  PAssert.that(result).containsInAnyOrder(row14, row15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Applies {@code LIMIT 0} and expects an empty result. */
@Test
public void testZetaSQLSelectFromTableLimit0() {
  final String query = "SELECT Key, Value FROM KeyValue LIMIT 0;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  // No expected elements: the output must be empty.
  PAssert.that(result).containsInAnyOrder();

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Binds a NULL INT64 value to the {@code @lmt} query parameter used as the LIMIT count and
 * expects planning to fail with a {@link RuntimeException} whose message contains "Limit
 * requires non-null count and offset".
 */
@Test
public void testZetaSQLSelectNullLimitParam() {
  String sql = "SELECT Key, Value FROM KeyValue LIMIT @lmt;";
  ImmutableMap<String, Value> params =
      ImmutableMap.of(
          "lmt", Value.createNullValue(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)));
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);

  thrown.expect(RuntimeException.class);
  thrown.expectMessage("Limit requires non-null count and offset");
  // The call is expected to throw, so its result is intentionally not captured.
  zetaSQLQueryPlanner.convertToBeamRel(sql, params);
}
/**
 * Binds a NULL INT64 value to the {@code @lmt} query parameter used as the OFFSET and expects
 * planning to fail with a {@link RuntimeException} whose message contains "Limit requires
 * non-null count and offset".
 */
@Test
public void testZetaSQLSelectNullOffsetParam() {
  String sql = "SELECT Key, Value FROM KeyValue LIMIT 1 OFFSET @lmt;";
  ImmutableMap<String, Value> params =
      ImmutableMap.of(
          "lmt", Value.createNullValue(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)));
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);

  thrown.expect(RuntimeException.class);
  thrown.expectMessage("Limit requires non-null count and offset");
  // The call is expected to throw, so its result is intentionally not captured.
  zetaSQLQueryPlanner.convertToBeamRel(sql, params);
}
/**
 * Orders four inline rows by {@code x} and takes the first one; the only row with the smallest
 * {@code x} is (0, 0), which is the expected output.
 */
@Test
public void testZetaSQLSelectFromTableOrderLimit() {
  final String query =
      "SELECT x, y FROM (SELECT 1 as x, 0 as y UNION ALL SELECT 0, 0 "
          + "UNION ALL SELECT 1, 0 UNION ALL SELECT 1, 1) ORDER BY x LIMIT 1";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(0L, 0L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Counts the rows left after {@code LIMIT 3 OFFSET 1} over three inline rows; the expected count
 * is 2.
 */
@Test
public void testZetaSQLSelectFromTableLimitOffset() {
  final String query =
      "SELECT COUNT(a) FROM (\n"
          + "SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3) LIMIT 3 OFFSET 1);";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(2L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Runs an {@code ORDER BY ... DESC LIMIT 2} query over KeyValue.
 *
 * <p>There is really no order for a PCollection, so this query does not test ORDER BY itself; it
 * only checks that ORDER BY combined with LIMIT can be planned and executed.
 */
@Test
public void testZetaSQLSelectFromTableOrderByLimit() {
  final String query = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC LIMIT 2;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addStringField("field2").build();
  final Row row14 = Row.withSchema(expectedSchema).addValues(14L, "KeyValue234").build();
  final Row row15 = Row.withSchema(expectedSchema).addValues(15L, "KeyValue235").build();
  PAssert.that(result).containsInAnyOrder(row14, row15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Expects planning of an {@code ORDER BY} query without a LIMIT to fail with a
 * {@link RuntimeException} stating that this is not supported.
 */
@Test
public void testZetaSQLSelectFromTableOrderBy() {
  final String query = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(RuntimeException.class);
  thrown.expectMessage("ORDER BY without a LIMIT is not supported.");

  planner.convertToBeamRel(query);
}
/**
 * Projects a nested struct field ({@code struct_col.struct_col_str}) for the row with
 * {@code id = 1} and expects the value "row_one".
 */
@Test
public void testZetaSQLSelectFromTableWithStructType2() {
  final String query =
      "SELECT table_with_struct.struct_col.struct_col_str FROM table_with_struct WHERE id = 1;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addStringField("field").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValue("row_one").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Filters on a nested struct field ({@code struct_col.struct_col_str = 'row_one'}) and expects
 * the id 1 as the only result.
 */
@Test
public void testZetaSQLStructFieldAccessInFilter() {
  final String query =
      "SELECT table_with_struct.id FROM table_with_struct WHERE"
          + " table_with_struct.struct_col.struct_col_str = 'row_one';";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addInt64Field("field").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValue(1L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Casts the id column to STRING while filtering on a nested struct field; expects the string
 * "1" as the only result.
 */
@Test
public void testZetaSQLStructFieldAccessInCast() {
  final String query =
      "SELECT CAST(table_with_struct.id AS STRING) FROM table_with_struct WHERE"
          + " table_with_struct.struct_col.struct_col_str = 'row_one';";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addStringField("field").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValue("1").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Casts a nested struct string field to TIMESTAMP and expects 2019-01-15 13:21:03 (parsed as
 * UTC). Currently ignored, see the {@code @Ignore} reason.
 */
@Test
@Ignore("[BEAM-9191] CAST operator does not work fully due to bugs in unparsing")
public void testZetaSQLStructFieldAccessInCast2() {
  final String query =
      "SELECT CAST(A.struct_col.struct_col_str AS TIMESTAMP) FROM table_with_struct_ts_string AS"
          + " A";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addDateTimeField("field").build();
  final Row expectedRow =
      Row.withSchema(expectedSchema)
          .addValue(parseTimestampWithUTCTimeZone("2019-01-15 13:21:03"))
          .build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Mixes aggregates that reference columns ({@code SUM(has_fN)}) with one that does not
 * ({@code COUNT(*)}) in a single GROUP BY and checks every output column.
 *
 * <p>Used to validate fix for [BEAM-8042].
 */
@Test
public void testAggregateWithAndWithoutColumnRefs() {
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final String query =
      "SELECT \n"
          + " id, \n"
          + " SUM(has_f1) as f1_count, \n"
          + " SUM(has_f2) as f2_count, \n"
          + " SUM(has_f3) as f3_count, \n"
          + " SUM(has_f4) as f4_count, \n"
          + " SUM(has_f5) as f5_count, \n"
          + " COUNT(*) as count, \n"
          + " SUM(has_f6) as f6_count \n"
          + "FROM (select 0 as id, 1 as has_f1, 2 as has_f2, 3 as has_f3, 4 as has_f4, 5 as has_f5, 6 as has_f6)\n"
          + "GROUP BY id";
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("id")
          .addInt64Field("f1_count")
          .addInt64Field("f2_count")
          .addInt64Field("f3_count")
          .addInt64Field("f4_count")
          .addInt64Field("f5_count")
          .addInt64Field("count")
          .addInt64Field("f6_count")
          .build();
  final Row expectedRow =
      Row.withSchema(expectedSchema).addValues(0L, 1L, 2L, 3L, 4L, 5L, 1L, 6L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Groups by a nested struct field ({@code rowCol.row_id}) and checks the per-group counts. */
@Test
public void testZetaSQLStructFieldAccessInGroupBy() {
  final String query =
      "SELECT rowCol.row_id, COUNT(*) FROM table_with_struct_two GROUP BY rowCol.row_id";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final Row group1 = Row.withSchema(expectedSchema).addValues(1L, 1L).build();
  final Row group2 = Row.withSchema(expectedSchema).addValues(2L, 1L).build();
  final Row group3 = Row.withSchema(expectedSchema).addValues(3L, 2L).build();
  PAssert.that(result).containsInAnyOrder(group1, group2, group3);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Groups by {@code rowCol.row_id} and selects {@code ANY_VALUE(rowCol.data)} per group.
 *
 * <p>ANY_VALUE may return any value of the group, so instead of comparing against fixed rows the
 * test checks that every output key is expected, appears exactly once, and carries one of the
 * values allowed for that key.
 */
@Test
public void testZetaSQLAnyValueInGroupBy() {
  String sql =
      "SELECT rowCol.row_id as key, ANY_VALUE(rowCol.data) as any_value FROM table_with_struct_two GROUP BY rowCol.row_id";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  // Allowed ANY_VALUE results per group key.
  Map<Long, List<String>> allowedTuples = new HashMap<>();
  allowedTuples.put(1L, Arrays.asList("data1"));
  allowedTuples.put(2L, Arrays.asList("data2"));
  allowedTuples.put(3L, Arrays.asList("data2", "data3"));

  PAssert.that(stream)
      .satisfies(
          input -> {
            // Work on a private copy so the captured map is never mutated and the check
            // gives the same verdict if it is evaluated more than once.
            Map<Long, List<String>> remaining = new HashMap<>(allowedTuples);
            for (Row row : input) {
              Long key = row.getInt64("key");
              // remove() both looks up the key and guards against the key showing up twice.
              List<String> values = remaining.remove(key);
              assertTrue("Unexpected or duplicate key: " + key, values != null);
              String anyValue = row.getString("any_value");
              assertTrue(
                  "Unexpected any_value '" + anyValue + "' for key " + key,
                  values.contains(anyValue));
            }
            assertTrue("Keys missing from output: " + remaining.keySet(), remaining.isEmpty());
            return null;
          });
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Groups by the nested struct field {@code rowCol.data} and checks MAX and MIN of
 * {@code rowCol.row_id} for each group.
 */
@Test
public void testZetaSQLStructFieldAccessInGroupBy2() {
  final String query =
      "SELECT rowCol.data, MAX(rowCol.row_id), MIN(rowCol.row_id) FROM table_with_struct_two"
          + " GROUP BY rowCol.data";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder()
          .addStringField("field1")
          .addInt64Field("field2")
          .addInt64Field("field3")
          .build();
  final Row data1 = Row.withSchema(expectedSchema).addValues("data1", 1L, 1L).build();
  final Row data2 = Row.withSchema(expectedSchema).addValues("data2", 3L, 2L).build();
  final Row data3 = Row.withSchema(expectedSchema).addValues("data3", 3L, 3L).build();
  PAssert.that(result).containsInAnyOrder(data1, data2, data3);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Inner-joins two tables using a nested struct field ({@code A.rowCol.row_id}) as the join key
 * and expects the values "data1" and "data2".
 */
@Test
public void testZetaSQLStructFieldAccessInnerJoin() {
  final String query =
      "SELECT A.rowCol.data FROM table_with_struct_two AS A INNER JOIN "
          + "table_with_struct AS B "
          + "ON A.rowCol.row_id = B.id";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addStringField("field1").build();
  final Row data1 = Row.withSchema(expectedSchema).addValue("data1").build();
  final Row data2 = Row.withSchema(expectedSchema).addValue("data2").build();
  PAssert.that(result).containsInAnyOrder(data1, data2);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Selects an array-of-string column and expects two rows: one holding ["1", "2", "3"] and one
 * holding an empty array.
 */
@Test
public void testZetaSQLSelectFromTableWithArrayType() {
  final String query = "SELECT array_col FROM table_with_array;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addArrayField("field", FieldType.STRING).build();
  final Row filledArrayRow =
      Row.withSchema(expectedSchema).addValue(Arrays.asList("1", "2", "3")).build();
  final Row emptyArrayRow = Row.withSchema(expectedSchema).addValue(ImmutableList.of()).build();
  PAssert.that(result).containsInAnyOrder(filledArrayRow, emptyArrayRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Runs {@code SELECT *} over BigTable and expects its two rows (keys 15 and 16). */
@Test
public void testZetaSQLSelectStarFromTable() {
  final String query = "SELECT * FROM BigTable;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addDateTimeField("field3")
          .build();
  final Row row15 =
      Row.withSchema(expectedSchema)
          .addValues(
              15L,
              "BigTable235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()))
          .build();
  final Row row16 =
      Row.withSchema(expectedSchema)
          .addValues(
              16L,
              "BigTable236",
              new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC()))
          .build();
  PAssert.that(result).containsInAnyOrder(row15, row16);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Filters KeyValue with {@code Key = 14} and expects exactly that row. */
@Test
public void testZetaSQLBasicFiltering() {
  final String query = "SELECT Key, Value FROM KeyValue WHERE Key = 14;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addStringField("field2").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(14L, "KeyValue234").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Filters with an AND condition whose second predicate matches nothing; expects no rows. */
@Test
public void testZetaSQLBasicFilteringTwo() {
  final String query =
      "SELECT Key, Value FROM KeyValue WHERE Key = 14 AND Value = 'non-existing';";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  // No expected elements: the output must be empty.
  PAssert.that(result).containsInAnyOrder();

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Filters with an OR condition over two keys and expects both matching rows. */
@Test
public void testZetaSQLBasicFilteringThree() {
  final String query = "SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addStringField("field2").build();
  final Row row14 = Row.withSchema(expectedSchema).addValues(14L, "KeyValue234").build();
  final Row row15 = Row.withSchema(expectedSchema).addValues(15L, "KeyValue235").build();
  PAssert.that(result).containsInAnyOrder(row14, row15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Runs {@code COUNT(Key)} over KeyValue and expects the count 2. */
@Test
public void testZetaSQLCountOnAColumn() {
  final String query = "SELECT COUNT(Key) FROM KeyValue";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(2L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Expects planning of {@code COUNT(DISTINCT ...)} to fail with a {@link RuntimeException}
 * stating that COUNT DISTINCT is not supported.
 */
@Test
public void testZetaSQLAggDistinct() {
  final String query = "SELECT Key, COUNT(DISTINCT Value) FROM KeyValue GROUP BY Key";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(RuntimeException.class);
  thrown.expectMessage("Does not support COUNT DISTINCT");

  planner.convertToBeamRel(query);
}
/** Groups KeyValue by {@code Key} with {@code COUNT(*)} and expects one row per key. */
@Test
public void testZetaSQLBasicAgg() {
  final String query = "SELECT Key, COUNT(*) FROM KeyValue GROUP BY Key";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final Row key14 = Row.withSchema(expectedSchema).addValues(14L, 1L).build();
  final Row key15 = Row.withSchema(expectedSchema).addValues(15L, 1L).build();
  PAssert.that(result).containsInAnyOrder(key14, key15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks that an alias on an aggregate column ({@code count_col}) and the plain grouping column
 * name ({@code Key}) are used as field names of the output schema.
 */
@Test
public void testZetaSQLColumnAlias1() {
  final String query = "SELECT Key, COUNT(*) AS count_col FROM KeyValue GROUP BY Key";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));

  // Only the schema is inspected; the row contents are not asserted here.
  final Schema actualSchema = result.getSchema();
  Assert.assertEquals(2, actualSchema.getFieldCount());
  Assert.assertEquals("Key", actualSchema.getField(0).getName());
  Assert.assertEquals("count_col", actualSchema.getField(1).getName());
}
/**
 * Checks that aliases defined in an outer query ({@code k1}, {@code k2}) over an aliased
 * sub-query become the field names of the output schema.
 */
@Test
public void testZetaSQLColumnAlias2() {
  final String query =
      "SELECT Key AS k1, (count_col + 1) AS k2 FROM (SELECT Key, COUNT(*) AS count_col FROM"
          + " KeyValue GROUP BY Key)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));

  // Only the schema is inspected; the row contents are not asserted here.
  final Schema actualSchema = result.getSchema();
  Assert.assertEquals(2, actualSchema.getFieldCount());
  Assert.assertEquals("k1", actualSchema.getField(0).getName());
  Assert.assertEquals("k2", actualSchema.getField(1).getName());
}
/**
 * Checks that aliases on plain column references ({@code v1}, {@code v2}, {@code v3}) become
 * the field names of the output schema.
 */
@Test
public void testZetaSQLColumnAlias3() {
  final String query = "SELECT Key AS v1, Value AS v2, ts AS v3 FROM KeyValue";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));

  // Only the schema is inspected; the row contents are not asserted here.
  final Schema actualSchema = result.getSchema();
  Assert.assertEquals(3, actualSchema.getFieldCount());
  Assert.assertEquals("v1", actualSchema.getField(0).getName());
  Assert.assertEquals("v2", actualSchema.getField(1).getName());
  Assert.assertEquals("v3", actualSchema.getField(2).getName());
}
/**
 * Checks that an alias on a CAST expression ({@code cast_col}) becomes the field name of the
 * output schema.
 */
@Test
public void testZetaSQLColumnAlias4() {
  final String query = "SELECT CAST(123 AS INT64) AS cast_col";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode relNode = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));

  // Only the schema is inspected; the row contents are not asserted here.
  final Schema actualSchema = result.getSchema();
  Assert.assertEquals(1, actualSchema.getFieldCount());
  Assert.assertEquals("cast_col", actualSchema.getField(0).getName());
}
/**
 * Two select-list columns share the alias ID; grouping by that alias is expected to be
 * rejected with an "ambiguous" error message during conversion.
 */
@Test
public void testZetaSQLAmbiguousAlias() {
  final String query = "SELECT row_id as ID, int64_col as ID FROM table_all_types GROUP BY ID;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final String expectedError =
      "Name ID in GROUP BY clause is ambiguous; it may refer to multiple columns in the"
          + " SELECT-list [at 1:68]";
  thrown.expectMessage(expectedError);
  planner.convertToBeamRel(query);
}
/** Groups by an ordinal reference ({@code GROUP BY 1}) and checks the per-key row counts. */
@Test
public void testZetaSQLAggWithOrdinalReference() {
  final String query = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY 1";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema keyAndCount =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final List<Row> expected =
      Arrays.asList(
          Row.withSchema(keyAndCount).addValues(1L, 2L).build(),
          Row.withSchema(keyAndCount).addValues(2L, 3L).build(),
          Row.withSchema(keyAndCount).addValues(3L, 2L).build());
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Groups by a select-list alias ({@code GROUP BY K}) and checks the per-key row counts. */
@Test
public void testZetaSQLAggWithAliasReference() {
  final String query = "SELECT Key AS K, COUNT(*) FROM aggregate_test_table GROUP BY K";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema keyAndCount =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final Row keyOne = Row.withSchema(keyAndCount).addValues(1L, 2L).build();
  final Row keyTwo = Row.withSchema(keyAndCount).addValues(2L, 3L).build();
  final Row keyThree = Row.withSchema(keyAndCount).addValues(3L, 2L).build();
  PAssert.that(result).containsInAnyOrder(keyOne, keyTwo, keyThree);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Groups by a plain column name and checks the COUNT(*) produced for each key. */
@Test
public void testZetaSQLBasicAgg2() {
  final String query = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema.Builder schemaBuilder = Schema.builder();
  schemaBuilder.addInt64Field("field1");
  schemaBuilder.addInt64Field("field2");
  final Schema keyAndCount = schemaBuilder.build();

  PAssert.that(result)
      .containsInAnyOrder(
          Row.withSchema(keyAndCount).addValues(1L, 2L).build(),
          Row.withSchema(keyAndCount).addValues(2L, 3L).build(),
          Row.withSchema(keyAndCount).addValues(3L, 2L).build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * Groups by two keys listed in a different order than the select list and checks the counts
 * per (Key, Key2) pair.
 */
@Test
public void testZetaSQLBasicAgg3() {
  final String query = "SELECT Key, Key2, COUNT(*) FROM aggregate_test_table GROUP BY Key2, Key";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addInt64Field("field3")
          .addInt64Field("field2")
          .build();
  final List<Row> expected =
      Arrays.asList(
          Row.withSchema(expectedSchema).addValues(1L, 10L, 1L).build(),
          Row.withSchema(expectedSchema).addValues(1L, 11L, 1L).build(),
          Row.withSchema(expectedSchema).addValues(2L, 11L, 2L).build(),
          Row.withSchema(expectedSchema).addValues(2L, 12L, 1L).build(),
          Row.withSchema(expectedSchema).addValues(3L, 13L, 2L).build());
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Checks MAX, MIN and SUM over integer and double columns grouped by two keys. */
@Test
public void testZetaSQLBasicAgg4() {
  final String query =
      "SELECT Key, Key2, MAX(f_int_1), MIN(f_int_1), SUM(f_int_1), SUM(f_double_1) "
          + "FROM aggregate_test_table GROUP BY Key2, Key";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  // Columns: Key, Key2, MAX, MIN, SUM(int), SUM(double).
  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addInt64Field("field3")
          .addInt64Field("field2")
          .addInt64Field("field4")
          .addInt64Field("field5")
          .addDoubleField("field6")
          .build();
  PAssert.that(result)
      .containsInAnyOrder(
          Row.withSchema(expectedSchema).addValues(1L, 10L, 1L, 1L, 1L, 1.0).build(),
          Row.withSchema(expectedSchema).addValues(1L, 11L, 2L, 2L, 2L, 2.0).build(),
          Row.withSchema(expectedSchema).addValues(2L, 11L, 4L, 3L, 7L, 7.0).build(),
          Row.withSchema(expectedSchema).addValues(2L, 12L, 5L, 5L, 5L, 5.0).build(),
          Row.withSchema(expectedSchema).addValues(3L, 13L, 7L, 6L, 13L, 13.0).build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Checks AVG over a FLOAT64-cast integer column and over a double column, by two keys. */
@Test
public void testZetaSQLBasicAgg5() {
  final String query =
      "SELECT Key, Key2, AVG(CAST(f_int_1 AS FLOAT64)), AVG(f_double_1) "
          + "FROM aggregate_test_table GROUP BY Key2, Key";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addInt64Field("field2")
          .addDoubleField("field3")
          .addDoubleField("field4")
          .build();
  final List<Row> expected =
      Arrays.asList(
          Row.withSchema(expectedSchema).addValues(1L, 10L, 1.0, 1.0).build(),
          Row.withSchema(expectedSchema).addValues(1L, 11L, 2.0, 2.0).build(),
          Row.withSchema(expectedSchema).addValues(2L, 11L, 3.5, 3.5).build(),
          Row.withSchema(expectedSchema).addValues(2L, 12L, 5.0, 5.0).build(),
          Row.withSchema(expectedSchema).addValues(3L, 13L, 6.5, 6.5).build());
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * AVG over an INT64 column grouped by Key. Currently disabled; see the {@code @Ignore} reason.
 *
 * <p>NOTE(review): the query selects two columns, but the expected schema and rows below carry
 * three values each. The expectations probably need revisiting before this test is re-enabled
 * - confirm against the contents of aggregate_test_table.
 */
@Test
@Ignore(
    "Calcite infers return type of AVG(int64) as BIGINT while ZetaSQL requires it as either"
        + " NUMERIC or DOUBLE/FLOAT64")
public void testZetaSQLTestAVG() {
  // The two literals join without a space: "...AVG(f_int_1)FROM ...".
  final String query =
      "SELECT Key, AVG(f_int_1)"
          + "FROM aggregate_test_table GROUP BY Key";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addInt64Field("field2")
          .addInt64Field("field3")
          .build();
  final Row first = Row.withSchema(expectedSchema).addValues(1L, 10L, 1L).build();
  final Row second = Row.withSchema(expectedSchema).addValues(1L, 11L, 6L).build();
  final Row third = Row.withSchema(expectedSchema).addValues(2L, 11L, 6L).build();
  PAssert.that(result).containsInAnyOrder(first, second, third);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Groups by an expression ({@code int64_col + 1}) that also appears in the select list. */
@Test
public void testZetaSQLGroupByExprInSelect() {
  final String query = "SELECT int64_col + 1 FROM table_all_types GROUP BY int64_col + 1;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema singleInt64 = Schema.builder().addInt64Field("field").build();
  PAssert.that(result)
      .containsInAnyOrder(
          Row.withSchema(singleInt64).addValue(0L).build(),
          Row.withSchema(singleInt64).addValue(-1L).build(),
          Row.withSchema(singleInt64).addValue(-2L).build(),
          Row.withSchema(singleInt64).addValue(-3L).build(),
          Row.withSchema(singleInt64).addValue(-4L).build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Filters on the grouping column with a predicate that is expected to leave no rows. */
@Test
public void testZetaSQLGroupByAndFiltering() {
  final String query =
      "SELECT int64_col FROM table_all_types WHERE int64_col = 1 GROUP BY int64_col;";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  // No expected elements: the output must be empty.
  PAssert.that(result).containsInAnyOrder();

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Filters on a column that is not part of the GROUP BY and checks the surviving groups. */
@Test
public void testZetaSQLGroupByAndFilteringOnNonGroupByColumn() {
  final String query =
      "SELECT int64_col FROM table_all_types WHERE double_col = 0.5 GROUP BY int64_col;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema singleInt64 = Schema.builder().addInt64Field("field").build();
  final Row minusFive = Row.withSchema(singleInt64).addValue(-5L).build();
  final Row minusFour = Row.withSchema(singleInt64).addValue(-4L).build();
  PAssert.that(result).containsInAnyOrder(minusFive, minusFour);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Checks that HAVING keeps only the group whose COUNT(*) exceeds 2. */
@Test
public void testZetaSQLBasicHaving() {
  final String query =
      "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key HAVING COUNT(*) > 2";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema keyAndCount =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final Row onlyGroup = Row.withSchema(keyAndCount).addValues(2L, 3L).build();
  PAssert.that(result).containsInAnyOrder(onlyGroup);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks that a constant-false HAVING clause filters out every group, so the output
 * collection is empty.
 */
@Test
public void testZetaSQLHavingNull() {
  String sql = "SELECT SUM(int64_val) FROM all_null_table GROUP BY primary_key HAVING false";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
  // No expected-row schema is needed: the assertion is only that nothing is produced.
  PAssert.that(stream).empty();
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Groups KeyValue rows into one-second tumbling windows on {@code ts} and checks the count
 * plus the window start/end reported for each window.
 */
@Test
public void testZetaSQLBasicFixedWindowing() {
  final String query =
      "SELECT COUNT(*) as field_count, "
          + "TUMBLE_START(\"INTERVAL 1 SECOND\") as window_start, "
          + "TUMBLE_END(\"INTERVAL 1 SECOND\") as window_end "
          + "FROM KeyValue GROUP BY TUMBLE(ts, \"INTERVAL 1 SECOND\");";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("count_start")
          .addDateTimeField("field1")
          .addDateTimeField("field2")
          .build();

  // Window [21:26:07, 21:26:08).
  final Row laterWindow =
      Row.withSchema(expectedSchema)
          .addValues(
              1L,
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()),
              new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC()))
          .build();
  // Window [21:26:06, 21:26:07).
  final Row earlierWindow =
      Row.withSchema(expectedSchema)
          .addValues(
              1L,
              new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()),
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()))
          .build();
  PAssert.that(result).containsInAnyOrder(laterWindow, earlierWindow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Nested selection: the outer query reorders the columns returned by a filtered subquery. */
@Test
public void testZetaSQLNestedQueryOne() {
  final String query =
      "SELECT a.Value, a.Key FROM (SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15)"
          + " as a;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema valueThenKey =
      Schema.builder().addStringField("field2").addInt64Field("field1").build();
  final Row key14 = Row.withSchema(valueThenKey).addValues("KeyValue234", 14L).build();
  final Row key15 = Row.withSchema(valueThenKey).addValues("KeyValue235", 15L).build();
  PAssert.that(result).containsInAnyOrder(key14, key15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Selection, filtering and aggregation combined: counts over a filtered subquery. */
@Test
public void testZetaSQLNestedQueryTwo() {
  final String query =
      "SELECT a.Key, a.Key2, COUNT(*) FROM "
          + " (SELECT * FROM aggregate_test_table WHERE Key != 10) as a "
          + " GROUP BY a.Key2, a.Key";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addInt64Field("field3")
          .addInt64Field("field2")
          .build();
  final List<Row> expected =
      Arrays.asList(
          Row.withSchema(expectedSchema).addValues(1L, 10L, 1L).build(),
          Row.withSchema(expectedSchema).addValues(1L, 11L, 1L).build(),
          Row.withSchema(expectedSchema).addValues(2L, 11L, 2L).build(),
          Row.withSchema(expectedSchema).addValues(2L, 12L, 1L).build(),
          Row.withSchema(expectedSchema).addValues(3L, 13L, 2L).build());
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Selection and join combined: inner join of two subqueries on Key = RowKey. */
@Test
public void testZetaSQLNestedQueryThree() {
  final String query =
      "SELECT * FROM (SELECT * FROM KeyValue) AS t1 INNER JOIN (SELECT * FROM BigTable) AS t2 on"
          + " t1.Key = t2.RowKey";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  // Left-side columns followed by right-side columns.
  final Schema joinedSchema =
      Schema.builder()
          .addInt64Field("Key")
          .addStringField("Value")
          .addDateTimeField("ts")
          .addInt64Field("RowKey")
          .addStringField("Value2")
          .addDateTimeField("ts2")
          .build();
  final Row matched =
      Row.withSchema(joinedSchema)
          .addValues(
              15L,
              "KeyValue235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()),
              15L,
              "BigTable235",
              new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()))
          .build();
  PAssert.that(result).containsInAnyOrder(matched);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Nested select where the subquery already lists its columns out of table order. */
@Test
public void testZetaSQLNestedQueryFive() {
  final String query =
      "SELECT a.Value, a.Key FROM (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15)"
          + " as a;";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema valueThenKey =
      Schema.builder().addStringField("field2").addInt64Field("field1").build();
  PAssert.that(result)
      .containsInAnyOrder(
          Row.withSchema(valueThenKey).addValues("KeyValue234", 14L).build(),
          Row.withSchema(valueThenKey).addValues("KeyValue235", 15L).build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/////////////////////////////////////////////////////////////////////////////
// DOUBLE INF and NAN tests
/////////////////////////////////////////////////////////////////////////////
/** Casting '+inf' and '-inf' to FLOAT64 must yield positive and negative infinity. */
@Test
public void testDoubleINF() {
  final String query = "SELECT CAST('+inf' AS FLOAT64), CAST('-inf' AS FLOAT64)";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema twoDoubles =
      Schema.builder().addDoubleField("f_double1").addDoubleField("f_double2").build();
  final Row expected =
      Row.withSchema(twoDoubles)
          .addValues(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY)
          .build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** +inf must compare equal to +inf and unequal to -inf. */
@Test
public void testDoubleINFEQ() {
  final String query =
      "SELECT CAST('+inf' AS FLOAT64) = CAST('+inf' AS FLOAT64), "
          + "CAST('+inf' AS FLOAT64) = CAST('-inf' AS FLOAT64)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema twoBooleans =
      Schema.builder().addBooleanField("f_boolean1").addBooleanField("f_boolean2").build();
  final Row expected = Row.withSchema(twoBooleans).addValues(true, false).build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Casting 'NaN' to FLOAT64 must yield a NaN double. */
@Test
public void testDoubleNAN() {
  final String query = "SELECT CAST('NaN' AS FLOAT64)";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleDouble = Schema.builder().addDoubleField("f_double").build();
  final Row expected = Row.withSchema(singleDouble).addValues(Double.NaN).build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** NaN compared with NaN using {@code =} must evaluate to false. */
@Test
public void testDoubleNaNEQ() {
  final String query = "SELECT CAST('NaN' AS FLOAT64) = CAST('NaN' AS FLOAT64)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema singleBoolean = Schema.builder().addBooleanField("f_boolean").build();
  PAssert.that(result)
      .containsInAnyOrder(Row.withSchema(singleBoolean).addValues(false).build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** IS_INF must be true for both infinities and false for a finite value. */
@Test
public void testDoubleIsINF() {
  final String query =
      "SELECT IS_INF(CAST('+inf' AS FLOAT64)), IS_INF(CAST('-inf' AS FLOAT64)), IS_INF(3.0)";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema threeBooleans =
      Schema.builder()
          .addBooleanField("f_boolean1")
          .addBooleanField("f_boolean2")
          .addBooleanField("f_boolean3")
          .build();
  final Row expected = Row.withSchema(threeBooleans).addValues(true, true, false).build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** IS_NAN must be true for NaN and false for a regular number. */
@Test
public void testDoubleIsNAN() {
  final String query = "SELECT IS_NAN(CAST('NaN' AS FLOAT64)), IS_NAN(3.0)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema twoBooleans =
      Schema.builder().addBooleanField("f_boolean1").addBooleanField("f_boolean2").build();
  final Row expected = Row.withSchema(twoBooleans).addValues(true, false).build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/////////////////////////////////////////////////////////////////////////////
// NUMERIC type tests
/////////////////////////////////////////////////////////////////////////////
/**
 * Checks that NUMERIC literals in plain, fractional and exponent notation, including the
 * minimum and maximum ZetaSQL NUMERIC values, are returned as the matching decimal values.
 */
@Test
public void testNumericLiteral() {
  String sql =
      "SELECT NUMERIC '0', "
          + "NUMERIC '123456', "
          + "NUMERIC '-3.14', "
          + "NUMERIC '-0.54321', "
          + "NUMERIC '1.23456e05', "
          + "NUMERIC '-9.876e-3', "
          // min value for ZetaSQL NUMERIC type
          + "NUMERIC '-99999999999999999999999999999.999999999', "
          // max value for ZetaSQL NUMERIC type
          + "NUMERIC '99999999999999999999999999999.999999999'";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
  // One decimal field per literal, in select-list order.
  PAssert.that(stream)
      .containsInAnyOrder(
          Row.withSchema(
                  Schema.builder()
                      .addDecimalField("f_numeric1")
                      .addDecimalField("f_numeric2")
                      .addDecimalField("f_numeric3")
                      .addDecimalField("f_numeric4")
                      .addDecimalField("f_numeric5")
                      .addDecimalField("f_numeric6")
                      .addDecimalField("f_numeric7")
                      .addDecimalField("f_numeric8")
                      .build())
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("0"))
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"))
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-3.14"))
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.54321"))
              // 1.23456e05 written out without the exponent.
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"))
              // -9.876e-3 written out without the exponent.
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.009876"))
              .addValues(
                  ZetaSqlTypesUtils.bigDecimalAsNumeric(
                      "-99999999999999999999999999999.999999999"))
              .addValues(
                  ZetaSqlTypesUtils.bigDecimalAsNumeric(
                      "99999999999999999999999999999.999999999"))
              .build());
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Reads a NUMERIC column from a table and checks the three stored values. */
@Test
public void testNumericColumn() {
  final String query = "SELECT numeric_field FROM table_with_numeric";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Row first =
      Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
          .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"))
          .build();
  final Row second =
      Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
          .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"))
          .build();
  final Row third =
      Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
          .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"))
          .build();
  PAssert.that(result).containsInAnyOrder(first, second, third);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Unary minus applied to a NUMERIC literal must negate its value. */
@Test
public void testUnaryMinusNumeric() {
  final String query = "SELECT - NUMERIC '1.23456e05'";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  final Row expected =
      Row.withSchema(singleNumeric)
          .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-123456"))
          .build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Adds two NUMERIC literals: 123456 + 0.009876. */
@Test
public void testAddNumeric() {
  final String query = "SELECT NUMERIC '1.23456e05' + NUMERIC '9.876e-3'";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  PAssert.that(result)
      .containsInAnyOrder(
          Row.withSchema(singleNumeric)
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"))
              .build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Subtracts a negative NUMERIC literal: 123456 - (-0.009876). */
@Test
public void testSubNumeric() {
  final String query = "SELECT NUMERIC '1.23456e05' - NUMERIC '-9.876e-3'";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  final Row expected =
      Row.withSchema(singleNumeric)
          .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"))
          .build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Multiplies two NUMERIC literals: 123 * (-0.001001). */
@Test
public void testMultiNumeric() {
  final String query = "SELECT NUMERIC '1.23e02' * NUMERIC '-1.001e-3'";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  final Row expected =
      Row.withSchema(singleNumeric)
          .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.123123"))
          .build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Divides two NUMERIC literals: (-0.123123) / (-0.001001). */
@Test
public void testDivNumeric() {
  final String query = "SELECT NUMERIC '-1.23123e-1' / NUMERIC '-1.001e-3'";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  PAssert.that(result)
      .containsInAnyOrder(
          Row.withSchema(singleNumeric)
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123"))
              .build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** MOD over NUMERIC operands: 123456 mod 5. */
@Test
public void testModNumeric() {
  final String query = "SELECT MOD(NUMERIC '1.23456e05', NUMERIC '5')";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  final Row expected =
      Row.withSchema(singleNumeric).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("1")).build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** FLOOR over a positive and a negative NUMERIC value (12345.6 and -12345.6). */
@Test
public void testFloorNumeric() {
  final String query = "SELECT FLOOR(NUMERIC '1.23456e04'), FLOOR(NUMERIC '-1.23456e04')";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema twoNumerics =
      Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
  final Row expected =
      Row.withSchema(twoNumerics)
          .addValues(
              ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"),
              ZetaSqlTypesUtils.bigDecimalAsNumeric("-12346"))
          .build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CEIL over a positive and a negative NUMERIC value (12345.6 and -12345.6). */
@Test
public void testCeilNumeric() {
  final String query = "SELECT CEIL(NUMERIC '1.23456e04'), CEIL(NUMERIC '-1.23456e04')";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema twoNumerics =
      Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
  final Row expected =
      Row.withSchema(twoNumerics)
          .addValues(
              ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"),
              ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345"))
          .build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** SUM over a NUMERIC column. Currently disabled; see the linked issue. */
@Test
@Ignore("https://jira.apache.org/jira/browse/BEAM-10459")
public void testSumNumeric() {
  final String query = "SELECT SUM(numeric_field) FROM table_with_numeric";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(query));

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  final Row expected =
      Row.withSchema(singleNumeric)
          .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("333.3333"))
          .build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** AVG over a NUMERIC column. Currently disabled; see the linked issue. */
@Test
@Ignore("https://jira.apache.org/jira/browse/BEAM-10459")
public void testAvgNumeric() {
  final String query = "SELECT AVG(numeric_field) FROM table_with_numeric";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleNumeric = Schema.builder().addDecimalField("f_numeric").build();
  PAssert.that(result)
      .containsInAnyOrder(
          Row.withSchema(singleNumeric)
              .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("111.1111"))
              .build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** A second statement after a SELECT is expected to be rejected as unsupported. */
@Test
public void testMultipleSelectStatementsThrowsException() {
  final String script = "SELECT 1; SELECT 2;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(UnsupportedOperationException.class);
  thrown.expectMessage("No additional statements are allowed after a SELECT statement.");

  planner.convertToBeamRel(script);
}
/** Defining the same function twice in one script is expected to raise a ParseException. */
@Test
public void testAlreadyDefinedUDFThrowsException() {
  final String script =
      "CREATE FUNCTION foo() AS (0); CREATE FUNCTION foo() AS (1); SELECT foo();";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(ParseException.class);
  thrown.expectMessage("Failed to define function foo");

  planner.convertToBeamRel(script);
}
/** A script consisting only of CREATE FUNCTION (no trailing SELECT) is expected to fail. */
@Test
public void testCreateFunctionNoSelectThrowsException() {
  final String script = "CREATE FUNCTION plusOne(x INT64) AS (x + 1);";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(UnsupportedOperationException.class);
  thrown.expectMessage("Statement list must end in a SELECT statement, not CreateFunctionStmt");

  planner.convertToBeamRel(script);
}
/** Defines a zero-argument SQL UDF and checks the value returned when it is called. */
@Test
public void testNullaryUdf() {
  final String script = "CREATE FUNCTION zero() AS (0); SELECT zero();";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(script);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleInt64 = Schema.builder().addInt64Field("x").build();
  final Row expected = Row.withSchema(singleInt64).addValue(0L).build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** A UDF created under a qualified name (foo.bar.baz) is called by its last segment only. */
@Test
public void testQualifiedNameUdfUnqualifiedCall() {
  final String script = "CREATE FUNCTION foo.bar.baz() AS (\"uwu\"); SELECT baz();";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(script));

  final Schema singleString = Schema.builder().addStringField("x").build();
  final Row expected = Row.withSchema(singleString).addValue("uwu").build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * A UDF created under a qualified name is called by its full path. Currently disabled; see
 * the {@code @Ignore} reason.
 *
 * <p>NOTE(review): despite the method name, the body asserts a successful result rather than
 * an exception.
 */
@Test
@Ignore(
    "Qualified paths can't be resolved due to a bug in ZetaSQL: "
        + "https://github.com/google/zetasql/issues/42")
public void testQualifiedNameUdfQualifiedCallThrowsException() {
  final String script = "CREATE FUNCTION foo.bar.baz() AS (\"uwu\"); SELECT foo.bar.baz();";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(script);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleString = Schema.builder().addStringField("x").build();
  PAssert.that(result)
      .containsInAnyOrder(Row.withSchema(singleString).addValue("uwu").build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/** Defines a one-argument SQL UDF and applies it twice: triple(triple(1)). */
@Test
public void testUnaryUdf() {
  final String script = "CREATE FUNCTION triple(x INT64) AS (3 * x); SELECT triple(triple(1));";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final PCollection<Row> result =
      BeamSqlRelUtils.toPCollection(pipeline, planner.convertToBeamRel(script));

  final Schema singleInt64 = Schema.builder().addInt64Field("x").build();
  final Row expected = Row.withSchema(singleInt64).addValue(9L).build();
  PAssert.that(result).containsInAnyOrder(expected);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** A SQL UDF whose body calls a previously defined SQL UDF. */
@Test
public void testUdfWithinUdf() {
  final String script =
      "CREATE FUNCTION triple(x INT64) AS (3 * x);"
          + " CREATE FUNCTION nonuple(x INT64) as (triple(triple(x)));"
          + " SELECT nonuple(1);";
  final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(script);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema singleInt64 = Schema.builder().addInt64Field("x").build();
  PAssert.that(result)
      .containsInAnyOrder(Row.withSchema(singleInt64).addValue(9L).build());

  final Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
  pipeline.run().waitUntilFinish(timeout);
}
/**
 * foo() refers to bar() before bar() has been defined; conversion is expected to fail with
 * "Function not found: bar".
 */
@Test
public void testUndefinedUdfThrowsException() {
  final String script =
      "CREATE FUNCTION foo() AS (bar()); CREATE FUNCTION bar() AS (foo()); SELECT foo();";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(SqlException.class);
  thrown.expectMessage("Function not found: bar");

  planner.convertToBeamRel(script);
}
/** A UDF that calls itself is expected to fail with "Function not found: omega". */
@Test
public void testRecursiveUdfThrowsException() {
  final String script = "CREATE FUNCTION omega() AS (omega()); SELECT omega();";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(SqlException.class);
  thrown.expectMessage("Function not found: omega");

  planner.convertToBeamRel(script);
}
/**
 * Defines a table-valued function that filters KeyValue to a key range and selects from it with
 * the range [10, 14]; a single row with key 14 is expected.
 */
@Test
public void testUDTVF() {
  final String query =
      "CREATE TABLE FUNCTION CustomerRange(MinID INT64, MaxID INT64)\n"
          + " AS\n"
          + " SELECT *\n"
          + " FROM KeyValue\n"
          + " WHERE key >= MinId AND key <= MaxId; \n"
          + " SELECT key FROM CustomerRange(10, 14)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(14L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * A table-valued function whose body reads from a table named TableNotExist; planning is expected
 * to fail with a {@link SqlConversionException} naming the unresolved path.
 */
@Test
public void testUDTVFTableNotFound() {
  final String query =
      "CREATE TABLE FUNCTION CustomerRange(MinID INT64, MaxID INT64)\n"
          + " AS\n"
          + " SELECT *\n"
          + " FROM TableNotExist\n"
          + " WHERE key >= MinId AND key <= MaxId; \n"
          + " SELECT key FROM CustomerRange(10, 14)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(SqlConversionException.class);
  thrown.expectMessage("Wasn't able to resolve the path [TableNotExist] in schema: beam");
  planner.convertToBeamRel(query);
}
/**
 * Declares CustomerRange but selects from FunctionNotFound; planning is expected to fail with
 * "Table-valued function not found: FunctionNotFound".
 */
@Test
public void testUDTVFFunctionNotFound() {
  final String query =
      "CREATE TABLE FUNCTION CustomerRange(MinID INT64, MaxID INT64)\n"
          + " AS\n"
          + " SELECT *\n"
          + " FROM KeyValue\n"
          + " WHERE key >= MinId AND key <= MaxId; \n"
          + " SELECT key FROM FunctionNotFound(10, 14)";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(SqlException.class);
  thrown.expectMessage("Table-valued function not found: FunctionNotFound");
  planner.convertToBeamRel(query);
}
/** SELECT DISTINCT on Key2 of aggregate_test_table; expects the four values 10 through 13. */
@Test
public void testDistinct() {
  final String query = "SELECT DISTINCT Key2 FROM aggregate_test_table";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addInt64Field("Key2").build();
  final Row ten = Row.withSchema(expectedSchema).addValues(10L).build();
  final Row eleven = Row.withSchema(expectedSchema).addValues(11L).build();
  final Row twelve = Row.withSchema(expectedSchema).addValues(12L).build();
  final Row thirteen = Row.withSchema(expectedSchema).addValues(13L).build();
  PAssert.that(result).containsInAnyOrder(ten, eleven, twelve, thirteen);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** SELECT DISTINCT over a column of all_null_table; expects exactly one row holding null. */
@Test
public void testDistinctOnNull() {
  final String query = "SELECT DISTINCT str_val FROM all_null_table";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  // NOTE(review): the expected field is named str_val but typed DOUBLE; confirm against the
  // all_null_table schema.
  final Schema expectedSchema =
      Schema.builder().addNullableField("str_val", FieldType.DOUBLE).build();
  final Row nullRow = Row.withSchema(expectedSchema).addValues((Object) null).build();
  PAssert.that(result).containsInAnyOrder(nullRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** ANY_VALUE over double_val of all_null_table; expects a single row holding null. */
@Test
public void testAnyValue() {
  final String query = "SELECT ANY_VALUE(double_val) FROM all_null_table";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addNullableField("double_val", FieldType.DOUBLE).build();
  final Row nullRow = Row.withSchema(expectedSchema).addValues((Object) null).build();
  PAssert.that(result).containsInAnyOrder(nullRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** A bare SELECT NULL; the expected output is one row holding null in a nullable INT64 field. */
@Test
public void testSelectNULL() {
  final String query = "SELECT NULL";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addNullableField("long_val", FieldType.INT64).build();
  final Row nullRow = Row.withSchema(expectedSchema).addValues((Object) null).build();
  PAssert.that(result).containsInAnyOrder(nullRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Two WITH aliases (over KeyValue and BigTable) inner-joined on Key = RowKey; expects the single
 * row key 15.
 */
@Test
public void testWithQueryOne() {
  final String query =
      "With T1 AS (SELECT * FROM KeyValue), T2 AS (SELECT * FROM BigTable) SELECT T2.RowKey FROM"
          + " T1 INNER JOIN T2 on T1.Key = T2.RowKey;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(15L).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Aggregation inside the WITH clause: counts KeyValue rows per Key and selects both columns;
 * expects (14, 1) and (15, 1).
 */
@Test
public void testWithQueryTwo() {
  final String query =
      "WITH T1 AS (SELECT Key, COUNT(*) as value FROM KeyValue GROUP BY Key) SELECT T1.Key,"
          + " T1.value FROM T1";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final Row key14 = Row.withSchema(expectedSchema).addValues(14L, 1L).build();
  final Row key15 = Row.withSchema(expectedSchema).addValues(15L, 1L).build();
  PAssert.that(result).containsInAnyOrder(key14, key15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Filter inside the WITH clause (Key = 14 OR Key = 15); expects the two KeyValue rows with their
 * Value and Key columns.
 */
@Test
public void testWithQueryThree() {
  final String query =
      "WITH T1 as (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15) SELECT T1.Value,"
          + " T1.Key FROM T1;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addStringField("field1").addInt64Field("field2").build();
  final Row key14 = Row.withSchema(expectedSchema).addValues("KeyValue234", 14L).build();
  final Row key15 = Row.withSchema(expectedSchema).addValues("KeyValue235", 15L).build();
  PAssert.that(result).containsInAnyOrder(key14, key15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Same data as the filter-inside-WITH case, but the Key = 14 OR Key = 15 filter is applied to the
 * WITH alias in the outer query; expects the same two rows.
 */
@Test
public void testWithQueryFour() {
  final String query =
      "WITH T1 as (SELECT Value, Key FROM KeyValue) SELECT T1.Value, T1.Key FROM T1 WHERE T1.Key"
          + " = 14 OR T1.Key = 15;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addStringField("field2").addInt64Field("field1").build();
  final Row key14 = Row.withSchema(expectedSchema).addValues("KeyValue234", 14L).build();
  final Row key15 = Row.withSchema(expectedSchema).addValues("KeyValue235", 15L).build();
  PAssert.that(result).containsInAnyOrder(key14, key15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Aggregation over a WITH alias in the outer query: counts rows per Key; expects (14, 1) and
 * (15, 1).
 */
@Test
public void testWithQueryFive() {
  final String query =
      "WITH T1 AS (SELECT * FROM KeyValue) SELECT T1.Key, COUNT(*) FROM T1 GROUP BY T1.Key";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
  final Row key14 = Row.withSchema(expectedSchema).addValues(14L, 1L).build();
  final Row key15 = Row.withSchema(expectedSchema).addValues(15L, 1L).build();
  PAssert.that(result).containsInAnyOrder(key14, key15);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Session-window aggregation ("INTERVAL 3 SECOND" on ts) over a WITH alias of
 * window_test_table_two; expects two groups of two rows each with the asserted timestamps.
 */
@Test
public void testWithQuerySix() {
  final String query =
      "WITH T1 AS (SELECT * FROM window_test_table_two) SELECT "
          + "COUNT(*) as field_count, "
          + "SESSION_START(\"INTERVAL 3 SECOND\") as window_start, "
          + "SESSION_END(\"INTERVAL 3 SECOND\") as window_end "
          + "FROM T1 "
          + "GROUP BY SESSION(ts, \"INTERVAL 3 SECOND\");";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("count_star")
          .addDateTimeField("field1")
          .addDateTimeField("field2")
          .build();

  // Each expected row carries the same instant in both of its timestamp columns.
  final DateTime at12Seconds =
      new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC());
  final DateTime at6Seconds = new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC());
  final Row laterGroup =
      Row.withSchema(expectedSchema).addValues(2L, at12Seconds, at12Seconds).build();
  final Row earlierGroup =
      Row.withSchema(expectedSchema).addValues(2L, at6Seconds, at6Seconds).build();
  PAssert.that(result).containsInAnyOrder(laterGroup, earlierGroup);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** UNNEST of a literal string array; expects one row per element ("foo" and "bar"). */
@Test
public void testUNNESTLiteral() {
  final String query = "SELECT * FROM UNNEST(ARRAY<STRING>['foo', 'bar']);";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("str_field").build();
  final Row foo = Row.withSchema(expectedSchema).addValues("foo").build();
  final Row bar = Row.withSchema(expectedSchema).addValues("bar").build();
  PAssert.that(result).containsInAnyOrder(foo, bar);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * UNNEST of a query parameter bound to a string array value; expects one row per element ("foo"
 * and "bar").
 */
@Test
public void testUNNESTParameters() {
  final String query = "SELECT * FROM UNNEST(@p0);";
  // The single parameter p0 is an ARRAY<STRING> value holding "foo" and "bar".
  final Value stringArray =
      Value.createArrayValue(
          TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_STRING)),
          ImmutableList.of(Value.createStringValue("foo"), Value.createStringValue("bar")));
  final ImmutableMap<String, Value> queryParams = ImmutableMap.of("p0", stringArray);

  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query, queryParams);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("str_field").build();
  final Row foo = Row.withSchema(expectedSchema).addValues("foo").build();
  final Row bar = Row.withSchema(expectedSchema).addValues("bar").build();
  PAssert.that(result).containsInAnyOrder(foo, bar);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * UNNEST of an ARRAY built from a sub-query over KeyValue. Currently ignored; see the reason on
 * the {@code @Ignore} annotation.
 */
@Test
@Ignore("[BEAM-9515] ArrayScanToUncollectConverter Unnest does not support sub-queries")
public void testUNNESTExpression() {
  final String query = "SELECT * FROM UNNEST(ARRAY(SELECT Value FROM KeyValue));";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("str_field").build();
  final Row first = Row.withSchema(expectedSchema).addValues("KeyValue234").build();
  final Row second = Row.withSchema(expectedSchema).addValues("KeyValue235").build();
  PAssert.that(result).containsInAnyOrder(first, second);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * UNNEST of a literal array with an alias, selecting both {@code *} and the alias; each element
 * is expected twice in its row.
 */
@Test
public void testNamedUNNESTLiteral() {
  final String query = "SELECT *, T1 FROM UNNEST(ARRAY<STRING>['foo', 'bar']) AS T1";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addStringField("str_field").addStringField("str2_field").build();
  final Row foo = Row.withSchema(expectedSchema).addValues("foo", "foo").build();
  final Row bar = Row.withSchema(expectedSchema).addValues("bar", "bar").build();
  PAssert.that(result).containsInAnyOrder(foo, bar);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * UNNEST ... WITH OFFSET: converting the query to a rel node must succeed, while expanding that
 * node into a PCollection is expected to throw {@link UnsupportedOperationException}.
 */
@Test
public void testNamedUNNESTLiteralOffset() {
  String sql = "SELECT x, p FROM UNNEST([3, 4]) AS x WITH OFFSET p";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);

  // The expectation is registered only now, so an exception from the conversion above would
  // fail the test instead of satisfying it.
  thrown.expect(UnsupportedOperationException.class);
  // The returned PCollection is never inspected; only the call itself matters.
  BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
}
/**
 * Cross-joins table_with_array_for_unnest with the UNNEST of its own int_array_col; expects the
 * four array elements 14, 18, 22 and 24 as individual rows.
 */
@Test
public void testUnnestArrayColumn() {
  final String query =
      "SELECT p FROM table_with_array_for_unnest, UNNEST(table_with_array_for_unnest.int_array_col) as p";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addInt64Field("int_field").build();
  final Row row14 = Row.withSchema(expectedSchema).addValue(14L).build();
  final Row row18 = Row.withSchema(expectedSchema).addValue(18L).build();
  final Row row22 = Row.withSchema(expectedSchema).addValue(22L).build();
  final Row row24 = Row.withSchema(expectedSchema).addValue(24L).build();
  PAssert.that(result).containsInAnyOrder(row14, row18, row22, row24);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * STRING_AGG over an unnested literal array of four fruit names; expects the single value
 * "apple,pear,banana,pear".
 */
@Test
public void testStringAggregation() {
  final String query =
      "SELECT STRING_AGG(fruit) AS string_agg"
          + " FROM UNNEST([\"apple\", \"pear\", \"banana\", \"pear\"]) AS fruit";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("string_field").build();
  final Row joined = Row.withSchema(expectedSchema).addValue("apple,pear,banana,pear").build();
  PAssert.that(result).containsInAnyOrder(joined);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * LEFT JOIN between table_with_array_for_unnest and the UNNEST of its own array column. Currently
 * ignored; see the reason on the {@code @Ignore} annotation.
 */
@Test
@Ignore("Seeing exception in Beam, need further investigation on the cause of this failed query.")
public void testNamedUNNESTJoin() {
  final String query =
      "SELECT * "
          + "FROM table_with_array_for_unnest AS t1"
          + " LEFT JOIN UNNEST(t1.int_array_col) AS t2"
          + " on "
          + " t1.int_col = t2";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  // No expected rows are listed.
  PAssert.that(result).containsInAnyOrder();

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * LEFT JOIN between an UNNEST of struct literals and the UNNEST of each struct's array field;
 * planning is expected to throw {@link UnsupportedOperationException}.
 */
@Test
public void testUnnestJoinStruct() {
  String sql =
      "SELECT b, x FROM UNNEST("
          + "[STRUCT(true AS b, [3, 5] AS arr), STRUCT(false AS b, [7, 9] AS arr)]) t "
          + "LEFT JOIN UNNEST(t.arr) x ON b";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);

  thrown.expect(UnsupportedOperationException.class);
  // The conversion result is never used; only the call itself matters.
  zetaSQLQueryPlanner.convertToBeamRel(sql);
}
/**
 * Inner JOIN between two UNNESTs of literal integer arrays (each containing a NULL); planning is
 * expected to throw {@link UnsupportedOperationException}.
 */
@Test
public void testUnnestJoinLiteral() {
  String sql =
      "SELECT a, b "
          + "FROM UNNEST([1, 1, 2, 3, 5, 8, 13, NULL]) a "
          + "JOIN UNNEST([1, 2, 3, 5, 7, 11, 13, NULL]) b "
          + "ON a = b";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);

  thrown.expect(UnsupportedOperationException.class);
  // The conversion result is never used; only the call itself matters.
  zetaSQLQueryPlanner.convertToBeamRel(sql);
}
/**
 * JOIN between an UNNEST of a literal array and an UNNEST of an ARRAY sub-query; planning is
 * expected to throw {@link UnsupportedOperationException}.
 */
@Test
public void testUnnestJoinSubquery() {
  final String query =
      "SELECT a, b "
          + "FROM UNNEST([1, 2, 3]) a "
          + "JOIN UNNEST(ARRAY(SELECT b FROM UNNEST([3, 2, 1]) b)) b "
          + "ON a = b";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);

  thrown.expect(UnsupportedOperationException.class);
  planner.convertToBeamRel(query);
}
/** Searched CASE (no operand) whose only WHEN is false; the ELSE value is expected. */
@Test
public void testCaseNoValue() {
  final String query = "SELECT CASE WHEN 1 > 2 THEN 'not possible' ELSE 'seems right' END";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("str_field").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValue("seems right").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CASE with an operand (1) that matches no WHEN; the ELSE value is expected. */
@Test
public void testCaseWithValue() {
  final String query = "SELECT CASE 1 WHEN 2 THEN 'not possible' ELSE 'seems right' END";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("str_field").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValue("seems right").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CASE with operand 2 and several WHEN branches; the value of the matching branch is expected. */
@Test
public void testCaseWithValueMultipleCases() {
  final String query =
      "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' ELSE 'also not"
          + " possible' END";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("str_field").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValue("seems right").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CASE with operand 2, no ELSE, and a matching WHEN; the matching branch value is expected. */
@Test
public void testCaseWithValueNoElse() {
  final String query = "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' END";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("str_field").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValue("seems right").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Searched CASE with no ELSE and no true WHEN; a single null string is expected. */
@Test
public void testCaseNoValueNoElseNoMatch() {
  final String query = "SELECT CASE WHEN 'abc' = '123' THEN 'not possible' END";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addNullableField("str_field", FieldType.STRING).build();
  final Row nullRow = Row.withSchema(expectedSchema).addValue(null).build();
  PAssert.that(result).containsInAnyOrder(nullRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CASE with operand 2, no ELSE, and no matching WHEN; a single null string is expected. */
@Test
public void testCaseWithValueNoElseNoMatch() {
  final String query = "SELECT CASE 2 WHEN 1 THEN 'not possible' END";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addNullableField("str_field", FieldType.STRING).build();
  final Row nullRow = Row.withSchema(expectedSchema).addValue(null).build();
  PAssert.that(result).containsInAnyOrder(nullRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * CASE that, for 8-character trimmed strings, rebuilds the text as yyyy-mm-dd with SUBSTR/CONCAT
 * and casts it to DATE (NULL otherwise); expects the row (1, 2018-10-18) from
 * table_for_case_when.
 */
@Test
public void testCastToDateWithCase() {
  final String query =
      "SELECT f_int, \n"
          + "CASE WHEN CHAR_LENGTH(TRIM(f_string)) = 8 \n"
          + " THEN CAST (CONCAT(\n"
          + " SUBSTR(TRIM(f_string), 1, 4) \n"
          + " , '-' \n"
          + " , SUBSTR(TRIM(f_string), 5, 2) \n"
          + " , '-' \n"
          + " , SUBSTR(TRIM(f_string), 7, 2)) AS DATE)\n"
          + " ELSE NULL\n"
          + "END \n"
          + "FROM table_for_case_when";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder()
          .addInt64Field("f_long")
          .addNullableField("f_date", FieldType.logicalType(SqlTypes.DATE))
          .build();
  final Row expectedRow =
      Row.withSchema(expectedSchema).addValues(1L, LocalDate.parse("2018-10-18")).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * INTERSECT ALL of the Key columns of the two aggregate test tables; duplicates are kept, so the
 * expected keys are 1, 2, 2, 2 and 3.
 */
@Test
public void testIntersectAll() {
  final String query =
      "SELECT Key FROM aggregate_test_table "
          + "INTERSECT ALL "
          + "SELECT Key FROM aggregate_test_table_two";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addInt64Field("field").build();
  final Row one = Row.withSchema(expectedSchema).addValues(1L).build();
  final Row twoFirst = Row.withSchema(expectedSchema).addValues(2L).build();
  final Row twoSecond = Row.withSchema(expectedSchema).addValues(2L).build();
  final Row twoThird = Row.withSchema(expectedSchema).addValues(2L).build();
  final Row three = Row.withSchema(expectedSchema).addValues(3L).build();
  PAssert.that(result).containsInAnyOrder(one, twoFirst, twoSecond, twoThird, three);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * INTERSECT DISTINCT of the Key columns of the two aggregate test tables; expects each of the
 * keys 1, 2 and 3 once.
 */
@Test
public void testIntersectDistinct() {
  final String query =
      "SELECT Key FROM aggregate_test_table "
          + "INTERSECT DISTINCT "
          + "SELECT Key FROM aggregate_test_table_two";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addInt64Field("field").build();
  final Row one = Row.withSchema(expectedSchema).addValues(1L).build();
  final Row two = Row.withSchema(expectedSchema).addValues(2L).build();
  final Row three = Row.withSchema(expectedSchema).addValues(3L).build();
  PAssert.that(result).containsInAnyOrder(one, two, three);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * EXCEPT ALL of the Key column of aggregate_test_table minus that of aggregate_test_table_two;
 * expects the keys 1 and 3.
 */
@Test
public void testExceptAll() {
  final String query =
      "SELECT Key FROM aggregate_test_table "
          + "EXCEPT ALL "
          + "SELECT Key FROM aggregate_test_table_two";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addInt64Field("field").build();
  final Row one = Row.withSchema(expectedSchema).addValues(1L).build();
  final Row three = Row.withSchema(expectedSchema).addValues(3L).build();
  PAssert.that(result).containsInAnyOrder(one, three);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** INTERSECT DISTINCT of a NULL row with a non-null row; the result is expected to be empty. */
@Test
public void testSelectNullIntersectDistinct() {
  String sql = "SELECT NULL INTERSECT DISTINCT SELECT 2";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  // No schema dump to stderr here: the assertion below is the only check this test performs.
  PAssert.that(stream).empty();

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** INTERSECT ALL of a NULL row with a non-null row; the result is expected to be empty. */
@Test
public void testSelectNullIntersectAll() {
  String sql = "SELECT NULL INTERSECT ALL SELECT 2";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  // No schema dump to stderr here: the assertion below is the only check this test performs.
  PAssert.that(stream).empty();

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * EXCEPT DISTINCT of a NULL row minus a non-null row; expects the all-null row for the output
 * schema.
 */
@Test
public void testSelectNullExceptDistinct() {
  final String query = "SELECT NULL EXCEPT DISTINCT SELECT 2";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  // The expected row is built from the output's own schema.
  final Row allNulls = Row.nullRow(result.getSchema());
  PAssert.that(result).containsInAnyOrder(allNulls);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * EXCEPT ALL of a NULL row minus a non-null row; expects the all-null row for the output schema.
 */
@Test
public void testSelectNullExceptAll() {
  final String query = "SELECT NULL EXCEPT ALL SELECT 2";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  // The expected row is built from the output's own schema.
  final Row allNulls = Row.nullRow(result.getSchema());
  PAssert.that(result).containsInAnyOrder(allNulls);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** SELECT * over table_empty; no output rows are expected. */
@Test
public void testSelectFromEmptyTable() {
  final String query = "SELECT * FROM table_empty;";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  // An empty expectation list: the output must contain nothing.
  PAssert.that(result).containsInAnyOrder();

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** STARTS_WITH on two string literals where the prefix matches; expects true. */
@Test
public void testStartsWithString() {
  final String query = "SELECT STARTS_WITH('string1', 'stri')";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues(true).build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** STARTS_WITH with a NULL string value and an empty-string prefix; expects NULL. */
@Test
public void testStartsWithString2() {
  final String query = "SELECT STARTS_WITH(@p0, @p1)";
  final ImmutableMap<String, Value> queryParams =
      ImmutableMap.of(
          "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING),
          "p1", Value.createStringValue(""));

  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query, queryParams);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  final Row nullRow = Row.withSchema(expectedSchema).addValues((Boolean) null).build();
  PAssert.that(result).containsInAnyOrder(nullRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** STARTS_WITH with both parameters bound to NULL strings; expects NULL. */
@Test
public void testStartsWithString3() {
  final String query = "SELECT STARTS_WITH(@p0, @p1)";
  final ImmutableMap<String, Value> queryParams =
      ImmutableMap.of(
          "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING),
          "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING));

  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query, queryParams);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema =
      Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  final Row nullRow = Row.withSchema(expectedSchema).addValues((Boolean) null).build();
  PAssert.that(result).containsInAnyOrder(nullRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * ENDS_WITH on two string literals where the suffix does not match ('string1' ends in 'ng1', not
 * 'ng0'); expects false.
 */
@Test
public void testEndsWithString() {
  // Exercises ENDS_WITH, as the test name states (the query previously called STARTS_WITH).
  String sql = "SELECT ENDS_WITH('string1', 'ng0')";
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** ENDS_WITH with a NULL string value and an empty-string suffix; expects NULL. */
@Test
public void testEndsWithString2() {
  // Exercises ENDS_WITH, as the test name states (the query previously called STARTS_WITH).
  String sql = "SELECT ENDS_WITH(@p0, @p1)";
  ImmutableMap<String, Value> params =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING))
          .put("p1", Value.createStringValue(""))
          .build();

  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  PAssert.that(stream)
      .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** ENDS_WITH with both parameters bound to NULL strings; expects NULL. */
@Test
public void testEndsWithString3() {
  // Exercises ENDS_WITH, as the test name states (the query previously called STARTS_WITH).
  String sql = "SELECT ENDS_WITH(@p0, @p1)";
  ImmutableMap<String, Value> params =
      ImmutableMap.<String, Value>builder()
          .put("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING))
          .put("p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING))
          .build();

  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
  PAssert.that(stream)
      .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) null).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT with a single string argument; expects that argument unchanged. */
@Test
public void testConcatWithOneParameters() {
  final String query = "SELECT concat('abc')";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues("abc").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT with two string arguments; expects "abcdef". */
@Test
public void testConcatWithTwoParameters() {
  final String query = "SELECT concat('abc', 'def')";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues("abcdef").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT with three string arguments; expects "abcdefxyz". */
@Test
public void testConcatWithThreeParameters() {
  final String query = "SELECT concat('abc', 'def', 'xyz')";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues("abcdefxyz").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT with four string arguments, one of them a single space; expects "abcdef xyz". */
@Test
public void testConcatWithFourParameters() {
  final String query = "SELECT concat('abc', 'def', ' ', 'xyz')";
  final ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  final BeamRelNode plan = planner.convertToBeamRel(query);
  final PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

  final Schema expectedSchema = Schema.builder().addStringField("field1").build();
  final Row expectedRow = Row.withSchema(expectedSchema).addValues("abcdef xyz").build();
  PAssert.that(result).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT over five string literals is expected to produce "abcdef xyzkkk". */
@Test
public void testConcatWithFiveParameters() {
  String query = "SELECT concat('abc', 'def', ' ', 'xyz', 'kkk')";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("abcdef xyzkkk").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT over six string literals is expected to produce "abcdef xyzkkkttt". */
@Test
public void testConcatWithSixParameters() {
  String query = "SELECT concat('abc', 'def', ' ', 'xyz', 'kkk', 'ttt')";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("abcdef xyzkkkttt").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * CONCAT of an empty string parameter and a NULL string parameter is expected to produce a single
 * row whose nullable string field is NULL.
 */
@Test
public void testConcatWithNull1() {
  String query = "SELECT CONCAT(@p0, @p1) AS ColA";
  Value emptyString = Value.createStringValue("");
  Value nullString = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", emptyString, "p1", nullString);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((String) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * CONCAT of two NULL string parameters is expected to produce a single row whose nullable string
 * field is NULL.
 */
@Test
public void testConcatWithNull2() {
  String query = "SELECT CONCAT(@p0, @p1) AS ColA";
  Value firstNull = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  Value secondNull = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", firstNull, "p1", secondNull);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((String) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Selecting a named INT64 parameter bound to 5 is expected to return a single row holding 5. */
@Test
public void testNamedParameterQuery() {
  String query = "SELECT @ColA AS ColA";
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("ColA", Value.createInt64Value(5));
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(5L).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Selecting an ARRAY&lt;STRUCT&lt;INT64, INT64&gt;&gt; literal is expected to produce one row whose
 * only field is an array holding a single two-field INT64 row with values (11, 12).
 */
@Test
public void testArrayStructLiteral() {
  String query = "SELECT ARRAY<STRUCT<INT64, INT64>>[(11, 12)];";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema elementSchema =
      Schema.of(Field.of("s", FieldType.INT64), Field.of("i", FieldType.INT64));
  Schema expectedSchema =
      Schema.of(Field.of("field1", FieldType.array(FieldType.row(elementSchema))));
  Row elementRow = Row.withSchema(elementSchema).addValues(11L, 12L).build();
  Row expectedRow =
      Row.withSchema(expectedSchema).addValue(ImmutableList.of(elementRow)).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Selecting a named STRUCT parameter with fields (s STRING, i INT64) bound to ("foo", 1) is
 * expected to return one row whose single field is a nested row carrying those two values.
 */
@Test
public void testParameterStruct() {
  String query = "SELECT @p as ColA";
  StructType structType =
      TypeFactory.createStructType(
          ImmutableList.of(
              new StructField("s", TypeFactory.createSimpleType(TypeKind.TYPE_STRING)),
              new StructField("i", TypeFactory.createSimpleType(TypeKind.TYPE_INT64))));
  Value structValue =
      Value.createStructValue(
          structType,
          ImmutableList.of(Value.createStringValue("foo"), Value.createInt64Value(1L)));
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p", structValue);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema nestedSchema =
      Schema.of(Field.of("s", FieldType.STRING), Field.of("i", FieldType.INT64));
  Schema expectedSchema = Schema.of(Field.of("field1", FieldType.row(nestedSchema)));
  Row nestedRow = Row.withSchema(nestedSchema).addValues("foo", 1L).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValue(nestedRow).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Accessing a field of a struct nested inside a named struct parameter
 * ({@code @outer_struct.inner_struct.s}) is expected to return the inner string value "foo".
 */
@Test
public void testParameterStructNested() {
  String query = "SELECT @outer_struct.inner_struct.s as ColA";
  StructType innerType =
      TypeFactory.createStructType(
          ImmutableList.of(
              new StructField("s", TypeFactory.createSimpleType(TypeKind.TYPE_STRING))));
  StructType outerType =
      TypeFactory.createStructType(ImmutableList.of(new StructField("inner_struct", innerType)));
  Value innerValue =
      Value.createStructValue(innerType, ImmutableList.of(Value.createStringValue("foo")));
  Value outerValue = Value.createStructValue(outerType, ImmutableList.of(innerValue));
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("outer_struct", outerValue);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValue("foo").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT of named parameters "" and "A" is expected to return "A". */
@Test
public void testConcatNamedParameterQuery() {
  String query = "SELECT CONCAT(@p0, @p1) AS ColA";
  Value first = Value.createStringValue("");
  Value second = Value.createStringValue("A");
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", first, "p1", second);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("A").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** CONCAT of three positional parameters "a", "b", "c" is expected to return "abc". */
@Test
public void testConcatPositionalParameterQuery() {
  String query = "SELECT CONCAT(?, ?, ?) AS ColA";
  Value first = Value.createStringValue("a");
  Value second = Value.createStringValue("b");
  Value third = Value.createStringValue("c");
  ImmutableList<Value> positionalParams = ImmutableList.of(first, second, third);
  BeamRelNode relNode =
      new ZetaSQLQueryPlanner(config).convertToBeamRel(query, positionalParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("abc").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * REPLACE with an empty source string and an empty search pattern is expected to return the
 * empty string, regardless of the replacement "a".
 */
@Test
public void testReplace1() {
  String query = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
  Value source = Value.createStringValue("");
  Value pattern = Value.createStringValue("");
  Value replacement = Value.createStringValue("a");
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", source, "p1", pattern, "p2", replacement);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** REPLACE on "abc" with an empty search pattern is expected to leave the input unchanged. */
@Test
public void testReplace2() {
  String query = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
  Value source = Value.createStringValue("abc");
  Value pattern = Value.createStringValue("");
  Value replacement = Value.createStringValue("xyz");
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", source, "p1", pattern, "p2", replacement);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("abc").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** REPLACE whose replacement argument is a NULL string is expected to return NULL. */
@Test
public void testReplace3() {
  String query = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
  Value source = Value.createStringValue("");
  Value pattern = Value.createStringValue("");
  Value replacement = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", source, "p1", pattern, "p2", replacement);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((String) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** REPLACE whose source and search pattern are both NULL strings is expected to return NULL. */
@Test
public void testReplace4() {
  String query = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
  Value source = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  Value pattern = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  Value replacement = Value.createStringValue("");
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", source, "p1", pattern, "p2", replacement);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((String) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Single-argument TRIM is expected to strip the leading and trailing blanks of the input. */
@Test
public void testTrim1() {
  String query = "SELECT trim(@p0)";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", Value.createStringValue(" a b c "));
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("a b c").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Two-argument TRIM of "abxyzab" with trim set "ab" is expected to return "xyz". */
@Test
public void testTrim2() {
  String query = "SELECT trim(@p0, @p1)";
  Value input = Value.createStringValue("abxyzab");
  Value trimSet = Value.createStringValue("ab");
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", input, "p1", trimSet);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("xyz").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Two-argument TRIM with both arguments NULL is expected to return NULL. */
@Test
public void testTrim3() {
  String query = "SELECT trim(@p0, @p1)";
  Value input = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  Value trimSet = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", input, "p1", trimSet);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((String) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Single-argument LTRIM is expected to strip only the leading blank of the input. */
@Test
public void testLTrim1() {
  String query = "SELECT ltrim(@p0)";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", Value.createStringValue(" a b c "));
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("a b c ").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Two-argument LTRIM of "abxyzab" with trim set "ab" is expected to return "xyzab". */
@Test
public void testLTrim2() {
  String query = "SELECT ltrim(@p0, @p1)";
  Value input = Value.createStringValue("abxyzab");
  Value trimSet = Value.createStringValue("ab");
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", input, "p1", trimSet);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("xyzab").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Two-argument LTRIM with both arguments NULL is expected to return NULL. */
@Test
public void testLTrim3() {
  String query = "SELECT ltrim(@p0, @p1)";
  Value input = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  Value trimSet = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", input, "p1", trimSet);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((String) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Single-argument RTRIM is expected to strip only the trailing blank of the input. */
@Test
public void testRTrim1() {
  String query = "SELECT rtrim(@p0)";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", Value.createStringValue(" a b c "));
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(" a b c").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Two-argument RTRIM of "abxyzab" with trim set "ab" is expected to return "abxyz". */
@Test
public void testRTrim2() {
  String query = "SELECT rtrim(@p0, @p1)";
  Value input = Value.createStringValue("abxyzab");
  Value trimSet = Value.createStringValue("ab");
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", input, "p1", trimSet);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("abxyz").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Two-argument RTRIM with both arguments NULL is expected to return NULL. */
@Test
public void testRTrim3() {
  String query = "SELECT rtrim(@p0, @p1)";
  Value input = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  Value trimSet = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", input, "p1", trimSet);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addNullableField("field1", FieldType.STRING).build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues((String) null).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Casting a BYTES parameter holding a single backtick to STRING is expected to return that
 * backtick as a string. Currently ignored; see the issue linked in the annotation.
 */
@Test
@Ignore("https://jira.apache.org/jira/browse/BEAM-9191")
public void testCastBytesToString1() {
  String query = "SELECT CAST(@p0 AS STRING)";
  Value bytesParam = Value.createBytesValue(ByteString.copyFromUtf8("`"));
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", bytesParam);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("`").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Casting the bytes literal b'b' to STRING is expected to return "b". */
@Test
public void testCastBytesToString2() {
  String query = "SELECT CAST(b'b' AS STRING)";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("b").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Casting the bytes_col column of table_all_types to STRING is expected to return the strings
 * "1" through "5". Currently ignored; see the issue linked in the annotation.
 */
@Test
@Ignore("https://jira.apache.org/jira/browse/BEAM-9191")
public void testCastBytesToStringFromTable() {
  String query = "SELECT CAST(bytes_col AS STRING) FROM table_all_types";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row first = Row.withSchema(expectedSchema).addValues("1").build();
  Row second = Row.withSchema(expectedSchema).addValues("2").build();
  Row third = Row.withSchema(expectedSchema).addValues("3").build();
  Row fourth = Row.withSchema(expectedSchema).addValues("4").build();
  Row fifth = Row.withSchema(expectedSchema).addValues("5").build();
  PAssert.that(output).containsInAnyOrder(first, second, third, fourth, fifth);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Casting the string '2019-01-15 13:21:03' to TIMESTAMP is expected to equal the same text parsed
 * by {@code parseTimestampWithUTCTimeZone}.
 */
@Test
public void testCastStringToTimestamp() {
  String query = "SELECT CAST('2019-01-15 13:21:03' AS TIMESTAMP)";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addDateTimeField("field_1").build();
  Row expectedRow =
      Row.withSchema(expectedSchema)
          .addValues(parseTimestampWithUTCTimeZone("2019-01-15 13:21:03"))
          .build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Tests that the default timezone works properly in the query analysis stage: with
 * "Pacific/Chatham" set as default on both the planner and the pipeline options, a timestamp
 * string carrying an explicit +07:30 offset is expected to resolve to 2014-12-01 05:04:56 UTC.
 */
@Test
public void testCastStringToTimestampWithDefaultTimezoneSet() {
  String query = "SELECT CAST('2014-12-01 12:34:56+07:30' AS TIMESTAMP)";
  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  planner.setDefaultTimezone("Pacific/Chatham");
  BeamSqlPipelineOptions sqlOptions = pipeline.getOptions().as(BeamSqlPipelineOptions.class);
  sqlOptions.setZetaSqlDefaultTimezone("Pacific/Chatham");
  BeamRelNode relNode = planner.convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addDateTimeField("field_1").build();
  Row expectedRow =
      Row.withSchema(expectedSchema)
          .addValues(parseTimestampWithUTCTimeZone("2014-12-01 05:04:56"))
          .build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Casting a string to TIME and a TIME literal to STRING in the same query is expected to
 * round-trip the value 12:34:56.123456. Currently ignored; see the issue linked in the
 * annotation.
 */
@Test
@Ignore("https://jira.apache.org/jira/browse/BEAM-10340")
public void testCastBetweenTimeAndString() {
  String query =
      "SELECT CAST(s1 as TIME) as t2, CAST(t1 as STRING) as s2 FROM "
          + "(SELECT '12:34:56.123456' as s1, TIME '12:34:56.123456' as t1)";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema =
      Schema.builder().addLogicalTypeField("t2", SqlTypes.TIME).addStringField("s2").build();
  Row expectedRow =
      Row.withSchema(expectedSchema)
          .addValues(LocalTime.of(12, 34, 56, 123456000), "12:34:56.123456")
          .build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Casting an empty STRING parameter to STRING is expected to return the empty string. */
@Test
public void testCastStringToString() {
  String query = "SELECT CAST(@p0 AS STRING)";
  ImmutableMap<String, Value> namedParams = ImmutableMap.of("p0", Value.createStringValue(""));
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Casting the STRING parameter "123" to INT64 is expected to return 123. */
@Test
public void testCastStringToInt64() {
  String query = "SELECT CAST(@p0 AS INT64)";
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", Value.createStringValue("123"));
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(123L).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Selecting the string constant 'hi' is expected to return a single row holding "hi". */
@Test
public void testSelectConstant() {
  String query = "SELECT 'hi'";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("hi").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Selecting row_field from table_with_map is expected to return a nested row (1, "data1").
 * Currently ignored; the annotation states that ZetaSQL does not support the Map type.
 */
@Test
@Ignore("[BEAM-8593] ZetaSQL does not support Map type")
public void testSelectFromTableWithMap() {
  String query = "SELECT row_field FROM table_with_map";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema nestedSchema = Schema.builder().addInt64Field("row_id").addStringField("data").build();
  Schema expectedSchema = Schema.builder().addRowField("row_field", nestedSchema).build();
  Row nestedRow = Row.withSchema(nestedSchema).addValues(1L, "data1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(nestedRow).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Planning a query that groups by a sub-query is expected to fail with an
 * {@link UnsupportedOperationException} mentioning that sub-queries are not supported.
 */
@Test
public void testSubQuery() {
  String query = "select sum(Key) from KeyValue\n" + "group by (select Key)";
  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  // Expectations are registered only after the planner exists, right before the failing call.
  thrown.expect(UnsupportedOperationException.class);
  thrown.expectMessage("Does not support sub-queries");
  planner.convertToBeamRel(query);
}
/** SUBSTR("abc", -2, 1) is expected to return "b" (negative start position, length 1). */
@Test
public void testSubstr() {
  String query = "SELECT substr(@p0, @p1, @p2)";
  Value input = Value.createStringValue("abc");
  Value position = Value.createInt64Value(-2L);
  Value length = Value.createInt64Value(1L);
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", input, "p1", position, "p2", length);
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, namedParams);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field1").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("b").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * SUBSTR with a position just above {@code Integer.MAX_VALUE} and a length just below
 * {@code Integer.MIN_VALUE} is expected to make the pipeline run fail with a
 * {@link RuntimeException}.
 */
@Test
public void testSubstrWithLargeValueExpectException() {
  String query = "SELECT substr(@p0, @p1, @p2)";
  Value input = Value.createStringValue("abc");
  Value position = Value.createInt64Value(Integer.MAX_VALUE + 1L);
  Value length = Value.createInt64Value(Integer.MIN_VALUE - 1L);
  ImmutableMap<String, Value> namedParams =
      ImmutableMap.of("p0", input, "p1", position, "p2", length);
  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  BeamRelNode relNode = planner.convertToBeamRel(query, namedParams);
  BeamSqlRelUtils.toPCollection(pipeline, relNode);
  // The expectation is registered after planning, so only the run itself may throw.
  thrown.expect(RuntimeException.class);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** SELECT ALL Key, Value over KeyValue is expected to return both (key, value) rows. */
@Test
public void testSelectAll() {
  String query = "SELECT ALL Key, Value FROM KeyValue;";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema =
      Schema.builder().addInt64Field("field1").addStringField("field2").build();
  Row first = Row.withSchema(expectedSchema).addValues(14L, "KeyValue234").build();
  Row second = Row.withSchema(expectedSchema).addValues(15L, "KeyValue235").build();
  PAssert.that(output).containsInAnyOrder(first, second);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** SELECT DISTINCT Key over aggregate_test_table is expected to return the keys 1, 2 and 3. */
@Test
public void testSelectDistinct() {
  String query = "SELECT DISTINCT Key FROM aggregate_test_table;";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  Row first = Row.withSchema(expectedSchema).addValues(1L).build();
  Row second = Row.withSchema(expectedSchema).addValues(2L).build();
  Row third = Row.withSchema(expectedSchema).addValues(3L).build();
  PAssert.that(output).containsInAnyOrder(first, second, third);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * SELECT DISTINCT over three bytes literals that differ only in letter case is expected to keep
 * all three values.
 */
@Test
public void testSelectDistinct2() {
  String query =
      "SELECT DISTINCT val.BYTES\n"
          + "from (select b\"BYTES\" BYTES union all\n"
          + " select b\"bytes\" union all\n"
          + " select b\"ByTeS\") val";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addByteArrayField("field1").build();
  Row upperCase =
      Row.withSchema(expectedSchema).addValues("BYTES".getBytes(StandardCharsets.UTF_8)).build();
  Row mixedCase =
      Row.withSchema(expectedSchema).addValues("ByTeS".getBytes(StandardCharsets.UTF_8)).build();
  Row lowerCase =
      Row.withSchema(expectedSchema).addValues("bytes".getBytes(StandardCharsets.UTF_8)).build();
  PAssert.that(output).containsInAnyOrder(upperCase, mixedCase, lowerCase);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Selecting the bytes literal b"ByTes" is expected to return its UTF-8 byte array. */
@Test
public void testSelectBytes() {
  String query = "SELECT b\"ByTes\"";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addByteArrayField("field1").build();
  Row expectedRow =
      Row.withSchema(expectedSchema).addValues("ByTes".getBytes(StandardCharsets.UTF_8)).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * SELECT * EXCEPT (Key, ts) over KeyValue is expected to return only the remaining string
 * column for both rows.
 */
@Test
public void testSelectExcept() {
  String query = "SELECT * EXCEPT (Key, ts) FROM KeyValue;";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field2").build();
  Row first = Row.withSchema(expectedSchema).addValues("KeyValue234").build();
  Row second = Row.withSchema(expectedSchema).addValues("KeyValue235").build();
  PAssert.that(output).containsInAnyOrder(first, second);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * SELECT * REPLACE over a one-row WITH clause is expected to swap item_name from "sprocket" to
 * "widget" while leaving order_id and quantity untouched.
 */
@Test
public void testSelectReplace() {
  String query =
      "WITH orders AS\n"
          + " (SELECT 5 as order_id,\n"
          + " \"sprocket\" as item_name,\n"
          + " 200 as quantity)\n"
          + "SELECT * REPLACE (\"widget\" AS item_name)\n"
          + "FROM orders";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema =
      Schema.builder()
          .addInt64Field("field1")
          .addStringField("field2")
          .addInt64Field("field3")
          .build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues(5L, "widget", 200L).build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * UNION ALL of row_id from table_all_types and table_all_types_2 is expected to return the ids 1
 * through 10.
 */
@Test
public void testUnionAllBasic() {
  String query =
      "SELECT row_id FROM table_all_types UNION ALL SELECT row_id FROM table_all_types_2";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addInt64Field("field1").build();
  Row row1 = Row.withSchema(expectedSchema).addValue(1L).build();
  Row row2 = Row.withSchema(expectedSchema).addValue(2L).build();
  Row row3 = Row.withSchema(expectedSchema).addValue(3L).build();
  Row row4 = Row.withSchema(expectedSchema).addValue(4L).build();
  Row row5 = Row.withSchema(expectedSchema).addValue(5L).build();
  Row row6 = Row.withSchema(expectedSchema).addValue(6L).build();
  Row row7 = Row.withSchema(expectedSchema).addValue(7L).build();
  Row row8 = Row.withSchema(expectedSchema).addValue(8L).build();
  Row row9 = Row.withSchema(expectedSchema).addValue(9L).build();
  Row row10 = Row.withSchema(expectedSchema).addValue(10L).build();
  PAssert.that(output)
      .containsInAnyOrder(row1, row2, row3, row4, row5, row6, row7, row8, row9, row10);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Planning AVG over the f_int_1 column is expected to fail with a {@link RuntimeException} whose
 * message suggests casting the expression to DOUBLE.
 */
@Test
public void testAVGWithLongInput() {
  String query = "SELECT AVG(f_int_1) FROM aggregate_test_table;";
  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  // Expectations are registered only after the planner exists, right before the failing call.
  thrown.expect(RuntimeException.class);
  thrown.expectMessage(
      "AVG(LONG) is not supported. You might want to use AVG(CAST(expression AS DOUBLE).");
  planner.convertToBeamRel(query);
}
/** REVERSE('abc') is expected to return "cba". */
@Test
public void testReverseString() {
  String query = "SELECT REVERSE('abc');";
  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
  Schema expectedSchema = Schema.builder().addStringField("field2").build();
  Row expectedRow = Row.withSchema(expectedSchema).addValues("cba").build();
  PAssert.that(output).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/** Checks that {@code CHAR_LENGTH('abc')} produces a single row holding {@code 3}. */
@Test
public void testCharLength() {
  String query = "SELECT CHAR_LENGTH('abc');";

  // Expected output: one row with a single INT64 column.
  Schema int64Schema = Schema.builder().addInt64Field("field").build();
  Row expectedLength = Row.withSchema(int64Schema).addValues(3L).build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  PAssert.that(result).containsInAnyOrder(expectedLength);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks that {@code CHAR_LENGTH} applied to a query parameter bound to a NULL of STRING type
 * produces a single row whose only (nullable INT64) field is {@code null}.
 */
@Test
public void testCharLengthNull() {
  String query = "SELECT CHAR_LENGTH(@p0);";

  // Bind @p0 to a typed NULL string.
  Value nullString = Value.createSimpleNullValue(TypeKind.TYPE_STRING);
  ImmutableMap<String, Value> queryParams = ImmutableMap.of("p0", nullString);

  ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
  BeamRelNode relNode = planner.convertToBeamRel(query, queryParams);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema nullableInt64Schema =
      Schema.builder().addNullableField("field", FieldType.INT64).build();
  // The cast makes the varargs call receive one null element rather than a null array.
  Row expectedNull = Row.withSchema(nullableInt64Schema).addValues((Object) null).build();

  PAssert.that(result).containsInAnyOrder(expectedNull);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks the {@code TUMBLE} table-valued function with a one-second interval over {@code
 * KeyValue}: every output row carries the original columns plus {@code window_start} and {@code
 * window_end}.
 */
@Test
public void testTumbleAsTVF() {
  String query =
      "select Key, Value, ts, window_start, window_end from "
          + "TUMBLE((select * from KeyValue), descriptor(ts), 'INTERVAL 1 SECOND')";
  ImmutableMap<String, Value> noParams = ImmutableMap.of();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, noParams);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  Schema windowedSchema =
      Schema.builder()
          .addInt64Field("Key")
          .addStringField("Value")
          .addDateTimeField("ts")
          .addDateTimeField("window_start")
          .addDateTimeField("window_end")
          .build();

  // Row with ts 21:26:06, expected in the window starting 21:26:06 and ending 21:26:07.
  Row firstWindowRow =
      Row.withSchema(windowedSchema)
          .addValues(
              14L,
              "KeyValue234",
              DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:06"),
              DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:06"),
              DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:07"))
          .build();

  // Row with ts 21:26:07, expected in the following one-second window.
  // NOTE(review): the window_end literal below uses a 'T' separator, unlike the other
  // timestamps in this test; kept as-is -- presumably the parser accepts both forms.
  Row secondWindowRow =
      Row.withSchema(windowedSchema)
          .addValues(
              15L,
              "KeyValue235",
              DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:07"),
              DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:07"),
              DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01T21:26:08"))
          .build();

  PAssert.that(result).containsInAnyOrder(firstWindowRow, secondWindowRow);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks the six predicates {@code IS [NOT] NULL}, {@code IS [NOT] TRUE} and {@code IS [NOT]
 * FALSE} against a three-row source holding NULL, TRUE and FALSE.
 */
@Test
public void testIsNullTrueFalse() {
  String query =
      "WITH Src AS (\n"
          + "  SELECT NULL as data UNION ALL\n"
          + "  SELECT TRUE UNION ALL\n"
          + "  SELECT FALSE\n"
          + ")\n"
          + "SELECT\n"
          + "  data IS NULL as isnull,\n"
          + "  data IS NOT NULL as isnotnull,\n"
          + "  data IS TRUE as istrue,\n"
          + "  data IS NOT TRUE as isnottrue,\n"
          + "  data IS FALSE as isfalse,\n"
          + "  data IS NOT FALSE as isnotfalse\n"
          + "FROM Src\n";
  ImmutableMap<String, Value> noParams = ImmutableMap.of();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query, noParams);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  // Column order mirrors the SELECT list above.
  Schema predicateSchema =
      Schema.builder()
          .addField("isnull", FieldType.BOOLEAN)
          .addField("isnotnull", FieldType.BOOLEAN)
          .addField("istrue", FieldType.BOOLEAN)
          .addField("isnottrue", FieldType.BOOLEAN)
          .addField("isfalse", FieldType.BOOLEAN)
          .addField("isnotfalse", FieldType.BOOLEAN)
          .build();

  // Expected predicate results for data = NULL, data = TRUE and data = FALSE respectively.
  Row whenNull =
      Row.withSchema(predicateSchema).addValues(true, false, false, true, false, true).build();
  Row whenTrue =
      Row.withSchema(predicateSchema).addValues(false, true, true, false, false, true).build();
  Row whenFalse =
      Row.withSchema(predicateSchema).addValues(false, true, false, true, true, false).build();

  PAssert.that(result).containsInAnyOrder(whenNull, whenTrue, whenFalse);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks {@code BIT_OR(row_id)} over {@code table_all_types} grouped by {@code bool_col}: the
 * two expected aggregates are 3 and 7.
 */
@Test
public void testZetaSQLBitOr() {
  String query = "SELECT BIT_OR(row_id) FROM table_all_types GROUP BY bool_col";

  // One expected row per group, each with a single INT64 column.
  Schema int64Schema = Schema.builder().addInt64Field("field1").build();
  Row firstGroup = Row.withSchema(int64Schema).addValues(3L).build();
  Row secondGroup = Row.withSchema(int64Schema).addValue(7L).build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  PAssert.that(result).containsInAnyOrder(firstGroup, secondGroup);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks {@code BIT_AND(row_id)} over {@code table_all_types} grouped by {@code bool_col}: the
 * two expected aggregates are 1 and 0. Currently disabled; see the {@code @Ignore} reason.
 */
@Test
@Ignore("NULL values don't work correctly. (https://issues.apache.org/jira/browse/BEAM-10379)")
public void testZetaSQLBitAnd() {
  String query = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";

  // One expected row per group, each with a single INT64 column.
  Schema int64Schema = Schema.builder().addInt64Field("field1").build();
  Row firstGroup = Row.withSchema(int64Schema).addValue(1L).build();
  Row secondGroup = Row.withSchema(int64Schema).addValue(0L).build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  PAssert.that(result).containsInAnyOrder(firstGroup, secondGroup);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
/**
 * Checks that a table can be referenced by its simple name: selecting {@code Key} from {@code
 * KeyValue} yields the values 14 and 15.
 */
@Test
public void testSimpleTableName() {
  String query = "SELECT Key FROM KeyValue";

  // Expected output: two rows with a single INT64 column.
  Schema keySchema = Schema.builder().addInt64Field("field1").build();
  Row firstKey = Row.withSchema(keySchema).addValues(14L).build();
  Row secondKey = Row.withSchema(keySchema).addValues(15L).build();

  BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
  PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);

  PAssert.that(result).containsInAnyOrder(firstKey, secondKey);
  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
}