// blob: 83fb741164292e5cdd9c07c52772ec1d4e0900f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tx;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RemoteOperationException;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.util.ObjectIntProcedure;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class RemoteFetchKeysMessage extends RemoteOperationMessage {
private static final Logger logger = LogService.getLogger();
/**
 * No-argument constructor; performs no initialization of its own.
 * NOTE(review): presumably required so the message can be instantiated during
 * deserialization -- confirm against the serialization framework.
 */
public RemoteFetchKeysMessage() {}
/**
 * Creates a fetch-keys request by handing all three arguments straight to the superclass
 * constructor. Only used by {@link #send(LocalRegion, DistributedMember)}.
 *
 * @param recipient the member the request is addressed to
 * @param regionPath full path of the region whose keys are requested
 * @param processor the reply processor that will receive the reply chunks
 */
private RemoteFetchKeysMessage(InternalDistributedMember recipient, String regionPath,
ReplyProcessor21 processor) {
super(recipient, regionPath, processor);
}
/**
 * Replies to the sender of this message with the key set of the given region, streamed back as
 * one or more {@link RemoteFetchKeysReplyMessage} chunks.
 *
 * @param dm the distribution manager used to send the reply chunks
 * @param r the region whose keys are returned; non-partitioned regions are first waited on until
 *        initialized
 * @param startTime not used by this implementation
 * @return always {@code false}
 * @throws RemoteOperationException if an {@link IOException} occurs while sending the keys; the
 *         {@code IOException} is attached as the cause
 */
@Override
protected boolean operateOnRegion(ClusterDistributionManager dm, LocalRegion r, long startTime)
    throws RemoteOperationException {
  if (!(r instanceof PartitionedRegion)) { // prs already wait on initialization
    r.waitOnInitialization(); // bug #43371 - accessing a region before it's initialized
  }
  Set<?> keys = r.keySet();
  try {
    RemoteFetchKeysReplyMessage.send(getSender(), processorId, dm, keys);
  } catch (IOException io) {
    if (logger.isDebugEnabled()) {
      logger.debug("Caught exception while sending keys: {}", io.getMessage(), io);
    }
    // The failure must be reported regardless of the log level. Previously the throw lived
    // inside the debug guard, so the IOException was silently swallowed unless debug logging
    // happened to be enabled.
    throw new RemoteOperationException(
        "Unable to send response to fetch keys request",
        io);
  }
  return false;
}
/**
 * Returns the fixed identifier constant for this message class.
 *
 * @return {@code R_FETCH_KEYS_MESSAGE}
 */
@Override
public int getDSFID() {
return R_FETCH_KEYS_MESSAGE;
}
/**
 * Builds a fetch-keys request for {@code currRegion}, hands it to the distribution manager for
 * delivery to {@code target}, and returns the reply processor registered for the answer. The
 * method does not wait for the reply; callers obtain the keys through
 * {@link FetchKeysResponse#waitForKeys()}.
 *
 * @param currRegion the region whose full path is sent in the request
 * @param target the member to query; must be an {@link InternalDistributedMember}
 * @return the response
 */
public static FetchKeysResponse send(LocalRegion currRegion, DistributedMember target) {
  final InternalDistributedSystem system = currRegion.getSystem();
  final InternalDistributedMember recipient = (InternalDistributedMember) target;

  final FetchKeysResponse replyProcessor = new FetchKeysResponse(system, recipient);
  final RemoteFetchKeysMessage request =
      new RemoteFetchKeysMessage(recipient, currRegion.getFullPath(), replyProcessor);

  system.getDistributionManager().putOutgoing(request);
  return replyProcessor;
}
/**
 * Serializes this message by delegating to the superclass; this class writes no fields of its
 * own.
 *
 * @throws IOException if the superclass fails to write to {@code out}
 */
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
}
/**
 * Deserializes this message by delegating to the superclass; this class reads no fields of its
 * own.
 *
 * @throws IOException if the superclass fails to read from {@code in}
 * @throws ClassNotFoundException propagated from the superclass
 */
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
}
public static class RemoteFetchKeysReplyMessage extends ReplyMessage {
/** Index of the series this chunk belongs to ({@code send} always uses 0). */
int seriesNum;
/** Zero-based position of this message within its series. */
int msgNum;
/** The total number of series ({@code send} always uses 1). */
int numSeries;
/** Whether this is the last message of its series. */
boolean lastInSeries;
/**
 * Sender side only: the stream holding the chunk to send. It is written out as a byte array by
 * {@code toData} and is not populated by {@code fromData}.
 */
transient HeapDataOutputStream chunkStream;
/** Receiver side only: the serialized chunk bytes read by {@code fromData}. */
transient byte[] chunk;
/**
 * Empty constructor to conform to DataSerializable interface. All fields keep their default
 * values until {@code fromData} fills them in.
 */
public RemoteFetchKeysReplyMessage() {}
/**
 * Builds a single reply chunk addressed to {@code recipient}.
 *
 * @param recipient the member that asked for the keys
 * @param processorId id of the reply processor on the recipient
 * @param chunk stream holding the serialized keys of this chunk
 * @param seriesNum index of the series this chunk belongs to
 * @param msgNum position of this chunk within the series
 * @param numSeries total number of series
 * @param lastInSeries whether this chunk closes its series
 */
private RemoteFetchKeysReplyMessage(InternalDistributedMember recipient, int processorId,
    HeapDataOutputStream chunk, int seriesNum, int msgNum, int numSeries,
    boolean lastInSeries) {
  // addressing (the implicit super() call runs first)
  setRecipient(recipient);
  setProcessorId(processorId);

  // position of this chunk within the overall reply
  this.seriesNum = seriesNum;
  this.msgNum = msgNum;
  this.numSeries = numSeries;
  this.lastInSeries = lastInSeries;

  // payload
  this.chunkStream = chunk;
}
/**
 * Streams {@code keys} back to {@code recipient} as a single series of reply messages. The keys
 * are split into chunks by {@code chunkSet}; each chunk is sent through {@code sendChunk} with
 * an increasing message number. Chunking stops early when a chunk cannot be delivered or when
 * sending it raises a {@link CancelException}; the outcome is only logged at debug level.
 *
 * @param recipient the member to reply to; must not be null
 * @param processorId id of the reply processor on the recipient
 * @param dm the distribution manager used to send each chunk
 * @param keys the keys to send
 * @throws IOException propagated from {@code chunkSet} while serializing the keys
 */
public static void send(final InternalDistributedMember recipient, final int processorId,
    final DistributionManager dm, Set<?> keys) throws IOException {
  Assert.assertTrue(recipient != null, "FetchKeysReplyMessage NULL reply message");

  // the whole key set travels as one series
  final int onlySeries = 0;
  final int seriesCount = 1;

  if (logger.isDebugEnabled()) {
    logger.debug("Starting region keys chunking for {} keys to member {}", keys.size(),
        recipient);
  }

  // chunkSet returns false if it did not finish
  final boolean finished = chunkSet(recipient, keys, InitialImageOperation.CHUNK_SIZE_IN_BYTES,
      false, new ObjectIntProcedure() {
        /** number handed to the next chunk that is sent */
        private int nextMsgNum = 0;

        /**
         * @param chunkObj the HeapDataOutputStream holding the chunk
         * @param lastFlag positive if last chunk
         * @return true to continue to next chunk
         */
        @Override
        public boolean executeWith(Object chunkObj, int lastFlag) {
          final HeapDataOutputStream payload = (HeapDataOutputStream) chunkObj;
          final boolean isLast = lastFlag > 0;
          try {
            return sendChunk(recipient, processorId, dm, payload, onlySeries, nextMsgNum++,
                seriesCount, isLast);
          } catch (CancelException e) {
            // shutting down: tell chunkSet to stop
            return false;
          }
        }
      });

  if (logger.isDebugEnabled()) {
    logger.debug("{} region keys chunking", (finished ? "Finished" : "DID NOT complete"));
  }
}
/**
 * Wraps {@code chunk} in a reply message and hands it to the distribution manager.
 *
 * @return true when {@code putOutgoing} reports no failures (a null or empty set), false
 *         otherwise
 */
static boolean sendChunk(InternalDistributedMember recipient, int processorId,
    DistributionManager dm, HeapDataOutputStream chunk, int seriesNum, int msgNum,
    int numSeries, boolean lastInSeries) {
  final Set<?> undelivered = dm.putOutgoing(new RemoteFetchKeysReplyMessage(recipient,
      processorId, chunk, seriesNum, msgNum, numSeries, lastInSeries));
  if (undelivered == null) {
    return true;
  }
  return undelivered.isEmpty();
}
/**
 * Serializes the given set's elements into chunks, calling {@code proc} for each one. proc args:
 * the {@link HeapDataOutputStream} holding the chunk and an int indicating whether it is the
 * last chunk (positive means last chunk, zero otherwise). The return value of proc indicates
 * whether to continue to the next chunk (true) or abort (false).
 *
 * <p>
 * Every chunk is terminated by a serialized {@code null}. At least one chunk is always produced,
 * even for an empty set. The same stream instance is reset and reused for every chunk, so
 * {@code proc} must not keep a reference to it after returning.
 *
 * @param recipient the member the chunks are destined for; its version object is passed to the
 *        stream constructor
 * @param set the elements to serialize
 * @param CHUNK_SIZE_IN_BYTES target size of a chunk in bytes; a chunk stops growing once its
 *        current size plus the average element size reaches this value, but always holds at
 *        least one element while elements remain
 * @param includeValues not used by this implementation
 * @param proc callback invoked once per chunk
 * @return true if finished all chunks, false if stopped early
 * @throws IOException if an element cannot be serialized
 */
static boolean chunkSet(InternalDistributedMember recipient, Set<?> set,
    int CHUNK_SIZE_IN_BYTES, boolean includeValues, ObjectIntProcedure proc)
    throws IOException {
  final Iterator<?> it = set.iterator();
  boolean keepGoing = true;
  boolean sentLastChunk = false;
  // always write at least one chunk
  // Size the buffer from the requested chunk size (previously the parameter was ignored and the
  // global InitialImageOperation constant was used instead).
  final HeapDataOutputStream mos =
      new HeapDataOutputStream(CHUNK_SIZE_IN_BYTES + 2048, recipient.getVersionObject());
  do {
    mos.reset();
    int avgItemSize = 0;
    int itemCount = 0;
    // "itemCount == 0" guarantees progress: without it a non-positive chunk size would emit
    // empty chunks forever while the iterator still has elements.
    while ((itemCount == 0 || (mos.size() + avgItemSize) < CHUNK_SIZE_IN_BYTES)
        && it.hasNext()) {
      Object key = it.next();
      DataSerializer.writeObject(key, mos);
      // Note we track the itemCount so we can compute avgItemSize
      itemCount++;
      // Note we track avgItemSize to help us not to always go one item
      // past the max chunk size. When we go past it causes us to grow
      // the ByteBuffer that the chunk is stored in resulting in a copy
      // of the data.
      avgItemSize = mos.size() / itemCount;
    }
    // Write "end of chunk" entry to indicate end of chunk
    DataSerializer.writeObject((Object) null, mos);
    // send 1 for last message if no more data
    int lastMsg = it.hasNext() ? 0 : 1;
    keepGoing = proc.executeWith(mos, lastMsg);
    sentLastChunk = lastMsg == 1 && keepGoing;
    // if this region is destroyed while we are sending data, then abort.
  } while (keepGoing && it.hasNext());
  // return false if we were told to abort
  return sentLastChunk;
}
/**
 * Processes this message. This method is invoked by the receiver of the message. The chunk is
 * handed to the {@link FetchKeysResponse} that is waiting for it and the elapsed time since the
 * message timestamp is added to the reply-message statistics. When no processor is supplied the
 * message is dropped with a trace-level note and no statistics are recorded.
 *
 * @param dm the distribution manager that is processing the message.
 * @param p the reply processor for this message; expected to be a {@link FetchKeysResponse},
 *        may be null
 */
@Override
public void process(final DistributionManager dm, final ReplyProcessor21 p) {
  final long receivedAt = getTimestamp();
  final FetchKeysResponse response = (FetchKeysResponse) p;

  if (response != null) {
    response.process(this);
    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE, "{} Remote-processed {}", response, this);
    }
    dm.getStats().incReplyMessageTime(DistributionStats.getStatTime() - receivedAt);
    return;
  }

  // nobody is waiting for this reply any more
  if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
    logger.trace(LogMarker.DM_VERBOSE, "FetchKeysReplyMessage processor not found");
  }
}
/**
 * Serializes this reply: the superclass data first, then {@code seriesNum}, {@code msgNum},
 * {@code numSeries}, {@code lastInSeries} and finally the chunk stream written as a byte array.
 * The field order must stay in step with {@code fromData}.
 *
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.seriesNum);
out.writeInt(this.msgNum);
out.writeInt(this.numSeries);
out.writeBoolean(this.lastInSeries);
// written as a byte array; fromData reads it back into the byte[] "chunk" field
DataSerializer.writeObjectAsByteArray(this.chunkStream, out);
}
/**
 * Returns the fixed identifier constant for this reply message class.
 *
 * @return {@code R_FETCH_KEYS_REPLY}
 */
@Override
public int getDSFID() {
return R_FETCH_KEYS_REPLY;
}
/**
 * Deserializes this reply: the superclass data first, then {@code seriesNum}, {@code msgNum},
 * {@code numSeries}, {@code lastInSeries} and finally the chunk bytes. The field order must stay
 * in step with {@code toData}. {@code chunkStream} is not populated on this side.
 *
 * @throws IOException if reading from {@code in} fails
 * @throws ClassNotFoundException propagated from the superclass
 */
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.seriesNum = in.readInt();
this.msgNum = in.readInt();
this.numSeries = in.readInt();
this.lastInSeries = in.readBoolean();
// the sender wrote its chunk stream as a byte array; keep the raw bytes for processChunk
this.chunk = DataSerializer.readByteArray(in);
}
/**
 * Renders the processor id, the sender (when known), the chunk's position within the reply, the
 * payload size (taken from the outgoing stream if present, otherwise from the received bytes)
 * and the exception (when one is attached).
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder();
  text.append("RemoteFetchKeysReplyMessage ");
  text.append("processorid=").append(this.processorId);

  if (getSender() != null) {
    text.append(",sender=").append(this.getSender());
  }

  text.append(",seriesNum=").append(seriesNum);
  text.append(",msgNum=").append(msgNum);
  text.append(",numSeries=").append(numSeries);
  text.append(",lastInSeries=").append(lastInSeries);

  if (chunkStream != null) {
    text.append(",size=").append(chunkStream.size());
  } else if (chunk != null) {
    text.append(",size=").append(chunk.length);
  }

  if (getException() != null) {
    text.append(", exception=").append(getException());
  }
  return text.toString();
}
}
public static class FetchKeysResponse extends ReplyProcessor21 {
/** keys collected from all chunks processed so far; mutated while synchronized on itself */
private final Set<Object> returnValue;
/** lock used to synchronize chunk processing */
private final Object endLock = new Object();
/** number of chunks processed */
private volatile int chunksProcessed;
/** chunks expected (set when the last chunk has been processed) */
private volatile int chunksExpected;
/** whether the last chunk has been processed */
private volatile boolean lastChunkReceived;
/**
 * Creates a response processor for a single member and starts with an empty key set.
 *
 * @param system passed to the superclass constructor
 * @param member the member whose reply is awaited; passed to the superclass constructor
 */
public FetchKeysResponse(InternalDistributedSystem system, InternalDistributedMember member) {
super(system, member);
returnValue = new HashSet<>();
}
/**
 * Consumes one incoming message. A {@link RemoteFetchKeysReplyMessage} without an exception is
 * passed to {@link #processChunk}; the superclass {@code process} is only invoked once the reply
 * is complete, i.e. when the message carries an exception, is of another type, or
 * {@code processChunk} reports that all chunks have arrived.
 */
@Override
public void process(DistributionMessage msg) {
  boolean doneProcessing = false;
  try {
    if (!(msg instanceof RemoteFetchKeysReplyMessage)) {
      // not a chunk: nothing to accumulate
      doneProcessing = true;
    } else {
      final RemoteFetchKeysReplyMessage reply = (RemoteFetchKeysReplyMessage) msg;
      // an exception reply ends the exchange, otherwise the chunk decides
      doneProcessing = reply.getException() != null || processChunk(reply);
    }
  } finally {
    if (doneProcessing) {
      super.process(msg);
    }
  }
}
/**
 * Deserializes the keys carried by one reply chunk into the result set and updates the chunk
 * bookkeeping. A null key marks the end of the chunk and must be the final object in it. Any
 * exception raised along the way is reported through {@code processException} wrapped in a
 * {@link ReplyException}, and the method then returns false.
 *
 * <p>
 * This processing algorithm won't work well if there are multiple recipients. Currently the
 * retry logic for failed recipients is in PartitionedRegion. If we parallelize the sending of
 * this message, we'll need to handle fail over in this processor class and track results
 * differently.
 *
 * @return true if done processing
 */
boolean processChunk(RemoteFetchKeysReplyMessage msg) {
  boolean allChunksIn = false;
  try {
    final DataInputStream keyStream =
        new DataInputStream(new ByteArrayInputStream(msg.chunk));
    while (keyStream.available() > 0) {
      final Object key = DataSerializer.readObject(keyStream);
      if (key == null) {
        // null should signal the end of the set of keys
        Assert.assertTrue(keyStream.available() == 0);
        continue;
      }
      synchronized (returnValue) {
        returnValue.add(key);
      }
    }

    synchronized (this.endLock) {
      chunksProcessed = chunksProcessed + 1;

      final boolean closesReply = ((msg.seriesNum + 1) == msg.numSeries) && msg.lastInSeries;
      if (closesReply) {
        lastChunkReceived = true;
        chunksExpected = msg.msgNum + 1;
      }

      // finished only when the closing chunk was seen and every chunk has been counted
      allChunksIn = lastChunkReceived && (chunksExpected == chunksProcessed);

      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
        logger.trace(LogMarker.DM_VERBOSE,
            "{} chunksProcessed={},lastChunkReceived={},chunksExpected={},done={}", this,
            chunksProcessed, lastChunkReceived, chunksExpected, allChunksIn);
      }
    }
  } catch (Exception e) {
    processException(new ReplyException(
        "Error deserializing keys", e));
  }
  return allChunksIn;
}
/**
 * Blocks (uninterruptibly) until the reply is complete and returns the collected keys.
 *
 * @return an unmodifiable view of the keys received
 * @throws TransactionDataNodeHasDepartedException if the reply failed with a
 *         {@link RemoteOperationException} that was caused by a {@link CancelException}
 * @throws TransactionException if the reply failed for another reason, {@code handleCause()}
 *         returned normally, and the last chunk was never received
 */
@SuppressWarnings("rawtypes")
public Set waitForKeys() {
  try {
    waitForRepliesUninterruptibly();
  } catch (ReplyException e) {
    final Throwable cause = e.getCause();
    final boolean nodeDeparted = cause instanceof RemoteOperationException
        && cause.getCause() instanceof CancelException;
    if (nodeDeparted) {
      throw new TransactionDataNodeHasDepartedException("Node departed while fetching keys");
    }
    // NOTE(review): presumably rethrows the underlying cause where appropriate -- confirm
    e.handleCause();
    if (!this.lastChunkReceived) {
      throw new TransactionException(e);
    }
  }
  return Collections.unmodifiableSet(this.returnValue);
}
}
}