/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.schemas.transforms;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Test for {@link Select}. */
@RunWith(JUnit4.class)
@Category(UsesSchema.class)
public class SelectTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
  @Rule public transient ExpectedException thrown = ExpectedException.none();

  /** flat POJO to selection from. */
  @DefaultSchema(JavaFieldSchema.class)
  static class POJO1 {
    String field1 = "field1";
    Integer field2 = 42;
    Double field3 = 3.14;

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      POJO1 pojo1 = (POJO1) o;
      return Objects.equals(field1, pojo1.field1)
          && Objects.equals(field2, pojo1.field2)
          && Objects.equals(field3, pojo1.field3);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1, field2, field3);
    }

    @Override
    public String toString() {
      return "POJO1{"
          + "field1='"
          + field1
          + '\''
          + ", field2="
          + field2
          + ", field3="
          + field3
          + '}';
    }
  };

  /** A pojo matching the schema resulting from selection field1, field3. */
  @DefaultSchema(JavaFieldSchema.class)
  static class POJO1Selected {
    String field1 = "field1";
    Double field3 = 3.14;

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      POJO1Selected that = (POJO1Selected) o;
      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1, field3);
    }
  }

  /** A nested POJO. */
  @DefaultSchema(JavaFieldSchema.class)
  static class POJO2 {
    String field1 = "field1";
    POJO1 field2 = new POJO1();

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof POJO2)) {
        return false;
      }
      POJO2 pojo2 = (POJO2) o;
      return Objects.equals(field1, pojo2.field1) && Objects.equals(field2, pojo2.field2);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1, field2);
    }
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectMissingFieldName() {
    thrown.expect(IllegalArgumentException.class);
    pipeline.apply(Create.of(new POJO1())).apply(Select.fieldNames("missing"));
    pipeline.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectMissingFieldIndex() {
    thrown.expect(IllegalArgumentException.class);
    pipeline.apply(Create.of(new POJO1())).apply(Select.fieldIds(42));
    pipeline.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectAll() {
    PCollection<POJO1> pojos =
        pipeline
            .apply(Create.of(new POJO1()))
            .apply(Select.fieldNames("*"))
            .apply(Convert.to(POJO1.class));
    PAssert.that(pojos).containsInAnyOrder(new POJO1());
    pipeline.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSimpleSelect() {
    PCollection<POJO1Selected> pojos =
        pipeline
            .apply(Create.of(new POJO1()))
            .apply(Select.fieldNames("field1", "field3"))
            .apply(Convert.to(POJO1Selected.class));
    PAssert.that(pojos).containsInAnyOrder(new POJO1Selected());
    pipeline.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectNestedAll() {
    PCollection<POJO1> pojos =
        pipeline
            .apply(Create.of(new POJO2()))
            .apply(Select.fieldNames("field2"))
            .apply(Convert.to(POJO1.class));
    PAssert.that(pojos).containsInAnyOrder(new POJO1());
    pipeline.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectNestedAllWildcard() {
    PCollection<POJO1> pojos =
        pipeline
            .apply(Create.of(new POJO2()))
            .apply(Select.fieldNames("field2.*"))
            .apply(Convert.to(POJO1.class));
    PAssert.that(pojos).containsInAnyOrder(new POJO1());
    pipeline.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectNestedPartial() {
    PCollection<POJO1Selected> pojos =
        pipeline
            .apply(Create.of(new POJO2()))
            .apply(Select.fieldNames("field2.field1", "field2.field3"))
            .apply(Convert.to(POJO1Selected.class));
    PAssert.that(pojos).containsInAnyOrder(new POJO1Selected());
    pipeline.run();
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class PrimitiveArray {
    List<Double> field1 = ImmutableList.of(1.0, 2.1, 3.2);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof PrimitiveArray)) {
        return false;
      }
      PrimitiveArray that = (PrimitiveArray) o;
      return Objects.equals(field1, that.field1);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1);
    }
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectPrimitiveArray() {
    PCollection<PrimitiveArray> selected =
        pipeline
            .apply(Create.of(new PrimitiveArray()))
            .apply(Select.fieldNames("field1"))
            .apply(Convert.to(PrimitiveArray.class));
    PAssert.that(selected).containsInAnyOrder(new PrimitiveArray());
    pipeline.run();
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class RowSingleArray {
    List<POJO1> field1 = ImmutableList.of(new POJO1(), new POJO1(), new POJO1());

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof RowSingleArray)) {
        return false;
      }
      RowSingleArray that = (RowSingleArray) o;
      return Objects.equals(field1, that.field1);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1);
    }
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class PartialRowSingleArray {
    List<String> field1 = ImmutableList.of("field1", "field1", "field1");
    List<Double> field3 = ImmutableList.of(3.14, 3.14, 3.14);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof PartialRowSingleArray)) {
        return false;
      }
      PartialRowSingleArray that = (PartialRowSingleArray) o;
      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1, field3);
    }
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectRowArray() {
    PCollection<PartialRowSingleArray> selected =
        pipeline
            .apply(Create.of(new RowSingleArray()))
            .apply(Select.fieldNames("field1.field1", "field1.field3"))
            .apply(Convert.to(PartialRowSingleArray.class));
    PAssert.that(selected).containsInAnyOrder(new PartialRowSingleArray());
    pipeline.run();
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class RowSingleMap {
    Map<String, POJO1> field1 =
        ImmutableMap.of(
            "key1", new POJO1(),
            "key2", new POJO1(),
            "key3", new POJO1());

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof RowSingleMap)) {
        return false;
      }
      RowSingleMap that = (RowSingleMap) o;
      return Objects.equals(field1, that.field1);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1);
    }
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class PartialRowSingleMap {
    Map<String, String> field1 =
        ImmutableMap.of(
            "key1", "field1",
            "key2", "field1",
            "key3", "field1");
    Map<String, Double> field3 =
        ImmutableMap.of(
            "key1", 3.14,
            "key2", 3.14,
            "key3", 3.14);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof PartialRowSingleMap)) {
        return false;
      }
      PartialRowSingleMap that = (PartialRowSingleMap) o;
      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1, field3);
    }
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectRowMap() {
    PCollection<PartialRowSingleMap> selected =
        pipeline
            .apply(Create.of(new RowSingleMap()))
            .apply(Select.fieldNames("field1.field1", "field1.field3"))
            .apply(Convert.to(PartialRowSingleMap.class));
    PAssert.that(selected).containsInAnyOrder(new PartialRowSingleMap());
    pipeline.run();
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class RowMultipleArray {
    private static final List<POJO1> POJO_LIST =
        ImmutableList.of(new POJO1(), new POJO1(), new POJO1());
    private static final List<List<POJO1>> POJO_LIST_LIST =
        ImmutableList.of(POJO_LIST, POJO_LIST, POJO_LIST);

    List<List<List<POJO1>>> field1 =
        ImmutableList.of(POJO_LIST_LIST, POJO_LIST_LIST, POJO_LIST_LIST);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof RowMultipleArray)) {
        return false;
      }
      RowMultipleArray that = (RowMultipleArray) o;
      return Objects.equals(field1, that.field1);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1);
    }
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class PartialRowMultipleArray {
    private static final List<POJO1Selected> POJO_LIST =
        ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new POJO1Selected());
    private static final List<List<POJO1Selected>> POJO_LIST_LIST =
        ImmutableList.of(POJO_LIST, POJO_LIST, POJO_LIST);

    private static final List<String> STRING_LIST = ImmutableList.of("field1", "field1", "field1");
    private static final List<List<String>> STRING_LISTLIST =
        ImmutableList.of(STRING_LIST, STRING_LIST, STRING_LIST);
    List<List<List<String>>> field1 =
        ImmutableList.of(STRING_LISTLIST, STRING_LISTLIST, STRING_LISTLIST);

    private static final List<Double> DOUBLE_LIST = ImmutableList.of(3.14, 3.14, 3.14);
    private static final List<List<Double>> DOUBLE_LISTLIST =
        ImmutableList.of(DOUBLE_LIST, DOUBLE_LIST, DOUBLE_LIST);
    List<List<List<Double>>> field3 =
        ImmutableList.of(DOUBLE_LISTLIST, DOUBLE_LISTLIST, DOUBLE_LISTLIST);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof PartialRowMultipleArray)) {
        return false;
      }
      PartialRowMultipleArray that = (PartialRowMultipleArray) o;
      return Objects.equals(field1, that.field1);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1);
    }
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectedNestedArrays() {
    PCollection<RowMultipleArray> input = pipeline.apply(Create.of(new RowMultipleArray()));

    PCollection<PartialRowMultipleArray> selected =
        input
            .apply("select1", Select.fieldNames("field1.field1", "field1.field3"))
            .apply("convert1", Convert.to(PartialRowMultipleArray.class));
    PAssert.that(selected).containsInAnyOrder(new PartialRowMultipleArray());

    PCollection<PartialRowMultipleArray> selected2 =
        input
            .apply("select2", Select.fieldNames("field1[][][].field1", "field1[][][].field3"))
            .apply("convert2", Convert.to(PartialRowMultipleArray.class));

    PAssert.that(selected).containsInAnyOrder(new PartialRowMultipleArray());
    PAssert.that(selected2).containsInAnyOrder(new PartialRowMultipleArray());
    pipeline.run();
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class RowMultipleMaps {
    static final Map<String, POJO1> POJO_MAP =
        ImmutableMap.of(
            "key1", new POJO1(),
            "key2", new POJO1(),
            "key3", new POJO1());
    static final Map<String, Map<String, POJO1>> POJO_MAP_MAP =
        ImmutableMap.of(
            "key1", POJO_MAP,
            "key2", POJO_MAP,
            "key3", POJO_MAP);
    Map<String, Map<String, Map<String, POJO1>>> field1 =
        ImmutableMap.of(
            "key1", POJO_MAP_MAP,
            "key2", POJO_MAP_MAP,
            "key3", POJO_MAP_MAP);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof RowMultipleMaps)) {
        return false;
      }
      RowMultipleMaps that = (RowMultipleMaps) o;
      return Objects.equals(field1, that.field1);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1);
    }
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class PartialRowMultipleMaps {
    static final Map<String, String> STRING_MAP =
        ImmutableMap.of(
            "key1", "field1",
            "key2", "field1",
            "key3", "field1");
    static final Map<String, Map<String, String>> STRING_MAPMAP =
        ImmutableMap.of(
            "key1", STRING_MAP,
            "key2", STRING_MAP,
            "key3", STRING_MAP);
    Map<String, Map<String, Map<String, String>>> field1 =
        ImmutableMap.of(
            "key1", STRING_MAPMAP,
            "key2", STRING_MAPMAP,
            "key3", STRING_MAPMAP);
    static final Map<String, Double> DOUBLE_MAP =
        ImmutableMap.of(
            "key1", 3.14,
            "key2", 3.14,
            "key3", 3.14);
    static final Map<String, Map<String, Double>> DOUBLE_MAPMAP =
        ImmutableMap.of(
            "key1", DOUBLE_MAP,
            "key2", DOUBLE_MAP,
            "key3", DOUBLE_MAP);

    Map<String, Map<String, Map<String, Double>>> field3 =
        ImmutableMap.of(
            "key1", DOUBLE_MAPMAP,
            "key2", DOUBLE_MAPMAP,
            "key3", DOUBLE_MAPMAP);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof PartialRowMultipleMaps)) {
        return false;
      }
      PartialRowMultipleMaps that = (PartialRowMultipleMaps) o;
      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1, field3);
    }
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectRowNestedMaps() {
    PCollection<RowMultipleMaps> input = pipeline.apply(Create.of(new RowMultipleMaps()));

    PCollection<PartialRowMultipleMaps> selected =
        input
            .apply("select1", Select.fieldNames("field1.field1", "field1.field3"))
            .apply("convert1", Convert.to(PartialRowMultipleMaps.class));

    PCollection<PartialRowMultipleMaps> selected2 =
        input
            .apply("select2", Select.fieldNames("field1{}{}{}.field1", "field1{}{}{}.field3"))
            .apply("convert2", Convert.to(PartialRowMultipleMaps.class));

    PAssert.that(selected).containsInAnyOrder(new PartialRowMultipleMaps());
    PAssert.that(selected2).containsInAnyOrder(new PartialRowMultipleMaps());
    pipeline.run();
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class RowNestedArraysAndMaps {
    static final List<POJO1> POJO_LIST = ImmutableList.of(new POJO1(), new POJO1(), new POJO1());
    static final Map<String, List<POJO1>> POJO_MAP_LIST =
        ImmutableMap.of(
            "key1", POJO_LIST,
            "key2", POJO_LIST,
            "key3", POJO_LIST);
    List<Map<String, List<POJO1>>> field1 =
        ImmutableList.of(POJO_MAP_LIST, POJO_MAP_LIST, POJO_MAP_LIST);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof RowNestedArraysAndMaps)) {
        return false;
      }
      RowNestedArraysAndMaps that = (RowNestedArraysAndMaps) o;
      return Objects.equals(field1, that.field1);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1);
    }
  }

  @DefaultSchema(JavaFieldSchema.class)
  static class PartialRowNestedArraysAndMaps {
    static final Map<String, List<String>> STRING_MAP =
        ImmutableMap.of(
            "key1", ImmutableList.of("field1", "field1", "field1"),
            "key2", ImmutableList.of("field1", "field1", "field1"),
            "key3", ImmutableList.of("field1", "field1", "field1"));
    List<Map<String, List<String>>> field1 = ImmutableList.of(STRING_MAP, STRING_MAP, STRING_MAP);

    static final Map<String, List<Double>> DOUBLE_MAP =
        ImmutableMap.of(
            "key1", ImmutableList.of(3.14, 3.14, 3.14),
            "key2", ImmutableList.of(3.14, 3.14, 3.14),
            "key3", ImmutableList.of(3.14, 3.14, 3.14));

    List<Map<String, List<Double>>> field3 = ImmutableList.of(DOUBLE_MAP, DOUBLE_MAP, DOUBLE_MAP);

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof PartialRowNestedArraysAndMaps)) {
        return false;
      }
      PartialRowNestedArraysAndMaps that = (PartialRowNestedArraysAndMaps) o;
      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
    }

    @Override
    public int hashCode() {
      return Objects.hash(field1, field3);
    }

    @Override
    public String toString() {
      return "PartialRowNestedArraysAndMaps{" + "field1=" + field1 + ", field3=" + field3 + '}';
    }
  }

  @Test
  @Category(NeedsRunner.class)
  public void testSelectRowNestedListsAndMaps() {
    PCollection<RowNestedArraysAndMaps> input =
        pipeline.apply(Create.of(new RowNestedArraysAndMaps()));

    PCollection<PartialRowNestedArraysAndMaps> selected =
        input
            .apply("select1", Select.fieldNames("field1.field1", "field1.field3"))
            .apply("convert1", Convert.to(PartialRowNestedArraysAndMaps.class));

    PCollection<PartialRowNestedArraysAndMaps> selected2 =
        input
            .apply("select2", Select.fieldNames("field1[]{}[].field1", "field1[]{}[].field3"))
            .apply("convert2", Convert.to(PartialRowNestedArraysAndMaps.class));

    PAssert.that(selected).containsInAnyOrder(new PartialRowNestedArraysAndMaps());
    PAssert.that(selected2).containsInAnyOrder(new PartialRowNestedArraysAndMaps());
    pipeline.run();
  }
}
