/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import com.google.common.base.Supplier;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Tests for the Curator-based leader elector of the ResourceManager.
 *
 * <p>Every test runs against a fresh three-node in-process ZooKeeper ensemble
 * ({@link TestingCluster}) that is started in {@link #setUp()} and released in
 * {@link #tearDown()}. The tests start one or two {@link MockRM} instances with
 * HA enabled and verify which of them reports {@code ACTIVE} / {@code STANDBY}
 * after various failures are injected.
 */
public class TestLeaderElectorService {

  private static final String RM1_ADDRESS = "1.1.1.1:1";
  private static final String RM1_NODE_ID = "rm1";

  private static final String RM2_ADDRESS = "0.0.0.0:0";
  private static final String RM2_NODE_ID = "rm2";

  /** Interval between two HA state polls, in milliseconds. */
  private static final int POLL_INTERVAL_MS = 2000;
  /** Maximum time to wait for an expected HA state, in milliseconds. */
  private static final int WAIT_TIMEOUT_MS = 15000;

  Configuration conf;
  // rm1 is assigned from a helper thread in testRMFailToTransitionToActive and
  // read from the test thread (and from tearDown), so both RM references are
  // volatile to guarantee that the assignment is visible.
  volatile MockRM rm1;
  volatile MockRM rm2;
  TestingCluster zkCluster;

  /**
   * Builds an HA configuration with two RM ids that uses the Curator leader
   * elector, and starts a three-node ZooKeeper test ensemble whose connect
   * string is published through {@code RM_ZK_ADDRESS}.
   */
  @Before
  public void setUp() throws Exception {
    Logger rootLogger = LogManager.getRootLogger();
    rootLogger.setLevel(Level.INFO);
    conf = new Configuration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);

    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);

    for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
      conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
      conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
    }
    zkCluster = new TestingCluster(3);
    conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
    zkCluster.start();
  }

  /**
   * Stops the RMs started by the test and then shuts down the ZooKeeper
   * ensemble. The RMs are stopped first so that they disconnect while
   * ZooKeeper is still reachable; the ensemble is closed in a {@code finally}
   * block so it is released even when stopping an RM fails.
   */
  @After
  public void tearDown() throws Exception {
    try {
      if (rm1 != null) {
        rm1.stop();
      }
      if (rm2 != null) {
        rm2.stop();
      }
    } finally {
      if (zkCluster != null) {
        // close() shuts the ensemble down and frees its resources; without it
        // every test method would leave ZooKeeper servers behind.
        zkCluster.close();
      }
    }
  }

  // 1. rm1 active
  // 2. rm2 standby
  // 3. stop rm1
  // 4. rm2 becomes active
  @Test (timeout = 20000)
  public void testRMShutDownCauseFailover() throws Exception {
    rm1 = startRM(RM1_NODE_ID, HAServiceState.ACTIVE);
    rm2 = startRM(RM2_NODE_ID, HAServiceState.STANDBY);

    // wait for some time to make sure rm2 will not become active;
    Thread.sleep(5000);
    waitFor(rm2, HAServiceState.STANDBY);

    rm1.stop();
    // rm2 should become active;
    waitFor(rm2, HAServiceState.ACTIVE);
  }

  // 1. rm1 active
  // 2. rm2 standby
  // 3. submit a job to rm1 which triggers state-store failure.
  // 4. rm1 becomes standby and rm2 becomes active
  // 5. stop rm2; rm1 becomes active again
  @Test
  public void testStateStoreFailureCauseFailover() throws Exception {

    conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
    // state store whose application-store operation always fails
    MemoryRMStateStore memStore = new MemoryRMStateStore() {
      @Override
      public synchronized void storeApplicationStateInternal(ApplicationId
          appId, ApplicationStateData appState) throws Exception {
        throw new Exception("store app failure.");
      }
    };
    memStore.init(conf);
    rm1 = new MockRM(conf, memStore, true);
    rm1.init(conf);
    rm1.start();

    waitFor(rm1, HAServiceState.ACTIVE);

    rm2 = startRM(RM2_NODE_ID, HAServiceState.STANDBY);

    // submit an app which will trigger state-store failure.
    rm1.submitApp(200, "app1", "user1", null, "default", false);
    waitFor(rm1, HAServiceState.STANDBY);

    // rm2 should become active;
    waitFor(rm2, HAServiceState.ACTIVE);

    rm2.stop();
    // rm1 will become active again
    waitFor(rm1, HAServiceState.ACTIVE);
  }

  // 1. rm1 active
  // 2. restart zk cluster
  // 3. rm1 will first relinquish leadership and then re-acquire it
  @Test
  public void testZKClusterDown() throws Exception {
    rm1 = startRM(RM1_NODE_ID, HAServiceState.ACTIVE);

    // stop zk cluster
    zkCluster.stop();
    waitFor(rm1, HAServiceState.STANDBY);

    // bring up a new ensemble on the same instance specs, so the connect
    // string already configured in the RM stays valid
    Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
    zkCluster = new TestingCluster(instanceSpecs);
    zkCluster.start();
    // rm becomes active again
    waitFor(rm1, HAServiceState.ACTIVE);
  }

  // 1. rm1 active
  // 2. kill the zk session between the rm and zk cluster.
  // 3. rm1 will first relinquish leadership and then re-acquire it
  @Test
  public void testExpireCurrentZKSession() throws Exception {

    rm1 = startRM(RM1_NODE_ID, HAServiceState.ACTIVE);

    CuratorBasedElectorService service = (CuratorBasedElectorService)
        rm1.getRMContext().getLeaderElectorService();
    CuratorZookeeperClient client =
        service.getCuratorClient().getZookeeperClient();
    // this will expire current curator client session. curator will re-establish
    // the session. RM will first relinquish leadership and re-acquire leadership
    KillSession
        .kill(client.getZooKeeper(), client.getCurrentConnectionString());

    waitFor(rm1, HAServiceState.ACTIVE);
  }

  // 1. rm1 fails to become active.
  // 2. rm1 will rejoin leader election and retry the leadership
  @Test
  public void testRMFailToTransitionToActive() throws Exception {
    conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
    // while true, every attempt of rm1 to transition to active fails
    final AtomicBoolean throwException = new AtomicBoolean(true);
    Thread launchRM = new Thread() {
      @Override
      public void run() {
        rm1 = new MockRM(conf, true) {
          @Override
          synchronized void transitionToActive() throws Exception {
            if (throwException.get()) {
              throw new Exception("Fail to transition to active");
            } else {
              super.transitionToActive();
            }
          }
        };
        rm1.init(conf);
        rm1.start();
      }
    };
    launchRM.start();
    // wait some time, rm will keep retrying the leadership;
    Thread.sleep(5000);
    throwException.set(false);
    waitFor(rm1, HAServiceState.ACTIVE);
  }

  // 1. rm1 active
  // 2. rm2 standby
  // 3. kill the zk instance rm1 is currently connected to
  // 4. either rm1 or rm2 will become active.
  @Test
  public void testKillZKInstance() throws Exception {
    rm1 = startRM(RM1_NODE_ID, HAServiceState.ACTIVE);
    rm2 = startRM(RM2_NODE_ID, HAServiceState.STANDBY);

    CuratorBasedElectorService service = (CuratorBasedElectorService)
        rm1.getRMContext().getLeaderElectorService();

    ZooKeeper zkClient =
        service.getCuratorClient().getZookeeperClient().getZooKeeper();

    InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
    zkCluster.killServer(connectionInstance);

    // wait until exactly one of rm1 / rm2 is active and the other is standby;
    // which one wins is not deterministic
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override public Boolean get() {
        HAServiceState rm1State = currentState(rm1);
        HAServiceState rm2State = currentState(rm2);
        return (rm1State == HAServiceState.ACTIVE
            && rm2State == HAServiceState.STANDBY)
            || (rm1State == HAServiceState.STANDBY
            && rm2State == HAServiceState.ACTIVE);
      }
    }, POLL_INTERVAL_MS, WAIT_TIMEOUT_MS);
  }

  /**
   * Creates, initializes and starts a {@link MockRM} with the given HA id on a
   * private copy of the shared configuration, then waits until it reports the
   * expected HA state.
   *
   * @param rmId the value to set for {@code RM_HA_ID}
   * @param state the HA state the RM must reach before this method returns
   * @return the started RM
   * @throws Exception if the RM fails to start or does not reach
   *         {@code state} within the wait timeout
   */
  private MockRM startRM(String rmId, HAServiceState state) throws Exception {
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
    MockRM rm = new MockRM(yarnConf, true);
    rm.init(yarnConf);
    rm.start();
    waitFor(rm, state);
    return rm;
  }

  /**
   * Polls the given RM until its admin service reports the expected HA state.
   *
   * @param rm the RM to poll
   * @param state the HA state to wait for
   * @throws TimeoutException if the state is not reached within
   *         {@link #WAIT_TIMEOUT_MS} milliseconds
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void waitFor(final MockRM rm,
      final HAServiceState state)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override public Boolean get() {
        return currentState(rm) == state;
      }
    }, POLL_INTERVAL_MS, WAIT_TIMEOUT_MS);
  }

  /**
   * Reads the HA state currently reported by the admin service of the RM.
   *
   * @param rm the RM to query
   * @return the reported state, or {@code null} when the status could not be
   *         obtained
   */
  private static HAServiceState currentState(MockRM rm) {
    try {
      return rm.getAdminService().getServiceStatus().getState();
    } catch (IOException ignored) {
      // Deliberately ignored: callers poll repeatedly, so a failed status
      // query is treated as "state unknown" and simply retried.
      return null;
    }
  }
}
