/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.engine.snapshot;

import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Objects;

/**
 * SnapshotTaker takes a data snapshot for one DataRegion at a time. It does so by creating hard
 * links to the files of the region. The design distinguishes two kinds of snapshot: a Full
 * Snapshot, which captures all files into an empty directory, and an Incremental Snapshot, which
 * is based on a snapshot taken before. Only the full snapshot ({@link #takeFullSnapshot}) is
 * implemented in this class.
 *
 * <p>An instance keeps per-invocation state (the snapshot logger and the locked file lists) in
 * fields, so a single instance must not be used to take several snapshots concurrently.
 */
public class SnapshotTaker {
  private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class);
  private final DataRegion dataRegion;
  // Logger of the snapshot currently being taken; null when no snapshot is in progress.
  private SnapshotLogger snapshotLogger;
  // File lists captured (and read-locked) by readLockTheFile for the current snapshot.
  private List<TsFileResource> seqFiles;
  private List<TsFileResource> unseqFiles;

  public SnapshotTaker(DataRegion dataRegion) {
    this.dataRegion = dataRegion;
  }

  /**
   * Takes a full snapshot of the data region by hard-linking every closed TsFile, together with
   * its resource file and (if present) its modification file.
   *
   * <p>On any failure (a link could not be created or an unexpected exception occurred) the
   * partially created snapshot directories under the configured data dirs are removed and {@code
   * false} is returned.
   *
   * @param snapshotDirPath directory of the snapshot; it must not exist or must be empty. Its
   *     last path component is used as the snapshot id
   * @param flushBeforeSnapshot whether all working TsFile processors are synchronously closed
   *     before the snapshot is taken
   * @return {@code true} if every file was linked successfully, {@code false} otherwise
   * @throws DirectoryNotLegalException if the snapshot directory already exists and is not empty
   * @throws IOException if the snapshot directory cannot be created
   */
  public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot)
      throws DirectoryNotLegalException, IOException {
    File snapshotDir = new File(Objects.requireNonNull(snapshotDirPath, "snapshotDirPath"));
    // listFiles() returns null when the path does not exist or is not a directory
    File[] existingFiles = snapshotDir.listFiles();
    if (existingFiles != null && existingFiles.length > 0) {
      // the directory should be empty or not exists
      throw new DirectoryNotLegalException(
          String.format("%s already exists and is not empty", snapshotDirPath));
    }

    if (!snapshotDir.exists() && !snapshotDir.mkdirs()) {
      throw new IOException(String.format("Failed to create directory %s", snapshotDir));
    }

    if (flushBeforeSnapshot) {
      dataRegion.syncCloseAllWorkingTsFileProcessors();
    }

    String snapshotId = snapshotDir.getName();
    File snapshotLog = new File(snapshotDir, SnapshotLogger.SNAPSHOT_LOG_NAME);
    // Forget any logger of an earlier invocation, so that a failure while creating the new one
    // can never lead to closing a stale logger.
    snapshotLogger = null;
    boolean success = false;
    try {
      snapshotLogger = new SnapshotLogger(snapshotLog);

      readLockTheFile();
      try {
        // always attempt both lists, even if the first one failed
        boolean seqSuccess = createSnapshot(seqFiles, snapshotId);
        boolean unseqSuccess = createSnapshot(unseqFiles, snapshotId);
        success = seqSuccess && unseqSuccess;
      } finally {
        readUnlockTheFile();
      }
    } catch (Exception e) {
      LOGGER.error(
          "Exception occurs when taking snapshot for {}-{}",
          dataRegion.getStorageGroupName(),
          dataRegion.getDataRegionId(),
          e);
    } finally {
      closeSnapshotLogger();
    }

    if (success) {
      LOGGER.info(
          "Successfully take snapshot for {}-{}, snapshot directory is {}",
          dataRegion.getStorageGroupName(),
          dataRegion.getDataRegionId(),
          snapshotDirPath);
    } else {
      LOGGER.warn(
          "Failed to take snapshot for {}-{}, clean up",
          dataRegion.getStorageGroupName(),
          dataRegion.getDataRegionId());
      try {
        cleanUpWhenFail(snapshotId);
      } catch (RuntimeException e) {
        // keep the contract of reporting failure through the return value
        LOGGER.error("Exception occurs when cleaning up snapshot {}", snapshotId, e);
      }
    }
    return success;
  }

  /** Closes the logger of the current snapshot, if there is one, and clears the field. */
  private void closeSnapshotLogger() {
    if (snapshotLogger == null) {
      return;
    }
    try {
      snapshotLogger.close();
    } catch (Exception e) {
      LOGGER.error("Failed to close snapshot logger", e);
    } finally {
      snapshotLogger = null;
    }
  }

  /**
   * Captures the current sequence and unsequence file lists of the region and takes the read lock
   * of every listed resource. The manager's read lock is only held while the lists are captured
   * and the per-file locks are acquired.
   */
  private void readLockTheFile() {
    TsFileManager manager = dataRegion.getTsFileManager();
    manager.readLock();
    try {
      seqFiles = manager.getTsFileList(true);
      unseqFiles = manager.getTsFileList(false);
      for (TsFileResource resource : seqFiles) {
        resource.readLock();
      }
      for (TsFileResource resource : unseqFiles) {
        resource.readLock();
      }
    } finally {
      manager.readUnlock();
    }
  }

  /** Releases the read locks taken by {@link #readLockTheFile()}. */
  private void readUnlockTheFile() {
    for (TsFileResource resource : seqFiles) {
      resource.readUnlock();
    }
    for (TsFileResource resource : unseqFiles) {
      resource.readUnlock();
    }
  }

  /**
   * Creates hard links in the snapshot location for every closed resource in the given list.
   * Resources that are not closed are skipped.
   *
   * @param resources the resources to be linked
   * @param snapshotId the id of the current snapshot
   * @return {@code true} if all links were created, {@code false} if an IOException occurred
   */
  private boolean createSnapshot(List<TsFileResource> resources, String snapshotId) {
    try {
      for (TsFileResource resource : resources) {
        if (!resource.isClosed()) {
          continue;
        }
        File tsFile = resource.getTsFile();
        File snapshotTsFile = getSnapshotFilePathForTsFile(tsFile, snapshotId);
        // create hard link for tsfile, resource, mods
        createHardLink(snapshotTsFile, tsFile);
        createHardLink(
            new File(snapshotTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
            new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
        if (resource.getModFile().exists()) {
          createHardLink(
              new File(snapshotTsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX),
              new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX));
        }
      }
      return true;
    } catch (IOException e) {
      LOGGER.error("Catch IOException when creating snapshot", e);
      return false;
    }
  }

  /**
   * Creates a hard link {@code target} pointing to {@code source} and passes both paths to the
   * snapshot logger.
   *
   * @param target the link to be created
   * @param source the existing file to be linked
   * @throws IOException if the link cannot be created or the log entry cannot be written
   */
  private void createHardLink(File target, File source) throws IOException {
    // The checks below only add diagnostics; Files.createLink reports the actual failure.
    if (!target.getParentFile().exists()) {
      LOGGER.error("Hard link target dir {} doesn't exist", target.getParentFile());
    }
    if (!source.exists()) {
      LOGGER.error("Hard link source file {} doesn't exist", source);
    }

    Files.createLink(target.getAbsoluteFile().toPath(), source.getAbsoluteFile().toPath());
    snapshotLogger.logFile(source.getAbsolutePath(), target.getAbsolutePath());
  }

  /**
   * Construct the snapshot file path for a given tsfile, and will create the dir. Eg, given a
   * tsfile in /data/iotdb/data/sequence/root.testsg/1/0/1-1-0-0.tsfile, with snapshotId "sm123",
   * the snapshot location will be
   * /data/iotdb/data/snapshot/sm123/sequence/root.testsg/1/0/1-1-0-0.tsfile
   *
   * <p>The last five components of the tsfile path are expected to be {@code (un)sequence /
   * sgName / dataRegionId / timePartition / tsFileName}; everything before them is treated as the
   * data dir.
   *
   * @param tsFile tsfile to be taken a snapshot
   * @param snapshotId the id for current snapshot
   * @return the File object of the snapshot file, and its parent directory will be created
   * @throws IOException if the parent directory of the snapshot file cannot be created
   */
  public File getSnapshotFilePathForTsFile(File tsFile, String snapshotId) throws IOException {
    // ... data (un)sequence sgName dataRegionId timePartition tsFileName
    // String.split takes a regex, so the Windows separator has to be escaped
    String[] splittedPath =
        tsFile.getAbsolutePath().split(File.separator.equals("\\") ? "\\\\" : File.separator);
    // snapshot dir will be like
    // ... data snapshot snapshotId (un)sequence sgName dataRegionId timePartition
    StringBuilder stringBuilder = new StringBuilder();
    int i = 0;
    // build the prefix part of data dir
    for (; i < splittedPath.length - 5; ++i) {
      stringBuilder.append(splittedPath[i]);
      stringBuilder.append(File.separator);
    }
    stringBuilder.append("snapshot");
    stringBuilder.append(File.separator);
    stringBuilder.append(snapshotId);
    stringBuilder.append(File.separator);
    // the content in here will be
    // ... data snapshot snapshotId

    // build the rest part for the dir
    for (; i < splittedPath.length - 1; ++i) {
      stringBuilder.append(splittedPath[i]);
      stringBuilder.append(File.separator);
    }
    File dir = new File(stringBuilder.toString());
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Cannot create directory " + dir.getAbsolutePath());
    }
    return new File(dir, tsFile.getName());
  }

  /**
   * Removes the directory of the given snapshot from every configured data dir. A failure to
   * delete one directory is logged and does not stop the cleanup of the remaining ones.
   *
   * @param snapshotId the id of the snapshot to be removed
   */
  private void cleanUpWhenFail(String snapshotId) {
    LOGGER.info("Cleaning up snapshot dir for {}", snapshotId);
    for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
      File dataDirForThisSnapshot =
          new File(dataDir + File.separator + "snapshot" + File.separator + snapshotId);
      if (dataDirForThisSnapshot.exists()) {
        try {
          FileUtils.recursiveDeleteFolder(dataDirForThisSnapshot.getAbsolutePath());
        } catch (IOException e) {
          LOGGER.error(
              "Failed to delete folder {} when cleaning up",
              dataDirForThisSnapshot.getAbsolutePath(),
              e);
        }
      }
    }
  }
}
