/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.config;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.samza.SamzaException;
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
import org.apache.samza.container.grouper.stream.HashSystemStreamPartitionMapperFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
import org.junit.Assert;
import org.junit.Test;

import static org.junit.Assert.*;


public class TestJobConfig {
  @Test
  public void testGetName() {
    String jobName = "job-name";
    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_NAME, jobName)));
    assertEquals(Optional.of(jobName), jobConfig.getName());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getName());
  }

  @Test
  public void testGetCoordinatorSystemName() {
    String coordinatorSystemName = "coordinator-system", jobDefaultSystem = "job-default-system";

    // has job coordinator system and default system; choose job coordinator system
    JobConfig jobConfig = new JobConfig(new MapConfig(
        ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, coordinatorSystemName, JobConfig.JOB_DEFAULT_SYSTEM,
            jobDefaultSystem)));
    assertEquals(coordinatorSystemName, jobConfig.getCoordinatorSystemName());

    // has job coordinator system only; choose job coordinator system
    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, coordinatorSystemName)));
    assertEquals(coordinatorSystemName, jobConfig.getCoordinatorSystemName());

    // has default system only; choose default system
    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, jobDefaultSystem)));
    assertEquals(jobDefaultSystem, jobConfig.getCoordinatorSystemName());

    try {
      new JobConfig(new MapConfig()).getCoordinatorSystemName();
      fail("Should have gotten a ConfigException");
    } catch (ConfigException e) {
      // expected
    }
  }

  @Test
  public void testGetCoordinatorSystemNameOrNull() {
    String coordinatorSystemName = "coordinator-system", jobDefaultSystem = "job-default-system";

    // has job coordinator system and default system; choose job coordinator system
    JobConfig jobConfig = new JobConfig(new MapConfig(
        ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, coordinatorSystemName, JobConfig.JOB_DEFAULT_SYSTEM,
            jobDefaultSystem)));
    assertEquals(coordinatorSystemName, jobConfig.getCoordinatorSystemNameOrNull());

    // has job coordinator system only; choose job coordinator system
    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, coordinatorSystemName)));
    assertEquals(coordinatorSystemName, jobConfig.getCoordinatorSystemNameOrNull());

    // has default system only; choose default system
    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, jobDefaultSystem)));
    assertEquals(jobDefaultSystem, jobConfig.getCoordinatorSystemNameOrNull());

    jobConfig = new JobConfig(new MapConfig());
    assertNull(jobConfig.getCoordinatorSystemNameOrNull());
  }

  @Test
  public void testGetDefaultSystem() {
    String jobDefaultSystem = "job-default-system";

    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, jobDefaultSystem)));
    assertEquals(Optional.of(jobDefaultSystem), jobConfig.getDefaultSystem());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getDefaultSystem());
  }

  @Test
  public void testGetContainerCount() {
    int jobContainerCount = 10, yarnContainerCount = 5;

    // has job container count and yarn container count; choose job container count
    JobConfig jobConfig = new JobConfig(new MapConfig(
        ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, Integer.toString(jobContainerCount), "yarn.container.count",
            Integer.toString(yarnContainerCount))));
    assertEquals(jobContainerCount, jobConfig.getContainerCount());

    // has job container count only; choose job container count
    jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, Integer.toString(jobContainerCount))));
    assertEquals(jobContainerCount, jobConfig.getContainerCount());

    // has yarn container count only; choose yarn container count
    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of("yarn.container.count", Integer.toString(yarnContainerCount))));
    assertEquals(yarnContainerCount, jobConfig.getContainerCount());

    // not specified; use default
    jobConfig = new JobConfig(new MapConfig());
    assertEquals(JobConfig.DEFAULT_JOB_CONTAINER_COUNT, jobConfig.getContainerCount());
  }

  @Test
  public void testGetMonitorRegexDisabled() {
    // positive means enabled
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS, Integer.toString(100))));
    assertFalse(jobConfig.getMonitorRegexDisabled());

    // zero means disabled
    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS, Integer.toString(0))));
    assertTrue(jobConfig.getMonitorRegexDisabled());

    // negative means disabled
    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS, Integer.toString(-1))));
    assertTrue(jobConfig.getMonitorRegexDisabled());

    // not specified uses the default monitor partition change frequency, which means enabled
    jobConfig = new JobConfig(new MapConfig());
    assertFalse(jobConfig.getMonitorRegexDisabled());
  }

  @Test
  public void testGetMonitorPartitionChangeFrequency() {
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS, Integer.toString(100))));
    assertEquals(100, jobConfig.getMonitorPartitionChangeFrequency());

    jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS, Integer.toString(0))));
    assertEquals(0, jobConfig.getMonitorPartitionChangeFrequency());

    jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS, Integer.toString(-1))));
    assertEquals(-1, jobConfig.getMonitorPartitionChangeFrequency());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
        jobConfig.getMonitorPartitionChangeFrequency());
  }

  @Test
  public void testGetMonitorRegexFrequency() {
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS, Integer.toString(100))));
    assertEquals(100, jobConfig.getMonitorRegexFrequency());

    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS, Integer.toString(0))));
    assertEquals(0, jobConfig.getMonitorRegexFrequency());

    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS, Integer.toString(-1))));
    assertEquals(-1, jobConfig.getMonitorRegexFrequency());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(JobConfig.DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS, jobConfig.getMonitorRegexFrequency());
  }

  @Test
  public void testGetMonitorRegexPatternMap() {
    // 2 different rewriters to system0, 1 rewriter to system1, 1 rewriter which isn't in rewritersList
    String system0 = "system0", system1 = "system1";
    String system0Rewriter0 = "system-0-rewriter-0", system0Rewriter1 = "system-0-rewriter-1", system1Rewriter =
        "system-1-rewriter", systemOnlyRewriter = "system-only-rewriter";
    String system0Rewriter0Streams = "system-0-rewriter-0-.*", system0Rewriter1Streams = "system-0-rewriter-1-.*",
        system1RewriterStreams = "system-1-rewriter-.*";
    JobConfig jobConfig = new JobConfig(new MapConfig(
        new ImmutableMap.Builder<String, String>().put(String.format(JobConfig.REGEX_RESOLVED_SYSTEM, system0Rewriter0),
            system0)
            .put(String.format(JobConfig.REGEX_RESOLVED_STREAMS, system0Rewriter0), system0Rewriter0Streams)
            .put(String.format(JobConfig.REGEX_RESOLVED_SYSTEM, system0Rewriter1), system0)
            .put(String.format(JobConfig.REGEX_RESOLVED_STREAMS, system0Rewriter1), system0Rewriter1Streams)
            .put(String.format(JobConfig.REGEX_RESOLVED_SYSTEM, system1Rewriter), system1)
            .put(String.format(JobConfig.REGEX_RESOLVED_STREAMS, system1Rewriter), system1RewriterStreams)
            // not passed in as a rewriter when calling getMonitorRegexPatternMap
            .put(String.format(JobConfig.REGEX_RESOLVED_SYSTEM, "unused-rewriter"), system0)
            .put(String.format(JobConfig.REGEX_RESOLVED_STREAMS, "unused-rewriter"), "unused-rewriter-.*")
            // should not be included since there is no regex
            .put(String.format(JobConfig.REGEX_RESOLVED_SYSTEM, systemOnlyRewriter), system0)
            .build()));
    // Pattern.equals only checks that the references are the same, so can't compare maps directly
    Map<String, Pattern> actual = jobConfig.getMonitorRegexPatternMap(String.join(",",
        ImmutableList.of(system0Rewriter0, system0Rewriter1, system1Rewriter, systemOnlyRewriter,
            "not-a-regex-rewriter")));
    // only should have rewriters for system0 and system1
    assertEquals(2, actual.size());
    assertEquals(system0Rewriter0Streams + "|" + system0Rewriter1Streams, actual.get(system0).pattern());
    assertEquals(system1RewriterStreams, actual.get(system1).pattern());

    // empty configs should produce an empty map
    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Collections.<String, Pattern>emptyMap(), jobConfig.getMonitorRegexPatternMap(system0Rewriter0));
    assertEquals(Collections.<String, Pattern>emptyMap(), jobConfig.getMonitorRegexPatternMap(""));
  }

  @Test
  public void testGetRegexResolvedStreams() {
    String rewriterName = "rewriter-name", regex = "my-stream-.*";
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(String.format(JobConfig.REGEX_RESOLVED_STREAMS, rewriterName), regex)));
    assertEquals(Optional.of(regex), jobConfig.getRegexResolvedStreams(rewriterName));
    assertEquals(Optional.empty(), jobConfig.getRegexResolvedStreams("other-rewriter"));
  }

  @Test
  public void testGetRegexResolvedSystem() {
    String rewriterName = "rewriter-name", regex = "my-system-.*";
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(String.format(JobConfig.REGEX_RESOLVED_SYSTEM, rewriterName), regex)));
    assertEquals(Optional.of(regex), jobConfig.getRegexResolvedSystem(rewriterName));
    assertEquals(Optional.empty(), jobConfig.getRegexResolvedSystem("other-rewriter"));
  }

  @Test
  public void testGetRegexResolvedInheritedConfig() {
    String rewriterName = "rewriter-name";
    String key0 = "key0", value0 = "value0", key1 = "other.key1", value1 = "value1";
    JobConfig jobConfig = new JobConfig(new MapConfig(
        ImmutableMap.of(String.format(JobConfig.REGEX_INHERITED_CONFIG + "." + key0, rewriterName), value0,
            String.format(JobConfig.REGEX_INHERITED_CONFIG + "." + key1, rewriterName), value1)));
    assertEquals(new MapConfig(ImmutableMap.of(key0, value0, key1, value1)),
        jobConfig.getRegexResolvedInheritedConfig(rewriterName));
    assertEquals(new MapConfig(), jobConfig.getRegexResolvedInheritedConfig("other-rewriter"));
  }

  @Test
  public void testGetStreamJobFactoryClass() {
    String jobFactoryClass = "my.job.Factory.class";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.STREAM_JOB_FACTORY_CLASS, jobFactoryClass)));
    assertEquals(Optional.of(jobFactoryClass), jobConfig.getStreamJobFactoryClass());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getStreamJobFactoryClass());
  }

  @Test
  public void testGetJobId() {
    String jobId = "job-id";

    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ID, jobId)));
    assertEquals(jobId, jobConfig.getJobId());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(JobConfig.DEFAULT_JOB_ID, jobConfig.getJobId());
  }

  @Test
  public void testFailOnCheckpointValidation() {
    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_FAIL_CHECKPOINT_VALIDATION, "true")));
    assertTrue(jobConfig.failOnCheckpointValidation());

    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_FAIL_CHECKPOINT_VALIDATION, "false")));
    assertFalse(jobConfig.failOnCheckpointValidation());

    jobConfig = new JobConfig(new MapConfig());
    assertTrue(jobConfig.failOnCheckpointValidation());
  }

  @Test
  public void testGetConfigRewriters() {
    String configRewriters = "rewriter0,rewriter1";

    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.CONFIG_REWRITERS, configRewriters)));
    assertEquals(Optional.of(configRewriters), jobConfig.getConfigRewriters());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getConfigRewriters());
  }

  @Test
  public void testGetConfigRewriterClass() {
    String rewriterName = "rewriter-name", className = "my.Rewriter.class";
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(String.format(JobConfig.CONFIG_REWRITER_CLASS, rewriterName), className)));
    assertEquals(Optional.of(className), jobConfig.getConfigRewriterClass(rewriterName));
    assertEquals(Optional.empty(), jobConfig.getConfigRewriterClass("other-rewriter"));
  }

  @Test
  public void testGetSystemStreamPartitionGrouperFactory() {
    String sspGrouperFactory = "my.ssp.grouper.Factory.class";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY, sspGrouperFactory)));
    assertEquals(sspGrouperFactory, jobConfig.getSystemStreamPartitionGrouperFactory());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(GroupByPartitionFactory.class.getName(), jobConfig.getSystemStreamPartitionGrouperFactory());
  }

  @Test
  public void testGetLocationIdProviderFactory() {
    String locationIdProviderFactory = "my.location.id.provider.Factory.class";

    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.LOCATION_ID_PROVIDER_FACTORY, locationIdProviderFactory)));
    assertEquals(locationIdProviderFactory, jobConfig.getLocationIdProviderFactory());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(DefaultLocationIdProviderFactory.class.getName(), jobConfig.getLocationIdProviderFactory());
  }

  @Test
  public void testGetSecurityManagerFactory() {
    String securityManagerFactory = "my.security.manager.factory";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_SECURITY_MANAGER_FACTORY, securityManagerFactory)));
    assertEquals(Optional.of(securityManagerFactory), jobConfig.getSecurityManagerFactory());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getSecurityManagerFactory());
  }

  @Test
  public void testGetSSPMatcherClass() {
    String sspMatcherClass = "my.ssp.Matcher.class";

    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.SSP_MATCHER_CLASS, sspMatcherClass)));
    assertEquals(Optional.of(sspMatcherClass), jobConfig.getSSPMatcherClass());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getSSPMatcherClass());
  }

  @Test
  public void testGetSSPMatcherConfigRegex() {
    String sspMatcherConfigRegex = "ssp-.*";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.SSP_MATCHER_CONFIG_REGEX, sspMatcherConfigRegex)));
    assertEquals(sspMatcherConfigRegex, jobConfig.getSSPMatcherConfigRegex());

    try {
      new JobConfig(new MapConfig()).getSSPMatcherConfigRegex();
      fail("Expected a SamzaException");
    } catch (SamzaException e) {
      // expected
    }
  }

  @Test
  public void testGetSSPMatcherConfigRanges() {
    String sspMatcherConfigRanges = "1,2,3";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.SSP_MATCHER_CONFIG_RANGES, sspMatcherConfigRanges)));
    assertEquals(sspMatcherConfigRanges, jobConfig.getSSPMatcherConfigRanges());

    try {
      new JobConfig(new MapConfig()).getSSPMatcherConfigRanges();
      fail("Expected a SamzaException");
    } catch (SamzaException e) {
      // expected
    }
  }

  @Test
  public void testGetSSPMatcherConfigJobFactoryRegex() {
    String sspMatcherConfigJobFactoryRegex = ".*JobFactory";

    JobConfig jobConfig = new JobConfig(new MapConfig(
        ImmutableMap.of(JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, sspMatcherConfigJobFactoryRegex)));
    assertEquals(sspMatcherConfigJobFactoryRegex, jobConfig.getSSPMatcherConfigJobFactoryRegex());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(JobConfig.DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX,
        jobConfig.getSSPMatcherConfigJobFactoryRegex());
  }

  @Test
  public void testGetThreadPoolSize() {
    JobConfig jobConfig = new JobConfig(new MapConfig(
        ImmutableMap.of(JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE, "10")));
    assertEquals(10, jobConfig.getThreadPoolSize());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(0, jobConfig.getThreadPoolSize());
  }

  @Test
  public void testGetDebounceTimeMs() {
    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS, Integer.toString(100))));
    assertEquals(100, jobConfig.getDebounceTimeMs());

    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS, Integer.toString(0))));
    assertEquals(0, jobConfig.getDebounceTimeMs());

    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS, Integer.toString(-1))));
    assertEquals(-1, jobConfig.getDebounceTimeMs());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(JobConfig.DEFAULT_DEBOUNCE_TIME_MS, jobConfig.getDebounceTimeMs());
  }

  @Test
  public void testGetNonLoggedStorePath() {
    String nonLoggedStorePath = "/path/to/non/logged/store";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR, nonLoggedStorePath)));
    assertEquals(Optional.of(nonLoggedStorePath), jobConfig.getNonLoggedStorePath());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getNonLoggedStorePath());
  }

  @Test
  public void testGetLoggedStorePath() {
    String loggedStorePath = "/path/to/logged/store";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_LOGGED_STORE_BASE_DIR, loggedStorePath)));
    assertEquals(Optional.of(loggedStorePath), jobConfig.getLoggedStorePath());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(Optional.empty(), jobConfig.getLoggedStorePath());
  }

  @Test
  public void testGetMetadataStoreFactory() {
    String metadataStoreFactory = "my.metadata.store.Factory.class";

    JobConfig jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.METADATA_STORE_FACTORY, metadataStoreFactory)));
    assertEquals(metadataStoreFactory, jobConfig.getMetadataStoreFactory());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(CoordinatorStreamMetadataStoreFactory.class.getName(), jobConfig.getMetadataStoreFactory());
  }

  @Test
  public void testGetDiagnosticsEnabled() {
    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DIAGNOSTICS_ENABLED, "true")));
    assertTrue(jobConfig.getDiagnosticsEnabled());

    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DIAGNOSTICS_ENABLED, "false")));
    assertFalse(jobConfig.getDiagnosticsEnabled());

    jobConfig = new JobConfig(new MapConfig());
    assertFalse(jobConfig.getDiagnosticsEnabled());
  }

  @Test
  public void testGetJMXEnabled() {
    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_JMX_ENABLED, "true")));
    assertTrue(jobConfig.getJMXEnabled());

    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_JMX_ENABLED, "false")));
    assertFalse(jobConfig.getJMXEnabled());

    jobConfig = new JobConfig(new MapConfig());
    assertTrue(jobConfig.getJMXEnabled());
  }

  @Test
  public void testGetSystemStreamPartitionMapperFactoryName() {
    String sspMapperFactory = "my.ssp.mapper.Factory.class";

    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.SYSTEM_STREAM_PARTITION_MAPPER_FACTORY, sspMapperFactory)));
    assertEquals(sspMapperFactory, jobConfig.getSystemStreamPartitionMapperFactoryName());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(HashSystemStreamPartitionMapperFactory.class.getName(),
        jobConfig.getSystemStreamPartitionMapperFactoryName());
  }

  @Test
  public void testGetStandbyTasksEnabled() {
    // greater than 1 means enabled
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, Integer.toString(100))));
    assertTrue(jobConfig.getStandbyTasksEnabled());

    // zero means disabled
    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, Integer.toString(0))));
    assertFalse(jobConfig.getStandbyTasksEnabled());

    // negative means disabled
    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, Integer.toString(-1))));
    assertFalse(jobConfig.getStandbyTasksEnabled());

    // not specified uses the default standby count, which means disabled
    jobConfig = new JobConfig(new MapConfig());
    assertFalse(jobConfig.getStandbyTasksEnabled());
  }

  @Test
  public void testGetStandbyTaskReplicationFactor() {
    JobConfig jobConfig = new JobConfig(
        new MapConfig(ImmutableMap.of(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, Integer.toString(100))));
    assertEquals(100, jobConfig.getStandbyTaskReplicationFactor());

    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, Integer.toString(0))));
    assertEquals(0, jobConfig.getStandbyTaskReplicationFactor());

    jobConfig =
        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, Integer.toString(-1))));
    assertEquals(-1, jobConfig.getStandbyTaskReplicationFactor());

    jobConfig = new JobConfig(new MapConfig());
    assertEquals(JobConfig.DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR, jobConfig.getStandbyTaskReplicationFactor());
  }

  @Test
  public void testGetClusterBasedJobCoordinatorDependencyIsolationEnabled() {
    Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
    assertTrue(new JobConfig(config).isSplitDeploymentEnabled());

    config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false"));
    assertFalse(new JobConfig(config).isSplitDeploymentEnabled());

    assertFalse(new JobConfig(new MapConfig()).isSplitDeploymentEnabled());
  }

  @Test
  public void testGetMetadataFile() {
    String execEnvContainerId = "container-id";
    String containerMetadataDirectory = "/tmp/samza/log/dir";
    System.setProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY, containerMetadataDirectory);
    assertEquals(new File(containerMetadataDirectory,
            String.format(JobConfig.CONTAINER_METADATA_FILENAME_FORMAT, execEnvContainerId)).getPath(),
        JobConfig.getMetadataFile(execEnvContainerId).get().getPath());
    System.clearProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY);

    assertEquals(Optional.empty(), JobConfig.getMetadataFile(null));
  }

  @Test
  public void testGetCoordinatorStreamFactory() {
    JobConfig jobConfig = new JobConfig(new MapConfig(ImmutableMap.of("test", "")));
    assertEquals(jobConfig.getCoordinatorStreamFactory(), JobConfig.DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY);

    jobConfig = new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.COORDINATOR_STREAM_FACTORY, "specific_coordinator_stream")));
    assertEquals(jobConfig.getCoordinatorStreamFactory(), "specific_coordinator_stream");
  }

  @Test
  public void testAutosizingConfig() {
    Map<String, String> config = new HashMap<>();
    config.put("job.autosizing.enabled", "true");
    config.put("job.container.count", "1");
    config.put("job.autosizing.container.count", "2");
    config.put("job.container.thread.pool.size", "1");
    config.put("job.autosizing.container.thread.pool.size", "3");
    config.put("job.autosizing.container.maxheap.mb", "500");

    config.put("cluster-manager.container.memory.mb", "500");
    config.put("job.autosizing.container.memory.mb", "900");
    config.put("cluster-manager.container.cpu.cores", "1");
    config.put("job.autosizing.container.cpu.cores", "2");
    JobConfig jobConfig = new JobConfig(new MapConfig(config));
    Assert.assertTrue(jobConfig.getAutosizingEnabled());
    Assert.assertEquals(2, jobConfig.getContainerCount());
    Assert.assertEquals(3, jobConfig.getThreadPoolSize());

    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(new MapConfig(config));
    Assert.assertEquals(900, clusterManagerConfig.getContainerMemoryMb());
    Assert.assertEquals(2, clusterManagerConfig.getNumCores());
  }
}