/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.tserver.scan;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
import org.apache.accumulo.core.dataImpl.thrift.TKey;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.tserver.TabletHostingServer;
import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.tablet.KVEntry;
import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
import org.apache.accumulo.tserver.tablet.TabletBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LookupTask extends ScanTask<MultiScanResult> {

  private static final Logger log = LoggerFactory.getLogger(LookupTask.class);

  private final long scanID;

  public LookupTask(TabletHostingServer server, long scanID) {
    super(server);
    this.scanID = scanID;
  }

  @Override
  public void run() {
    MultiScanSession session = (MultiScanSession) server.getSession(scanID);
    String oldThreadName = Thread.currentThread().getName();

    try {
      if (isCancelled() || session == null)
        return;

      if (!transitionToRunning())
        return;

      TableConfiguration acuTableConf = server.getTableConfiguration(session.threadPoolExtent);
      long maxResultsSize = acuTableConf.getAsBytes(Property.TABLE_SCAN_MAXMEM);

      Thread.currentThread().setName("Client: " + session.client + " User: " + session.getUser()
          + " Start: " + session.startTime + " Table: ");

      long bytesAdded = 0;
      long maxScanTime = 4000;

      long startTime = System.currentTimeMillis();
      // Copy entries in session.queries (HashMap) to queryQueue (LinkedList)
      // to better control the order when unfinished ranges are returned
      LinkedList<Pair<KeyExtent,List<Range>>> queryQueue = new LinkedList<>();
      session.queries.entrySet()
          .forEach(e -> queryQueue.addLast(new Pair<>(e.getKey(), e.getValue())));

      List<KVEntry> results = new ArrayList<>();
      Map<KeyExtent,List<Range>> failures = new HashMap<>();
      List<KeyExtent> fullScans = new ArrayList<>();
      KeyExtent partScan = null;
      Key partNextKey = null;
      boolean partNextKeyInclusive = false;

      // check the time so that the read ahead thread is not monopolized
      while (!queryQueue.isEmpty() && bytesAdded < maxResultsSize
          && (System.currentTimeMillis() - startTime) < maxScanTime) {
        Pair<KeyExtent,List<Range>> extentRangePair = queryQueue.removeFirst();
        KeyExtent extent = extentRangePair.getFirst();
        List<Range> ranges = extentRangePair.getSecond();
        session.queries.remove(extent);

        // check that tablet server is serving requested tablet
        TabletBase tablet = session.getTabletResolver().getTablet(extent);

        if (tablet == null) {
          failures.put(extent, ranges);
          continue;
        }
        Thread.currentThread().setName("Client: " + session.client + " User: " + session.getUser()
            + " Start: " + session.startTime + " Tablet: " + extent);

        LookupResult lookupResult;
        try {

          // do the following check to avoid a race condition
          // between setting false below and the task being
          // canceled
          if (isCancelled())
            interruptFlag.set(true);

          // Create new List here to collect the results from this Tablet.lookup() call
          // Ensures that the yield code in Tablet can only compare a yield position
          // to the results from that call
          List<KVEntry> tabletResults = new ArrayList<>();
          lookupResult = tablet.lookup(ranges, tabletResults, session.scanParams,
              maxResultsSize - bytesAdded, interruptFlag);
          // Add results from this Tablet.lookup() to the accumulated results
          results.addAll(tabletResults);

          // if the tablet was closed it it possible that the
          // interrupt flag was set.... do not want it set for
          // the next
          // lookup
          interruptFlag.set(false);

        } catch (IOException e) {
          log.warn("lookup failed for tablet " + extent, e);
          throw new RuntimeException(e);
        }

        bytesAdded += lookupResult.bytesAdded;

        if (lookupResult.unfinishedRanges.isEmpty()) {
          fullScans.add(extent);
          // if this extent was previously saved but now completed, then reset these values
          if (partScan != null && partScan.equals(extent)) {
            partScan = null;
            partNextKey = null;
            partNextKeyInclusive = false;
          }
        } else {
          if (lookupResult.closed) {
            failures.put(extent, lookupResult.unfinishedRanges);
          } else {
            // add to beginning of queue so that we handle this extent and unfinished ranges next
            // unfinished ranges will either be finished or returned as partially completed
            queryQueue.addFirst(new Pair<>(extent, lookupResult.unfinishedRanges));
            session.queries.put(extent, lookupResult.unfinishedRanges);
            partScan = extent;
            partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
            partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
          }
        }
      }

      long finishTime = System.currentTimeMillis();
      session.totalLookupTime += (finishTime - startTime);
      session.numEntries += results.size();

      // convert everything to thrift before adding result
      List<TKeyValue> retResults = new ArrayList<>();
      for (KVEntry entry : results)
        retResults
            .add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
      // @formatter:off
      Map<TKeyExtent,List<TRange>> retFailures = failures.entrySet().stream().collect(Collectors.toMap(
                      entry -> entry.getKey().toThrift(),
                      entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
      ));
      // @formatter:on
      List<TKeyExtent> retFullScans =
          fullScans.stream().map(KeyExtent::toThrift).collect(Collectors.toList());
      TKeyExtent retPartScan = null;
      TKey retPartNextKey = null;
      if (partScan != null) {
        retPartScan = partScan.toThrift();
        retPartNextKey = partNextKey.toThrift();
      }
      // add results to queue
      addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan,
          retPartNextKey, partNextKeyInclusive, !session.queries.isEmpty()));
    } catch (IterationInterruptedException iie) {
      if (!isCancelled()) {
        log.warn("Iteration interrupted, when scan not cancelled", iie);
        addResult(iie);
      }
    } catch (SampleNotPresentException e) {
      addResult(e);
    } catch (Exception e) {
      log.warn("exception while doing multi-scan ", e);
      addResult(e);
    } finally {
      Thread.currentThread().setName(oldThreadName);
      runState.set(ScanRunState.FINISHED);
    }
  }
}
