/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.config;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.util.ReflectionUtil;

/**
 * Config helper methods related to systems.
 */
public class SystemConfig extends MapConfig {
  private static final String SYSTEMS_PREFIX = "systems.";
  public static final String SYSTEM_ID_PREFIX = SYSTEMS_PREFIX + "%s.";

  private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
  public static final String SYSTEM_FACTORY_FORMAT = SYSTEMS_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
  public static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_ID_PREFIX + "default.stream.";

  // If true, automatically delete committed messages from streams whose committed messages can be deleted.
  // A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually
  // set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
  @VisibleForTesting
  static final String DELETE_COMMITTED_MESSAGES = SYSTEM_ID_PREFIX + "samza.delete.committed.messages";

  private static final String EMPTY = "";

  static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
  static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";

  private Map<String, SystemAdmin> systemAdminMap = new HashMap<>();

  public SystemConfig(Config config) {
    super(config);
  }

  public Optional<String> getSystemFactory(String systemName) {
    if (systemName == null) {
      return Optional.empty();
    }
    String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, systemName);
    String value = get(systemFactory, null);
    return (StringUtils.isBlank(value)) ? Optional.empty() : Optional.of(value);
  }

  /**
   * Get a list of system names.
   *
   * @return A list system names
   */
  public List<String> getSystemNames() {
    Config subConf = subset(SYSTEMS_PREFIX, true);
    ArrayList<String> systemNames = new ArrayList<>();
    for (Map.Entry<String, String> entry : subConf.entrySet()) {
      String key = entry.getKey();
      if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
        systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY));
      }
    }
    return systemNames;
  }

  /**
   * Get {@link SystemAdmin} instances for all the systems defined in this config.
   *
   * @return map of system name to {@link SystemAdmin}
   */
  public Map<String, SystemAdmin> getSystemAdmins(String adminLabel) {
    if (systemAdminMap.isEmpty()) {
      systemAdminMap = getSystemFactories().entrySet()
          .stream()
          .collect(Collectors.toMap(Entry::getKey,
            systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
               .getAdmin(systemNameToFactoryEntry.getKey(), this, adminLabel)));
    }
    return systemAdminMap;
  }

  public Map<String, SystemAdmin> getSystemAdmins() {
    return getSystemAdmins("");
  }

  /**
   * Get {@link SystemAdmin} instance for given system name.
   *
   * @param systemName System name
   * @return SystemAdmin of the system if it exists, otherwise null.
   */
  public SystemAdmin getSystemAdmin(String systemName, String adminLabel) {
    if (systemAdminMap.containsKey(systemName) && systemAdminMap.get(systemName).isStopped()) {
      systemAdminMap.clear();
    }
    return getSystemAdmins(adminLabel).get(systemName);
  }

  public SystemAdmin getSystemAdmin(String systemName) {
    return getSystemAdmin(systemName, "");
  }

  /**
   * Get {@link SystemFactory} instances for all the systems defined in this config.
   *
   * @return a map from system name to {@link SystemFactory}
   */
  public Map<String, SystemFactory> getSystemFactories() {
    Map<String, SystemFactory> systemFactories = getSystemNames().stream().collect(Collectors.toMap(
      systemName -> systemName,
      systemName -> {
        String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException(
            String.format("A stream uses system %s, which is missing from the configuration.", systemName)));
        return ReflectionUtil.getObj(systemFactoryClassName, SystemFactory.class);
      }));

    return systemFactories;
  }

  /**
   * Gets the system-wide defaults for streams.
   *
   * @param systemName the name of the system for which the defaults will be returned.
   * @return a subset of the config with the system prefix removed.
   */
  public Config getDefaultStreamProperties(String systemName) {
    return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName), true);
  }

  /**
   * Get system offset default value.
   * systems.'system'.default.stream.samza.offset.default is the config.
   * systems.'system'.samza.offset.default is the deprecated setting, but needs to be checked for backward compatibility.
   * @param systemName get config value for this system.
   * @return value of system reset or default ("upcoming") if none set.
   */
  public String getSystemOffsetDefault(String systemName) {
    // first check stream system default
    String systemOffsetDefault = get(String.format("systems.%s.default.stream.samza.offset.default", systemName));

    // if not set, check the deprecated setting
    if (StringUtils.isBlank(systemOffsetDefault)) {
      systemOffsetDefault = get(String.format("systems.%s.samza.offset.default", systemName));
      if (StringUtils.isBlank(systemOffsetDefault)) {
        return SAMZA_SYSTEM_OFFSET_UPCOMING;
      }
    }

    return systemOffsetDefault;
  }

  /**
   * @param systemName name of the system
   * @return the key serde for the {@code systemName}, or empty if it was not found
   */
  public Optional<String> getSystemKeySerde(String systemName) {
    return getSystemDefaultStreamProperty(systemName, StreamConfig.KEY_SERDE);
  }

  /**
   * @param systemName name of the system
   * @return the message serde for the {@code systemName}, or empty if it was not found
   */
  public Optional<String> getSystemMsgSerde(String systemName) {
    return getSystemDefaultStreamProperty(systemName, StreamConfig.MSG_SERDE);
  }

  /**
   * @param systemName name of the system
   * @return if messages committed to this system should automatically be deleted
   */
  public boolean deleteCommittedMessages(String systemName) {
    return getBoolean(String.format(DELETE_COMMITTED_MESSAGES, systemName), false);
  }

  /**
   * Gets the system-wide default for the {@code propertyName} for the {@code systemName}.
   * This will check in a couple of different config locations for the value.
   */
  private Optional<String> getSystemDefaultStreamProperty(String systemName, String propertyName) {
    Map<String, String> defaultStreamProperties = getDefaultStreamProperties(systemName);
    String defaultStreamProperty = defaultStreamProperties.get(propertyName);
    if (StringUtils.isNotEmpty(defaultStreamProperty)) {
      return Optional.of(defaultStreamProperty);
    } else {
      String fallbackStreamProperty = get(String.format(SYSTEM_ID_PREFIX, systemName) + propertyName);
      if (StringUtils.isNotEmpty(fallbackStreamProperty)) {
        return Optional.of(fallbackStreamProperty);
      } else {
        return Optional.empty();
      }
    }
  }
}
