/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.config;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.samza.SamzaException;
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
import org.apache.samza.container.grouper.stream.HashSystemStreamPartitionMapperFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Typed accessors for job-level configuration values (mostly {@code job.*} keys).
 *
 * Each getter reads a single key (or a small, documented precedence chain of keys) from the wrapped
 * {@link Config} and either returns an {@link Optional}, applies the default declared next to the key constant,
 * or throws when a required value is missing.
 */
public class JobConfig extends MapConfig {
  private static final Logger LOG = LoggerFactory.getLogger(JobConfig.class);

  public static final String STREAM_JOB_FACTORY_CLASS = "job.factory.class";

  /**
   * job.config.rewriters is a CSV list of config rewriter names. Each name is determined
   * by the %s value in job.config.rewriter.%s.class. For example, if you define
   * job.config.rewriter.some-regex.class=org.apache.samza.config.RegExTopicGenerator,
   * then the rewriter config would be set to job.config.rewriters = some-regex.
   */
  public static final String CONFIG_REWRITERS = "job.config.rewriters";
  public static final String CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class";

  /**
   * job.config.loader.factory specifies {@link ConfigLoaderFactory} to get {@link ConfigLoader}
   */
  public static final String CONFIG_LOADER_FACTORY = "job.config.loader.factory";

  public static final String JOB_NAME = "job.name";
  public static final String JOB_ID = "job.id";
  static final String DEFAULT_JOB_ID = "1";

  public static final String JOB_COORDINATOR_SYSTEM = "job.coordinator.system";
  public static final String JOB_DEFAULT_SYSTEM = "job.default.system";

  public static final String JOB_JMX_ENABLED = "job.jmx.enabled";
  public static final String JOB_CONTAINER_COUNT = "job.container.count";
  static final int DEFAULT_JOB_CONTAINER_COUNT = 1;
  public static final String JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size";
  public static final String JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions";

  public static final String JOB_DEBOUNCE_TIME_MS = "job.debounce.time.ms";
  static final int DEFAULT_DEBOUNCE_TIME_MS = 20000;

  public static final String SSP_INPUT_EXPANSION_ENABLED = "job.systemstreampartition.input.expansion.enabled";
  public static final boolean DEFAULT_INPUT_EXPANSION_ENABLED = true;

  public static final String SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory";
  public static final String SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class";
  public static final String SSP_MATCHER_CLASS_REGEX = "org.apache.samza.system.RegexSystemStreamPartitionMatcher";
  public static final String SSP_MATCHER_CLASS_RANGE = "org.apache.samza.system.RangeSystemStreamPartitionMatcher";
  public static final String SSP_MATCHER_CONFIG_REGEX = "job.systemstreampartition.matcher.config.regex";
  public static final String SSP_MATCHER_CONFIG_RANGES = "job.systemstreampartition.matcher.config.ranges";
  public static final String SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX =
      "job.systemstreampartition.matcher.config.job.factory.regex";
  static final String DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX =
      "org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)";
  public static final String SYSTEM_STREAM_PARTITION_MAPPER_FACTORY = "job.system.stream.partition.mapper.factory";

  // The number of partitions in the checkpoint stream should be 1. But sometimes,
  // if a stream was created (automatically) with the wrong number of partitions (the default number of partitions
  // for new streams), there is no easy fix for the user (topic deletion or reducing the number of partitions
  // is not yet supported, and auto-creation of the topics cannot always be easily turned off).
  // So we add a setting that allows the job to continue even though the number of partitions is not 1.
  public static final String JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled";
  public static final String MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change";
  public static final String MONITOR_PARTITION_CHANGE_FREQUENCY_MS =
      "job.coordinator.monitor-partition-change.frequency.ms";
  static final int DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000;

  public static final String MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms";
  static final int DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000;

  public static final String REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex";
  public static final String REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system";
  public static final String REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config";

  public static final String JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory";

  public static final String METADATA_STORE_FACTORY = "metadata.store.factory";
  public static final String STARTPOINT_METADATA_STORE_FACTORY = "startpoint.metadata.store.factory";

  public static final String LOCATION_ID_PROVIDER_FACTORY = "locationid.provider.factory";

  // Processor Config Constants
  public static final String PROCESSOR_ID = "processor.id";
  public static final String PROCESSOR_LIST = "processor.list";

  // Represents the store path for non-changelog stores.
  public static final String JOB_NON_LOGGED_STORE_BASE_DIR = "job.non-logged.store.base.dir";

  // Represents the store path for stores with changelog enabled. Typically the stores are not cleaned up
  // across application restarts
  public static final String JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir";

  // Enables diagnostic appender for logging exception events
  public static final String JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled";

  // Enables standby tasks
  public static final String STANDBY_TASKS_REPLICATION_FACTOR = "job.standbytasks.replication.factor";
  static final int DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1;

  // Naming format and directory for container.metadata file
  public static final String CONTAINER_METADATA_FILENAME_FORMAT = "%s.metadata"; // Filename: <containerID>.metadata
  public static final String CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY = "samza.log.dir";

  // Auto-sizing related configs that take precedence over the respective sizing configs (job.container.count, etc.),
  // *only* when job.autosizing.enabled is true. Otherwise current behavior is maintained.
  private static final String JOB_AUTOSIZING_CONFIG_PREFIX = "job.autosizing."; // used to determine if a config is related to autosizing
  public static final String JOB_AUTOSIZING_ENABLED = JOB_AUTOSIZING_CONFIG_PREFIX + "enabled";
  public static final String JOB_AUTOSIZING_CONTAINER_COUNT = JOB_AUTOSIZING_CONFIG_PREFIX + "container.count";
  public static final String JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE = JOB_AUTOSIZING_CONFIG_PREFIX + "container.thread.pool.size";
  public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb";
  public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb";
  public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES = JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores";

  public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
  public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";

  public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = "job.split.deployment.enabled";

  private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";

  /**
   * Wraps the given config so that job-level values can be read through typed getters.
   * @param config the config to read job settings from
   */
  public JobConfig(Config config) {
    super(config);
  }

  /**
   * @return the value of {@link #JOB_NAME}, or empty if it is not set
   */
  public Optional<String> getName() {
    return Optional.ofNullable(get(JOB_NAME));
  }

  /**
   * Same lookup as {@link #getCoordinatorSystemNameOrNull()}, but treats a missing value as an error.
   * @return the name of the coordinator system
   * @throws ConfigException if neither job.coordinator.system nor job.default.system is defined
   */
  public String getCoordinatorSystemName() {
    return Optional.ofNullable(getCoordinatorSystemNameOrNull())
        .orElseThrow(() -> new ConfigException(
            "Missing job.coordinator.system configuration. Cannot proceed with job execution."));
  }

  /**
   * Gets the System to use for reading/writing the coordinator stream. Uses the following precedence.
   *
   * 1. If job.coordinator.system is defined, that value is used.
   * 2. If job.default.system is defined, that value is used.
   * 3. None
   *
   * @return the resolved system name, or null if neither config is defined
   */
  public String getCoordinatorSystemNameOrNull() {
    return Optional.ofNullable(get(JOB_COORDINATOR_SYSTEM)).orElseGet(() -> getDefaultSystem().orElse(null));
  }

  /**
   * @return the value of {@link #JOB_DEFAULT_SYSTEM}, or empty if it is not set
   */
  public Optional<String> getDefaultSystem() {
    return Optional.ofNullable(get(JOB_DEFAULT_SYSTEM));
  }

  /**
   * Resolves the number of containers using the following precedence.
   *
   * 1. JOB_AUTOSIZING_CONTAINER_COUNT, if autosizing is enabled and that value is set.
   * 2. JOB_CONTAINER_COUNT, if set.
   * 3. The deprecated "yarn.container.count", if set (a warning is logged).
   * 4. DEFAULT_JOB_CONTAINER_COUNT.
   *
   * @return the configured container count
   * @throws NumberFormatException if the selected value is not a parsable integer
   */
  public int getContainerCount() {
    Optional<String> autosizingContainerCountValue = Optional.ofNullable(get(JOB_AUTOSIZING_CONTAINER_COUNT));
    Optional<String> jobContainerCountValue = Optional.ofNullable(get(JOB_CONTAINER_COUNT));

    if (getAutosizingEnabled() && autosizingContainerCountValue.isPresent()) {
      return Integer.parseInt(autosizingContainerCountValue.get());
    } else if (jobContainerCountValue.isPresent()) {
      return Integer.parseInt(jobContainerCountValue.get());
    } else {
      // To maintain backwards compatibility, honor yarn.container.count for now.
      // TODO get rid of this in a future release.
      Optional<String> yarnContainerCountValue = Optional.ofNullable(get("yarn.container.count"));
      if (yarnContainerCountValue.isPresent()) {
        LOG.warn("Configuration 'yarn.container.count' is deprecated. Please use {}.", JOB_CONTAINER_COUNT);
        return Integer.parseInt(yarnContainerCountValue.get());
      } else {
        return DEFAULT_JOB_CONTAINER_COUNT;
      }
    }
  }

  /**
   * @return the value of {@link #MONITOR_INPUT_REGEX_FREQUENCY_MS}, or
   *         DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS if it is not set
   */
  public int getMonitorRegexFrequency() {
    return getInt(MONITOR_INPUT_REGEX_FREQUENCY_MS, DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS);
  }

  /**
   * @return true if the input-regex monitor frequency is zero or negative
   */
  public boolean getMonitorRegexDisabled() {
    return getMonitorRegexFrequency() <= 0;
  }

  /**
   * @return the value of {@link #MONITOR_PARTITION_CHANGE_FREQUENCY_MS}, or
   *         DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS if it is not set
   */
  public int getMonitorPartitionChangeFrequency() {
    return getInt(MONITOR_PARTITION_CHANGE_FREQUENCY_MS, DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS);
  }

  /**
   * Compile a map of each input-system to its corresponding input-monitor-regex patterns.
   *
   * Rewriters that do not define both a system ({@link #REGEX_RESOLVED_SYSTEM}) and a regex
   * ({@link #REGEX_RESOLVED_STREAMS}) are skipped. When several rewriters resolve to the same system, their
   * regexes are combined into a single alternation ("first|second").
   *
   * @param rewritersList CSV list of rewriter names; whitespace around each name is ignored, as are empty
   *                      entries. A null list yields an empty map.
   * @return a new mutable map from system name to the combined pattern for that system
   * @throws java.util.regex.PatternSyntaxException if a configured regex is invalid
   */
  public Map<String, Pattern> getMonitorRegexPatternMap(String rewritersList) {
    Map<String, Pattern> inputRegexesToMonitor = new HashMap<>();
    if (rewritersList == null) {
      return inputRegexesToMonitor;
    }
    Stream.of(rewritersList.split(","))
        // "a, b" must resolve rewriter "b", not " b"; an untrimmed name never matches its config keys
        .map(String::trim)
        .filter(rewriterName -> !rewriterName.isEmpty())
        .forEach(rewriterName -> {
          Optional<String> rewriterSystem = getRegexResolvedSystem(rewriterName);
          Optional<String> rewriterRegex = getRegexResolvedStreams(rewriterName);
          if (rewriterSystem.isPresent() && rewriterRegex.isPresent()) {
            Pattern newPatternForSystem;
            Pattern existingPatternForSystem = inputRegexesToMonitor.get(rewriterSystem.get());
            if (existingPatternForSystem == null) {
              newPatternForSystem = Pattern.compile(rewriterRegex.get());
            } else {
              // another rewriter already targets this system: monitor streams matching either regex
              newPatternForSystem =
                  Pattern.compile(String.join("|", existingPatternForSystem.pattern(), rewriterRegex.get()));
            }
            inputRegexesToMonitor.put(rewriterSystem.get(), newPatternForSystem);
          }
        });
    return inputRegexesToMonitor;
  }

  /**
   * @param rewriterName name of the config rewriter
   * @return the value of job.config.rewriter.{rewriterName}.regex, or empty if it is not set
   */
  public Optional<String> getRegexResolvedStreams(String rewriterName) {
    return Optional.ofNullable(get(String.format(REGEX_RESOLVED_STREAMS, rewriterName)));
  }

  /**
   * @param rewriterName name of the config rewriter
   * @return the value of job.config.rewriter.{rewriterName}.system, or empty if it is not set
   */
  public Optional<String> getRegexResolvedSystem(String rewriterName) {
    return Optional.ofNullable(get(String.format(REGEX_RESOLVED_SYSTEM, rewriterName)));
  }

  /**
   * @param rewriterName name of the config rewriter
   * @return the subset of configs under the "job.config.rewriter.{rewriterName}.config." prefix
   */
  public Config getRegexResolvedInheritedConfig(String rewriterName) {
    return subset(String.format(REGEX_INHERITED_CONFIG, rewriterName) + ".", true);
  }

  /**
   * @return the value of {@link #STREAM_JOB_FACTORY_CLASS}, or empty if it is not set
   */
  public Optional<String> getStreamJobFactoryClass() {
    return Optional.ofNullable(get(STREAM_JOB_FACTORY_CLASS));
  }

  /**
   * @return the value of {@link #JOB_ID}, or DEFAULT_JOB_ID if it is not set
   */
  public String getJobId() {
    return Optional.ofNullable(get(JOB_ID)).orElse(DEFAULT_JOB_ID);
  }

  /**
   * @return the value of {@link #JOB_FAIL_CHECKPOINT_VALIDATION}; true if it is not set
   */
  public boolean failOnCheckpointValidation() {
    return getBoolean(JOB_FAIL_CHECKPOINT_VALIDATION, true);
  }

  /**
   * @return the raw CSV value of {@link #CONFIG_REWRITERS}, or empty if it is not set
   */
  public Optional<String> getConfigRewriters() {
    return Optional.ofNullable(get(CONFIG_REWRITERS));
  }

  /**
   * @param name name of the config rewriter
   * @return the value of job.config.rewriter.{name}.class, or empty if it is not set
   */
  public Optional<String> getConfigRewriterClass(String name) {
    return Optional.ofNullable(get(String.format(CONFIG_REWRITER_CLASS, name)));
  }

  /**
   * @return the value of {@link #SSP_INPUT_EXPANSION_ENABLED}, or {@link #DEFAULT_INPUT_EXPANSION_ENABLED}
   *         if it is not set
   */
  public boolean isSSPGrouperProxyEnabled() {
    return getBoolean(SSP_INPUT_EXPANSION_ENABLED, DEFAULT_INPUT_EXPANSION_ENABLED);
  }

  /**
   * @return the value of {@link #SSP_GROUPER_FACTORY}, or the class name of {@link GroupByPartitionFactory}
   *         if it is not set
   */
  public String getSystemStreamPartitionGrouperFactory() {
    return Optional.ofNullable(get(SSP_GROUPER_FACTORY)).orElseGet(GroupByPartitionFactory.class::getName);
  }

  /**
   * @return the value of {@link #LOCATION_ID_PROVIDER_FACTORY}, or the class name of
   *         {@link DefaultLocationIdProviderFactory} if it is not set
   */
  public String getLocationIdProviderFactory() {
    return Optional.ofNullable(get(LOCATION_ID_PROVIDER_FACTORY))
        .orElseGet(DefaultLocationIdProviderFactory.class::getName);
  }

  /**
   * @return the value of {@link #JOB_SECURITY_MANAGER_FACTORY}, or empty if it is not set
   */
  public Optional<String> getSecurityManagerFactory() {
    return Optional.ofNullable(get(JOB_SECURITY_MANAGER_FACTORY));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CLASS}, or empty if it is not set
   */
  public Optional<String> getSSPMatcherClass() {
    return Optional.ofNullable(get(SSP_MATCHER_CLASS));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CONFIG_REGEX}
   * @throws SamzaException if the config is not set
   */
  public String getSSPMatcherConfigRegex() {
    return Optional.ofNullable(get(SSP_MATCHER_CONFIG_REGEX))
        .orElseThrow(
          () -> new SamzaException(String.format("Missing required configuration: '%s'", SSP_MATCHER_CONFIG_REGEX)));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CONFIG_RANGES}
   * @throws SamzaException if the config is not set
   */
  public String getSSPMatcherConfigRanges() {
    return Optional.ofNullable(get(SSP_MATCHER_CONFIG_RANGES))
        .orElseThrow(
          () -> new SamzaException(String.format("Missing required configuration: '%s'", SSP_MATCHER_CONFIG_RANGES)));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX}, or
   *         DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX if it is not set
   */
  public String getSSPMatcherConfigJobFactoryRegex() {
    return get(SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX);
  }

  /**
   * Resolves the container thread pool size using the following precedence.
   *
   * 1. JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE, if autosizing is enabled and that value is set.
   * 2. JOB_CONTAINER_THREAD_POOL_SIZE, if set.
   * 3. 0.
   *
   * @return the configured thread pool size
   * @throws NumberFormatException if the selected autosizing value is not a parsable integer
   */
  public int getThreadPoolSize() {
    Optional<String> autosizingContainerThreadPoolSize = Optional.ofNullable(get(
        JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE));
    if (getAutosizingEnabled() && autosizingContainerThreadPoolSize.isPresent()) {
      return Integer.parseInt(autosizingContainerThreadPoolSize.get());
    } else {
      return getInt(JOB_CONTAINER_THREAD_POOL_SIZE, 0);
    }
  }

  /**
   * @return the value of {@link #JOB_DEBOUNCE_TIME_MS}, or DEFAULT_DEBOUNCE_TIME_MS if it is not set
   */
  public int getDebounceTimeMs() {
    return getInt(JOB_DEBOUNCE_TIME_MS, DEFAULT_DEBOUNCE_TIME_MS);
  }

  /**
   * @return the value of {@link #JOB_NON_LOGGED_STORE_BASE_DIR}, or empty if it is not set
   */
  public Optional<String> getNonLoggedStorePath() {
    return Optional.ofNullable(get(JOB_NON_LOGGED_STORE_BASE_DIR));
  }

  /**
   * @return the value of {@link #JOB_LOGGED_STORE_BASE_DIR}, or empty if it is not set
   */
  public Optional<String> getLoggedStorePath() {
    return Optional.ofNullable(get(JOB_LOGGED_STORE_BASE_DIR));
  }

  /**
   * @return the value of {@link #METADATA_STORE_FACTORY}, or the class name of
   *         {@link CoordinatorStreamMetadataStoreFactory} if it is not set
   */
  public String getMetadataStoreFactory() {
    return get(METADATA_STORE_FACTORY, CoordinatorStreamMetadataStoreFactory.class.getName());
  }

  /**
   * @return the value of {@link #JOB_DIAGNOSTICS_ENABLED}; false if it is not set
   */
  public boolean getDiagnosticsEnabled() {
    return getBoolean(JOB_DIAGNOSTICS_ENABLED, false);
  }

  /**
   * @return the value of {@link #JOB_AUTOSIZING_ENABLED}; false if it is not set
   */
  public boolean getAutosizingEnabled() {
    return getBoolean(JOB_AUTOSIZING_ENABLED, false);
  }

  /**
   * Check if a given config parameter is an internal autosizing related config, based on
   * its name having the prefix "job.autosizing."
   * @param configParam the config param to determine
   * @return true if the config is related to autosizing, false otherwise
   */
  public static boolean isAutosizingConfig(String configParam) {
    return configParam.startsWith(JOB_AUTOSIZING_CONFIG_PREFIX);
  }

  /**
   * @return the value of {@link #JOB_JMX_ENABLED}; true if it is not set
   */
  public boolean getJMXEnabled() {
    return getBoolean(JOB_JMX_ENABLED, true);
  }

  /**
   * @return the value of {@link #SYSTEM_STREAM_PARTITION_MAPPER_FACTORY}, or the class name of
   *         {@link HashSystemStreamPartitionMapperFactory} if it is not set
   */
  public String getSystemStreamPartitionMapperFactoryName() {
    return get(SYSTEM_STREAM_PARTITION_MAPPER_FACTORY, HashSystemStreamPartitionMapperFactory.class.getName());
  }

  /**
   * @return the value of {@link #STANDBY_TASKS_REPLICATION_FACTOR}, or
   *         DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR if it is not set
   */
  public int getStandbyTaskReplicationFactor() {
    return getInt(STANDBY_TASKS_REPLICATION_FACTOR, DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR);
  }

  /**
   * @return true if the standby task replication factor is greater than 1
   */
  public boolean getStandbyTasksEnabled() {
    return getStandbyTaskReplicationFactor() > 1;
  }

  /**
   * @return the value of {@link #JOB_SPLIT_DEPLOYMENT_ENABLED}; false if it is not set
   */
  public boolean isSplitDeploymentEnabled() {
    return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
  }

  /**
   * The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
   * Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
   * which uniquely identifies a container's lifecycle.
   *
   * @param execEnvContainerId the container ID used to build the file name
   * @return the metadata file under the directory named by the {@code samza.log.dir} system property, or empty
   *         if that property is not set or {@code execEnvContainerId} is null
   */
  public static Optional<File> getMetadataFile(String execEnvContainerId) {
    String dir = System.getProperty(CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY);
    if (dir == null || execEnvContainerId == null) {
      return Optional.empty();
    } else {
      return Optional.of(
          new File(dir, String.format(CONTAINER_METADATA_FILENAME_FORMAT, execEnvContainerId)));
    }
  }

  /**
   * Get coordinatorStreamFactory according to the configs
   * @return the name of coordinatorStreamFactory
   */
  public String getCoordinatorStreamFactory() {
    return get(COORDINATOR_STREAM_FACTORY, DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY);
  }

  /**
   * Get config loader factory according to the configs
   * @return full qualified name of {@link ConfigLoaderFactory}
   */
  public Optional<String> getConfigLoaderFactory() {
    return Optional.ofNullable(get(CONFIG_LOADER_FACTORY));
  }

  /**
   * @return the value of job.startpoint.enabled; true if it is not set
   */
  public boolean getStartpointEnabled() {
    return getBoolean(JOB_STARTPOINT_ENABLED, true);
  }
}