/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.test.framework;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.table.Table;
import org.apache.samza.test.TestData;
import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.table.PageViewToProfileJoinFunction;
import org.apache.samza.test.table.TestTableData;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.samza.test.TestData.PageView;

public class StreamApplicationIntegrationTest {
  private static final Logger LOG = LoggerFactory.getLogger(StreamApplicationIntegrationTest.class);

  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};

  @Test
  public void testStatefulJoinWithLocalTable() {
    Random random = new Random();
    List<KV<String, TestTableData.PageView>> pageViews = Arrays.asList(TestTableData.generatePageViews(10))
        .stream()
        .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x))
        .collect(Collectors.toList());
    List<KV<String, TestTableData.Profile>> profiles = Arrays.asList(TestTableData.generateProfiles(10))
        .stream()
        .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x))
        .collect(Collectors.toList());

    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");

    InMemoryInputDescriptor<KV<String, TestTableData.PageView>> pageViewStreamDesc = isd
        .getInputDescriptor("PageView", new NoOpSerde<KV<String, TestTableData.PageView>>());

    InMemoryInputDescriptor<KV<String, TestTableData.Profile>> profileStreamDesc = isd
        .getInputDescriptor("Profile", new NoOpSerde<KV<String, TestTableData.Profile>>())
        .shouldBootstrap();

    InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
        .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());

    InMemoryOutputDescriptor<String> joinKeysDescriptor = isd
        .getOutputDescriptor("JoinPageKeys", new NoOpSerde<>());

    TestRunner
        .of(new PageViewProfileViewJoinApplication())
        .addInputStream(pageViewStreamDesc, pageViews)
        .addInputStream(profileStreamDesc, profiles)
        .addOutputStream(outputStreamDesc, 1)
        .addOutputStream(joinKeysDescriptor, 1)
        .run(Duration.ofSeconds(2));


    Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size());
    Assert.assertEquals(10, TestRunner.consumeStream(joinKeysDescriptor, Duration.ofSeconds(1)).get(0).size());
  }

  @Test
  public void testHighLevelApi() throws Exception {
    Random random = new Random();
    int count = 10;
    List<PageView> pageViews = new ArrayList<>();
    for (int memberId = 0; memberId < count; memberId++) {
      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
      PageView pv = new PageView(pagekey, memberId);
      pageViews.add(pv);
    }

    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
    InMemoryInputDescriptor<PageView> imid = isd.getInputDescriptor("PageView", new NoOpSerde<PageView>());
    InMemoryOutputDescriptor<PageView> imod = isd.getOutputDescriptor("Output", new NoOpSerde<PageView>());

    TestRunner
        .of(new PageViewRepartitionApplication())
        .addInputStream(imid, pageViews)
        .addOutputStream(imod, 10)
        .run(Duration.ofMillis(1500));

    Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
  }

  /**
   * Null page key is passed in input data which should fail filter logic
   */
  @Test(expected = SamzaException.class)
  public void testSamzaJobFailureForStreamApplication() {
    int count = 10;
    List<TestData.PageView> pageviews = new ArrayList<>();
    for (int memberId = 0; memberId < count; memberId++) {
      pageviews.add(new TestData.PageView(null, memberId));
    }

    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
    InMemoryInputDescriptor<PageView> imid = isd.getInputDescriptor("PageView", new NoOpSerde<PageView>());
    InMemoryOutputDescriptor<PageView> imod = isd.getOutputDescriptor("Output", new NoOpSerde<PageView>());

    TestRunner.of(new PageViewFilterApplication())
        .addInputStream(imid, pageviews)
        .addOutputStream(imod, 10)
        .run(Duration.ofMillis(1000));
  }


  private static class PageViewProfileViewJoinApplication implements StreamApplication {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(
          new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store",
              KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));

      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");

      KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD =
          ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));

      KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD =
          ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
      KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
          ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());

      appDescriptor.getInputStream(profileISD)
          .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
          .sendTo(table)
          .sink((kv, collector, coordinator) -> {
            LOG.info("Inserted Profile with Key: {} in profile-view-store", kv.getKey());
          });

      OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
      appDescriptor.getInputStream(pageViewISD)
          .partitionBy(pv -> pv.getValue().getMemberId(),  pv -> pv.getValue(), KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(TestTableData.PageView.class)), "p1")
          .join(table, new PageViewToProfileJoinFunction())
          .sendTo(outputStream)
          .map(TestTableData.EnrichedPageView::getPageKey)
          .sink((joinPageKey, collector, coordinator) -> {
            collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
          });

    }
  }

  private static class PageViewFilterApplication implements StreamApplication {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
      KafkaInputDescriptor<KV<String, PageView>> isd =
          ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
      MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
      inputStream.map(KV::getValue).filter(pv -> pv.getPageKey().equals("inbox"));
    }
  }

  private static class PageViewRepartitionApplication implements StreamApplication {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
      KafkaInputDescriptor<KV<String, PageView>> isd =
          ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
      MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
      inputStream
          .map(KV::getValue)
          .partitionBy(PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)), "p1")
          .sink((m, collector, coordinator) ->
              collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "Output"), m.getKey(), m.getKey(), m)));
    }
  }
}
