/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.config;

import static org.junit.Assert.*;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointManagerFactory;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GroupByContainerCountFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.chooser.RoundRobinChooserFactory;
import org.junit.Test;


public class TestTaskConfig {
  @Test
  public void testGetInputStreams() {
    Config config = new MapConfig(
        ImmutableMap.of(TaskConfig.INPUT_STREAMS, "kafka.foo, kafka.bar, otherKafka.bar, otherKafka.foo.bar"));
    Set<SystemStream> expected = ImmutableSet.of(
        new SystemStream("kafka", "foo"),
        new SystemStream("kafka", "bar"),
        new SystemStream("otherKafka", "bar"),
        new SystemStream("otherKafka", "foo.bar"));
    assertEquals(expected, new TaskConfig(config).getAllInputStreams());

    // empty string for value
    MapConfig configEmptyInput = new MapConfig(ImmutableMap.of(TaskConfig.INPUT_STREAMS, ""));
    assertTrue(new TaskConfig(configEmptyInput).getInputStreams().isEmpty());
    // config not specified
    assertTrue(new TaskConfig(new MapConfig()).getInputStreams().isEmpty());
  }

  @Test
  public void testGetWindowMs() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.WINDOW_MS, "10"));
    assertEquals(10, new TaskConfig(config).getWindowMs());

    config = new MapConfig(ImmutableMap.of(TaskConfig.WINDOW_MS, "-1"));
    assertEquals(-1, new TaskConfig(config).getWindowMs());

    // config not specified
    assertEquals(TaskConfig.DEFAULT_WINDOW_MS, new TaskConfig(new MapConfig()).getWindowMs());
  }

  @Test
  public void testGetCommitMs() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.COMMIT_MS, "10"));
    assertEquals(10, new TaskConfig(config).getCommitMs());

    config = new MapConfig(ImmutableMap.of(TaskConfig.COMMIT_MS, "-1"));
    assertEquals(-1, new TaskConfig(config).getCommitMs());

    // config not specified
    assertEquals(TaskConfig.DEFAULT_COMMIT_MS, new TaskConfig(new MapConfig()).getCommitMs());
  }

  @Test
  public void testGetTaskClass() {
    String taskClass = "some.task.class";
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_CLASS, taskClass));
    assertEquals(Optional.of(taskClass), new TaskConfig(config).getTaskClass());

    // config not specified
    assertFalse(new TaskConfig(new MapConfig()).getTaskClass().isPresent());
  }

  @Test
  public void testGetCommandClass() {
    String commandClass = "some.command.class";
    String defaultCommandClass = "default.command.class";
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.COMMAND_BUILDER, commandClass));
    assertEquals(commandClass, new TaskConfig(config).getCommandClass(defaultCommandClass));

    // config not specified
    assertEquals(defaultCommandClass, new TaskConfig(new MapConfig()).getCommandClass(defaultCommandClass));
  }

  @Test
  public void testGetMessageChooserClass() {
    String messageChooserClassValue = "some.message.chooser.class";
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME, messageChooserClassValue));
    assertEquals(messageChooserClassValue, new TaskConfig(config).getMessageChooserClass());

    // config not specified
    assertEquals(RoundRobinChooserFactory.class.getName(), new TaskConfig(new MapConfig()).getMessageChooserClass());
  }

  @Test
  public void testGetDropDeserializationErrors() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_DESERIALIZATION_ERRORS, "true"));
    assertTrue(new TaskConfig(config).getDropDeserializationErrors());

    config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_DESERIALIZATION_ERRORS, "false"));
    assertFalse(new TaskConfig(config).getDropDeserializationErrors());

    // config not specified
    assertFalse(new TaskConfig(new MapConfig()).getDropDeserializationErrors());
  }

  @Test
  public void testGetDropSerializationErrors() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_SERIALIZATION_ERRORS, "true"));
    assertTrue(new TaskConfig(config).getDropSerializationErrors());

    config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_SERIALIZATION_ERRORS, "false"));
    assertFalse(new TaskConfig(config).getDropSerializationErrors());

    // config not specified
    assertFalse(new TaskConfig(new MapConfig()).getDropSerializationErrors());
  }

  @Test
  public void testGetDropProducerErrors() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
    assertTrue(new TaskConfig(config).getDropProducerErrors());

    config = new MapConfig(ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "false"));
    assertFalse(new TaskConfig(config).getDropProducerErrors());

    // config not specified
    assertFalse(new TaskConfig(new MapConfig()).getDropProducerErrors());
  }

  @Test
  public void testGetPollIntervalMs() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.POLL_INTERVAL_MS, "10"));
    assertEquals(10, new TaskConfig(config).getPollIntervalMs());

    // config not specified
    assertEquals(TaskConfig.DEFAULT_POLL_INTERVAL_MS, new TaskConfig(new MapConfig()).getPollIntervalMs());
  }

  @Test
  public void testGetIgnoredExceptions() {
    String ignoredExceptionsValue = "exception0.class, exception1.class";
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.IGNORED_EXCEPTIONS, ignoredExceptionsValue));
    assertEquals(Optional.of(ignoredExceptionsValue), new TaskConfig(config).getIgnoredExceptions());

    // config not specified
    assertFalse(new TaskConfig(new MapConfig()).getIgnoredExceptions().isPresent());
  }

  @Test
  public void testGetTaskNameGrouperFactory() {
    String taskNameGrouperFactoryValue = "task.name.grouper.factory.class";
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.GROUPER_FACTORY, taskNameGrouperFactoryValue));
    assertEquals(taskNameGrouperFactoryValue, new TaskConfig(config).getTaskNameGrouperFactory());

    // config not specified
    assertEquals(GroupByContainerCountFactory.class.getName(),
        new TaskConfig(new MapConfig()).getTaskNameGrouperFactory());
  }

  @Test
  public void testGetMaxConcurrency() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.MAX_CONCURRENCY, "10"));
    assertEquals(10, new TaskConfig(config).getMaxConcurrency());

    // config not specified
    assertEquals(TaskConfig.DEFAULT_MAX_CONCURRENCY, new TaskConfig(new MapConfig()).getMaxConcurrency());
  }

  @Test
  public void testGetCallbackTimeoutMs() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, "10"));
    assertEquals(10, new TaskConfig(config).getCallbackTimeoutMs());

    config = new MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, "-1"));
    assertEquals(-1, new TaskConfig(config).getCallbackTimeoutMs());

    // config not specified
    assertEquals(TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS, new TaskConfig(new MapConfig()).getCallbackTimeoutMs());
  }

  @Test
  public void testGetAsyncCommit() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.ASYNC_COMMIT, "true"));
    assertTrue(new TaskConfig(config).getAsyncCommit());

    config = new MapConfig(ImmutableMap.of(TaskConfig.ASYNC_COMMIT, "false"));
    assertFalse(new TaskConfig(config).getAsyncCommit());

    // config not specified
    assertFalse(new TaskConfig(new MapConfig()).getAsyncCommit());
  }

  @Test
  public void testGetMaxIdleMs() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.MAX_IDLE_MS, "20"));
    assertEquals(20, new TaskConfig(config).getMaxIdleMs());

    // config not specified
    assertEquals(TaskConfig.DEFAULT_MAX_IDLE_MS, new TaskConfig(new MapConfig()).getMaxIdleMs());
  }

  @Test
  public void testGetCheckpointManager() {
    Config config =
        new MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY, MockCheckpointManagerFactory.class.getName()));
    assertTrue(new TaskConfig(config).getCheckpointManager(null)
        .get() instanceof MockCheckpointManager);

    Config configEmptyString = new MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY, ""));
    assertFalse(new TaskConfig(configEmptyString).getCheckpointManager(null).isPresent());

    assertFalse(new TaskConfig(new MapConfig()).getCheckpointManager(null).isPresent());
  }

  @Test
  public void testGetBroadcastSystemStreamPartitions() {
    // no entry for "task.broadcast.inputs"
    assertEquals(Collections.emptySet(), new TaskConfig(new MapConfig()).getBroadcastSystemStreamPartitions());

    HashMap<String, String> map = new HashMap<>();
    map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]");
    Config config = new MapConfig(map);
    TaskConfig taskConfig = new TaskConfig(config);
    Set<SystemStreamPartition> systemStreamPartitionSet = taskConfig.getBroadcastSystemStreamPartitions();

    HashSet<SystemStreamPartition> expected = new HashSet<>();
    expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4)));
    expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5)));
    expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(12)));
    expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(13)));
    expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(14)));
    expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(3)));
    expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(4)));
    assertEquals(expected, systemStreamPartitionSet);

    map.put("task.broadcast.inputs", "kafka.foo");
    taskConfig = new TaskConfig(new MapConfig(map));
    boolean catchCorrectException = false;
    try {
      taskConfig.getBroadcastSystemStreamPartitions();
    } catch (IllegalArgumentException e) {
      catchCorrectException = true;
    }
    assertTrue(catchCorrectException);

    map.put("task.broadcast.inputs", "kafka.org.apache.events.WhitelistedIps#1-2");
    taskConfig = new TaskConfig(new MapConfig(map));
    boolean invalidFormatException = false;
    try {
      taskConfig.getBroadcastSystemStreamPartitions();
    } catch (IllegalArgumentException e) {
      invalidFormatException = true;
    }
    assertTrue(invalidFormatException);
  }

  @Test
  public void testGetBroadcastSystemStreams() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.BROADCAST_INPUT_STREAMS,
        "kafka.foo#4, kafka.bar#5, otherKafka.foo#4, otherKafka.foo.bar#5"));
    Set<SystemStream> expected = ImmutableSet.of(
        new SystemStream("kafka", "foo"),
        new SystemStream("kafka", "bar"),
        new SystemStream("otherKafka", "foo"),
        new SystemStream("otherKafka", "foo.bar"));
    assertEquals(expected, new TaskConfig(config).getBroadcastSystemStreams());
    assertTrue(new TaskConfig(new MapConfig()).getBroadcastSystemStreams().isEmpty());
  }

  @Test
  public void testGetAllInputStreams() {
    Config config = new MapConfig(ImmutableMap.of(
        TaskConfig.INPUT_STREAMS, "kafka.foo, otherKafka.bar",
        TaskConfig.BROADCAST_INPUT_STREAMS, "kafka.bar#4, otherKafka.foo#5"));
    Set<SystemStream> expected = ImmutableSet.of(
        new SystemStream("kafka", "foo"),
        new SystemStream("otherKafka", "bar"),
        new SystemStream("kafka", "bar"),
        new SystemStream("otherKafka", "foo"));
    assertEquals(expected, new TaskConfig(config).getAllInputStreams());

    Config configOnlyBroadcast = new MapConfig(ImmutableMap.of(
        TaskConfig.BROADCAST_INPUT_STREAMS, "kafka.bar#4, otherKafka.foo#5"));
    Set<SystemStream> expectedOnlyBroadcast = ImmutableSet.of(
        new SystemStream("kafka", "bar"),
        new SystemStream("otherKafka", "foo"));
    assertEquals(expectedOnlyBroadcast, new TaskConfig(configOnlyBroadcast).getAllInputStreams());

    Config configOnlyInputs = new MapConfig(ImmutableMap.of(TaskConfig.INPUT_STREAMS, "kafka.foo, otherKafka.bar"));
    Set<SystemStream> expectedOnlyInputs = ImmutableSet.of(
        new SystemStream("kafka", "foo"),
        new SystemStream("otherKafka", "bar"));
    assertEquals(expectedOnlyInputs, new TaskConfig(configOnlyInputs).getAllInputStreams());

    assertTrue(new TaskConfig(new MapConfig()).getAllInputStreams().isEmpty());
  }

  @Test
  public void testGetShutdownMs() {
    Config config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS, "10"));
    assertEquals(10, new TaskConfig(config).getShutdownMs());

    // unable to parse value into number
    config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS, "not a number"));
    assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(config).getShutdownMs());

    // config not specified
    assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(new MapConfig()).getShutdownMs());
  }

  @Test
  public void testGetTransactionalStateRestoreEnabled() {
    Map<String, String> configMap = new HashMap<>();
    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");

    // standby and async commit both off; transactional state restore returned as enabled
    assertTrue(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());

    // standby off and async commit on; transactional state restore returned as disabled
    configMap.put(TaskConfig.ASYNC_COMMIT, "true");
    configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "1");
    assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());

    // standby on and async commit off; transactional state restore returned as disabled
    configMap.put(TaskConfig.ASYNC_COMMIT, "false");
    configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
    assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());

    // standby on and async commit on; transactional state restore returned as disabled
    configMap.put(TaskConfig.ASYNC_COMMIT, "true");
    configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
    assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());
  }

  /**
   * Used for testing classloading a {@link CheckpointManagerFactory}.
   */
  public static class MockCheckpointManagerFactory implements CheckpointManagerFactory {
    @Override
    public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) {
      return new MockCheckpointManager();
    }
  }

  /**
   * Placeholder class to be returned by {@link MockCheckpointManagerFactory}.
   */
  private static class MockCheckpointManager implements CheckpointManager {
    @Override
    public void start() { }

    @Override
    public void register(TaskName taskName) { }

    @Override
    public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) { }

    @Override
    public Checkpoint readLastCheckpoint(TaskName taskName) {
      return null;
    }

    @Override
    public void stop() { }
  }
}
