| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *======================================================================== |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.*; |
| import com.gemstone.gemfire.cache.*; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import java.io.*; |
| |
| /** |
| * Represents a single operation that can be queued for reliable delivery. |
| * Instances are owned in the context of a region. |
| * |
| * @author Darrel Schneider |
| * @since 5.0 |
| */ |
| public class QueuedOperation |
| { |
| private final Operation op; |
| |
| private final Object key; // may be null |
| |
| private final byte[] value; // may be null |
| |
| private final Object valueObj; // may be null |
| |
| /** |
| * Deserialization policies defined in AbstractUpdateOperation |
| * @see com.gemstone.gemfire.internal.cache.AbstractUpdateOperation |
| */ |
| private final byte deserializationPolicy; |
| |
| private final Object cbArg; // may be null |
| |
| /** |
| * Creates an instance of a queued operation with the given data. |
| */ |
| public QueuedOperation(Operation op, Object key, byte[] value, |
| Object valueObj, byte deserializationPolicy, Object cbArg) { |
| this.op = op; |
| this.key = key; |
| this.value = value; |
| this.valueObj = valueObj; |
| this.deserializationPolicy = deserializationPolicy; |
| this.cbArg = cbArg; |
| } |
| |
| public void process(LocalRegion lr, DistributedMember src, long lastMod) |
| { |
| if (this.op.isRegion()) { |
| // it is a region operation |
| RegionEventImpl re = new RegionEventImpl(lr, this.op, this.cbArg, true, |
| src); |
| //re.setQueued(true); |
| if (this.op.isRegionInvalidate()) { |
| lr.basicInvalidateRegion(re); |
| } |
| else if (this.op == Operation.REGION_CLEAR) { |
| lr.cmnClearRegion(re, false/* cacheWrite */, false/*useRVV*/); |
| } |
| else { |
| throw new IllegalStateException(LocalizedStrings.QueuedOperation_THE_0_SHOULD_NOT_HAVE_BEEN_QUEUED.toLocalizedString(this.op)); |
| } |
| } |
| else { |
| // it is an entry operation |
| //TODO :EventID should be passed from the sender & should be reused here |
| EntryEventImpl ee = EntryEventImpl.create( |
| lr, this.op, this.key, null, |
| this.cbArg, true, src); |
| try { |
| //ee.setQueued(true); |
| if (this.op.isCreate() || this.op.isUpdate()) { |
| UpdateOperation.UpdateMessage.setNewValueInEvent(this.value, |
| this.valueObj, |
| ee, |
| this.deserializationPolicy); |
| try { |
| long time = lastMod; |
| if (ee.getVersionTag() != null) { |
| time = ee.getVersionTag().getVersionTimeStamp(); |
| } |
| if (AbstractUpdateOperation.doPutOrCreate(lr, ee, time)) { |
| // am I done? |
| } |
| } catch (ConcurrentCacheModificationException e) { |
| // operation was rejected by the cache's concurrency control mechanism as being old |
| } |
| } |
| else if (this.op.isDestroy()) { |
| ee.setOldValueFromRegion(); |
| try { |
| lr.basicDestroy(ee, |
| false, |
| null); // expectedOldValue |
| // ???:ezoerner:20080815 |
| // can a remove(key, value) operation be |
| // queued? |
| } |
| catch (ConcurrentCacheModificationException e) { |
| // operation was rejected by the cache's concurrency control mechanism as being old |
| } |
| catch (EntryNotFoundException ignore) { |
| } |
| catch (CacheWriterException e) { |
| throw new Error(LocalizedStrings.QueuedOperation_CACHEWRITER_SHOULD_NOT_BE_CALLED.toLocalizedString(), e); |
| } |
| catch (TimeoutException e) { |
| throw new Error(LocalizedStrings.QueuedOperation_DISTRIBUTEDLOCK_SHOULD_NOT_BE_ACQUIRED.toLocalizedString(), e); |
| } |
| } |
| else if (this.op.isInvalidate()) { |
| ee.setOldValueFromRegion(); |
| boolean forceNewEntry = lr.dataPolicy.withReplication() |
| && !lr.isInitialized(); |
| boolean invokeCallbacks = lr.isInitialized(); |
| try { |
| lr.basicInvalidate(ee, invokeCallbacks, forceNewEntry); |
| } |
| catch (ConcurrentCacheModificationException e) { |
| // operation was rejected by the cache's concurrency control mechanism as being old |
| } |
| catch (EntryNotFoundException ignore) { |
| } |
| } |
| else { |
| throw new IllegalStateException(LocalizedStrings.QueuedOperation_THE_0_SHOULD_NOT_HAVE_BEEN_QUEUED.toLocalizedString(this.op)); |
| } |
| } finally { |
| ee.release(); |
| } |
| } |
| } |
| |
| public static QueuedOperation createFromData(DataInput in) |
| throws IOException, ClassNotFoundException |
| { |
| Operation op = Operation.fromOrdinal(in.readByte()); |
| Object key = null; |
| byte[] value = null; |
| Object valueObj = null; |
| byte deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE; |
| Object cbArg = DataSerializer.readObject(in); |
| if (op.isEntry()) { |
| key = DataSerializer.readObject(in); |
| if (op.isUpdate() || op.isCreate()) { |
| deserializationPolicy = in.readByte(); |
| if (deserializationPolicy == |
| DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) { |
| valueObj = DataSerializer.readObject(in); |
| } |
| else { |
| value = DataSerializer.readByteArray(in); |
| } |
| } |
| } |
| return new QueuedOperation(op, key, value, valueObj, deserializationPolicy, |
| cbArg); |
| } |
| |
| /** |
| * Post 7.1, if changes are made to this method make sure that it is backwards |
| * compatible by creating toDataPreXX methods. Also make sure that the callers |
| * to this method are backwards compatible by creating toDataPreXX methods for |
| * them even if they are not changed. <br> |
| * Callers for this method are: <br> |
| * SendQueueMessage.toData(DataOutput) <br> |
| */ |
| public void toData(DataOutput out) throws IOException |
| { |
| out.writeByte(this.op.ordinal); |
| DataSerializer.writeObject(this.cbArg, out); |
| if (this.op.isEntry()) { |
| DataSerializer.writeObject(this.key, out); |
| if (this.op.isUpdate() || this.op.isCreate()) { |
| out.writeByte(this.deserializationPolicy); |
| if (this.deserializationPolicy != |
| DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) { |
| DataSerializer.writeByteArray(this.value, out); |
| } |
| else { |
| DataSerializer.writeObject(this.valueObj, out); |
| } |
| } |
| } |
| } |
| } |