/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.test.functional;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.ManagerState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.server.manager.state.CurrentState;
import org.apache.accumulo.server.manager.state.MergeInfo;
import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
import org.apache.accumulo.server.manager.state.TabletStateChangeIterator;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

/**
 * Test to ensure that the {@link TabletStateChangeIterator} properly skips over tablet information
 * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580)
 */
public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
  private final static Logger log = LoggerFactory.getLogger(TabletStateChangeIteratorIT.class);

  /**
   * Timeout for this test: three minutes, returned in seconds.
   */
  @Override
  public int defaultTimeoutSeconds() {
    final int minutes = 3;
    return minutes * 60;
  }

  /**
   * Creates three tables with one split each (t1 and t3 left online, t2 taken offline), copies the
   * metadata table once it reports no tablets needing attention, and then manipulates the copies
   * to check how many tablets are returned by a scan configured through
   * {@link MetaDataTableScanner#configureScanner}: locations removed, a location replaced with a
   * fake server, an ongoing merge, and a duplicate location.
   */
  @Test
  public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException,
      TableNotFoundException {

    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {

      String[] tables = getUniqueNames(6);
      final String t1 = tables[0];
      final String t2 = tables[1];
      final String t3 = tables[2];
      final String metaCopy1 = tables[3];
      final String metaCopy2 = tables[4];
      final String metaCopy3 = tables[5];

      // create some metadata
      createTable(client, t1, true);
      createTable(client, t2, false);
      createTable(client, t3, true);

      // examine a clone of the metadata table, so we can manipulate it
      copyTable(client, MetadataTable.NAME, metaCopy1);

      // re-copy the metadata table every 500ms until the copy shows no tablets in flux
      State state = new State(client);
      int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state);
      while (tabletsInFlux > 0) {
        log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1);
        UtilWaitThread.sleep(500);
        copyTable(client, MetadataTable.NAME, metaCopy1);
        tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state);
      }
      assertEquals("No tables should need attention", 0,
          findTabletsNeedingAttention(client, metaCopy1, state));

      // The metadata table stabilized and metaCopy1 contains a copy suitable for testing. Before
      // metaCopy1 is modified, copy it for subsequent test.
      copyTable(client, metaCopy1, metaCopy2);
      copyTable(client, metaCopy1, metaCopy3);

      // test the assigned case (no location)
      removeLocation(client, metaCopy1, t3);
      assertEquals("Should have two tablets without a loc", 2,
          findTabletsNeedingAttention(client, metaCopy1, state));

      // test the cases where the assignment is to a dead tserver
      reassignLocation(client, metaCopy2, t3);
      assertEquals("Should have one tablet that needs to be unassigned", 1,
          findTabletsNeedingAttention(client, metaCopy2, state));

      // test the case where there is an ongoing merge; metaCopy2 still carries the fake location
      // written by reassignLocation above
      state = new State(client) {
        @Override
        public Collection<MergeInfo> merges() {
          TableId tableIdToModify = TableId.of(client.tableOperations().tableIdMap().get(t3));
          return Collections.singletonList(
              new MergeInfo(new KeyExtent(tableIdToModify, null, null), MergeInfo.Operation.MERGE));
        }
      };
      // the message now matches the asserted count (it previously claimed 2 tablets)
      assertEquals("Should have 1 tablet that needs to be chopped or unassigned", 1,
          findTabletsNeedingAttention(client, metaCopy2, state));

      // test the bad tablet location state case (inconsistent metadata)
      state = new State(client);
      addDuplicateLocation(client, metaCopy3, t3);
      assertEquals("Should have 1 tablet that needs a metadata repair", 1,
          findTabletsNeedingAttention(client, metaCopy3, state));

      // clean up
      dropTables(client, t1, t2, t3, metaCopy1, metaCopy2, metaCopy3);
    }
  }

  /**
   * Writes one extra current-location entry (qualifier {@code 1234567}, value {@code fake:9005})
   * into {@code table}, on the metadata row derived from a {@link KeyExtent} for
   * {@code tableNameToModify}.
   *
   * @param client
   *          client used to look up the table id and to write the mutation
   * @param table
   *          name of the table that receives the mutation
   * @param tableNameToModify
   *          name of the table whose id is used to build the metadata row
   */
  private void addDuplicateLocation(AccumuloClient client, String table, String tableNameToModify)
      throws TableNotFoundException, MutationsRejectedException {
    String idOfTable = client.tableOperations().tableIdMap().get(tableNameToModify);
    KeyExtent extent = new KeyExtent(TableId.of(idOfTable), null, null);

    Mutation extraLocation = new Mutation(extent.toMetaRow());
    extraLocation.put(CurrentLocationColumnFamily.NAME, new Text("1234567"),
        new Value("fake:9005"));

    try (BatchWriter writer = client.createBatchWriter(table)) {
      writer.addMutation(extraLocation);
    }
  }

  /**
   * Replaces the first current-location entry found for {@code tableNameToModify} in
   * {@code table}: the existing entry is deleted at its own timestamp and a new entry with
   * qualifier {@code 1234567} and value {@code fake:9005} is written at that timestamp plus one.
   *
   * @param client
   *          client used for the table id lookup, the scan and the write
   * @param table
   *          name of the table that is scanned and mutated
   * @param tableNameToModify
   *          name of the table whose location entry is replaced
   */
  private void reassignLocation(AccumuloClient client, String table, String tableNameToModify)
      throws TableNotFoundException, MutationsRejectedException {
    String idOfTable = client.tableOperations().tableIdMap().get(tableNameToModify);
    KeyExtent extent = new KeyExtent(TableId.of(idOfTable), null, null);

    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
      scanner.setRange(extent.toMetaRange());
      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);

      // only the first location entry in the range is rewritten
      Key current = scanner.iterator().next().getKey();
      Text family = current.getColumnFamily();
      long stamp = current.getTimestamp();

      Mutation swap = new Mutation(current.getRow());
      swap.putDelete(family, current.getColumnQualifier(), stamp);
      swap.put(family, new Text("1234567"), stamp + 1, new Value("fake:9005"));

      try (BatchWriter writer = client.createBatchWriter(table)) {
        writer.addMutation(swap);
      }
    }
  }

  /**
   * Deletes all current-location entries in {@code table} that fall inside the metadata range
   * derived from a {@link KeyExtent} for {@code tableNameToModify}.
   *
   * @param client
   *          client used to look up the table id and to create the deleter
   * @param table
   *          name of the table the entries are deleted from
   * @param tableNameToModify
   *          name of the table whose location entries are removed
   */
  private void removeLocation(AccumuloClient client, String table, String tableNameToModify)
      throws TableNotFoundException, MutationsRejectedException {
    TableId tableIdToModify =
        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
    // try-with-resources so the deleter is closed even when delete() throws
    try (BatchDeleter deleter =
        client.createBatchDeleter(table, Authorizations.EMPTY, 1, new BatchWriterConfig())) {
      deleter.setRanges(
          Collections.singleton(new KeyExtent(tableIdToModify, null, null).toMetaRange()));
      deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
      deleter.delete();
    }
  }

  /**
   * Scans {@code table} with a scanner set up by
   * {@link MetaDataTableScanner#configureScanner} for the given state and counts the entries the
   * scan returns.
   *
   * @param client
   *          client used to create the scanner
   * @param table
   *          name of the table to scan
   * @param state
   *          state handed to the scanner configuration
   * @return the number of non-null entries returned by the scan
   */
  private int findTabletsNeedingAttention(AccumuloClient client, String table, State state)
      throws TableNotFoundException {
    List<Key> flagged = new ArrayList<>();
    try (Scanner metaScanner = client.createScanner(table, Authorizations.EMPTY)) {
      MetaDataTableScanner.configureScanner(metaScanner, state);
      // logged after configuring the scanner, same order as before
      log.debug("Current state = {}", state);
      metaScanner.updateScanIteratorOption("tabletChange", "debug", "1");

      Iterator<Entry<Key,Value>> entries = metaScanner.iterator();
      while (entries.hasNext()) {
        Entry<Key,Value> entry = entries.next();
        if (entry == null) {
          continue;
        }
        flagged.add(entry.getKey());
      }
    }
    log.debug("Tablets in flux: {}", flagged);
    return flagged.size();
  }

  /**
   * Creates table {@code t} with the single split point {@code "some split"}, brings it online
   * and waits, and then takes it offline again (also waiting) when {@code online} is false.
   *
   * @param client
   *          client used for the table operations
   * @param t
   *          name of the table to create
   * @param online
   *          whether the table is left online
   */
  private void createTable(AccumuloClient client, String t, boolean online)
      throws AccumuloSecurityException, AccumuloException, TableNotFoundException,
      TableExistsException {
    SortedSet<Text> splits = new TreeSet<>();
    splits.add(new Text("some split"));

    client.tableOperations().create(t, new NewTableConfiguration().withSplits(splits));
    client.tableOperations().online(t, true);

    if (online) {
      return;
    }
    client.tableOperations().offline(t, true);
  }

  /**
   * Create a copy of the source table by first gathering all the rows of the source in a list of
   * mutations. Then create the copy of the table and apply the mutations to the copy. Any existing
   * table named {@code copy} is dropped first.
   *
   * <p>
   * The method asserts that exactly 7 rows were gathered, whatever the source is. Per the
   * assertion message that is 1 replication row plus 2 rows for each table.
   *
   * @param client
   *          client used to scan the source and to create and write the copy
   * @param source
   *          name of the table to read
   * @param copy
   *          name of the table to (re)create
   */
  private void copyTable(AccumuloClient client, String source, String copy)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
      TableExistsException {
    try {
      dropTables(client, copy);
    } catch (TableNotFoundException ignored) {
      // the copy does not exist yet, so there is nothing to drop
    }

    log.info("Gathering rows to copy {} ", source);
    List<Mutation> mutations = new ArrayList<>();

    try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY)) {
      RowIterator rows = new RowIterator(new IsolatedScanner(scanner));

      while (rows.hasNext()) {
        Iterator<Entry<Key,Value>> row = rows.next();
        Mutation m = null;

        // one mutation per row, carrying every column with its visibility and timestamp
        while (row.hasNext()) {
          Entry<Key,Value> entry = row.next();
          Key k = entry.getKey();
          if (m == null) {
            m = new Mutation(k.getRow());
          }

          m.put(k.getColumnFamily(), k.getColumnQualifier(), k.getColumnVisibilityParsed(),
              k.getTimestamp(), entry.getValue());
        }

        mutations.add(m);
      }
    }

    // metadata should be stable with only 7 rows (1 replication + 2 for each table)
    log.debug("Gathered {} rows to create copy {}", mutations.size(), copy);
    assertEquals("Metadata should have 7 rows (1 repl + 2 for each table)", 7, mutations.size());
    client.tableOperations().create(copy);

    try (BatchWriter writer = client.createBatchWriter(copy, new BatchWriterConfig())) {
      for (Mutation m : mutations) {
        writer.addMutation(m);
      }
    }

    // parameterized like the other log statements in this class; the rendered text is unchanged
    log.info("Finished creating copy {}", copy);
  }

  /**
   * Deletes the given tables one after another, in argument order. The first failing delete
   * propagates its exception and the remaining tables are left untouched.
   *
   * @param client
   *          client used for the table operations
   * @param tables
   *          names of the tables to delete
   */
  private void dropTables(AccumuloClient client, String... tables)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    for (int i = 0; i < tables.length; i++) {
      client.tableOperations().delete(tables[i]);
    }
  }

  /**
   * {@link CurrentState} implementation for this test. Online tablet servers and online tables are
   * looked up through the client on each call; merges, migrations and shutting-down servers are
   * reported as empty and the manager state is always {@link ManagerState#NORMAL}.
   */
  private static class State implements CurrentState {

    final AccumuloClient client;

    // results of the most recent lookups, kept only for toString(); null until the first lookup
    private Set<TServerInstance> tservers;
    private Set<TableId> onlineTables;

    State(AccumuloClient client) {
      this.client = client;
    }

    /**
     * Builds a {@link TServerInstance} for every tablet server reported by the client, using the
     * session id read from the server's ZooKeeper path.
     *
     * @return the set of tablet server instances found
     * @throws RuntimeException
     *           wrapping any exception raised while resolving the session ids
     */
    @Override
    public Set<TServerInstance> onlineTabletServers() {
      List<String> serverNames = client.instanceOperations().getTabletServers();
      Set<TServerInstance> found = new HashSet<>();
      try {
        // the path prefix, client info and ZooCache are the same for every server, so they are
        // created once per call instead of once per tablet server
        String tserversPath =
            ZooUtil.getRoot(client.instanceOperations().getInstanceID()) + Constants.ZTSERVERS;
        ClientInfo info = getClientInfo();
        ZooCache zooCache =
            new ZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
        for (String serverName : serverNames) {
          long sessionId = ZooLock.getSessionId(zooCache, tserversPath + "/" + serverName);
          found.add(new TServerInstance(serverName, sessionId));
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      this.tservers = Collections.unmodifiableSet(found);
      return found;
    }

    /**
     * @return a filtered view of all known table ids that keeps only the tables whose state is
     *         {@link TableState#ONLINE}
     */
    @Override
    public Set<TableId> onlineTables() {
      ClientContext context = (ClientContext) client;
      Set<TableId> allTableIds = Tables.getIdToNameMap(context).keySet();
      this.onlineTables = Sets.filter(allTableIds,
          tableId -> Tables.getTableState(context, tableId) == TableState.ONLINE);
      return this.onlineTables;
    }

    /** @return an empty collection; the test overrides this to simulate a merge */
    @Override
    public Collection<MergeInfo> merges() {
      return Collections.emptySet();
    }

    /** @return an empty set */
    @Override
    public Set<KeyExtent> migrationsSnapshot() {
      return Collections.emptySet();
    }

    /** @return an empty set */
    @Override
    public Set<TServerInstance> shutdownServers() {
      return Collections.emptySet();
    }

    /** @return always {@link ManagerState#NORMAL} */
    @Override
    public ManagerState getManagerState() {
      return ManagerState.NORMAL;
    }

    /** Renders the tablet servers and online tables captured by the most recent lookups. */
    @Override
    public String toString() {
      return "tservers: " + tservers + " onlineTables: " + onlineTables;
    }
  }

}
