/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.client.internal;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.Logger;

import org.apache.geode.SystemFailure;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.logging.LogService;

public class QueueStateImpl implements QueueState {
  private static final Logger logger = LogService.getLogger();

  protected QueueManager qManager = null;
  private boolean processedMarker = false;
  private final AtomicInteger invalidateCount = new AtomicInteger();

  /**
   * This will store the ThreadId to latest received sequence Id
   *
   * Keys are instances of {@link ThreadIdentifier} Values are instances of
   * {@link SequenceIdAndExpirationObject}
   */
  protected final Map threadIdToSequenceId = new LinkedHashMap();

  public QueueStateImpl(QueueManager qm) {
    this.qManager = qm;
  }

  @Override
  public void processMarker() {
    if (!this.processedMarker) {
      handleMarker();
      this.processedMarker = true;
    } else {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: extra marker received", this);
      }
    }
  }

  @Override
  public boolean getProcessedMarker() {
    return this.processedMarker;
  }

  public void handleMarker() {
    ArrayList regions = new ArrayList();
    Cache cache = GemFireCacheImpl.getInstance();
    if (cache == null) {
      return;
    }

    Set rootRegions = cache.rootRegions();


    for (Iterator iter1 = rootRegions.iterator(); iter1.hasNext();) {
      Region rootRegion = (Region) iter1.next();
      regions.add(rootRegion);
      try {
        Set subRegions = rootRegion.subregions(true); // throws RDE
        for (Iterator iter2 = subRegions.iterator(); iter2.hasNext();) {
          regions.add(iter2.next());
        }
      } catch (RegionDestroyedException e) {
        continue; // region is gone go to the next one bug 38705
      }
    }

    for (Iterator iter = regions.iterator(); iter.hasNext();) {
      LocalRegion region = (LocalRegion) iter.next();
      try {
        if (region.getAttributes().getPoolName() != null
            && region.getAttributes().getPoolName().equals(qManager.getPool().getName())) {
          region.handleMarker(); // can this throw RDE??
        }
      } catch (RegionDestroyedException e) {
        continue; // region is gone go to the next one bug 38705
      }
    }
  }

  @Override
  public void incrementInvalidatedStats() {
    this.invalidateCount.incrementAndGet();

  }

  public int getInvalidateCount() {
    return this.invalidateCount.get();
  }

  /**
   * test hook - access to this map should be synchronized on the map to avoid concurrent
   * modification exceptions
   */
  @Override
  public Map getThreadIdToSequenceIdMap() {
    return this.threadIdToSequenceId;
  }

  @Override
  public boolean verifyIfDuplicate(EventID eid) {
    return verifyIfDuplicate(eid, true);
  }

  @Override
  public boolean verifyIfDuplicate(EventID eid, boolean addToMap) {
    ThreadIdentifier tid = new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID());
    long seqId = eid.getSequenceID();
    SequenceIdAndExpirationObject seo = null;

    // Fix 36930: save the max sequence id for each non-putAll operation's thread
    // There're totally 3 cases to consider:
    // check the tid:
    // 1) if duplicated, (both putall or non-putall): reject
    // 2) if not duplicate
    // 2.1)if putAll, check via real thread id again,
    // if duplicate, reject (because one non-putall operation with bigger
    // seqno has happened)
    // otherwise save the putAllSeqno for real thread id
    // and save seqno for tid
    // 2.2) if not putAll,
    // check putAllSequenceId with real thread id
    // if request's seqno is smaller, reject (because one putAll operation
    // with bigger seqno has happened)
    // otherwise, update the seqno for tid
    // lock taken to avoid concurrentModification
    // while the objects are being expired
    synchronized (this.threadIdToSequenceId) {
      seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(tid);
      if (seo != null && seo.getSequenceId() >= seqId) {
        if (logger.isDebugEnabled()) {
          logger.debug(" got a duplicate entry with EventId {}. Ignoring the entry", eid);
        }
        seo.setAckSend(false);
        return true;
      } else if (addToMap) {
        ThreadIdentifier real_tid = new ThreadIdentifier(eid.getMembershipID(),
            ThreadIdentifier.getRealThreadIDIncludingWan(eid.getThreadID()));
        if (ThreadIdentifier.isPutAllFakeThreadID(eid.getThreadID())) {
          // it's a putAll
          seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
          if (seo != null && seo.getSequenceId() >= seqId) {
            if (logger.isDebugEnabled()) {
              logger.debug(
                  "got a duplicate putAll entry with eventId {}. Other operation with same thread id and bigger seqno {} has happened. Ignoring the entry",
                  eid, seo.getSequenceId());
            }
            seo.setAckSend(false); // bug #41289: send ack to servers that send old events
            return true;
          } else {
            // save the seqno for real thread id into a putAllSequenceId
            this.threadIdToSequenceId.remove(real_tid);
            this.threadIdToSequenceId.put(real_tid,
                seo == null ? new SequenceIdAndExpirationObject(-1, seqId)
                    : new SequenceIdAndExpirationObject(seo.getSequenceId(), seqId));
            // save seqno for tid
            // here tid!=real_tid, for fake tid, putAllSeqno should be 0
            this.threadIdToSequenceId.remove(tid);
            this.threadIdToSequenceId.put(tid, new SequenceIdAndExpirationObject(seqId, -1));
          }
        } else {
          // non-putAll operation:
          // check putAllSeqno for real thread id
          // if request's seqno is smaller, reject
          // otherwise, update the seqno for tid
          seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
          if (seo != null && seo.getPutAllSequenceId() >= seqId) {
            if (logger.isDebugEnabled()) {
              logger.debug(
                  "got a duplicate non-putAll entry with eventId {}. One putAll operation with same real thread id and bigger seqno {} has happened. Ignoring the entry",
                  eid, seo.getPutAllSequenceId());
            }
            seo.setAckSend(false); // bug #41289: send ack to servers that send old events
            return true;
          } else {
            // here tid==real_tid
            this.threadIdToSequenceId.remove(tid);
            this.threadIdToSequenceId.put(tid,
                seo == null ? new SequenceIdAndExpirationObject(seqId, -1)
                    : new SequenceIdAndExpirationObject(seqId, seo.getPutAllSequenceId()));
          }
        }
      }
    }
    return false;
  }

  @Override
  public void start(ScheduledExecutorService timer, int interval) {
    timer.scheduleWithFixedDelay(new ThreadIdToSequenceIdExpiryTask(), interval, interval,
        TimeUnit.MILLISECONDS);
  }

  /**
   *
   * Thread which will iterate over threadIdToSequenceId map
   *
   * 1)It will send an ack primary server for all threadIds for which it has not send an ack. 2)It
   * will expire the entries which have exceeded the specified expiry time and for which ack has
   * been alerady sent.
   *
   * @since GemFire 5.1
   *
   */

  private class ThreadIdToSequenceIdExpiryTask extends PoolTask {
    /**
     * The expiry time of the entries in the map
     */
    private final long expiryTime;

    /**
     * constructs the Thread and initializes the expiry time
     *
     */
    public ThreadIdToSequenceIdExpiryTask() {
      expiryTime = QueueStateImpl.this.qManager.getPool().getSubscriptionMessageTrackingTimeout();
    }

    @Override
    public void run2() {
      SystemFailure.checkFailure();
      if (qManager.getPool().getCancelCriterion().isCancelInProgress()) {
        return;
      }
      if (PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG) {
        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
        bo.beforeSendingClientAck();
      }
      sendPeriodicAck();
      checkForExpiry();
    }

    void checkForExpiry() {
      synchronized (threadIdToSequenceId) {
        Iterator iterator = threadIdToSequenceId.entrySet().iterator();
        long currentTime = System.currentTimeMillis();
        Map.Entry entry;
        SequenceIdAndExpirationObject seo;

        while (iterator.hasNext()) {
          entry = (Map.Entry) iterator.next();
          seo = (SequenceIdAndExpirationObject) entry.getValue();
          if ((currentTime - seo.getCreationTime() > this.expiryTime)) {
            if (seo.getAckSend() || (qManager.getPool().getSubscriptionRedundancy() == 0
                && !qManager.getPool().isDurableClient())) {
              iterator.remove();
            }
          } else {
            break;
          }
        }
      }
    }

    /**
     * Sends Periodic ack to the primary server for all threadIds for which it has not send an ack.
     */
    void sendPeriodicAck() {
      List events = new ArrayList();
      boolean success = false;
      synchronized (threadIdToSequenceId) {
        Iterator iterator = threadIdToSequenceId.entrySet().iterator();
        while (iterator.hasNext()) {
          Map.Entry entry = (Map.Entry) iterator.next();
          SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject) entry.getValue();
          if (!seo.getAckSend()) {
            ThreadIdentifier tid = (ThreadIdentifier) entry.getKey();
            events.add(new EventID(tid.getMembershipID(), tid.getThreadID(), seo.getSequenceId()));
            seo.setAckSend(true);
          } // if ends
        } // while ends
      } // synchronized ends

      if (events.size() > 0) {
        try {
          PrimaryAckOp.execute(qManager.getAllConnections().getPrimary(), qManager.getPool(),
              events);
          success = true;
        } catch (Exception ex) {
          if (logger.isDebugEnabled())
            logger.debug("Exception while sending an ack to the primary server: {}", ex);
        } finally {
          if (!success) {
            Iterator iter = events.iterator();
            while (iter.hasNext()) {
              EventID eid = (EventID) iter.next();
              ThreadIdentifier tid = new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID());
              synchronized (threadIdToSequenceId) {
                SequenceIdAndExpirationObject seo =
                    (SequenceIdAndExpirationObject) threadIdToSequenceId.get(tid);
                if (seo != null && seo.getAckSend()) {
                  seo = (SequenceIdAndExpirationObject) threadIdToSequenceId.remove(tid);
                  if (seo != null) {
                    // put back the old seqId with a new time stamp
                    SequenceIdAndExpirationObject siaeo = new SequenceIdAndExpirationObject(
                        seo.getSequenceId(), seo.getPutAllSequenceId());
                    threadIdToSequenceId.put(tid, siaeo);
                  }
                } // if ends
              } // synchronized ends
            } // while ends
          } // if(!success) ends
        } // finally ends
      } // if(events.size() > 0)ends
    }// method ends
  }

  /**
   * A class to store sequenceId and the creation time of the object to be used for expiring the
   * entry
   *
   * @since GemFire 5.1
   *
   */
  public static class SequenceIdAndExpirationObject {
    /** The sequence Id of the entry * */
    private final long sequenceId;
    /** The sequence Id of the putAll operations * */
    private final long putAllSequenceId;
    /** The time of creation of the object* */
    private final long creationTime;
    /** Client ack is send to server or not* */
    private boolean ackSend;

    SequenceIdAndExpirationObject(long sequenceId, long putAllSequenceId) {
      this.sequenceId = sequenceId;
      this.putAllSequenceId = putAllSequenceId;
      this.creationTime = System.currentTimeMillis();
      this.ackSend = false;
    }

    /**
     * @return Returns the creationTime.
     */
    public long getCreationTime() {
      return creationTime;
    }

    /**
     * @return Returns the sequenceId.
     */
    public long getSequenceId() {
      return sequenceId;
    }

    /**
     * @return Returns the putAllSequenceId.
     */
    public long getPutAllSequenceId() {
      return putAllSequenceId;
    }

    /**
     *
     * @return Returns the ackSend
     */
    public boolean getAckSend() {
      return ackSend;
    }

    /**
     * Sets the ackSend
     *
     */
    public void setAckSend(boolean ackSend) {
      this.ackSend = ackSend;
    }

    @Override
    public String toString() {
      StringBuffer sb = new StringBuffer();
      sb.append("SequenceIdAndExpirationObject[");
      sb.append("ackSend = " + this.ackSend);
      sb.append("; creation = " + creationTime);
      sb.append("; seq = " + sequenceId);
      sb.append("; putAll seq = " + putAllSequenceId);
      sb.append("]");
      return sb.toString();
    }
  }

}
