/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.sql.meta;

import static org.junit.Assert.assertThrows;

import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.TableName;
import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

/** Test for custom table resolver and full name table provider. */
public class CustomTableResolverTest implements Serializable {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  private static final Schema BASIC_SCHEMA =
      Schema.builder().addInt32Field("id").addStringField("name").build();

  /**
   * Test table provider with custom name resolution.
   *
   * <p>Demonstrates how to parse table names as in normal Calcite queries syntax, e.g. {@code
   * a.b.c.d} and convert them to its' own custom table name format {@code a_b_c_d}.
   */
  public static class CustomResolutionTestTableProvider extends FullNameTableProvider {

    TestTableProvider delegateTableProvider;

    public CustomResolutionTestTableProvider() {
      delegateTableProvider = new TestTableProvider();
    }

    @Override
    public Table getTable(String tableName) {
      return delegateTableProvider.getTable(tableName);
    }

    @Override
    public Table getTableByFullName(TableName fullTableName) {
      // For the test we register tables with underscore instead of dots, so here we lookup the
      // tables
      // with those underscore.
      String actualTableName =
          String.join("_", fullTableName.getPath()) + "_" + fullTableName.getTableName();
      return delegateTableProvider.getTable(actualTableName);
    }

    @Override
    public String getTableType() {
      return delegateTableProvider.getTableType();
    }

    @Override
    public void createTable(Table table) {
      delegateTableProvider.createTable(table);
    }

    public void addRows(String tableName, Row... rows) {
      delegateTableProvider.addRows(tableName, rows);
    }

    @Override
    public void dropTable(String tableName) {
      delegateTableProvider.dropTable(tableName);
    }

    @Override
    public Map<String, Table> getTables() {
      return delegateTableProvider.getTables();
    }

    @Override
    public BeamSqlTable buildBeamSqlTable(Table table) {
      return delegateTableProvider.buildBeamSqlTable(table);
    }
  }

  @Test
  public void testSimpleId() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testtable")
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testDefaultBuildIOReader_withEmptyParams_returnsPCollection() {
    TestBoundedTable testTable = TestBoundedTable.of(BASIC_SCHEMA).addRows(1, "one");
    Row expected = row(1, "one");

    PCollection<Row> resultWithEmpty =
        testTable.buildIOReader(
            pipeline.begin(), testTable.constructFilter(ImmutableList.of()), ImmutableList.of());

    PAssert.that(resultWithEmpty).containsInAnyOrder(expected);

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testDefaultBuildIOReader_withNonEmptyParams_throwsException() {
    TestBoundedTable testTable = TestBoundedTable.of(BASIC_SCHEMA).addRows(1, "one");

    assertThrows(
        IllegalArgumentException.class,
        () -> testTable.buildIOReader(pipeline.begin(), () -> null, ImmutableList.of()));
    assertThrows(
        IllegalArgumentException.class,
        () ->
            testTable.buildIOReader(
                pipeline.begin(),
                new DefaultTableFilter(ImmutableList.of()),
                ImmutableList.of("one")));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testSimpleIdWithExplicitDefaultSchema() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testprovider.testtable")
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testSimpleIdWithExplicitDefaultSchemaWithMultipleProviders() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testprovider2.testtable2")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testSimpleIdWithExplicitNonDefaultSchema() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable", row(1, "one"), row(2, "two"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testprovider2.testtable2")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testCompoundIdInDefaultSchema() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testtable.blah")
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testCompoundIdInExplicitDefaultSchema() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testprovider.testtable.blah")
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testLongCompoundIdInDefaultSchema() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar")
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testLongCompoundIdInDefaultSchemaWithMultipleProviders() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, "four"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testLongCompoundIdInExplicitDefaultSchema() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testprovider.testtable.blah.foo.bar")
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testLongCompoundIdInNonDefaultSchemaSameTableNames() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, "four"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testprovider2.testtable.blah.foo.bar")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testLongCompoundIdInNonDefaultSchemaDifferentNames() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder()
            .name("testtable2_blah2_foo2_bar2")
            .schema(BASIC_SCHEMA)
            .type("test")
            .build());
    tableProvider2.addRows("testtable2_blah2_foo2_bar2", row(3, "three"), row(4, "four"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query("SELECT id, name FROM testprovider2.testtable2.blah2.foo2.bar2")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testJoinWithLongCompoundIds() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query(
                    "SELECT testprovider2.testtable.blah.foo.bar2.id, testtable.blah.foo.bar.name \n"
                        + "FROM \n"
                        + "  testprovider2.testtable.blah.foo.bar2 \n"
                        + "JOIN \n"
                        + "  testtable.blah.foo.bar \n"
                        + "USING(name)")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, "nobody"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testInnerJoinWithLongCompoundIds() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query(
                    "SELECT testprovider2.testtable.blah.foo.bar2.id, testtable.blah.foo.bar.name \n"
                        + "FROM \n"
                        + "  testprovider2.testtable.blah.foo.bar2 \n"
                        + "JOIN \n"
                        + "  testtable.blah.foo.bar \n"
                        + "USING(name)")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, "nobody"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testJoinWithLongCompoundIdsWithAliases() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query(
                    "SELECT b.id, a.name \n"
                        + "FROM \n"
                        + "  testprovider2.testtable.blah.foo.bar2 AS b \n"
                        + "JOIN \n"
                        + "  testtable.blah.foo.bar a\n"
                        + "USING(name)")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, "nobody"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  @Test
  public void testUnionWithLongCompoundIds() throws Exception {
    CustomResolutionTestTableProvider tableProvider = new CustomResolutionTestTableProvider();
    tableProvider.createTable(
        Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build());
    tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody"));

    CustomResolutionTestTableProvider tableProvider2 = new CustomResolutionTestTableProvider();
    tableProvider2.createTable(
        Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build());
    tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody"));

    PCollection<Row> result =
        pipeline.apply(
            SqlTransform.query(
                    "SELECT id, name \n"
                        + "FROM \n"
                        + "  testprovider2.testtable.blah.foo.bar2 \n"
                        + "UNION \n"
                        + "    SELECT id, name \n"
                        + "      FROM \n"
                        + "        testtable.blah.foo.bar \n")
                .withTableProvider("testprovider2", tableProvider2)
                .withDefaultTableProvider("testprovider", tableProvider));

    PAssert.that(result)
        .containsInAnyOrder(
            row(4, "customer"), row(1, "nobody"), row(3, "customer"), row(2, "nobody"));

    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }

  private Row row(int id, String name) {
    return Row.withSchema(BASIC_SCHEMA).addValues(id, name).build();
  }
}
