blob: 41f916bef236a03003c8fef4d3551af93c483828 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/**
 * Tests for resolving table names across multiple schemas in {@link SqlTransform}.
 *
 * <p>The main input {@link PCollection} is queried as {@code PCOLLECTION} or {@code
 * beam.PCOLLECTION}. Some tests also register an extra {@link TableProvider}, exposing a table
 * named {@code extraTable}, via {@code withTableProvider} or {@code withDefaultTableProvider}.
 * They then check which table a qualified or unqualified name resolves to.
 */
public class BeamSqlMultipleSchemasTest {
  private static final Schema ROW_SCHEMA =
      Schema.builder().addInt32Field("f_int").addStringField("f_string").build();

  @Rule public final TestPipeline pipeline = TestPipeline.create();

  // NOTE(review): no test in this class uses this rule; consider removing it together with its
  // import.
  @Rule public ExpectedException exceptions = ExpectedException.none();

  @Test
  public void testSelectFromQualifiedPCollection() {
    PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));

    PCollection<Row> result =
        input.apply(SqlTransform.query("SELECT f_int, f_string FROM beam.PCOLLECTION"));

    PAssert.that(result).containsInAnyOrder(row(1, "strstr"));
    pipeline.run();
  }

  @Test
  public void testSelectFromUnqualifiedPCollection() {
    PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));

    PCollection<Row> result =
        input.apply(SqlTransform.query("SELECT f_int, f_string FROM PCOLLECTION"));

    PAssert.that(result).containsInAnyOrder(row(1, "strstr"));
    pipeline.run();
  }

  @Test
  public void testSelectFromExtraSchema() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query("SELECT f_int, f_string FROM extraSchema.extraTable")
                .withTableProvider("extraSchema", extraProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
    pipeline.run();
  }

  /**
   * A provider registered under the {@code beam} schema name makes its tables resolvable without
   * qualification.
   */
  @Test
  public void testOverrideUnqualifiedMainSchema() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query("SELECT f_int, f_string FROM extraTable")
                .withTableProvider("beam", extraProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
    pipeline.run();
  }

  /**
   * A provider registered under the {@code beam} schema name is resolvable with a {@code beam.}
   * qualifier.
   */
  @Test
  public void testOverrideQualifiedMainSchema() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query("SELECT f_int, f_string FROM beam.extraTable")
                .withTableProvider("beam", extraProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
    pipeline.run();
  }

  /** Unqualified table names resolve against the provider set as the default. */
  @Test
  public void testSetDefaultUnqualifiedSchema() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query("SELECT f_int, f_string FROM extraTable")
                .withDefaultTableProvider("extraSchema", extraProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
    pipeline.run();
  }

  /**
   * With a default provider set, the main input is still reachable as {@code beam.PCOLLECTION}
   * and can be joined against an unqualified table from the default provider.
   */
  @Test
  public void testSetDefaultUnqualifiedSchemaAndJoin() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query(
                    "SELECT extra.f_int, main.f_string || extra.f_string AS f_string \n"
                        + "FROM extraTable AS extra \n"
                        + "   INNER JOIN \n"
                        + " beam.PCOLLECTION AS main \n"
                        + " ON main.f_int = extra.f_int")
                .withDefaultTableProvider("extraSchema", extraProvider));

    PAssert.that(result)
        .containsInAnyOrder(
            row(1, "pcollection_1_extra_table_1"), row(2, "pcollection_2_extra_table_2"));
    pipeline.run();
  }

  /** The default provider's tables remain resolvable with their schema qualifier. */
  @Test
  public void testSetDefaultQualifiedSchema() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query("SELECT f_int, f_string FROM extraSchema.extraTable")
                .withDefaultTableProvider("extraSchema", extraProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
    pipeline.run();
  }

  @Test
  public void testJoinWithExtraSchema() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query(
                    "SELECT extra.f_int, main.f_string || extra.f_string AS f_string \n"
                        + "FROM extraSchema.extraTable AS extra \n"
                        + "   INNER JOIN \n"
                        + " PCOLLECTION AS main \n"
                        + " ON main.f_int = extra.f_int")
                .withTableProvider("extraSchema", extraProvider));

    PAssert.that(result)
        .containsInAnyOrder(
            row(1, "pcollection_1_extra_table_1"), row(2, "pcollection_2_extra_table_2"));
    pipeline.run();
  }

  @Test
  public void testJoinQualifiedMainWithExtraSchema() {
    PCollection<Row> inputMain = applyMainInput();
    TableProvider extraProvider = applyExtraTableProvider();

    PCollection<Row> result =
        inputMain.apply(
            SqlTransform.query(
                    "SELECT extra.f_int, main.f_string || extra.f_string AS f_string \n"
                        + "FROM extraSchema.extraTable AS extra \n"
                        + "   INNER JOIN \n"
                        + " beam.PCOLLECTION AS main \n"
                        + " ON main.f_int = extra.f_int")
                .withTableProvider("extraSchema", extraProvider));

    PAssert.that(result)
        .containsInAnyOrder(
            row(1, "pcollection_1_extra_table_1"), row(2, "pcollection_2_extra_table_2"));
    pipeline.run();
  }

  /**
   * Applies the main input to the test pipeline under the name {@code mainInput}.
   *
   * @return a collection of the rows (1, "pcollection_1") and (2, "pcollection_2")
   */
  private PCollection<Row> applyMainInput() {
    return pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
  }

  /**
   * Applies the extra input to the test pipeline under the name {@code extraInput}.
   *
   * <p>The extra input holds the rows (1, "_extra_table_1") and (2, "_extra_table_2").
   *
   * @return a read-only provider exposing that input as the table {@code extraTable}
   */
  private TableProvider applyExtraTableProvider() {
    PCollection<Row> inputExtra =
        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
    return extraTableProvider("extraTable", inputExtra);
  }

  /** Wraps {@code rows} in a read-only provider that exposes it under {@code tableName}. */
  private static TableProvider extraTableProvider(String tableName, PCollection<Row> rows) {
    return new ReadOnlyTableProvider(
        "testExtraTableProvider", ImmutableMap.of(tableName, new BeamPCollectionTable<>(rows)));
  }

  /** Builds a {@link #ROW_SCHEMA} row from its two field values. */
  private static Row row(int fIntValue, String fStringValue) {
    return Row.withSchema(ROW_SCHEMA).addValues(fIntValue, fStringValue).build();
  }

  /** Returns a transform that creates the given rows with {@link #ROW_SCHEMA} attached. */
  private static PTransform<PBegin, PCollection<Row>> create(Row... rows) {
    return Create.of(Arrays.asList(rows)).withRowSchema(ROW_SCHEMA);
  }
}