/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * TXRegionState is the entity that tracks all the changes a transaction has made to a region.
 *
 *
 * @since GemFire 4.0
 *
 * @see TXManagerImpl
 */
public class TXRegionState {
  private static final Logger logger = LogService.getLogger();

  // A map of Objects (entry keys) -> TXEntryState
  private final HashMap<Object, TXEntryState> entryMods;
  // A map of Objects (entry keys) -> TXEntryUserAttrState
  private HashMap uaMods;
  private Set<InternalDistributedMember> otherMembers = null;
  private TXState txState;
  private InternalRegion region;
  private final boolean needsRefCounts;
  private boolean cleanedUp;
  /*
   * For Distributed Tx Created during precommit, to apply changes on secondaries/replicates from
   * coordinator.
   */
  private boolean createdDuringCommit;

  public TXRegionState(InternalRegion r, TXState txState) {
    if (r.getPersistBackup() && !r.isMetaRegionWithTransactions()
        && !TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS) {
      throw new UnsupportedOperationException(
          "Operations on persist-backup regions are not allowed because this thread has an active transaction");
    }
    if (r.getScope().isGlobal()) {
      throw new UnsupportedOperationException(
          "Operations on global regions are not allowed because this thread has an active transaction");
    }
    this.entryMods = new HashMap<Object, TXEntryState>();
    this.uaMods = null;
    this.region = r;
    this.txState = txState;
    this.needsRefCounts = r.isEntryEvictionPossible() || r.isEntryExpiryPossible();
    r.setInUseByTransaction(true);
  }

  public InternalRegion getRegion() {
    return region;
  }

  public boolean needsRefCounts() {
    return this.needsRefCounts;
  }



  public Set getEntryKeys() {
    return Collections.unmodifiableSet(this.entryMods.keySet());
  }

  public TXEntryState readEntry(Object entryKey) {
    return this.entryMods.get(entryKey);
  }

  public TXEntryState createReadEntry(LocalRegion r, Object entryKey, RegionEntry re, Object vId,
      Object pendingValue) {
    InternalCache cache = r.getCache();
    boolean isDistributed = false;
    if (cache.getTxManager().getTXState() != null) {
      isDistributed = cache.getTxManager().getTXState().isDistTx();
    } else {
      // TXCoordinator and datanode are same
      isDistributed = cache.getTxManager().isDistributed();
    }
    TXEntryState result = cache.getTXEntryStateFactory().createEntry(re, vId, pendingValue,
        entryKey, this, isDistributed);
    this.entryMods.put(entryKey, result);
    return result;
  }

  // public void rmEntry(Object entryKey, TXState txState, LocalRegion r) {
  // rmEntryUserAttr(entryKey);
  // TXEntryState e = (TXEntryState)this.entryMods.remove(entryKey);
  // if (e != null) {
  // e.cleanup(r);
  // }
  // if (this.uaMods == null && this.entryMods.size() == 0) {
  // txState.rmRegion(r);
  // }
  // }

  public TXEntryUserAttrState readEntryUserAttr(Object entryKey) {
    TXEntryUserAttrState result = null;
    if (this.uaMods != null) {
      result = (TXEntryUserAttrState) this.uaMods.get(entryKey);
    }
    return result;
  }

  public TXEntryUserAttrState writeEntryUserAttr(Object entryKey, LocalRegion r) {
    if (this.uaMods == null) {
      this.uaMods = new HashMap();
    }
    TXEntryUserAttrState result = (TXEntryUserAttrState) this.uaMods.get(entryKey);
    if (result == null) {
      result = new TXEntryUserAttrState(r.basicGetEntryUserAttribute(entryKey));
      this.uaMods.put(entryKey, result);
    }
    return result;
  }

  public void rmEntryUserAttr(Object entryKey) {
    if (this.uaMods != null) {
      if (this.uaMods.remove(entryKey) != null) {
        if (this.uaMods.size() == 0) {
          this.uaMods = null;
        }
      }
    }
  }

  /**
   * Returns the total number of modifications made by this transaction to this region's entry
   * count. The result will have a +1 for every create and a -1 for every destroy.
   */
  int entryCountMod() {
    int result = 0;
    Iterator it = this.entryMods.values().iterator();
    while (it.hasNext()) {
      TXEntryState es = (TXEntryState) it.next();
      result += es.entryCountMod();
    }
    return result;
  }

  TXEntryState getTXEntryState(Object key) {
    return this.entryMods.get(key);
  }

  /**
   * Fills in a set of any entries created by this transaction for the provided region.
   *
   * @param ret the HashSet to fill in with key objects
   */
  void fillInCreatedEntryKeys(HashSet ret) {
    Iterator<Entry<Object, TXEntryState>> it = this.entryMods.entrySet().iterator();
    while (it.hasNext()) {
      Entry<Object, TXEntryState> me = it.next();
      TXEntryState txes = me.getValue();
      if (txes.wasCreatedByTX()) {
        ret.add(me.getKey());
      }
    }
  }

  /**
   * Create a lock request on this region state and adds it to req
   */
  void createLockRequest(InternalRegion r, TXLockRequest req) {
    if (this.uaMods == null && this.entryMods.isEmpty()) {
      return;
    }
    if (this.txState.logger.isDebugEnabled()) {
      this.txState.logger.debug("TXRegionState.createLockRequest 1 " + r.getClass().getSimpleName()
          + " region-state=" + this);
    }
    if (r.getScope().isDistributed()) {
      // [DISTTX] Do not take lock for RR on replicates
      if (this.isCreatedDuringCommit()) {
        return;
      }
      DistributedRegion dr = (DistributedRegion) r;
      Set<InternalDistributedMember> advice = dr.getCacheDistributionAdvisor().adviseTX();
      if (!advice.isEmpty()) {
        this.otherMembers = advice; // remember for when it is time to distribute
      }
    }
    if (this.txState.logger.isDebugEnabled()) {
      this.txState.logger.debug("TXRegionState.createLockRequest 2");
    }
    // Bypass D-lock for Pr TX
    boolean byPassDLock = false;
    if (r instanceof BucketRegion) {
      // BucketRegion br = (BucketRegion)r;
      // if (br.getRedundancyLevel() < 2) {
      byPassDLock = true;
      // }
    }
    final boolean distributedTX = !byPassDLock && r.getScope().isDistributedAck();
    if (this.uaMods != null || (!distributedTX && this.entryMods.size() > 0)) {
      // need some local locks
      TXRegionLockRequestImpl rlr = new TXRegionLockRequestImpl(r.getCache(), r);
      if (this.uaMods != null) {
        Iterator<Object> it = this.uaMods.keySet().iterator();
        while (it.hasNext()) {
          // add key with isEvent set to TRUE, for keep BC
          rlr.addEntryKey(it.next(), Boolean.TRUE);
        }

      }
      if (!distributedTX && this.entryMods.size() > 0) {
        rlr.addEntryKeys(getLockRequestEntryKeys());
      }
      if (!rlr.isEmpty()) {
        req.addLocalRequest(rlr);
      }
    }
    if (distributedTX && this.entryMods.size() > 0) {
      // need some distributed locks
      TXRegionLockRequestImpl rlr = new TXRegionLockRequestImpl(r.getCache(), r);
      rlr.addEntryKeys(getLockRequestEntryKeys());
      if (!rlr.isEmpty()) {
        req.setOtherMembers(this.otherMembers);
        req.addDistributedRequest(rlr);
      }
    }
  }

  /**
   * Returns a map of entry keys that this tx needs to request a lock for at commit time.
   *
   * @return <code>null</code> if no entries need to be locked.
   */
  private Map getLockRequestEntryKeys() {
    HashMap<Object, Boolean> result = null;
    Iterator it = this.entryMods.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry me = (Map.Entry) it.next();
      TXEntryState txes = (TXEntryState) me.getValue();
      if (txes.isDirty() && !txes.isOpSearch()) {
        if (result == null) {
          result = new HashMap();
        }
        result.put(me.getKey(), txes.isOpAnyEvent(this.region));
      }
    }
    return result;
  }

  void checkForConflicts(InternalRegion r) throws CommitConflictException {
    if (this.isCreatedDuringCommit()) {
      return;
    }
    {
      Iterator it = this.entryMods.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry me = (Map.Entry) it.next();
        Object eKey = me.getKey();
        TXEntryState txes = (TXEntryState) me.getValue();
        txes.checkForConflict(r, eKey);
      }
    }
    if (this.uaMods != null) {
      r.checkReadiness();
      Iterator it = this.uaMods.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry me = (Map.Entry) it.next();
        Object eKey = me.getKey();
        TXEntryUserAttrState txes = (TXEntryUserAttrState) me.getValue();
        txes.checkForConflict(r, eKey);
      }
    }
  }

  /**
   * For each entry that is not dirty (all we did was read it) decrement its refcount (so it can be
   * evicted as we apply our writes) and remove it from entryMods (so we don't keep iterating over
   * it and se we don't try to clean it up again later).
   */
  void cleanupNonDirtyEntries(InternalRegion r) {
    if (!this.entryMods.isEmpty()) {
      Iterator it = this.entryMods.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry me = (Map.Entry) it.next();
        // Object eKey = me.getKey();
        TXEntryState txes = (TXEntryState) me.getValue();
        if (txes.cleanupNonDirty(r)) {
          it.remove();
        }
      }
    }
  }

  void buildMessage(InternalRegion r, TXCommitMessage msg) {
    try {
      if (!r.getScope().isLocal() && !this.entryMods.isEmpty()) {

        msg.startRegion(r, entryMods.size());
        Iterator it = this.entryMods.entrySet().iterator();
        Set<InternalDistributedMember> newMemberSet = new HashSet<InternalDistributedMember>();

        if (r.getScope().isDistributed()) {
          DistributedRegion dr = (DistributedRegion) r;
          msg.addViewVersion(dr, dr.getDistributionAdvisor().startOperation());
          newMemberSet.addAll(dr.getCacheDistributionAdvisor().adviseTX());
        }

        while (it.hasNext()) {
          Map.Entry me = (Map.Entry) it.next();
          Object eKey = me.getKey();
          TXEntryState txes = (TXEntryState) me.getValue();
          txes.buildMessage(r, eKey, msg, this.otherMembers);
          if (txes.getFilterRoutingInfo() != null) {
            newMemberSet.addAll(txes.getFilterRoutingInfo().getMembers());
          }
          if (txes.getAdjunctRecipients() != null) {
            newMemberSet.addAll(txes.getAdjunctRecipients());
          }

        }



        if (!newMemberSet.equals(this.otherMembers)) {
          // r.getCache().getLogger().info("DEBUG: participants list has changed! bug 32999.");
          // Flag the message that the lock manager needs to be updated with the new member set
          msg.setUpdateLockMembers();
          this.otherMembers = newMemberSet;
        }

        msg.finishRegion(this.otherMembers);
      }
    } catch (RegionDestroyedException ex) {
      // region was destroyed out from under us; after conflict checking
      // passed. So act as if the region destroy happened right after the
      // commit. We act this way by doing nothing; including distribution
      // of this region's commit data.
    } catch (CancelException ex) {
      // cache was closed out from under us; after conflict checking
      // passed. So do nothing.
    }
  }

  void buildMessageForAdjunctReceivers(InternalRegion r, TXCommitMessage msg) {
    try {
      if (!r.getScope().isLocal() && !this.entryMods.isEmpty()) {

        msg.startRegion(r, entryMods.size());
        Iterator it = this.entryMods.entrySet().iterator();
        Set<InternalDistributedMember> newMemberSet = new HashSet<InternalDistributedMember>();

        while (it.hasNext()) {
          Map.Entry me = (Map.Entry) it.next();
          Object eKey = me.getKey();
          TXEntryState txes = (TXEntryState) me.getValue();
          txes.buildMessage(r, eKey, msg, this.otherMembers);
          if (txes.getFilterRoutingInfo() != null) {
            newMemberSet.addAll(txes.getFilterRoutingInfo().getMembers());
          }
          if (txes.getAdjunctRecipients() != null) {

            Set adjunctRecipients = txes.getAdjunctRecipients();
            newMemberSet.addAll(adjunctRecipients);
          }
        }


        if (!newMemberSet.equals(this.otherMembers)) {
          // r.getCache().getLogger().info("DEBUG: participants list has changed! bug 32999.");
          // Flag the message that the lock manager needs to be updated with the new member set
          msg.setUpdateLockMembers();
          this.otherMembers = newMemberSet;
        }

        msg.finishRegion(this.otherMembers);
      }
    } catch (RegionDestroyedException ex) {
      // region was destroyed out from under us; after conflict checking
      // passed. So act as if the region destroy happened right after the
      // commit. We act this way by doing nothing; including distribution
      // of this region's commit data.
    } catch (CancelException ex) {
      // cache was closed out from under us; after conflict checking
      // passed. So do nothing.
    }
  }


  void buildCompleteMessage(InternalRegion r, TXCommitMessage msg) {
    try {
      if (!this.entryMods.isEmpty()) {
        msg.startRegion(r, entryMods.size());
        Iterator it = this.entryMods.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry me = (Map.Entry) it.next();
          Object eKey = me.getKey();
          TXEntryState txes = (TXEntryState) me.getValue();
          txes.buildCompleteMessage(r, eKey, msg, this.otherMembers);
        }
        msg.finishRegionComplete();
      }
    } catch (RegionDestroyedException ex) {
      // region was destroyed out from under us; after conflict checking
      // passed. So act as if the region destroy happened right after the
      // commit. We act this way by doing nothing; including distribution
      // of this region's commit data.
    } catch (CancelException ex) {
      // cache was closed out from under us; after conflict checking
      // passed. So do nothing.
    }
  }



  void applyChangesStart(InternalRegion r, TXStateInterface txState) {
    try {
      r.txLRUStart();
    } catch (RegionDestroyedException ex) {
      // region was destroyed out from under us; after conflict checking
      // passed. So act as if the region destroy happened right after the
      // commit. We act this way by doing nothing; including distribution
      // of this region's commit data.
    } catch (CancelException ex) {
      // cache was closed out from under us; after conflict checking
      // passed. So do nothing.
    }
  }

  void applyChangesEnd(InternalRegion r, TXStateInterface txState) {
    try {
      try {
        if (this.uaMods != null) {
          Iterator it = this.uaMods.entrySet().iterator();
          while (it.hasNext()) {
            Map.Entry me = (Map.Entry) it.next();
            Object eKey = me.getKey();
            TXEntryUserAttrState txes = (TXEntryUserAttrState) me.getValue();
            txes.applyChanges(r, eKey);
          }
        }
      } finally {
        r.txLRUEnd();
      }
    } catch (RegionDestroyedException ex) {
      // region was destroyed out from under us; after conflict checking
      // passed. So act as if the region destroy happened right after the
      // commit. We act this way by doing nothing; including distribution
      // of this region's commit data.
    } catch (CancelException ex) {
      // cache was closed out from under us; after conflict checking
      // passed. So do nothing.
    }
  }

  void getEvents(InternalRegion r, ArrayList events, TXState txs) {
    {
      Iterator it = this.entryMods.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry me = (Map.Entry) it.next();
        Object eKey = me.getKey();
        TXEntryState txes = (TXEntryState) me.getValue();
        if (txes.isDirty() && txes.isOpAnyEvent(r)) {
          // OFFHEAP: these events are released when TXEvent.release is called
          events.add(txes.getEvent(r, eKey, txs));
        }
      }
    }
  }

  /**
   * Put all the entries this region knows about into the given "entries" list as instances of
   * TXEntryStateWithRegionAndKey.
   */
  void getEntries(ArrayList/* <TXEntryStateWithRegionAndKey> */ entries, InternalRegion r) {
    Iterator it = this.entryMods.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry me = (Map.Entry) it.next();
      Object eKey = me.getKey();
      TXEntryState txes = (TXEntryState) me.getValue();
      entries.add(new TXState.TXEntryStateWithRegionAndKey(txes, r, eKey));
    }
  }

  void cleanup(InternalRegion r) {
    if (this.cleanedUp)
      return;
    this.cleanedUp = true;
    Iterator it = this.entryMods.values().iterator();
    while (it.hasNext()) {
      TXEntryState es = (TXEntryState) it.next();
      es.cleanup(r);
    }
    this.region.setInUseByTransaction(false);
  }

  int getChanges() {
    int changes = 0;
    Iterator it = this.entryMods.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry me = (Map.Entry) it.next();
      TXEntryState txes = (TXEntryState) me.getValue();
      if (txes.isDirty()) {
        changes++;
      }
    }
    if (this.uaMods != null) {
      changes += this.uaMods.size();
    }
    return changes;
  }

  public TXState getTXState() {
    return txState;
  }

  public void close() {
    for (TXEntryState e : this.entryMods.values()) {
      e.close();
    }
  }

  @Override
  public String toString() {
    StringBuilder str = new StringBuilder();
    str.append("{").append(super.toString()).append(" ");
    str.append(" ,entryMods=").append(this.entryMods);
    str.append(" ,isCreatedDuringCommit=").append(this.isCreatedDuringCommit());
    str.append("}");
    return str.toString();
  }

  /**
   * @return the createdDuringCommit
   */
  public boolean isCreatedDuringCommit() {
    return createdDuringCommit;
  }

  /**
   * @param createdDuringCommit the createdDuringCommit to set
   */
  public void setCreatedDuringCommit(boolean createdDuringCommit) {
    this.createdDuringCommit = createdDuringCommit;
  }

  public boolean populateDistTxEntryStateList(ArrayList<DistTxThinEntryState> entryStateList) {
    String regionFullPath = this.getRegion().getFullPath();
    try {
      if (!this.entryMods.isEmpty()) {
        // [DISTTX] TODO Sort this first
        for (Entry<Object, TXEntryState> em : this.entryMods.entrySet()) {
          Object mKey = em.getKey();
          TXEntryState txes = em.getValue();
          DistTxThinEntryState thinEntryState = txes.getDistTxEntryStates();
          entryStateList.add(thinEntryState);
          if (logger.isDebugEnabled()) {
            logger.debug("TXRegionState.populateDistTxEntryStateList Added " + thinEntryState
                + " for key=" + mKey + " ,op=" + txes.opToString() + " ,region=" + regionFullPath);
          }
        }
      }
      return true;
    } catch (RegionDestroyedException ex) {
      // region was destroyed out from under us; after conflict checking
      // passed. So act as if the region destroy happened right after the
      // commit. We act this way by doing nothing; including distribution
      // of this region's commit data.
    } catch (CancelException ex) {
      // cache was closed out from under us; after conflict checking
      // passed. So do nothing.
    }
    if (logger.isDebugEnabled()) {
      logger.debug(
          "TXRegionState.populateDistTxEntryStateList Got exception for region " + regionFullPath);
    }
    return false;
  }

  public void setDistTxEntryStates(ArrayList<DistTxThinEntryState> entryEventList) {
    String regionFullPath = this.getRegion().getFullPath();
    int entryModsSize = this.entryMods.size();
    int entryEventListSize = entryEventList.size();
    if (entryModsSize != entryEventListSize) {
      throw new UnsupportedOperationInTransactionException(
          String.format("Expected %s during a distributed transaction but got %s",
              "entry size of " + entryModsSize + " for region " + regionFullPath,
              entryEventListSize));
    }

    int index = 0;
    // [DISTTX] TODO Sort this first
    for (Entry<Object, TXEntryState> em : this.entryMods.entrySet()) {
      Object mKey = em.getKey();
      TXEntryState txes = em.getValue();
      DistTxThinEntryState thinEntryState = entryEventList.get(index++);
      txes.setDistTxEntryStates(thinEntryState);
      if (logger.isDebugEnabled()) {
        logger.debug("TxRegionState.setDistTxEntryStates Added " + thinEntryState + " for key="
            + mKey + " ,op=" + txes.opToString() + " ,region=" + regionFullPath);
      }
    }
  }
}
