/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.examples;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.examples.client.RandomBatchScanner;
import org.apache.accumulo.examples.client.ReadWriteExample;
import org.apache.accumulo.examples.client.RowOperations;
import org.apache.accumulo.examples.client.SequentialBatchWriter;
import org.apache.accumulo.examples.combiner.StatsCombiner;
import org.apache.accumulo.examples.constraints.MaxMutationSize;
import org.apache.accumulo.examples.helloworld.Insert;
import org.apache.accumulo.examples.helloworld.Read;
import org.apache.accumulo.examples.isolation.InterferenceTest;
import org.apache.accumulo.examples.mapreduce.RegexExample;
import org.apache.accumulo.examples.mapreduce.RowHash;
import org.apache.accumulo.examples.mapreduce.TableToFile;
import org.apache.accumulo.examples.mapreduce.TeraSortIngest;
import org.apache.accumulo.examples.mapreduce.WordCount;
import org.apache.accumulo.examples.shard.ContinuousQuery;
import org.apache.accumulo.examples.shard.Index;
import org.apache.accumulo.examples.shard.Query;
import org.apache.accumulo.examples.shard.Reverse;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ExamplesIT extends AccumuloClusterHarness {
  private static final BatchWriterConfig bwc = new BatchWriterConfig();
  private static final String auths = "A,B";

  private AccumuloClient c;
  private BatchWriter bw;
  private IteratorSetting is;
  private String dir;
  private FileSystem fs;
  private Authorizations origAuths;

  @Override
  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopConf) {
    // 128MB * 3
    cfg.setDefaultMemory(cfg.getDefaultMemory() * 3, MemoryUnit.BYTE);
    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
  }

  @BeforeEach
  public void setupTest() throws Exception {
    c = Accumulo.newClient().from(getClientProps()).build();
    String user = c.whoami();
    String instance = getClientInfo().getInstanceName();
    String keepers = getClientInfo().getZooKeepers();
    AuthenticationToken token = getAdminToken();
    if (token instanceof PasswordToken) {
      String passwd = new String(((PasswordToken) getAdminToken()).getPassword(), UTF_8);
      writeClientPropsFile(getClientPropsFile(), instance, keepers, user, passwd);
    } else {
      fail("Unknown token type: " + token);
    }
    fs = getCluster().getFileSystem();
    dir = new Path(cluster.getTemporaryPath(), getClass().getName()).toString();

    origAuths = c.securityOperations().getUserAuthorizations(user);
    c.securityOperations().changeUserAuthorizations(user, new Authorizations(auths.split(",")));
  }

  @AfterEach
  public void teardownTest() throws Exception {
    if (bw != null) {
      bw.close();
    }
    if (null != origAuths) {
      c.securityOperations().changeUserAuthorizations(getAdminPrincipal(), origAuths);
    }
    c.close();
  }

  public static void writeClientPropsFile(String file, String instance, String keepers, String user,
      String password) throws IOException {
    try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(file))) {
      writer.write("instance.name=" + instance + "\n");
      writer.write("instance.zookeepers=" + keepers + "\n");
      writer.write("auth.type=password\n");
      writer.write("auth.principal=" + user + "\n");
      writer.write("auth.token=" + password + "\n");
    }
  }

  private String getClientPropsFile() {
    return System.getProperty("user.dir") + "/target/accumulo-client.properties";
  }

  @Override
  protected Duration defaultTimeout() {
    return Duration.ofMinutes(6);
  }

  @Test
  public void testAgeoffFilter() throws Exception {
    String tableName = getUniqueNames(1)[0];
    c.tableOperations().create(tableName);
    is = new IteratorSetting(10, AgeOffFilter.class);
    AgeOffFilter.setTTL(is, 1000L);
    c.tableOperations().attachIterator(tableName, is);
    sleepUninterruptibly(500, TimeUnit.MILLISECONDS); // let zookeeper updates propagate.
    bw = c.createBatchWriter(tableName, bwc);
    Mutation m = new Mutation("foo");
    m.put("a", "b", "c");
    bw.addMutation(m);
    bw.close();
    sleepUninterruptibly(1, TimeUnit.SECONDS);
    try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
      assertTrue(scanner.stream().findAny().isEmpty());
    }
  }

  @Test
  public void testStatsCombiner() throws Exception {
    String table = getUniqueNames(1)[0];
    c.tableOperations().create(table);
    is = new IteratorSetting(10, StatsCombiner.class);
    StatsCombiner.setCombineAllColumns(is, true);
    StatsCombiner.setRadix(is, 10);
    assertTrue(is.getOptions().containsKey(StatsCombiner.RADIX_OPTION));

    c.tableOperations().attachIterator(table, is);
    bw = c.createBatchWriter(table, bwc);
    // Write two mutations otherwise the NativeMap would dedupe them into a single update
    Mutation m = new Mutation("foo");
    m.put("a", "b", "1");
    bw.addMutation(m);
    m = new Mutation("foo");
    m.put("a", "b", "3");
    bw.addMutation(m);
    bw.flush();

    try (Scanner scanner = c.createScanner(table, Authorizations.EMPTY)) {
      Iterator<Entry<Key,Value>> iter = scanner.iterator();
      assertTrue(iter.hasNext(), "Iterator had no results");
      Entry<Key,Value> e = iter.next();
      assertEquals("1,3,4,2", e.getValue().toString(), "Results ");
      assertFalse(iter.hasNext(), "Iterator had additional results");

      m = new Mutation("foo");
      m.put("a", "b", "0,20,20,2");
      bw.addMutation(m);
      bw.close();
    }

    try (Scanner scanner = c.createScanner(table, Authorizations.EMPTY)) {
      Iterator<Entry<Key,Value>> iter = scanner.iterator();
      assertTrue(iter.hasNext(), "Iterator had no results");
      Entry<Key,Value> e = iter.next();
      assertEquals("0,20,24,4", e.getValue().toString(), "Results ");
      assertFalse(iter.hasNext(), "Iterator had additional results");
    }
  }

  @Test
  public void testShardedIndex() throws Exception {
    File src = new File(System.getProperty("user.dir") + "/src");
    assumeTrue(src.exists());
    String[] names = getUniqueNames(3);
    final String shard = names[0], index = names[1];
    c.tableOperations().create(shard);
    c.tableOperations().create(index);
    bw = c.createBatchWriter(shard, bwc);
    Index.index(30, src, "\\W+", bw);
    bw.close();
    List<String> found;
    try (BatchScanner bs = c.createBatchScanner(shard, Authorizations.EMPTY, 4)) {
      found = Query.query(bs, Arrays.asList("foo", "bar"), null);
    }
    // should find ourselves
    assertTrue(found.stream().anyMatch(file -> file.endsWith("/ExamplesIT.java")));

    String[] args = new String[] {"-c", getClientPropsFile(), "--shardTable", shard, "--doc2Term",
        index};

    // create a reverse index
    goodExec(Reverse.class, args);
    args = new String[] {"-c", getClientPropsFile(), "--shardTable", shard, "--doc2Term", index,
        "--terms", "5", "--count", "1000"};
    // run some queries
    goodExec(ContinuousQuery.class, args);
  }

  @Test
  public void testMaxMutationConstraint() throws Exception {
    String tableName = getUniqueNames(1)[0];
    c.tableOperations().create(tableName);
    c.tableOperations().addConstraint(tableName, MaxMutationSize.class.getName());
    IngestParams params = new IngestParams(c.properties(), tableName, 1);
    params.cols = 1000;
    try {
      TestIngest.ingest(c, params);
    } catch (MutationsRejectedException ex) {
      assertEquals(1, ex.getConstraintViolationSummaries().size());
    }
  }

  @Test
  public void testTeraSortAndRead() throws Exception {
    assumeTrue(getAdminToken() instanceof PasswordToken);
    String tableName = getUniqueNames(1)[0];
    String[] args = new String[] {"--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10", "-nv",
        "10", "-xv", "10", "-t", tableName, "-c", getClientPropsFile(), "--splits", "4"};
    goodExec(TeraSortIngest.class, args);
    Path output = new Path(dir, "tmp/nines");
    if (fs.exists(output)) {
      fs.delete(output, true);
    }
    args = new String[] {"-c", getClientPropsFile(), "-t", tableName, "--rowRegex", ".*999.*",
        "--output", output.toString()};
    goodExec(RegexExample.class, args);
    args = new String[] {"-c", getClientPropsFile(), "-t", tableName, "--column", "c:"};
    goodExec(RowHash.class, args);
    output = new Path(dir, "tmp/tableFile");
    if (fs.exists(output)) {
      fs.delete(output, true);
    }
    args = new String[] {"-c", getClientPropsFile(), "-t", tableName, "--output",
        output.toString()};
    goodExec(TableToFile.class, args);
  }

  @Test
  public void testWordCount() throws Exception {
    assumeTrue(getAdminToken() instanceof PasswordToken);
    Path readme = new Path(new Path(System.getProperty("user.dir")), "README.md");
    if (!new File(readme.toString()).exists()) {
      fail("README.md does not exist!");
    }
    fs.copyFromLocalFile(readme, new Path(dir + "/tmp/wc/README.md"));
    String[] args = new String[] {"-c", getClientPropsFile(), "-i", dir + "/tmp/wc", "-t",
        getUniqueNames(1)[0]};
    goodExec(WordCount.class, args);
  }

  @Test
  public void testInsertWithBatchWriterAndReadData() throws Exception {
    String[] args;
    args = new String[] {"-c", getClientPropsFile()};
    goodExec(Insert.class, args);
    goodExec(Read.class, args);
  }

  @Test
  public void testIsolatedScansWithInterference() throws Exception {
    String[] args;
    args = new String[] {"-c", getClientPropsFile(), "-t", getUniqueNames(1)[0], "--iterations",
        "100000", "--isolated"};
    goodExec(InterferenceTest.class, args);
  }

  @Test
  public void testScansWithInterference() throws Exception {
    String[] args;
    args = new String[] {"-c", getClientPropsFile(), "-t", getUniqueNames(1)[0], "--iterations",
        "100000"};
    goodExec(InterferenceTest.class, args);
  }

  @Test
  public void testRowOperations() throws Exception {
    goodExec(RowOperations.class, "-c", getClientPropsFile());
  }

  @Test
  public void testReadWriteAndDelete() throws Exception {
    goodExec(ReadWriteExample.class, "-c", getClientPropsFile());
  }

  @Test
  public void testBatch() throws Exception {
    goodExec(SequentialBatchWriter.class, "-c", getClientPropsFile());
    goodExec(RandomBatchScanner.class, "-c", getClientPropsFile());
  }

  private void goodExec(Class<?> theClass, String... args) throws IOException {
    Entry<Integer,String> pair;
    // We're already slurping stdout into memory (not redirecting to file). Might as well add it
    // to error message.
    pair = getClusterControl().execWithStdout(theClass, args);
    assertEquals(0, pair.getKey().intValue(), "stdout=" + pair.getValue());
  }
}
