/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.cloud;


import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.codahale.metrics.Metric;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.metrics.MetricsMap;
import org.apache.solr.metrics.SolrMetricManager;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplaceNodeTest extends SolrCloudTestCase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  @BeforeClass
  public static void setupCluster() throws Exception {
    System.setProperty("metricsEnabled", "true");
    configureCluster(6)
        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
        .configure();
  }

  protected String getSolrXml() {
    return "solr.xml";
  }

  @Test
  public void test() throws Exception {
    String coll = "replacenodetest_coll";
    if (log.isInfoEnabled()) {
      log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
    }

    CloudSolrClient cloudClient = cluster.getSolrClient();
    Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
    ArrayList<String> l = new ArrayList<>(liveNodes);
    Collections.shuffle(l, random());
    String emptyNode = l.remove(0);
    String node2bdecommissioned = l.get(0);
    CollectionAdminRequest.Create create;
    // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
    // have to worry about null checking when comparing the Create command with the final Slices
    
    // TODO: tlog replicas do not work correctly in tests due to fault TestInjection#waitForInSyncWithLeader
    create = pickRandom(
                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
                        // check also replicationFactor 1
                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0)
                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
    );
    create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
    cloudClient.request(create);
    
    cluster.waitForActiveCollection(coll, 5, 5 * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas()));
    
    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
    log.debug("### Before decommission: {}", collection);
    log.info("excluded_node : {}  ", emptyNode);
    createReplaceNodeRequest(node2bdecommissioned, emptyNode, null).processAsync("000", cloudClient);
    CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
    boolean success = false;
    for (int i = 0; i < 300; i++) {
      CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
      if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
        success = true;
        break;
      }
      assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
      Thread.sleep(50);
    }
    assertTrue(success);
    try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(node2bdecommissioned))) {
      CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
      assertTrue(status.getCoreStatus().size() == 0);
    }

    Thread.sleep(5000);
    collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
    log.debug("### After decommission: {}", collection);
    // check what are replica states on the decommissioned node
    List<Replica> replicas = collection.getReplicas(node2bdecommissioned);
    if (replicas == null) {
      replicas = Collections.emptyList();
    }
    log.debug("### Existing replicas on decommissioned node: {}", replicas);

    //let's do it back - this time wait for recoveries
    CollectionAdminRequest.AsyncCollectionAdminRequest replaceNodeRequest = createReplaceNodeRequest(emptyNode, node2bdecommissioned, Boolean.TRUE);
    replaceNodeRequest.setWaitForFinalState(true);
    replaceNodeRequest.processAsync("001", cloudClient);
    requestStatus = CollectionAdminRequest.requestStatus("001");

    for (int i = 0; i < 200; i++) {
      CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
      if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
        success = true;
        break;
      }
      assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
      Thread.sleep(50);
    }
    assertTrue(success);
    try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(emptyNode))) {
      CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
      assertEquals("Expecting no cores but found some: " + status.getCoreStatus(), 0, status.getCoreStatus().size());
    }

    collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
    assertEquals(create.getNumShards().intValue(), collection.getSlices().size());
    for (Slice s:collection.getSlices()) {
      assertEquals(create.getNumNrtReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
      assertEquals(create.getNumTlogReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
      assertEquals(create.getNumPullReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
    }
    // make sure all newly created replicas on node are active
    List<Replica> newReplicas = collection.getReplicas(node2bdecommissioned);
    replicas.forEach(r -> {
      for (Iterator<Replica> it = newReplicas.iterator(); it.hasNext(); ) {
        Replica nr = it.next();
        if (nr.getName().equals(r.getName())) {
          it.remove();
        }
      }
    });
    assertFalse(newReplicas.isEmpty());
    for (Replica r : newReplicas) {
      assertEquals(r.toString(), Replica.State.ACTIVE, r.getState());
    }
    // make sure all replicas on emptyNode are not active
    replicas = collection.getReplicas(emptyNode);
    if (replicas != null) {
      for (Replica r : replicas) {
        assertFalse(r.toString(), Replica.State.ACTIVE.equals(r.getState()));
      }
    }

    // check replication metrics on this jetty - see SOLR-14924
    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
      if (jetty.getCoreContainer() == null) {
        continue;
      }
      SolrMetricManager metricManager = jetty.getCoreContainer().getMetricManager();
      String registryName = null;
      for (String name : metricManager.registryNames()) {
        if (name.startsWith("solr.core.")) {
          registryName = name;
        }
      }
      Map<String, Metric> metrics = metricManager.registry(registryName).getMetrics();
      if (!metrics.containsKey("REPLICATION./replication.fetcher")) {
        continue;
      }
      @SuppressWarnings("unchecked")
      MetricsMap fetcherGauge = (MetricsMap) ((SolrMetricManager.GaugeWrapper<?>) metrics.get("REPLICATION./replication.fetcher")).getGauge();
      assertNotNull("no IndexFetcher gauge in metrics", fetcherGauge);
      Map<String, Object> value = fetcherGauge.getValue();
      if (value.isEmpty()) {
        continue;
      }
      assertNotNull("isReplicating missing: " + value, value.get("isReplicating"));
      assertTrue("isReplicating should be a boolean: " + value, value.get("isReplicating") instanceof Boolean);
      if (value.get("indexReplicatedAt") == null) {
        continue;
      }
      assertNotNull("timesIndexReplicated missing: " + value, value.get("timesIndexReplicated"));
      assertTrue("timesIndexReplicated should be a number: " + value, value.get("timesIndexReplicated") instanceof Number);
    }

  }

  public static  CollectionAdminRequest.AsyncCollectionAdminRequest createReplaceNodeRequest(String sourceNode, String targetNode, Boolean parallel) {
    if (random().nextBoolean()) {
      return new CollectionAdminRequest.ReplaceNode(sourceNode, targetNode).setParallel(parallel);
    } else  {
      // test back compat with old param names
      // todo remove in solr 8.0
      return new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.REPLACENODE)  {
        @Override
        public SolrParams getParams() {
          ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
          params.set("source", sourceNode);
          params.setNonNull("target", targetNode);
          if (parallel != null) params.set("parallel", parallel.toString());
          return params;
        }
      };
    }
  }
}
