blob: 6a769afbfb36d87a97a9cdd9e6d1a62b4c413207 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheException;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Test runs all tests of HARQAddOperationJUnitTest using BlockingHARegionQueue instead of
* HARegionQueue
*
*
*/
@Category({ClientSubscriptionTest.class})
public class BlockingHARQAddOperationJUnitTest extends HARQAddOperationJUnitTest {
/**
* Creates Blocking HA region-queue object
*
* @return Blocking HA region-queue object
*/
@Override
protected HARegionQueue createHARegionQueue(String name)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
HARegionQueue regionqueue =
HARegionQueue.getHARegionQueueInstance(name, cache, HARegionQueue.BLOCKING_HA_QUEUE, false);
return regionqueue;
}
/**
* Creates Blocking HA region-queue object
*
* @return Blocking HA region-queue object
*/
@Override
protected HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
HARegionQueue.BLOCKING_HA_QUEUE, false);
return regionqueue;
}
/**
* Tests the take() functionality of <code>BlockingHARegionQueue<code> with conflation disabled.
*
*/
@Test
public void testBlockingTakeConflationDisabled() throws Exception {
this.logWriter.info("HARQAddOperationJUnitTest : testBlockingTakeConflationDisabled BEGIN");
doBlockingTake(false);
this.logWriter.info("HARQAddOperationJUnitTest : testBlockingTakeConflationDisabled END");
}
/**
* Tests the take() functionality of <code>BlockingHARegionQueue<code> with conflation enabled.
*
*
*/
@Test
public void testBlockingTakeConflationEnabled() throws Exception {
this.logWriter.info("HARQAddOperationJUnitTest : testBlockingTakeConflationEnabled BEGIN");
doBlockingTake(true);
this.logWriter.info("HARQAddOperationJUnitTest : testBlockingTakeConflationEnabled END");
}
/**
* This method performs the following steps :<br>
* 1)Create a blocking queue and start a thread which does take() on it. 2)Verify after
* significant time that the thread is still alive as it should be blocked on take() since there
* are no events in the queue.<br>
* 3)Do a put into the queue and verify that the take thread returns with the same object.
*
* @param conflationEnabled - whether conflation is enabled or not
*
*/
public void doBlockingTake(boolean conflationEnabled) throws Exception {
testFailed = false;
message = null;
final HARegionQueue rq = createHARegionQueue("testBlockingTake");
final List takenObjects = new ArrayList();
Thread takeThread = new Thread() {
@Override
public void run() {
try {
takenObjects.add(rq.take());
} catch (Exception e) {
testFailed = true;
message.append("Exception while performing take operation " + e.getStackTrace());
}
}
};
takeThread.start();
Wait.pause(20 * 1000);
if (!takeThread.isAlive()) {
fail("take() thread died ");
}
EventID id1 = new EventID(new byte[] {1}, 1, 1);
ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, conflationEnabled, "region1");
rq.put(c1);
ThreadUtils.join(takeThread, 20 * 1000);
assertEquals(1, takenObjects.size());
Conflatable obj = (Conflatable) takenObjects.get(0);
assertNotNull(obj);
assertEquals(id1, obj.getEventId());
if (testFailed)
fail("Test failed due to " + message);
}
/**
* This test performs the following steps :<br>
* 1)Create a blocking queue.<br>
* 2) Start two threads which does take() on it and add the return object to a list.<br>
* 3)Put two object into the queue. <br>
* 4)Verify both both take() threads return with an object by ensuring that the size of the list
* containing return objects is two.<br>
*
*
*/
@Test
public void testConcurrentBlockingTake() throws Exception {
this.logWriter.info("HARQAddOperationJUnitTest : testConcurrentBlockingTake BEGIN");
testFailed = false;
message = null;
final HARegionQueue rq = createHARegionQueue("testBlockingTake");
final List takenObjects = new Vector();
final int totalTakeThreads = 2;
Thread[] takeThreads = new Thread[totalTakeThreads];
for (int i = 0; i < totalTakeThreads; i++) {
takeThreads[i] = new Thread() {
@Override
public void run() {
try {
takenObjects.add(rq.take());
} catch (Exception e) {
testFailed = true;
message.append("Exception while performing take operation " + e.getStackTrace());
}
}
};
takeThreads[i].start();
}
Conflatable c = null;
EventID id = null;
for (int i = 0; i < totalTakeThreads; i++) {
id = new EventID(new byte[] {1}, 1, i);
c = new ConflatableObject("k" + i, "v" + i, id, true, "region1");
rq.put(c);
}
for (int i = 0; i < totalTakeThreads; i++) {
ThreadUtils.join(takeThreads[i], 20 * 1000);
}
assertEquals(totalTakeThreads, takenObjects.size());
for (int i = 0; i < totalTakeThreads; i++) {
c = (Conflatable) takenObjects.get(i);
assertNotNull(c);
}
if (testFailed)
fail("Test failed due to " + message);
this.logWriter.info("HARQAddOperationJUnitTest : testConcurrentBlockingTake END");
}
}