blob: 17eb24dc5768295679b52fcbb8190691bccd95f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.BASIC_TABLE_ONE;
import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.BASIC_TABLE_TWO;
import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TABLE_WITH_STRUCT;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Contexts;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.ConventionTraitDef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitDef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for compound (multi-part) table identifiers in ZetaSQL queries.
 *
 * <p>Every test plans its query with a {@link FrameworkConfig} whose context includes {@code
 * TableResolutionContext.joinCompoundIds(DEFAULT_SCHEMA)}, and the test tables are registered in a
 * flat {@link ReadOnlyTableProvider} under names that themselves contain dots, backticks and other
 * special characters.
 */
@RunWith(JUnit4.class)
public class JoinCompoundIdentifiersTest {

  /** Maximum time, in minutes, each test waits for its pipeline to finish. */
  private static final long TWO_MINUTES = 2L;

  /** Schema name handed to {@code TableResolutionContext.joinCompoundIds}. */
  private static final String DEFAULT_SCHEMA = "beam";

  /** Three-part identifier whose middle part is backtick-quoted and contains an escaped backtick. */
  private static final String BACKTICKED_PART_ID = "a.`b-\\`c`.d";

  /** Identifier that extends {@link #BACKTICKED_PART_ID} with a URL-like, backtick-quoted part. */
  private static final String FULL_ON_ID =
      "a.`b-\\`c`.d.`httz://d.e-f.g:233333/blah\\?yes=1&true=false`";

  /** Identifier of the table that exposes a struct column. */
  private static final String TABLE_WITH_STRUCTS_ID = "a.`table:com`.`..::with-struct::..`";

  /**
   * Tables visible to the queries, keyed by the exact identifier text used in the SQL. The tests
   * below expect tables backed by {@code BASIC_TABLE_ONE} to yield {@code Key} values 14 and 15,
   * and tables backed by {@code BASIC_TABLE_TWO} to yield {@code RowKey} values 15 and 16.
   */
  private static final TableProvider TEST_TABLES =
      new ReadOnlyTableProvider(
          "test_table_provider",
          ImmutableMap.<String, BeamSqlTable>builder()
              .put("KeyValue", BASIC_TABLE_ONE)
              .put("a.b", BASIC_TABLE_ONE)
              .put("c.d.e", BASIC_TABLE_ONE)
              .put("c.d.f", BASIC_TABLE_TWO)
              .put("c.g.e", BASIC_TABLE_TWO)
              .put("weird.`\\n\\t\\r\\f`", BASIC_TABLE_ONE)
              .put(BACKTICKED_PART_ID, BASIC_TABLE_TWO)
              .put(FULL_ON_ID, BASIC_TABLE_TWO)
              .put(TABLE_WITH_STRUCTS_ID, TABLE_WITH_STRUCT)
              .build());

  @Rule public transient TestPipeline pipeline = TestPipeline.create();

  // Not referenced by any test in this class at the moment; kept so the rule stays available.
  @Rule public ExpectedException thrown = ExpectedException.none();

  /** A two-part table name can be queried directly. */
  @Test
  public void testComplexTableName() throws Exception {
    PCollection<Row> result = executeQuery("SELECT Key FROM a.b");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** A three-part table name can be queried directly. */
  @Test
  public void testComplexTableName3Levels() throws Exception {
    PCollection<Row> result = executeQuery("SELECT Key FROM c.d.e");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** One identifier part is backtick-quoted and contains an escaped backtick. */
  @Test
  public void testOnePartWithBackticks() throws Exception {
    PCollection<Row> result = executeQuery("SELECT RowKey FROM " + BACKTICKED_PART_ID);
    PAssert.that(result).containsInAnyOrder(singleValue(16L), singleValue(15L));
    runPipeline();
  }

  /** A backtick-quoted part consisting only of escaped whitespace sequences. */
  @Test
  public void testNewLinesAndOtherWhitespace() throws Exception {
    PCollection<Row> result = executeQuery("SELECT Key FROM weird.`\\n\\t\\r\\f`");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** The longest identifier, mixing plain and backtick-quoted parts. */
  @Test
  public void testFullOnWithBackticks() throws Exception {
    PCollection<Row> result = executeQuery("SELECT RowKey FROM " + FULL_ON_ID);
    PAssert.that(result).containsInAnyOrder(singleValue(16L), singleValue(15L));
    runPipeline();
  }

  /** Joins two tables whose identifiers share a backtick-quoted prefix. */
  @Test
  public void testJoinWithFullOnWithBackticks() throws Exception {
    PCollection<Row> result =
        executeQuery(
            "SELECT t1.RowKey FROM "
                + FULL_ON_ID
                + " AS t1 \n"
                + " INNER JOIN "
                + BACKTICKED_PART_ID
                + " t2 on t1.RowKey = t2.RowKey");
    PAssert.that(result).containsInAnyOrder(singleValue(16L), singleValue(15L));
    runPipeline();
  }

  /** Field access qualified by an alias of a two-part table name. */
  @Test
  public void testQualifiedFieldAccessWithAliasedComplexTableName() throws Exception {
    PCollection<Row> result = executeQuery("SELECT t.Key FROM a.b AS t");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** Field access qualified by an alias of a three-part table name. */
  @Test
  public void testQualifiedFieldAccessWithAliasedComplexTableName3Levels() throws Exception {
    PCollection<Row> result = executeQuery("SELECT t.Key FROM c.d.e AS t");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** Field access qualified by the last part of an unaliased two-part table name. */
  @Test
  public void testQualifiedFieldAccessWithUnaliasedComplexTableName() throws Exception {
    PCollection<Row> result = executeQuery("SELECT b.Key FROM a.b");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** Field access qualified by the last part of an unaliased three-part table name. */
  @Test
  public void testQualifiedFieldAccessWithUnaliasedComplexTableName3Levels() throws Exception {
    PCollection<Row> result = executeQuery("SELECT e.Key FROM c.d.e");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /**
   * NOTE(review): this runs exactly the same query as {@link
   * #testQualifiedFieldAccessWithUnaliasedComplexTableName3Levels()}; it presumably was meant to
   * cover a different qualification form - confirm the intended query before changing it.
   */
  @Test
  public void testQualifiedFieldAccessWithUnaliasedComplexTableName3Levels2() throws Exception {
    PCollection<Row> result = executeQuery("SELECT e.Key FROM c.d.e");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** Joins a two-part and a three-part table name, both aliased. */
  @Test
  public void testQualifiedFieldAccessWithJoinOfAliasedComplexTableNames() throws Exception {
    PCollection<Row> result =
        executeQuery("SELECT t1.Key FROM a.b AS t1 INNER JOIN c.d.e AS t2 ON t1.Key = t2.Key");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** Joins two tables whose identifiers differ only in the last part. */
  @Test
  public void testJoinTwoTablesWithLastPartIdDifferent() throws Exception {
    PCollection<Row> result =
        executeQuery("SELECT t1.Key FROM c.d.e AS t1 INNER JOIN c.d.f AS t2 ON t1.Key = t2.RowKey");
    PAssert.that(result).containsInAnyOrder(singleValue(15L));
    runPipeline();
  }

  /** Joins two tables whose identifiers differ only in the middle part. */
  @Test
  public void testJoinTwoTablesWithMiddlePartIdDifferent() throws Exception {
    PCollection<Row> result =
        executeQuery("SELECT t1.Key FROM c.d.e AS t1 INNER JOIN c.g.e AS t2 ON t1.Key = t2.RowKey");
    PAssert.that(result).containsInAnyOrder(singleValue(15L));
    runPipeline();
  }

  /** Joins a two-part and a three-part table name, both referenced by their last part. */
  @Test
  public void testQualifiedFieldAccessWithJoinOfUnaliasedComplexTableNames() throws Exception {
    PCollection<Row> result =
        executeQuery("SELECT b.Key FROM a.b INNER JOIN c.d.e ON b.Key = e.Key");
    PAssert.that(result).containsInAnyOrder(singleValue(14L), singleValue(15L));
    runPipeline();
  }

  /** Unqualified access to a field nested in a struct column. */
  @Test
  public void testStructFieldAccess() throws Exception {
    PCollection<Row> result =
        executeQuery("SELECT struct_col.struct_col_str FROM " + TABLE_WITH_STRUCTS_ID);
    PAssert.that(result).containsInAnyOrder(singleValue("row_one"), singleValue("row_two"));
    runPipeline();
  }

  /** Struct field access qualified by the last (backtick-quoted) part of the table name. */
  @Test
  public void testQualifiedStructFieldAccess() throws Exception {
    PCollection<Row> result =
        executeQuery(
            "SELECT `..::with-struct::..`.struct_col.struct_col_str \n"
                + " FROM "
                + TABLE_WITH_STRUCTS_ID);
    PAssert.that(result).containsInAnyOrder(singleValue("row_one"), singleValue("row_two"));
    runPipeline();
  }

  /** Struct field access qualified by a table alias. */
  @Test
  public void testAliasedStructFieldAccess() throws Exception {
    PCollection<Row> result =
        executeQuery("SELECT t.struct_col.struct_col_str FROM " + TABLE_WITH_STRUCTS_ID + " t");
    PAssert.that(result).containsInAnyOrder(singleValue("row_one"), singleValue("row_two"));
    runPipeline();
  }

  /**
   * Builds a planner configuration backed by {@link #TEST_TABLES}, with a context that carries
   * both the JDBC connection config and {@code
   * TableResolutionContext.joinCompoundIds(DEFAULT_SCHEMA)}.
   *
   * @return a fresh {@link FrameworkConfig}; a new JDBC connection is opened on every call
   */
  // RelTraitDef is declared without type arguments below, hence "rawtypes".
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static FrameworkConfig initializeCalcite() {
    JdbcConnection jdbcConnection =
        JdbcDriver.connect(TEST_TABLES, PipelineOptionsFactory.create());
    SchemaPlus defaultSchemaPlus = jdbcConnection.getCurrentSchemaPlus();
    List<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE);

    Object[] contexts =
        ImmutableList.of(
                Contexts.of(jdbcConnection.config()),
                TableResolutionContext.joinCompoundIds(DEFAULT_SCHEMA))
            .toArray();

    return Frameworks.newConfigBuilder()
        .defaultSchema(defaultSchemaPlus)
        .traitDefs(traitDefs)
        .context(Contexts.of(contexts))
        .ruleSets(BeamRuleSets.getRuleSets())
        .costFactory(null)
        .typeSystem(jdbcConnection.getTypeFactory().getTypeSystem())
        .build();
  }

  /**
   * Plans {@code query} against a freshly initialized configuration and attaches the resulting
   * plan to the test {@link #pipeline}.
   *
   * @param query the SQL text to plan
   * @return the rows produced by the query
   * @throws Exception propagated from {@link #applySqlTransform}
   */
  private PCollection<Row> executeQuery(String query) throws Exception {
    FrameworkConfig cfg = initializeCalcite();
    return applySqlTransform(pipeline, cfg, query);
  }

  /** Runs the test pipeline and waits at most {@link #TWO_MINUTES} minutes for it to finish. */
  private void runPipeline() {
    pipeline.run().waitUntilFinish(Duration.standardMinutes(TWO_MINUTES));
  }

  /**
   * Parses {@code query} with a {@code ZetaSQLQueryPlanner} built from {@code config} and converts
   * the resulting plan into a {@link PCollection} on {@code pipeline}.
   *
   * @param pipeline the pipeline the plan is applied to
   * @param config the planner configuration
   * @param query the SQL text to plan
   * @return the rows produced by the query
   * @throws Exception as declared by the original signature; the planner calls may throw
   */
  private PCollection<Row> applySqlTransform(
      Pipeline pipeline, FrameworkConfig config, String query) throws Exception {
    BeamRelNode beamRelNode = new ZetaSQLQueryPlanner(config).parseQuery(query);
    return BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
  }

  /** Returns a one-column row holding {@code value} in an INT64 field named {@code field1}. */
  private Row singleValue(long value) {
    return Row.withSchema(singleLongField()).addValue(value).build();
  }

  /** Returns a one-column row holding {@code value} in a STRING field named {@code field1}. */
  private Row singleValue(String value) {
    return Row.withSchema(singleStringField()).addValue(value).build();
  }

  /** Schema with a single INT64 field named {@code field1}. */
  private Schema singleLongField() {
    return Schema.builder().addInt64Field("field1").build();
  }

  /** Schema with a single STRING field named {@code field1}. */
  private Schema singleStringField() {
    return Schema.builder().addStringField("field1").build();
  }
}