// blob: 2f59608177e7243c39d8f26bb1091f99722fab00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * A message that acknowledges that an operation completed successfully, or threw a CacheException.
 * Note that even though this message has a <code>processorId</code>, it is not a
 * {@link MessageWithReply} because it is sent in <b>reply</b> to another message.
 *
 * <p>
 * Wire format written by {@link #toData} (after the superclass data): one status byte built from
 * the <code>*_FLAG</code> constants, then the processor id as an int if it is non-zero, then the
 * return value / exception object if there is one.
 */
public class ReplyMessage extends HighPriorityDistributionMessage {
  private static final Logger logger = LogService.getLogger();

  /** The shared obj id of the ReplyProcessor */
  protected int processorId;

  /** Serialized as {@link #IGNORED_FLAG}; exposed through {@link #getIgnored()}. */
  protected boolean ignored = false;

  /** Serialized as {@link #CLOSED_FLAG}; exposed through {@link #getClosed()}. */
  protected boolean closed = false;

  /** True when {@link #returnValue} holds the {@link ReplyException} of a failed operation. */
  private boolean returnValueIsException;

  /** Either the reply's result object or, if {@link #returnValueIsException}, its exception. */
  private Object returnValue;

  /** Sender-side only (never serialized); the value reported by {@link #sendViaUDP()}. */
  private transient boolean sendViaJGroups;

  /** Serialized as {@link #INTERNAL_FLAG}; exposed through {@link #isInternal()}. */
  protected transient boolean internal;

  /**
   * Sets the id of the reply processor this message is addressed to.
   *
   * @param id the processor id; 0 is not written to the wire (see {@link #toData})
   */
  public void setProcessorId(int id) {
    this.processorId = id;
  }

  /** Returns the <code>sendViaJGroups</code> flag this message was created with. */
  @Override
  public boolean sendViaUDP() {
    return this.sendViaJGroups;
  }

  /**
   * Makes this reply carry an exception; replaces any previously set return value.
   *
   * @param ex the exception to send back
   */
  public void setException(ReplyException ex) {
    this.returnValue = ex;
    this.returnValueIsException = true;
  }

  /**
   * Makes this reply carry a normal result; replaces any previously set exception.
   *
   * @param o the result object to send back, may be null
   */
  public void setReturnValue(Object o) {
    this.returnValue = o;
    this.returnValueIsException = false;
  }

  /** ReplyMessages are always processed in-line, though subclasses are not */
  @Override
  public boolean getInlineProcess() {
    return this.getClass().equals(ReplyMessage.class);
  }

  /**
   * Send an ack. Equivalent to the five-argument overload with <code>internal == false</code>.
   *
   * @param recipient the member to reply to; must not be null
   * @param processorId id of the reply processor on the recipient
   * @param exception the exception to report, or null for a plain ack
   * @param dm the sender used to put the reply on the wire
   */
  public static void send(InternalDistributedMember recipient, int processorId,
      ReplyException exception, ReplySender dm) {
    send(recipient, processorId, exception, dm, false);
  }

  /**
   * Send an ack.
   *
   * @param recipient the member to reply to; must not be null
   * @param processorId id of the reply processor on the recipient
   * @param exception the exception to report, or null for a plain ack
   * @param dm the sender used to put the reply on the wire
   * @param internal value for the reply's {@link #isInternal()} flag
   */
  public static void send(InternalDistributedMember recipient, int processorId,
      ReplyException exception, ReplySender dm, boolean internal) {
    Assert.assertTrue(recipient != null, "Sending a ReplyMessage to ALL");
    ReplyMessage m = new ReplyMessage();

    m.processorId = processorId;
    // Previously the parameter was accepted but never stored, so the flag was always false.
    m.internal = internal;
    if (exception != null) {
      m.returnValue = exception;
      m.returnValueIsException = true;
    }

    if (exception != null && logger.isDebugEnabled()) {
      Throwable cause = exception.getCause();
      if (cause instanceof EntryNotFoundException) {
        logger.debug("Replying with entry-not-found: {}", cause.getMessage());
      } else if (cause instanceof ConcurrentCacheModificationException) {
        logger.debug("Replying with concurrent-modification-exception");
      } else if (cause instanceof CancelException) {
        // no need to log this - it will show up in normal debug-level logs when the reply is sent
      } else {
        logger.debug("Replying with exception: {}", m, exception);
      }
    }

    m.setRecipient(recipient);
    dm.putOutgoing(m);
  }

  /**
   * Send an ack that carries a result object.
   *
   * @param recipient the member to reply to; must not be null
   * @param processorId id of the reply processor on the recipient
   * @param returnValue the result to send back, may be null
   * @param dm the sender used to put the reply on the wire
   */
  public static void send(InternalDistributedMember recipient, int processorId, Object returnValue,
      ReplySender dm) {
    Assert.assertTrue(recipient != null, "Sending a ReplyMessage to ALL");
    ReplyMessage m = new ReplyMessage();

    m.processorId = processorId;
    // A fresh message already has returnValueIsException == false, so a null value needs no
    // special casing here.
    m.returnValue = returnValue;

    m.setRecipient(recipient);
    dm.putOutgoing(m);
  }

  /**
   * Send an ack with explicit <code>ignored</code>, <code>closed</code> and transport flags.
   * Equivalent to the eight-argument overload with <code>internal == false</code>.
   *
   * @param recipient the member to reply to; must not be null
   * @param processorId id of the reply processor on the recipient
   * @param exception the exception to report, or null for a plain ack
   * @param dm the sender used to put the reply on the wire
   * @param ignored value for the reply's {@link #getIgnored()} flag
   * @param closed value for the reply's {@link #getClosed()} flag
   * @param sendViaJGroups value the reply returns from {@link #sendViaUDP()}
   */
  public static void send(InternalDistributedMember recipient, int processorId,
      ReplyException exception, ReplySender dm, boolean ignored, boolean closed,
      boolean sendViaJGroups) {
    // Forward the caller's flags; they used to be replaced by hard-coded false values.
    send(recipient, processorId, exception, dm, ignored, closed, sendViaJGroups, false);
  }

  /**
   * Send an ack with every flag specified explicitly.
   *
   * @param recipient the member to reply to; must not be null
   * @param processorId id of the reply processor on the recipient
   * @param exception the exception to report, or null for a plain ack
   * @param dm the sender used to put the reply on the wire
   * @param ignored value for the reply's {@link #getIgnored()} flag
   * @param closed value for the reply's {@link #getClosed()} flag
   * @param sendViaJGroups value the reply returns from {@link #sendViaUDP()}
   * @param internal value for the reply's {@link #isInternal()} flag
   */
  public static void send(InternalDistributedMember recipient, int processorId,
      ReplyException exception, ReplySender dm, boolean ignored, boolean closed,
      boolean sendViaJGroups, boolean internal) {
    Assert.assertTrue(recipient != null, "Sending a ReplyMessage to ALL");
    ReplyMessage m = new ReplyMessage();

    m.processorId = processorId;
    m.ignored = ignored;
    if (exception != null) {
      m.returnValue = exception;
      m.returnValueIsException = true;
    }
    m.closed = closed;
    m.sendViaJGroups = sendViaJGroups;
    // Previously the parameter was accepted but never stored, so the flag was always false.
    m.internal = internal;

    if (logger.isDebugEnabled()) {
      // instanceof is false for null, so no separate null check on the cause is needed.
      Throwable cause = exception == null ? null : exception.getCause();
      if (exception != null && ignored) {
        if (cause instanceof InvalidDeltaException) {
          logger.debug("Replying with invalid-delta: {}", cause.getMessage());
        } else {
          logger.debug("Replying with ignored=true and exception: {}", m, exception);
        }
      } else if (exception != null) {
        if (cause instanceof EntryNotFoundException) {
          logger.debug("Replying with entry-not-found: {}", cause.getMessage());
        } else if (cause instanceof ConcurrentCacheModificationException) {
          logger.debug("Replying with concurrent-modification-exception");
        } else {
          logger.debug("Replying with exception: {}", m, exception);
        }
      } else if (ignored) {
        logger.debug("Replying with ignored=true: {}", m);
      }
    }

    m.setRecipient(recipient);
    dm.putOutgoing(m);
  }

  /**
   * Processes this message. This method is invoked by the receiver of the message if the message is
   * not direct ack. If the message is a direct ack, the process(dm, ReplyProcessor) method is
   * invoked instead.
   *
   * @param dm the distribution manager that is processing the message.
   */
  @Override
  protected void process(final ClusterDistributionManager dm) {
    dmProcess(dm);
  }

  /**
   * Looks up the reply processor registered under {@link #processorId} and hands this message to
   * it. When clock stats are enabled, the time elapsed since this message's timestamp is recorded
   * after successful processing.
   *
   * @param dm the distribution manager that is processing the message
   * @throws RuntimeException whatever processing threw, rethrown after the processor (if one was
   *         found) has been cancelled with it
   */
  public void dmProcess(final DistributionManager dm) {
    final long startTime = getTimestamp();
    ReplyProcessor21 processor = ReplyProcessor21.getProcessor(processorId);
    try {
      this.process(dm, processor);

      if (DistributionStats.enableClockStats) {
        dm.getStats().incReplyMessageTime(DistributionStats.getStatTime() - startTime);
      }
    } catch (RuntimeException ex) {
      if (processor != null) {
        processor.cancel(getSender(), ex);
      }
      throw ex;
    }
  }

  /**
   * Delivers this reply to the given processor.
   *
   * @param dm the distribution manager that is processing the message (not used by this
   *        implementation)
   * @param processor the processor waiting for this reply; if null the reply is dropped
   */
  public void process(final DistributionManager dm, ReplyProcessor21 processor) {
    if (processor == null) {
      return;
    }
    processor.process(this);
  }

  /**
   * @return the result object carried by this reply, or null if the reply carries an exception (or
   *         nothing at all)
   */
  public Object getReturnValue() {
    if (this.returnValueIsException) {
      return null;
    }
    return this.returnValue;
  }

  /**
   * Returns the exception carried by this reply. As a side effect, if this message has a sender,
   * it is offered to the exception via <code>setSenderIfNull</code>.
   *
   * @return the exception, or null if the reply carries a normal result
   */
  public ReplyException getException() {
    if (!this.returnValueIsException) {
      return null;
    }
    ReplyException exception = (ReplyException) this.returnValue;
    if (exception != null) {
      InternalDistributedMember sender = getSender();
      if (sender != null) {
        exception.setSenderIfNull(sender);
      }
    }
    return exception;
  }

  /** @return the <code>ignored</code> flag of this reply */
  public boolean getIgnored() {
    return this.ignored;
  }

  /** @return the <code>closed</code> flag of this reply */
  public boolean getClosed() {
    return this.closed;
  }

  ////////////////////// Utility Methods //////////////////////

  @Override
  public int getDSFID() {
    return REPLY_MESSAGE;
  }

  // 8 bits in a byte
  // keeping this consistent with HAS_PROCESSOR_ID in PartitionMessage etc
  public static final byte PROCESSOR_ID_FLAG = 0x01;
  public static final byte IGNORED_FLAG = 0x02;
  public static final byte EXCEPTION_FLAG = 0x04;
  public static final byte CLOSED_FLAG = 0x08;
  // HAS_TX_CHANGES and TIME_STATS_SET are not read or written by this class itself.
  public static final byte HAS_TX_CHANGES = 0x10;
  public static final byte TIME_STATS_SET = 0x20;
  public static final byte OBJECT_FLAG = 0x40;
  public static final byte INTERNAL_FLAG = (byte) 0x80;

  /** Returns true if any bit of <code>flag</code> is set in <code>status</code>. */
  private static boolean testFlag(byte status, byte flag) {
    return (status & flag) != 0;
  }

  /**
   * Writes the superclass data, then the status byte, then the optional processor id and the
   * optional return value / exception. Must stay in step with {@link #fromData}.
   */
  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    super.toData(out, context);

    final boolean hasProcessorId = this.processorId != 0;
    final boolean hasPayload = this.returnValueIsException || this.returnValue != null;

    byte status = 0;
    if (this.ignored) {
      status |= IGNORED_FLAG;
    }
    // EXCEPTION_FLAG and OBJECT_FLAG are mutually exclusive; the reader relies on that.
    if (this.returnValueIsException) {
      status |= EXCEPTION_FLAG;
    } else if (this.returnValue != null) {
      status |= OBJECT_FLAG;
    }
    if (hasProcessorId) {
      status |= PROCESSOR_ID_FLAG;
    }
    if (this.closed) {
      status |= CLOSED_FLAG;
    }
    if (this.internal) {
      status |= INTERNAL_FLAG;
    }
    out.writeByte(status);

    if (hasProcessorId) {
      out.writeInt(processorId);
    }
    if (hasPayload) {
      DataSerializer.writeObject(this.returnValue, out);
    }
  }

  /**
   * Reads the fields in the order {@link #toData} wrote them: superclass data, status byte,
   * optional processor id, optional return value / exception.
   */
  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    super.fromData(in, context);

    final byte status = in.readByte();
    this.ignored = testFlag(status, IGNORED_FLAG);
    this.closed = testFlag(status, CLOSED_FLAG);
    this.internal = testFlag(status, INTERNAL_FLAG);

    if (testFlag(status, PROCESSOR_ID_FLAG)) {
      this.processorId = in.readInt();
    }
    if (testFlag(status, EXCEPTION_FLAG)) {
      this.returnValue = DataSerializer.readObject(in);
      this.returnValueIsException = true;
    } else if (testFlag(status, OBJECT_FLAG)) {
      this.returnValue = DataSerializer.readObject(in);
    }
  }

  /**
   * Builds the text used by {@link #toString()}: short class name, processor id, sender and, if
   * present, the exception (abbreviated when its cause is an {@link InvalidDeltaException}).
   * Returned as a builder, presumably so subclasses can append to it.
   */
  protected StringBuilder getStringBuilder() {
    StringBuilder sb = new StringBuilder();
    sb.append(getShortClassName());
    sb.append(" processorId=");
    sb.append(this.processorId);
    sb.append(" from ");
    sb.append(this.getSender());
    ReplyException ex = getException();
    if (ex != null) {
      if (ex.getCause() instanceof InvalidDeltaException) {
        sb.append(" with request for full value");
      } else {
        sb.append(" with exception ");
        sb.append(ex);
      }
    }
    return sb;
  }

  @Override
  public boolean isInternal() {
    return this.internal;
  }

  @Override
  public String toString() {
    return getStringBuilder().toString();
  }
}