/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.ha;

import static java.lang.Thread.sleep;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.cache.ha.HARegionQueue.BLOCKING_HA_QUEUE;
import static org.apache.geode.internal.cache.ha.HARegionQueue.getHARegionQueueInstance;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.apache.geode.test.dunit.ThreadUtils.join;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Properties;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;

@Category({ClientSubscriptionTest.class})
public class BlockingHARegionJUnitTest {

  private static InternalCache cache = null;

  /** boolean to record an exception occurrence in another thread **/
  private static volatile boolean exceptionOccurred = false;
  /** StringBuffer to store the exception **/
  private static StringBuffer exceptionString = new StringBuffer();
  /** boolen to quit the for loop **/
  private static volatile boolean quitForLoop = false;

  @Before
  public void setUp() throws Exception {
    Properties props = new Properties();
    props.setProperty(MCAST_PORT, "0");
    if (cache != null) {
      cache.close(); // fault tolerance
    }
    cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(props));
  }

  /**
   * This test has a scenario where the HAReqionQueue capacity is just 1. There will be two thread.
   * One doing a 1000 puts and the other doing a 1000 takes. The validation for this test is that it
   * should not encounter any exceptions
   */
  @Test
  public void testBoundedPuts() throws Exception {
    exceptionOccurred = false;
    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
    harqa.setBlockingQueueCapacity(1);
    HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance("BlockingHARegionJUnitTest_Region",
        cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false, disabledClock());
    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
    Thread thread1 = new DoPuts(hrq, 1000);
    Thread thread2 = new DoTake(hrq, 1000);

    thread1.start();
    thread2.start();

    ThreadUtils.join(thread1, 30 * 1000);
    ThreadUtils.join(thread2, 30 * 1000);

    if (exceptionOccurred) {
      fail(" Test failed due to " + exceptionString);
    }

    cache.close();
  }

  /**
   * This test tests whether puts are blocked. There are two threads. One which is going to do 2
   * puts and one which is going to do take a single take. The capacity of the region is just 1. The
   * put thread is first started and it is then ensured that only one put has successfully made
   * through and that the thread is still alive. Then the take thread is started. This will cause
   * the region size to come down by one and the put thread waiting will go ahead and do the put.
   * The thread should then die and the region size should be validated to reflect that.
   */
  @Test
  public void testPutBeingBlocked() throws Exception {
    exceptionOccurred = false;
    quitForLoop = false;
    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
    harqa.setBlockingQueueCapacity(1);
    final HARegionQueue hrq = getHARegionQueueInstance(
        "BlockingHARegionJUnitTest_Region", cache, harqa, BLOCKING_HA_QUEUE, false,
        disabledClock());
    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
    final Thread thread1 = new DoPuts(hrq, 2);
    thread1.start();
    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return hrq.region.size() == 2;
      }

      @Override
      public String description() {
        return null;
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);
    assertTrue(thread1.isAlive()); // thread should still be alive (in wait state)

    Thread thread2 = new DoTake(hrq, 1);
    thread2.start(); // start take thread
    ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return hrq.region.size() == 3;
      }

      @Override
      public String description() {
        return null;
      }
    };
    // sleep. take will proceed and so will sleeping put
    GeodeAwaitility.await().untilAsserted(ev);

    // thread should have died since put should have proceeded
    ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return !thread1.isAlive();
      }

      @Override
      public String description() {
        return "thread1 still alive";
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);

    join(thread1, 30 * 1000); // for completeness
    join(thread2, 30 * 1000);
    if (exceptionOccurred) {
      fail(" Test failed due to " + exceptionString);
    }
    cache.close();
  }


  /**
   * This test tests that the region capacity is never exceeded even in highly concurrent
   * environments. The region capacity is set to 10000. Then 5 threads start doing put
   * simultaneously. They will reach a state where the queue is full and they will all go in a wait
   * state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). then
   * the threads are interrupted and made to quit the loop
   */
  @Test
  public void testConcurrentPutsNotExceedingLimit() throws Exception {
    exceptionOccurred = false;
    quitForLoop = false;
    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
    harqa.setBlockingQueueCapacity(10000);
    final HARegionQueue hrq = getHARegionQueueInstance(
        "BlockingHARegionJUnitTest_Region", cache, harqa, BLOCKING_HA_QUEUE, false,
        disabledClock());
    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
    Thread thread1 = new DoPuts(hrq, 20000, 1);
    Thread thread2 = new DoPuts(hrq, 20000, 2);
    Thread thread3 = new DoPuts(hrq, 20000, 3);
    Thread thread4 = new DoPuts(hrq, 20000, 4);
    Thread thread5 = new DoPuts(hrq, 20000, 5);

    thread1.start();
    thread2.start();
    thread3.start();
    thread4.start();
    thread5.start();

    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return hrq.region.size() == 20000;
      }

      @Override
      public String description() {
        return null;
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);

    assertTrue(thread1.isAlive());
    assertTrue(thread2.isAlive());
    assertTrue(thread3.isAlive());
    assertTrue(thread4.isAlive());
    assertTrue(thread5.isAlive());

    assertTrue(hrq.region.size() == 20000);

    quitForLoop = true;
    sleep(20000);

    thread1.interrupt();
    thread2.interrupt();
    thread3.interrupt();
    thread4.interrupt();
    thread5.interrupt();

    sleep(2000);

    join(thread1, 5 * 60 * 1000);
    join(thread2, 5 * 60 * 1000);
    join(thread3, 5 * 60 * 1000);
    join(thread4, 5 * 60 * 1000);
    join(thread5, 5 * 60 * 1000);

    cache.close();
  }

  /**
   * This test tests that the region capacity is never exceeded even in highly concurrent
   * environments. The region capacity is set to 10000. Then 5 threads start doing put
   * simultaneously. They will reach a state where the queue is full and they will all go in a wait
   * state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). then
   * the threads are interrupted and made to quit the loop
   */
  @Ignore("TODO: test is disabled")
  @Test
  public void testConcurrentPutsTakesNotExceedingLimit() throws Exception {
    exceptionOccurred = false;
    quitForLoop = false;
    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
    harqa.setBlockingQueueCapacity(10000);
    final HARegionQueue hrq = getHARegionQueueInstance(
        "BlockingHARegionJUnitTest_Region", cache, harqa, BLOCKING_HA_QUEUE, false,
        disabledClock());
    Thread thread1 = new DoPuts(hrq, 40000, 1);
    Thread thread2 = new DoPuts(hrq, 40000, 2);
    Thread thread3 = new DoPuts(hrq, 40000, 3);
    Thread thread4 = new DoPuts(hrq, 40000, 4);
    Thread thread5 = new DoPuts(hrq, 40000, 5);

    Thread thread6 = new DoTake(hrq, 5000);
    Thread thread7 = new DoTake(hrq, 5000);
    Thread thread8 = new DoTake(hrq, 5000);
    Thread thread9 = new DoTake(hrq, 5000);
    Thread thread10 = new DoTake(hrq, 5000);

    thread1.start();
    thread2.start();
    thread3.start();
    thread4.start();
    thread5.start();

    thread6.start();
    thread7.start();
    thread8.start();
    thread9.start();
    thread10.start();

    join(thread6, 30 * 1000);
    join(thread7, 30 * 1000);
    join(thread8, 30 * 1000);
    join(thread9, 30 * 1000);
    join(thread10, 30 * 1000);

    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return hrq.region.size() == 20000;
      }

      @Override
      public String description() {
        return null;
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);

    assertTrue(thread1.isAlive());
    assertTrue(thread2.isAlive());
    assertTrue(thread3.isAlive());
    assertTrue(thread4.isAlive());
    assertTrue(thread5.isAlive());

    assertTrue(hrq.region.size() == 20000);

    quitForLoop = true;

    sleep(2000);

    thread1.interrupt();
    thread2.interrupt();
    thread3.interrupt();
    thread4.interrupt();
    thread5.interrupt();

    sleep(2000);


    join(thread1, 30 * 1000);
    join(thread2, 30 * 1000);
    join(thread3, 30 * 1000);
    join(thread4, 30 * 1000);
    join(thread5, 30 * 1000);

    cache.close();
  }

  /**
   * Tests the bug in HARegionQueue where the take side put permit is not being incremented when the
   * event arriving at the queue which has optimistically decreased the put permit, is not
   * incrementing the take permit if the event has a sequence ID less than the last dispatched
   * sequence ID. This event is rightly rejected from entering the queue but the take permit also
   * needs to increase & a notify issued
   */
  @Test
  public void testHARQMaxCapacity_Bug37627() throws Exception {
    try {
      exceptionOccurred = false;
      quitForLoop = false;
      HARegionQueueAttributes harqa = new HARegionQueueAttributes();
      harqa.setBlockingQueueCapacity(1);
      harqa.setExpiryTime(180);
      final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
          "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false,
          disabledClock());
      hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
      final EventID id1 = new EventID(new byte[] {1}, 1, 2); // violation
      final EventID ignore = new EventID(new byte[] {1}, 1, 1); //
      final EventID id2 = new EventID(new byte[] {1}, 1, 3); //
      Thread t1 = new Thread() {
        @Override
        public void run() {
          try {
            hrq.put(new ConflatableObject("key1", "value1", id1, false, "region1"));
            hrq.take();
            hrq.put(new ConflatableObject("key2", "value1", ignore, false, "region1"));
            hrq.put(new ConflatableObject("key3", "value1", id2, false, "region1"));
          } catch (Exception e) {
            exceptionString.append("First Put in region queue failed");
            exceptionOccurred = true;
          }
        }
      };
      t1.start();
      ThreadUtils.join(t1, 20 * 1000);
      if (exceptionOccurred) {
        fail(" Test failed due to " + exceptionString);
      }
    } finally {
      if (cache != null) {
        cache.close();
      }
    }
  }

  /**
   * class which does specified number of puts on the queue
   */
  private static class DoPuts extends Thread {

    HARegionQueue regionQueue = null;
    final int numberOfPuts;

    DoPuts(HARegionQueue haRegionQueue, int numberOfPuts) {
      this.regionQueue = haRegionQueue;
      this.numberOfPuts = numberOfPuts;
    }

    /**
     * region id can be specified to generate Thread unique events
     */
    int regionId = 0;

    DoPuts(HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
      this.regionQueue = haRegionQueue;
      this.numberOfPuts = numberOfPuts;
      this.regionId = regionId;
    }

    @Override
    public void run() {
      for (int i = 0; i < numberOfPuts; i++) {
        try {
          this.regionQueue.put(new ConflatableObject("" + i, "" + i,
              new EventID(new byte[regionId], i, i), false, "BlockingHARegionJUnitTest_Region"));
          if (quitForLoop) {
            break;
          }
          if (Thread.currentThread().isInterrupted()) {
            break;
          }
        } catch (Exception e) {
          exceptionOccurred = true;
          exceptionString.append(" Exception occurred due to ").append(e);
          break;
        }
      }
    }
  }

  /**
   * class which does a specified number of takes
   */
  private static class DoTake extends Thread {

    final HARegionQueue regionQueue;
    final int numberOfTakes;

    DoTake(HARegionQueue haRegionQueue, int numberOfTakes) {
      this.regionQueue = haRegionQueue;
      this.numberOfTakes = numberOfTakes;
    }

    @Override
    public void run() {
      for (int i = 0; i < numberOfTakes; i++) {
        try {
          assertNotNull(this.regionQueue.take());
          if (Thread.currentThread().isInterrupted()) {
            break;
          }
        } catch (Exception e) {
          exceptionOccurred = true;
          exceptionString.append(" Exception occurred due to ").append(e);
          break;
        }
      }
    }
  }

}
