blob: ef106d22fc856ab04894f85c4ca442cb550c0171 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* This class is a bit misnamed. It really has more with pushing a DistributionAdvisee's profile out
* to others and, optionally if <code>profileExchange</code>, fetching the profile of anyone who
* excepts the pushed profile.
*
*/
public class UpdateAttributesProcessor {
private static final Logger logger = LogService.getLogger();
protected final DistributionAdvisee advisee;
private boolean profileExchange = false;
/**
* If true then sender is telling receiver to remove the sender's profile. No profile exchange is
* needed in this case.
*
* @since GemFire 5.7
*/
private boolean removeProfile = false;
private ReplyProcessor21 processor;
/** Creates a new instance of UpdateAttributesProcessor */
public UpdateAttributesProcessor(DistributionAdvisee da) {
this(da, false);
}
/**
* Creates a new instance of UpdateAttributesProcessor
*
* @since GemFire 5.7
*/
public UpdateAttributesProcessor(DistributionAdvisee da, boolean removeProfile) {
this.advisee = da;
this.removeProfile = removeProfile;
}
/**
* Distribute new profile version without exchange of profiles. Same as calling
* {@link #distribute(boolean)} with (false).
*/
void distribute() {
distribute(false);
}
/**
* Distribute with optional exchange of profiles but do not create new profile version.
*
* @param exchangeProfiles true if we want to receive profile replies
*/
public void distribute(boolean exchangeProfiles) {
sendProfileUpdate(exchangeProfiles);
waitForProfileResponse();
}
public void waitForProfileResponse() {
if (processor == null) {
return;
}
DistributionManager mgr = this.advisee.getDistributionManager();
try {
// bug 36983 - you can't loop on a reply processor
mgr.getCancelCriterion().checkCancelInProgress(null);
try {
processor.waitForRepliesUninterruptibly();
} catch (ReplyException e) {
e.handleCause();
}
} finally {
processor.cleanup();
}
}
public void sendProfileUpdate(boolean exchangeProfiles) {
DistributionManager mgr = this.advisee.getDistributionManager();
DistributionAdvisor advisor = this.advisee.getDistributionAdvisor();
this.profileExchange = exchangeProfiles;
// if this is not intended for the purpose of exchanging profiles but
// the advisor is uninitialized, then just exchange profiles anyway
// and be done with it (instead of exchanging profiles and then sending
// an attributes update)
if (!exchangeProfiles) {
if (this.removeProfile) {
if (!advisor.isInitialized()) {
// no need to tell the other advisor we are going away since
// never got initialized.
return;
}
} else if (advisor.initializationGate()) {
// it just did the profile exchange so we are done
return;
}
}
final Set recipients;
if (this.removeProfile) {
recipients = advisor.adviseProfileRemove();
} else if (exchangeProfiles) {
recipients = advisor.adviseProfileExchange();
} else {
recipients = advisor.adviseProfileUpdate();
}
if (recipients.isEmpty()) {
return;
}
ReplyProcessor21 processor = null;
// Scope scope = this.region.scope;
// always require an ack to prevent misordering of messages
InternalDistributedSystem system = this.advisee.getSystem();
processor = new UpdateAttributesReplyProcessor(system, recipients);
UpdateAttributesMessage message = getUpdateAttributesMessage(processor, recipients);
mgr.putOutgoing(message);
this.processor = processor;
}
UpdateAttributesMessage getUpdateAttributesMessage(ReplyProcessor21 processor, Set recipients) {
UpdateAttributesMessage msg = new UpdateAttributesMessage();
msg.adviseePath = this.advisee.getFullPath();
msg.setRecipients(recipients);
if (processor != null) {
msg.processorId = processor.getProcessorId();
}
msg.profile = this.advisee.getProfile();
msg.exchangeProfiles = this.profileExchange;
msg.removeProfile = this.removeProfile;
return msg;
}
class UpdateAttributesReplyProcessor extends ReplyProcessor21 {
UpdateAttributesReplyProcessor(InternalDistributedSystem system, Set members) {
super(system, members);
}
/**
* Registers this processor as a membership listener and returns a set of the current members.
*
* @return a Set of the current members
* @since GemFire 5.7
*/
@Override
protected Set addListenerAndGetMembers() {
DistributionAdvisor da = UpdateAttributesProcessor.this.advisee.getDistributionAdvisor();
if (da.useAdminMembersForDefault()) {
return getDistributionManager().addAllMembershipListenerAndGetAllIds(this);
} else {
return super.addListenerAndGetMembers();
}
}
/**
* Unregisters this processor as a membership listener
*
* @since GemFire 5.7
*/
@Override
protected void removeListener() {
DistributionAdvisor da = UpdateAttributesProcessor.this.advisee.getDistributionAdvisor();
if (da.useAdminMembersForDefault()) {
getDistributionManager().removeAllMembershipListener(this);
} else {
super.removeListener();
}
}
/**
* If this processor being used by controller then return ALL members; otherwise defer to super.
*
* @return a Set of the current members
* @since GemFire 5.7
*/
@Override
protected Set getDistributionManagerIds() {
DistributionAdvisor da = UpdateAttributesProcessor.this.advisee.getDistributionAdvisor();
if (da.useAdminMembersForDefault()) {
return getDistributionManager().getDistributionManagerIdsIncludingAdmin();
} else {
return super.getDistributionManagerIds();
}
}
@Override
public void process(DistributionMessage msg) {
try {
if (msg instanceof ProfilesReplyMessage) {
ProfilesReplyMessage reply = (ProfilesReplyMessage) msg;
if (reply.profiles != null) {
for (int i = 0; i < reply.profiles.length; i++) {
// @todo Add putProfiles to DistributionAdvisor to do this
// with one call atomically?
UpdateAttributesProcessor.this.advisee.getDistributionAdvisor()
.putProfile(reply.profiles[i]);
}
}
} else if (msg instanceof ProfileReplyMessage) {
ProfileReplyMessage reply = (ProfileReplyMessage) msg;
if (reply.profile != null) {
UpdateAttributesProcessor.this.advisee.getDistributionAdvisor()
.putProfile(reply.profile);
}
}
} finally {
super.process(msg);
}
}
}
public static class UpdateAttributesMessage extends HighPriorityDistributionMessage
implements MessageWithReply {
protected String adviseePath;
protected int processorId = 0;
protected Profile profile;
protected boolean exchangeProfiles = false;
protected boolean removeProfile = false;
@Override
public int getProcessorId() {
return this.processorId;
}
@Override
public boolean sendViaUDP() {
return true;
}
@Override
protected void process(ClusterDistributionManager dm) {
Throwable thr = null;
boolean sendReply = this.processorId != 0;
List<Profile> replyProfiles = null;
try {
if (this.profile != null) {
if (this.exchangeProfiles) {
replyProfiles = new ArrayList<Profile>();
}
this.profile.processIncoming(dm, this.adviseePath, this.removeProfile,
this.exchangeProfiles, replyProfiles);
}
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("<cache closed> ///{}", this);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
thr = t;
} finally {
if (sendReply) {
ReplyException rex = null;
if (thr != null) {
rex = new ReplyException(thr);
}
if (replyProfiles == null || replyProfiles.size() <= 1) {
Profile p = null;
if (replyProfiles != null && replyProfiles.size() == 1) {
p = replyProfiles.get(0);
}
ProfileReplyMessage.send(getSender(), this.processorId, rex, dm, p);
} else {
Profile[] profiles = new Profile[replyProfiles.size()];
replyProfiles.toArray(profiles);
ProfilesReplyMessage.send(getSender(), this.processorId, rex, dm, profiles);
}
}
}
}
@Override
public String toString() {
StringBuilder buff = new StringBuilder();
buff.append("UpdateAttributesMessage (adviseePath=");
buff.append(this.adviseePath);
buff.append("; processorId=");
buff.append(this.processorId);
buff.append("; profile=");
buff.append(this.profile);
if (this.exchangeProfiles) {
buff.append("; exchangeProfiles");
}
if (this.removeProfile) {
buff.append("; removeProfile");
}
buff.append(")");
return buff.toString();
}
@Override
public int getDSFID() {
return UPDATE_ATTRIBUTES_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.adviseePath = DataSerializer.readString(in);
this.processorId = in.readInt();
// set the processor ID to be able to send reply to sender in case of any
// unexpected exception during deserialization etc.
ReplyProcessor21.setMessageRPId(this.processorId);
this.profile = DataSerializer.readObject(in);
this.exchangeProfiles = in.readBoolean();
this.removeProfile = in.readBoolean();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
DataSerializer.writeString(this.adviseePath, out);
out.writeInt(this.processorId);
DataSerializer.writeObject(this.profile, out);
out.writeBoolean(this.exchangeProfiles);
out.writeBoolean(this.removeProfile);
}
}
public static class ProfileReplyMessage extends ReplyMessage {
Profile profile;
public static void send(InternalDistributedMember recipient, int processorId,
ReplyException exception, ClusterDistributionManager dm, Profile profile) {
Assert.assertTrue(recipient != null, "Sending a ProfileReplyMessage to ALL");
ProfileReplyMessage m = new ProfileReplyMessage();
m.processorId = processorId;
m.profile = profile;
if (exception != null) {
m.setException(exception);
if (logger.isDebugEnabled()) {
logger.debug("Replying with exception: {}" + m, exception);
}
}
m.setRecipient(recipient);
dm.putOutgoing(m);
}
@Override
public int getDSFID() {
return PROFILE_REPLY_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.profile = (Profile) DataSerializer.readObject(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
DataSerializer.writeObject(this.profile, out);
}
@Override
public String toString() {
final StringBuilder buff = new StringBuilder();
buff.append("ProfileReplyMessage");
buff.append(" (processorId=");
buff.append(super.processorId);
buff.append("; profile=");
buff.append(this.profile);
buff.append(")");
return buff.toString();
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.distributed.internal.ReplyMessage#getInlineProcess()
* ProfileReplyMessages must be processed in-line and not in a pool to keep partitioned region
* bucket profile exchange from swamping the high priority pool and not allowing other profile
* exchanges through. This is safe as long as ProfileReplyMessage obtains no extra
* synchronization locks.
*/
@Override
public boolean getInlineProcess() {
return true;
}
}
/**
* Used to return multiple profiles
*
* @since GemFire 5.7
*/
public static class ProfilesReplyMessage extends ReplyMessage {
Profile[] profiles;
public static void send(InternalDistributedMember recipient, int processorId,
ReplyException exception, ClusterDistributionManager dm, Profile[] profiles) {
Assert.assertTrue(recipient != null, "Sending a ProfilesReplyMessage to ALL");
ProfilesReplyMessage m = new ProfilesReplyMessage();
m.processorId = processorId;
m.profiles = profiles;
if (exception != null) {
m.setException(exception);
if (logger.isDebugEnabled()) {
logger.debug("Replying with exception: {}" + m, exception);
}
}
m.setRecipient(recipient);
dm.putOutgoing(m);
}
@Override
public int getDSFID() {
return PROFILES_REPLY_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
int length = in.readInt();
if (length == -1) {
this.profiles = null;
} else {
Profile[] array = new Profile[length];
for (int i = 0; i < length; i++) {
array[i] = (Profile) DataSerializer.readObject(in);
}
this.profiles = array;
}
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
if (this.profiles == null) {
out.writeInt(-1);
} else {
int length = this.profiles.length;
out.writeInt(length);
for (int i = 0; i < length; i++) {
DataSerializer.writeObject(this.profiles[i], out);
}
}
}
@Override
public String toString() {
final StringBuilder buff = new StringBuilder();
buff.append("ProfilesReplyMessage");
buff.append(" (processorId=");
buff.append(super.processorId);
if (this.profiles != null) {
buff.append("; profiles=");
buff.append(Arrays.asList(this.profiles));
}
buff.append(")");
return buff.toString();
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.distributed.internal.ReplyMessage#getInlineProcess()
* ProfilesReplyMessages must be processed in-line and not in a pool to keep partitioned region
* bucket profile exchange from swamping the high priority pool and not allowing other profile
* exchanges through. This is safe as long as ProfilesReplyMessage obtains no extra
* synchronization locks.
*/
@Override
public boolean getInlineProcess() {
return true;
}
}
}