/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.fluo.core.util;

import java.io.IOException;
import java.io.PrintStream;
import java.text.DateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import com.google.common.collect.Iterables;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonIOException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;

public class ScanUtil {
  /** JSON key under which the cell value is written by the JSON export. */
  public static final String FLUO_VALUE = "value";
  /** JSON key under which the column visibility is written by the JSON export. */
  public static final String FLUO_COLUMN_VISIBILITY = "visibility";
  /** JSON key under which the column qualifier is written by the JSON export. */
  public static final String FLUO_COLUMN_QUALIFIER = "qualifier";
  /** JSON key under which the column family is written by the JSON export. */
  public static final String FLUO_COLUMN_FAMILY = "family";
  /** JSON key under which the row is written by the JSON export. */
  public static final String FLUO_ROW = "row";

  /**
   * Builds the {@link Span} to scan from the row-related scan options.
   *
   * <p>
   * An exact row and a row prefix are each mutually exclusive with every other row option. When
   * neither is given, the optional start and end rows (both inclusive) bound the span; a missing
   * bound is passed as {@link Bytes#EMPTY}. With no row options at all a default {@code Span} is
   * returned.
   *
   * @param options scan options supplying exact row, row prefix, start row and end row
   * @return the span described by the options
   * @throws IllegalArgumentException if an exact row or a row prefix is combined with any other
   *         row option
   */
  public static Span getSpan(ScanOpts options) {
    final String exact = options.getExactRow();
    final String prefix = options.getRowPrefix();
    final String start = options.getStartRow();
    final String end = options.getEndRow();
    final boolean hasRangeBound = start != null || end != null;

    if (exact != null && (hasRangeBound || prefix != null)) {
      throw new IllegalArgumentException(
          "You cannot specify an exact row with a start/end row or row prefix!");
    }
    if (prefix != null && (hasRangeBound || exact != null)) {
      throw new IllegalArgumentException(
          "You cannot specify an prefix row with a start/end row or exact row!");
    }

    // Exact row wins, then prefix, then whatever start/end bounds were supplied.
    if (exact != null) {
      return Span.exact(exact);
    }
    if (prefix != null) {
      return Span.prefix(prefix);
    }
    if (start != null && end != null) {
      return new Span(start, true, end, true);
    }
    if (start != null) {
      return new Span(Bytes.of(start), true, Bytes.EMPTY, true);
    }
    if (end != null) {
      return new Span(Bytes.EMPTY, true, Bytes.of(end), true);
    }
    return new Span();
  }

  /**
   * Parses the column strings of the scan options into {@link Column} objects.
   *
   * <p>
   * Each string is split on {@code ':'}; one field is taken as a column family and two fields as
   * family and qualifier.
   *
   * @param options scan options supplying the column strings
   * @return a set containing one {@code Column} per distinct parsed column string
   * @throws IllegalArgumentException if a column string yields no fields (for example
   *         {@code ":"}) or more than two fields
   */
  public static Collection<Column> getColumns(ScanOpts options) {
    Collection<Column> columns = new HashSet<>();

    // configure columns of scanner
    for (String column : options.getColumns()) {
      String[] colFields = column.split(":");
      switch (colFields.length) {
        case 0:
          // String.split drops trailing empty strings, so input made only of ':' separators
          // produces an empty array; report that accurately instead of "too many fields".
          throw new IllegalArgumentException("Failed to scan!  Column '" + column
              + "' has no fields (expected 'family' or 'family:qualifier')");
        case 1:
          columns.add(new Column(colFields[0]));
          break;
        case 2:
          columns.add(new Column(colFields[0], colFields[1]));
          break;
        default:
          throw new IllegalArgumentException(
              "Failed to scan!  Column '" + column + "' has too many fields (indicated by ':')");
      }
    }

    return columns;
  }


  /**
   * Picks the function used to turn {@link Bytes} into printable text: {@code Hex::encNonAscii}
   * when the hex flag of the options is set, otherwise {@code Bytes::toString}.
   */
  private static Function<Bytes, String> getEncoder(ScanOpts options) {
    return options.hexEncNonAscii ? Hex::encNonAscii : Bytes::toString;
  }


  /**
   * Writes every cell of the scanner to {@code out}, either as JSON (when the JSON flag is set)
   * or as one text line per cell in the form {@code row family qualifier visibility<TAB>value}.
   *
   * @param options scan options selecting the encoder and the output format
   * @param out destination stream; text output stops as soon as the stream reports an error
   * @param cellScanner source of the cells to print
   */
  private static void scan(ScanOpts options, PrintStream out, CellScanner cellScanner) {
    final Function<Bytes, String> toText = getEncoder(options);

    if (options.exportAsJson) {
      generateJson(cellScanner, toText, out);
      return;
    }

    for (RowColumnValue cell : cellScanner) {
      final Column col = cell.getColumn();
      out.print(toText.apply(cell.getRow()));
      out.print(' ');
      out.print(toText.apply(col.getFamily()));
      out.print(' ');
      out.print(toText.apply(col.getQualifier()));
      out.print(' ');
      out.print(toText.apply(col.getVisibility()));
      out.print("\t");
      out.print(toText.apply(cell.getValue()));
      out.println();
      // Stop scanning once the output stream is in an error state.
      if (out.checkError()) {
        return;
      }
    }
  }

  /**
   * Scans through a Fluo snapshot and prints the matching cells to {@code out}.
   *
   * @param options scan options supplying the span, the columns and the output format
   * @param sConfig configuration used to create the Fluo client
   * @param out stream the scan results are written to
   * @throws IOException declared for callers; nothing visible in this method throws it directly
   */
  public static void scanFluo(ScanOpts options, FluoConfiguration sConfig, PrintStream out)
      throws IOException {

    // Resources are closed in reverse order: the snapshot first, then the client.
    try (FluoClient client = FluoFactory.newClient(sConfig);
        Snapshot snapshot = client.newSnapshot()) {

      Span scanSpan = getSpan(options);
      Collection<Column> scanColumns = getColumns(options);

      scan(options, out, snapshot.scanner().over(scanSpan).fetch(scanColumns).build());
    }
  }

  /**
   * Scans the configured Accumulo table through a {@code NotificationScanner} and prints the
   * results to {@code out}.
   *
   * @param options scan options supplying the span, the columns and the output format
   * @param sConfig configuration supplying the Accumulo connection and table name
   * @param out stream the scan results are written to
   * @throws IOException declared for callers; nothing visible in this method throws it directly
   * @throws RuntimeException wrapping {@link TableNotFoundException} if the table does not exist
   */
  public static void scanNotifications(ScanOpts options, FluoConfiguration sConfig, PrintStream out)
      throws IOException {

    Connector connector = AccumuloUtil.getConnector(sConfig);

    Span scanSpan = getSpan(options);
    Collection<Column> scanColumns = getColumns(options);

    final Scanner tableScanner;
    try {
      tableScanner = connector.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
    } catch (TableNotFoundException e) {
      throw new RuntimeException(e);
    }

    // From here on the scanner exists and must be closed however the scan ends.
    try {
      tableScanner.setRange(SpanUtil.toRange(scanSpan));
      scan(options, out, new NotificationScanner(tableScanner, scanColumns));
    } finally {
      tableScanner.close();
    }
  }

  /**
   * Writes the scan result as JSON: one object per cell, each followed by a newline. Every object
   * holds the encoded row, family, qualifier, visibility and value, inserted in that order under
   * the {@code FLUO_*} keys of this class.
   *
   * @param cellScanner source of the cells to export
   * @param encoder converts each {@link Bytes} field to the string placed in the JSON object
   * @param out stream the JSON is written to; writing stops once the stream reports an error
   * @since 1.2
   */
  private static void generateJson(CellScanner cellScanner, Function<Bytes, String> encoder,
      PrintStream out) throws JsonIOException {
    GsonBuilder builder = new GsonBuilder();
    builder.serializeNulls();
    builder.setDateFormat(DateFormat.LONG);
    builder.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES);
    builder.setVersion(1.0);
    final Gson gson = builder.create();

    for (RowColumnValue cell : cellScanner) {
      final Column col = cell.getColumn();

      // LinkedHashMap keeps the keys in insertion order.
      final Map<String, String> record = new LinkedHashMap<>();
      record.put(FLUO_ROW, encoder.apply(cell.getRow()));
      record.put(FLUO_COLUMN_FAMILY, encoder.apply(col.getFamily()));
      record.put(FLUO_COLUMN_QUALIFIER, encoder.apply(col.getQualifier()));
      record.put(FLUO_COLUMN_VISIBILITY, encoder.apply(col.getVisibility()));
      record.put(FLUO_VALUE, encoder.apply(cell.getValue()));

      gson.toJson(record, out);
      out.append("\n");

      if (out.checkError()) {
        break;
      }
    }
    out.flush();
  }

  /**
   * Scans the configured Accumulo table directly and prints each entry, formatted with
   * {@code FluoFormatter::toString}, on its own line.
   *
   * <p>
   * Columns with a qualifier are fetched as family/qualifier pairs, the others as whole column
   * families. The scanner is closed before this method returns, whether the scan completes or
   * fails.
   *
   * @param options scan options supplying the span and the columns
   * @param sConfig configuration supplying the Accumulo connection and table name
   * @param out stream the formatted entries are written to
   * @throws RuntimeException wrapping any exception raised while creating or reading the scanner
   */
  public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig, PrintStream out) {

    Connector conn = AccumuloUtil.getConnector(sConfig);

    Span span = getSpan(options);
    Collection<Column> columns = getColumns(options);

    Scanner scanner = null;
    try {
      scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
      scanner.setRange(SpanUtil.toRange(span));
      for (Column col : columns) {
        if (col.isQualifierSet()) {
          scanner.fetchColumn(ByteUtil.toText(col.getFamily()),
              ByteUtil.toText(col.getQualifier()));
        } else {
          scanner.fetchColumnFamily(ByteUtil.toText(col.getFamily()));
        }
      }

      for (String entry : Iterables.transform(scanner, FluoFormatter::toString)) {
        out.println(entry);
      }
      out.flush();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      // Release the scanner's resources; it stays null if createScanner itself failed.
      if (scanner != null) {
        scanner.close();
      }
    }
  }

  /**
   * Boolean switches of a scan. {@link ScanOpts} copies the presence of each flag into one of its
   * public boolean fields.
   */
  public static enum ScanFlags {
    // recorded as ScanOpts.help
    HELP,
    // hex encode non-ascii characters
    HEX,
    // scan accumulo table directly
    ACCUMULO,
    // encode output as json
    JSON,
    // scan notifications
    NTFY
  }

  /**
   * Options of a scan: the row selection (start/end row, exact row or row prefix), the column
   * strings, and the boolean switches derived from a set of {@link ScanFlags}. All fields are
   * assigned once, in the constructor.
   */
  public static class ScanOpts {

    /** True when {@link ScanFlags#HELP} was passed. */
    public final boolean help;
    /** True when {@link ScanFlags#HEX} was passed. */
    public final boolean hexEncNonAscii;
    /** True when {@link ScanFlags#ACCUMULO} was passed. */
    public final boolean scanAccumuloTable;
    /** True when {@link ScanFlags#JSON} was passed. */
    public final boolean exportAsJson;
    /** True when {@link ScanFlags#NTFY} was passed. */
    public final boolean scanNtfy;

    private final String startRow;
    private final String endRow;
    private final String exactRow;
    private final String rowPrefix;
    // Never null: a null constructor argument is replaced by an empty list.
    private final List<String> columns;

    /**
     * @param startRow first row of the scan, or null
     * @param endRow last row of the scan, or null
     * @param columns column strings to scan, or null for none
     * @param exactRow single row to scan, or null
     * @param rowPrefix prefix of the rows to scan, or null
     * @param flags switches that are turned on; must not be null
     */
    public ScanOpts(String startRow, String endRow, List<String> columns, String exactRow,
        String rowPrefix, EnumSet<ScanFlags> flags) {
      this.help = flags.contains(ScanFlags.HELP);
      this.hexEncNonAscii = flags.contains(ScanFlags.HEX);
      this.scanAccumuloTable = flags.contains(ScanFlags.ACCUMULO);
      this.exportAsJson = flags.contains(ScanFlags.JSON);
      this.scanNtfy = flags.contains(ScanFlags.NTFY);

      this.startRow = startRow;
      this.endRow = endRow;
      this.exactRow = exactRow;
      this.rowPrefix = rowPrefix;
      this.columns = (columns == null) ? Collections.<String>emptyList() : columns;
    }

    /** @return the start row, or null if none was given */
    public String getStartRow() {
      return startRow;
    }

    /** @return the end row, or null if none was given */
    public String getEndRow() {
      return endRow;
    }

    /** @return the exact row, or null if none was given */
    public String getExactRow() {
      return exactRow;
    }

    /** @return the row prefix, or null if none was given */
    public String getRowPrefix() {
      return rowPrefix;
    }

    /** @return the column strings; an empty list if none were given, never null */
    public List<String> getColumns() {
      return columns;
    }
  }

  /**
   * Adds {@code flag} to {@code flags} when {@code b} is true; otherwise leaves the set untouched.
   *
   * @param flags set to modify
   * @param b whether the flag should be turned on
   * @param flag flag to add
   */
  public static void setFlag(EnumSet<ScanFlags> flags, boolean b, ScanFlags flag) {
    if (!b) {
      return;
    }
    flags.add(flag);
  }
}
