// blob: b416a3f1574671019de42a3df136004cf7c5d283 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta;
import static org.junit.Assert.assertThrows;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.TableName;
import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
/** Test for custom table resolver and full name table provider. */
public class CustomTableResolverTest implements Serializable {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
private static final Schema BASIC_SCHEMA =
Schema.builder().addInt32Field("id").addStringField("name").build();
/**
 * Table provider used by these tests to exercise custom table name resolution.
 *
 * <p>Shows how a table name written in the usual Calcite query syntax, e.g. {@code a.b.c.d}, can
 * be translated into the provider's own table name format, here {@code a_b_c_d}. Every other
 * operation is forwarded unchanged to a wrapped {@link TestTableProvider}.
 */
public static class CustomResolutionTestTableProvider extends FullNameTableProvider {
  /** Provider that actually stores the tables and rows; all calls end up here. */
  TestTableProvider delegateTableProvider;

  /** Creates a provider backed by a fresh, empty {@link TestTableProvider}. */
  public CustomResolutionTestTableProvider() {
    this.delegateTableProvider = new TestTableProvider();
  }

  /** Looks up a table by its plain (already underscore-joined) name in the delegate. */
  @Override
  public Table getTable(String tableName) {
    return this.delegateTableProvider.getTable(tableName);
  }

  /**
   * Resolves a full table name by joining its path components and its table name with
   * underscores and looking the resulting name up in the delegate.
   */
  @Override
  public Table getTableByFullName(TableName fullTableName) {
    // The tests register tables under names that use underscores instead of dots, so the
    // lookup key is rebuilt here in that same underscore-separated form.
    String underscoredPath = String.join("_", fullTableName.getPath());
    String lookupName = underscoredPath + "_" + fullTableName.getTableName();
    return this.delegateTableProvider.getTable(lookupName);
  }

  /** Returns the table type reported by the delegate. */
  @Override
  public String getTableType() {
    return this.delegateTableProvider.getTableType();
  }

  /** Registers the table with the delegate. */
  @Override
  public void createTable(Table table) {
    this.delegateTableProvider.createTable(table);
  }

  /** Adds the given rows to the named table in the delegate. */
  public void addRows(String tableName, Row... rows) {
    this.delegateTableProvider.addRows(tableName, rows);
  }

  /** Drops the named table from the delegate. */
  @Override
  public void dropTable(String tableName) {
    this.delegateTableProvider.dropTable(tableName);
  }

  /** Returns the tables known to the delegate. */
  @Override
  public Map<String, Table> getTables() {
    return this.delegateTableProvider.getTables();
  }

  /** Lets the delegate build the {@link BeamSqlTable} for the given table definition. */
  @Override
  public BeamSqlTable buildBeamSqlTable(Table table) {
    return this.delegateTableProvider.buildBeamSqlTable(table);
  }
}
/** An unqualified, single-part table name is read from the default table provider. */
@Test
public void testSimpleId() throws Exception {
  final String tableName = "testtable";
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table tableDefinition = Table.builder().name(tableName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(tableDefinition);
  defaultProvider.addRows(tableName, row(1, "one"), row(2, "two"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testtable")
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * Calling {@code buildIOReader} with a filter constructed from an empty list and an empty field
 * list yields the single row that was added to the table.
 */
@Test
public void testDefaultBuildIOReader_withEmptyParams_returnsPCollection() {
  TestBoundedTable boundedTable = TestBoundedTable.of(BASIC_SCHEMA).addRows(1, "one");

  PCollection<Row> rowsRead =
      boundedTable.buildIOReader(
          pipeline.begin(),
          boundedTable.constructFilter(ImmutableList.of()),
          ImmutableList.of());

  PAssert.that(rowsRead).containsInAnyOrder(row(1, "one"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * {@code buildIOReader} is expected to reject, with an {@link IllegalArgumentException}, both a
 * filter supplied as an ad-hoc lambda and a non-empty list of field names.
 */
@Test
public void testDefaultBuildIOReader_withNonEmptyParams_throwsException() {
  TestBoundedTable boundedTable = TestBoundedTable.of(BASIC_SCHEMA).addRows(1, "one");

  // Case 1: the filter is a lambda rather than one produced by the table itself.
  assertThrows(
      IllegalArgumentException.class,
      () -> boundedTable.buildIOReader(pipeline.begin(), () -> null, ImmutableList.of()));

  // Case 2: a default filter, but with a non-empty field name list.
  assertThrows(
      IllegalArgumentException.class,
      () ->
          boundedTable.buildIOReader(
              pipeline.begin(),
              new DefaultTableFilter(ImmutableList.of()),
              ImmutableList.of("one")));

  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * A single-part table name that is explicitly prefixed with the default provider's name ({@code
 * testprovider.testtable}) is read from the default table provider.
 */
@Test
public void testSimpleIdWithExplicitDefaultSchema() throws Exception {
  final String tableName = "testtable";
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table tableDefinition = Table.builder().name(tableName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(tableDefinition);
  defaultProvider.addRows(tableName, row(1, "one"), row(2, "two"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testprovider.testtable")
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * With two providers registered, a single-part table name explicitly prefixed with the
 * <em>default</em> provider's name ({@code testprovider.testtable}) is read from the default
 * provider and not from the additional one.
 *
 * <p>The query deliberately targets {@code testprovider}; the non-default case ({@code
 * testprovider2.testtable2}) is covered by {@code testSimpleIdWithExplicitNonDefaultSchema}.
 */
@Test
public void testSimpleIdWithExplicitDefaultSchemaWithMultipleProviders() throws Exception {
  CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
  tableProvider.createTable(
      Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
  tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));

  // Second provider holds different rows so a wrong resolution would be detected.
  CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
  tableProvider2.createTable(
      Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build());
  tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four"));

  PCollection<Row> result =
      pipeline.apply(
          SqlTransform.query("SELECT id, name FROM testprovider.testtable")
              .withTableProvider("testprovider2", tableProvider2)
              .withDefaultTableProvider("testprovider", tableProvider));

  PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * With two providers registered, a single-part table name prefixed with the non-default
 * provider's name ({@code testprovider2.testtable2}) is read from that non-default provider.
 */
@Test
public void testSimpleIdWithExplicitNonDefaultSchema() throws Exception {
  // Default provider: holds rows that must NOT show up in the result.
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table defaultTable =
      Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(defaultTable);
  defaultProvider.addRows("testtable", row(1, "one"), row(2, "two"));

  // Additional provider: the one the query addresses explicitly.
  CustomResolutionTestTableProvider secondProvider = new CustomResolutionTestTableProvider();
  Table secondTable =
      Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build();
  secondProvider.createTable(secondTable);
  secondProvider.addRows("testtable2", row(3, "three"), row(4, "four"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testprovider2.testtable2")
          .withTableProvider("testprovider2", secondProvider)
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(3, "three"), row(4, "four"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * A two-part name without a provider prefix ({@code testtable.blah}) is resolved by the default
 * provider to the table registered as {@code testtable_blah}.
 */
@Test
public void testCompoundIdInDefaultSchema() throws Exception {
  final String registeredName = "testtable_blah";
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table tableDefinition =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(tableDefinition);
  defaultProvider.addRows(registeredName, row(1, "one"), row(2, "two"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testtable.blah")
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * A two-part name explicitly prefixed with the default provider's name ({@code
 * testprovider.testtable.blah}) is resolved to the table registered as {@code testtable_blah}.
 */
@Test
public void testCompoundIdInExplicitDefaultSchema() throws Exception {
  final String registeredName = "testtable_blah";
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table tableDefinition =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(tableDefinition);
  defaultProvider.addRows(registeredName, row(1, "one"), row(2, "two"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testprovider.testtable.blah")
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * A four-part name without a provider prefix ({@code testtable.blah.foo.bar}) is resolved by the
 * default provider to the table registered as {@code testtable_blah_foo_bar}.
 */
@Test
public void testLongCompoundIdInDefaultSchema() throws Exception {
  final String registeredName = "testtable_blah_foo_bar";
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table tableDefinition =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(tableDefinition);
  defaultProvider.addRows(registeredName, row(1, "one"), row(2, "two"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar")
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * When two providers both contain a table registered as {@code testtable_blah_foo_bar}, an
 * unprefixed four-part name ({@code testtable.blah.foo.bar}) reads from the default provider.
 */
@Test
public void testLongCompoundIdInDefaultSchemaWithMultipleProviders() throws Exception {
  final String registeredName = "testtable_blah_foo_bar";

  // Default provider: its rows are the expected result.
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table defaultTable =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(defaultTable);
  defaultProvider.addRows(registeredName, row(1, "one"), row(2, "two"));

  // Additional provider: same table name, different rows.
  CustomResolutionTestTableProvider secondProvider = new CustomResolutionTestTableProvider();
  Table secondTable =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  secondProvider.createTable(secondTable);
  secondProvider.addRows(registeredName, row(3, "three"), row(4, "four"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar")
          .withTableProvider("testprovider2", secondProvider)
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * A four-part name explicitly prefixed with the default provider's name ({@code
 * testprovider.testtable.blah.foo.bar}) is resolved to the table registered as {@code
 * testtable_blah_foo_bar}.
 */
@Test
public void testLongCompoundIdInExplicitDefaultSchema() throws Exception {
  final String registeredName = "testtable_blah_foo_bar";
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table tableDefinition =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(tableDefinition);
  defaultProvider.addRows(registeredName, row(1, "one"), row(2, "two"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testprovider.testtable.blah.foo.bar")
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(1, "one"), row(2, "two"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * When two providers both contain a table registered as {@code testtable_blah_foo_bar}, a
 * four-part name prefixed with the non-default provider's name ({@code
 * testprovider2.testtable.blah.foo.bar}) reads from the non-default provider.
 */
@Test
public void testLongCompoundIdInNonDefaultSchemaSameTableNames() throws Exception {
  final String registeredName = "testtable_blah_foo_bar";

  // Default provider: same table name, rows that must NOT show up in the result.
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table defaultTable =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(defaultTable);
  defaultProvider.addRows(registeredName, row(1, "one"), row(2, "two"));

  // Additional provider: the one the query addresses explicitly.
  CustomResolutionTestTableProvider secondProvider = new CustomResolutionTestTableProvider();
  Table secondTable =
      Table.builder().name(registeredName).schema(BASIC_SCHEMA).type("test").build();
  secondProvider.createTable(secondTable);
  secondProvider.addRows(registeredName, row(3, "three"), row(4, "four"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testprovider2.testtable.blah.foo.bar")
          .withTableProvider("testprovider2", secondProvider)
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(3, "three"), row(4, "four"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * With differently named tables in two providers, a four-part name prefixed with the non-default
 * provider's name ({@code testprovider2.testtable2.blah2.foo2.bar2}) reads the table registered
 * as {@code testtable2_blah2_foo2_bar2} in that provider.
 */
@Test
public void testLongCompoundIdInNonDefaultSchemaDifferentNames() throws Exception {
  final String defaultTableName = "testtable_blah_foo_bar";
  final String secondTableName = "testtable2_blah2_foo2_bar2";

  // Default provider: rows that must NOT show up in the result.
  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table defaultTable =
      Table.builder().name(defaultTableName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(defaultTable);
  defaultProvider.addRows(defaultTableName, row(1, "one"), row(2, "two"));

  // Additional provider: the one the query addresses explicitly.
  CustomResolutionTestTableProvider secondProvider = new CustomResolutionTestTableProvider();
  Table secondTable =
      Table.builder().name(secondTableName).schema(BASIC_SCHEMA).type("test").build();
  secondProvider.createTable(secondTable);
  secondProvider.addRows(secondTableName, row(3, "three"), row(4, "four"));

  SqlTransform query =
      SqlTransform.query("SELECT id, name FROM testprovider2.testtable2.blah2.foo2.bar2")
          .withTableProvider("testprovider2", secondProvider)
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(3, "three"), row(4, "four"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * Joins a table addressed through the non-default provider with one addressed through the default
 * provider, using fully spelled-out compound names both in the {@code FROM} clause and in the
 * select list. The ids come from the non-default provider's table, matched on {@code name}.
 */
@Test
public void testJoinWithLongCompoundIds() throws Exception {
  final String defaultTableName = "testtable_blah_foo_bar";
  final String secondTableName = "testtable_blah_foo_bar2";

  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table defaultTable =
      Table.builder().name(defaultTableName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(defaultTable);
  defaultProvider.addRows(defaultTableName, row(3, "customer"), row(2, "nobody"));

  CustomResolutionTestTableProvider secondProvider = new CustomResolutionTestTableProvider();
  Table secondTable =
      Table.builder().name(secondTableName).schema(BASIC_SCHEMA).type("test").build();
  secondProvider.createTable(secondTable);
  secondProvider.addRows(secondTableName, row(4, "customer"), row(1, "nobody"));

  final String sql =
      "SELECT testprovider2.testtable.blah.foo.bar2.id, testtable.blah.foo.bar.name \n"
          + "FROM \n"
          + " testprovider2.testtable.blah.foo.bar2 \n"
          + "JOIN \n"
          + " testtable.blah.foo.bar \n"
          + "USING(name)";
  SqlTransform query =
      SqlTransform.query(sql)
          .withTableProvider("testprovider2", secondProvider)
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(4, "customer"), row(1, "nobody"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * Same scenario as a plain {@code JOIN} over long compound identifiers, but written with the
 * explicit {@code INNER JOIN} keyword so that this syntax variant is exercised as well.
 *
 * <p>A table addressed through the non-default provider is inner-joined with one addressed
 * through the default provider on {@code name}; the ids in the result come from the non-default
 * provider's table.
 */
@Test
public void testInnerJoinWithLongCompoundIds() throws Exception {
  CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
  tableProvider.createTable(
      Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
  tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody"));

  CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
  tableProvider2.createTable(
      Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
  tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody"));

  PCollection<Row> result =
      pipeline.apply(
          SqlTransform.query(
                  "SELECT testprovider2.testtable.blah.foo.bar2.id, testtable.blah.foo.bar.name \n"
                      + "FROM \n"
                      + " testprovider2.testtable.blah.foo.bar2 \n"
                      // Explicit INNER keyword: this is what distinguishes the test from the
                      // bare-JOIN variant.
                      + "INNER JOIN \n"
                      + " testtable.blah.foo.bar \n"
                      + "USING(name)")
              .withTableProvider("testprovider2", tableProvider2)
              .withDefaultTableProvider("testprovider", tableProvider));

  PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, "nobody"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * Joins two tables addressed by long compound names, giving each an alias ({@code AS b} and a
 * bare {@code a}) and referring to the columns through those aliases in the select list.
 */
@Test
public void testJoinWithLongCompoundIdsWithAliases() throws Exception {
  final String defaultTableName = "testtable_blah_foo_bar";
  final String secondTableName = "testtable_blah_foo_bar2";

  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table defaultTable =
      Table.builder().name(defaultTableName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(defaultTable);
  defaultProvider.addRows(defaultTableName, row(3, "customer"), row(2, "nobody"));

  CustomResolutionTestTableProvider secondProvider = new CustomResolutionTestTableProvider();
  Table secondTable =
      Table.builder().name(secondTableName).schema(BASIC_SCHEMA).type("test").build();
  secondProvider.createTable(secondTable);
  secondProvider.addRows(secondTableName, row(4, "customer"), row(1, "nobody"));

  final String sql =
      "SELECT b.id, a.name \n"
          + "FROM \n"
          + " testprovider2.testtable.blah.foo.bar2 AS b \n"
          + "JOIN \n"
          + " testtable.blah.foo.bar a\n"
          + "USING(name)";
  SqlTransform query =
      SqlTransform.query(sql)
          .withTableProvider("testprovider2", secondProvider)
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output).containsInAnyOrder(row(4, "customer"), row(1, "nobody"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/**
 * Unions a table addressed through the non-default provider with one addressed through the
 * default provider, both by long compound names; the result contains the rows of both tables.
 */
@Test
public void testUnionWithLongCompoundIds() throws Exception {
  final String defaultTableName = "testtable_blah_foo_bar";
  final String secondTableName = "testtable_blah_foo_bar2";

  CustomResolutionTestTableProvider defaultProvider = new CustomResolutionTestTableProvider();
  Table defaultTable =
      Table.builder().name(defaultTableName).schema(BASIC_SCHEMA).type("test").build();
  defaultProvider.createTable(defaultTable);
  defaultProvider.addRows(defaultTableName, row(3, "customer"), row(2, "nobody"));

  CustomResolutionTestTableProvider secondProvider = new CustomResolutionTestTableProvider();
  Table secondTable =
      Table.builder().name(secondTableName).schema(BASIC_SCHEMA).type("test").build();
  secondProvider.createTable(secondTable);
  secondProvider.addRows(secondTableName, row(4, "customer"), row(1, "nobody"));

  final String sql =
      "SELECT id, name \n"
          + "FROM \n"
          + " testprovider2.testtable.blah.foo.bar2 \n"
          + "UNION \n"
          + " SELECT id, name \n"
          + " FROM \n"
          + " testtable.blah.foo.bar \n";
  SqlTransform query =
      SqlTransform.query(sql)
          .withTableProvider("testprovider2", secondProvider)
          .withDefaultTableProvider("testprovider", defaultProvider);
  PCollection<Row> output = pipeline.apply(query);

  PAssert.that(output)
      .containsInAnyOrder(
          row(4, "customer"), row(1, "nobody"), row(3, "customer"), row(2, "nobody"));
  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
/** Builds a {@link Row} with schema {@code BASIC_SCHEMA} holding the given id and name. */
private Row row(int id, String name) {
  return Row
      .withSchema(BASIC_SCHEMA)
      .addValues(id, name)
      .build();
}
}