/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.file;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

import com.google.common.cache.Cache;

public abstract class FileOperations {

  /**
   * The valid file extensions: {@link Constants#MAPFILE_EXTENSION} and {@link RFile#EXTENSION}.
   * Stored as an unmodifiable set because this single instance is returned to every caller of
   * {@link #getValidExtensions()}.
   */
  private static final Set<String> validExtensions = Collections.unmodifiableSet(
      new HashSet<>(Arrays.asList(Constants.MAPFILE_EXTENSION, RFile.EXTENSION)));

  /**
   * Returns the set of valid file extensions.
   *
   * @return a shared, read-only set; any attempt to modify it throws
   *         {@link UnsupportedOperationException}
   */
  public static Set<String> getValidExtensions() {
    return validExtensions;
  }

  /**
   * Reads the file type setting from the supplied configuration.
   *
   * @param acuconf
   *          configuration to query
   * @return the value of {@link Property#TABLE_FILE_TYPE} in {@code acuconf}
   */
  public static String getNewFileExtension(AccumuloConfiguration acuconf) {
    final String configuredType = acuconf.get(Property.TABLE_FILE_TYPE);
    return configuredType;
  }

  /**
   * Creates a {@code FileOperations} implementation.
   *
   * @return a newly constructed {@code DispatchingFileFactory}; a fresh object on every call
   */
  public static FileOperations getInstance() {
    final FileOperations dispatcher = new DispatchingFileFactory();
    return dispatcher;
  }

  //
  // Abstract methods (to be implemented by subclasses)
  //

  /**
   * Presumably returns the size of the file identified by {@code options}. This class never calls
   * it, so the exact contract is defined by the implementations (TODO confirm).
   */
  protected abstract long getFileSize(FileOptions options) throws IOException;

  /**
   * Opens a writer for the given options. Called by {@link WriterBuilder#build()} with the options
   * produced by {@link FileHelper#toWriterBuilderOptions}.
   */
  protected abstract FileSKVWriter openWriter(FileOptions options) throws IOException;

  /**
   * Opens an index iterator for the given options. Called by {@link IndexReaderBuilder#build()}
   * with the options produced by {@link FileHelper#toIndexReaderBuilderOptions}.
   */
  protected abstract FileSKVIterator openIndex(FileOptions options) throws IOException;

  /**
   * Opens a scan reader for the given options. Called by {@link ScanReaderBuilder#build()} with the
   * options produced by {@link FileHelper#toScanReaderBuilderOptions}, which carry the range,
   * column families and inclusive flag.
   */
  protected abstract FileSKVIterator openScanReader(FileOptions options) throws IOException;

  /**
   * Opens a reader for the given options. Called by {@link ReaderBuilder#build()} with the options
   * produced by {@link FileHelper#toReaderBuilderOptions}, which carry the caches and the
   * seek-to-beginning flag.
   */
  protected abstract FileSKVIterator openReader(FileOptions options) throws IOException;

  //
  // File operations
  //

  /**
   * Creates an operation object for opening a writer on a file. The returned builder is an inner
   * object of this {@code FileOperations} and delegates to it when built. <br>
   * Typical usage:
   *
   * <pre>
   * FileSKVWriter writer = fileOperations.newWriterBuilder()
   *     .forFile(...)
   *     .withTableConfiguration(...)
   *     .withRateLimiter(...) // optional
   *     .withCompression(...) // optional
   *     .build();
   * </pre>
   *
   * @return a new, unconfigured {@link WriterBuilder}
   */
  public WriterBuilder newWriterBuilder() {
    return this.new WriterBuilder();
  }

  /**
   * Creates an operation object for opening an index iterator on a file. The returned builder is
   * an inner object of this {@code FileOperations} and delegates to it when built. <br>
   * Typical usage:
   *
   * <pre>
   * FileSKVIterator iterator = fileOperations.newIndexReaderBuilder()
   *     .forFile(...)
   *     .withTableConfiguration(...)
   *     .withFileLenCache(...) // optional
   *     .build();
   * </pre>
   *
   * @return a new, unconfigured {@link IndexReaderBuilder}
   */
  public IndexReaderBuilder newIndexReaderBuilder() {
    return this.new IndexReaderBuilder();
  }

  /**
   * Creates an operation object for opening a "scan" reader on a file. Scan readers do not have
   * any optimizations for seeking beyond their initial position. This is useful for file
   * operations that only need to scan data within a range and do not need to seek. Therefore file
   * metadata such as indexes does not need to be kept in memory while the file is scanned, and
   * seek optimizations like bloom filters do not need to be loaded. <br>
   * Typical usage:
   *
   * <pre>
   * FileSKVIterator scanner = fileOperations.newScanReaderBuilder()
   *     .forFile(...)
   *     .withTableConfiguration(...)
   *     .overRange(...)
   *     .build();
   * </pre>
   *
   * @return a new, unconfigured {@link ScanReaderBuilder}
   */
  public ScanReaderBuilder newScanReaderBuilder() {
    return this.new ScanReaderBuilder();
  }

  /**
   * Creates an operation object for opening a reader on a file. A reader constructed in this
   * manner fully supports seeking, and also enables any optimizations related to seeking (e.g.
   * Bloom filters). <br>
   * Typical usage:
   *
   * <pre>
   * FileSKVIterator reader = fileOperations.newReaderBuilder()
   *     .forFile(...)
   *     .withTableConfiguration(...)
   *     .withRateLimiter(...) // optional
   *     .withBlockCache(...) // optional
   *     .seekToBeginning(...) // optional
   *     .build();
   * </pre>
   *
   * @return a new, unconfigured {@link ReaderBuilder}
   */
  public ReaderBuilder newReaderBuilder() {
    return this.new ReaderBuilder();
  }

  /**
   * Bundle of settings passed to the abstract {@code open*} / {@code getFileSize} methods. Every
   * field is final and assigned exactly once in the constructor. Which fields are populated
   * depends on the builder that created the instance; the rest are left {@code null} or at a fixed
   * boolean value.
   */
  public class FileOptions {
    // settings populated by every builder
    public final AccumuloConfiguration tableConfiguration;
    public final String filename;
    public final FileSystem fs;
    public final Configuration fsConf;
    public final RateLimiter rateLimiter;
    // settings populated only for writers
    public final String compression;
    public final FSDataOutputStream outputStream;
    public final boolean enableAccumuloStart;
    // settings populated only for readers
    public final BlockCache dataCache;
    public final BlockCache indexCache;
    public final Cache<String,Long> fileLenCache;
    public final boolean seekToBeginning;
    // passed through by the writer builder as well as by all reader builders
    public final CryptoService cryptoService;
    // settings populated only for scan readers
    public final Range range;
    public final Set<ByteSequence> columnFamilies;
    public final boolean inclusive;

    /**
     * Stores each argument in the field of the same name. No validation or copying is performed.
     */
    public FileOptions(AccumuloConfiguration tableConfiguration, String filename, FileSystem fs,
        Configuration fsConf, RateLimiter rateLimiter, String compression,
        FSDataOutputStream outputStream, boolean enableAccumuloStart, BlockCache dataCache,
        BlockCache indexCache, Cache<String,Long> fileLenCache, boolean seekToBeginning,
        CryptoService cryptoService, Range range, Set<ByteSequence> columnFamilies,
        boolean inclusive) {
      // common
      this.tableConfiguration = tableConfiguration;
      this.filename = filename;
      this.fs = fs;
      this.fsConf = fsConf;
      this.rateLimiter = rateLimiter;

      // writer
      this.compression = compression;
      this.outputStream = outputStream;
      this.enableAccumuloStart = enableAccumuloStart;

      // reader
      this.dataCache = dataCache;
      this.indexCache = indexCache;
      this.fileLenCache = fileLenCache;
      this.seekToBeginning = seekToBeginning;
      this.cryptoService = cryptoService;

      // scan reader
      this.range = range;
      this.columnFamilies = columnFamilies;
      this.inclusive = inclusive;
    }

    /** @return the table configuration */
    public AccumuloConfiguration getTableConfiguration() {
      return this.tableConfiguration;
    }

    /** @return the file name */
    public String getFilename() {
      return this.filename;
    }

    /** @return the file system; may be {@code null} when the builder was given none */
    public FileSystem getFileSystem() {
      return this.fs;
    }

    /** @return the file system configuration */
    public Configuration getConfiguration() {
      return this.fsConf;
    }

    /** @return the rate limiter, or {@code null} if none was set */
    public RateLimiter getRateLimiter() {
      return this.rateLimiter;
    }

    /** @return the compression setting, or {@code null} if none was set */
    public String getCompression() {
      return this.compression;
    }

    /** @return the output stream, or {@code null} if none was set */
    public FSDataOutputStream getOutputStream() {
      return this.outputStream;
    }

    /** @return the value of the {@code enableAccumuloStart} flag */
    public boolean isAccumuloStartEnabled() {
      return this.enableAccumuloStart;
    }

    /** @return the data block cache, or {@code null} if none is to be used */
    public BlockCache getDataCache() {
      return this.dataCache;
    }

    /** @return the index block cache, or {@code null} if none is to be used */
    public BlockCache getIndexCache() {
      return this.indexCache;
    }

    /** @return the file length cache, or {@code null} if none was set */
    public Cache<String,Long> getFileLenCache() {
      return this.fileLenCache;
    }

    /** @return the value of the {@code seekToBeginning} flag */
    public boolean isSeekToBeginning() {
      return this.seekToBeginning;
    }

    /** @return the crypto service, or {@code null} if none was set */
    public CryptoService getCryptoService() {
      return this.cryptoService;
    }

    /** @return the scan range, or {@code null} if none was set */
    public Range getRange() {
      return this.range;
    }

    /** @return the scan column families, or {@code null} if none were set */
    public Set<ByteSequence> getColumnFamilies() {
      return this.columnFamilies;
    }

    /** @return the value of the {@code inclusive} flag */
    public boolean isRangeInclusive() {
      return this.inclusive;
    }
  }

  /**
   * Common base of the writer and reader builders. Holds the settings they all share and turns
   * them, together with builder-specific values, into {@link FileOptions} instances.
   */
  public class FileHelper {
    protected AccumuloConfiguration tableConfiguration;
    protected String filename;
    protected FileSystem fs;
    protected Configuration fsConf;
    protected RateLimiter rateLimiter;
    protected CryptoService cryptoService;

    /** Sets the file system; throws {@link NullPointerException} if {@code fs} is null. */
    protected FileHelper fs(FileSystem fs) {
      this.fs = Objects.requireNonNull(fs);
      return this;
    }

    /**
     * Sets the file system configuration; throws {@link NullPointerException} if {@code fsConf} is
     * null.
     */
    protected FileHelper fsConf(Configuration fsConf) {
      this.fsConf = Objects.requireNonNull(fsConf);
      return this;
    }

    /** Sets the file name; throws {@link NullPointerException} if {@code filename} is null. */
    protected FileHelper filename(String filename) {
      this.filename = Objects.requireNonNull(filename);
      return this;
    }

    /**
     * Sets the table configuration; throws {@link NullPointerException} if
     * {@code tableConfiguration} is null.
     */
    protected FileHelper tableConfiguration(AccumuloConfiguration tableConfiguration) {
      this.tableConfiguration = Objects.requireNonNull(tableConfiguration);
      return this;
    }

    /** Sets the rate limiter; {@code null} is accepted. */
    protected FileHelper rateLimiter(RateLimiter rateLimiter) {
      this.rateLimiter = rateLimiter;
      return this;
    }

    /** Sets the crypto service; {@code null} is accepted. */
    protected FileHelper cryptoService(CryptoService cs) {
      this.cryptoService = cs;
      return this;
    }

    /**
     * Builds options for a writer: shared settings plus the given writer settings. Cache, seek and
     * scan fields are left unset.
     */
    protected FileOptions toWriterBuilderOptions(String compression,
        FSDataOutputStream outputStream, boolean startEnabled) {
      final FileOptions writerOptions = new FileOptions(
          tableConfiguration,
          filename,
          fs,
          fsConf,
          rateLimiter,
          compression,
          outputStream,
          startEnabled,
          null, // dataCache
          null, // indexCache
          null, // fileLenCache
          false, // seekToBeginning
          cryptoService,
          null, // range
          null, // columnFamilies
          true); // inclusive
      return writerOptions;
    }

    /**
     * Builds options for a seekable reader: shared settings plus caches and the seek flag. Writer
     * and scan fields are left unset.
     */
    protected FileOptions toReaderBuilderOptions(BlockCache dataCache, BlockCache indexCache,
        Cache<String,Long> fileLenCache, boolean seekToBeginning) {
      final FileOptions readerOptions = new FileOptions(
          tableConfiguration,
          filename,
          fs,
          fsConf,
          rateLimiter,
          null, // compression
          null, // outputStream
          false, // enableAccumuloStart
          dataCache,
          indexCache,
          fileLenCache,
          seekToBeginning,
          cryptoService,
          null, // range
          null, // columnFamilies
          true); // inclusive
      return readerOptions;
    }

    /**
     * Builds options for an index reader: shared settings plus the file length cache. All other
     * builder-specific fields are left unset.
     */
    protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
      final FileOptions indexOptions = new FileOptions(
          tableConfiguration,
          filename,
          fs,
          fsConf,
          rateLimiter,
          null, // compression
          null, // outputStream
          false, // enableAccumuloStart
          null, // dataCache
          null, // indexCache
          fileLenCache,
          false, // seekToBeginning
          cryptoService,
          null, // range
          null, // columnFamilies
          true); // inclusive
      return indexOptions;
    }

    /**
     * Builds options for a scan reader: shared settings plus range, column families and the
     * inclusive flag. Writer and cache fields are left unset.
     */
    protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
        boolean inclusive) {
      final FileOptions scanOptions = new FileOptions(
          tableConfiguration,
          filename,
          fs,
          fsConf,
          rateLimiter,
          null, // compression
          null, // outputStream
          false, // enableAccumuloStart
          null, // dataCache
          null, // indexCache
          null, // fileLenCache
          false, // seekToBeginning
          cryptoService,
          range,
          columnFamilies,
          inclusive);
      return scanOptions;
    }
  }

  /**
   * Operation object for constructing a writer. Start with {@link #forFile} or
   * {@link #forOutputStream}, supply the table configuration, optionally adjust the remaining
   * settings, then call {@link #build()}.
   */
  public class WriterBuilder extends FileHelper implements WriterTableConfiguration {
    private String compression;
    private FSDataOutputStream outputStream;
    // stays true unless withStartDisabled() is called
    private boolean enableAccumuloStart = true;

    /**
     * Targets an already-open output stream instead of a named file. No file system is set.
     *
     * @param extension
     *          appended to the literal {@code "foo"} to form the file name recorded in the options
     * @param outputStream
     *          stream to hand to the writer
     * @param fsConf
     *          file system configuration; must not be null
     */
    public WriterTableConfiguration forOutputStream(String extension,
        FSDataOutputStream outputStream, Configuration fsConf) {
      this.outputStream = outputStream;
      // There is no real file name here, so a placeholder carrying only the extension is used.
      filename("foo" + extension);
      fsConf(fsConf);
      return this;
    }

    /**
     * Targets a named file on the given file system. All three arguments must be non-null.
     */
    public WriterTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf) {
      filename(filename);
      fs(fs);
      fsConf(fsConf);
      return this;
    }

    /** Sets the table configuration; must not be null. */
    @Override
    public WriterBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
      tableConfiguration(tableConfiguration);
      return this;
    }

    /** (Optional) Clears the {@code enableAccumuloStart} flag passed to the writer. */
    public WriterBuilder withStartDisabled() {
      enableAccumuloStart = false;
      return this;
    }

    /** (Optional) Sets the compression setting passed to the writer. */
    public WriterBuilder withCompression(String compression) {
      this.compression = compression;
      return this;
    }

    /** (Optional) Sets the rate limiter passed to the writer. */
    public WriterBuilder withRateLimiter(RateLimiter rateLimiter) {
      rateLimiter(rateLimiter);
      return this;
    }

    /** (Optional) Sets the crypto service passed to the writer. */
    public WriterBuilder withCryptoService(CryptoService cs) {
      cryptoService(cs);
      return this;
    }

    /** Execute the operation, constructing the specified file writer. */
    public FileSKVWriter build() throws IOException {
      final FileOptions writerOptions =
          toWriterBuilderOptions(compression, outputStream, enableAccumuloStart);
      return openWriter(writerOptions);
    }
  }

  /**
   * Intermediate step of the writer builder. It is the return type of
   * {@link WriterBuilder#forFile} and {@link WriterBuilder#forOutputStream}, so the table
   * configuration has to be supplied before the rest of {@link WriterBuilder} becomes available.
   */
  public interface WriterTableConfiguration {
    /** Supplies the table configuration and returns the full builder. */
    WriterBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
  }

  /**
   * Operation object for constructing a reader that supports seeking. Start with {@link #forFile},
   * supply the table configuration, optionally adjust the remaining settings, then call
   * {@link #build()}.
   */
  public class ReaderBuilder extends FileHelper implements ReaderTableConfiguration {
    private BlockCache dataCache;
    private BlockCache indexCache;
    private Cache<String,Long> fileLenCache;
    private boolean seekToBeginning = false;

    /** Targets a named file on the given file system. All three arguments must be non-null. */
    public ReaderTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf) {
      filename(filename).fs(fs).fsConf(fsConf);
      return this;
    }

    /** Sets the table configuration; must not be null. */
    @Override
    public ReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
      tableConfiguration(tableConfiguration);
      return this;
    }

    /**
     * (Optional) Set the block cache pair to be used to optimize reads within the constructed
     * reader.
     */
    public ReaderBuilder withBlockCache(BlockCache dataCache, BlockCache indexCache) {
      this.dataCache = dataCache;
      this.indexCache = indexCache;
      return this;
    }

    /** (Optional) set the data cache to be used to optimize reads within the constructed reader. */
    public ReaderBuilder withDataCache(BlockCache dataCache) {
      this.dataCache = dataCache;
      return this;
    }

    /**
     * (Optional) set the index cache to be used to optimize reads within the constructed reader.
     */
    public ReaderBuilder withIndexCache(BlockCache indexCache) {
      this.indexCache = indexCache;
      return this;
    }

    /** (Optional) set the file length cache passed to the constructed reader. */
    public ReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
      this.fileLenCache = fileLenCache;
      return this;
    }

    /** (Optional) set the crypto service passed to the constructed reader. */
    public ReaderBuilder withCryptoService(CryptoService cs) {
      cryptoService(cs);
      return this;
    }

    /** (Optional) set the rate limiter passed to the constructed reader. */
    public ReaderBuilder withRateLimiter(RateLimiter rateLimiter) {
      rateLimiter(rateLimiter);
      return this;
    }

    /**
     * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to
     * {@code seekToBeginning(true)}.
     */
    public ReaderBuilder seekToBeginning() {
      seekToBeginning(true);
      return this;
    }

    /** If true, seek the constructed iterator to the beginning of its domain before returning. */
    public ReaderBuilder seekToBeginning(boolean seekToBeginning) {
      this.seekToBeginning = seekToBeginning;
      return this;
    }

    /**
     * Execute the operation, constructing the specified file reader.
     *
     * <p>
     * If the table configuration disables index or data block caching, the corresponding cache is
     * withheld from the options for this call. The caches configured on the builder are left
     * untouched, so building again after changing the table configuration still sees them.
     *
     * @throws NullPointerException
     *           if no table configuration has been set
     */
    public FileSKVIterator build() throws IOException {
      Objects.requireNonNull(tableConfiguration,
          "withTableConfiguration(...) must be called before build()");
      // Resolve the caches into locals instead of overwriting the builder's fields.
      final BlockCache effectiveIndexCache =
          tableConfiguration.getBoolean(Property.TABLE_INDEXCACHE_ENABLED) ? indexCache : null;
      final BlockCache effectiveDataCache =
          tableConfiguration.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED) ? dataCache : null;
      return openReader(toReaderBuilderOptions(effectiveDataCache, effectiveIndexCache,
          fileLenCache, seekToBeginning));
    }
  }

  /**
   * Intermediate step of the reader builder. It is the return type of
   * {@link ReaderBuilder#forFile}, so the table configuration has to be supplied before the rest
   * of {@link ReaderBuilder} becomes available.
   */
  public interface ReaderTableConfiguration {
    /** Supplies the table configuration and returns the full builder. */
    ReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
  }

  /**
   * Operation object for opening an index. Start with {@link #forFile}, supply the table
   * configuration, optionally set a file length cache, then call {@link #build()}.
   */
  public class IndexReaderBuilder extends FileHelper implements IndexReaderTableConfiguration {

    // remains null unless withFileLenCache(...) is called
    private Cache<String,Long> fileLenCache;

    /** Targets a named file on the given file system. All three arguments must be non-null. */
    public IndexReaderTableConfiguration forFile(String filename, FileSystem fs,
        Configuration fsConf) {
      filename(filename);
      fs(fs);
      fsConf(fsConf);
      return this;
    }

    /** Sets the table configuration; must not be null. */
    @Override
    public IndexReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
      tableConfiguration(tableConfiguration);
      return this;
    }

    /** (Optional) set the file length cache passed to the constructed index iterator. */
    public IndexReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
      this.fileLenCache = fileLenCache;
      return this;
    }

    /** Execute the operation, constructing the index iterator. */
    public FileSKVIterator build() throws IOException {
      final FileOptions indexOptions = toIndexReaderBuilderOptions(fileLenCache);
      return openIndex(indexOptions);
    }
  }

  /**
   * Intermediate step of the index reader builder. It is the return type of
   * {@link IndexReaderBuilder#forFile}, so the table configuration has to be supplied before the
   * rest of {@link IndexReaderBuilder} becomes available.
   */
  public interface IndexReaderTableConfiguration {
    /** Supplies the table configuration and returns the full builder. */
    IndexReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
  }

  /**
   * Operation object for opening a scan reader. Start with {@link #forFile}, supply the table
   * configuration, set the range with {@link #overRange}, then call {@link #build()}.
   */
  public class ScanReaderBuilder extends FileHelper implements ScanReaderTableConfiguration {
    // All three remain at their defaults (null / false) until overRange(...) is called.
    private Range range;
    private Set<ByteSequence> columnFamilies;
    private boolean inclusive;

    /** Targets a named file on the given file system. All three arguments must be non-null. */
    public ScanReaderTableConfiguration forFile(String filename, FileSystem fs,
        Configuration fsConf) {
      filename(filename);
      fs(fs);
      fsConf(fsConf);
      return this;
    }

    /** Sets the table configuration; must not be null. */
    @Override
    public ScanReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
      tableConfiguration(tableConfiguration);
      return this;
    }

    /**
     * Set the range over which the constructed iterator will search.
     *
     * @param range
     *          range to scan; must not be null
     * @param columnFamilies
     *          column families to scan; must not be null
     * @param inclusive
     *          flag passed to the scan reader together with {@code columnFamilies}
     */
    public ScanReaderBuilder overRange(Range range, Set<ByteSequence> columnFamilies,
        boolean inclusive) {
      // Validate both references up front so a failure leaves the builder unchanged.
      Objects.requireNonNull(range);
      Objects.requireNonNull(columnFamilies);

      this.inclusive = inclusive;
      this.columnFamilies = columnFamilies;
      this.range = range;
      return this;
    }

    /** The range over which this reader should scan. */
    public Range getRange() {
      return this.range;
    }

    /** The column families which this reader should scan. */
    public Set<ByteSequence> getColumnFamilies() {
      return this.columnFamilies;
    }

    /** Execute the operation, constructing a scan iterator. */
    public FileSKVIterator build() throws IOException {
      final FileOptions scanOptions = toScanReaderBuilderOptions(range, columnFamilies, inclusive);
      return openScanReader(scanOptions);
    }
  }

  /**
   * Intermediate step of the scan reader builder. It is the return type of
   * {@link ScanReaderBuilder#forFile}, so the table configuration has to be supplied before the
   * rest of {@link ScanReaderBuilder} becomes available.
   */
  public interface ScanReaderTableConfiguration {
    /** Supplies the table configuration and returns the full builder. */
    ScanReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
  }
}
