| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.internal.cache; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionAdvisor; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.ReplyMessage; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.partitioned.PartitionMessage; |
| import org.apache.geode.internal.cache.partitioned.RegionAdvisor; |
| import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.KnownVersion; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * This message is sent for two purposes <br> |
| * 1) To destroy the {@link org.apache.geode.internal.cache.PartitionedRegion} for all members |
| * specified (typically sent to all members that have the <code>PartitionedRegion</code> defined.) |
| * <br> |
| * 2) To inform the other nodes that {@link org.apache.geode.internal.cache.PartitionedRegion} is |
| * closed/locally destroyed or cache is closed on a node<br> |
| * This results in updating of the RegionAdvisor of the remote nodes. |
| * |
| * Sending this message should flush all previous {@link org.apache.geode.cache.Region} operations, |
| * which means this operation should not override |
| * {@link org.apache.geode.internal.cache.partitioned.PartitionMessage#getProcessorId()}. It is |
| * critical to guarantee delivery of events sent prior to this message. |
| * |
| * A standard {@link ReplyMessage} is used to send the reply, however any exception that it carries |
| * is ignored, preventing interruption after sending this message. |
| * |
| * @since GemFire 5.0 |
| */ |
| public class DestroyPartitionedRegionMessage extends PartitionMessage { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private Object cbArg; |
| |
| /** The specific destroy operation performed on the sender */ |
| private Operation op; |
| |
| /** Serial number of the region being removed */ |
| private int prSerial; |
| |
| /** Serial numbers of the buckets for this region */ |
| private int bucketSerials[]; |
| |
| /** Event ID of the destroy operation created at the origin */ |
| private EventID eventID; |
| |
| @Override |
| public EventID getEventID() { |
| return eventID; |
| } |
| |
| |
| /** |
| * Empty constructor to satisfy {@link DataSerializer} requirements |
| */ |
| public DestroyPartitionedRegionMessage() {} |
| |
| /** |
| * |
| * @param recipients the set of members on which the partitioned region should be destoryed |
| * @param region the partitioned region |
| * @param processor the processor that the reply will use to notify of the reply. |
| * @see #send(Set, PartitionedRegion, RegionEventImpl, int[]) |
| */ |
| private DestroyPartitionedRegionMessage(Set recipients, PartitionedRegion region, |
| ReplyProcessor21 processor, final RegionEventImpl event, int serials[]) { |
| super(recipients, region.getPRId(), processor); |
| this.cbArg = event.getRawCallbackArgument(); |
| this.op = event.getOperation(); |
| this.prSerial = region.getSerialNumber(); |
| Assert.assertTrue(this.prSerial != DistributionAdvisor.ILLEGAL_SERIAL); |
| this.bucketSerials = serials; |
| this.eventID = event.getEventId(); |
| } |
| |
| /** |
| * |
| * @param recipients set of members who have the PartitionedRegion defined. |
| * @param r the PartitionedRegion to destroy on each member |
| * @return the response on which to wait for the confirmation |
| */ |
| public static DestroyPartitionedRegionResponse send(Set recipients, PartitionedRegion r, |
| final RegionEventImpl event, int serials[]) { |
| Assert.assertTrue(recipients != null, "DestroyMessage NULL recipients set"); |
| DestroyPartitionedRegionResponse resp = |
| new DestroyPartitionedRegionResponse(r.getSystem(), recipients); |
| DestroyPartitionedRegionMessage m = |
| new DestroyPartitionedRegionMessage(recipients, r, resp, event, serials); |
| m.setTransactionDistributed(r.getCache().getTxManager().isDistributed()); |
| r.getDistributionManager().putOutgoing(m); |
| return resp; |
| } |
| |
| @Override |
| protected Throwable processCheckForPR(PartitionedRegion pr, |
| DistributionManager distributionManager) { |
| if (pr != null && !pr.getDistributionAdvisor().isInitialized()) { |
| Throwable thr = new ForceReattemptException( |
| String.format("%s : could not find partitioned region with Id %s", |
| distributionManager.getDistributionManagerId(), |
| pr.getRegionIdentifier())); |
| return thr; |
| } |
| return null; |
| } |
| |
| |
| @Override |
| protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion r, |
| long startTime) throws CacheException { |
| if (r == null) { |
| return true; |
| } |
| if (this.op.isLocal()) { |
| // notify the advisor that the sending member has locally destroyed (or closed) the region |
| |
| PartitionProfile pp = r.getRegionAdvisor().getPartitionProfile(getSender()); |
| if (pp == null) { // Fix for bug#36863 |
| return true; |
| } |
| // final Lock isClosingWriteLock = |
| // r.getRegionAdvisor().getPartitionProfile(getSender()).getIsClosingWriteLock(); |
| |
| Assert.assertTrue(this.prSerial != DistributionAdvisor.ILLEGAL_SERIAL); |
| |
| boolean ok = true; |
| // Examine this peer's profile and look at the serial number in that |
| // profile. If we have a newer profile, ignore the request. |
| |
| int oldSerial = pp.getSerialNumber(); |
| if (DistributionAdvisor.isNewerSerialNumber(oldSerial, this.prSerial)) { |
| ok = false; |
| if (logger.isDebugEnabled()) { |
| logger.debug("Not removing region {} serial requested = {}; actual is {}", r.getName(), |
| this.prSerial, r.getSerialNumber()); |
| } |
| } |
| if (ok) { |
| RegionAdvisor ra = r.getRegionAdvisor(); |
| ra.removeIdAndBuckets(this.sender, this.prSerial, this.bucketSerials, !this.op.isClose()); |
| } |
| |
| sendReply(getSender(), getProcessorId(), dm, null, r, startTime); |
| return false; |
| } |
| |
| // If region's isDestroyed flag is true, we can check if local destroy is done or not and if |
| // NOT, we can invoke destroyPartitionedRegionLocally method. |
| if (r.isDestroyed()) { |
| boolean isClose = this.op.isClose(); |
| r.destroyPartitionedRegionLocally(!isClose); |
| return true; |
| } |
| |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "{} operateOnRegion: {}", getClass().getName(), |
| r.getFullPath()); |
| } |
| RegionEventImpl event = |
| new RegionEventImpl(r, this.op, this.cbArg, true, r.getMyId(), getEventID()); |
| r.basicDestroyRegion(event, false, false, true); |
| |
| return true; |
| } |
| |
| @Override |
| protected void appendFields(StringBuilder buff) { |
| super.appendFields(buff); |
| buff.append("; cbArg=").append(this.cbArg).append("; op=").append(this.op); |
| buff.append("; prSerial=" + prSerial); |
| buff.append("; bucketSerials (" + bucketSerials.length + ")=("); |
| for (int i = 0; i < bucketSerials.length; i++) { |
| buff.append(Integer.toString(bucketSerials[i])); |
| if (i < bucketSerials.length - 1) { |
| buff.append(", "); |
| } |
| } |
| } |
| |
| @Override |
| public int getDSFID() { |
| return DESTROY_PARTITIONED_REGION_MESSAGE; |
| } |
| |
| @Override |
| public KnownVersion[] getSerializationVersions() { |
| return new KnownVersion[] {KnownVersion.GEODE_1_9_0}; |
| } |
| |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| fromDataPre_GEODE_1_9_0_0(in, context); |
| this.eventID = DataSerializer.readObject(in); |
| |
| } |
| |
| public void fromDataPre_GEODE_1_9_0_0(DataInput in, DeserializationContext context) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.cbArg = DataSerializer.readObject(in); |
| this.op = Operation.fromOrdinal(in.readByte()); |
| this.prSerial = in.readInt(); |
| int len = in.readInt(); |
| this.bucketSerials = new int[len]; |
| for (int i = 0; i < len; i++) { |
| this.bucketSerials[i] = in.readInt(); |
| } |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| toDataPre_GEODE_1_9_0_0(out, context); |
| DataSerializer.writeObject(this.eventID, out); |
| } |
| |
| public void toDataPre_GEODE_1_9_0_0(DataOutput out, SerializationContext context) |
| throws IOException { |
| super.toData(out, context); |
| DataSerializer.writeObject(this.cbArg, out); |
| out.writeByte(this.op.ordinal); |
| out.writeInt(this.prSerial); |
| out.writeInt(this.bucketSerials.length); |
| for (int i = 0; i < this.bucketSerials.length; i++) { |
| out.writeInt(this.bucketSerials[i]); |
| } |
| } |
| |
| |
| |
| /** |
| * The response on which to wait for all the replies. This response ignores any exceptions |
| * received from the "far side" |
| * |
| * @since GemFire 5.0 |
| */ |
| public static class DestroyPartitionedRegionResponse extends ReplyProcessor21 { |
| public DestroyPartitionedRegionResponse(InternalDistributedSystem system, Set initMembers) { |
| super(system, initMembers); |
| } |
| |
| @Override |
| protected synchronized void processException(ReplyException ex) { |
| // retry on ForceReattempt in case the region is still being initialized |
| if (ex.getRootCause() instanceof ForceReattemptException) { |
| super.processException(ex); |
| } |
| // other errors are ignored |
| else if (logger.isDebugEnabled()) { |
| logger.debug("DestroyRegionResponse ignoring exception", ex); |
| } |
| } |
| } |
| |
| } |