/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.client;

import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.impl.ScannerOptions;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.iterators.IteratorAdapter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

/**
 * A scanner that instantiates iterators on the client side instead of on the tablet server. This
 * can be useful for testing iterators or in cases where you don't want iterators affecting the
 * performance of tablet servers.
 *
 * <p>
 * Suggested usage:
 *
 * <pre>
 * <code>
 * Scanner scanner = client.createScanner(tableName, authorizations);
 * scanner = new ClientSideIteratorScanner(scanner);
 * </code>
 * </pre>
 *
 * <p>
 * Iterators added to this scanner will be run in the client JVM. Separate scan iterators can be run
 * on the server side and client side by adding iterators to the source scanner (which will execute
 * server side) and to the client side scanner (which will execute client side).
 */
public class ClientSideIteratorScanner extends ScannerOptions implements Scanner {
  private int size;

  private Range range;
  private boolean isolated = false;
  private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
  private SamplerConfiguration iteratorSamplerConfig;

  private class ClientSideIteratorEnvironment implements IteratorEnvironment {

    private SamplerConfiguration samplerConfig;
    private boolean sampleEnabled;

    ClientSideIteratorEnvironment(boolean sampleEnabled, SamplerConfiguration samplerConfig) {
      this.sampleEnabled = sampleEnabled;
      this.samplerConfig = samplerConfig;
    }

    @Override
    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName)
        throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public AccumuloConfiguration getConfig() {
      throw new UnsupportedOperationException();
    }

    @Override
    public IteratorScope getIteratorScope() {
      return IteratorScope.scan;
    }

    @Override
    public boolean isFullMajorCompaction() {
      return false;
    }

    @Override
    public boolean isUserCompaction() {
      return false;
    }

    @Override
    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
      throw new UnsupportedOperationException();
    }

    @Override
    public Authorizations getAuthorizations() {
      return ClientSideIteratorScanner.this.getAuthorizations();
    }

    @Override
    public IteratorEnvironment cloneWithSamplingEnabled() {
      return new ClientSideIteratorEnvironment(true, samplerConfig);
    }

    @Override
    public boolean isSamplingEnabled() {
      return sampleEnabled;
    }

    @Override
    public SamplerConfiguration getSamplerConfiguration() {
      return samplerConfig;
    }
  }

  /**
   * A class that wraps a Scanner in a SortedKeyValueIterator so that other accumulo iterators can
   * use it as a source.
   */
  private class ScannerTranslatorImpl implements SortedKeyValueIterator<Key,Value> {
    protected Scanner scanner;
    Iterator<Entry<Key,Value>> iter;
    Entry<Key,Value> top = null;
    private SamplerConfiguration samplerConfig;

    /**
     * Constructs an accumulo iterator from a scanner.
     *
     * @param scanner
     *          the scanner to iterate over
     */
    public ScannerTranslatorImpl(final Scanner scanner, SamplerConfiguration samplerConfig) {
      this.scanner = scanner;
      this.samplerConfig = samplerConfig;
    }

    @Override
    public void init(final SortedKeyValueIterator<Key,Value> source,
        final Map<String,String> options, final IteratorEnvironment env) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean hasTop() {
      return top != null;
    }

    @Override
    public void next() throws IOException {
      if (iter.hasNext())
        top = iter.next();
      else
        top = null;
    }

    @Override
    public void seek(final Range range, final Collection<ByteSequence> columnFamilies,
        final boolean inclusive) throws IOException {
      if (!inclusive && columnFamilies.size() > 0) {
        throw new IllegalArgumentException();
      }
      scanner.setRange(range);
      scanner.clearColumns();
      for (ByteSequence colf : columnFamilies) {
        scanner.fetchColumnFamily(new Text(colf.toArray()));
      }

      if (samplerConfig == null) {
        scanner.clearSamplerConfiguration();
      } else {
        scanner.setSamplerConfiguration(samplerConfig);
      }

      iter = scanner.iterator();
      next();
    }

    @Override
    public Key getTopKey() {
      return top.getKey();
    }

    @Override
    public Value getTopValue() {
      return top.getValue();
    }

    @Override
    public SortedKeyValueIterator<Key,Value> deepCopy(final IteratorEnvironment env) {
      return new ScannerTranslatorImpl(scanner,
          env.isSamplingEnabled() ? env.getSamplerConfiguration() : null);
    }
  }

  private ScannerTranslatorImpl smi;

  /**
   * Constructs a scanner that can execute client-side iterators.
   *
   * @param scanner
   *          the source scanner
   */
  public ClientSideIteratorScanner(final Scanner scanner) {
    smi = new ScannerTranslatorImpl(scanner, scanner.getSamplerConfiguration());
    this.range = scanner.getRange();
    this.size = scanner.getBatchSize();
    this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
    this.batchTimeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
    this.readaheadThreshold = scanner.getReadaheadThreshold();
    SamplerConfiguration samplerConfig = scanner.getSamplerConfiguration();
    if (samplerConfig != null)
      setSamplerConfiguration(samplerConfig);
  }

  /**
   * Sets the source Scanner.
   */
  public void setSource(final Scanner scanner) {
    smi = new ScannerTranslatorImpl(scanner, scanner.getSamplerConfiguration());
  }

  @Override
  public Iterator<Entry<Key,Value>> iterator() {
    smi.scanner.setBatchSize(size);
    smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
    smi.scanner.setBatchTimeout(batchTimeOut, TimeUnit.MILLISECONDS);
    smi.scanner.setReadaheadThreshold(readaheadThreshold);
    if (isolated)
      smi.scanner.enableIsolation();
    else
      smi.scanner.disableIsolation();

    smi.samplerConfig = getSamplerConfiguration();

    final TreeMap<Integer,IterInfo> tm = new TreeMap<>();

    for (IterInfo iterInfo : serverSideIteratorList) {
      tm.put(iterInfo.getPriority(), iterInfo);
    }

    SortedKeyValueIterator<Key,Value> skvi;
    try {
      skvi = IteratorUtil.loadIterators(smi, tm.values(), serverSideIteratorOptions,
          new ClientSideIteratorEnvironment(getSamplerConfiguration() != null,
              getIteratorSamplerConfigurationInternal()),
          false, null);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    final Set<ByteSequence> colfs = new TreeSet<>();
    for (Column c : this.getFetchedColumns()) {
      colfs.add(new ArrayByteSequence(c.getColumnFamily()));
    }

    try {
      skvi.seek(range, colfs, true);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return new IteratorAdapter(skvi);
  }

  @Override
  public Authorizations getAuthorizations() {
    return smi.scanner.getAuthorizations();
  }

  @Deprecated
  @Override
  public void setTimeOut(int timeOut) {
    if (timeOut == Integer.MAX_VALUE)
      setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    else
      setTimeout(timeOut, TimeUnit.SECONDS);
  }

  @Deprecated
  @Override
  public int getTimeOut() {
    long timeout = getTimeout(TimeUnit.SECONDS);
    if (timeout >= Integer.MAX_VALUE)
      return Integer.MAX_VALUE;
    return (int) timeout;
  }

  @Override
  public void setRange(final Range range) {
    this.range = range;
  }

  @Override
  public Range getRange() {
    return range;
  }

  @Override
  public void setBatchSize(final int size) {
    this.size = size;
  }

  @Override
  public int getBatchSize() {
    return size;
  }

  @Override
  public void enableIsolation() {
    this.isolated = true;
  }

  @Override
  public void disableIsolation() {
    this.isolated = false;
  }

  @Override
  public long getReadaheadThreshold() {
    return readaheadThreshold;
  }

  @Override
  public void setReadaheadThreshold(long batches) {
    if (0 > batches) {
      throw new IllegalArgumentException(
          "Number of batches before read-ahead must be non-negative");
    }
    this.readaheadThreshold = batches;
  }

  private SamplerConfiguration getIteratorSamplerConfigurationInternal() {
    SamplerConfiguration scannerSamplerConfig = getSamplerConfiguration();
    if (scannerSamplerConfig != null) {
      if (iteratorSamplerConfig != null && !iteratorSamplerConfig.equals(scannerSamplerConfig)) {
        throw new IllegalStateException("Scanner and iterator sampler configuration differ");
      }

      return scannerSamplerConfig;
    }

    return iteratorSamplerConfig;
  }

  /**
   * This is provided for the case where no sampler configuration is set on the scanner, but there
   * is a need to create iterator deep copies that have sampling enabled. If sampler configuration
   * is set on the scanner, then this method does not need to be called inorder to create deep
   * copies with sampling.
   *
   * <p>
   * Setting this differently than the scanners sampler configuration may cause exceptions.
   *
   * @since 1.8.0
   */
  public void setIteratorSamplerConfiguration(SamplerConfiguration sc) {
    requireNonNull(sc);
    this.iteratorSamplerConfig = sc;
  }

  /**
   * Clear any iterator sampler configuration.
   *
   * @since 1.8.0
   */
  public void clearIteratorSamplerConfiguration() {
    this.iteratorSamplerConfig = null;
  }

  /**
   * @return currently set iterator sampler configuration.
   *
   * @since 1.8.0
   */

  public SamplerConfiguration getIteratorSamplerConfiguration() {
    return iteratorSamplerConfig;
  }
}
