/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.jackson;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.WithFailures;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.junit.Rule;
import org.junit.Test;

/** Test Jackson transforms {@link ParseJsons} and {@link AsJsons}. */
public class JacksonTransformsTest implements Serializable {
  private static final List<String> VALID_JSONS =
      Arrays.asList("{\"myString\":\"abc\",\"myInt\":3}", "{\"myString\":\"def\",\"myInt\":4}");

  private static final List<String> INVALID_JSONS =
      Arrays.asList("{myString:\"abc\",\"myInt\":3,\"other\":1}", "{", "");

  private static final List<String> EMPTY_JSONS = Arrays.asList("{}", "{}");

  private static final List<String> EXTRA_PROPERTIES_JSONS =
      Arrays.asList(
          "{\"myString\":\"abc\",\"myInt\":3,\"other\":1}", "{\"myString\":\"def\",\"myInt\":4}");

  private static final List<MyPojo> POJOS =
      Arrays.asList(new MyPojo("abc", 3), new MyPojo("def", 4));

  private static final List<MyInvalidPojo> INVALID_POJOS =
      Arrays.asList(new MyInvalidPojo("aaa", 5), new MyInvalidPojo("bbb", 6));

  private static final List<MyEmptyBean> EMPTY_BEANS =
      Arrays.asList(new MyEmptyBean("abc", 3), new MyEmptyBean("def", 4));

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void parseValidJsons() {
    PCollection<MyPojo> output =
        pipeline
            .apply(Create.of(VALID_JSONS))
            .apply(ParseJsons.of(MyPojo.class))
            .setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(output).containsInAnyOrder(POJOS);

    pipeline.run();
  }

  @Test(expected = Pipeline.PipelineExecutionException.class)
  public void failParsingInvalidJsons() {
    PCollection<MyPojo> output =
        pipeline
            .apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS)))
            .apply(ParseJsons.of(MyPojo.class))
            .setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(output).containsInAnyOrder(POJOS);

    pipeline.run();
  }

  @Test
  public void testParsingInvalidJsonsWithFailuresDefaultHandler() {
    WithFailures.Result<PCollection<MyPojo>, KV<String, Map<String, String>>> result =
        pipeline
            .apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS)))
            .apply(ParseJsons.of(MyPojo.class).exceptionsVia());

    result.output().setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(result.output()).containsInAnyOrder(POJOS);
    assertParsingWithErrorMapHandler(result);

    pipeline.run();
  }

  @Test
  public void testParsingInvalidJsonsWithFailuresAsMap() {
    WithFailures.Result<PCollection<MyPojo>, KV<String, Map<String, String>>> result =
        pipeline
            .apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS)))
            .apply(
                ParseJsons.of(MyPojo.class)
                    .exceptionsVia(new WithFailures.ExceptionAsMapHandler<String>() {}));

    result.output().setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(result.output()).containsInAnyOrder(POJOS);
    assertParsingWithErrorMapHandler(result);

    pipeline.run();
  }

  @Test
  public void testParsingInvalidJsonsWithFailuresSimpleFunction() {
    WithFailures.Result<PCollection<MyPojo>, KV<String, String>> result =
        pipeline
            .apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS)))
            .apply(
                ParseJsons.of(MyPojo.class)
                    .exceptionsVia(
                        new SimpleFunction<
                            WithFailures.ExceptionElement<String>, KV<String, String>>() {
                          @Override
                          public KV<String, String> apply(
                              WithFailures.ExceptionElement<String> failure) {
                            return KV.of(
                                failure.element(),
                                failure.exception().getClass().getCanonicalName());
                          }
                        }));
    result.output().setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(result.output()).containsInAnyOrder(POJOS);
    assertParsingWithErrorFunctionHandler(result);

    pipeline.run();
  }

  @Test
  public void testParsingInvalidJsonsWithFailuresLambda() {
    WithFailures.Result<PCollection<MyPojo>, KV<String, String>> result =
        pipeline
            .apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS)))
            .apply(
                ParseJsons.of(MyPojo.class)
                    .exceptionsInto(
                        TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .exceptionsVia(
                        f -> KV.of(f.element(), f.exception().getClass().getCanonicalName())));
    result.output().setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(result.output()).containsInAnyOrder(POJOS);
    assertParsingWithErrorFunctionHandler(result);

    pipeline.run();
  }

  @Test(expected = Pipeline.PipelineExecutionException.class)
  public void failParsingWithoutCustomMapper() {
    PCollection<MyPojo> output =
        pipeline
            .apply(Create.of(EXTRA_PROPERTIES_JSONS))
            .apply(ParseJsons.of(MyPojo.class))
            .setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(output).empty();

    pipeline.run();
  }

  @Test
  public void parseUsingCustomMapper() {
    ObjectMapper customMapper =
        new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    PCollection<MyPojo> output =
        pipeline
            .apply(Create.of(EXTRA_PROPERTIES_JSONS))
            .apply(ParseJsons.of(MyPojo.class).withMapper(customMapper))
            .setCoder(SerializableCoder.of(MyPojo.class));

    PAssert.that(output).containsInAnyOrder(POJOS);

    pipeline.run();
  }

  @Test
  public void writeValidObjects() {
    PCollection<String> output =
        pipeline
            .apply(Create.of(POJOS))
            .apply(AsJsons.of(MyPojo.class))
            .setCoder(StringUtf8Coder.of());

    PAssert.that(output).containsInAnyOrder(VALID_JSONS);

    pipeline.run();
  }

  @Test(expected = Pipeline.PipelineExecutionException.class)
  public void failWritingWithoutCustomMapper() {
    pipeline
        .apply(Create.of(EMPTY_BEANS))
        .apply(AsJsons.of(MyEmptyBean.class))
        .setCoder(StringUtf8Coder.of());

    pipeline.run();
  }

  @Test
  public void writeUsingCustomMapper() {
    ObjectMapper customMapper =
        new ObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);

    PCollection<String> output =
        pipeline
            .apply(Create.of(EMPTY_BEANS))
            .apply(AsJsons.of(MyEmptyBean.class).withMapper(customMapper))
            .setCoder(StringUtf8Coder.of());

    PAssert.that(output).containsInAnyOrder(EMPTY_JSONS);

    pipeline.run();
  }

  @Test
  public void testWritingInvalidJsonsWithFailuresDefaultHandler() {
    WithFailures.Result<PCollection<String>, KV<MyPojo, Map<String, String>>> result =
        pipeline
            .apply(
                Create.of(Iterables.concat(POJOS, INVALID_POJOS))
                    .withCoder(SerializableCoder.of(MyPojo.class)))
            .apply(AsJsons.of(MyPojo.class).exceptionsVia());

    result.output().setCoder(StringUtf8Coder.of());

    result
        .failures()
        .setCoder(
            KvCoder.of(
                SerializableCoder.of(MyPojo.class),
                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    PAssert.that(result.output()).containsInAnyOrder(VALID_JSONS);
    assertWritingWithErrorMapHandler(result);

    pipeline.run();
  }

  @Test
  public void testWritingInvalidJsonsWithFailuresAsMap() {
    WithFailures.Result<PCollection<String>, KV<MyPojo, Map<String, String>>> result =
        pipeline
            .apply(
                Create.of(Iterables.concat(POJOS, INVALID_POJOS))
                    .withCoder(SerializableCoder.of(MyPojo.class)))
            .apply(
                AsJsons.of(MyPojo.class)
                    .exceptionsVia(new WithFailures.ExceptionAsMapHandler<MyPojo>() {}));

    result.output().setCoder(StringUtf8Coder.of());

    PAssert.that(result.output()).containsInAnyOrder(VALID_JSONS);
    assertWritingWithErrorMapHandler(result);

    pipeline.run();
  }

  @Test
  public void testWritingInvalidJsonsWithFailuresSimpleFunction() {
    WithFailures.Result<PCollection<String>, KV<MyPojo, String>> result =
        pipeline
            .apply(
                Create.of(Iterables.concat(POJOS, INVALID_POJOS))
                    .withCoder(SerializableCoder.of(MyPojo.class)))
            .apply(
                AsJsons.of(MyPojo.class)
                    .exceptionsVia(
                        new SimpleFunction<
                            WithFailures.ExceptionElement<MyPojo>, KV<MyPojo, String>>() {
                          @Override
                          public KV<MyPojo, String> apply(
                              WithFailures.ExceptionElement<MyPojo> failure) {
                            return KV.of(
                                failure.element(),
                                failure.exception().getClass().getCanonicalName());
                          }
                        }));
    result.output().setCoder(StringUtf8Coder.of());

    PAssert.that(result.output()).containsInAnyOrder(VALID_JSONS);
    assertWritingWithErrorFunctionHandler(result);

    pipeline.run();
  }

  @Test
  public void testWritingInvalidJsonsWithFailuresLambda() {
    WithFailures.Result<PCollection<String>, KV<MyPojo, String>> result =
        pipeline
            .apply(
                Create.of(Iterables.concat(POJOS, INVALID_POJOS))
                    .withCoder(SerializableCoder.of(MyPojo.class)))
            .apply(
                AsJsons.of(MyPojo.class)
                    .exceptionsInto(
                        TypeDescriptors.kvs(
                            TypeDescriptor.of(MyPojo.class), TypeDescriptors.strings()))
                    .exceptionsVia(
                        f -> KV.of(f.element(), f.exception().getClass().getCanonicalName())));
    result.output().setCoder(StringUtf8Coder.of());

    PAssert.that(result.output()).containsInAnyOrder(VALID_JSONS);
    assertWritingWithErrorFunctionHandler(result);

    pipeline.run();
  }

  /** Pojo for tests. */
  @SuppressWarnings({"WeakerAccess", "unused"})
  public static class MyPojo implements Serializable {
    private String myString;
    private int myInt;

    public MyPojo() {}

    public MyPojo(String myString, int myInt) {
      this.myString = myString;
      this.myInt = myInt;
    }

    public String getMyString() {
      return myString;
    }

    public void setMyString(String myString) {
      this.myString = myString;
    }

    public int getMyInt() {
      return myInt;
    }

    public void setMyInt(int myInt) {
      this.myInt = myInt;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }

      if (!(o instanceof MyPojo)) {
        return false;
      }

      MyPojo myPojo = (MyPojo) o;

      return myInt == myPojo.myInt
          && (myString != null ? myString.equals(myPojo.myString) : myPojo.myString == null);
    }

    @Override
    public int hashCode() {
      int result = myString != null ? myString.hashCode() : 0;
      result = 31 * result + myInt;
      return result;
    }
  }

  /** Pojo for tests. */
  @SuppressWarnings({"WeakerAccess", "unused"})
  public static class MyEmptyBean implements Serializable {
    private String myString;
    private int myInt;

    public MyEmptyBean(String myString, int myInt) {
      this.myString = myString;
      this.myInt = myInt;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      MyEmptyBean that = (MyEmptyBean) o;

      if (myInt != that.myInt) {
        return false;
      }
      return myString != null ? myString.equals(that.myString) : that.myString == null;
    }

    @Override
    public int hashCode() {
      int result = myString != null ? myString.hashCode() : 0;
      result = 31 * result + myInt;
      return result;
    }
  }

  /** Pojo for tests. */
  @SuppressWarnings({"WeakerAccess", "unused"})
  public static class MyInvalidPojo extends MyPojo {
    public MyInvalidPojo(String myString, int myInt) {
      super(myString, myInt);
    }

    @Override
    public String getMyString() {
      throw new RuntimeException("Unknown error!");
    }
  }

  private void assertParsingWithErrorMapHandler(
      WithFailures.Result<PCollection<MyPojo>, KV<String, Map<String, String>>> result) {
    PAssert.that(result.failures())
        .satisfies(
            kv -> {
              for (KV<String, Map<String, String>> entry : kv) {
                if (entry.getKey().equals(INVALID_JSONS.get(0))) {
                  assertEquals(
                      "com.fasterxml.jackson.core.JsonParseException",
                      entry.getValue().get("className"));
                } else if (entry.getKey().equals(INVALID_JSONS.get(1))) {
                  assertEquals(
                      "com.fasterxml.jackson.core.io.JsonEOFException",
                      entry.getValue().get("className"));
                } else if (entry.getKey().equals(INVALID_JSONS.get(2))) {
                  assertEquals(
                      "com.fasterxml.jackson.databind.exc.MismatchedInputException",
                      entry.getValue().get("className"));
                } else {
                  throw new AssertionError(
                      "Unexpected key is found in failures result: \"" + entry.getKey() + "\"");
                }
                assertThat(entry.getValue().entrySet(), hasSize(3));
                assertThat(entry.getValue(), hasKey("stackTrace"));
                assertThat(entry.getValue(), hasKey("message"));
              }

              return null;
            });
  }

  private void assertParsingWithErrorFunctionHandler(
      WithFailures.Result<PCollection<MyPojo>, KV<String, String>> result) {
    PAssert.that(result.failures())
        .containsInAnyOrder(
            KV.of(INVALID_JSONS.get(0), "com.fasterxml.jackson.core.JsonParseException"),
            KV.of(INVALID_JSONS.get(1), "com.fasterxml.jackson.core.io.JsonEOFException"),
            KV.of(
                INVALID_JSONS.get(2),
                "com.fasterxml.jackson.databind.exc.MismatchedInputException"));
  }

  private void assertWritingWithErrorMapHandler(
      WithFailures.Result<PCollection<String>, KV<MyPojo, Map<String, String>>> result) {
    PAssert.that(result.failures())
        .satisfies(
            kv -> {
              for (KV<MyPojo, Map<String, String>> entry : kv) {
                assertThat(entry.getValue().entrySet(), hasSize(3));
                assertThat(entry.getValue(), hasKey("stackTrace"));
                assertThat(entry.getValue(), hasKey("message"));
                assertEquals(
                    "com.fasterxml.jackson.databind.JsonMappingException",
                    entry.getValue().get("className"));
              }
              return null;
            });
  }

  private void assertWritingWithErrorFunctionHandler(
      WithFailures.Result<PCollection<String>, KV<MyPojo, String>> result) {
    PAssert.that(result.failures())
        .containsInAnyOrder(
            KV.of(INVALID_POJOS.get(0), "com.fasterxml.jackson.databind.JsonMappingException"),
            KV.of(INVALID_POJOS.get(1), "com.fasterxml.jackson.databind.JsonMappingException"));
  }
}
