blob: 6af086660f6d4121c24704ee0d2ca76225866d59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.util.Gateway;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.StaticSerialization;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class GatewaySenderAdvisor extends DistributionAdvisor {
private static final Logger logger = LogService.getLogger();
private DistributedLockService lockService;
private volatile boolean isPrimary;
private final Object primaryLock = new Object();
private final String lockToken;
private Thread lockObtainingThread;
private AbstractGatewaySender sender;
private GatewaySenderAdvisor(DistributionAdvisee sender) {
super(sender);
this.sender = (AbstractGatewaySender) sender;
this.lockToken = getDLockServiceName() + "-token";
}
/**
 * Factory method: constructs the advisor and calls {@code initialize()} on it before handing it
 * back.
 *
 * @param sender the advisee the advisor is created for
 * @return the initialized advisor
 */
public static GatewaySenderAdvisor createGatewaySenderAdvisor(DistributionAdvisee sender) {
  final GatewaySenderAdvisor created = new GatewaySenderAdvisor(sender);
  created.initialize();
  return created;
}
/**
 * Builds the name of the distributed lock service used by this advisor.
 *
 * @return the runtime class name of this advisor, an underscore, and the sender id
 */
public String getDLockServiceName() {
  final String advisorClassName = getClass().getName();
  return advisorClassName + "_" + sender.getId();
}
/**
 * @return the thread most recently created by {@link #launchLockObtainingVolunteerThread()}, or
 *         {@code null} if none has been launched yet
 */
public Thread getLockObtainingThread() {
  return lockObtainingThread;
}
/**
 * Instantiates a new sender profile for this member.
 *
 * @param memberId the member the profile describes
 * @param version the profile version passed through to the profile constructor
 * @return a new {@link GatewaySenderProfile}
 */
@Override
protected Profile instantiateProfile(InternalDistributedMember memberId, int version) {
  final GatewaySenderProfile senderProfile = new GatewaySenderProfile(memberId, version);
  return senderProfile;
}
/**
 * Invoked when a profile is created, which happens when the sender is added to the cache. The
 * sender is not being started here, so no lock is acquired or released to decide primary status;
 * the only action is validating that the remote definition is compatible with the local sender.
 *
 * @param profile the newly created profile; ignored unless it is a {@link GatewaySenderProfile}
 */
@Override
public void profileCreated(Profile profile) {
  if (!(profile instanceof GatewaySenderProfile)) {
    return;
  }
  checkCompatibility((GatewaySenderProfile) profile);
}
/**
 * Verifies that the sender described by {@code sp} is configured the same way as the local
 * sender. Attributes are compared one at a time, in a fixed order, and the first mismatch is
 * reported.
 *
 * @param sp the profile describing the same-named sender in another cache
 * @throws IllegalStateException if any compared attribute differs
 */
private void checkCompatibility(GatewaySenderProfile sp) {
  if (sp.remoteDSId != sender.getRemoteDSId()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with remote ds id %s because another cache has the same Gateway Sender defined with remote ds id %s.",
        sp.Id, sp.remoteDSId, sender.remoteDSId);
  }

  // One side parallel and the other serial: the wording depends on which side is which.
  if (sp.isParallel != sender.isParallel()) {
    final String format = sp.isParallel
        ? "Cannot create Gateway Sender %s as parallel gateway sender because another cache has the same sender as serial gateway sender"
        : "Cannot create Gateway Sender %s as serial gateway sender because another cache has the same sender as parallel gateway sender";
    throw incompatibleSender(format, sp.Id);
  }

  if (sp.isBatchConflationEnabled != sender.isBatchConflationEnabled()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with isBatchConflationEnabled %s because another cache has the same Gateway Sender defined with isBatchConflationEnabled %s",
        sp.Id, sp.isBatchConflationEnabled, sender.isBatchConflationEnabled());
  }

  if (sp.isPersistenceEnabled != sender.isPersistenceEnabled()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with isPersistentEnabled %s because another cache has the same Gateway Sender defined with isPersistentEnabled %s",
        sp.Id, sp.isPersistenceEnabled, sender.isPersistenceEnabled());
  }

  if (sp.alertThreshold != sender.getAlertThreshold()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with alertThreshold %s because another cache has the same Gateway Sender defined with alertThreshold %s",
        sp.Id, sp.alertThreshold, sender.getAlertThreshold());
  }

  // Manual start is only compared when the local sender is serial.
  if (!sender.isParallel() && sp.manualStart != sender.isManualStart()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with manual start %s because another cache has the same Gateway Sender defined with manual start %s",
        sp.Id, sp.manualStart, sender.isManualStart());
  }

  // Order policy is only compared when the remote sender is serial.
  if (!sp.isParallel && sp.orderPolicy != sender.getOrderPolicy()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with orderPolicy %s because another cache has the same Gateway Sender defined with orderPolicy %s",
        sp.Id, sp.orderPolicy, sender.getOrderPolicy());
  }

  // Event filters: same count, and every local filter class must be known remotely.
  final List<String> localEventFilters = new ArrayList<String>();
  for (org.apache.geode.cache.wan.GatewayEventFilter eventFilter : sender
      .getGatewayEventFilters()) {
    localEventFilters.add(eventFilter.getClass().getName());
  }
  if (sp.eventFiltersClassNames.size() != localEventFilters.size()
      || !sp.eventFiltersClassNames.containsAll(localEventFilters)) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with GatewayEventFilters %s because another cache has the same Gateway Sender defined with GatewayEventFilters %s",
        sp.Id, sp.eventFiltersClassNames, localEventFilters);
  }

  // Transport filters: same count and the same class names in the same order.
  final Set<String> localTransportFilters = new LinkedHashSet<String>();
  for (GatewayTransportFilter transportFilter : sender.getGatewayTransportFilters()) {
    localTransportFilters.add(transportFilter.getClass().getName());
  }
  boolean transportFiltersDiffer =
      sp.transFiltersClassNames.size() != localTransportFilters.size();
  if (!transportFiltersDiffer) {
    final Iterator<String> localNames = localTransportFilters.iterator();
    for (String remoteName : sp.transFiltersClassNames) {
      // Sizes are equal here, so localNames always has a next element.
      if (!remoteName.equals(localNames.next())) {
        transportFiltersDiffer = true;
        break;
      }
    }
  }
  if (transportFiltersDiffer) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with GatewayTransportFilters %s because another cache has the same Gateway Sender defined with GatewayTransportFilters %s",
        sp.Id, sp.transFiltersClassNames, localTransportFilters);
  }

  // Async event listeners: same count, and every local listener class must be known remotely.
  final List<String> localListeners = new ArrayList<String>();
  for (AsyncEventListener asyncListener : sender.getAsyncEventListeners()) {
    localListeners.add(asyncListener.getClass().getName());
  }
  if (sp.senderEventListenerClassNames.size() != localListeners.size()
      || !sp.senderEventListenerClassNames.containsAll(localListeners)) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with AsyncEventListeners %s because another cache has the same Gateway Sender defined with AsyncEventListener %s",
        sp.Id, sp.senderEventListenerClassNames, localListeners);
  }

  if (sp.isDiskSynchronous != sender.isDiskSynchronous()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with isDiskSynchronous %s because another cache has the same Gateway Sender defined with isDiskSynchronous %s",
        sp.Id, sp.isDiskSynchronous, sender.isDiskSynchronous());
  }

  // This attribute is only compared against members at version 1.14.0 or later.
  if (sp.getDistributedMember().getVersion().isNotOlderThan(KnownVersion.GEODE_1_14_0)
      && sp.enforceThreadsConnectSameReceiver != sender.getEnforceThreadsConnectSameReceiver()) {
    throw incompatibleSender(
        "Cannot create Gateway Sender %s with enforceThreadsConnectSameReceiver %s because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver %s",
        sp.Id, sp.enforceThreadsConnectSameReceiver,
        sender.getEnforceThreadsConnectSameReceiver());
  }
}

/** Formats an incompatibility message and wraps it in the exception to be thrown. */
private static IllegalStateException incompatibleSender(String format, Object... args) {
  return new IllegalStateException(String.format(format, args));
}
/**
 * Reacts to a change in a remote serial sender's profile.
 * <ol>
 * <li>If the remote sender is running, its server location (when present) is copied to the
 * local sender.</li>
 * <li>If the remote sender is not running, no primary is currently advised, the local sender is
 * not primary and the local sender is the eldest, a thread is launched to volunteer for
 * primary.</li>
 * </ol>
 * Profiles of parallel senders and non-sender profiles are ignored.
 *
 * @param profile the updated profile
 */
@Override
public void profileUpdated(Profile profile) {
  if (!(profile instanceof GatewaySenderProfile)) {
    return;
  }
  final GatewaySenderProfile updated = (GatewaySenderProfile) profile;
  if (updated.isParallel) {
    // only serial gateway senders take part in primary election
    return;
  }

  if (updated.isRunning) {
    if (updated.serverLocation != null) {
      sender.setServerLocation(updated.serverLocation);
    }
    return;
  }

  // The remote sender stopped: volunteer only if nobody is primary, including ourselves.
  if (advisePrimaryGatewaySender() != null) {
    return;
  }
  if (sender.isPrimary()) {
    return;
  }
  if (!adviseEldestGatewaySender()) {
    if (logger.isDebugEnabled()) {
      logger.debug(
          "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
          sender);
    }
    return;
  }
  launchLockObtainingVolunteerThread();
}
/**
 * Reacts to the removal of a serial sender's profile: when no primary is advised any more, the
 * local sender is not primary and the local sender is the eldest, a thread is launched to
 * volunteer for primary.
 *
 * @param profile the removed profile; ignored unless it is a serial {@link GatewaySenderProfile}
 */
@Override
protected void profileRemoved(Profile profile) {
  if (!(profile instanceof GatewaySenderProfile)) {
    return;
  }
  final GatewaySenderProfile removed = (GatewaySenderProfile) profile;
  if (removed.isParallel) {
    return;
  }

  // if there is a primary sender, then don't volunteer for primary
  if (advisePrimaryGatewaySender() != null) {
    return;
  }
  if (sender.isPrimary()) {
    return;
  }
  if (!adviseEldestGatewaySender()) {
    if (logger.isDebugEnabled()) {
      logger.debug(
          "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
          sender);
    }
    return;
  }
  launchLockObtainingVolunteerThread();
}
/**
 * @return {@code true} when the sender is parallel, otherwise the current value of the primary
 *         flag
 */
public boolean isPrimary() {
  if (sender.isParallel()) {
    return true;
  }
  return isPrimary;
}
/**
 * Looks up the distributed lock service named by {@link #getDLockServiceName()} and creates it
 * when no such service exists yet. The result is stored for later use by
 * {@link #volunteerForPrimary()}.
 */
public void initDLockService() {
  final InternalDistributedSystem system = sender.getCache().getInternalDistributedSystem();
  final String serviceName = getDLockServiceName();

  lockService = DistributedLockService.getServiceNamed(serviceName);
  if (lockService == null) {
    lockService = DLockService.create(serviceName, system, true, true, true);
  }
  Assert.assertTrue(lockService != null);

  if (logger.isDebugEnabled()) {
    logger.debug("{}: Obtained DistributedLockService: {}", this, lockService);
  }
}
/**
 * Tries to make this sender the primary by acquiring the distributed lock. The attempt is skipped
 * when a primary is already advised or when this sender is not the eldest.
 *
 * @return {@code true} if the lock token is held by the calling thread after the attempt;
 *         {@code false} if the attempt was skipped or the lock was not obtained
 */
public boolean volunteerForPrimary() {
  final boolean debugEnabled = logger.isDebugEnabled();
  if (debugEnabled) {
    logger.debug("Sender : {} is volunteering for Primary ", sender.getId());
  }

  if (advisePrimaryGatewaySender() != null) {
    return false;
  }
  if (!adviseEldestGatewaySender()) {
    if (debugEnabled) {
      logger.debug(
          "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
          sender);
    }
    return false;
  }

  if (debugEnabled) {
    logger.debug("Sender : {} no Primary available. So going to acquire distributed lock",
        sender);
  }
  lockService.lock(lockToken, 10000, -1);
  return lockService.isHeldByCurrentThread(lockToken);
}
/**
 * Determines whether this sender is the eldest serial sender in the distributed system. That is
 * the case when either
 * <ol>
 * <li>no remote serial sender is running, or</li>
 * <li>this sender is running and its start time is not later than the earliest start time among
 * the running remote serial senders.</li>
 * </ol>
 *
 * @return {@code true} if this is the eldest sender; {@code false} otherwise
 */
private boolean adviseEldestGatewaySender() {
  final Profile[] snapshot = this.profiles;

  // The sender with the minimum start time is the eldest; track that minimum directly.
  boolean remoteSenderRunning = false;
  long earliestRemoteStart = Long.MAX_VALUE;
  for (Profile profile : snapshot) {
    final GatewaySenderProfile remote = (GatewaySenderProfile) profile;
    if (remote.isParallel || !remote.isRunning) {
      continue;
    }
    remoteSenderRunning = true;
    if (remote.startTime < earliestRemoteStart) {
      earliestRemoteStart = remote.startTime;
    }
  }

  if (!remoteSenderRunning) {
    // none of the remote senders is running, so this sender should volunteer
    return true;
  }
  // Remote senders are running: a local sender that is not running gives way to them.
  return sender.isRunning() && sender.startTime <= earliestRemoteStart;
}
/**
 * Finds the member hosting the running remote serial sender with the smallest start time. On a
 * tie the profile encountered first is kept.
 *
 * @return that member, or {@code null} when no remote serial sender is running
 */
private InternalDistributedMember adviseEldestGatewaySenderNode() {
  final Profile[] snapshot = this.profiles;

  GatewaySenderProfile eldest = null;
  for (Profile profile : snapshot) {
    final GatewaySenderProfile candidate = (GatewaySenderProfile) profile;
    if (candidate.isParallel || !candidate.isRunning) {
      continue;
    }
    if (eldest == null || candidate.startTime < eldest.startTime) {
      eldest = candidate;
    }
  }
  return eldest == null ? null : eldest.getDistributedMember();
}
/**
 * Turns this sender into the primary: removes the cache listener from the event processor (if
 * there is one), sets the primary flag and wakes up waiters, then distributes the updated
 * attributes.
 */
public void makePrimary() {
  logger.info("{} : Starting as primary", sender);

  final AbstractGatewaySenderEventProcessor processor = sender.getEventProcessor();
  if (processor != null) {
    processor.removeCacheListener();
  }

  logger.info("{} : Becoming primary gateway sender", sender);
  notifyAndBecomePrimary();

  final UpdateAttributesProcessor attributesUpdate = new UpdateAttributesProcessor(sender);
  attributesUpdate.distribute(false);
}
/**
 * Sets the primary flag and wakes up every thread waiting on the primary lock, both while holding
 * that lock.
 */
public void notifyAndBecomePrimary() {
  final Object monitor = primaryLock;
  synchronized (monitor) {
    setIsPrimary(true);
    notifyPrimaryLock();
  }
}
/** Wakes up every thread currently waiting on the primary lock. */
public void notifyPrimaryLock() {
  final Object monitor = primaryLock;
  synchronized (monitor) {
    monitor.notifyAll();
  }
}
/**
 * Marks this sender as secondary: logs which member is advised as primary, clears the primary
 * flag and distributes the updated attributes.
 */
public void makeSecondary() {
  if (logger.isDebugEnabled()) {
    logger.debug("{}: Did not obtain the lock on {}. Starting as secondary gateway sender.",
        sender, lockToken);
  }

  logger.info(
      "{} starting as secondary because primary gateway sender is available on member :{}",
      sender.getId(), advisePrimaryGatewaySender());

  // Clear the primary flag before publishing the new attributes.
  isPrimary = false;
  final UpdateAttributesProcessor attributesUpdate = new UpdateAttributesProcessor(sender);
  attributesUpdate.distribute(false);
}
/**
 * Starts a new thread that volunteers this sender for primary. The thread is remembered so it can
 * be retrieved through {@link #getLockObtainingThread()}.
 */
public void launchLockObtainingVolunteerThread() {
  final Thread volunteer = new LoggingThread(
      "Gateway Sender Primary Lock Acquisition Thread Volunteer", this::volunteerWhileRunning);
  lockObtainingThread = volunteer;
  volunteer.start();
}

/**
 * Body of the volunteer thread. While holding the sender's life-cycle read lock it volunteers for
 * primary and, on success, makes this sender primary. Nothing is done if the sender is not
 * running.
 */
private void volunteerWhileRunning() {
  sender.getLifeCycleLock().readLock().lock();
  try {
    if (!sender.isRunning()) {
      return;
    }

    // Attempt to obtain the lock
    final boolean debugEnabled = logger.isDebugEnabled();
    if (debugEnabled) {
      logger.debug("{}: Obtaining the lock on {}", this, lockToken);
    }
    if (!volunteerForPrimary()) {
      return;
    }
    if (debugEnabled) {
      logger.debug("{}: Obtained the lock on {}", this, lockToken);
    }
    logger.info("{} is becoming primary gateway Sender.", this);

    // As soon as the lock is obtained, set primary
    makePrimary();
  } catch (CancelException ignored) {
    // no action necessary
  } catch (Exception e) {
    if (!sender.getStopper().isCancelInProgress()) {
      final String message = String.format(
          "%s: The thread to obtain the failover lock was interrupted. This gateway sender will never become the primary.",
          this);
      logger.fatal(message, e);
    }
  } finally {
    sender.getLifeCycleLock().readLock().unlock();
  }
}
/**
 * Blocks the caller until this sender is primary. The primary flag is re-checked at least once a
 * second; waiting is abandoned when the sender has an event processor and the calling processor
 * reports that it is stopped.
 *
 * @param callingProcessor the event processor on whose behalf the caller waits
 * @throws InterruptedException if the waiting thread is interrupted
 */
public void waitToBecomePrimary(AbstractGatewaySenderEventProcessor callingProcessor)
    throws InterruptedException {
  if (isPrimary()) {
    return;
  }

  synchronized (primaryLock) {
    logger.info("{} : Waiting to become primary gateway", sender.getId());
    while (true) {
      if (isPrimary()) {
        return;
      }
      primaryLock.wait(1000);

      final boolean processorStopped =
          sender.getEventProcessor() != null && callingProcessor.isStopped();
      if (processorStopped) {
        logger.info("The event processor is stopped, not to wait for being primary any more.");
        return;
      }
    }
  }
}
/**
 * Profile information for a remote counterpart.
 *
 * <p>
 * Serialization is versioned (see {@link #getSerializationVersions()}):
 * <ul>
 * <li>{@code toData}/{@code fromData}: the current format, which is the pre-1.14.0 format followed
 * by {@link #enforceThreadsConnectSameReceiver}.</li>
 * <li>{@code toDataPre_GEODE_1_14_0_0}/{@code fromDataPre_GEODE_1_14_0_0}: the format used before
 * 1.14.0, which does not carry {@code enforceThreadsConnectSameReceiver}.</li>
 * <li>{@code toDataPre_GFE_8_0_0_0}/{@code fromDataPre_GFE_8_0_0_0}: the format used before GFE
 * 8.0.</li>
 * </ul>
 * NOTE(review): the {@code Pre_} methods are presumably selected by name by the serialization
 * framework based on the peer's version - confirm against the framework documentation.
 */
public static class GatewaySenderProfile extends DistributionAdvisor.Profile {
  public String Id;
  public long startTime;
  public int remoteDSId;
  /**
   * I need this boolean to make sure the sender which is volunteer for primary is running. not
   * running sender should not become primary.
   */
  public boolean isRunning;
  public boolean isPrimary;
  public boolean isParallel;
  public boolean isBatchConflationEnabled;
  public boolean isPersistenceEnabled;
  public int alertThreshold;
  public boolean manualStart;
  public ArrayList<String> eventFiltersClassNames = new ArrayList<String>();
  public ArrayList<String> transFiltersClassNames = new ArrayList<String>();
  public ArrayList<String> senderEventListenerClassNames = new ArrayList<String>();
  public boolean isDiskSynchronous;
  public int dispatcherThreads;
  public OrderPolicy orderPolicy;
  public ServerLocation serverLocation;
  /** Only part of the wire format from 1.14.0 on; stays {@code false} for older peers. */
  public boolean enforceThreadsConnectSameReceiver = false;

  public GatewaySenderProfile(InternalDistributedMember memberId, int version) {
    super(memberId, version);
  }

  /** No-arg constructor; the fields are then populated by one of the fromData methods. */
  public GatewaySenderProfile() {}

  /**
   * Reads the current format: the pre-1.14.0 fields followed by
   * {@code enforceThreadsConnectSameReceiver}.
   */
  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    fromDataPre_GEODE_1_14_0_0(in, context);
    this.enforceThreadsConnectSameReceiver = in.readBoolean();
  }

  /**
   * Reads the format used before 1.14.0. For streams older than GFE 9.0 the order policy arrives
   * as a {@link Gateway.OrderPolicy} and is mapped by name onto {@link OrderPolicy}; any name
   * other than KEY or THREAD maps to PARTITION.
   *
   * <p>
   * {@code enforceThreadsConnectSameReceiver} is deliberately not read here: it is not part of
   * the pre-1.14.0 format, and reading it would consume a byte the peer never wrote.
   */
  public void fromDataPre_GEODE_1_14_0_0(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    super.fromData(in, context);
    this.Id = DataSerializer.readString(in);
    this.startTime = in.readLong();
    this.remoteDSId = in.readInt();
    this.isRunning = in.readBoolean();
    this.isPrimary = in.readBoolean();
    this.isParallel = in.readBoolean();
    this.isBatchConflationEnabled = in.readBoolean();
    this.isPersistenceEnabled = in.readBoolean();
    this.alertThreshold = in.readInt();
    this.manualStart = in.readBoolean();
    this.eventFiltersClassNames = DataSerializer.readArrayList(in);
    this.transFiltersClassNames = DataSerializer.readArrayList(in);
    this.senderEventListenerClassNames = DataSerializer.readArrayList(in);
    this.isDiskSynchronous = in.readBoolean();
    this.dispatcherThreads = in.readInt();
    if (StaticSerialization.getVersionForDataStream(in).isOlderThan(KnownVersion.GFE_90)) {
      Gateway.OrderPolicy oldOrderPolicy = DataSerializer.readObject(in);
      if (oldOrderPolicy != null) {
        if (oldOrderPolicy.name().equals(OrderPolicy.KEY.name())) {
          this.orderPolicy = OrderPolicy.KEY;
        } else if (oldOrderPolicy.name().equals(OrderPolicy.THREAD.name())) {
          this.orderPolicy = OrderPolicy.THREAD;
        } else {
          this.orderPolicy = OrderPolicy.PARTITION;
        }
      } else {
        this.orderPolicy = null;
      }
    } else {
      this.orderPolicy = DataSerializer.readObject(in);
    }
    boolean serverLocationFound = DataSerializer.readPrimitiveBoolean(in);
    if (serverLocationFound) {
      this.serverLocation = new ServerLocation();
      InternalDataSerializer.invokeFromData(this.serverLocation, in);
    }
  }

  /**
   * Writes the current format: the pre-1.14.0 fields followed by
   * {@code enforceThreadsConnectSameReceiver}.
   */
  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    toDataPre_GEODE_1_14_0_0(out, context);
    out.writeBoolean(enforceThreadsConnectSameReceiver);
  }

  /**
   * Writes the format used before 1.14.0. For streams older than GFE 9.0 a non-null order policy
   * is written as the {@link Gateway.OrderPolicy} with the matching name (PARTITION when the
   * name is neither KEY nor THREAD).
   *
   * <p>
   * {@code enforceThreadsConnectSameReceiver} is deliberately not written here: it is not part
   * of the pre-1.14.0 format; {@link #toData} appends it for current peers.
   */
  public void toDataPre_GEODE_1_14_0_0(DataOutput out,
      SerializationContext context) throws IOException {
    super.toData(out, context);
    DataSerializer.writeString(Id, out);
    out.writeLong(startTime);
    out.writeInt(remoteDSId);
    out.writeBoolean(isRunning);
    out.writeBoolean(isPrimary);
    out.writeBoolean(isParallel);
    out.writeBoolean(isBatchConflationEnabled);
    out.writeBoolean(isPersistenceEnabled);
    out.writeInt(alertThreshold);
    out.writeBoolean(manualStart);
    DataSerializer.writeArrayList(eventFiltersClassNames, out);
    DataSerializer.writeArrayList(transFiltersClassNames, out);
    DataSerializer.writeArrayList(senderEventListenerClassNames, out);
    out.writeBoolean(isDiskSynchronous);
    out.writeInt(dispatcherThreads);
    if (StaticSerialization.getVersionForDataStream(out).isOlderThan(KnownVersion.GFE_90)
        && this.orderPolicy != null) {
      String orderPolicyName = this.orderPolicy.name();
      if (orderPolicyName.equals(Gateway.OrderPolicy.KEY.name())) {
        DataSerializer.writeObject(Gateway.OrderPolicy.KEY, out);
      } else if (orderPolicyName.equals(Gateway.OrderPolicy.THREAD.name())) {
        DataSerializer.writeObject(Gateway.OrderPolicy.THREAD, out);
      } else {
        DataSerializer.writeObject(Gateway.OrderPolicy.PARTITION, out);
      }
    } else {
      DataSerializer.writeObject(orderPolicy, out);
    }
    boolean serverLocationFound = (this.serverLocation != null);
    DataSerializer.writePrimitiveBoolean(serverLocationFound, out);
    if (serverLocationFound) {
      InternalDataSerializer.invokeToData(serverLocation, out);
    }
  }

  /** Reads the format used before GFE 8.0; the order policy is read as-is. */
  public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext context)
      throws IOException, ClassNotFoundException {
    super.fromData(in, context);
    this.Id = DataSerializer.readString(in);
    this.startTime = in.readLong();
    this.remoteDSId = in.readInt();
    this.isRunning = in.readBoolean();
    this.isPrimary = in.readBoolean();
    this.isParallel = in.readBoolean();
    this.isBatchConflationEnabled = in.readBoolean();
    this.isPersistenceEnabled = in.readBoolean();
    this.alertThreshold = in.readInt();
    this.manualStart = in.readBoolean();
    this.eventFiltersClassNames = DataSerializer.readArrayList(in);
    this.transFiltersClassNames = DataSerializer.readArrayList(in);
    this.senderEventListenerClassNames = DataSerializer.readArrayList(in);
    this.isDiskSynchronous = in.readBoolean();
    this.dispatcherThreads = in.readInt();
    this.orderPolicy = DataSerializer.readObject(in);
    boolean serverLocationFound = DataSerializer.readPrimitiveBoolean(in);
    if (serverLocationFound) {
      this.serverLocation = new ServerLocation();
      InternalDataSerializer.invokeFromData(this.serverLocation, in);
    }
  }

  /**
   * Writes the format used before GFE 8.0. The dispatcher thread count is written as 1 for
   * parallel senders and for senders without an order policy, and a parallel sender's order
   * policy is written as {@code null}.
   */
  public void toDataPre_GFE_8_0_0_0(DataOutput out, SerializationContext context)
      throws IOException {
    super.toData(out, context);
    DataSerializer.writeString(Id, out);
    out.writeLong(startTime);
    out.writeInt(remoteDSId);
    out.writeBoolean(isRunning);
    out.writeBoolean(isPrimary);
    out.writeBoolean(isParallel);
    out.writeBoolean(isBatchConflationEnabled);
    out.writeBoolean(isPersistenceEnabled);
    out.writeInt(alertThreshold);
    out.writeBoolean(manualStart);
    DataSerializer.writeArrayList(eventFiltersClassNames, out);
    DataSerializer.writeArrayList(transFiltersClassNames, out);
    DataSerializer.writeArrayList(senderEventListenerClassNames, out);
    out.writeBoolean(isDiskSynchronous);
    if (isParallel || orderPolicy == null) {
      out.writeInt(1); // it was 1 on previous version of gemfire
    } else {
      out.writeInt(dispatcherThreads);
    }
    if (isParallel) {
      DataSerializer.writeObject(null, out);
    } else {
      DataSerializer.writeObject(orderPolicy, out);
    }
    boolean serverLocationFound = (this.serverLocation != null);
    DataSerializer.writePrimitiveBoolean(serverLocationFound, out);
    if (serverLocationFound) {
      InternalDataSerializer.invokeToData(serverLocation, out);
    }
  }

  @Immutable
  private static final KnownVersion[] serializationVersions =
      new KnownVersion[] {KnownVersion.GFE_80, KnownVersion.GEODE_1_14_0};

  /** @return the versions (GFE 8.0 and Geode 1.14.0) for which a {@code Pre_} format exists */
  @Override
  public KnownVersion[] getSerializationVersions() {
    return serializationVersions;
  }

  @Override
  public int getDSFID() {
    return GATEWAY_SENDER_PROFILE;
  }

  /**
   * Looks up the gateway sender named {@code adviseePath} in the cache of {@code dm} and hands
   * it to {@code handleDistributionAdvisee}. Does nothing when there is no cache.
   */
  @Override
  public void processIncoming(ClusterDistributionManager dm, String adviseePath,
      boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) {
    InternalCache cache = dm.getCache();
    if (cache != null) {
      AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(adviseePath);
      handleDistributionAdvisee(sender, removeProfile, exchangeProfiles, replyProfiles);
    }
  }

  /** Appends the id, remote DS id, running flag and primary flag to the description. */
  @Override
  public void fillInToString(StringBuilder sb) {
    super.fillInToString(sb);
    sb.append("; id=").append(this.Id);
    sb.append("; remoteDSName=").append(this.remoteDSId);
    sb.append("; isRunning=").append(this.isRunning);
    sb.append("; isPrimary=").append(this.isPrimary);
  }
}
/**
 * Searches the known profiles for a serial sender that is flagged as primary.
 *
 * @return the member hosting the first such sender, or {@code null} if there is none
 */
public InternalDistributedMember advisePrimaryGatewaySender() {
  final Profile[] snapshot = this.profiles;
  for (int i = 0; i < snapshot.length; i++) {
    final GatewaySenderProfile candidate = (GatewaySenderProfile) snapshot[i];
    if (candidate.isParallel || !candidate.isPrimary) {
      continue;
    }
    return candidate.getDistributedMember();
  }
  return null;
}
/**
 * Overwrites the primary flag. Threads waiting on the primary lock are not notified by this
 * method.
 *
 * @param isPrimary the new value of the flag
 */
public void setIsPrimary(boolean isPrimary) {
  this.isPrimary = isPrimary;
}
/**
 * Distributes an attribute update for the advisee and then closes the advisor.
 * NOTE(review): the {@code true} argument presumably marks the update as a profile removal -
 * confirm against {@code UpdateAttributesProcessor}.
 */
@Override
public void close() {
  final UpdateAttributesProcessor finalUpdate =
      new UpdateAttributesProcessor(getAdvisee(), true);
  finalUpdate.distribute(false);
  super.close();
}
}