/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsList;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.RemoteRemoveAllMessage;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.DiskVersionTag;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Handles distribution of a Region.removeAll operation.
 *
 * TODO: extend DistributedCacheOperation instead of AbstractUpdateOperation
 *
 * @since GemFire 8.1
 */
public class DistributedRemoveAllOperation extends AbstractUpdateOperation {
  private static final Logger logger = LogService.getLogger();

  /**
   * Release is called by freeOffHeapResources.
   */
  @Retained
  protected final RemoveAllEntryData[] removeAllData;

  public int removeAllDataSize;

  protected boolean isBridgeOp = false;

  static final byte USED_FAKE_EVENT_ID = 0x01;
  static final byte NOTIFY_ONLY = 0x02;
  static final byte FILTER_ROUTING = 0x04;
  static final byte VERSION_TAG = 0x08;
  static final byte POSDUP = 0x10;
  static final byte PERSISTENT_TAG = 0x20;
  static final byte HAS_CALLBACKARG = 0x40;
  static final byte HAS_TAILKEY = (byte) 0x80;

  public DistributedRemoveAllOperation(CacheEvent event, int size, boolean isBridgeOp) {
    super(event, ((EntryEventImpl) event).getEventTime(0L));
    this.removeAllData = new RemoveAllEntryData[size];
    this.removeAllDataSize = 0;
    this.isBridgeOp = isBridgeOp;
  }

  /**
   * return if the operation is bridge operation
   */
  public boolean isBridgeOperation() {
    return this.isBridgeOp;
  }

  public RemoveAllEntryData[] getRemoveAllEntryData() {
    return removeAllData;
  }

  public void setRemoveAllEntryData(RemoveAllEntryData[] removeAllEntryData) {
    for (int i = 0; i < removeAllEntryData.length; i++) {
      removeAllData[i] = removeAllEntryData[i];
    }
    this.removeAllDataSize = removeAllEntryData.length;
  }

  /**
   * Add an entry that this removeAll operation should distribute.
   */
  public void addEntry(RemoveAllEntryData removeAllEntry) {
    this.removeAllData[this.removeAllDataSize] = removeAllEntry;
    this.removeAllDataSize += 1;
  }

  /**
   * Add an entry that this removeAll operation should distribute.
   */
  public void addEntry(EntryEventImpl ev) {
    this.removeAllData[this.removeAllDataSize] = new RemoveAllEntryData(ev);
    this.removeAllDataSize += 1;
  }

  /**
   * Add an entry that this removeAll operation should distribute. This method is for a special
   * case: the callback will be called after this in hasSeenEvent() case, so we should change the
   * status beforehand
   */
  public void addEntry(EntryEventImpl ev, boolean newCallbackInvoked) {
    this.removeAllData[this.removeAllDataSize] = new RemoveAllEntryData(ev);
    this.removeAllData[this.removeAllDataSize].setCallbacksInvoked(newCallbackInvoked);
    this.removeAllDataSize += 1;
  }

  /**
   * Add an entry for PR bucket's msg.
   *
   * @param ev event to be added
   * @param bucketId message is for this bucket
   */
  public void addEntry(EntryEventImpl ev, Integer bucketId) {
    this.removeAllData[this.removeAllDataSize] = new RemoveAllEntryData(ev);
    this.removeAllData[this.removeAllDataSize].setBucketId(bucketId);
    this.removeAllDataSize += 1;
  }

  /**
   * set using fake thread id
   *
   * @param status whether the entry is using fake event id
   */
  public void setUseFakeEventId(boolean status) {
    for (int i = 0; i < removeAllDataSize; i++) {
      removeAllData[i].setUsedFakeEventId(status);
    }
  }

  /**
   * In the originating cache, this returns an iterator on the list of events caused by the
   * removeAll operation. This is cached for listener notification purposes. The iterator is
   * guaranteed to return events in the order they are present in putAllData[]
   */
  public Iterator eventIterator() {
    return new Iterator() {
      int position = 0;

      @Override
      public boolean hasNext() {
        return DistributedRemoveAllOperation.this.removeAllDataSize > position;
      };

      @Override
      @Unretained
      public Object next() {
        @Unretained
        EntryEventImpl ev = getEventForPosition(position);
        position++;
        return ev;
      };

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      };
    };
  }

  public void freeOffHeapResources() {
    // I do not use eventIterator here because it forces the lazy creation of EntryEventImpl by
    // calling getEventForPosition.
    for (int i = 0; i < this.removeAllDataSize; i++) {
      RemoveAllEntryData entry = this.removeAllData[i];
      if (entry != null && entry.event != null) {
        entry.event.release();
      }
    }
  }

  @Unretained
  public EntryEventImpl getEventForPosition(int position) {
    RemoveAllEntryData entry = this.removeAllData[position];
    if (entry == null) {
      return null;
    }
    if (entry.event != null) {
      return entry.event;
    }
    LocalRegion region = (LocalRegion) this.event.getRegion();
    // owned by this.removeAllData once entry.event = ev is done
    @Retained
    EntryEventImpl ev = EntryEventImpl.create(region, entry.getOp(), entry.getKey(),
        null/* value */, this.event.getCallbackArgument(), false /* originRemote */,
        this.event.getDistributedMember(), this.event.isGenerateCallbacks(), entry.getEventID());
    boolean returnedEv = false;
    try {
      ev.setPossibleDuplicate(entry.isPossibleDuplicate());
      ev.setIsRedestroyedEntry(entry.getRedestroyedEntry());
      if (entry.versionTag != null && region.getConcurrencyChecksEnabled()) {
        VersionSource id = entry.versionTag.getMemberID();
        if (id != null) {
          entry.versionTag.setMemberID(ev.getRegion().getVersionVector().getCanonicalId(id));
        }
        ev.setVersionTag(entry.versionTag);
      }

      entry.event = ev;
      returnedEv = true;
      ev.setOldValue(entry.getOldValue());
      CqService cqService = region.getCache().getCqService();
      if (cqService.isRunning() && !entry.getOp().isCreate() && !ev.hasOldValue()) {
        ev.setOldValueForQueryProcessing();
      }
      ev.setInvokePRCallbacks(!entry.isNotifyOnly());
      if (getBaseEvent().getContext() != null) {
        ev.setContext(getBaseEvent().getContext());
      }
      ev.callbacksInvoked(entry.isCallbacksInvoked());
      ev.setTailKey(entry.getTailKey());
      return ev;
    } finally {
      if (!returnedEv) {
        ev.release();
      }
    }
  }

  public EntryEventImpl getBaseEvent() {
    return getEvent();
  }

  /**
   * Data that represents a single entry being RemoveAll'd.
   */
  public static class RemoveAllEntryData {

    final Object key;

    private final Object oldValue;

    private final Operation op;

    private EventID eventID;

    transient EntryEventImpl event;

    private Integer bucketId = Integer.valueOf(-1);

    protected transient boolean callbacksInvoked = false;

    public FilterRoutingInfo filterRouting;

    // One flags byte for all booleans
    protected byte flags = 0x00;

    // TODO: Yogesh, this should be intialized and sent on wire only when
    // parallel wan is enabled
    private Long tailKey = 0L;

    public VersionTag versionTag;

    transient boolean inhibitDistribution;

    transient boolean redestroyedEntry;

    /**
     * Constructor to use when preparing to send putall data out
     */
    public RemoveAllEntryData(EntryEventImpl event) {

      this.key = event.getKey();
      Object oldValue = event.getRawOldValue();

      if (oldValue == Token.NOT_AVAILABLE || Token.isRemoved(oldValue)) {
        this.oldValue = null;
      } else {
        this.oldValue = oldValue;
      }

      this.op = event.getOperation();
      this.eventID = event.getEventId();
      this.tailKey = event.getTailKey();
      this.versionTag = event.getVersionTag();

      setNotifyOnly(!event.getInvokePRCallbacks());
      setCallbacksInvoked(event.callbacksInvoked());
      setPossibleDuplicate(event.isPossibleDuplicate());
      setInhibitDistribution(event.getInhibitDistribution());
      setRedestroyedEntry(event.getIsRedestroyedEntry());
    }

    /**
     * Constructor to use when receiving a putall from someone else
     */
    public RemoveAllEntryData(DataInput in, EventID baseEventID, int idx,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      this.key = context.getDeserializer().readObject(in);
      this.oldValue = null;
      this.op = Operation.fromOrdinal(in.readByte());
      this.flags = in.readByte();
      if ((this.flags & FILTER_ROUTING) != 0) {
        this.filterRouting = (FilterRoutingInfo) context.getDeserializer().readObject(in);
      }
      if ((this.flags & VERSION_TAG) != 0) {
        boolean persistentTag = (this.flags & PERSISTENT_TAG) != 0;
        this.versionTag = VersionTag.create(persistentTag, in);
      }
      if (isUsedFakeEventId()) {
        this.eventID = new EventID();
        InternalDataSerializer.invokeFromData(this.eventID, in);
      } else {
        this.eventID = new EventID(baseEventID, idx);
      }
      if ((this.flags & HAS_TAILKEY) != 0) {
        this.tailKey = DataSerializer.readLong(in);
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(50);
      sb.append("(").append(getKey()).append(",").append(getOldValue());
      if (this.bucketId > 0) {
        sb.append(", b").append(this.bucketId);
      }
      if (versionTag != null) {
        sb.append(",v").append(versionTag.getEntryVersion())
            .append(",rv=" + versionTag.getRegionVersion());
      }
      if (filterRouting != null) {
        sb.append(", ").append(filterRouting);
      }
      sb.append(")");
      return sb.toString();
    }

    void setSender(InternalDistributedMember sender) {
      if (this.versionTag != null) {
        this.versionTag.replaceNullIDs(sender);
      }
    }

    /**
     * Used to serialize this instances data to <code>out</code>. If changes are made to this method
     * make sure that it is backwards compatible by creating toDataPreXX methods. Also make sure
     * that the callers to this method are backwards compatible by creating toDataPreXX methods for
     * them even if they are not changed. <br>
     * Callers for this method are: <br>
     * {@link DataSerializableFixedID#toData(DataOutput, SerializationContext)} <br>
     * {@link DataSerializableFixedID#toData(DataOutput, SerializationContext)} <br>
     * {@link DataSerializableFixedID#toData(DataOutput, SerializationContext)} <br>
     */
    public void serializeTo(final DataOutput out,
        SerializationContext context) throws IOException {
      Object key = this.key;
      context.getSerializer().writeObject(key, out);

      out.writeByte(this.op.ordinal);
      byte bits = this.flags;
      if (this.filterRouting != null)
        bits |= FILTER_ROUTING;
      if (this.versionTag != null) {
        bits |= VERSION_TAG;
        if (this.versionTag instanceof DiskVersionTag) {
          bits |= PERSISTENT_TAG;
        }
      }
      // TODO: Yogesh, this should be conditional,
      // make sure that we sent it on wire only
      // when parallel wan is enabled
      bits |= HAS_TAILKEY;
      out.writeByte(bits);

      if (this.filterRouting != null) {
        context.getSerializer().writeObject(this.filterRouting, out);
      }
      if (this.versionTag != null) {
        InternalDataSerializer.invokeToData(this.versionTag, out);
      }
      if (isUsedFakeEventId()) {
        // fake event id should be serialized
        InternalDataSerializer.invokeToData(this.eventID, out);
      }
      // TODO: Yogesh, this should be conditional,
      // make sure that we sent it on wire only
      // when parallel wan is enabled
      DataSerializer.writeLong(this.tailKey, out);
    }

    /**
     * Returns the key
     */
    public Object getKey() {
      return this.key;
    }

    /**
     * Returns the old value
     */
    public Object getOldValue() {
      return this.oldValue;
    }

    public Long getTailKey() {
      return this.tailKey;
    }

    public void setTailKey(Long key) {
      this.tailKey = key;
    }

    /**
     * Returns the operation
     */
    public Operation getOp() {
      return this.op;
    }

    public EventID getEventID() {
      return this.eventID;
    }

    /**
     * change event id for the entry
     *
     * @param eventId new event id
     */
    public void setEventId(EventID eventId) {
      this.eventID = eventId;
    }

    /**
     * change bucket id for the entry
     *
     * @param bucketId new bucket id
     */
    public void setBucketId(Integer bucketId) {
      this.bucketId = bucketId;
    }

    /**
     * get bucket id for the entry
     *
     * @return bucket id
     */
    public Integer getBucketId() {
      return this.bucketId;
    }

    /**
     * change event id into fake event id The algorithm is to change the threadid into
     * bucketid*MAX_THREAD_PER_CLIENT+oldthreadid. So from the log, we can derive the original
     * thread id.
     *
     * @return wether current event id is fake or not new bucket id
     */
    public boolean setFakeEventID() {
      if (bucketId.intValue() < 0) {
        return false;
      }

      if (!isUsedFakeEventId()) {
        // assign a fake big thread id. bucket id starts from 0. In order to distinguish
        // with other read thread, let bucket id starts from 1 in fake thread id
        long threadId = ThreadIdentifier.createFakeThreadIDForBulkOp(bucketId.intValue(),
            eventID.getThreadID());
        this.eventID = new EventID(eventID.getMembershipID(), threadId, eventID.getSequenceID());
        this.setUsedFakeEventId(true);
      }
      return true;
    }

    public boolean isUsedFakeEventId() {
      return (flags & USED_FAKE_EVENT_ID) != 0;
    }

    public void setUsedFakeEventId(boolean usedFakeEventId) {
      if (usedFakeEventId) {
        flags |= USED_FAKE_EVENT_ID;
      } else {
        flags &= ~(USED_FAKE_EVENT_ID);
      }
    }

    public boolean isNotifyOnly() {
      return (flags & NOTIFY_ONLY) != 0;
    }

    public void setNotifyOnly(boolean notifyOnly) {
      if (notifyOnly) {
        flags |= NOTIFY_ONLY;
      } else {
        flags &= ~(NOTIFY_ONLY);
      }
    }

    boolean isPossibleDuplicate() {
      return (this.flags & POSDUP) != 0;
    }

    public void setPossibleDuplicate(boolean possibleDuplicate) {
      if (possibleDuplicate) {
        flags |= POSDUP;
      } else {
        flags &= ~(POSDUP);
      }
    }

    public boolean isInhibitDistribution() {
      return this.inhibitDistribution;
    }

    public void setInhibitDistribution(boolean inhibitDistribution) {
      this.inhibitDistribution = inhibitDistribution;
    }

    public boolean getRedestroyedEntry() {
      return redestroyedEntry;
    }

    public void setRedestroyedEntry(boolean redestroyedEntry) {
      this.redestroyedEntry = redestroyedEntry;
    }

    public boolean isCallbacksInvoked() {
      return this.callbacksInvoked;
    }

    public void setCallbacksInvoked(boolean callbacksInvoked) {
      this.callbacksInvoked = callbacksInvoked;
    }
  }

  @Override
  protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) {
    // for removeAll, we need to determine the routing information for each event and
    // create a consolidated routing object representing all events that can be
    // used for distribution
    CacheDistributionAdvisor advisor;
    LocalRegion region = (LocalRegion) this.event.getRegion();
    if (region instanceof PartitionedRegion) {
      advisor = ((PartitionedRegion) region).getCacheDistributionAdvisor();
    } else if (region.isUsedForPartitionedRegionBucket()) {
      advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
    } else {
      advisor = ((DistributedRegion) region).getCacheDistributionAdvisor();
    }
    FilterRoutingInfo consolidated = new FilterRoutingInfo();
    for (int i = 0; i < this.removeAllData.length; i++) {
      @Unretained
      EntryEventImpl ev = getEventForPosition(i);
      if (ev != null) {
        FilterRoutingInfo eventRouting = advisor.adviseFilterRouting(ev, cacheOpRecipients);
        if (eventRouting != null) {
          consolidated.addFilterInfo(eventRouting);
        }
        removeAllData[i].filterRouting = eventRouting;
      }
    }
    // we need to create routing information for each PUT event
    return consolidated;
  }


  @Override
  protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
    FilterProfile fp = getRegion().getFilterProfile();
    if (fp == null) {
      return null;
    }

    // this will set the local FilterInfo in the events
    if (this.removeAllData != null && this.removeAllData.length > 0) {
      fp.getLocalFilterRoutingForRemoveAllOp(this, this.removeAllData);
    }

    return null;
  }

  @Override
  protected CacheOperationMessage createMessage() {
    EntryEventImpl event = getBaseEvent();
    RemoveAllMessage msg = new RemoveAllMessage();
    msg.eventId = event.getEventId();
    msg.context = event.getContext();
    return msg;
  }

  /**
   * Create RemoveAllPRMessage for notify only (to adjunct nodes)
   *
   * @param bucketId create message to send to this bucket
   */
  public RemoveAllPRMessage createPRMessagesNotifyOnly(int bucketId) {
    final EntryEventImpl event = getBaseEvent();
    RemoveAllPRMessage prMsg = new RemoveAllPRMessage(bucketId, removeAllDataSize, true,
        event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
    if (event.getContext() != null) {
      prMsg.setBridgeContext(event.getContext());
    }

    // will not recover event id here
    for (int i = 0; i < removeAllDataSize; i++) {
      prMsg.addEntry(removeAllData[i]);
    }

    return prMsg;
  }

  /**
   * Create RemoveAllPRMessages for primary buckets out of this op
   *
   * @return a HashMap contain RemoveAllPRMessages, key is bucket id
   */
  public HashMap<Integer, RemoveAllPRMessage> createPRMessages() {
    // getFilterRecipients(Collections.EMPTY_SET); // establish filter recipient routing information
    HashMap<Integer, RemoveAllPRMessage> prMsgMap = new HashMap<Integer, RemoveAllPRMessage>();
    final EntryEventImpl event = getBaseEvent();

    for (int i = 0; i < removeAllDataSize; i++) {
      Integer bucketId = removeAllData[i].getBucketId();
      RemoveAllPRMessage prMsg = prMsgMap.get(bucketId);
      if (prMsg == null) {
        prMsg = new RemoveAllPRMessage(bucketId.intValue(), removeAllDataSize, false,
            event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
        prMsg
            .setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());

        // set dpao's context(original sender) into each PutAllMsg
        // dpao's event's context could be null if it's P2P putAll in PR
        if (event.getContext() != null) {
          prMsg.setBridgeContext(event.getContext());
        }
      }

      // Modify the event id, assign new thread id and new sequence id
      // We have to set fake event id here, because we cannot derive old event id from baseId+idx as
      // we
      // did in DR's PutAllMessage.
      removeAllData[i].setFakeEventID();
      // we only save the reference in prMsg. No duplicate copy
      prMsg.addEntry(removeAllData[i]);
      prMsgMap.put(bucketId, prMsg);
    }
    return prMsgMap;
  }

  @Override
  protected void initMessage(CacheOperationMessage msg, DirectReplyProcessor proc) {
    super.initMessage(msg, proc);
    RemoveAllMessage m = (RemoveAllMessage) msg;

    // if concurrency checks are enabled and this is not a replicated
    // region we need to see if any of the entries have no versions and,
    // if so, cull them out and send a 1-hop message to a replicate that
    // can generate a version for the operation

    RegionAttributes attr = this.event.getRegion().getAttributes();
    if (attr.getConcurrencyChecksEnabled() && !attr.getDataPolicy().withReplication()
        && attr.getScope() != Scope.GLOBAL) {
      if (attr.getDataPolicy() == DataPolicy.EMPTY) {
        // all entries are without version tags
        boolean success = RemoteRemoveAllMessage.distribute((EntryEventImpl) this.event,
            this.removeAllData, this.removeAllDataSize);
        if (success) {
          m.callbackArg = this.event.getCallbackArgument();
          m.removeAllData = new RemoveAllEntryData[0];
          m.removeAllDataSize = 0;
          m.skipCallbacks = !event.isGenerateCallbacks();

          return;

        } else if (!getRegion().getGenerateVersionTag()) {
          // Fix for #45934. We can't continue if we need versions and we failed
          // to distribute versionless entries.
          throw new PersistentReplicatesOfflineException();
        }
      } else {
        // some entries may have Create ops - these will not have version tags
        RemoveAllEntryData[] versionless = selectVersionlessEntries();
        if (logger.isTraceEnabled()) {
          logger.trace("Found these versionless entries: {}", Arrays.toString(versionless));
        }
        if (versionless.length > 0) {
          boolean success = RemoteRemoveAllMessage.distribute((EntryEventImpl) this.event,
              versionless, versionless.length);
          if (success) {
            versionless = null;
            RemoveAllEntryData[] versioned = selectVersionedEntries();
            if (logger.isTraceEnabled()) {
              logger.trace("Found these remaining versioned entries: {}",
                  Arrays.toString(versioned));
            }
            m.callbackArg = this.event.getCallbackArgument();
            m.removeAllData = versioned;
            m.removeAllDataSize = versioned.length;
            m.skipCallbacks = !event.isGenerateCallbacks();
            return;

          } else if (!getRegion().getGenerateVersionTag()) {
            // Fix for #45934. We can't continue if we need versions and we failed
            // to distribute versionless entries.
            throw new PersistentReplicatesOfflineException();
          }
        } else {
          if (logger.isDebugEnabled()) {
            logger.debug("All entries have versions, so using normal DPAO message");
          }
        }
      }
    }
    m.callbackArg = this.event.getCallbackArgument();
    m.removeAllData = this.removeAllData;
    m.removeAllDataSize = this.removeAllDataSize;
    m.skipCallbacks = !event.isGenerateCallbacks();
  }


  @Override
  protected boolean shouldAck() {
    // bug #45704 - RemotePutAllOp's DPAO in another server conflicts with lingering DPAO from same
    // thread, so
    // we require an ACK if concurrency checks are enabled to make sure that the previous op has
    // finished first.
    return super.shouldAck() || getRegion().getConcurrencyChecksEnabled();
  }

  private RemoveAllEntryData[] selectVersionlessEntries() {
    int resultSize = this.removeAllData.length;
    for (int i = 0; i < this.removeAllData.length; i++) {
      RemoveAllEntryData p = this.removeAllData[i];
      if (p == null || p.isInhibitDistribution()) {
        resultSize--;
      } else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
        resultSize--;
      }
    }
    RemoveAllEntryData[] result = new RemoveAllEntryData[resultSize];
    int ri = 0;
    for (int i = 0; i < this.removeAllData.length; i++) {
      RemoveAllEntryData p = this.removeAllData[i];
      if (p == null || p.isInhibitDistribution()) {
        continue; // skip blanks
      } else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
        continue; // skip versioned
      }
      // what remains is versionless
      result[ri++] = p;
    }
    return result;
  }

  private RemoveAllEntryData[] selectVersionedEntries() {
    int resultSize = 0;
    for (int i = 0; i < this.removeAllData.length; i++) {
      RemoveAllEntryData p = this.removeAllData[i];
      if (p == null || p.isInhibitDistribution()) {
        continue; // skip blanks
      } else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
        resultSize++;
      }
    }
    RemoveAllEntryData[] result = new RemoveAllEntryData[resultSize];
    int ri = 0;
    for (int i = 0; i < this.removeAllData.length; i++) {
      RemoveAllEntryData p = this.removeAllData[i];
      if (p == null || p.isInhibitDistribution()) {
        continue; // skip blanks
      } else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
        result[ri++] = p;
      }
    }
    return result;
  }

  /**
   * version tags are gathered from local operations and remote operation responses. This method
   * gathers all of them and stores them in the given list.
   *
   */
  protected void fillVersionedObjectList(VersionedObjectList list) {
    for (RemoveAllEntryData entry : this.removeAllData) {
      if (entry.versionTag != null) {
        list.addKeyAndVersion(entry.key, entry.versionTag);
      }
    }
  }


  public static class RemoveAllMessage extends AbstractUpdateMessage // TODO extend
                                                                     // CacheOperationMessage
                                                                     // instead
  {

    protected RemoveAllEntryData[] removeAllData;

    protected int removeAllDataSize;

    protected transient ClientProxyMembershipID context;

    protected boolean skipCallbacks;

    protected EventID eventId = null;

    protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
    protected static final short SKIP_CALLBACKS = (short) (HAS_BRIDGE_CONTEXT << 1);

    /** test to see if this message holds any data */
    public boolean isEmpty() {
      return this.removeAllData.length == 0;
    }

    /**
     * Note this this is a "dummy" event since this message contains a list of entries each one of
     * which has its own event. The key thing needed in this event is the region. This is the event
     * that gets passed to basicOperateOnRegion
     */
    @Override
    @Retained
    protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException {
      // Gester: We have to specify eventId for the message of MAP
      @Retained
      EntryEventImpl event = EntryEventImpl.create(rgn, Operation.REMOVEALL_DESTROY, null /* key */,
          null/* value */, this.callbackArg, true /* originRemote */, getSender());
      if (this.context != null) {
        event.context = this.context;
      }
      event.setPossibleDuplicate(this.possibleDuplicate);
      event.setEventId(this.eventId);
      return event;
    }

    @Override
    public void appendFields(StringBuilder sb) {
      super.appendFields(sb);
      if (eventId != null) {
        sb.append("; eventId=").append(this.eventId);
      }
      sb.append("; entries=").append(this.removeAllDataSize);
      if (removeAllDataSize <= 20) {
        // 20 is a size for test
        sb.append("; entry values=").append(Arrays.toString(this.removeAllData));
      }
    }

    /**
     * Does the "remove" of one entry for a "removeAll" operation. Note it calls back to
     * AbstractUpdateOperation.UpdateMessage#basicOperationOnRegion
     *
     * @param entry the entry being removed
     * @param rgn the region the entry is removed from
     */
    public void doEntryRemove(RemoveAllEntryData entry, DistributedRegion rgn) {
      @Released
      EntryEventImpl ev = RemoveAllMessage.createEntryEvent(entry, getSender(), this.context, rgn,
          this.possibleDuplicate, this.needsRouting, this.callbackArg, true, skipCallbacks);
      // rgn.getLogWriterI18n().info(String.format("%s", "RemoveAllMessage.doEntryRemove
      // sender=" + getSender() +
      // " event="+ev));
      // we don't need to set old value here, because the msg is from remote. local old value will
      // get from next step
      try {
        if (ev.getVersionTag() != null) {
          checkVersionTag(rgn, ev.getVersionTag());
        }
        // TODO check all removeAll basicDestroy calls done on the farside and make sure
        // "cacheWrite" is false
        rgn.basicDestroy(ev, false, null);
      } catch (EntryNotFoundException ignore) {
        this.appliedOperation = true;
      } catch (ConcurrentCacheModificationException e) {
        dispatchElidedEvent(rgn, ev);
        this.appliedOperation = false;
      } finally {
        if (ev.hasValidVersionTag() && !ev.getVersionTag().isRecorded()) {
          if (rgn.getVersionVector() != null) {
            rgn.getVersionVector().recordVersion(getSender(), ev.getVersionTag());
          }
        }
        ev.release();
      }
    }

    /**
     * create an event for a RemoveAllEntryData element
     *
     * @return the event to be used in applying the element
     */
    @Retained
    public static EntryEventImpl createEntryEvent(RemoveAllEntryData entry,
        InternalDistributedMember sender, ClientProxyMembershipID context, DistributedRegion rgn,
        boolean possibleDuplicate, boolean needsRouting, Object callbackArg, boolean originRemote,
        boolean skipCallbacks) {
      final Object key = entry.getKey();
      EventID evId = entry.getEventID();
      @Retained
      EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(), key, null/* value */,
          callbackArg, originRemote, sender, !skipCallbacks, evId);
      boolean returnedEv = false;
      try {
        if (context != null) {
          ev.context = context;
        }
        ev.setPossibleDuplicate(possibleDuplicate);
        ev.setVersionTag(entry.versionTag);
        // if (needsRouting) {
        // FilterProfile fp = rgn.getFilterProfile();
        // if (fp != null) {
        // FilterInfo fi = fp.getLocalFilterRouting(ev);
        // ev.setLocalFilterInfo(fi);
        // }
        // }
        if (entry.filterRouting != null) {
          InternalDistributedMember id = rgn.getMyId();
          ev.setLocalFilterInfo(entry.filterRouting.getFilterInfo(id));
        }
        /*
         * Setting tailKey for the secondary bucket here. Tail key was update by the primary.
         */
        ev.setTailKey(entry.getTailKey());
        returnedEv = true;
        return ev;
      } finally {
        if (!returnedEv) {
          ev.release();
        }
      }
    }

    @Override
    protected void basicOperateOnRegion(EntryEventImpl ev, final DistributedRegion rgn) {
      for (int i = 0; i < removeAllDataSize; ++i) {
        if (removeAllData[i].versionTag != null) {
          checkVersionTag(rgn, removeAllData[i].versionTag);
        }
      }

      rgn.syncBulkOp(new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < removeAllDataSize; ++i) {
            if (logger.isTraceEnabled()) {
              logger.trace("removeAll processing {} with {}", removeAllData[i],
                  removeAllData[i].versionTag);
            }
            removeAllData[i].setSender(sender);
            doEntryRemove(removeAllData[i], rgn);
          }
        }
      }, ev.getEventId());
    }

    @Override
    public int getDSFID() {
      return REMOVE_ALL_MESSAGE;
    }

    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {

      super.fromData(in, context);
      this.eventId = (EventID) context.getDeserializer().readObject(in);
      this.removeAllDataSize = (int) InternalDataSerializer.readUnsignedVL(in);
      this.removeAllData = new RemoveAllEntryData[this.removeAllDataSize];
      if (this.removeAllDataSize > 0) {
        for (int i = 0; i < this.removeAllDataSize; i++) {
          this.removeAllData[i] = new RemoveAllEntryData(in, eventId, i, context);
        }

        boolean hasTags = in.readBoolean();
        if (hasTags) {
          EntryVersionsList versionTags = EntryVersionsList.create(in);
          for (int i = 0; i < this.removeAllDataSize; i++) {
            this.removeAllData[i].versionTag = versionTags.get(i);
          }
        }
      }

      if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
        this.context = context.getDeserializer().readObject(in);
      }
      this.skipCallbacks = (flags & SKIP_CALLBACKS) != 0;
    }

    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {

      super.toData(out, context);
      context.getSerializer().writeObject(this.eventId, out);
      InternalDataSerializer.writeUnsignedVL(this.removeAllDataSize, out);
      if (this.removeAllDataSize > 0) {
        EntryVersionsList versionTags = new EntryVersionsList(removeAllDataSize);

        boolean hasTags = false;
        for (int i = 0; i < this.removeAllDataSize; i++) {
          if (!hasTags && removeAllData[i].versionTag != null) {
            hasTags = true;
          }
          VersionTag<?> tag = removeAllData[i].versionTag;
          versionTags.add(tag);
          removeAllData[i].versionTag = null;
          this.removeAllData[i].serializeTo(out, context);
          this.removeAllData[i].versionTag = tag;
        }

        out.writeBoolean(hasTags);
        if (hasTags) {
          InternalDataSerializer.invokeToData(versionTags, out);
        }
      }
      if (this.context != null) {
        context.getSerializer().writeObject(this.context, out);
      }
    }

    @Override
    protected short computeCompressedShort(short s) {
      s = super.computeCompressedShort(s);
      if (this.context != null)
        s |= HAS_BRIDGE_CONTEXT;
      if (this.skipCallbacks)
        s |= SKIP_CALLBACKS;
      return s;
    }

    public ClientProxyMembershipID getContext() {
      return this.context;
    }

    public RemoveAllEntryData[] getRemoveAllEntryData() {
      return this.removeAllData;
    }

  }
}
