/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.datastore.filesystem;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.log4j.Logger;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;

public class LocalCarbonFile implements CarbonFile {
  private static final Logger LOGGER =
      LogServiceFactory.getLogService(LocalCarbonFile.class.getName());

  private File file;

  private String absoluteFilePath;

  public LocalCarbonFile(String filePath) {
    Path pathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
    filePath = pathWithoutSchemeAndAuthority.toString().replace("\\", "/");
    absoluteFilePath = FileFactory.getUpdatedFilePath(filePath);
    file = new File(absoluteFilePath);
  }

  LocalCarbonFile(File file) {
    this.file = file;
    String filePath = file.getAbsolutePath().replace("\\", "/");
    absoluteFilePath = FileFactory.getUpdatedFilePath(filePath);
  }

  @Override
  public String getAbsolutePath() {
    return file.getAbsolutePath();
  }

  @Override
  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
    if (!isDirectory()) {
      return new CarbonFile[0];
    }
    File[] files = file.listFiles(pathname -> fileFilter.accept(new LocalCarbonFile(pathname)));
    if (files == null) {
      return new CarbonFile[0];
    }
    CarbonFile[] carbonFiles = new CarbonFile[files.length];
    for (int i = 0; i < carbonFiles.length; i++) {
      carbonFiles[i] = new LocalCarbonFile(files[i]);
    }
    return carbonFiles;
  }

  @Override
  public String getName() {
    return file.getName();
  }

  @Override
  public boolean isDirectory() {
    return file.isDirectory();
  }

  @Override
  public boolean exists() {
    return file.exists();
  }

  @Override
  public String getCanonicalPath() {
    try {
      return file.getCanonicalPath();
    } catch (IOException e) {
      LOGGER.error("Exception occured" + e.getMessage(), e);
    }
    return null;
  }

  @Override
  public CarbonFile getParentFile() {
    return new LocalCarbonFile(file.getParentFile());
  }

  @Override
  public String getPath() {
    return file.getPath();
  }

  @Override
  public long getSize() {
    return file.length();
  }

  public boolean renameTo(String changetoName) {
    changetoName = FileFactory.getUpdatedFilePath(changetoName);
    return file.renameTo(new File(changetoName));
  }

  public boolean delete() {
    return deleteFile();
  }

  @Override
  public CarbonFile[] listFiles() {
    if (!file.isDirectory()) {
      return new CarbonFile[0];
    }
    File[] files = file.listFiles();
    if (files == null) {
      return new CarbonFile[0];
    }
    CarbonFile[] carbonFiles = new CarbonFile[files.length];
    for (int i = 0; i < carbonFiles.length; i++) {
      carbonFiles[i] = new LocalCarbonFile(files[i]);
    }
    return carbonFiles;
  }

  @Override
  public CarbonFile[] listFiles(boolean recursive, int maxCount)
      throws IOException {
    // ignore the maxCount for local filesystem
    return listFiles();
  }

  @Override
  public List<CarbonFile> listFiles(Boolean recursive) {
    if (!isDirectory()) {
      return new ArrayList<>();
    }
    Collection<File> fileCollection = FileUtils.listFiles(file, null, true);
    List<CarbonFile> carbonFiles = new ArrayList<>();
    for (File file : fileCollection) {
      carbonFiles.add(new LocalCarbonFile(file));
    }
    return carbonFiles;
  }

  @Override
  public List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter) {
    if (!file.isDirectory()) {
      return new ArrayList<>();
    }
    Collection<File> fileCollection = FileUtils.listFiles(file, null, recursive);
    if (fileCollection.isEmpty()) {
      return new ArrayList<>();
    }
    List<CarbonFile> carbonFiles = new ArrayList<>();
    for (File file : fileCollection) {
      CarbonFile carbonFile = new LocalCarbonFile(file);
      if (fileFilter.accept(carbonFile)) {
        carbonFiles.add(carbonFile);
      }
    }
    return carbonFiles;
  }

  @Override
  public boolean createNewFile() {
    try {
      return file.createNewFile();
    } catch (IOException e) {
      return false;
    }
  }

  @Override
  public long getLastModifiedTime() {
    return file.lastModified();
  }

  @Override
  public boolean setLastModifiedTime(long timestamp) {
    return file.setLastModified(timestamp);
  }

  /**
   * This method will delete the data in file data from a given offset
   */
  @Override
  public boolean truncate(String fileName, long validDataEndOffset) {
    FileChannel source = null;
    FileChannel destination = null;
    boolean fileTruncatedSuccessfully = false;
    // temporary file name
    String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
    try {
      CarbonFile tempFile;
      // delete temporary file if it already exists at a given path
      if (isFileExist()) {
        tempFile = FileFactory.getCarbonFile(tempWriteFilePath);
        tempFile.delete();
      }
      // create new temporary file
      FileFactory.createNewFile(tempWriteFilePath);
      tempFile = FileFactory.getCarbonFile(tempWriteFilePath);
      source = new FileInputStream(fileName).getChannel();
      destination = new FileOutputStream(tempWriteFilePath).getChannel();
      long read = destination.transferFrom(source, 0, validDataEndOffset);
      long totalBytesRead = read;
      long remaining = validDataEndOffset - totalBytesRead;
      // read till required data offset is not reached
      while (remaining > 0) {
        read = destination.transferFrom(source, totalBytesRead, remaining);
        totalBytesRead = totalBytesRead + read;
        remaining = remaining - totalBytesRead;
      }
      CarbonUtil.closeStreams(source, destination);
      // rename the temp file to original file
      tempFile.renameForce(fileName);
      fileTruncatedSuccessfully = true;
    } catch (IOException e) {
      LOGGER.error("Exception occured while truncating the file " + e.getMessage(), e);
    } finally {
      CarbonUtil.closeStreams(source, destination);
    }
    return fileTruncatedSuccessfully;
  }

  /**
   * This method will be used to check whether a file has been modified or not
   *
   * @param fileTimeStamp time to be compared with latest timestamp of file
   * @param endOffset     file length to be compared with current length of file
   * @return true if the file is modified otherwise return false.
   */
  @Override
  public boolean isFileModified(long fileTimeStamp, long endOffset) {
    boolean isFileModified = false;
    if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
      isFileModified = true;
    }
    return isFileModified;
  }

  @Override
  public boolean renameForce(String changeToName) {
    File destFile = new File(changeToName);
    if (destFile.exists() && !file.getAbsolutePath().equals(destFile.getAbsolutePath())) {
      if (destFile.delete()) {
        return file.renameTo(new File(changeToName));
      }
    }
    return file.renameTo(new File(changeToName));
  }

  @Override
  public DataOutputStream getDataOutputStream(int bufferSize, boolean append) throws
      FileNotFoundException {
    return new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(absoluteFilePath, append), bufferSize));
  }

  @Override
  public DataInputStream getDataInputStream(int bufferSize) throws IOException {
    return getDataInputStream(bufferSize,
        CarbonUtil.inferCompressorFromFileName(file.getAbsolutePath()));
  }

  @Override
  public DataInputStream getDataInputStream(int bufferSize, String compressor)
      throws IOException {
    InputStream inputStream;
    if (compressor.isEmpty()) {
      inputStream = new FileInputStream(absoluteFilePath);
    } else if ("GZIP".equalsIgnoreCase(compressor)) {
      inputStream = new GZIPInputStream(new FileInputStream(absoluteFilePath));
    } else if ("BZIP2".equalsIgnoreCase(compressor)) {
      inputStream = new BZip2CompressorInputStream(new FileInputStream(absoluteFilePath));
    } else if ("SNAPPY".equalsIgnoreCase(compressor)) {
      inputStream = new SnappyInputStream(new FileInputStream(absoluteFilePath));
    } else if ("LZ4".equalsIgnoreCase(compressor)) {
      inputStream = new LZ4BlockInputStream(new FileInputStream(absoluteFilePath));
    } else if ("ZSTD".equalsIgnoreCase(compressor)) {
      inputStream = new ZstdInputStream(new FileInputStream(absoluteFilePath));
    } else {
      throw new IOException("Unsupported compressor: " + compressor);
    }

    if (bufferSize <= 0) {
      return new DataInputStream(new BufferedInputStream(inputStream));
    } else {
      return new DataInputStream(new BufferedInputStream(inputStream, bufferSize));
    }
  }

  /**
   * return the datainputStream which is seek to the offset of file
   *
   * @param bufferSize
   * @param offset
   * @return DataInputStream
   * @throws IOException
   */
  @Override
  public DataInputStream getDataInputStream(int bufferSize, long offset) throws
      IOException {
    FileInputStream fis = new FileInputStream(absoluteFilePath);
    long actualSkipSize = 0;
    long skipSize = offset;
    try {
      while (actualSkipSize != offset) {
        actualSkipSize += fis.skip(skipSize);
        skipSize = skipSize - actualSkipSize;
      }
    } catch (IOException ioe) {
      CarbonUtil.closeStream(fis);
      throw ioe;
    }
    return new DataInputStream(new BufferedInputStream(fis));
  }

  @Override
  public DataOutputStream getDataOutputStream()
      throws IOException {
    return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(absoluteFilePath)));
  }

  @Override
  public DataOutputStream getDataOutputStream(int bufferSize, long blockSize) throws IOException {
    return getDataOutputStream(bufferSize, blockSize, (short) 1);
  }

  @Override
  public DataOutputStream getDataOutputStream(int bufferSize, long blockSize, short replication)
      throws IOException {
    return new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(absoluteFilePath), bufferSize));
  }

  @Override
  public DataOutputStream getDataOutputStream(int bufferSize, String compressor)
      throws IOException {
    OutputStream outputStream;
    if (compressor.isEmpty()) {
      outputStream = new FileOutputStream(absoluteFilePath);
    } else if ("GZIP".equalsIgnoreCase(compressor)) {
      outputStream = new GZIPOutputStream(new FileOutputStream(absoluteFilePath));
    } else if ("BZIP2".equalsIgnoreCase(compressor)) {
      outputStream = new BZip2CompressorOutputStream(new FileOutputStream(absoluteFilePath));
    } else if ("SNAPPY".equalsIgnoreCase(compressor)) {
      outputStream = new SnappyOutputStream(new FileOutputStream(absoluteFilePath));
    } else if ("LZ4".equalsIgnoreCase(compressor)) {
      outputStream = new LZ4BlockOutputStream(new FileOutputStream(absoluteFilePath));
    } else if ("ZSTD".equalsIgnoreCase(compressor)) {
      // compression level 1 is cost-effective for sort temp file
      // which is not used for storage
      outputStream = new ZstdOutputStream(new FileOutputStream(absoluteFilePath), 1);
    } else {
      throw new IOException("Unsupported compressor: " + compressor);
    }

    if (bufferSize <= 0) {
      return new DataOutputStream(new BufferedOutputStream(outputStream));
    } else {
      return new DataOutputStream(new BufferedOutputStream(outputStream, bufferSize));
    }
  }

  @Override
  public boolean isFileExist(boolean performFileCheck) {
    if (performFileCheck) {
      return file.exists() && file.isFile();
    } else {
      return file.exists();
    }
  }

  @Override
  public boolean isFileExist() {
    return file.exists();
  }

  @Override
  public boolean createNewFile(final FsPermission permission) throws IOException {
    return file.createNewFile();
  }

  @Override
  public boolean deleteFile() {
    return FileFactory.deleteAllFilesOfDir(file);
  }

  @Override
  public boolean mkdirs() {
    return file.mkdirs();
  }

  @Override
  public DataOutputStream getDataOutputStreamUsingAppend() throws IOException {
    return new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(absoluteFilePath, true)));
  }

  @Override
  public boolean createNewLockFile() throws IOException {
    return file.createNewFile();
  }

  @Override
  public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) {
    return listFiles();
  }

  @Override
  public String[] getLocations() {
    return new String[]{"localhost"};
  }

  @Override
  public boolean setReplication(short replication) {
    // local carbon file does not need replication
    return true;
  }

  @Override
  public short getDefaultReplication() {
    return 1;
  }

  @Override
  public long getLength() {
    return file.length();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    LocalCarbonFile that = (LocalCarbonFile) o;
    return Objects.equals(file.getAbsolutePath(), that.file.getAbsolutePath());
  }

  @Override
  public int hashCode() {
    return Objects.hash(file.getAbsolutePath());
  }
}
