/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.ObjToByteArraySerializer;
import org.apache.geode.internal.VersionedDataSerializable;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.StaticSerialization;

/**
 * This class is used to hold the information about the servers and their Filters (CQs and Interest
 * List) that are satisfied by the cache update operation.
 *
 * @since GemFire 6.5
 */
public class FilterRoutingInfo implements VersionedDataSerializable {

  /**
   * Read once, at class initialization, from the boolean system property
   * "optimized-cq-serialization". It selects which of the two pre-8.0 wire formats FilterInfo reads
   * and writes: a single byte array that is deserialized on demand when true, or a map followed by
   * two sets when false.
   */
  private static final boolean OLD_MEMBERS_OPTIMIZED =
      Boolean.getBoolean("optimized-cq-serialization");

  /**
   * Version table returned by getSerializationVersions(). The fromDataPre_GFE_7_1_0_0 and
   * toDataPre_GFE_7_1_0_0 methods carry the wire format that goes with the version listed here.
   */
  @Immutable
  private static final KnownVersion[] serializationVersions =
      new KnownVersion[] {KnownVersion.GFE_71};

  /**
   * Set to true once routing for a peer member has been recorded: unconditionally by
   * addInterestedClients, by setCqRoutingInfo when the CQ map is non-empty, or by merging another
   * routing object that has the flag set. It is neither written by toData nor restored by fromData.
   */
  private boolean memberWithFilterInfoExists = false;

  /**
   * Holds the filter information for the local member. It is null until one of the setLocal*
   * methods or addFilterInfo assigns it, and it is not written by toData.
   */
  private transient FilterInfo localFilterInfo;

  /** Whether local interest has been computed (not CQs - just interest). */
  private transient boolean hasLocalInterestBeenComputed;

  /**
   * Map from peer server to the filter information recorded for that server. This is the only
   * state that toData writes and fromData restores.
   */
  private HashMap<InternalDistributedMember, FilterInfo> serverFilterInfo =
      new HashMap<InternalDistributedMember, FilterInfo>();

  /**
   * Records the CQs matched on the local member and marks the local filter information as
   * processed locally. The local {@link FilterInfo} is created on first use.
   *
   * @param cqInfo map of server-side CQ filter ID to CQ event type
   */
  public void setLocalCqInfo(HashMap cqInfo) {
    FilterInfo local = localFilterInfo;
    if (local == null) {
      local = new FilterInfo();
      localFilterInfo = local;
    }
    local.cqs = cqInfo;
    local.filterProcessedLocally = true;
  }

  /**
   * Records the clients whose registered interest matched on the local member, marks the local
   * filter information as processed locally, and remembers that local interest has been computed.
   * The local {@link FilterInfo} is created on first use.
   *
   * @param clients interested clients with receiveValues=true
   * @param clientsInv interested clients with receiveValues=false
   */
  public void setLocalInterestedClients(Set clients, Set clientsInv) {
    FilterInfo local = localFilterInfo;
    if (local == null) {
      local = new FilterInfo();
      localFilterInfo = local;
    }
    local.setInterestedClients(clients);
    local.setInterestedClientsInv(clientsInv);
    local.filterProcessedLocally = true;
    hasLocalInterestBeenComputed = true;
  }

  /**
   * Replaces the local filter information wholesale. Unlike the other setLocal* methods this does
   * not touch any flag: neither filterProcessedLocally on the given object nor the
   * local-interest-computed marker of this object is changed.
   *
   * @param filterInfo the new local filter information; may be null to clear it
   */
  public void setLocalFilterInfo(FilterInfo filterInfo) {
    this.localFilterInfo = filterInfo;
  }

  /**
   * Tells whether {@link #setLocalInterestedClients(Set, Set)} has been called on this object.
   *
   * @return true if local interest (not CQs) has been computed
   */
  public boolean hasLocalInterestBeenComputed() {
    return hasLocalInterestBeenComputed;
  }

  /**
   * Returns the filter information recorded for the local member.
   *
   * @return the local filter info holding CQs and interested clients, or null if none was set
   */
  public FilterInfo getLocalFilterInfo() {
    return localFilterInfo;
  }

  /**
   * Records the CQs satisfied on a peer member. A fresh {@link FilterInfo} holding only the given
   * CQ map is stored for the member, replacing whatever entry the member had before. The
   * member-with-filter flag is raised only when the map is non-empty.
   *
   * @param member the member on which the CQs are satisfied
   * @param cqInfo map of server-side CQ filter ID to CQ event type; must not be null
   */
  public void setCqRoutingInfo(InternalDistributedMember member, HashMap cqInfo) {
    final FilterInfo memberInfo = new FilterInfo();
    memberInfo.setCQs(cqInfo);
    serverFilterInfo.put(member, memberInfo);
    if (!cqInfo.isEmpty()) {
      memberWithFilterInfoExists = true;
    }
  }

  /**
   * Records the interested clients for a peer member, reusing the member's existing
   * {@link FilterInfo} if there is one and creating it otherwise. The member-with-filter flag is
   * always raised, even if both sets turn out to be null or empty.
   *
   * <p>
   * A null or empty set leaves the corresponding set already stored for the member untouched, and
   * the long-ID flag is only ever switched on, never off.
   *
   * @param member the member on which the client interests are satisfied
   * @param clients interested clients with receiveValues=true
   * @param clientsInv interested clients with receiveValues=false
   * @param longIDs whether the client IDs may be long integers
   */
  public void addInterestedClients(InternalDistributedMember member, Set clients, Set clientsInv,
      boolean longIDs) {
    memberWithFilterInfoExists = true;

    FilterInfo memberInfo = serverFilterInfo.get(member);
    if (memberInfo == null) {
      memberInfo = new FilterInfo();
      serverFilterInfo.put(member, memberInfo);
    }

    final boolean haveClients = clients != null && !clients.isEmpty();
    if (haveClients) {
      memberInfo.setInterestedClients(clients);
    }
    final boolean haveInvClients = clientsInv != null && !clientsInv.isEmpty();
    if (haveInvClients) {
      memberInfo.setInterestedClientsInv(clientsInv);
    }
    memberInfo.longIDs |= longIDs;
  }

  /**
   * Returns the members for which routing information is held. The result is the key-set view of
   * the internal map, not a copy.
   *
   * @return the members who have filter routing information
   */
  public Set<InternalDistributedMember> getMembers() {
    return serverFilterInfo.keySet();
  }

  /**
   * Tells whether filter information for the local member has been set.
   *
   * @return true if this object holds local routing information
   */
  public boolean hasLocalFilterInfo() {
    return null != localFilterInfo;
  }

  /**
   * Tells whether routing for at least one peer member has been recorded, as tracked by the
   * member-with-filter flag.
   *
   * @return true if any member has filter information in this object
   */
  public boolean hasMemberWithFilterInfo() {
    return memberWithFilterInfoExists;
  }

  /**
   * Looks up the filter information recorded for a member.
   *
   * @param member the member whose filter information is desired
   * @return the filter information for the given member, or null if none is held
   */
  public FilterInfo getFilterInfo(InternalDistributedMember member) {
    final FilterInfo memberInfo = serverFilterInfo.get(member);
    return memberInfo;
  }

  /**
   * Merges the routing held by another object into this one. This is used to combine the routing
   * of the individual events of a putAll operation.
   *
   * <p>
   * Every per-member entry of {@code eventRouting} is merged into the entry this object holds for
   * the same member (created if absent). The local filter information is merged the same way, and
   * the member-with-filter flag is raised if it is raised on {@code eventRouting}.
   *
   * @param eventRouting the routing information for a single putAll event
   */
  public void addFilterInfo(FilterRoutingInfo eventRouting) {
    for (Map.Entry<InternalDistributedMember, FilterInfo> incoming : eventRouting.serverFilterInfo
        .entrySet()) {
      final InternalDistributedMember member = incoming.getKey();
      FilterInfo target = serverFilterInfo.get(member);
      if (target == null) {
        target = new FilterInfo();
        serverFilterInfo.put(member, target);
      }
      target.addFilterInfo(incoming.getValue());
    }

    final FilterInfo incomingLocal = eventRouting.localFilterInfo;
    if (incomingLocal != null) {
      if (localFilterInfo == null) {
        localFilterInfo = new FilterInfo();
      }
      localFilterInfo.addFilterInfo(incomingLocal);
    }

    if (eventRouting.memberWithFilterInfoExists) {
      memberWithFilterInfoExists = true;
    }
  }

  /**
   * Reads the per-member routing written by {@link #toData(DataOutput)}: an int count followed by
   * that many (member essential data, FilterInfo) pairs.
   *
   * <p>
   * Every pair is consumed from the stream, but only the pair belonging to this member is kept.
   * If no cache instance is available the local ID is unknown and all pairs are kept.
   */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    final InternalCache cache = GemFireCacheImpl.getInstance();
    final DistributedMember localId = (cache == null) ? null : cache.getMyId();

    for (int remaining = in.readInt(); remaining > 0; remaining--) {
      final InternalDistributedMember member = InternalDistributedMember.readEssentialData(in);
      final FilterInfo info = new FilterInfo();
      InternalDataSerializer.invokeFromData(info, in);
      // entries addressed to other members are read past and dropped
      if (localId != null && !localId.equals(member)) {
        continue;
      }
      serverFilterInfo.put(member, info);
    }
  }

  /**
   * Writes the per-member routing: an int count followed, for each member, by the member's
   * essential data and its {@link FilterInfo}. Local filter information and the
   * member-with-filter flag are not written.
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    out.writeInt(serverFilterInfo.size());
    for (Map.Entry<InternalDistributedMember, FilterInfo> routing : serverFilterInfo.entrySet()) {
      routing.getKey().writeEssentialData(out);
      final FilterInfo info = routing.getValue();
      InternalDataSerializer.invokeToData(info, out);
    }
  }

  /**
   * Returns the version table of this class (a single entry, GFE_71).
   *
   * @return the shared version array; callers must not modify it
   */
  @Override
  public KnownVersion[] getSerializationVersions() {
    return FilterRoutingInfo.serializationVersions;
  }

  /**
   * Reads the older wire format, in which each member is written as a full
   * {@link InternalDistributedMember} rather than as essential data only. The layout is otherwise
   * the same as in {@link #fromData(DataInput)}: an int count followed by (member, FilterInfo)
   * pairs.
   *
   * <p>
   * Every pair is consumed from the stream, but only the pair belonging to this member is kept.
   * If no cache instance is available all pairs are kept.
   */
  public void fromDataPre_GFE_7_1_0_0(DataInput in) throws IOException, ClassNotFoundException {
    final InternalCache cache = GemFireCacheImpl.getInstance();
    final DistributedMember localId = (cache == null) ? null : cache.getMyId();

    for (int remaining = in.readInt(); remaining > 0; remaining--) {
      final InternalDistributedMember member = new InternalDistributedMember();
      InternalDataSerializer.invokeFromData(member, in);
      final FilterInfo info = new FilterInfo();
      InternalDataSerializer.invokeFromData(info, in);
      // entries addressed to other members are read past and dropped
      if (localId != null && !localId.equals(member)) {
        continue;
      }
      serverFilterInfo.put(member, info);
    }
  }

  /**
   * Writes the older wire format: an int count followed, for each member, by the fully serialized
   * {@link InternalDistributedMember} and its {@link FilterInfo}.
   */
  public void toDataPre_GFE_7_1_0_0(DataOutput out) throws IOException {
    out.writeInt(serverFilterInfo.size());
    for (Map.Entry<InternalDistributedMember, FilterInfo> routing : serverFilterInfo.entrySet()) {
      final InternalDistributedMember member = routing.getKey();
      InternalDataSerializer.invokeToData(member, out);
      final FilterInfo info = routing.getValue();
      InternalDataSerializer.invokeToData(info, out);
    }
  }

  /**
   * Renders this object as {@code FilterRoutingInfo(local=..., remote=...)}, leaving out whichever
   * part is null.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("FilterRoutingInfo(");
    final boolean hasRemote = serverFilterInfo != null;
    if (localFilterInfo != null) {
      text.append("local=").append(localFilterInfo);
      if (hasRemote) {
        text.append(", ");
      }
    }
    if (hasRemote) {
      text.append("remote=").append(serverFilterInfo);
    }
    return text.append(")").toString();
  }


  /**
   * This holds the information about the CQs and interest list.
   */
  public static class FilterInfo implements VersionedDataSerializable {

    /**
     * Whether the client IDs may be long integers. Set by FilterRoutingInfo.addInterestedClients
     * and handed to InternalDataSerializer.writeSetOfLongs when the interested-client sets are
     * written.
     */
    public boolean longIDs;

    private static final long serialVersionUID = 0;

    /** Map holding CQ filter ID and CQ event type. */
    private HashMap<Long, Integer> cqs;

    /**
     * Serialized routing data. This is only deserialized when requested so that routing information
     * for other members included in a cache op message stays serialized, reducing the cost of
     * having to send all routing info to all members. Set to null again once it has been
     * deserialized.
     */
    private transient byte[] myData;

    /**
     * Version of the creator of myData, needed for deserialization. It is assigned only by
     * fromDataPre_GFE_8_0_0_0 on its optimized path and is null otherwise.
     */
    private transient KnownVersion myDataVersion;

    /** Clients that are interested in the event and want values */
    private Set interestedClients;

    /** Clients that are interested in the event and want invalidations */
    private Set interestedClientsInv;

    /** To identify where the filter is processed, locally or in remote node */
    public boolean filterProcessedLocally = false;

    /** Used to determine when to compute interested clients for transactional events */
    private transient boolean changeAppliedToCache = false;

    /**
     * Merges the routing content of another FilterInfo into this one. CQ entries of {@code other}
     * are put into this object's CQ map (an entry with the same filter ID is replaced), and both
     * interested-client sets are unioned with their counterparts.
     *
     * <p>
     * Both sides are read through their getters, so routing that is still held only in serialized
     * form is deserialized before it is merged instead of being silently skipped (on
     * {@code other}) or later overwritten by a lazy deserialization (on this object). The
     * {@code longIDs} flag is carried over as well, so that IDs merged in from a long-ID source
     * are still written with the long encoding.
     *
     * <p>
     * When this object already holds a set or map that was handed in through a setter, that same
     * collection instance is modified in place.
     *
     * @param other the filter information to merge in; must not be null
     */
    public void addFilterInfo(FilterInfo other) {
      final HashMap<Long, Integer> otherCqs = other.getCQs();
      if (otherCqs != null) {
        // getCQs() materializes any pending serialized data before we decide to allocate
        if (getCQs() == null) {
          this.cqs = new HashMap<>();
        }
        this.cqs.putAll(otherCqs);
      }

      final Set otherClients = other.getInterestedClients();
      if (otherClients != null) {
        if (getInterestedClients() == null) {
          this.interestedClients = new HashSet();
        }
        this.interestedClients.addAll(otherClients);
      }

      final Set otherClientsInv = other.getInterestedClientsInv();
      if (otherClientsInv != null) {
        if (getInterestedClientsInv() == null) {
          this.interestedClientsInv = new HashSet();
        }
        this.interestedClientsInv.addAll(otherClientsInv);
      }

      // the flag can only widen the encoding, so OR-ing it in is always safe
      this.longIDs |= other.longIDs;
    }

    /**
     * Clears the CQ routing information by dropping the in-memory CQ map.
     *
     * <p>
     * NOTE(review): only the map is dropped. If this object still holds undeserialized routing
     * data, a later {@link #getCQs()} will deserialize it and the CQs will reappear - confirm that
     * callers only clear objects that were built locally or have already been deserialized.
     */
    public void clearCQRouting() {
      cqs = null;
    }

    /**
     * Version table of FilterInfo (a single entry, GFE_80). The fromDataPre_GFE_8_0_0_0 and
     * toDataPre_GFE_8_0_0_0 methods carry the wire format that goes with this entry.
     */
    @Immutable
    private static final KnownVersion[] serializationVersions = {KnownVersion.GFE_80};

    /**
     * Returns the version table of this class.
     *
     * @return the shared version array; callers must not modify it
     */
    @Override
    public KnownVersion[] getSerializationVersions() {
      return FilterInfo.serializationVersions;
    }

    /**
     * Reads the routing data as one opaque byte array and keeps it undecoded; the CQ map and the
     * interested-client sets are decoded from it only when one of the getters is first called.
     * The data version is not recorded on this path.
     */
    @Override
    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
      final byte[] serialized = DataSerializer.readByteArray(in);
      myData = serialized;
    }

    /**
     * Writes this routing information as one byte array via DataSerializer.writeByteArray, which is
     * what fromData reads back with readByteArray.
     *
     * <p>
     * The byte array holds, in order: a boolean saying whether a CQ map follows; if so the number
     * of entries and, per entry, the CQ filter ID and the event type in variable-length form; then
     * the interestedClients set and the interestedClientsInv set. This is the layout deserialize()
     * decodes.
     *
     * <p>
     * NOTE(review): the cqs/interestedClients/interestedClientsInv fields are read directly, not
     * through the getters. An instance that only holds undeserialized data (the myData field)
     * would therefore be written as empty - confirm such instances are never re-serialized.
     */
    @Override
    public void toData(DataOutput out) throws IOException {
      HeapDataOutputStream hdos;
      // Estimate the encoded size so the scratch buffer can be requested up front: a fixed
      // allowance, plus 8 bytes per client ID and a small header per set (4 if the set is null),
      // plus 12 bytes per CQ entry.
      int size = 9;
      size += interestedClients == null ? 4 : interestedClients.size() * 8 + 5;
      size += interestedClientsInv == null ? 4 : interestedClientsInv.size() * 8 + 5;
      size += cqs == null ? 0 : cqs.size() * 12;
      // This local deliberately shadows the myData field; the field itself is not touched here.
      byte[] myData = StaticSerialization.getThreadLocalByteArray(size);
      hdos = new HeapDataOutputStream(myData);
      // NOTE(review): with expansion disallowed the estimate above has to be an upper bound.
      // Twelve bytes per CQ entry presumably relies on IDs and event types being small; a 7-bit
      // variable-length encoding of an arbitrary long plus an int could need more - confirm.
      hdos.disallowExpansion();
      if (this.cqs == null) {
        hdos.writeBoolean(false);
      } else {
        hdos.writeBoolean(true);
        InternalDataSerializer.writeArrayLength(cqs.size(), hdos);
        for (Iterator it = this.cqs.entrySet().iterator(); it.hasNext();) {
          Map.Entry e = (Map.Entry) it.next();
          // most cq IDs and all event types are small ints, so we use an optimized
          // write that serializes 7 bits at a time in a compact form
          InternalDataSerializer.writeUnsignedVL((Long) e.getKey(), hdos);
          InternalDataSerializer.writeUnsignedVL((Integer) e.getValue(), hdos);
        }
      }
      InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos);
      InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos);
      hdos.finishWriting();
      // Only the first hdos.size() bytes of the scratch buffer are valid output.
      DataSerializer.writeByteArray(myData, hdos.size(), out);
    }

    /**
     * Reads the older wire format. Which layout is expected depends on the
     * "optimized-cq-serialization" system property. When it is off, the CQ map and both
     * interested-client sets are read eagerly. When it is on, the stream's version and one opaque
     * byte array are stored for lazy decoding.
     */
    public void fromDataPre_GFE_8_0_0_0(DataInput in) throws IOException, ClassNotFoundException {
      if (!OLD_MEMBERS_OPTIMIZED) {
        cqs = DataSerializer.readHashMap(in);
        interestedClients = InternalDataSerializer.readSetOfLongs(in);
        interestedClientsInv = InternalDataSerializer.readSetOfLongs(in);
        return;
      }
      myDataVersion = StaticSerialization.getVersionForDataStreamOrNull(in);
      myData = DataSerializer.readByteArray(in);
    }

    /**
     * Writes the older wire format. Which layout is produced depends on the
     * "optimized-cq-serialization" system property.
     *
     * <p>
     * When it is off, the CQ map and both interested-client sets are written directly to
     * {@code out}. When it is on, the same content is first encoded into a temporary buffer (a
     * has-CQs boolean, the CQ entries in variable-length form, then the two sets) and that buffer
     * is written to {@code out} as one byte array.
     */
    public void toDataPre_GFE_8_0_0_0(DataOutput out) throws IOException {
      if (!OLD_MEMBERS_OPTIMIZED) {
        DataSerializer.writeHashMap(cqs, out);
        InternalDataSerializer.writeSetOfLongs(interestedClients, longIDs, out);
        InternalDataSerializer.writeSetOfLongs(interestedClientsInv, longIDs, out);
        return;
      }

      final HeapDataOutputStream buffer =
          new HeapDataOutputStream(1000, StaticSerialization.getVersionForDataStream(out));
      final boolean hasCqs = cqs != null;
      buffer.writeBoolean(hasCqs);
      if (hasCqs) {
        InternalDataSerializer.writeArrayLength(cqs.size(), buffer);
        for (Map.Entry<Long, Integer> cq : cqs.entrySet()) {
          // most cq IDs and all event types are small ints, so the compact 7-bits-at-a-time
          // encoding is used for both
          InternalDataSerializer.writeUnsignedVL(cq.getKey(), buffer);
          InternalDataSerializer.writeUnsignedVL(cq.getValue(), buffer);
        }
      }
      InternalDataSerializer.writeSetOfLongs(interestedClients, longIDs, buffer);
      InternalDataSerializer.writeSetOfLongs(interestedClientsInv, longIDs, buffer);

      if (out instanceof HeapDataOutputStream) {
        // a heap stream can take over the buffer's content without an intermediate array
        ((ObjToByteArraySerializer) out).writeAsSerializedByteArray(buffer);
      } else {
        DataSerializer.writeByteArray(buffer.toByteArray(), out);
      }
    }

    /**
     * Returns the CQ map, first decoding the pending serialized routing data if the map is not
     * yet set and such data is held.
     *
     * @return map of CQ filter ID to CQ event type, or null if there is none
     */
    public HashMap<Long, Integer> getCQs() {
      final boolean decodePending = cqs == null && myData != null;
      if (decodePending) {
        deserialize();
      }
      return cqs;
    }

    /**
     * Replaces the CQ map. The given map is stored as is, not copied.
     *
     * @param cqs map of CQ filter ID to CQ event type; may be null
     */
    public void setCQs(HashMap<Long, Integer> cqs) {
      final HashMap<Long, Integer> replacement = cqs;
      this.cqs = replacement;
    }

    /**
     * Returns the clients that want values, first decoding the pending serialized routing data if
     * the set is not yet set and such data is held.
     *
     * @return the interested clients, or null if there are none
     */
    public Set getInterestedClients() {
      final boolean decodePending = interestedClients == null && myData != null;
      if (decodePending) {
        deserialize();
      }
      return interestedClients;
    }

    /**
     * Replaces the set of clients that want values. The given set is stored as is, not copied.
     *
     * @param clients the interested clients; may be null
     */
    public void setInterestedClients(Set clients) {
      final Set replacement = clients;
      this.interestedClients = replacement;
    }

    /**
     * Returns the clients that want invalidations, first decoding the pending serialized routing
     * data if the set is not yet set and such data is held.
     *
     * @return the clients interested in invalidations, or null if there are none
     */
    public Set getInterestedClientsInv() {
      final boolean decodePending = interestedClientsInv == null && myData != null;
      if (decodePending) {
        deserialize();
      }
      return interestedClientsInv;
    }

    /**
     * Replaces the set of clients that want invalidations. The given set is stored as is, not
     * copied.
     *
     * @param clients the clients interested in invalidations; may be null
     */
    public void setInterestedClientsInv(Set clients) {
      final Set replacement = clients;
      this.interestedClientsInv = replacement;
    }

    /**
     * Decodes the pending serialized routing data into the CQ map and the two interested-client
     * sets. FilterInfo fields are only deserialized when they are needed: all routings are sent to
     * every member that receives a cache op message, but each member only cares about its own, so
     * the routings of other members never have to be decoded.
     *
     * <p>
     * The expected layout is a has-CQs boolean, then (if true) the entry count and per entry the
     * filter ID and event type in variable-length form, then the two sets. On success the
     * serialized data is released so that decoding happens only once.
     *
     * @throws InternalGemFireError wrapping the IOException if the data cannot be read
     */
    private void deserialize() {
      try (ByteArrayDataInput input = new ByteArrayDataInput(myData, myDataVersion)) {
        if (input.readBoolean()) {
          final int cqCount = InternalDataSerializer.readArrayLength(input);
          cqs = new HashMap<>(cqCount);
          for (int remaining = cqCount; remaining > 0; remaining--) {
            final long filterId = InternalDataSerializer.readUnsignedVL(input);
            final int eventType = (int) InternalDataSerializer.readUnsignedVL(input);
            cqs.put(filterId, eventType);
          }
        }
        interestedClients = InternalDataSerializer.readSetOfLongs(input);
        interestedClientsInv = InternalDataSerializer.readSetOfLongs(input);
        // drop the raw bytes so the getters never decode a second time
        myData = null;
      } catch (IOException ioe) {
        throw new InternalGemFireError(ioe);
      }
    }

    /**
     * Describes the fields that are currently in memory; pending serialized data is not decoded.
     * Non-empty interested-client sets are listed in full. CQs are listed by filter ID when the
     * logger has finer-level logging enabled and are only counted otherwise.
     */
    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder();
      if (interestedClients != null && !interestedClients.isEmpty()) {
        text.append("interestedClients:").append(interestedClients);
      }
      if (interestedClientsInv != null && !interestedClientsInv.isEmpty()) {
        text.append(", interestedClientsInv:").append(interestedClientsInv);
      }
      final boolean listCqIds = InternalDistributedSystem.getLogger().finerEnabled();
      if (cqs != null) {
        if (listCqIds) {
          text.append(", cqs=").append(cqs.keySet());
        } else {
          text.append(", ").append(cqs.size()).append(" cqs");
        }
      }
      return text.toString();
    }

    /**
     * Sets the marker used to determine when to compute interested clients for transactional
     * events.
     *
     * @param changeAppliedToCache the new value of the marker
     */
    public void setChangeAppliedToCache(boolean changeAppliedToCache) {
      final boolean applied = changeAppliedToCache;
      this.changeAppliedToCache = applied;
    }

    /**
     * Returns the marker used to determine when to compute interested clients for transactional
     * events.
     *
     * @return the value last given to setChangeAppliedToCache, or false if it was never called
     */
    public boolean isChangeAppliedToCache() {
      return this.changeAppliedToCache;
    }
  }

}
