/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache.partitioned;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.Operation;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionEventImpl;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientTombstoneMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.SerializationVersions;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * This message class sends tombstone GC information to other PR holders
 *
 * @since GemFire 7.0
 */
public class PRTombstoneMessage extends PartitionMessageWithDirectReply
    implements SerializationVersions {

  private static final Logger logger = LogService.getLogger();

  @Immutable
  private static final Version[] serializationVersions = null;

  private Set<Object> keys;
  private EventID eventID;

  /**
   * Empty constructor to satisfy {@link DataSerializer} requirements
   */
  public PRTombstoneMessage() {}

  public static void send(BucketRegion r, final Set<Object> keys, EventID eventID) {
    Set<InternalDistributedMember> recipients =
        r.getPartitionedRegion().getRegionAdvisor().adviseAllServersWithInterest();
    recipients.removeAll(r.getDistributionAdvisor().adviseReplicates());
    if (recipients.size() == 0) {
      return;
    }
    PartitionResponse p = new Response(r.getSystem(), recipients);
    PRTombstoneMessage m =
        new PRTombstoneMessage(recipients, r.getPartitionedRegion().getPRId(), p, keys, eventID);
    m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
    r.getDistributionManager().putOutgoing(m);

    try {
      p.waitForCacheException();
    } catch (ForceReattemptException e) {
      // ignore - the member is going away or has destroyed the PR so
      // it won't be forwarding anything to clients for the PR
    }
  }

  private PRTombstoneMessage(Set<InternalDistributedMember> recipients, int regionId,
      PartitionResponse p, Set<Object> reapedKeys, EventID eventID) {
    super(recipients, regionId, p);
    this.keys = reapedKeys;
    this.eventID = eventID;
  }

  @Override
  protected boolean operateOnPartitionedRegion(final ClusterDistributionManager dm,
      PartitionedRegion r, long startTime) throws ForceReattemptException {
    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace("PRTombstoneMessage operateOnRegion: {}", r.getFullPath());
    }
    FilterProfile fp = r.getFilterProfile();
    if (this.keys != null && this.keys.size() > 0) { // sanity check
      if (fp != null && CacheClientNotifier.singletonHasClientProxies() && this.eventID != null) {
        RegionEventImpl regionEvent = new RegionEventImpl(r, Operation.REGION_DESTROY, null, true,
            r.getGemFireCache().getMyId());
        regionEvent.setLocalFilterInfo(fp.getLocalFilterRouting(regionEvent));
        ClientUpdateMessage clientMessage = ClientTombstoneMessage.gc(r, this.keys, this.eventID);
        CacheClientNotifier.notifyClients(regionEvent, clientMessage);
      }
    }
    return true;
  }

  @Override
  protected void appendFields(StringBuilder buff) {
    super.appendFields(buff);
    buff.append("; keys=").append(this.keys.size());
    buff.append("; eventID=").append(this.eventID);
  }

  @Override
  public int getDSFID() {
    return PR_TOMBSTONE_MESSAGE;
  }

  @Override
  public Version[] getSerializationVersions() {
    return serializationVersions;
  }

  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    super.fromData(in, context);
    int numKeys = in.readInt();
    this.keys = new HashSet<Object>(numKeys);
    for (int i = 0; i < numKeys; i++) {
      this.keys.add(DataSerializer.readObject(in));
    }
    this.eventID = (EventID) DataSerializer.readObject(in);
  }

  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    super.toData(out, context);
    out.writeInt(this.keys.size());
    for (Object key : keys) {
      DataSerializer.writeObject(key, out);
    }
    DataSerializer.writeObject(this.eventID, out);
  }

  private static class Response extends PartitionResponse {
    // Set<InternalDistributedMember> forceReattemptSenders = new
    // HashSet<InternalDistributedMember>();

    public Response(InternalDistributedSystem ds, Set recipients) {
      super(ds, recipients, false);
    }

    @Override
    public void process(DistributionMessage msg) {
      ReplyMessage reply = (ReplyMessage) msg;
      if (reply.getException() != null) {
        Throwable cause = reply.getException().getCause();
        if (cause instanceof ForceReattemptException || cause instanceof CancelException) {
          // TODO do we need to resend to these recipients? Might they have clients that won't
          // otherwise get
          // the GC message?
          // this.forceReattemptSenders.add(reply.getSender());
          reply.setException(null);
        }
      }
      super.process(reply);
    }
  }
}
