blob: 2c58893f4328a27d3bcdac58161ec5ae431af696 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.streaming;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireRethrowable;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.PRQueryTraceInfo;
import org.apache.geode.cache.query.internal.QueryMonitor;
import org.apache.geode.cache.query.internal.StructImpl;
import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegionQueryEvaluator;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* StreamingOperation is an abstraction for sending messages to multiple (or single) recipient
* requesting a potentially large amount of data and receiving the reply with data chunked into
* several messages.
*/
public abstract class StreamingOperation {
private static final Logger logger = LogService.getLogger();

/**
 * This is the number of bytes that need to be allowed in addition to data chunk to prevent
 * overflowing the socket buffer size in one message. For now, this is just an estimate
 */
public static final int MSG_OVERHEAD = 200; // seems to need to be greater than 100

/** The distributed system whose distribution manager sends the request and receives replies. */
public final InternalDistributedSystem sys;
/**
 * Creates a new instance of StreamingOperation.
 *
 * @param sys the distributed system used to send the request message and collect the chunked
 *        replies
 */
public StreamingOperation(InternalDistributedSystem sys) {
  this.sys = sys;
}
/**
 * Sends the request message to all recipients and blocks until every member's streamed reply has
 * been fully processed (or the operation aborts). Returns normally if succeeded to get data,
 * otherwise throws an exception.
 *
 * @param recipients the members to request data from; a no-op if empty
 * @throws org.apache.geode.cache.TimeoutException if the reply wait timed out (unwrapped from
 *         the InternalGemFireException thrown by the processor)
 * @throws InterruptedException if the calling thread was interrupted on entry
 */
public void getDataFromAll(Set recipients)
    throws org.apache.geode.cache.TimeoutException, InterruptedException {
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  if (recipients.isEmpty()) {
    return;
  }
  StreamingProcessor processor = new StreamingProcessor(this.sys, recipients);
  DistributionMessage m = createRequestMessage(recipients, processor);
  this.sys.getDistributionManager().putOutgoing(m);
  // while() loop removed for bug 36983 - you can't loop on waitForReplies()
  try {
    // should we allow this to timeout?
    processor.waitForRepliesUninterruptibly();
  } catch (InternalGemFireException ex) {
    // a timeout surfaces wrapped; rethrow the original TimeoutException to honor our signature
    Throwable cause = ex.getCause();
    if (cause instanceof org.apache.geode.cache.TimeoutException) {
      throw (org.apache.geode.cache.TimeoutException) cause;
    }
    throw ex;
  } catch (ReplyException e) {
    e.handleCause();
    // throws exception
  }
}
/**
 * Override in subclass to instantiate request message.
 *
 * @param recipients the members the request will be addressed to
 * @param processor the reply processor whose id the request message must carry so replies can be
 *        routed back to it
 */
protected abstract DistributionMessage createRequestMessage(Set recipients,
    ReplyProcessor21 processor);

/**
 * Called from separate thread when reply is processed. Delegates each received chunk to
 * {@link #processData}.
 *
 * @return false if should abort (region was destroyed or cache was closed)
 */
public boolean processChunk(List objects, InternalDistributedMember sender, int sequenceNum,
    boolean lastInSequence) {
  return processData(objects, sender, sequenceNum, lastInSequence);
}

/**
 * Override in subclass to do something useful with the data.
 *
 * @param objects the deserialized objects in this chunk
 * @param sender the member that produced the chunk
 * @param sequenceNum the sequence of this data (0-based), in case ordering matters
 * @param lastInSequence true if this is the last chunk in the sequence
 * @return false to abort
 */
protected abstract boolean processData(List objects, InternalDistributedMember sender,
    int sequenceNum, boolean lastInSequence);
/**
 * Reply processor that collects chunked {@link StreamingReplyMessage}s from each recipient and
 * hands every chunk to the enclosing operation via {@link StreamingOperation#processChunk}.
 */
public class StreamingProcessor extends ReplyProcessor21 {

  // Set when processChunk asks to abort; per bug 37461 it is never reset once true.
  protected volatile boolean abort = false;

  // Per-sender chunk accounting; map access is guarded by synchronized(this).
  private final Map statusMap = new HashMap();

  // Count of reply messages currently inside process(); stillWaiting() keeps this processor
  // alive until it drains, even if a member departs while we are handling its data.
  protected final AtomicInteger msgsBeingProcessed = new AtomicInteger();

  /** Tracks how many chunks have arrived from a single sender. */
  class Status {
    int msgsProcessed = 0;
    // Total messages the sender will send; stays 0 until the chunk flagged lastMsg arrives.
    int numMsgs = 0;

    /** Return true if this is the very last reply msg to process for this member */
    protected synchronized boolean trackMessage(StreamingReplyMessage m) {
      this.msgsProcessed++;
      if (m.lastMsg) {
        this.numMsgs = m.msgNum + 1;
      }
      if (logger.isDebugEnabled()) {
        logger.debug(
            "Streaming Message Tracking Status: Processor id: {}; Sender: {}; Messages Processed: {}; NumMsgs: {}",
            getProcessorId(), m.getSender(), this.msgsProcessed, this.numMsgs);
      }
      // this.numMsgs starts out as zero and gets initialized
      // only when we get a lastMsg true.
      // Since we increment msgsProcessed, the following condition
      // cannot be true until sometime after we've received the
      // lastMsg, and signals that all messages have been processed
      return this.msgsProcessed == this.numMsgs;
    }
  }

  /** Waits on a single member. */
  public StreamingProcessor(final InternalDistributedSystem system,
      InternalDistributedMember member) {
    super(system, member);
  }

  /** Waits on a set of members. */
  public StreamingProcessor(InternalDistributedSystem system, Set members) {
    super(system, members);
  }

  @Override
  public void process(DistributionMessage msg) {
    // ignore messages from members not in the wait list
    if (!waitingOnMember(msg.getSender())) {
      return;
    }
    this.msgsBeingProcessed.incrementAndGet();
    try {
      StreamingReplyMessage m = (StreamingReplyMessage) msg;
      boolean isLast = true; // is last message for this member?
      List objects = m.getObjects();
      if (objects != null) { // CONSTRAINT: objects should only be null if there's no data at all
        // Bug 37461: don't allow abort to be reset.
        boolean isAborted = this.abort; // volatile fetch
        if (!isAborted) {
          isAborted = !processChunk(objects, m.getSender(), m.msgNum, m.lastMsg);
          if (isAborted) {
            this.abort = true; // volatile store
          }
        }
        isLast = isAborted || trackMessage(m); // interpret msgNum
        // @todo ezoerner send an abort message to data provider if
        // !doContinue (region was destroyed or cache closed);
        // also provide ability to explicitly cancel
      } else {
        // if a null chunk was received (no data), then
        // we're done with that member
        isLast = true;
      }
      if (isLast) {
        super.process(msg, false); // removes from members and cause us to
        // ignore future messages received from that member
      }
    } finally {
      this.msgsBeingProcessed.decrementAndGet();
      checkIfDone(); // check to see if decrementing msgsBeingProcessed requires signalling to
      // proceed
    }
  }

  /**
   * Contract of {@link ReplyProcessor21#stillWaiting()} is to never return true after returning
   * false.
   */
  private volatile boolean finishedWaiting = false;

  /**
   * Overridden to wait for messages being currently processed: This situation can come about if a
   * member departs while we are still processing data from that member
   */
  @Override
  protected boolean stillWaiting() {
    if (finishedWaiting) { // volatile fetch
      return false;
    }
    if (this.msgsBeingProcessed.get() > 0) {
      // to fix bug 37391 always wait for msgsBeingPRocessod to go to 0,
      // even if abort is true
      return true;
    }
    // volatile fetches
    finishedWaiting = finishedWaiting || this.abort || !super.stillWaiting();
    return !finishedWaiting;
  }

  @Override
  public String toString() {
    return "<" + this.getClass().getName() + " " + this.getProcessorId() + " waiting for "
        + numMembers() + " replies" + (exception == null ? "" : (" exception: " + exception))
        + " from " + membersToString() + "; waiting for " + this.msgsBeingProcessed.get()
        + " messages in the process of being processed" + ">";
  }

  /** Lazily creates the per-sender Status and delegates chunk accounting to it. */
  protected boolean trackMessage(StreamingReplyMessage m) {
    Status status;
    synchronized (this) {
      status = (Status) this.statusMap.get(m.getSender());
      if (status == null) {
        status = new Status();
        this.statusMap.put(m.getSender(), status);
      }
    }
    return status.trackMessage(m);
  }
}
/**
 * Request message whose {@link #process} streams reply data back to the sender in
 * socket-buffer-sized chunks. Subclasses supply the data one object at a time via
 * {@link #getNextReplyObject}.
 */
public abstract static class RequestStreamingMessage extends PooledDistributionMessage
    implements MessageWithReply {

  // Id of the StreamingProcessor on the requesting member; echoed in every reply.
  protected int processorId;

  @Override
  public int getProcessorId() {
    return this.processorId;
  }
/**
 * Pulls objects from {@link #getNextReplyObject} and replies with one chunk per iteration of the
 * outer loop, packing as many serialized objects as fit into the chunk buffer. An object that
 * overflows the buffer (signalled by {@link StreamingOperation#CHUNK_FULL}) is rolled over to
 * start the next chunk. Any failure is reported back via {@link #replyWithException}.
 */
@Override
protected void process(final ClusterDistributionManager dm) {
  Throwable thr = null;
  ReplyException rex = null;
  Object nextObject = null;
  Object failedObject = null;
  int socketBufferSize = dm.getSystem().getConfig().getSocketBufferSize();
  // leave MSG_OVERHEAD headroom so chunk + message headers fit in one socket buffer
  int chunkSize = socketBufferSize - MSG_OVERHEAD;
  HeapDataOutputStream outStream =
      new HeapDataOutputStream(chunkSize, getSender().getVersionObject());
  boolean sentFinalMessage = false;
  boolean receiverCacheClosed = false;
  int msgNum = 0;
  try {
    do {
      int numObjectsInChunk = 0;
      // boolean firstObject = true;

      // always write at least one object, allowing expansion
      // if we have an object already that didn't get added, then use
      // that object instead of getting another one
      if (failedObject == null) {
        nextObject = getNextReplyObject();
      } else {
        // this object overflowed the previous chunk; it opens the new one
        nextObject = failedObject;
        failedObject = null;
      }
      if (nextObject != Token.END_OF_STREAM) {
        numObjectsInChunk = 1;
        BlobHelper.serializeTo(nextObject, outStream);
        // for the next objects, disallow stream from allocating more storage
        do {
          outStream.disallowExpansion(CHUNK_FULL); // sets the mark where rollback occurs on
          // CHUNK_FULL
          nextObject = getNextReplyObject();
          if (nextObject != Token.END_OF_STREAM) {
            try {
              BlobHelper.serializeTo(nextObject, outStream);
              numObjectsInChunk++;
            } catch (GemFireRethrowable e) {
              // can only be thrown when expansion is disallowed
              // and buffer is automatically reset to point where it was disallowed
              failedObject = nextObject;
              break;
            }
          }
        } while (nextObject != Token.END_OF_STREAM);
      }
      try {
        replyWithData(dm, outStream, numObjectsInChunk, msgNum++,
            nextObject == Token.END_OF_STREAM);
        if (nextObject == Token.END_OF_STREAM) {
          sentFinalMessage = true;
        }
      } catch (CancelException e) {
        receiverCacheClosed = true;
        break; // receiver no longer cares
      }
      outStream.reset(); // ready for reuse, assumes replyWithData
      // does not queue the message but outStream has
      // already been used
    } while (nextObject != Token.END_OF_STREAM);
    // } catch (CancelException e) {
    // // if cache is closed, we cannot send a reply (correct?)
  } catch (VirtualMachineError err) {
    SystemFailure.initiateFailure(err);
    // If this ever returns, rethrow the error. We're poisoned
    // now, so don't let this thread continue.
    throw err;
  } catch (Throwable t) {
    // Whenever you catch Error or Throwable, you must also
    // catch VirtualMachineError (see above). However, there is
    // _still_ a possibility that you are dealing with a cascading
    // error condition, so you also need to check to see if the JVM
    // is still usable:
    SystemFailure.checkFailure();
    thr = t;
  }
  if (thr != null) {
    rex = new ReplyException(thr);
    replyWithException(dm, rex);
  } else if (!sentFinalMessage && !receiverCacheClosed) {
    // the loop can only exit via END_OF_STREAM or CancelException, so this is unreachable
    throw new InternalGemFireError(
        "this should not happen");
    // replyNoData(dm);
  }
}
/**
 * Override in subclass to provide reply data, one object per call; terminate by returning
 * {@code Token.END_OF_STREAM}.
 *
 * @return the next object to stream, or {@code Token.END_OF_STREAM} when exhausted
 * @throws InterruptedException if interrupted while producing the next object
 */
protected abstract Object getNextReplyObject() throws InterruptedException;

// private void replyNoData(DistributionManager dm) {
// StreamingReplyMessage.send(getSender(), this.processorId, null, dm, null, 0, 0, true);
// }
/** Sends one chunk of already-serialized reply data back to the requester. */
protected void replyWithData(ClusterDistributionManager dm, HeapDataOutputStream outStream,
    int numObjects, int msgNum, boolean lastMsg) {
  StreamingReplyMessage.send(getSender(), this.processorId, null, dm, outStream, numObjects,
      msgNum, lastMsg);
}

/** Reports a processing failure back to the requester as a final (lastMsg=true) reply. */
protected void replyWithException(ClusterDistributionManager dm, ReplyException rex) {
  StreamingReplyMessage.send(getSender(), this.processorId, rex, dm, null, 0, 0, true);
}
/** Reads the wire form: superclass fields followed by the processor id. */
@Override
public void fromData(DataInput in,
    DeserializationContext context) throws IOException, ClassNotFoundException {
  super.fromData(in, context);
  this.processorId = in.readInt();
}

/** Writes the wire form; must mirror {@link #fromData} exactly. */
@Override
public void toData(DataOutput out,
    SerializationContext context) throws IOException {
  super.toData(out, context);
  out.writeInt(this.processorId);
}
/** Diagnostic representation: {@code ClassName(sender=...; processorId=N)}. */
@Override
public String toString() {
  // StringBuilder instead of StringBuffer: a method-local buffer needs no synchronization.
  StringBuilder buff = new StringBuilder();
  buff.append(getClass().getName());
  // Fixed malformed output: was "'; sender=" — a stray apostrophe with no opening
  // parenthesis, even though the string was terminated with ")".
  buff.append("(sender=");
  buff.append(getSender());
  buff.append("; processorId=");
  buff.append(this.processorId);
  buff.append(")");
  return buff.toString();
}
}
/**
 * One chunk of a streamed reply. The sending side carries the serialized objects in
 * {@code chunkStream}; the receiving side deserializes them into {@code objectList}.
 */
public static class StreamingReplyMessage extends ReplyMessage {

  /** the number of this message */
  protected int msgNum;

  /** whether this message is the last one in this series */
  protected boolean lastMsg;

  private transient HeapDataOutputStream chunkStream; // used only on sending side, null means
  // abort

  private transient int numObjects; // used only on sending side

  private transient List objectList = null; // used only on receiving side

  private boolean pdxReadSerialized = false; // used to read PDX types in serialized form.

  private transient boolean isCanceled = false; // used only on receiving side and if
  // messageProcessor is of type
  // PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse
/**
 * @param chunkStream the data to send back, if null then all the following parameters are
 *        ignored and any future replies from this member will be ignored, and the streaming of
 *        chunks is considered aborted by the receiver.
 *
 * @param msgNum message number in this series (0-based)
 * @param lastMsg if this is the last message in this series
 */
public static void send(InternalDistributedMember recipient, int processorId,
    ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream,
    int numObjects, int msgNum, boolean lastMsg) {
  send(recipient, processorId, exception, dm, chunkStream, numObjects, msgNum, lastMsg, false);
}

/**
 * Same as the shorter overload, but additionally lets the sender request that PDX types be kept
 * in serialized (PdxInstance) form when the receiver deserializes the chunk.
 */
public static void send(InternalDistributedMember recipient, int processorId,
    ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream,
    int numObjects, int msgNum, boolean lastMsg, boolean pdxReadSerialized) {
  StreamingReplyMessage replyMessage = new StreamingReplyMessage();
  replyMessage.processorId = processorId;
  if (exception != null) {
    replyMessage.setException(exception);
    // log4j2 renders the trailing Throwable argument as a stack trace
    logger.debug("Replying with exception: {}", replyMessage, exception);
  }
  replyMessage.chunkStream = chunkStream;
  replyMessage.numObjects = numObjects;
  replyMessage.setRecipient(recipient);
  replyMessage.msgNum = msgNum;
  replyMessage.lastMsg = lastMsg;
  replyMessage.pdxReadSerialized = pdxReadSerialized;
  dm.putOutgoing(replyMessage);
}
/** Returns the 0-based number of this message within its series. */
public int getMessageNumber() {
  return this.msgNum;
}

/** Returns true if this is the final chunk from this sender. */
public boolean isLastMessage() {
  return this.lastMsg;
}

/** True if deserialization was cut short because low memory was detected (query path only). */
public boolean isCanceled() {
  return isCanceled;
}

/** Return the objects in this chunk as a List, used only on receiving side */
public List getObjects() {
  return this.objectList;
}

@Override
public int getDSFID() {
  return STREAMING_REPLY_MESSAGE;
}
/**
 * Deserializes a chunk on the receiving side, populating {@code objectList}. For
 * partitioned-region query replies, struct rows arriving as Object[] from senders newer than
 * 8.1 are wrapped back into StructImpl, and deserialization is abandoned (setting
 * {@code isCanceled}) when the query monitor reports low memory.
 */
@Override
public void fromData(DataInput in,
    DeserializationContext context) throws IOException, ClassNotFoundException {
  int n;
  super.fromData(in, context);
  n = in.readInt(); // object count; -1 means "no data / abort" (see toData)
  this.msgNum = in.readInt();
  this.lastMsg = in.readBoolean();
  this.pdxReadSerialized = in.readBoolean();
  Version senderVersion = InternalDataSerializer.getVersionForDataStream(in);
  boolean isSenderAbove_8_1 = senderVersion.compareTo(Version.GFE_81) > 0;
  InternalCache cache = null;
  // NOTE(review): boxed Boolean where a primitive boolean would do
  Boolean initialPdxReadSerialized = false;
  try {
    cache =
        (InternalCache) GemFireCacheImpl.getForPdx("fromData invocation in StreamingOperation");
    initialPdxReadSerialized = cache.getPdxReadSerializedOverride();
  } catch (CacheClosedException e) {
    // cache stays null; the PDX override handling below is skipped
    logger.debug("Cache is closed. PdxReadSerializedOverride set to false");
  }
  if (n == -1) {
    this.objectList = null;
  } else {
    this.numObjects = n; // for benefit of toString()
    this.objectList = new ArrayList(n);
    // Check if the PDX types needs to be kept in serialized form.
    // This will make readObject() to return PdxInstance form.
    if (this.pdxReadSerialized && cache != null) {
      cache.setPdxReadSerializedOverride(true);
    }
    try {
      ReplyProcessor21 messageProcessor = ReplyProcessor21.getProcessor(processorId);
      boolean isQueryMessageProcessor =
          messageProcessor instanceof PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse;
      ObjectType elementType = null;
      if (isQueryMessageProcessor) {
        elementType =
            ((PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse) messageProcessor)
                .getResultType();
      }
      boolean lowMemoryDetected = false;
      for (int i = 0; i < n; i++) {
        // TestHook used in ResourceManagerWithQueryMonitorDUnitTest.
        // will simulate an critical memory event after a certain number of calls to
        // doTestHook(BEFORE_ADD_OR_UPDATE_MAPPING_OR_DESERIALIZING_NTH_STREAMINGOPERATION)
        if (DefaultQuery.testHook != null) {
          DefaultQuery.testHook.doTestHook(
              DefaultQuery.TestHook.SPOTS.BEFORE_ADD_OR_UPDATE_MAPPING_OR_DESERIALIZING_NTH_STREAMINGOPERATION,
              null, null);
        }
        if (isQueryMessageProcessor && QueryMonitor.isLowMemory()) {
          lowMemoryDetected = true;
          break;
        }
        Object o = DataSerializer.readObject(in);
        if (isQueryMessageProcessor && elementType != null && elementType.isStructType()) {
          boolean convertToStruct = isSenderAbove_8_1;
          if (convertToStruct && i == 0) {
            // the first element may be query trace info rather than a row; leave it as-is
            convertToStruct = !(o instanceof PRQueryTraceInfo);
          }
          if (convertToStruct) {
            o = new StructImpl((StructTypeImpl) elementType, (Object[]) o);
          }
        }
        this.objectList.add(o);
      }
      if (lowMemoryDetected) {
        isCanceled = true;
        // TestHook to help verify that objects have been rejected.
        if (DefaultQuery.testHook != null) {
          DefaultQuery.testHook.doTestHook(
              DefaultQuery.TestHook.SPOTS.LOW_MEMORY_WHEN_DESERIALIZING_STREAMINGOPERATION,
              null, null);
        }
      }
    } finally {
      // restore the cache-wide PDX override we may have flipped above
      if (this.pdxReadSerialized && cache != null) {
        cache.setPdxReadSerializedOverride(initialPdxReadSerialized);
      }
    }
  }
}
/**
 * Writes the wire form read by {@link #fromData}: superclass fields, object count (or -1 as the
 * abort/no-data marker), msgNum, lastMsg, pdxReadSerialized, then the pre-serialized objects.
 */
@Override
public void toData(DataOutput out,
    SerializationContext context) throws IOException {
  super.toData(out, context);
  if (this.chunkStream == null) {
    // -1 tells the receiver this member has no data (streaming aborted)
    out.writeInt(-1);
  } else {
    out.writeInt(this.numObjects);
  }
  out.writeInt(this.msgNum);
  out.writeBoolean(this.lastMsg);
  out.writeBoolean(this.pdxReadSerialized);
  if (this.chunkStream != null && this.numObjects > 0) {
    // chunkStream already holds the serialized objects; copy them through verbatim
    this.chunkStream.sendTo(out);
  }
}
/** Diagnostic representation including chunk accounting and either list size or raw stream. */
@Override
public String toString() {
  // StringBuilder instead of StringBuffer: a method-local buffer needs no synchronization.
  StringBuilder buff = new StringBuilder();
  buff.append(getClass().getName());
  buff.append("(processorId=");
  buff.append(this.processorId);
  buff.append(" from ");
  buff.append(this.getSender());
  ReplyException ex = this.getException();
  if (ex != null) {
    buff.append(" with exception ");
    buff.append(ex);
  }
  buff.append(";numObjects=");
  buff.append(this.numObjects);
  // was ";msgNum " — delimiter made consistent with the surrounding "name=" fields
  buff.append(";msgNum=");
  buff.append(this.msgNum);
  buff.append(";lastMsg=");
  buff.append(this.lastMsg);
  if (this.objectList != null) {
    // receiving side: show how many objects were deserialized
    buff.append(";objectList(size=");
    buff.append(this.objectList.size());
    buff.append(")");
  } else {
    // sending side: show the raw chunk stream
    buff.append(";chunkStream=");
    buff.append(this.chunkStream);
  }
  buff.append(")");
  return buff.toString();
}
}
/**
 * Sentinel handed to {@code HeapDataOutputStream.disallowExpansion} and thrown by the stream
 * when a serialized object would overflow the current chunk; caught in
 * {@code RequestStreamingMessage.process} so the object can be rolled over to the next chunk.
 */
@Immutable
public static final GemFireRethrowable CHUNK_FULL = new GemFireRethrowable();
}