blob: c9e8b92190456d9c0389531670b1a5389fc85d23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache30.CacheXml70DUnitTestHelper;
import org.apache.geode.cache30.CacheXmlTestCase;
import org.apache.geode.cache30.MyGatewayEventFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter2;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.cache.xmlcache.CacheXml;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.cache.xmlcache.RegionAttributesCreation;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.test.junit.categories.WanTest;
@Category({WanTest.class})
public class CacheXml70GatewayDUnitTest extends CacheXmlTestCase {
@Override
protected String getGemFireVersion() {
return CacheXml.VERSION_7_0;
}
/**
* Added to test the scenario of defect #50600.
*/
@Test
public void testAsyncEventQueueWithGatewayEventFilter() throws Exception {
getSystem();
CacheCreation cache = new CacheCreation();
String id = "WBCLChannel";
AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
factory.setBatchSize(100);
factory.setBatchTimeInterval(500);
factory.setBatchConflationEnabled(true);
factory.setMaximumQueueMemory(200);
factory.setDiskSynchronous(true);
factory.setParallel(false);
factory.setDispatcherThreads(33);
factory.addGatewayEventFilter(new MyGatewayEventFilter());
AsyncEventListener eventListener = new CacheXml70DUnitTestHelper.MyAsyncEventListener();
AsyncEventQueue asyncEventQueue = factory.create(id, eventListener);
RegionAttributesCreation attrs = new RegionAttributesCreation();
attrs.addAsyncEventQueueId(asyncEventQueue.getId());
cache.createRegion("UserRegion", attrs);
testXml(cache);
Cache c = getCache();
assertNotNull(c);
Set<AsyncEventQueue> asyncEventQueuesOnCache = c.getAsyncEventQueues();
assertTrue("Size of asyncEventQueues should be greater than 0",
asyncEventQueuesOnCache.size() > 0);
for (AsyncEventQueue asyncEventQueueOnCache : asyncEventQueuesOnCache) {
CacheXml70DUnitTestHelper.validateAsyncEventQueue(asyncEventQueue, asyncEventQueueOnCache);
}
}
@Test
public void testGatewayReceiver() throws Exception {
getSystem();
CacheCreation cache = new CacheCreation();
GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory();
gatewayReceiverFactory.setBindAddress("");
gatewayReceiverFactory.setStartPort(20000);
gatewayReceiverFactory.setEndPort(29999);
gatewayReceiverFactory.setMaximumTimeBetweenPings(2000);
gatewayReceiverFactory.setSocketBufferSize(1500);
GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter1);
GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter2);
GatewayReceiver receiver1 = gatewayReceiverFactory.create();
receiver1.start();
testXml(cache);
Cache c = getCache();
assertNotNull(c);
Set<GatewayReceiver> receivers = c.getGatewayReceivers();
for (GatewayReceiver receiver : receivers) {
validateGatewayReceiver(receiver1, receiver);
}
}
@Test
public void testParallelGatewaySender() throws Exception {
getSystem();
CacheCreation cache = new CacheCreation();
GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
gatewaySenderFactory.setParallel(true);
gatewaySenderFactory.setDispatcherThreads(13);
gatewaySenderFactory.setManualStart(true);
gatewaySenderFactory.setSocketBufferSize(1234);
gatewaySenderFactory.setSocketReadTimeout(1050);
gatewaySenderFactory.setBatchConflationEnabled(false);
gatewaySenderFactory.setBatchSize(88);
gatewaySenderFactory.setBatchTimeInterval(9);
gatewaySenderFactory.setPersistenceEnabled(true);
gatewaySenderFactory.setDiskStoreName("LNSender");
gatewaySenderFactory.setDiskSynchronous(true);
gatewaySenderFactory.setMaximumQueueMemory(211);
gatewaySenderFactory.setAlertThreshold(35);
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
gatewaySenderFactory.addGatewayEventFilter(myEventFilter1);
GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter1);
GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter2);
GatewaySender parallelGatewaySender = gatewaySenderFactory.create("LN", 2);
testXml(cache);
Cache c = getCache();
assertNotNull(c);
Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
for (GatewaySender sender : sendersOnCache) {
assertEquals(true, sender.isParallel());
validateGatewaySender(parallelGatewaySender, sender);
}
}
@Test
public void testSerialGatewaySender() throws Exception {
getSystem();
CacheCreation cache = new CacheCreation();
GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
gatewaySenderFactory.setParallel(false);
gatewaySenderFactory.setManualStart(true);
gatewaySenderFactory.setSocketBufferSize(124);
gatewaySenderFactory.setSocketReadTimeout(1000);
gatewaySenderFactory.setBatchConflationEnabled(false);
gatewaySenderFactory.setBatchSize(100);
gatewaySenderFactory.setBatchTimeInterval(10);
gatewaySenderFactory.setPersistenceEnabled(true);
gatewaySenderFactory.setDiskStoreName("LNSender");
gatewaySenderFactory.setDiskSynchronous(true);
gatewaySenderFactory.setMaximumQueueMemory(200);
gatewaySenderFactory.setAlertThreshold(30);
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
gatewaySenderFactory.addGatewayEventFilter(myEventFilter1);
GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter1);
GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter2);
GatewaySender serialGatewaySender = gatewaySenderFactory.create("LN", 2);
RegionAttributesCreation attrs = new RegionAttributesCreation();
attrs.addGatewaySenderId(serialGatewaySender.getId());
cache.createRegion("UserRegion", attrs);
testXml(cache);
Cache c = getCache();
assertNotNull(c);
Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
for (GatewaySender sender : sendersOnCache) {
assertEquals(false, sender.isParallel());
validateGatewaySender(serialGatewaySender, sender);
}
}
public static class MyGatewayEventFilter implements GatewayEventFilter, Declarable {
@Override
public void afterAcknowledgement(GatewayQueueEvent event) {}
@Override
public boolean beforeEnqueue(GatewayQueueEvent event) {
return true;
}
@Override
public boolean beforeTransmit(GatewayQueueEvent event) {
return true;
}
@Override
public void close() {}
@Override
public void init(Properties properties) {}
}
static void validateGatewayReceiver(GatewayReceiver receiver1, GatewayReceiver gatewayReceiver) {
assertEquals(receiver1.getHostnameForSenders(), gatewayReceiver.getHostnameForSenders());
assertEquals(receiver1.getStartPort(), gatewayReceiver.getStartPort());
assertEquals(receiver1.getEndPort(), gatewayReceiver.getEndPort());
assertEquals(receiver1.getMaximumTimeBetweenPings(),
gatewayReceiver.getMaximumTimeBetweenPings());
assertEquals(receiver1.getSocketBufferSize(), gatewayReceiver.getSocketBufferSize());
assertEquals(receiver1.getGatewayTransportFilters().size(),
gatewayReceiver.getGatewayTransportFilters().size());
}
static void validateGatewaySender(GatewaySender sender1, GatewaySender gatewaySender) {
assertEquals(sender1.getId(), gatewaySender.getId());
assertEquals(sender1.getRemoteDSId(), gatewaySender.getRemoteDSId());
assertEquals(sender1.isParallel(), gatewaySender.isParallel());
assertEquals(sender1.isBatchConflationEnabled(), gatewaySender.isBatchConflationEnabled());
assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
assertEquals(sender1.isDiskSynchronous(), gatewaySender.isDiskSynchronous());
assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
assertEquals(sender1.getGatewayEventFilters().size(),
gatewaySender.getGatewayEventFilters().size());
assertEquals(sender1.getGatewayTransportFilters().size(),
gatewaySender.getGatewayTransportFilters().size());
boolean isParallel = sender1.isParallel();
if (isParallel) {
assertTrue("sender should be instanceof Creation",
sender1 instanceof ParallelGatewaySenderCreation);
} else {
assertTrue("sender should be instanceof Creation",
sender1 instanceof SerialGatewaySenderCreation);
}
}
}