blob: 93603449b05aa3493065220f346c006e50fb6393 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
/**
 * Unit tests for Beam SQL queries over complex field types: nested {@code ROW}s, arrays of rows,
 * nested arrays, rows containing arrays, and the {@code ROW(...)} constructor.
 *
 * <p>Every query is planned against the in-memory tables registered in {@code
 * readOnlyTableProvider}.
 */
public class BeamComplexTypeTest {
  /** Two-field row: a string followed by an INT64. */
  private static final Schema innerRowSchema =
      Schema.builder().addStringField("string_field").addInt64Field("long_field").build();

  /** Row holding two {@code innerRowSchema} fields interleaved with scalar fields. */
  private static final Schema nestedRowSchema =
      Schema.builder()
          .addStringField("nonRowfield1")
          .addRowField("RowField", innerRowSchema)
          .addInt64Field("nonRowfield2")
          .addRowField("RowFieldTwo", innerRowSchema)
          .build();

  /** Row whose third field is an array of INT64. */
  private static final Schema rowWithArraySchema =
      Schema.builder()
          .addStringField("field1")
          .addInt64Field("field2")
          .addArrayField("field3", FieldType.INT64)
          .build();

  /**
   * Schema the tests expect when a whole {@code nestedRowSchema} value is selected: the fields of
   * the nested rows appear inline, in declaration order.
   */
  private static final Schema flattenedRowSchema =
      Schema.builder()
          .addStringField("field1")
          .addStringField("field2")
          .addInt64Field("field3")
          .addInt64Field("field4")
          .addStringField("field5")
          .addInt64Field("field6")
          .build();

  /**
   * Tables shared by all tests, keyed by table name. Each table declares a single column named
   * {@code col} and is populated with one value.
   */
  private static final ReadOnlyTableProvider readOnlyTableProvider =
      new ReadOnlyTableProvider(
          "test_provider",
          ImmutableMap.of(
              "arrayWithRowTestTable",
              TestBoundedTable.of(FieldType.array(FieldType.row(innerRowSchema)), "col")
                  .addRows(
                      Arrays.asList(Row.withSchema(innerRowSchema).addValues("str", 1L).build())),
              "nestedArrayTestTable",
              TestBoundedTable.of(FieldType.array(FieldType.array(FieldType.INT64)), "col")
                  .addRows(Arrays.asList(Arrays.asList(1L, 2L, 3L), Arrays.asList(4L, 5L))),
              "nestedRowTestTable",
              TestBoundedTable.of(Schema.FieldType.row(nestedRowSchema), "col")
                  .addRows(
                      Row.withSchema(nestedRowSchema)
                          .addValues(
                              "str",
                              Row.withSchema(innerRowSchema).addValues("inner_str_one", 1L).build(),
                              2L,
                              Row.withSchema(innerRowSchema).addValues("inner_str_two", 3L).build())
                          .build()),
              "basicRowTestTable",
              TestBoundedTable.of(Schema.FieldType.row(innerRowSchema), "col")
                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()),
              "rowWithArrayTestTable",
              TestBoundedTable.of(Schema.FieldType.row(rowWithArraySchema), "col")
                  .addRows(
                      Row.withSchema(rowWithArraySchema)
                          .addValues("str", 4L, Arrays.asList(5L, 6L))
                          .build())));

  @Rule public transient TestPipeline pipeline = TestPipeline.create();

  /**
   * Plans {@code sql} against {@code readOnlyTableProvider} and applies the result to {@code
   * pipeline}.
   *
   * @param sql the query to parse
   * @return the rows produced by the query
   */
  private PCollection<Row> applyQuery(String sql) {
    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
    return BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(sql));
  }

  /** Runs {@code pipeline}, waiting up to two minutes for it to finish. */
  private void runPipeline() {
    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  /** Selecting a whole nested-row column is expected to yield one flattened six-field row. */
  @Test
  public void testNestedRow() {
    PCollection<Row> stream =
        applyQuery("SELECT nestedRowTestTable.col FROM nestedRowTestTable");
    PAssert.that(stream)
        .containsInAnyOrder(
            Row.withSchema(flattenedRowSchema)
                .addValues("str", "inner_str_one", 1L, 2L, "inner_str_two", 3L)
                .build());
    runPipeline();
  }

  /** Indexing (1-based) into an array of rows returns the stored row. */
  @Test
  public void testArrayWithRow() {
    PCollection<Row> stream =
        applyQuery("SELECT arrayWithRowTestTable.col[1] FROM arrayWithRowTestTable");
    PAssert.that(stream)
        .containsInAnyOrder(Row.withSchema(innerRowSchema).addValues("str", 1L).build());
    runPipeline();
  }

  /** Double indexing into an array of arrays selects individual INT64 elements. */
  @Test
  public void testNestedArray() {
    PCollection<Row> stream =
        applyQuery(
            "SELECT nestedArrayTestTable.col[1][3], nestedArrayTestTable.col[2][1] "
                + "FROM nestedArrayTestTable");
    PAssert.that(stream)
        .containsInAnyOrder(
            Row.withSchema(Schema.builder().addInt64Field("field1").addInt64Field("field2").build())
                .addValues(3L, 4L)
                .build());
    runPipeline();
  }

  /** Selecting a flat row column returns the row unchanged. */
  @Test
  public void testBasicRow() {
    PCollection<Row> stream = applyQuery("SELECT col FROM basicRowTestTable");
    PAssert.that(stream)
        .containsInAnyOrder(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build());
    runPipeline();
  }

  /** Field access on a row followed by array indexing selects one array element. */
  @Test
  public void testRowWithArray() {
    PCollection<Row> stream =
        applyQuery("SELECT rowWithArrayTestTable.col.field3[2] FROM rowWithArrayTestTable");
    PAssert.that(stream)
        .containsInAnyOrder(
            Row.withSchema(Schema.builder().addInt64Field("int64").build()).addValue(6L).build());
    runPipeline();
  }

  /** Chained field access reaches scalar fields inside both nested rows. */
  @Test
  public void testFieldAccessToNestedRow() {
    PCollection<Row> stream =
        applyQuery(
            "SELECT nestedRowTestTable.col.RowField.string_field, "
                + "nestedRowTestTable.col.RowFieldTwo.long_field "
                + "FROM nestedRowTestTable");
    PAssert.that(stream)
        .containsInAnyOrder(
            Row.withSchema(
                    Schema.builder().addStringField("field1").addInt64Field("field2").build())
                .addValues("inner_str_one", 3L)
                .build());
    runPipeline();
  }

  /** Selecting an inner row of a nested row. Disabled; see BEAM-5189. */
  @Ignore("https://issues.apache.org/jira/browse/BEAM-5189")
  @Test
  public void testSelectInnerRowOfNestedRow() {
    PCollection<Row> stream =
        applyQuery("SELECT nestedRowTestTable.col.RowField FROM nestedRowTestTable");
    PAssert.that(stream)
        .containsInAnyOrder(
            Row.withSchema(
                    Schema.builder().addStringField("field1").addInt64Field("field2").build())
                .addValues("inner_str_one", 1L)
                .build());
    runPipeline();
  }

  /** A {@code ROW(...)} literal with nested rows is expected to yield one flattened row. */
  @Test
  public void testRowConstructor() {
    PCollection<Row> stream = applyQuery("SELECT ROW(1, ROW(2, 3), 'str', ROW('str2', 'str3'))");
    PAssert.that(stream)
        .containsInAnyOrder(
            Row.withSchema(
                    Schema.builder()
                        .addInt32Field("field1")
                        .addInt32Field("field2")
                        .addInt32Field("field3")
                        .addStringField("field4")
                        .addStringField("field5")
                        .addStringField("field6")
                        .build())
                .addValues(1, 2, 3, "str", "str2", "str3")
                .build());
    runPipeline();
  }
}