/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.scan.result.iterator;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.util.CarbonProperties;

import org.apache.log4j.Logger;

/**
 * This is a wrapper iterator over the detail raw query iterator.
 * It drains {@link RowBatch} results from the underlying iterator, passes every raw row through
 * {@link #convertRow(Object[])} and hands the converted rows out one at a time.
 *
 * <p>Rows are buffered in {@code currentBuffer}. When prefetch is enabled a single background
 * thread fills {@code backupBuffer} while the consumer drains {@code currentBuffer}; the two
 * threads hand the buffer over exclusively through {@link Future#get()}, which provides the
 * required memory visibility.
 *
 * <p>{@link #close()} must be called to stop the prefetch thread.
 */
public class RawResultIterator extends CarbonIterator<Object[]> {

  protected final SegmentProperties sourceSegProperties;

  protected final SegmentProperties destinationSegProperties;
  /**
   * Iterator of the Batch raw result.
   */
  private final CarbonIterator<RowBatch> detailRawQueryResultIterator;

  /**
   * whether the next buffer is filled asynchronously; resolved in {@link #init()}
   */
  private boolean prefetchEnabled;
  /**
   * rows currently being served to the consumer
   */
  private List<Object[]> currentBuffer;
  /**
   * rows produced by the prefetch task; only read after {@code fetchFuture.get()} has returned
   */
  private List<Object[]> backupBuffer;
  /**
   * index of the next row to serve from {@code currentBuffer}
   */
  private int currentIdxInBuffer;
  private final ExecutorService executorService;
  /**
   * handle of the in-flight (or completed) prefetch task, set only when prefetch is enabled
   */
  private Future<Void> fetchFuture;
  private Object[] currentRawRow = null;

  /**
   * row count threshold of one buffer: a fetch keeps pulling whole batches from the underlying
   * iterator until at least this many rows are collected or the iterator is exhausted
   */
  private final int batchSize;
  /**
   * LOGGER
   */
  private static final Logger LOGGER =
      LogServiceFactory.getLogService(RawResultIterator.class.getName());

  /**
   * @param detailRawQueryResultIterator iterator providing the raw row batches
   * @param sourceSegProperties          segment properties of the source, kept for subclasses
   * @param destinationSegProperties     segment properties of the destination, kept for subclasses
   * @param init                         if true, {@link #init()} is invoked from this constructor;
   *                                     pass false when {@link #init()} has to be called later
   *                                     by the caller or subclass
   */
  public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
      boolean init) {
    this.detailRawQueryResultIterator = detailRawQueryResultIterator;
    this.sourceSegProperties = sourceSegProperties;
    this.destinationSegProperties = destinationSegProperties;
    this.executorService = Executors.newFixedThreadPool(1);
    this.batchSize = CarbonProperties.getQueryBatchSize();
    if (init) {
      init();
    }
  }

  /**
   * Reads the prefetch configuration, fills the first buffer synchronously and, when prefetch
   * is enabled, schedules the asynchronous fill of the backup buffer.
   *
   * @throws RuntimeException if fetching the first buffer or scheduling the prefetch fails
   */
  protected void init() {
    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    try {
      new RowsFetcher(false).call();
      if (prefetchEnabled) {
        this.fetchFuture = executorService.submit(new RowsFetcher(true));
      }
    } catch (Exception e) {
      LOGGER.error("Error occurs while fetching records", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Task that fetches one buffer of rows into either the backup buffer or the current buffer.
   */
  private final class RowsFetcher implements Callable<Void> {
    private final boolean isBackupFilling;

    private RowsFetcher(boolean isBackupFilling) {
      this.isBackupFilling = isBackupFilling;
    }

    @Override
    public Void call() throws Exception {
      if (isBackupFilling) {
        backupBuffer = fetchRows();
      } else {
        currentBuffer = fetchRows();
      }
      return null;
    }
  }

  /**
   * Pulls whole batches from the underlying iterator and converts their rows until at least
   * {@code batchSize} rows are collected or the iterator has no more batches.
   *
   * @return the converted rows; empty when the underlying iterator is exhausted
   */
  private List<Object[]> fetchRows() {
    List<Object[]> converted = new ArrayList<>();
    while (detailRawQueryResultIterator.hasNext()) {
      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
        converted.add(convertRow(r));
      }
      if (converted.size() >= batchSize) {
        break;
      }
    }
    return converted;
  }

  /**
   * Replaces the current buffer with the next one once the current buffer is fully consumed.
   * Nothing happens while rows remain, or while the index is still 0 (an empty buffer at
   * index 0 means the previous fetch returned no rows).
   *
   * @throws RuntimeException if the fetch fails or the waiting thread is interrupted
   */
  private void fillDataFromPrefetch() {
    try {
      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
        if (prefetchEnabled) {
          // Always wait on the future: it returns immediately when the task has finished, and
          // it is what makes the prefetch thread's write to backupBuffer visible here. It also
          // surfaces any failure of the prefetch task.
          fetchFuture.get();
          // copy backup buffer to current buffer and fill backup buffer asynchronously
          currentIdxInBuffer = 0;
          currentBuffer.clear();
          currentBuffer = backupBuffer;
          fetchFuture = executorService.submit(new RowsFetcher(true));
        } else {
          currentIdxInBuffer = 0;
          new RowsFetcher(false).call();
        }
      }
    } catch (InterruptedException e) {
      // keep the interrupt status so callers further up can still observe it
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * populate a row with index counter increased
   */
  private void popRow() {
    fillDataFromPrefetch();
    currentRawRow = currentBuffer.get(currentIdxInBuffer);
    currentIdxInBuffer++;
  }

  /**
   * populate a row with index counter unchanged
   */
  private void pickRow() {
    fillDataFromPrefetch();
    currentRawRow = currentBuffer.get(currentIdxInBuffer);
  }

  /**
   * @return true if at least one more row can be served, refilling the buffer if necessary
   */
  @Override
  public boolean hasNext() {
    fillDataFromPrefetch();
    return currentIdxInBuffer < currentBuffer.size();
  }

  /**
   * Returns the next converted row and advances the iterator.
   *
   * @throws RuntimeException if no row is available or the refill fails
   */
  @Override
  public Object[] next() {
    try {
      popRow();
      return this.currentRawRow;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * for fetching the row without incrementing counter.
   * @return the converted row the next call to {@link #next()} would return
   */
  public Object[] fetchConverted() {
    pickRow();
    return this.currentRawRow;
  }

  /**
   * Hook for subclasses to transform a raw row; this implementation returns the row unchanged.
   *
   * @param rawRow row as produced by the underlying iterator
   * @return the row to be served to the consumer
   */
  protected Object[] convertRow(Object[] rawRow) {
    return rawRow;
  }

  /**
   * Stops the prefetch executor, interrupting a fetch that is still running.
   */
  public void close() {
    if (null != executorService) {
      executorService.shutdownNow();
    }
  }
}