blob: 632383d5f5e27476c368e92c8449318e98411fe6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static org.apache.geode.cache.DataPolicy.NORMAL;
import static org.apache.geode.cache.InterestPolicy.ALL;
import static org.apache.geode.cache.Operation.CREATE;
import static org.apache.geode.cache.Operation.DESTROY;
import static org.apache.geode.cache.Operation.INVALIDATE;
import static org.apache.geode.cache.Operation.PUTALL_CREATE;
import static org.apache.geode.cache.Operation.REGION_CLEAR;
import static org.apache.geode.cache.Operation.REGION_CREATE;
import static org.apache.geode.cache.Operation.REGION_INVALIDATE;
import static org.apache.geode.cache.Operation.UPDATE;
import static org.apache.geode.cache.Scope.DISTRIBUTED_ACK;
import static org.apache.geode.distributed.ConfigurationProperties.ROLES;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
/**
* Test to make sure message queuing works.
*
* @since GemFire 5.0
*/
public class QueueMsgDUnitTest extends ReliabilityTestCase {
/**
* Make sure that cache operations are queued when a required role is missing
*/
@Ignore("TODO: test is disabled")
@Test
public void testQueueWhenRoleMissing() throws Exception {
AttributesFactory factory = new AttributesFactory();
factory.setScope(DISTRIBUTED_ACK);
DistributedRegion r = (DistributedRegion) createRootRegion(factory.create());
final CachePerfStats stats = r.getCachePerfStats();
long queuedOps = stats.getReliableQueuedOps();
r.create("createKey", "createValue", "createCBArg");
r.invalidate("createKey", "invalidateCBArg");
r.put("createKey", "putValue", "putCBArg");
r.destroy("createKey", "destroyCBArg");
assertEquals(queuedOps + 4, stats.getReliableQueuedOps());
queuedOps = stats.getReliableQueuedOps();
{
Map m = new TreeMap();
m.put("aKey", "aValue");
m.put("bKey", "bValue");
r.putAll(m);
}
assertEquals(queuedOps + 2, stats.getReliableQueuedOps());
queuedOps = stats.getReliableQueuedOps();
r.invalidateRegion("invalidateRegionCBArg");
assertEquals(queuedOps + 1, stats.getReliableQueuedOps());
queuedOps = stats.getReliableQueuedOps();
r.clear();
assertEquals(queuedOps + 1, stats.getReliableQueuedOps());
queuedOps = stats.getReliableQueuedOps();
// @todo darrel: try some other ops
VM vm = getHost(0).getVM(0);
// now create a system that fills this role since it does not create the
// region our queue should not be flushed
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
Properties config = new Properties();
config.setProperty(ROLES, "missing");
getSystem(config);
}
});
// we still should have everything queued since the region is not created
assertEquals(queuedOps, stats.getReliableQueuedOps());
// now create the region
vm.invoke(new CacheSerializableRunnable("create root") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = new AttributesFactory();
factory.setScope(DISTRIBUTED_ACK);
factory.setDataPolicy(NORMAL);
factory.setSubscriptionAttributes(new SubscriptionAttributes(ALL));
TestCacheListener cl = new TestCacheListener() {
@Override
public void afterCreate2(EntryEvent event) {}
@Override
public void afterUpdate2(EntryEvent event) {}
@Override
public void afterInvalidate2(EntryEvent event) {}
@Override
public void afterDestroy2(EntryEvent event) {}
};
cl.enableEventHistory();
factory.addCacheListener(cl);
createRootRegion(factory.create());
}
});
// after some amount of time we should see the queuedOps flushed
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return stats.getReliableQueuedOps() == 0;
}
@Override
public String description() {
return "waiting for reliableQueuedOps to become 0";
}
};
GeodeAwaitility.await().untilAsserted(ev);
// now check that the queued op was delivered
vm.invoke(new CacheSerializableRunnable("check") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion();
assertEquals(null, r.getEntry("createKey"));
// assertIndexDetailsEquals("putValue", r.getEntry("createKey").getValue());
{
int evIdx = 0;
TestCacheListener cl = (TestCacheListener) r.getAttributes().getCacheListener();
List events = cl.getEventHistory();
{
CacheEvent ce = (CacheEvent) events.get(evIdx++);
assertEquals(REGION_CREATE, ce.getOperation());
}
{
EntryEvent ee = (EntryEvent) events.get(evIdx++);
assertEquals(CREATE, ee.getOperation());
assertEquals("createKey", ee.getKey());
assertEquals("createValue", ee.getNewValue());
assertEquals(null, ee.getOldValue());
assertEquals("createCBArg", ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
{
EntryEvent ee = (EntryEvent) events.get(evIdx++);
assertEquals(INVALIDATE, ee.getOperation());
assertEquals("createKey", ee.getKey());
assertEquals(null, ee.getNewValue());
assertEquals("createValue", ee.getOldValue());
assertEquals("invalidateCBArg", ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
{
EntryEvent ee = (EntryEvent) events.get(evIdx++);
assertEquals(UPDATE, ee.getOperation());
assertEquals("createKey", ee.getKey());
assertEquals("putValue", ee.getNewValue());
assertEquals(null, ee.getOldValue());
assertEquals("putCBArg", ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
{
EntryEvent ee = (EntryEvent) events.get(evIdx++);
assertEquals(DESTROY, ee.getOperation());
assertEquals("createKey", ee.getKey());
assertEquals(null, ee.getNewValue());
assertEquals("putValue", ee.getOldValue());
assertEquals("destroyCBArg", ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
{
EntryEvent ee = (EntryEvent) events.get(evIdx++);
assertEquals(PUTALL_CREATE, ee.getOperation());
assertEquals("aKey", ee.getKey());
assertEquals("aValue", ee.getNewValue());
assertEquals(null, ee.getOldValue());
assertEquals(null, ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
{
EntryEvent ee = (EntryEvent) events.get(evIdx++);
assertEquals(PUTALL_CREATE, ee.getOperation());
assertEquals("bKey", ee.getKey());
assertEquals("bValue", ee.getNewValue());
assertEquals(null, ee.getOldValue());
assertEquals(null, ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
{
RegionEvent re = (RegionEvent) events.get(evIdx++);
assertEquals(REGION_INVALIDATE, re.getOperation());
assertEquals("invalidateRegionCBArg", re.getCallbackArgument());
assertEquals(true, re.isOriginRemote());
}
{
RegionEvent re = (RegionEvent) events.get(evIdx++);
assertEquals(REGION_CLEAR, re.getOperation());
assertEquals(null, re.getCallbackArgument());
assertEquals(true, re.isOriginRemote());
}
assertEquals(evIdx, events.size());
}
}
});
}
/**
* Make sure a queued region does not allow non-queued subscribers
*/
@Ignore("TODO: test is disabled")
@Test
public void testIllegalConfigQueueExists() throws Exception {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
createRootRegion(factory.create());
VM vm = Host.getHost(0).getVM(0);
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
Properties config = new Properties();
config.setProperty(ROLES, "pubFirst");
getSystem(config);
}
});
// now create the region
vm.invoke(new CacheSerializableRunnable("create root") {
@Override
public void run2() throws CacheException {
final String expectedExceptions = "does not allow queued messages";
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
// setting the following makes things legal
factory.setDataPolicy(DataPolicy.NORMAL);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
getCache().getLogger()
.info("<ExpectedException action=add>" + expectedExceptions + "</ExpectedException>");
try {
createRootRegion(factory.create());
fail("expected IllegalStateException");
} catch (IllegalStateException ignored) {
} finally {
getCache().getLogger().info(
"<ExpectedException action=remove>" + expectedExceptions + "</ExpectedException>");
}
}
});
}
/**
* Make sure a subscriber that does not allow queued messages causes a queued publisher to fail
* creation
*/
@Ignore("TODO: test is disabled")
@Test
public void testIllegalConfigSubscriberExists() throws Exception {
final String expectedExceptions = "does not allow queued messages";
VM vm = Host.getHost(0).getVM(0);
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
Properties config = new Properties();
config.setProperty(ROLES, "subFirst");
getSystem(config);
}
});
// now create the region
vm.invoke(new CacheSerializableRunnable("create root") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
// setting the following makes things legal
factory.setDataPolicy(DataPolicy.NORMAL);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
createRootRegion(factory.create());
}
});
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
getCache().getLogger()
.info("<ExpectedException action=add>" + expectedExceptions + "</ExpectedException>");
try {
createRootRegion(factory.create());
fail("expected IllegalStateException");
} catch (IllegalStateException ignored) {
} finally {
getCache().getLogger()
.info("<ExpectedException action=remove>" + expectedExceptions + "</ExpectedException>");
}
}
}