/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.hadoop.io.Text;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.Parameter;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class VerifyIngest {

  private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);

  public static int getRow(Key k) {
    return Integer.parseInt(k.getRow().toString().split("_")[1]);
  }

  public static int getCol(Key k) {
    return Integer.parseInt(k.getColumnQualifier().toString().split("_")[1]);
  }

  public static class VerifyParams extends TestIngest.IngestParams {
    public boolean useGet = false;

    public VerifyParams(Properties props) {
      super(props);
    }

    public VerifyParams(Properties props, String table) {
      super(props, table);
    }

    public VerifyParams(Properties props, String table, int rows) {
      super(props, table, rows);
    }
  }

  public static class Opts extends TestIngest.Opts {
    @Parameter(names = "-useGet", description = "fetches values one at a time, instead of scanning")
    public boolean useGet = false;

    public VerifyParams getVerifyParams() {
      VerifyParams params = new VerifyParams(getClientProps(), tableName);
      populateIngestPrams(params);
      params.useGet = useGet;
      return params;
    }
  }

  public static void main(String[] args) throws Exception {
    Opts opts = new Opts();
    opts.parseArgs(VerifyIngest.class.getName(), args);
    if (opts.trace) {
      TraceUtil.enableClientTraces(null, null, new Properties());
    }
    try (TraceScope clientSpan =
        Trace.startSpan(VerifyIngest.class.getSimpleName(), Sampler.ALWAYS)) {
      Span span = clientSpan.getSpan();
      if (span != null)
        span.addKVAnnotation("cmdLine", Arrays.asList(args).toString());

      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
        verifyIngest(client, opts.getVerifyParams());
      }

    } finally {
      TraceUtil.disable();
    }
  }

  @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
      justification = "predictable random is okay for testing")
  public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    byte[][] bytevals = TestIngest.generateValues(params.dataSize);

    Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2");
    String principal = ClientProperty.AUTH_PRINCIPAL.getValue(params.clientProps);
    accumuloClient.securityOperations().changeUserAuthorizations(principal, labelAuths);

    int expectedRow = params.startRow;
    int expectedCol = 0;
    int recsRead = 0;

    long bytesRead = 0;
    long t1 = System.currentTimeMillis();

    byte[] randomValue = new byte[params.dataSize];
    Random random = new Random();

    Key endKey = new Key(new Text("row_" + String.format("%010d", params.rows + params.startRow)));

    int errors = 0;

    while (expectedRow < (params.rows + params.startRow)) {

      if (params.useGet) {
        Text rowKey = new Text("row_" + String.format("%010d", expectedRow + params.startRow));
        Text colf = new Text(params.columnFamily);
        Text colq = new Text("col_" + String.format("%07d", expectedCol));

        try (Scanner scanner = accumuloClient.createScanner("test_ingest", labelAuths)) {
          scanner.setBatchSize(1);
          Key startKey = new Key(rowKey, colf, colq);
          Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL));
          scanner.setRange(range);

          byte[] val = null; // t.get(rowKey, column);

          Iterator<Entry<Key,Value>> iter = scanner.iterator();

          if (iter.hasNext()) {
            val = iter.next().getValue().get();
          }

          byte[] ev;
          if (params.random != null) {
            ev = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow,
                expectedCol);
          } else {
            ev = bytevals[expectedCol % bytevals.length];
          }

          if (val == null) {
            log.error("Did not find {} {} {}", rowKey, colf, colq);
            errors++;
          } else {
            recsRead++;
            bytesRead += val.length;
            Value value = new Value(val);
            if (value.compareTo(ev) != 0) {
              log.error("unexpected value  ({} {} {} : saw {} expected {}", rowKey, colf, colq,
                  value, new Value(ev));
              errors++;
            }
          }

          expectedCol++;
          if (expectedCol >= params.cols) {
            expectedCol = 0;
            expectedRow++;
          }
        }
      } else {

        Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow)));

        try (Scanner scanner = accumuloClient.createScanner(params.tableName, labelAuths)) {
          scanner.setRange(new Range(startKey, endKey));
          for (int j = 0; j < params.cols; j++) {
            scanner.fetchColumn(new Text(params.columnFamily),
                new Text("col_" + String.format("%07d", j)));
          }

          int recsReadBefore = recsRead;

          for (Entry<Key,Value> entry : scanner) {

            recsRead++;

            bytesRead += entry.getKey().getLength();
            bytesRead += entry.getValue().getSize();

            int rowNum = getRow(entry.getKey());
            int colNum = getCol(entry.getKey());

            if (rowNum != expectedRow) {
              log.error("rowNum != expectedRow   {} != {}", rowNum, expectedRow);
              errors++;
              expectedRow = rowNum;
            }

            if (colNum != expectedCol) {
              log.error("colNum != expectedCol  {} != {}  rowNum : {}", colNum, expectedCol,
                  rowNum);
              errors++;
            }

            if (expectedRow >= (params.rows + params.startRow)) {
              log.error(
                  "expectedRow ({}) >= (ingestArgs.rows + ingestArgs.startRow)  ({}), get"
                      + " batch returned data passed end key",
                  expectedRow, (params.rows + params.startRow));
              errors++;
              break;
            }

            byte[] value;
            if (params.random != null) {
              value = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow,
                  colNum);
            } else {
              value = bytevals[colNum % bytevals.length];
            }

            if (entry.getValue().compareTo(value) != 0) {
              log.error("unexpected value, rowNum : {} colNum : {}", rowNum, colNum);
              log.error(" saw = {} expected = {}", new String(entry.getValue().get()),
                  new String(value));
              errors++;
            }

            if (params.timestamp >= 0 && entry.getKey().getTimestamp() != params.timestamp) {
              log.error("unexpected timestamp {}, rowNum : {} colNum : {}",
                  entry.getKey().getTimestamp(), rowNum, colNum);
              errors++;
            }

            expectedCol++;
            if (expectedCol >= params.cols) {
              expectedCol = 0;
              expectedRow++;
            }

          }

          if (recsRead == recsReadBefore) {
            log.warn("Scan returned nothing, breaking...");
            break;
          }
        }
      }
    }

    long t2 = System.currentTimeMillis();

    if (errors > 0) {
      throw new AccumuloException("saw " + errors + " errors ");
    }

    if (expectedRow == (params.rows + params.startRow)) {
      System.out.printf(
          "%,12d records read | %,8d records/sec | %,12d bytes read |"
              + " %,8d bytes/sec | %6.3f secs   %n",
          recsRead, (int) ((recsRead) / ((t2 - t1) / 1000.0)), bytesRead,
          (int) (bytesRead / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0);
    } else {
      throw new AccumuloException("Did not read expected number of rows. Saw "
          + (expectedRow - params.startRow) + " expected " + params.rows);
    }
  }

}
