/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.testing.healthprobe;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class Monitor {

  private static final Logger log = LoggerFactory.getLogger(Monitor.class);
  static long distance = 1l;
  static List<TabletId> tablets;
  static boolean cacheTablets = true;

  public static void main(String[] args) throws Exception {
    MonitorOpts opts = new MonitorOpts();
    opts.parseArgs(Monitor.class.getName(), args);
    distance = opts.distance;
    Authorizations auth = new Authorizations();
    if (opts.auth != "") {
      auth = new Authorizations(opts.auth);
    }

    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
        Scanner scanner = client.createScanner(opts.tableName, auth)) {
      if (opts.isolate) {
        scanner.enableIsolation();
      }
      int scannerSleepMs = opts.sleep_ms;
      LoopControl scanning_condition = opts.continuous ? new ContinuousLoopControl()
          : new IterativeLoopControl(opts.scan_iterations);

      while (scanning_condition.keepScanning()) {

        Random tablet_index_generator = new Random();
        TabletId pickedTablet = pickTablet(client.tableOperations(), opts.tableName,
            tablet_index_generator);
        Range range = pickedTablet.toRange();
        scanner.setRange(range);

        if (opts.batch_size > 0) {
          scanner.setBatchSize(opts.batch_size);
        }
        try {
          long startTime = System.nanoTime();
          long count = consume(scanner, opts.distance);
          long stopTime = System.nanoTime();
          MDC.put("StartTime", String.valueOf(startTime));
          MDC.put("TabletId", String.valueOf(pickedTablet));
          MDC.put("TableName", String.valueOf(opts.tableName));
          MDC.put("TotalTime", String.valueOf((stopTime - startTime)));
          MDC.put("StartRow", String.valueOf(range.getStartKey()));
          MDC.put("EndRow", String.valueOf(range.getEndKey()));
          MDC.put("TotalRecords", String.valueOf(count));

          log.info("SCN starttime={} tabletindex={} tablename={} totaltime={} totalrecords={}",
              startTime, tablet_index_generator, opts.tableName, (stopTime - startTime), count);
          if (scannerSleepMs > 0) {
            sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
          }
        } catch (Exception e) {
          log.error(String.format(
              "Exception while scanning range %s. Check the state of Accumulo for errors.", range),
              e);
        }
      }
    }
  }

  public static int consume(Iterable<Entry<Key,Value>> scanner, long numberOfRows) {
    RowIterator rowIter = new RowIterator(scanner);
    int count = 0;

    while (rowIter.hasNext()) {
      Iterator<Entry<Key,Value>> itr = rowIter.next();
      count++;
      if (count >= numberOfRows) {
        break;
      }
    }
    return count;
  }

  public static TabletId pickTablet(TableOperations tops, String table, Random r)
      throws TableNotFoundException, AccumuloSecurityException, AccumuloException {

    if (cacheTablets) {
      Locations locations = tops.locate(table, Collections.singleton(new Range()));
      tablets = new ArrayList<TabletId>(locations.groupByTablet().keySet());
      cacheTablets = false;
    }
    int index = r.nextInt(tablets.size());
    TabletId tabletId = tablets.get(index);
    return tabletId;
  }

  /*
   * These interfaces + implementations are used to determine how many times the scanner should look
   * up a random tablet and scan it.
   */
  static interface LoopControl {
    public boolean keepScanning();
  }

  // Does a finite number of iterations
  static class IterativeLoopControl implements LoopControl {
    private final int max;
    private int current;

    public IterativeLoopControl(int max) {
      this.max = max;
      this.current = 0;
    }

    @Override
    public boolean keepScanning() {
      if (current < max) {
        ++current;
        return true;
      } else {
        return false;
      }
    }
  }

  // Does an infinite number of iterations
  static class ContinuousLoopControl implements LoopControl {
    @Override
    public boolean keepScanning() {
      return true;
    }
  }
}
