blob: 1376818c85970796803897dc0245d751bfc11933 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class DistributedRegionFunctionStreamingMessage extends DistributionMessage
implements TransactionMessage {
private static final Logger logger = LogService.getLogger();
private Object result;
transient int replyMsgNum = 0;
transient boolean replyLastMsg;
transient int numObjectsInChunk = 0;
private Function functionObject;
private String functionName;
Object args;
private String regionPath;
private Set filter;
private int processorId;
private boolean isReExecute;
private boolean isFnSerializationReqd;
private int txUniqId = TXManagerImpl.NOTX;
private InternalDistributedMember txMemberId = null;
private static final short IS_REEXECUTE = UNRESERVED_FLAGS_START;
/** default exception to ensure a false-positive response is never returned */
@Immutable
static final ForceReattemptException UNHANDLED_EXCEPTION =
(ForceReattemptException) new ForceReattemptException("Unknown exception").fillInStackTrace();
public DistributedRegionFunctionStreamingMessage() {}
public DistributedRegionFunctionStreamingMessage(final String regionPath, Function function,
int procId, final Set filter, Object args, boolean isReExecute,
boolean isFnSerializationReqd) {
this.functionObject = function;
this.processorId = procId;
this.args = args;
this.regionPath = regionPath;
this.filter = filter;
this.isReExecute = isReExecute;
this.isFnSerializationReqd = isFnSerializationReqd;
this.txUniqId = TXManagerImpl.getCurrentTXUniqueId();
TXStateProxy txState = TXManagerImpl.getCurrentTXState();
if (txState != null && txState.isMemberIdForwardingRequired()) {
this.txMemberId = txState.getOriginatingMember();
}
}
private TXStateProxy prepForTransaction(ClusterDistributionManager dm)
throws InterruptedException {
if (this.txUniqId == TXManagerImpl.NOTX) {
return null;
} else {
InternalCache cache = dm.getCache();
if (cache == null) {
// ignore and return, we are shutting down!
return null;
}
TXManagerImpl mgr = cache.getTXMgr();
return mgr.masqueradeAs(this);
}
}
private void cleanupTransaction(TXStateProxy tx) {
if (this.txUniqId != TXManagerImpl.NOTX) {
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// ignore and return, we are shutting down!
return;
}
TXManagerImpl mgr = cache.getTXMgr();
mgr.unmasquerade(tx);
}
}
@Override
protected void process(final ClusterDistributionManager dm) {
Throwable thr = null;
boolean sendReply = true;
DistributedRegion dr = null;
TXStateProxy tx = null;
try {
if (checkCacheClosing(dm) || checkDSClosing(dm)) {
InternalCache cache = dm.getCache();
if (cache != null) {
thr = cache
.getCacheClosedException(String.format("Remote cache is closed: %s",
dm.getId()));
} else {
thr = new CacheClosedException(String.format("Remote cache is closed: %s",
dm.getId()));
}
return;
}
dr = (DistributedRegion) dm.getCache().getRegion(this.regionPath);
if (dr == null) {
// if the distributed system is disconnecting, don't send a reply saying
// the partitioned region can't be found (bug 36585)
thr = new ForceReattemptException(dm.getDistributionManagerId().toString()
+ ": could not find Distributed region " + regionPath);
return; // reply sent in finally block below
}
thr = UNHANDLED_EXCEPTION;
tx = prepForTransaction(dm);
sendReply = operateOnDistributedRegion(dm, dr); // need to take care of
// it...
thr = null;
} catch (CancelException se) {
// bug 37026: this is too noisy...
// throw new CacheClosedException("remote system shutting down");
// thr = se; cache is closed, no point trying to send a reply
thr = null;
sendReply = false;
if (logger.isDebugEnabled()) {
logger.debug("shutdown caught, abandoning message: {}", se.getMessage(), se);
}
} catch (RegionDestroyedException rde) {
// [bruce] RDE does not always mean that the sender's region is also
// destroyed, so we must send back an exception. If the sender's
// region is also destroyed, who cares if we send it an exception
if (dr != null && dr.isClosed()) {
thr = new ForceReattemptException("Region is destroyed in " + dm.getDistributionManagerId(),
rde);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
logger.debug("{} exception occurred while processing message: {}", this, t.getMessage(), t);
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
// log the exception at fine level if there is no reply to the message
thr = null;
if (sendReply && this.processorId != 0) {
if (!checkDSClosing(dm)) {
thr = t;
} else {
// don't pass arbitrary runtime exceptions and errors back if this
// cache/vm is closing
thr = new ForceReattemptException("Distributed system is disconnecting");
}
}
if (this.processorId == 0) {
logger.debug("{} exception while processing message: {}", this, t.getMessage(), t);
} else if (logger.isTraceEnabled(LogMarker.DM_VERBOSE) && (t instanceof RuntimeException)) {
logger.trace(LogMarker.DM_VERBOSE, "Exception caught while processing message", t);
}
} finally {
cleanupTransaction(tx);
if (sendReply && this.processorId != 0) {
ReplyException rex = null;
if (thr != null) {
// don't transmit the exception if this message was to a listener
// and this listener is shutting down
boolean excludeException = false;
if (!this.functionObject.isHA()) {
excludeException =
(thr instanceof CacheClosedException || (thr instanceof ForceReattemptException));
} else {
excludeException = thr instanceof ForceReattemptException;
}
if (!excludeException) {
rex = new ReplyException(thr);
}
}
// Send the reply if the operateOnPartitionedRegion returned true
sendReply(getSender(), this.processorId, dm, rex, null, 0, true, false);
}
}
}
protected boolean operateOnDistributedRegion(final ClusterDistributionManager dm,
DistributedRegion r) throws ForceReattemptException {
if (this.functionObject == null) {
ReplyMessage.send(getSender(), this.processorId,
new ReplyException(new FunctionException(
String.format("Function named %s is not registered to FunctionService",
this.functionName))),
dm, r.isInternalRegion());
return false;
}
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace(LogMarker.DM_VERBOSE, "FunctionMessage operateOnRegion: {}", r.getFullPath());
}
try {
r.executeOnRegion(this, this.functionObject, this.args, this.processorId, this.filter,
this.isReExecute);
if (!this.replyLastMsg && this.functionObject.hasResult()) {
ReplyMessage.send(getSender(), this.processorId,
new ReplyException(new FunctionException(
String.format("The function, %s, did not send last result",
functionObject.getId()))),
dm, r.isInternalRegion());
return false;
}
} catch (IOException e) {
ReplyMessage
.send(getSender(), this.processorId,
new ReplyException(
"Operation got interrupted due to shutdown in progress on remote VM", e),
dm, r.isInternalRegion());
return false;
} catch (CancelException sde) {
ReplyMessage.send(getSender(), this.processorId,
new ReplyException(new ForceReattemptException(
"Operation got interrupted due to shutdown in progress on remote VM", sde)),
dm, r.isInternalRegion());
return false;
}
// Unless there was an exception thrown, this message handles sending the
// response
return false;
}
/**
* check to see if the cache is closing
*/
private boolean checkCacheClosing(ClusterDistributionManager dm) {
InternalCache cache = dm.getCache();
return cache == null || cache.getCancelCriterion().isCancelInProgress();
}
/**
* check to see if the distributed system is closing
*
* @return true if the distributed system is closing
*/
private boolean checkDSClosing(ClusterDistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
@Override
public int getProcessorId() {
return this.processorId;
}
@Override
public int getDSFID() {
return DR_FUNCTION_STREAMING_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
short flags = in.readShort();
if ((flags & HAS_PROCESSOR_ID) != 0) {
this.processorId = in.readInt();
ReplyProcessor21.setMessageRPId(this.processorId);
}
if ((flags & HAS_TX_ID) != 0)
this.txUniqId = in.readInt();
if ((flags & HAS_TX_MEMBERID) != 0) {
this.txMemberId = DataSerializer.readObject(in);
}
Object object = DataSerializer.readObject(in);
if (object instanceof String) {
this.isFnSerializationReqd = false;
this.functionObject = FunctionService.getFunction((String) object);
if (this.functionObject == null) {
this.functionName = (String) object;
}
} else {
this.functionObject = (Function) object;
this.isFnSerializationReqd = true;
}
this.args = (Serializable) DataSerializer.readObject(in);
this.filter = (HashSet) DataSerializer.readHashSet(in);
this.regionPath = DataSerializer.readString(in);
this.isReExecute = (flags & IS_REEXECUTE) != 0;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
short flags = 0;
if (this.processorId != 0)
flags |= HAS_PROCESSOR_ID;
if (this.txUniqId != TXManagerImpl.NOTX)
flags |= HAS_TX_ID;
if (this.txMemberId != null)
flags |= HAS_TX_MEMBERID;
if (this.isReExecute)
flags |= IS_REEXECUTE;
out.writeShort(flags);
if (this.processorId != 0)
out.writeInt(this.processorId);
if (this.txUniqId != TXManagerImpl.NOTX)
out.writeInt(this.txUniqId);
if (this.txMemberId != null) {
DataSerializer.writeObject(this.txMemberId, out);
}
if (this.isFnSerializationReqd) {
DataSerializer.writeObject(this.functionObject, out);
} else {
DataSerializer.writeObject(functionObject.getId(), out);
}
DataSerializer.writeObject(this.args, out);
DataSerializer.writeHashSet((HashSet) this.filter, out);
DataSerializer.writeString(this.regionPath, out);
}
public synchronized boolean sendReplyForOneResult(DistributionManager dm, Object oneResult,
boolean lastResult, boolean sendResultsInOrder)
throws CacheException, ForceReattemptException, InterruptedException {
if (this.replyLastMsg) {
return false;
}
if (Thread.interrupted())
throw new InterruptedException();
int msgNum = this.replyMsgNum;
this.replyLastMsg = lastResult;
sendReply(getSender(), this.processorId, dm, null, oneResult, msgNum, lastResult,
sendResultsInOrder);
if (logger.isDebugEnabled()) {
logger.debug("Sending reply message count: {} to co-ordinating node", replyMsgNum);
}
this.replyMsgNum++;
return false;
}
protected void sendReply(InternalDistributedMember member, int procId, DistributionManager dm,
ReplyException ex, Object result, int msgNum, boolean lastResult,
boolean sendResultsInOrder) {
// if there was an exception, then throw out any data
if (ex != null) {
this.result = null;
this.replyMsgNum = 0;
this.replyLastMsg = true;
}
if (sendResultsInOrder) {
FunctionStreamingOrderedReplyMessage.send(member, procId, ex, dm, result, msgNum, lastResult);
} else {
FunctionStreamingReplyMessage.send(member, procId, ex, dm, result, msgNum, lastResult);
}
}
@Override
public int getProcessorType() {
return OperationExecutors.REGION_FUNCTION_EXECUTION_EXECUTOR;
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.internal.cache.TransactionMessage#canStartRemoteTransaction()
*/
@Override
public boolean canStartRemoteTransaction() {
return true;
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.internal.cache.TransactionMessage#getTXUniqId()
*/
@Override
public int getTXUniqId() {
return txUniqId;
}
@Override
public InternalDistributedMember getMemberToMasqueradeAs() {
if (txMemberId == null) {
return getSender();
} else {
return txMemberId;
}
}
@Override
public InternalDistributedMember getTXOriginatorClient() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean canParticipateInTransaction() {
return true;
}
@Override
public boolean isTransactionDistributed() {
return false;
}
}