/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.ha;

import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.ha.HARegionQueue.MapWrapper;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;

/**
 * Test to verify Add operation to HARegion Queue with and without conflation.
 */
@Category({ClientSubscriptionTest.class})
public class HARQAddOperationJUnitTest {
  /** Log4j logger obtained from {@link LogService} for this test class. */
  private static final Logger logger = LogService.getLogger();

  /** The cache instance; assigned in {@code setUp()} and closed in {@code tearDown()}. */
  protected InternalCache cache = null;

  /** The {@link LogWriter} of {@link #cache}; the tests use it for BEGIN/END progress messages. */
  protected LogWriter logWriter = null;

  /** The <code>HARegionQueue</code> instance shared by the single-threaded add/take tests. */
  private HARegionQueue rq = null;

  /** Conflation key used for the first test event. */
  protected static final String KEY1 = "Key-1";

  /** Conflation key used for the second test event. */
  protected static final String KEY2 = "Key-2";

  /** Value carried by the first test event. */
  protected static final String VALUE1 = "Value-1";

  /** Value carried by the second test event. */
  protected static final String VALUE2 = "Value-2";

  /** Set to {@code true} by worker threads when a queue operation throws. */
  protected boolean testFailed = false;

  /**
   * Failure details appended by worker threads. It is {@code null} until a test assigns a buffer,
   * so any test whose workers append to it must initialise it first.
   */
  protected StringBuffer message = null;

  /** Number of peek threads that have reached the rendezvous point in the peek/remove tests. */
  protected int barrierCount = 0;

  // NOTE(review): not referenced in the visible part of this class; presumably updated by an
  // expiry listener defined further down - confirm.
  static volatile int expiryCount = 0;

  /**
   * Creates a fresh cache before every test and remembers its log writer.
   */
  @Before
  public void setUp() throws Exception {
    InternalCache freshCache = createCache();
    cache = freshCache;
    logWriter = freshCache.getLogger();
  }

  /**
   * Closes the cache created in {@code setUp()}.
   *
   * <p>
   * JUnit runs {@code @After} methods even when {@code @Before} threw, so the cache may never have
   * been assigned; guarding against {@code null} keeps the original set-up failure from being
   * masked by a {@link NullPointerException} here.
   */
  @After
  public void tearDown() throws Exception {
    if (cache != null) {
      cache.close();
    }
  }

  /**
   * Builds the cache used by a single test, with the {@code MCAST_PORT} property set to "0".
   *
   * @return the newly created cache
   * @throws CacheException if the cache cannot be created
   */
  private InternalCache createCache() throws CacheException {
    CacheFactory cacheFactory = new CacheFactory().set(MCAST_PORT, "0");
    return (InternalCache) cacheFactory.create();
  }

  /**
   * Creates a non-blocking HA region-queue with default attributes.
   *
   * <p>
   * The queue is obtained from {@link HARegionQueue#getHARegionQueueInstance} using this test's
   * cache and a disabled statistics clock.
   *
   * @param name name passed to the region-queue factory
   * @return the newly created region-queue
   */
  protected HARegionQueue createHARegionQueue(String name)
      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
    // The previous AttributesFactory set-up was never handed to the queue, so it was dead code.
    return HARegionQueue.getHARegionQueueInstance(name, cache,
        HARegionQueue.NON_BLOCKING_HA_QUEUE, false, disabledClock());
  }

  /**
   * Creates a non-blocking HA region-queue configured with the supplied attributes.
   *
   * @param name name passed to the region-queue factory
   * @param attrs queue attributes (for example the expiry time) to apply
   * @return the newly created region-queue
   */
  protected HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
    return HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
        HARegionQueue.NON_BLOCKING_HA_QUEUE, false, disabledClock());
  }

  /**
   * Add operation with conflation: puts Key1/Value1 followed by Key1/Value2 from the same thread
   * id. Afterwards the conflation map for "region1" must hold exactly one entry whose counter
   * resolves to the object carrying Value2, and both the available IDs and the current counter
   * set of that thread must have size 1.
   */
  @Test
  public void testQueueAddOperationWithConflation() throws Exception {
    logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation BEGIN");
    rq = createHARegionQueue("testQueueAddOperationWithConflation");

    EventID firstId = new EventID(new byte[] {1}, 1, 1);
    EventID secondId = new EventID(new byte[] {1}, 1, 2);
    ConflatableObject original = new ConflatableObject(KEY1, VALUE1, firstId, true, "region1");
    ConflatableObject replacement = new ConflatableObject(KEY1, VALUE2, secondId, true, "region1");
    rq.put(original);
    rq.put(replacement);

    // Only the latest event for KEY1 may survive in the conflation map.
    Map<?, ?> regionConflationMap = (Map<?, ?>) rq.getConflationMapForTesting().get("region1");
    assertEquals(1, regionConflationMap.size());

    // The surviving counter must point at the second (conflating) object.
    Long counter = (Long) regionConflationMap.get(KEY1);
    ConflatableObject stored = (ConflatableObject) rq.getRegion().get(counter);
    assertEquals(VALUE2, stored.getValueToConflate());

    assertEquals(1, rq.getAvailableIds().size());
    assertEquals(1, rq.getCurrentCounterSet(firstId).size());
    logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation END");
  }

  /**
   * Add operation without conflation: after the first put there is no conflation map for
   * "region1" and both the available IDs and the thread's current counter set have size 1. After
   * a second put by the same thread both have size 2, and iterating the counter set yields the
   * Key1/Value1 object followed by the Key2/Value2 object.
   */
  @Test
  public void testQueueAddOperationWithoutConflation() throws Exception {
    this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation BEGIN");
    this.rq = createHARegionQueue("testQueueAddOperationWithConflation");
    EventID id1 = new EventID(new byte[] {1}, 1, 1);
    EventID id2 = new EventID(new byte[] {1}, 1, 2);
    ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, false, "region1");
    ConflatableObject c2 = new ConflatableObject(KEY2, VALUE2, id2, false, "region1");
    this.rq.put(c1);

    assertNull(rq.getConflationMapForTesting().get("region1"));
    assertEquals(1, rq.getAvailableIds().size());
    assertEquals(1, rq.getCurrentCounterSet(id1).size());

    this.rq.put(c2);
    assertNull(rq.getConflationMapForTesting().get("region1"));
    assertEquals(2, rq.getAvailableIds().size());
    assertEquals(2, rq.getCurrentCounterSet(id1).size());

    // Assert that each element exists instead of guarding with "if (iter.hasNext())", which
    // would silently skip the key/value checks if the iterator came up short.
    Iterator iter = rq.getCurrentCounterSet(id1).iterator();

    assertTrue("counter set has no first element", iter.hasNext());
    Long firstCounter = (Long) iter.next();
    ConflatableObject first = (ConflatableObject) this.rq.getRegion().get(firstCounter);
    assertEquals(KEY1, first.getKeyToConflate());
    assertEquals(VALUE1, first.getValueToConflate());

    assertTrue("counter set has no second element", iter.hasNext());
    Long secondCounter = (Long) iter.next();
    ConflatableObject second = (ConflatableObject) this.rq.getRegion().get(secondCounter);
    assertEquals(KEY2, second.getKeyToConflate());
    assertEquals(VALUE2, second.getValueToConflate());

    this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation END");
  }

  /**
   * Add followed by take: after one put and one take the available IDs and the thread's current
   * counter set must be empty, while the events map still holds exactly one entry for the thread.
   */
  @Test
  public void testQueueAddTakeOperationWithoutConflation() throws Exception {
    logWriter.info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation BEGIN");

    rq = createHARegionQueue("testQueueAddOperationWithConflation");
    EventID eventId = new EventID(new byte[] {1}, 1, 1);
    // NOTE(review): the test name says "without conflation" but the object is created with the
    // conflation flag set to true - confirm which is intended.
    ConflatableObject queued = new ConflatableObject(KEY1, VALUE1, eventId, true, "region1");
    rq.put(queued);
    rq.take();

    // NOTE(review): other tests in this class look region entries up by their Long counter, so a
    // lookup by KEY1 presumably returns null regardless of the take - confirm.
    assertNull(rq.getRegion().get(KEY1));
    assertEquals(0, rq.getAvailableIds().size());

    Map<?, ?> events = rq.getEventsMapForTesting();
    assertEquals(1, events.size());
    assertEquals(0, rq.getCurrentCounterSet(eventId).size());
    logWriter.info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation END");
  }

  /**
   * Add followed by take on a queue whose expiry time is set to 2: once the queue is empty, the
   * ThreadIdentifier entry must eventually disappear from both the queue's region and its events
   * map.
   *
   * <p>
   * The test polls with {@code await()} instead of sleeping for a fixed 25 seconds, so it ends as
   * soon as the expiry has happened. Unexpected exceptions propagate to JUnit with their stack
   * trace intact rather than being flattened into a {@code fail(...)} message.
   */
  @Test
  public void testExpiryOnThreadIdentifier() throws Exception {
    HARegionQueueAttributes attrs = new HARegionQueueAttributes();
    attrs.setExpiryTime(2);
    HARegionQueue regionqueue = createHARegionQueue("testing", attrs);
    // create the conflatable object
    EventID id = new EventID(new byte[] {1}, 1, 1);
    ConflatableObject obj = new ConflatableObject(KEY1, VALUE1, id, true, "region1");

    ThreadIdentifier threadId =
        new ThreadIdentifier(obj.getEventId().getMembershipID(), obj.getEventId().getThreadID());
    regionqueue.put(obj);
    regionqueue.take();

    await().untilAsserted(() -> {
      assertFalse(
          "ThreadIdentifier did not remove itself through expiry.The reqgion queue is of type="
              + regionqueue.getClass(),
          regionqueue.getRegion().containsKey(threadId));
      assertNull("expiry action on ThreadIdentifier did not remove itself from eventsMap",
          regionqueue.getEventsMapForTesting().get(threadId));
    });
  }

  /**
   * Verifies that a ThreadIdentifier is not removed from the events map while data is still
   * queued for it. With an expiry time of 8, one object is put and taken, a second object is put
   * 3 seconds later, and after a further 4 seconds the events map must still contain the
   * ThreadIdentifier. Once the queued data has expired, the region must be empty, the available
   * IDs must have size 0 and no current counter set may remain for the thread.
   *
   * <p>
   * Unexpected exceptions propagate to JUnit directly so the stack trace is reported by the
   * framework instead of being printed and flattened into a {@code fail(...)} message.
   */
  @Test
  public void testNoExpiryOnThreadIdentifier() throws Exception {
    HARegionQueueAttributes hqa = new HARegionQueueAttributes();
    hqa.setExpiryTime(8);
    HARegionQueue regionqueue = createHARegionQueue("testing", hqa);
    EventID id1 = new EventID(new byte[] {1}, 1, 1);
    EventID id2 = new EventID(new byte[] {1}, 1, 2);
    ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, true, "region1");
    ConflatableObject c2 = new ConflatableObject(KEY1, VALUE2, id2, true, "region1");
    ThreadIdentifier threadId =
        new ThreadIdentifier(c1.getEventId().getMembershipID(), c1.getEventId().getThreadID());

    regionqueue.put(c1);
    Object o = regionqueue.take();
    assertNotNull(o);
    // wait for some time and put second object
    Thread.sleep(3000);
    regionqueue.put(c2);
    // wait for some more time, but not long enough for c2 to expire
    Thread.sleep(4000);

    Map eventsMap = regionqueue.getEventsMapForTesting();

    // the ThreadIdentifier must stay in the events map while data is still queued
    assertNotNull(
        "ThreadIdentifier removed itself through expiry even though data was lying in the queue",
        eventsMap.get(threadId));

    // After the data has expired: the region is empty, the available IDs have size 0 and no
    // current counter set is left for the thread.
    await()
        .untilAsserted(() -> assertEquals(0, regionqueue.getRegion().entrySet(false).size()));
    assertEquals(0, regionqueue.getAvailableIds().size());
    assertNull(regionqueue.getCurrentCounterSet(id1));
  }

  /**
   * Multiple QRM arrivals for the same client thread id. The queue starts with ten objects
   * carrying sequence ids 1-10. A QRM for sequence id 5 must leave five available IDs, all with a
   * sequence id greater than 5; a following QRM for sequence id 10 must empty the available IDs.
   */
  @Test
  public void testMultipleQRMArrival() throws Exception {
    final int eventCount = 10;
    HARegionQueue queue = createHARegionQueue("testNoExpiryOnThreadIdentifier");

    // Sequence ids run from 1 to eventCount, all on thread id 1.
    EventID[] eventIds = new EventID[eventCount];
    for (int seq = 1; seq <= eventCount; seq++) {
      eventIds[seq - 1] = new EventID(new byte[] {1}, 1, seq);
    }
    int index = 0;
    for (EventID eventId : eventIds) {
      queue.put(new ConflatableObject("KEY " + index, "VALUE" + index, eventId, true, "region1"));
      index++;
    }

    // All ten entries must be available after the puts.
    assertEquals(10, queue.getAvailableIds().size());

    // QRM for thread id 1 with last dispatched sequence id 5.
    queue.removeDispatchedEvents(eventIds[4]);
    assertEquals(5, queue.getAvailableIds().size());
    assertEquals(5, queue.getCurrentCounterSet(eventIds[0]).size());

    // Everything still queued must have a sequence id above the dispatched one.
    for (Object element : queue.getCurrentCounterSet(eventIds[0])) {
      Long counter = (Long) element;
      ConflatableObject remaining = (ConflatableObject) queue.getRegion().get(counter);
      assertTrue(remaining.getEventId().getSequenceID() > 5);
    }

    // QRM for the last sequence id clears the rest.
    queue.removeDispatchedEvents(eventIds[9]);
    assertEquals(0, queue.getAvailableIds().size());
  }

  /**
   * Concurrent arrival of puts and a QRM for a ThreadIdentifier seen for the first time, with the
   * put thread started first and given maximum priority. One thread puts sequence ids 1 and 2
   * while another removes dispatched events up to sequence id 2. When both have finished, the
   * available IDs must be empty and the last dispatched sequence id must be 2.
   */
  @Test
  public void testConcurrentPutAndQRM() throws Exception {
    testFailed = false;
    message = new StringBuffer();
    final HARegionQueue queue = createHARegionQueue("testConcurrentPutAndQRM");
    final EventID firstId = new EventID(new byte[] {1}, 1, 1);
    final EventID secondId = new EventID(new byte[] {1}, 1, 2);

    Thread putThread = new Thread(() -> {
      try {
        queue.put(new ConflatableObject(KEY1, VALUE1, firstId, true, "region1"));
        queue.put(new ConflatableObject(KEY2, VALUE2, secondId, true, "region1"));
      } catch (Exception e) {
        message.append("Put in region queue failed");
        testFailed = true;
      }
    });
    putThread.setPriority(Thread.MAX_PRIORITY);

    Thread qrmThread = new Thread(() -> {
      try {
        queue.removeDispatchedEvents(secondId);
      } catch (Exception e) {
        message.append("Removal by QRM in region queue failed");
        testFailed = true;
      }
    });

    // The put thread is started first.
    putThread.start();
    qrmThread.start();

    ThreadUtils.join(putThread, 180 * 1000);
    ThreadUtils.join(qrmThread, 180 * 1000);

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    assertEquals(0, queue.getAvailableIds().size());
    assertEquals(2, queue.getLastDispatchedSequenceId(secondId));
  }

  /**
   * Concurrent arrival of a QRM and puts for a ThreadIdentifier seen for the first time, with the
   * QRM thread started first and given maximum priority. One thread removes dispatched events up
   * to sequence id 2 while another puts sequence ids 1 and 2. When both have finished, the
   * available IDs must be empty and the last dispatched sequence id must be 2.
   */
  @Test
  public void testConcurrentQRMAndPut() throws Exception {
    testFailed = false;
    // The buffer must exist before the workers start: with a null buffer, message.append(...) in
    // a worker's catch block throws a NullPointerException before testFailed is set, and the
    // failure goes unreported.
    message = new StringBuffer();
    final HARegionQueue regionqueue = createHARegionQueue("testConcurrentQRMAndPut");
    final EventID id1 = new EventID(new byte[] {1}, 1, 1);
    final EventID id2 = new EventID(new byte[] {1}, 1, 2);

    Thread t1 = new Thread() {
      @Override
      public void run() {
        try {
          regionqueue.put(new ConflatableObject(KEY1, VALUE1, id1, true, "region1"));
          regionqueue.put(new ConflatableObject(KEY2, VALUE2, id2, true, "region1"));
        } catch (Exception e) {
          message.append("Put in region queue failed");
          testFailed = true;
        }
      }
    };

    Thread t2 = new Thread() {
      @Override
      public void run() {
        try {
          regionqueue.removeDispatchedEvents(id2);
        } catch (Exception e) {
          message.append("Removal of Events by QRM in Region queue failed");
          testFailed = true;
        }
      }
    };
    t2.setPriority(Thread.MAX_PRIORITY);

    // The QRM thread is started first.
    t2.start();
    t1.start();

    ThreadUtils.join(t1, 180 * 1000);
    ThreadUtils.join(t2, 180 * 1000);

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    assertEquals(0, regionqueue.getAvailableIds().size());
    assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2));
  }

  /**
   * Two QRMs arrive for the same thread with the higher sequence number (2) ahead of the lower
   * one (1). Afterwards the events map must contain exactly one entry and the last dispatched
   * sequence id must still be 2, i.e. the late, lower QRM must not overwrite it.
   */
  @Test
  public void testEventMapPopulationForQRM() throws Exception {
    HARegionQueue regionqueue = createHARegionQueue("testEventMapPopulationForQRM");
    EventID id1 = new EventID(new byte[] {1}, 1, 1);
    EventID id2 = new EventID(new byte[] {1}, 1, 2);

    this.logWriter.info("RemoveDispatched event for sequence id : " + id2.getSequenceID());
    regionqueue.removeDispatchedEvents(id2);
    this.logWriter.info("RemoveDispatched event for sequence id :" + id1.getSequenceID());
    regionqueue.removeDispatchedEvents(id1);

    // JUnit's assertEquals takes (message, expected, actual); the arguments used to be swapped,
    // which produced misleading failure output.
    int eventsMapSize = regionqueue.getEventsMapForTesting().size();
    assertEquals("Size of eventMap should be 1 but actual size " + eventsMapSize, 1,
        eventsMapSize);

    long lastDispatched = regionqueue.getLastDispatchedSequenceId(id2);
    this.logWriter.info("sequence id : " + lastDispatched);
    assertEquals(
        "Last dispatched sequence id should be 2 but actual sequence id is " + lastDispatched,
        id2.getSequenceID(), lastDispatched);
    this.logWriter.info("testEventMapPopulationForQRM() completed successfully");
  }

  /**
   * Concurrent puts from different thread ids for the same key with conflation enabled. After
   * all threads have finished, only one event may survive: the conflation map has one entry, the
   * events map has one entry per thread, the available IDs have size 1, exactly one thread has a
   * non-empty counter set, and the counter set of the surviving event's thread has size 1.
   */
  @Test
  public void testCleanUpForConflation() throws Exception {
    this.logWriter.info("HARQAddOperationJUnitTest : testCleanUpForConflation BEGIN");
    testFailed = false;
    // Must be a real buffer: the workers append to it when a put fails.
    message = new StringBuffer();
    final int numOfThreads = 10;
    final int numOfPuts = 567;
    final HARegionQueue regionqueue = createHARegionQueue("testCleanUpForConflation");

    this.logWriter
        .info("HARQAddOperationJUnitTest : testCleanUpForConflation after regionqueue create");

    // Concurrent put operations on different thread ids but for the same key.
    Thread[] threads = new Thread[numOfThreads];
    for (int i = 0; i < numOfThreads; i++) {
      final long ids = i;
      threads[i] = new Thread(() -> {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] {(byte) ids}, ids, j);
          try {
            regionqueue.put(
                new ConflatableObject(KEY1, id.getThreadID() + "VALUE" + j, id, true, "region1"));
          } catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      });
    }

    for (Thread thread : threads) {
      thread.start();
    }
    for (Thread thread : threads) {
      ThreadUtils.join(thread, 180 * 1000);
    }

    this.logWriter.info("HARQAddOperationJUnitTest : testCleanUpForConflation after join");

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    assertEquals(
        "size of the conflation map should be 1 but actual size is "
            + regionqueue.getConflationMapForTesting().size(),
        1, regionqueue.getConflationMapForTesting().size());
    assertEquals(
        "size of the event map should be " + numOfThreads + " but actual size "
            + regionqueue.getEventsMapForTesting().size(),
        numOfThreads, regionqueue.getEventsMapForTesting().size());
    assertEquals(
        "size of availableids should 1 but actual size " + regionqueue.getAvailableIds().size(), 1,
        regionqueue.getAvailableIds().size());

    // Exactly one thread may still own a queued counter.
    int count = 0;
    for (int i = 0; i < numOfThreads; i++) {
      if ((regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, i))).size() > 0) {
        count++;
      }
    }
    assertEquals("size of the counter set is  1 but the actual size is " + count, 1, count);

    // The available IDs were asserted to hold exactly one element above, so it is safe to read it
    // unconditionally.
    Long position = (Long) regionqueue.getAvailableIds().iterator().next();
    ConflatableObject survivor = (ConflatableObject) regionqueue.getRegion().get(position);
    assertEquals(1, regionqueue.getCurrentCounterSet(survivor.getEventId()).size());
    this.logWriter.info("HARQAddOperationJUnitTest : testCleanUpForConflation END");
  }

  /**
   * Five threads do four non-conflating puts each (sequence ids 0-3). The test thread then peeks
   * all twenty objects and invokes remove. Afterwards every thread's last dispatched sequence id
   * must be 3, its current counter set must be empty, and the available IDs must have size 0.
   */
  @Test
  public void testPeekAndRemoveWithoutConflation() throws Exception {
    testFailed = false;
    // Must be a real buffer: the workers append to it when a put fails.
    message = new StringBuffer();
    final int numOfThreads = 5;
    final int numOfPuts = 4;
    final int batchSize = numOfThreads * numOfPuts;
    final HARegionQueue regionqueue = createHARegionQueue("testPeekAndRemoveWithoutConflation");
    Thread[] threads = new Thread[numOfThreads];
    for (int i = 0; i < numOfThreads; i++) {
      final long ids = i;
      threads[i] = new Thread(() -> {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] {(byte) ids}, ids, j);
          try {
            regionqueue.put(new ConflatableObject(KEY1 + id.getThreadID() + j,
                id.getThreadID() + "VALUE" + j, id, false, "region1"));
          } catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      });
    }

    for (Thread thread : threads) {
      thread.start();
    }
    for (Thread thread : threads) {
      ThreadUtils.join(thread, 180 * 1000);
    }

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    List pickObjects = regionqueue.peek(batchSize);
    assertEquals(batchSize, pickObjects.size());
    regionqueue.remove();

    for (int i = 0; i < numOfThreads; i++) {
      // The highest sequence id put by each thread is numOfPuts - 1 == 3.
      assertEquals(3,
          regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {(byte) i}, i, 1)));
      assertEquals(0,
          regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
    }

    assertEquals(0, regionqueue.getAvailableIds().size());

    this.logWriter.info("testPeekAndRemoveWithoutConflation() completed successfully");
  }

  /**
   * Five threads do four conflating puts each, every thread using its own key. The test thread
   * then peeks with a batch size of twenty, which must return one object per thread, and invokes
   * remove. Afterwards every thread's current counter set must be empty, and both the available
   * IDs and the conflation map for "region1" must have size 0.
   */
  @Test
  public void testPeekAndRemoveWithConflation() throws Exception {
    testFailed = false;
    // Must be a real buffer: the workers append to it when a put fails.
    message = new StringBuffer();
    final int numOfThreads = 5;
    final int numOfPuts = 4;
    final int batchSize = numOfThreads * numOfPuts;
    final HARegionQueue regionqueue = createHARegionQueue("testPeekAndRemoveWithConflation");
    Thread[] threads = new Thread[numOfThreads];
    for (int i = 0; i < numOfThreads; i++) {
      final long ids = i;
      threads[i] = new Thread(() -> {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] {(byte) ids}, ids, j);
          try {
            regionqueue.put(new ConflatableObject(KEY1 + ids, id.getThreadID() + "VALUE" + j, id,
                true, "region1"));
          } catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      });
    }

    for (Thread thread : threads) {
      thread.start();
    }
    for (Thread thread : threads) {
      ThreadUtils.join(thread, 180 * 1000);
    }

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    // Conflation leaves only the latest event per key, i.e. one per thread.
    List pickObject = regionqueue.peek(batchSize);
    assertEquals(numOfThreads, pickObject.size());
    regionqueue.remove();

    // The last dispatched sequence id is intentionally not asserted in this test.
    for (int i = 0; i < numOfThreads; i++) {
      assertEquals(0,
          regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
    }

    assertEquals("size of availableIds map should be 0 ", 0, regionqueue.getAvailableIds().size());
    assertEquals("size of conflation map should be 0 ", 0,
        ((Map) regionqueue.getConflationMapForTesting().get("region1")).size());

    this.logWriter.info("testPeekAndRemoveWithConflation() completed successfully");
  }

  /**
   * Five threads do four non-conflating puts each. Four threads then concurrently peek with batch
   * sizes 5, 10, 15 and 20, rendezvous, and all call remove. Afterwards every putting thread's
   * last dispatched sequence id must be 3, its current counter set must be empty, and the
   * available IDs must have size 0.
   *
   * <p>
   * Failures inside the peek threads, including assertion errors, are recorded and wake up the
   * other threads waiting at the rendezvous so the test fails promptly instead of hanging until
   * the join timeout.
   */
  @Test
  public void testPeekForDiffBatchSizeAndRemoveAll() throws Exception {
    testFailed = false;
    // Must be a real buffer: the workers append to it when an operation fails.
    message = new StringBuffer();
    barrierCount = 0;
    final int numOfThreads = 5;
    final int numOfPuts = 4;
    final HARegionQueue regionqueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveAll");
    Thread[] threads = new Thread[numOfThreads];
    for (int i = 0; i < numOfThreads; i++) {
      final long ids = i;
      threads[i] = new Thread(() -> {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] {(byte) ids}, ids, j);
          try {
            regionqueue.put(new ConflatableObject(KEY1 + id.getThreadID() + j,
                id.getThreadID() + "VALUE" + j, id, false, "region1"));
          } catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      });
    }

    for (Thread thread : threads) {
      thread.start();
    }
    for (Thread thread : threads) {
      ThreadUtils.join(thread, 180 * 1000);
    }

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    testFailed = false;
    message = new StringBuffer();

    // One peek thread per batch size: 5, 10, 15 and 20.
    final int numOfPeekThreads = numOfPuts;
    Thread[] peekRemoveThreads = new Thread[numOfPeekThreads];
    for (int i = 1; i <= numOfPeekThreads; i++) {
      final int peekBatchSize = i * 5;
      peekRemoveThreads[i - 1] = new Thread(() -> {
        try {
          List<?> peekedObjects = regionqueue.peek(peekBatchSize);
          assertEquals(peekBatchSize, peekedObjects.size());
          synchronized (HARQAddOperationJUnitTest.this) {
            ++barrierCount;
            if (barrierCount == numOfPeekThreads) {
              HARQAddOperationJUnitTest.this.notifyAll();
            } else {
              // Loop to tolerate spurious wake-ups; also leave when another thread has failed.
              while (barrierCount < numOfPeekThreads && !testFailed) {
                HARQAddOperationJUnitTest.this.wait();
              }
            }
          }
          regionqueue.remove();
        } catch (Exception | AssertionError ex) {
          // AssertionError is not an Exception, so it has to be caught explicitly to be reported.
          logger.error("Exception while performing peak operation", ex);
          synchronized (HARQAddOperationJUnitTest.this) {
            testFailed = true;
            message.append("Exception while performing peak operation ").append(ex);
            // Release threads that would otherwise wait for a rendezvous that cannot complete.
            HARQAddOperationJUnitTest.this.notifyAll();
          }
        }
      });
    }

    for (Thread thread : peekRemoveThreads) {
      thread.start();
    }
    for (Thread thread : peekRemoveThreads) {
      ThreadUtils.join(thread, 180 * 1000);
    }

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    for (int i = 0; i < numOfThreads; i++) {
      // The highest sequence id put by each thread is numOfPuts - 1 == 3.
      assertEquals(3,
          regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {(byte) i}, i, 1)));
      assertEquals(0,
          regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
    }

    assertEquals(0, regionqueue.getAvailableIds().size());

    this.logWriter.info("testPeekForDiffBatchSizeAndRemoveAll() completed successfully");
  }

  /**
   * Five threads do four non-conflating puts each. Three threads then concurrently peek with
   * batch sizes 5, 10 and 15, rendezvous, and all call remove. Afterwards five entries must
   * remain in the available IDs.
   *
   * <p>
   * Failures inside the peek threads, including assertion errors, are recorded and wake up the
   * other threads waiting at the rendezvous so the test fails promptly instead of hanging until
   * the join timeout.
   */
  @Test
  public void testPeekForDiffBatchSizeAndRemoveSome() throws Exception {
    testFailed = false;
    barrierCount = 0;
    // Must be a real buffer: the workers append to it when an operation fails.
    message = new StringBuffer();
    final int numOfThreads = 5;
    final int numOfPuts = 4;
    final HARegionQueue regionqueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveSome");
    Thread[] threads = new Thread[numOfThreads];
    for (int i = 0; i < numOfThreads; i++) {
      final long ids = i;
      threads[i] = new Thread(() -> {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] {(byte) ids}, ids, j);
          try {
            regionqueue.put(new ConflatableObject(KEY1 + id.getThreadID() + j,
                id.getThreadID() + "VALUE" + j, id, false, "region1"));
          } catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      });
    }

    for (Thread thread : threads) {
      thread.start();
    }
    for (Thread thread : threads) {
      ThreadUtils.join(thread, 180 * 1000);
    }

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    testFailed = false;
    message = new StringBuffer();

    // One peek thread per batch size: 5, 10 and 15.
    final int numOfPeekThreads = numOfPuts - 1;
    Thread[] peekRemoveThreads = new Thread[numOfPeekThreads];
    for (int i = 1; i <= numOfPeekThreads; i++) {
      final int peekBatchSize = i * 5;
      peekRemoveThreads[i - 1] = new Thread(() -> {
        try {
          List<?> peekedObjects = regionqueue.peek(peekBatchSize);
          assertEquals(peekBatchSize, peekedObjects.size());
          synchronized (HARQAddOperationJUnitTest.this) {
            ++barrierCount;
            if (barrierCount == numOfPeekThreads) {
              HARQAddOperationJUnitTest.this.notifyAll();
            } else {
              // Loop to tolerate spurious wake-ups; also leave when another thread has failed.
              while (barrierCount < numOfPeekThreads && !testFailed) {
                HARQAddOperationJUnitTest.this.wait();
              }
            }
          }
          regionqueue.remove();
        } catch (Exception | AssertionError ex) {
          // AssertionError is not an Exception, so it has to be caught explicitly to be reported.
          logger.error("Exception while performing peak operation", ex);
          synchronized (HARQAddOperationJUnitTest.this) {
            testFailed = true;
            message.append("Exception while performing peak operation ").append(ex);
            // Release threads that would otherwise wait for a rendezvous that cannot complete.
            HARQAddOperationJUnitTest.this.notifyAll();
          }
        }
      });
    }

    for (Thread thread : peekRemoveThreads) {
      thread.start();
    }
    for (Thread thread : peekRemoveThreads) {
      ThreadUtils.join(thread, 180 * 1000);
    }

    if (testFailed) {
      fail("Test failed due to " + message);
    }

    assertEquals(5, regionqueue.getAvailableIds().size());

    this.logWriter.info("testPeekForDiffBatchSizeAndRemoveSome() completed successfully");
  }

  /**
   * Add with QRM & expiry : Add 10 conflatable objects (0-9). Send QRM LastDispatched as 4.
   * Validate the sequence ID field of LastDispatchedWrapper object is updated to 4. Perform Take
   * operations to remove next 5 to 9. The sequenceId field should be 9. Allow ThreadIdentifier to
   * expire. The expiration should fail as the original sequenceId ( -1) does not match with 9.
   * Validate reset with value 9 . The next expiry should remove the LastDispatchedWrapper
   *
   * <p>
   * The queue is created with a cache listener that counts invalidate events whose key is a
   * {@code ThreadIdentifier} in {@code expiryCount} and notifies this test instance, which waits
   * on its own monitor for the first and second such event.
   */
  @Test
  public void testAddWithQRMAndExpiry() throws Exception {
    try {
      // Start from a clean counter so that a value left over from an earlier run in the same JVM
      // cannot make the waits below fall through.
      synchronized (this) {
        expiryCount = 0;
      }

      HARegionQueueAttributes attrs = new HARegionQueueAttributes();
      attrs.setExpiryTime(10);
      final HARegionQueue regionqueue =
          new TestOnlyHARegionQueue("testing", cache, attrs, disabledClock()) {
            @Override
            CacheListener createCacheListenerForHARegion() {
              return new CacheListenerAdapter() {
                @Override
                public void afterInvalidate(EntryEvent event) {
                  try {
                    expireTheEventOrThreadIdentifier(event);
                  } catch (CacheException ce) {
                    logger.error(
                        "HAREgionQueue::createCacheListener::Exception in the expiry thread", ce);
                  }
                  if (event.getKey() instanceof ThreadIdentifier) {
                    synchronized (HARQAddOperationJUnitTest.this) {
                      expiryCount++;
                      HARQAddOperationJUnitTest.this.notifyAll();
                    }
                  }
                }
              };
            }
          };

      // put 10 conflatable objects (sequence ids 0-9, all with the same thread id)
      for (int i = 0; i < 10; i++) {
        regionqueue.put(new ConflatableObject("key" + i, "value",
            new EventID(new byte[] {1}, 1, i), true, "testing"));
      }
      ThreadIdentifier tID = new ThreadIdentifier(new byte[] {1}, 1);
      // verify that the sequence-id for Thread-identifier is -1 (default value).
      assertEquals(Long.valueOf(-1L), regionqueue.getRegion().get(tID));

      // remove the first 5 - (0-4 sequence IDs)
      regionqueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4));

      // verify that the last dispatched event was of sequence id 4
      assertEquals(4, regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {1}, 1, 1)));
      // verify 1-5 not in region
      for (long i = 1; i < 6; i++) {
        assertFalse(regionqueue.getRegion().containsKey(Long.valueOf(i)));
      }
      // verify 6-10 still in region queue
      for (long i = 6; i < 11; i++) {
        assertTrue(regionqueue.getRegion().containsKey(Long.valueOf(i)));
      }

      // Perform 5 take operations to remove next 5-9 sequence ids
      for (int i = 0; i < 5; i++) {
        regionqueue.take();
      }

      // verify that the last dispatched event was of sequence id 9
      assertEquals(9, regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {1}, 1, 1)));
      // verify that sequence ids 1-10 all are removed from the RQ
      for (long i = 1; i < 11; i++) {
        assertFalse(regionqueue.getRegion().containsKey(Long.valueOf(i)));
      }

      synchronized (this) {
        // wait until expiry thread has run once; loop so a spurious wakeup cannot end the wait
        while (expiryCount == 0) {
          this.wait();
        }
        if (expiryCount == 1) {
          // verify that the Thread-identifier has not yet expired
          assertEquals(1, regionqueue.getEventsMapForTesting().size());

          // verify that the sequence-id for Thread-identifier is updated to 9
          assertEquals(Long.valueOf(9L), regionqueue.getRegion().get(tID));

          // wait until expiry thread has run again
          while (expiryCount == 1) {
            this.wait();
          }
        }
      }

      // verify that the Thread-identifier has expired
      assertEquals(0, regionqueue.getEventsMapForTesting().size());
      // verify that the sequence-id for Thread-identifier is null
      assertNull(regionqueue.getRegion().get(tID));
    } catch (Exception e) {
      throw new AssertionError("Exception occurred in test due to", e);
    }
  }

  /**
   * Checks the contents of the dispatched-messages map after a queue has been drained with
   * take(). The test is ignored: the behaviour of take() was changed for the reliable messaging
   * feature, and the region queue take() operation no longer adds to the dispatched-messages map.
   *
   * <p>
   * Steps performed:
   * <ol>
   * <li>Create an HARegionQueue</li>
   * <li>Add some events to the queue, all with the same ThreadIdentifier</li>
   * <li>Do take() operations to drain the queue</li>
   * <li>Verify that the dispatched-messages map is not null</li>
   * <li>Verify that its size is 1, as one region queue is created in this test</li>
   * <li>Verify that it contains an entry for the queue-region name</li>
   * <li>Verify that the size of the wrapper-map is 1, as all events had the same ThreadId</li>
   * <li>Verify that the sequenceId stored against the ThreadId in the wrapper-map equals that of
   * the last event taken</li>
   * </ol>
   */
  @Ignore
  @Test
  public void testDispatchedMsgsMapUpdateOnTakes() throws Exception {
    logWriter.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes BEGIN");

    final String queueRegionName = "testDispatchedEventsMapUpdateOnTakes";
    final HARegionQueue queue = createHARegionQueue(queueRegionName);
    final int eventCount = 10;

    // Fill the queue; remember the id of the most recently added event for the final check.
    EventID lastEventId = null;
    for (int seq = 0; seq < eventCount; seq++) {
      lastEventId = new EventID(new byte[] {1}, 1, seq);
      queue.put(new ConflatableObject("key" + seq, "value" + seq, lastEventId, false, "testing"));
    }

    // Drain the queue with one take() per event that was put.
    int remaining = eventCount;
    while (remaining > 0) {
      queue.take();
      remaining--;
    }

    final Map allDispatched = HARegionQueue.getDispatchedMessagesMapForTesting();
    // the map itself must exist
    assertNotNull("dispatchedMessagesMap found null", allDispatched);

    // exactly one region queue was created by this test, hence one entry
    assertEquals("size of dispatched msgs should be 1", 1, allDispatched.size());

    // the entry must be keyed by the queue-region name
    final MapWrapper regionEntry = (MapWrapper) allDispatched.get(queueRegionName);
    assertNotNull("dispatchedMsgMap should contain an entry with queueregion name as key",
        regionEntry);

    final Map perThreadSequenceIds = regionEntry.map;
    assertEquals("size of wrapper-map should be 1 as all events had same ThreadId", 1,
        perThreadSequenceIds.size());

    final Long recordedSequenceId =
        (Long) perThreadSequenceIds.get(new ThreadIdentifier(new byte[] {1}, 1));

    assertEquals(
        "sequenceId against the ThreadId in the wrapper-map should be that of the last event taken.",
        lastEventId.getSequenceID(), recordedSequenceId.longValue());

    logWriter.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes END");
  }
}
