blob: c0d20513fbb31c3babde326388cd9d85170d23e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderImpl;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* @since GemFire 7.0
*/
public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
private static final Logger logger = LogService.getLogger();
private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED =
new AtomicBoolean(false);
/**
* Used internally to pass the attributes from this factory to the real GatewaySender it is
* creating.
*/
private final GatewaySenderAttributes attrs = new GatewaySenderAttributes();
private final InternalCache cache;
private final StatisticsClock statisticsClock;
public GatewaySenderFactoryImpl(InternalCache cache, StatisticsClock statisticsClock) {
this.cache = cache;
this.statisticsClock = statisticsClock;
}
@Override
public GatewaySenderFactory setParallel(boolean isParallel) {
this.attrs.isParallel = isParallel;
return this;
}
@Override
public GatewaySenderFactory setGroupTransactionEvents(boolean groupTransactionEvents) {
this.attrs.groupTransactionEvents = groupTransactionEvents;
return this;
}
@Override
public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) {
this.attrs.isForInternalUse = isForInternalUse;
return this;
}
@Override
public GatewaySenderFactory addGatewayEventFilter(GatewayEventFilter filter) {
this.attrs.addGatewayEventFilter(filter);
return this;
}
@Override
public GatewaySenderFactory addGatewayTransportFilter(GatewayTransportFilter filter) {
this.attrs.addGatewayTransportFilter(filter);
return this;
}
@Override
public GatewaySenderFactory addAsyncEventListener(AsyncEventListener listener) {
this.attrs.addAsyncEventListener(listener);
return this;
}
@Override
public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) {
this.attrs.socketBufferSize = socketBufferSize;
return this;
}
@Override
public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) {
this.attrs.socketReadTimeout = socketReadTimeout;
return this;
}
@Override
public GatewaySenderFactory setDiskStoreName(String diskStoreName) {
this.attrs.diskStoreName = diskStoreName;
return this;
}
@Override
public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) {
this.attrs.maximumQueueMemory = maximumQueueMemory;
return this;
}
@Override
public GatewaySenderFactory setBatchSize(int batchSize) {
this.attrs.batchSize = batchSize;
return this;
}
@Override
public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) {
this.attrs.batchTimeInterval = batchTimeInterval;
return this;
}
@Override
public GatewaySenderFactory setBatchConflationEnabled(boolean enableBatchConflation) {
this.attrs.isBatchConflationEnabled = enableBatchConflation;
return this;
}
@Override
public GatewaySenderFactory setPersistenceEnabled(boolean enablePersistence) {
this.attrs.isPersistenceEnabled = enablePersistence;
return this;
}
@Override
public GatewaySenderFactory setAlertThreshold(int threshold) {
this.attrs.alertThreshold = threshold;
return this;
}
@Override
public GatewaySenderFactory setManualStart(boolean start) {
this.attrs.manualStart = start;
return this;
}
@Override
public GatewaySenderFactory setLocatorDiscoveryCallback(LocatorDiscoveryCallback locCallback) {
this.attrs.locatorDiscoveryCallback = locCallback;
return this;
}
@Override
public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) {
this.attrs.isDiskSynchronous = isSynchronous;
return this;
}
@Override
public GatewaySenderFactory setDispatcherThreads(int numThreads) {
if ((numThreads > 1) && this.attrs.policy == null) {
this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
}
this.attrs.dispatcherThreads = numThreads;
return this;
}
@Override
public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) {
this.attrs.parallelism = parallel;
this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
return this;
}
@Override
public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) {
this.attrs.policy = policy;
return this;
}
@Override
public GatewaySenderFactory setBucketSorted(boolean isBucketSorted) {
this.attrs.isBucketSorted = isBucketSorted;
return this;
}
@Override
public GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
boolean enforceThreadsConnectSameReceiver) {
this.attrs.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver;
return this;
}
@Override
public GatewaySender create(String id, int remoteDSId) {
int myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager()
.getDistributedSystemId();
if (remoteDSId == myDSId) {
throw new GatewaySenderException(
String.format(
"GatewaySender %s cannot be created with remote DS Id equal to this DS Id. ",
id));
}
if (remoteDSId < 0) {
throw new GatewaySenderException(
String.format("GatewaySender %s cannot be created with remote DS Id less than 0. ",
id));
}
this.attrs.id = id;
this.attrs.remoteDs = remoteDSId;
GatewaySender sender = null;
if (this.attrs.getDispatcherThreads() <= 0) {
throw new GatewaySenderException(
String.format("GatewaySender %s can not be created with dispatcher threads less than 1",
id));
}
// Verify socket read timeout if a proper logger is available
if (this.cache instanceof GemFireCacheImpl) {
// If socket read timeout is less than the minimum, log a warning.
// Ideally, this should throw a GatewaySenderException, but wan dunit tests
// were failing, and we were running out of time to change them.
if (this.attrs.getSocketReadTimeout() != 0
&& this.attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
logger.warn(
"{} cannot configure socket read timeout of {} milliseconds because it is less than the minimum of {} milliseconds. The default will be used instead.",
new Object[] {"GatewaySender " + id, this.attrs.getSocketReadTimeout(),
GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT});
this.attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT;
}
// Log a warning if the old system property is set.
if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) {
if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
logger.warn(
"Obsolete java system property named {} was set to control {}. This property is no longer supported. Please use the GemFire API instead.",
new Object[] {GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY,
"GatewaySender socket read timeout"});
}
}
}
if (this.attrs.mustGroupTransactionEvents() && this.attrs.isBatchConflationEnabled()) {
throw new GatewaySenderException(
String.format(
"GatewaySender %s cannot be created with both group transaction events set to true and batch conflation enabled",
id));
}
if (this.attrs.isParallel()) {
if ((this.attrs.getOrderPolicy() != null)
&& this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
throw new GatewaySenderException(
String.format("Parallel Gateway Sender %s can not be created with OrderPolicy %s",
id, this.attrs.getOrderPolicy()));
}
if (this.cache instanceof GemFireCacheImpl) {
sender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs);
this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
} else if (this.cache instanceof CacheCreation) {
sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
this.cache.addGatewaySender(sender);
}
} else {
if (this.attrs.getAsyncEventListeners().size() > 0) {
throw new GatewaySenderException(
String.format(
"SerialGatewaySender %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.",
id));
}
if (this.attrs.mustGroupTransactionEvents() && this.attrs.getDispatcherThreads() > 1) {
throw new GatewaySenderException(
String.format(
"SerialGatewaySender %s cannot be created with group transaction events set to true when dispatcher threads is greater than 1",
id));
}
if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
}
if (this.cache instanceof GemFireCacheImpl) {
sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs);
this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
} else if (this.cache instanceof CacheCreation) {
sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
this.cache.addGatewaySender(sender);
}
}
return sender;
}
@Override
public GatewaySender create(String id) {
this.attrs.id = id;
GatewaySender sender = null;
if (this.attrs.getDispatcherThreads() <= 0) {
throw new AsyncEventQueueConfigurationException(
String.format("AsyncEventQueue %s can not be created with dispatcher threads less than 1",
id));
}
if (this.attrs.isParallel()) {
if ((this.attrs.getOrderPolicy() != null)
&& this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
throw new AsyncEventQueueConfigurationException(
String.format(
"AsyncEventQueue %s can not be created with OrderPolicy %s when it is set parallel",
id, this.attrs.getOrderPolicy()));
}
if (this.cache instanceof GemFireCacheImpl) {
sender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs);
this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
} else if (this.cache instanceof CacheCreation) {
sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
this.cache.addGatewaySender(sender);
}
} else {
if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
}
if (this.cache instanceof GemFireCacheImpl) {
sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs);
this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
} else if (this.cache instanceof CacheCreation) {
sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
this.cache.addGatewaySender(sender);
}
}
return sender;
}
@Override
public GatewaySenderFactory removeGatewayEventFilter(GatewayEventFilter filter) {
this.attrs.eventFilters.remove(filter);
return this;
}
@Override
public GatewaySenderFactory removeGatewayTransportFilter(GatewayTransportFilter filter) {
this.attrs.transFilters.remove(filter);
return this;
}
@Override
public GatewaySenderFactory setGatewayEventSubstitutionFilter(
GatewayEventSubstitutionFilter filter) {
this.attrs.eventSubstitutionFilter = filter;
return this;
}
@Override
public void configureGatewaySender(GatewaySender senderCreation) {
this.attrs.isParallel = senderCreation.isParallel();
this.attrs.manualStart = senderCreation.isManualStart();
this.attrs.socketBufferSize = senderCreation.getSocketBufferSize();
this.attrs.socketReadTimeout = senderCreation.getSocketReadTimeout();
this.attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled();
this.attrs.batchSize = senderCreation.getBatchSize();
this.attrs.batchTimeInterval = senderCreation.getBatchTimeInterval();
this.attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled();
this.attrs.diskStoreName = senderCreation.getDiskStoreName();
this.attrs.isDiskSynchronous = senderCreation.isDiskSynchronous();
this.attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory();
this.attrs.alertThreshold = senderCreation.getAlertThreshold();
this.attrs.dispatcherThreads = senderCreation.getDispatcherThreads();
this.attrs.policy = senderCreation.getOrderPolicy();
for (GatewayEventFilter filter : senderCreation.getGatewayEventFilters()) {
this.attrs.eventFilters.add(filter);
}
for (GatewayTransportFilter filter : senderCreation.getGatewayTransportFilters()) {
this.attrs.transFilters.add(filter);
}
this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter();
this.attrs.groupTransactionEvents = senderCreation.mustGroupTransactionEvents();
this.attrs.enforceThreadsConnectSameReceiver =
senderCreation.getEnforceThreadsConnectSameReceiver();
}
}