blob: aa85eb6b02f2d1a62e04424a417fdfe0ca0c0cd3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.Operation;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionEventImpl;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientTombstoneMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.SerializationVersions;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* This message class sends tombstone GC information to other PR holders
*
* @since GemFire 7.0
*/
public class PRTombstoneMessage extends PartitionMessageWithDirectReply
implements SerializationVersions {
private static final Logger logger = LogService.getLogger();
@Immutable
private static final Version[] serializationVersions = null;
private Set<Object> keys;
private EventID eventID;
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
*/
public PRTombstoneMessage() {}
public static void send(BucketRegion r, final Set<Object> keys, EventID eventID) {
Set<InternalDistributedMember> recipients =
r.getPartitionedRegion().getRegionAdvisor().adviseAllServersWithInterest();
recipients.removeAll(r.getDistributionAdvisor().adviseReplicates());
if (recipients.size() == 0) {
return;
}
PartitionResponse p = new Response(r.getSystem(), recipients);
PRTombstoneMessage m =
new PRTombstoneMessage(recipients, r.getPartitionedRegion().getPRId(), p, keys, eventID);
m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
r.getDistributionManager().putOutgoing(m);
try {
p.waitForCacheException();
} catch (ForceReattemptException e) {
// ignore - the member is going away or has destroyed the PR so
// it won't be forwarding anything to clients for the PR
}
}
private PRTombstoneMessage(Set<InternalDistributedMember> recipients, int regionId,
PartitionResponse p, Set<Object> reapedKeys, EventID eventID) {
super(recipients, regionId, p);
this.keys = reapedKeys;
this.eventID = eventID;
}
@Override
protected boolean operateOnPartitionedRegion(final ClusterDistributionManager dm,
PartitionedRegion r, long startTime) throws ForceReattemptException {
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace("PRTombstoneMessage operateOnRegion: {}", r.getFullPath());
}
FilterProfile fp = r.getFilterProfile();
if (this.keys != null && this.keys.size() > 0) { // sanity check
if (fp != null && CacheClientNotifier.singletonHasClientProxies() && this.eventID != null) {
RegionEventImpl regionEvent = new RegionEventImpl(r, Operation.REGION_DESTROY, null, true,
r.getGemFireCache().getMyId());
regionEvent.setLocalFilterInfo(fp.getLocalFilterRouting(regionEvent));
ClientUpdateMessage clientMessage = ClientTombstoneMessage.gc(r, this.keys, this.eventID);
CacheClientNotifier.notifyClients(regionEvent, clientMessage);
}
}
return true;
}
@Override
protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; keys=").append(this.keys.size());
buff.append("; eventID=").append(this.eventID);
}
@Override
public int getDSFID() {
return PR_TOMBSTONE_MESSAGE;
}
@Override
public Version[] getSerializationVersions() {
return serializationVersions;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
int numKeys = in.readInt();
this.keys = new HashSet<Object>(numKeys);
for (int i = 0; i < numKeys; i++) {
this.keys.add(DataSerializer.readObject(in));
}
this.eventID = (EventID) DataSerializer.readObject(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.keys.size());
for (Object key : keys) {
DataSerializer.writeObject(key, out);
}
DataSerializer.writeObject(this.eventID, out);
}
private static class Response extends PartitionResponse {
// Set<InternalDistributedMember> forceReattemptSenders = new
// HashSet<InternalDistributedMember>();
public Response(InternalDistributedSystem ds, Set recipients) {
super(ds, recipients, false);
}
@Override
public void process(DistributionMessage msg) {
ReplyMessage reply = (ReplyMessage) msg;
if (reply.getException() != null) {
Throwable cause = reply.getException().getCause();
if (cause instanceof ForceReattemptException || cause instanceof CancelException) {
// TODO do we need to resend to these recipients? Might they have clients that won't
// otherwise get
// the GC message?
// this.forceReattemptSenders.add(reply.getSender());
reply.setException(null);
}
}
super.process(reply);
}
}
}