/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.execution;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.*;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class StreamManager {
  private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);

  private final SystemAdmins systemAdmins;

  public StreamManager(Config config) {
    this(new SystemAdmins(config, StreamManager.class.getSimpleName()));
  }

  @VisibleForTesting
  StreamManager(SystemAdmins systemAdmins) {
    this.systemAdmins = systemAdmins;
  }

  public void createStreams(List<StreamSpec> streams) {
    Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
    streams.forEach(streamSpec ->
      streamsGroupedBySystem.put(streamSpec.getSystemName(), streamSpec));

    for (Map.Entry<String, Collection<StreamSpec>> entry : streamsGroupedBySystem.asMap().entrySet()) {
      String systemName = entry.getKey();
      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);

      for (StreamSpec stream : entry.getValue()) {
        LOGGER.info("Creating stream {} with partitions {} on system {}",
            new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
        systemAdmin.createStream(stream);
      }
    }
  }

  public void start() {
    this.systemAdmins.start();
  }

  public void stop() {
    this.systemAdmins.stop();
  }

  Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) {
    Map<String, Integer> streamToPartitionCount = new HashMap<>();

    SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);
    if (systemAdmin == null) {
      throw new SamzaException(String.format("System %s does not exist.", systemName));
    }

    // retrieve the metadata for the streams in this system
    Map<String, SystemStreamMetadata> streamToMetadata = systemAdmin.getSystemStreamMetadata(streamNames);
    // set the partitions of a stream to its StreamEdge
    streamToMetadata.forEach((stream, data) ->
      streamToPartitionCount.put(stream, data.getSystemStreamPartitionMetadata().size()));

    return streamToPartitionCount;
  }

  /**
   * This is a best-effort approach to clear the internal streams from previous run, including intermediate streams,
   * checkpoint stream and changelog streams.
   * For batch processing, we always clean up the previous internal streams and create a new set for each run.
   * @param prevConfig config of the previous run
   */
  public void clearStreamsFromPreviousRun(Config prevConfig) {
    try {
      ApplicationConfig appConfig = new ApplicationConfig(prevConfig);
      LOGGER.info("run.id from previous run is {}", appConfig.getRunId());

      StreamConfig streamConfig = new StreamConfig(prevConfig);

      //Find all intermediate streams and clean up
      Set<StreamSpec> intStreams = streamConfig.getStreamIds().stream()
          .filter(streamConfig::getIsIntermediateStream)
          .map(id -> new StreamSpec(id, streamConfig.getPhysicalName(id), streamConfig.getSystem(id)))
          .collect(Collectors.toSet());
      intStreams.forEach(stream -> {
        LOGGER.info("Clear intermediate stream {} in system {}", stream.getPhysicalName(), stream.getSystemName());
        systemAdmins.getSystemAdmin(stream.getSystemName()).clearStream(stream);
      });

      //Find checkpoint stream and clean up
      TaskConfig taskConfig = new TaskConfig(prevConfig);
      taskConfig.getCheckpointManager(new MetricsRegistryMap()).ifPresent(CheckpointManager::clearCheckpoints);

      //Find changelog streams and remove them
      StorageConfig storageConfig = new StorageConfig(prevConfig);
      for (String store : storageConfig.getStoreNames()) {
        String changelog = storageConfig.getChangelogStream(store).orElse(null);
        if (changelog != null) {
          LOGGER.info("Clear store {} changelog {}", store, changelog);
          SystemStream systemStream = StreamUtil.getSystemStreamFromNames(changelog);
          StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1);
          systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec);
        }
      }
    } catch (Exception e) {
      // For batch, we always create a new set of internal streams (checkpoint, changelog and intermediate) with unique
      // id. So if clearStream doesn't work, it won't affect the correctness of the results.
      // We log a warning here and rely on retention to clean up the streams later.
      LOGGER.warn("Fail to clear internal streams from previous run. Please clean up manually.", e);
    }
  }

  /**
   * Create a unique stream name if it's batch mode and has a valid run.id.
   * @param stream physical name of the stream
   * @param config {@link Config} object
   * @return stream name created
   */
  public static String createUniqueNameForBatch(String stream, Config config) {
    ApplicationConfig appConfig = new ApplicationConfig(config);
    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && appConfig.getRunId() != null) {
      return stream + "-" + appConfig.getRunId();
    } else {
      return stream;
    }
  }
}
