/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongIterator;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.Instantiator;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAlgorithm;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.compression.Compressor;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalInstantiator;
import org.apache.geode.internal.InternalInstantiator.InstantiatorAttributesHolder;
import org.apache.geode.internal.cache.persistence.CanonicalIdHolder;
import org.apache.geode.internal.cache.persistence.DiskExceptionHandler;
import org.apache.geode.internal.cache.persistence.DiskInitFileInterpreter;
import org.apache.geode.internal.cache.persistence.DiskInitFileParser;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.persistence.DiskStoreID;
import org.apache.geode.internal.cache.persistence.PRPersistentConfig;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberPattern;
import org.apache.geode.internal.cache.versions.DiskRegionVersionVector;
import org.apache.geode.internal.cache.versions.RegionVersionHolder;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.VersioningIO;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Does all the IF file work for a DiskStoreImpl.
 *
 *
 * @since GemFire prPersistSprint1
 */
public class DiskInitFile implements DiskInitFileInterpreter {
  private static final Logger logger = LogService.getLogger();

  public static final String IF_FILE_EXT = ".if";

  public static final byte IF_EOF_ID = 0;
  public static final byte END_OF_RECORD_ID = 21;
  static final int OPLOG_FILE_ID_REC_SIZE = 1 + 8 + 1;

  private final ReentrantLock lock = new ReentrantLock();

  /**
   * Written to IF Byte Format: 8: leastSigBits of UUID 8: mostSigBits of UUID 1: EndOfRecordMarker
   */
  public static final byte IFREC_DISKSTORE_ID = 56;

  /**
   * Written to IF Byte Format: 4: instantiatorID 4: classNameLength classNameLength: className
   * bytes 4: instClassNameLength instClassNameLength: instClassName bytes 1: EndOfRecordMarker
   */
  public static final byte IFREC_INSTANTIATOR_ID = 57;

  /**
   * Written to IF Byte Format: 4: classNameLength classNameLength: className bytes 1:
   * EndOfRecordMarker
   */
  public static final byte IFREC_DATA_SERIALIZER_ID = 58;
  /**
   * Written to IF Used to say that persistent member id is online. Byte Format: RegionId 4:
   * blobLength blobLength: member bytes 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_ONLINE_MEMBER_ID = 59;
  /**
   * Written to IF Used to say that persistent member id is offline. Byte Format: RegionId 4:
   * blobLength blobLength: member bytes 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_OFFLINE_MEMBER_ID = 60;
  /**
   * Written to IF Used to say that persistent member id no longer exists. Byte Format: RegionId 4:
   * blobLength blobLength: member bytes 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_RM_MEMBER_ID = 61;
  /**
   * Written to IF. Used to record the persistent member id of this file. Byte Format: RegionId 4:
   * blobLength blobLength: member bytes 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_MY_MEMBER_INITIALIZING_ID = 62;
  /**
   * Written to IF. Used to record the previous my member id completed initialization. Byte Format:
   * RegionId 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_MY_MEMBER_INITIALIZED_ID = 63;

  /**
   * Written to IF. Used to record create of region Byte Format: RegionId 4: nameLength nameLength:
   * name bytes 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_CREATE_REGION_ID = 64;
  /**
   * Written to IF. Used to record begin of destroy of region Byte Format: RegionId 1:
   * EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_BEGIN_DESTROY_REGION_ID = 65;
  /**
   * Written to IF. Used to record clear of region Byte Format: RegionId 8: oplogEntryId 1:
   * EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_CLEAR_REGION_ID = 66;
  /**
   * Written to IF. Used to record that the end of a destroy region. Byte Format: RegionId 1:
   * EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_END_DESTROY_REGION_ID = 67;
  /**
   * Written to IF. Used to record that a region is about to be partially destroyed Byte Format:
   * RegionId 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_BEGIN_PARTIAL_DESTROY_REGION_ID = 68;
  /**
   * Written to IF. Used to record that a region is partially destroyed Byte Format: RegionId 1:
   * EndOfRecordMarker
   *
   * @since GemFire prPersistSprint1
   */
  public static final byte IFREC_END_PARTIAL_DESTROY_REGION_ID = 69;

  /**
   * Records the creation of an oplog crf file Byte Format: 8: oplogId 1: EndOfRecord
   */
  public static final byte IFREC_CRF_CREATE = 70;

  /**
   * Records the creation of an oplog drf file Byte Format: 8: oplogId 1: EndOfRecord
   */
  public static final byte IFREC_DRF_CREATE = 71;

  /**
   * Records the deletion of an oplog crf file Byte Format: 8: oplogId 1: EndOfRecord
   */
  public static final byte IFREC_CRF_DELETE = 72;

  /**
   * Records the deletion of an oplog drf file Byte Format: 8: oplogId 1: EndOfRecord
   */
  public static final byte IFREC_DRF_DELETE = 73;

  /**
   * Written to IF. Used to record regions config Byte Format: RegionId 1: lruAlgorithm 1: lruAction
   * 4: lruLimit (int) // no need to ObjectSize during recovery since all data is in blob form 4:
   * concurrencyLevel (int) 4: initialCapacity (int) 4: loadFactor (float) 1: statisticsEnabled
   * (boolean) 1: isBucket (boolean) 1: EndOfRecordMarker
   *
   * Used to read the region configuration for 6.5 disk stores.
   *
   */
  public static final byte IFREC_REGION_CONFIG_ID = 74;

  /*
   * Written to IF Used to say that persistent member id is offline and has the same data on disk as
   * this member Byte Format: RegionId 4: blobLength blobLength: member bytes 1: EndOfRecordMarker
   *
   * @since GemFire prPersistSprint3
   */
  public static final byte IFREC_OFFLINE_AND_EQUAL_MEMBER_ID = 75;
  /**
   * Written to IF. Used to record regions config Byte Format: RegionId 1: lruAlgorithm 1: lruAction
   * 4: lruLimit (int) // no need to ObjectSize during recovery since all data is in blob form 4:
   * concurrencyLevel (int) 4: initialCapacity (int) 4: loadFactor (float) 1: statisticsEnabled
   * (boolean) 1: isBucket (boolean) 4: length of partitionName String bytes (int) length:actual
   * bytes 4: startingBucketId(int) 1: EndOfRecordMarker
   */
  public static final byte IFREC_REGION_CONFIG_ID_66 = 76;


  /**
   * Records the creation of an oplog krf file The presence of this record indicates that the krf
   * file is complete. Byte Format: 8: oplogId 1: EndOfRecord
   */
  public static final byte IFREC_KRF_CREATE = 77;

  /**
   * Records the creation of a persistent partitioned region configuration. Byte Format: variable:
   * pr name 4: total num buckets variable: colocated with 1: EndOfRecord
   */
  public static final byte IFREC_PR_CREATE = 78;

  /**
   * Records the deletion of persistent partitioned region. Byte Format: variable: pr name 1:
   * EndOfRecord
   */
  public static final byte IFREC_PR_DESTROY = 79;


  /**
   * Maps a member id (either a disk store ID or a distributed system id plus a byte) to a single
   * integer, which can be used in oplogs.
   *
   * Byte Format: 4: the number assigned to this id. variable: serialized object representing the
   * ID. variable: pr name 1: EndOfRecord
   */
  public static final byte IFREC_ADD_CANONICAL_MEMBER_ID = 80;
  /**
   * Written to IF Used to say that a disk store has been revoked Byte Format: variable: a
   * PersistentMemberPattern
   *
   * @since GemFire 7.0
   */
  public static final byte IFREC_REVOKE_DISK_STORE_ID = 81;

  /**
   * Written gemfire version to IF Byte Format: 1: version byte from Version.GFE_CURRENT.ordinal 1:
   * EndOfRecord
   *
   * @since GemFire 7.0
   */
  public static final byte IFREC_GEMFIRE_VERSION = 82;

  /**
   * Written to IF. Used to record clear of using an RVV Byte Format: RegionId variable: serialized
   * RVV 1: EndOfRecordMarker
   *
   * @since GemFire 7.0
   */
  public static final byte IFREC_CLEAR_REGION_WITH_RVV_ID = 83;

  /**
   * Written to IF. Used to record regions config Byte Format: RegionId 1: lruAlgorithm 1: lruAction
   * 4: lruLimit (int) // no need to ObjectSize during recovery since all data is in blob form 4:
   * concurrencyLevel (int) 4: initialCapacity (int) 4: loadFactor (float) 1: statisticsEnabled
   * (boolean) 1: isBucket (boolean) variable: partitionName (utf) 4: startingBucketId (int)
   * variable: compressorClassName (utf) 1: versioned (boolean) 1: EndOfRecordMarker
   *
   */
  public static final byte IFREC_REGION_CONFIG_ID_80 = 88;

  /**
   * Persist oplog file magic number. Written once at the beginning of every oplog file; CRF, DRF,
   * KRF, and IF. Followed by 6 byte magic number. Each oplog type has a different magic number
   * Followed by EndOfRecord Fix for bug 43824
   *
   * @since GemFire 8.0
   */
  public static final byte OPLOG_MAGIC_SEQ_ID = 89;

  /**
   * Written to IF. Used to record regions config Byte Format: RegionId 1: lruAlgorithm 1: lruAction
   * 4: lruLimit (int) // no need to ObjectSize during recovery since all data is in blob form 4:
   * concurrencyLevel (int) 4: initialCapacity (int) 4: loadFactor (float) 1: statisticsEnabled
   * (boolean) 1: isBucket (boolean) variable: partitionName (utf) 4: startingBucketId (int)
   * variable: compressorClassName (utf) 1: versioned (boolean) 1: offHeap (boolean) added in 9.0 1:
   * EndOfRecordMarker
   *
   * @since Geode 1.0
   */
  public static final byte IFREC_REGION_CONFIG_ID_90 = 90;

  private final DiskStoreImpl parent;

  private final File ifFile;
  private RandomAccessFile ifRAF;
  private boolean closed;
  // contains the ids of dataSerializers already written to IF
  private final IntOpenHashSet dsIds;
  // contains the ids of instantiators already written to IF
  private final IntOpenHashSet instIds;

  private final LongOpenHashSet crfIds;
  private final LongOpenHashSet drfIds;
  private final Set<Long> krfIds;

  /**
   * Map used to keep track of regions we know of from the DiskInitFile but that do not yet exist
   * (they have not yet been recovered or they have been closed).
   */
  private final Map<Long, PlaceHolderDiskRegion> drMap = new HashMap<Long, PlaceHolderDiskRegion>();
  private final Map<String, PlaceHolderDiskRegion> drMapByName =
      new HashMap<String, PlaceHolderDiskRegion>();

  /**
   * Map of persistent partitioned regions configurations that are stored in this init file.
   */
  private final Map<String, PRPersistentConfig> prMap = new HashMap<String, PRPersistentConfig>();

  private final InternalDataSerializer.RegistrationListener regListener;

  private int ifLiveRecordCount = 0;
  private int ifTotalRecordCount = 0;
  private boolean compactInProgress;
  // the recovered version
  private KnownVersion gfversion;


  /**
   * Used to calculate the highest oplog entry id we have seen in a clear entry.
   */
  private long clearOplogEntryIdHWM = DiskStoreImpl.INVALID_ID;

  /**
   * Container for canonical ids held in the disk store. Member ids are canonicalized so they can be
   * written as an integer in the oplogs.
   */
  private final CanonicalIdHolder canonicalIdHolder = new CanonicalIdHolder();

  /**
   * Set of members that have been revoked. We keep track of the revoked members so that we can
   * indicate to the user a member has been revoked, rather is simply conflicting
   */
  private final Set<PersistentMemberPattern> revokedMembers =
      new HashSet<PersistentMemberPattern>();

  private transient long nextSeekPosition;

  private transient boolean gotEOF;

  private void lock(boolean useBackupLock) {
    if (useBackupLock) {
      getDiskStore().getBackupLock().lock();
    }
    this.lock.lock();
  }

  private void unlock(boolean useBackupLock) {
    if (useBackupLock) {
      getDiskStore().getBackupLock().unlock();
    }
    this.lock.unlock();
  }

  private void recoverFromFailedCompaction() {
    File tmpFile = getTempFile();
    if (tmpFile.exists()) {
      // if the temp init file exists then we must have crashed during a compaction.
      // In this case we need to destroy the non temp file and rename the temp file.
      if (this.ifFile.exists()) {
        if (!this.ifFile.delete()) {
          throw new IllegalStateException("Could not delete " + this.ifFile);
        }
        if (!tmpFile.renameTo(this.ifFile)) {
          throw new IllegalStateException("Could not rename " + tmpFile + " to " + this.ifFile);
        }
      }
    }
  }

  private DiskStoreImpl getDiskStore() {
    return this.parent;
  }

  public KnownVersion currentRecoveredGFVersion() {
    return this.gfversion;
  }

  DiskStoreID recover() {
    recoverFromFailedCompaction();
    if (!this.ifFile.exists()) {
      // nothing to recover
      // Instead of calling randomUUID which uses SecureRandom which can be slow
      // return UUID.randomUUID();
      // create a UUID using the cheaper Random class.
      return new DiskStoreID(UUID.randomUUID());
    }
    DiskStoreID result = null;
    try {
      FileInputStream fis = null;
      CountingDataInputStream dis = null;
      try {
        fis = new FileInputStream(this.ifFile);
        dis = new CountingDataInputStream(new BufferedInputStream(fis, 8 * 1024),
            this.ifFile.length());
        DiskInitFileParser parser = new DiskInitFileParser(dis, this);
        result = parser.parse();

        this.gotEOF = parser.gotEOF();
        this.nextSeekPosition = dis.getCount();
        if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
          logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "liveRecordCount={} totalRecordCount={}",
              this.ifLiveRecordCount, this.ifTotalRecordCount);
        }
      } finally {
        if (dis != null) {
          dis.close();
        }
        if (fis != null) {
          fis.close();
        }
      }
      for (PlaceHolderDiskRegion drv : this.drMap.values()) {
        if (drv.getMyPersistentID() != null || drv.getMyInitializingID() != null) {
          // Prepare each region we found in the init file for early recovery.
          if (drv.isBucket() || !getDiskStore().getOwnedByRegion()) {
            getDiskStore().getStats().incUncreatedRecoveredRegions(1);
            drv.setRecoveredEntryMap(RegionMapFactory.createVM(drv, getDiskStore(),
                getDiskStore().getInternalRegionArguments()));
            if (!getDiskStore().isOffline()) {
              // schedule it for recovery since we want to recovery region data early now
              getDiskStore().scheduleForRecovery(drv);
            }
            // else if we are validating or offlineCompacting
            // then the scheduleForRecovery is called later in DiskStoreImpl
            // this helps fix bug 42043
          }
        }
      }
    } catch (EOFException ex) {
      // ignore since a partial record write can be caused by a crash
      // throw new
      // DiskAccessException(String.format("Failed to read file during recovery from %s",
      // this.ifFile.getPath()), ex, this.parent);
    } catch (ClassNotFoundException ex) {
      throw new DiskAccessException(
          String.format("Failed to read file during recovery from %s",
              this.ifFile.getPath()),
          ex, this.parent);
    } catch (IOException ex) {
      throw new DiskAccessException(
          String.format("Failed to read file during recovery from %s",
              this.ifFile.getPath()),
          ex, this.parent);
    } catch (CancelException ignore) {
      if (logger.isDebugEnabled()) {
        logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", ignore);
      }
    } catch (RegionDestroyedException ignore) {
      if (logger.isDebugEnabled()) {
        logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", ignore);
      }
    } catch (IllegalStateException ex) {
      if (!this.parent.isClosing()) {
        throw ex;
      }
    }
    return result;
  }

  @Override
  public void cmnClearRegion(long drId, long clearOplogEntryId) {
    DiskRegionView drv = getDiskRegionById(drId);
    if (drv.getClearOplogEntryId() == DiskStoreImpl.INVALID_ID) {
      this.ifLiveRecordCount++;
    }
    // otherwise previous clear is cancelled so don't change liveRecordCount
    this.ifTotalRecordCount++;
    drv.setClearOplogEntryId(clearOplogEntryId);
    if (clearOplogEntryId > clearOplogEntryIdHWM) {
      clearOplogEntryIdHWM = clearOplogEntryId;
    }
  }

  @Override
  public void cmnClearRegion(long drId,
      ConcurrentHashMap<DiskStoreID, RegionVersionHolder<DiskStoreID>> memberToVersion) {
    DiskRegionView drv = getDiskRegionById(drId);
    if (drv.getClearRVV() == null) {
      this.ifLiveRecordCount++;
    }
    // otherwise previous clear is cancelled so don't change liveRecordCount
    this.ifTotalRecordCount++;

    DiskStoreID ownerId = parent.getDiskStoreID();
    // Create a fake RVV for clear purposes. We only need to memberToVersion information
    RegionVersionHolder<DiskStoreID> ownerExceptions = memberToVersion.remove(ownerId);
    long ownerVersion = ownerExceptions == null ? 0 : ownerExceptions.getVersion();
    RegionVersionVector rvv = new DiskRegionVersionVector(ownerId, memberToVersion, ownerVersion,
        new ConcurrentHashMap(), 0L, false, ownerExceptions);
    drv.setClearRVV(rvv);
  }

  private int liveRegions = 0; // added for bug 41618

  public boolean hasLiveRegions() {
    lock(false);
    try {
      return this.liveRegions > 0;
    } finally {
      unlock(false);
    }
  }

  @Override
  public void cmnCreateRegion(long drId, String regName) {
    recoverDiskRegion(drId, regName);
    this.liveRegions++;
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public void cmnRegionConfig(long drId, byte lruAlgorithm, byte lruAction, int lruLimit,
      int concurrencyLevel, int initialCapacity, float loadFactor, boolean statisticsEnabled,
      boolean isBucket, EnumSet<DiskRegionFlag> flags, String partitionName, int startingBucketId,
      String compressorClassName, boolean offHeap) {
    DiskRegionView dr = getDiskRegionById(drId);
    if (dr != null) {
      // We need to add the IS_WITH_VERSIONING to persistent regions
      // during the upgrade. Previously, all regions had versioning enabled
      // but now only regions that have this flag will have versioning enabled.
      //
      // We don't want gateway queues to turn on versioning. Unfortunately, the only
      // way to indentify that a region is a gateway queue is by the region
      // name.
      if (KnownVersion.GFE_80.compareTo(currentRecoveredGFVersion()) > 0
          && !dr.getName().contains("_SERIAL_GATEWAY_SENDER_QUEUE")
          && !dr.getName().contains("_PARALLEL__GATEWAY__SENDER__QUEUE")) {
        flags.add(DiskRegionFlag.IS_WITH_VERSIONING);
      }
      dr.setConfig(lruAlgorithm, lruAction, lruLimit, concurrencyLevel, initialCapacity, loadFactor,
          statisticsEnabled, isBucket, flags, partitionName, startingBucketId, compressorClassName,
          offHeap);

      // Just count this as a live record even though it is possible
      // that we have an extra one due to the config changing while
      // we were offline.
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  @Override
  public boolean cmnPRCreate(String name, PRPersistentConfig config) {
    if (this.prMap.put(name, config) == null) {
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
      this.liveRegions++;
      return true;
    }

    return false;
  }

  @Override
  public void cmnGemfireVersion(KnownVersion version) {
    this.gfversion = version;
  }

  @Override
  public boolean cmnPRDestroy(String name) {
    if (this.prMap.remove(name) != null) {
      this.ifLiveRecordCount--;
      this.ifTotalRecordCount++;
      this.liveRegions--;
      return true;
    }
    return false;
  }

  @Override
  public void cmnAddCanonicalMemberId(int id, Object object) {
    this.canonicalIdHolder.addMapping(id, object);
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public void cmnRmMemberId(long drId, PersistentMemberID pmid) {
    DiskRegionView dr = getDiskRegionById(drId);
    if (dr != null) {
      if (!dr.rmOnlineMember(pmid)) {
        if (!dr.rmOfflineMember(pmid)) {
          dr.rmEqualMember(pmid);
        }
      }
      // since we removed a member don't inc the live count
      // In fact decrement it by one since both this record
      // and the previous one are both garbage.
      this.ifLiveRecordCount--;
      this.ifTotalRecordCount++;
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  @Override
  public void cmnOfflineMemberId(long drId, PersistentMemberID pmid) {
    DiskRegionView dr = getDiskRegionById(drId);
    if (dr != null) {
      dr.addOfflineMember(pmid);
      if (dr.rmOnlineMember(pmid) || dr.rmEqualMember(pmid)) {
        this.ifLiveRecordCount--;
      }
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  @Override
  public void cmdOfflineAndEqualMemberId(long drId, PersistentMemberID pmid) {
    DiskRegionView dr = getDiskRegionById(drId);
    if (dr != null) {
      if (this.parent.upgradeVersionOnly
          && KnownVersion.GFE_70.compareTo(currentRecoveredGFVersion()) > 0) {
        dr.addOnlineMember(pmid);
        if (dr.rmOfflineMember(pmid)) {
          this.ifLiveRecordCount--;
        }
      } else {
        dr.addOfflineAndEqualMember(pmid);
        if (dr.rmOnlineMember(pmid) || dr.rmOfflineMember(pmid)) {
          this.ifLiveRecordCount--;
        }
      }
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  @Override
  public void cmnOnlineMemberId(long drId, PersistentMemberID pmid) {
    DiskRegionView dr = getDiskRegionById(drId);
    if (dr != null) {
      dr.addOnlineMember(pmid);
      if (dr.rmOfflineMember(pmid) || dr.rmEqualMember(pmid)) {
        this.ifLiveRecordCount--;
      }
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  @Override
  public void cmnDataSerializerId(Class dsc) {
    if (dsc != null) {
      DataSerializer ds = InternalDataSerializer.register(dsc, /* dsId, */ true);
      this.dsIds.add(ds.getId());
    }
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public void cmnInstantiatorId(int id, Class c, Class ic) {
    if (c != null && ic != null) {
      InternalInstantiator.register(c, ic, id, true);
      this.instIds.add(id);
    }
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public void cmnInstantiatorId(int id, String cn, String icn) {
    if (cn != null && icn != null) {
      InternalInstantiator.register(cn, icn, id, true);
      this.instIds.add(id);
    }
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public void cmnCrfCreate(long oplogId) {
    this.crfIds.add(oplogId);
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public void cmnDrfCreate(long oplogId) {
    this.drfIds.add(oplogId);
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public void cmnKrfCreate(long oplogId) {
    this.krfIds.add(oplogId);
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;
  }

  @Override
  public boolean cmnCrfDelete(long oplogId) {
    if (this.krfIds.remove(oplogId)) {
      this.ifLiveRecordCount--;
      this.ifTotalRecordCount++;
    }
    if (this.crfIds.remove(oplogId)) {
      this.ifLiveRecordCount--;
      this.ifTotalRecordCount++;
      return true;
    } else {
      return false;
    }
  }

  @Override
  public boolean cmnDrfDelete(long oplogId) {
    if (this.drfIds.remove(oplogId)) {
      this.ifLiveRecordCount--;
      this.ifTotalRecordCount++;
      return true;
    } else {
      return false;
    }
  }

  public boolean isCRFOplogIdPresent(long crfId) {
    return this.crfIds.contains(crfId);
  }

  public boolean isDRFOplogIdPresent(long drfId) {
    return this.drfIds.contains(drfId);
  }

  public void verifyOplogs(LongOpenHashSet foundCrfs, LongOpenHashSet foundDrfs) {
    verifyOplogs(foundCrfs, foundDrfs, this.crfIds, this.drfIds);
  }

  public void verifyOplogs(LongOpenHashSet foundCrfs, LongOpenHashSet foundDrfs,
      LongOpenHashSet expectedCrfIds, LongOpenHashSet expectedDrfIds) {
    LongOpenHashSet missingCrfs = calcMissing(foundCrfs, expectedCrfIds);
    LongOpenHashSet missingDrfs = calcMissing(foundDrfs, expectedDrfIds);
    // Note that finding extra ones is ok; it is possible we died just
    // after creating one but before we could record it in the if file
    // Or died just after deleting it but before we could record it in the if file.
    boolean failed = false;
    String msg = null;
    if (!missingCrfs.isEmpty()) {
      failed = true;
      msg = "*.crf files with these ids: " + Arrays.toString(missingCrfs.toArray());
    }
    if (!missingDrfs.isEmpty()) {
      failed = true;
      if (msg == null) {
        msg = "";
      } else {
        msg += ", ";
      }
      msg += "*.drf files with these ids: " + Arrays.toString(missingDrfs.toArray());
    }
    if (failed) {
      msg = "The following required files could not be found: " + msg + ".";
      throw new IllegalStateException(msg);
    }
  }

  private LongOpenHashSet calcMissing(LongOpenHashSet found, LongOpenHashSet expected) {
    LongOpenHashSet missing = new LongOpenHashSet(expected);
    missing.removeAll(found);
    return missing;
  }

  boolean hasKrf(long oplogId) {
    return krfIds.contains(oplogId);
  }

  DiskRegionView takeDiskRegionByName(String name) {
    lock(false);
    try {
      DiskRegionView result = this.drMapByName.remove(name);
      if (result != null) {
        this.drMap.remove(result.getId());
      }
      return result;
    } finally {
      unlock(false);
    }
  }

  Map<Long, PlaceHolderDiskRegion> getDRMap() {
    lock(false);
    try {
      return new HashMap<Long, PlaceHolderDiskRegion>(drMap);
    } finally {
      unlock(false);
    }
  }

  DiskRegion createDiskRegion(DiskStoreImpl dsi, String name, boolean isBucket,
      boolean isPersistBackup, boolean overflowEnabled, boolean isSynchronous,
      DiskRegionStats stats, CancelCriterion cancel, DiskExceptionHandler exceptionHandler,
      RegionAttributes ra, EnumSet<DiskRegionFlag> flags, String partitionName,
      int startingBucketId, Compressor compressor, boolean offHeap) {
    lock(true);
    try {
      // need to call the constructor and addDiskRegion while synced
      DiskRegion result = new DiskRegion(dsi, name, isBucket, isPersistBackup, overflowEnabled,
          isSynchronous, stats, cancel, exceptionHandler, ra, flags, partitionName,
          startingBucketId, compressor == null ? null : compressor.getClass().getName(), offHeap);
      dsi.addDiskRegion(result);
      return result;
    } finally {
      unlock(true);
    }
  }

  DiskRegionView getDiskRegionByName(String name) {
    lock(false);
    try {
      return this.drMapByName.get(name);
    } finally {
      unlock(false);
    }
  }

  DiskRegionView getDiskRegionByPrName(String name) {
    lock(false);
    try {
      for (PlaceHolderDiskRegion dr : this.drMapByName.values()) {
        if (dr.isBucket()) {
          if (name.equals(dr.getPrName())) {
            return dr;
          }
        }
      }
      return null;
    } finally {
      unlock(false);
    }
  }

  private DiskRegionView getDiskRegionById(long drId) {
    DiskRegionView result = this.drMap.get(drId);
    if (result == null) {
      result = this.parent.getById(drId);
    }
    return result;
  }


  private void recoverDiskRegion(long drId, String regName) {
    // Whatever the last create region drId we see we remember
    // in the DiskStore. Note that this could be a region that is destroyed
    // (we will not know until we see a later destroy region record)
    // but since drId's can wrap around into negative numbers whatever
    // the last one we see is the right one to remember.
    this.parent.recoverRegionId(drId);
    PlaceHolderDiskRegion dr = new PlaceHolderDiskRegion(this.parent, drId, regName);
    Object old = this.drMap.put(drId, dr);
    assert old == null;
    PlaceHolderDiskRegion oldDr = this.drMapByName.put(regName, dr);
    if (oldDr != null) {
      this.drMap.remove(oldDr.getId()); // fix for bug 42043
    }
    // don't schedule for recovery until we know it was not destroyed
  }

  /**
   * Maximum number of bytes used to encode a DiskRegion id.
   */
  static final int DR_ID_MAX_BYTES = 9;

  private void writeIFRecord(byte b, DiskRegionView dr) {
    assert lock.isHeldByCurrentThread();
    try {
      ByteBuffer bb = getIFWriteBuffer(1 + DR_ID_MAX_BYTES + 1);
      bb.put(b);
      putDiskRegionID(bb, dr.getId());
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeIFRecord(byte b, DiskRegionView dr, long v) {
    assert lock.isHeldByCurrentThread();
    try {
      ByteBuffer bb = getIFWriteBuffer(1 + DR_ID_MAX_BYTES + 8 + 1);
      bb.put(b);
      putDiskRegionID(bb, dr.getId());
      bb.putLong(v);
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeIFRecord(byte b, long v) {
    assert lock.isHeldByCurrentThread();
    try {
      ByteBuffer bb = getIFWriteBuffer(OPLOG_FILE_ID_REC_SIZE);
      bb.put(b);
      bb.putLong(v);
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeIFRecord(byte b, DiskRegionView dr, String s) {
    assert lock.isHeldByCurrentThread();
    try {
      int hdosSize = 1 + DR_ID_MAX_BYTES + estimateByteSize(s) + 1;
      if (hdosSize < 32) {
        hdosSize = 32;
      }
      try (HeapDataOutputStream hdos = new HeapDataOutputStream(hdosSize, KnownVersion.CURRENT)) {
        hdos.write(b);
        writeDiskRegionID(hdos, dr.getId());
        hdos.writeUTF(s);
        hdos.write(END_OF_RECORD_ID);
        writeIFRecord(hdos, true);
      }
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeIFRecord(byte b, long regionId, String fileName, Object compactorInfo) {
    assert lock.isHeldByCurrentThread();
    try {
      int hdosSize = 1 + DR_ID_MAX_BYTES + estimateByteSize(fileName) + 1;
      if (hdosSize < 32) {
        hdosSize = 32;
      }
      try (HeapDataOutputStream hdos = new HeapDataOutputStream(hdosSize, KnownVersion.CURRENT)) {
        hdos.write(b);
        writeDiskRegionID(hdos, regionId);
        hdos.writeUTF(fileName);
        // TODO - plum the correct compactor info to this point, to optimize
        // serialization
        DataSerializer.writeObject(compactorInfo, hdos);
        hdos.write(END_OF_RECORD_ID);
        writeIFRecord(hdos, true);
      }
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeIFRecord(byte b, long regionId, String fileName) {
    assert lock.isHeldByCurrentThread();
    try {
      int hdosSize = 1 + DR_ID_MAX_BYTES + estimateByteSize(fileName) + 1;
      if (hdosSize < 32) {
        hdosSize = 32;
      }
      try (HeapDataOutputStream hdos = new HeapDataOutputStream(hdosSize, KnownVersion.CURRENT)) {
        hdos.write(b);
        writeDiskRegionID(hdos, regionId);
        hdos.writeUTF(fileName);
        hdos.write(END_OF_RECORD_ID);
        writeIFRecord(hdos, true);
      }
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private int estimateByteSize(String s) {
    return s == null ? 0 : ((s.length() + 1) * 3);
  }

  private void writePMIDRecord(byte opcode, DiskRegionView dr, PersistentMemberID pmid,
      boolean doStats) {
    assert lock.isHeldByCurrentThread();
    try {
      byte[] pmidBytes = pmidToBytes(pmid);
      ByteBuffer bb = getIFWriteBuffer(1 + DR_ID_MAX_BYTES + 4 + pmidBytes.length + 1);
      bb.put(opcode);
      putDiskRegionID(bb, dr.getId());
      bb.putInt(pmidBytes.length);
      bb.put(pmidBytes);
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb, doStats);
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void putDiskRegionID(ByteBuffer bb, long drId) {
    // If the drId is <= 255 (max unsigned byte) then
    // encode it as a single byte.
    // Otherwise write a byte whose value is the number of bytes
    // it will be encoded by and then follow it with that many bytes.
    // Note that drId are not allowed to have a value in the range 1..8 inclusive.
    if (drId >= 0 && drId <= 255) {
      bb.put((byte) drId);
    } else {
      byte bytesNeeded = Oplog.bytesNeeded(drId);
      bb.put(bytesNeeded);
      byte[] bytes = new byte[bytesNeeded];
      for (int i = bytesNeeded - 1; i >= 0; i--) {
        bytes[i] = (byte) (drId & 0xFF);
        drId >>= 8;
      }
      bb.put(bytes);
    }
  }

  static void writeDiskRegionID(DataOutput dos, long drId) throws IOException {
    // If the drId is <= 255 (max unsigned byte) then
    // encode it as a single byte.
    // Otherwise write a byte whose value is the number of bytes
    // it will be encoded by and then follow it with that many bytes.
    // Note that drId are not allowed to have a value in the range 1..8 inclusive.
    if (drId >= 0 && drId <= 255) {
      dos.write((byte) drId);
    } else {
      byte bytesNeeded = Oplog.bytesNeeded(drId);
      dos.write(bytesNeeded);
      byte[] bytes = new byte[bytesNeeded];
      for (int i = bytesNeeded - 1; i >= 0; i--) {
        bytes[i] = (byte) (drId & 0xFF);
        drId >>= 8;
      }
      dos.write(bytes);
    }
  }

  static long readDiskRegionID(DataInput dis) throws IOException {
    int bytesToRead = dis.readUnsignedByte();
    if (bytesToRead <= DiskStoreImpl.MAX_RESERVED_DRID
        && bytesToRead >= DiskStoreImpl.MIN_RESERVED_DRID) {
      long result = dis.readByte(); // we want to sign extend this first byte
      bytesToRead--;
      while (bytesToRead > 0) {
        result <<= 8;
        result |= dis.readUnsignedByte(); // no sign extension
        bytesToRead--;
      }
      return result;
    } else {
      return bytesToRead;
    }
  }

  private void cmnAddMyInitializingPMID(DiskRegionView dr, PersistentMemberID pmid) {
    if (dr != null) {
      if (dr.addMyInitializingPMID(pmid) == null) {
        this.ifLiveRecordCount++;
      }
      this.ifTotalRecordCount++;
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  private void cmnMarkInitialized(DiskRegionView dr) {
    // dec since this initializeId is overriding a previous one
    // It actually doesn't override myInitializing
    // this.ifLiveRecordCount--;
    // don't count this as a record in the totalRecCount
    if (dr != null) {
      dr.markInitialized();
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  private void cmnBeginDestroyRegion(DiskRegionView dr) {
    // don't count it is a small record

    if (dr != null) {
      dr.markBeginDestroyRegion();
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  private void cmnEndDestroyRegion(DiskRegionView dr) {
    // Figure out how may other records this freed up.

    if (dr != null) {
      if (dr.getClearOplogEntryId() != DiskStoreImpl.INVALID_ID) {
        // one for the clear record
        this.ifLiveRecordCount--;
      }
      // one for each online member
      this.ifLiveRecordCount -= dr.getOnlineMembers().size();
      // one for each offline member
      this.ifLiveRecordCount -= dr.getOfflineMembers().size();
      // one for each equal member
      this.ifLiveRecordCount -= dr.getOfflineAndEqualMembers().size();



      // one for the CREATE_REGION
      this.ifLiveRecordCount--;

      // one for the regions memberId
      if (dr.getMyPersistentID() != null) {
        this.ifLiveRecordCount--;
      }

      this.liveRegions--;
      this.drMap.remove(dr.getId());
      this.drMapByName.remove(dr.getName());
      this.parent.rmById(dr.getId());

      dr.markEndDestroyRegion();
    } else {
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "bad disk region id!");
      } else {
        throw new IllegalStateException("bad disk region id");
      }
    }
  }

  private void cmnBeginPartialDestroyRegion(DiskRegionView dr) {
    // count the begin as both live and total
    this.ifLiveRecordCount++;
    this.ifTotalRecordCount++;

    dr.markBeginDestroyDataStorage();
  }

  private void cmnEndPartialDestroyRegion(DiskRegionView dr) {
    // no need to count this small record

    // Figure out how may other records this freed up.
    if (dr.getClearOplogEntryId() != DiskStoreImpl.INVALID_ID) {
      // one for the clear record
      this.ifLiveRecordCount--;
    }
    // Figure out how may other records this freed up.
    if (dr.getMyPersistentID() != null) {
      // one for the regions memberId
      this.ifLiveRecordCount--;
    }

    dr.markEndDestroyDataStorage();
  }

  /**
   * Write the specified instantiator to the file.
   */
  private void saveInstantiator(Instantiator inst) {
    saveInstantiator(inst.getId(), inst.getClass().getName(),
        inst.getInstantiatedClass().getName());
  }

  private void saveInstantiator(int id, String instantiatorClassName,
      String instantiatedClassName) {
    lock(true);
    try {
      if (!this.compactInProgress && this.instIds.contains(id)) {
        // instantiator already written to disk so just return
        return;
      }
      final byte[] classNameBytes = classNameToBytes(instantiatorClassName);
      final byte[] instClassNameBytes = classNameToBytes(instantiatedClassName);
      ByteBuffer bb =
          getIFWriteBuffer(1 + 4 + 4 + classNameBytes.length + 4 + instClassNameBytes.length + 1);
      bb.put(IFREC_INSTANTIATOR_ID);
      bb.putInt(id);
      bb.putInt(classNameBytes.length);
      bb.put(classNameBytes);
      bb.putInt(instClassNameBytes.length);
      bb.put(instClassNameBytes);
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb);
    } catch (IOException ex) {
      throw new DiskAccessException(
          String.format("Failed saving instantiator to disk because: %s",
              ex),
          this.parent);
    } finally {
      unlock(true);
    }
  }

  private void saveInstantiators() {
    Object[] objects = InternalInstantiator.getInstantiatorsForSerialization();
    for (Object obj : objects) {
      if (obj instanceof Instantiator) {
        saveInstantiator((Instantiator) obj);
      } else {
        InstantiatorAttributesHolder iah = (InstantiatorAttributesHolder) obj;
        saveInstantiator(iah.getId(), iah.getInstantiatorClassName(),
            iah.getInstantiatedClassName());
      }
    }
  }

  /**
   * Returns the bytes used to represent a class in an oplog.
   */
  private static byte[] classToBytes(Class c) {
    return classNameToBytes(c.getName());
  }

  /**
   * Returns the bytes used to represent a class in an oplog.
   */
  private static byte[] classNameToBytes(String cn) {
    return cn.getBytes(); // use default encoder
  }

  /**
   * Write the specified DataSerializer to the file.
   */
  private void saveDataSerializer(DataSerializer ds) {
    lock(true);
    try {
      if (!this.compactInProgress && this.dsIds.contains(ds.getId())) {
        // dataSerializer already written to disk so just return
        return;
      }
      final byte[] classNameBytes = classToBytes(ds.getClass());
      ByteBuffer bb = getIFWriteBuffer(1 + 4 + classNameBytes.length + 1);
      bb.put(IFREC_DATA_SERIALIZER_ID);
      bb.putInt(classNameBytes.length);
      bb.put(classNameBytes);
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb);
    } catch (IOException ex) {
      throw new DiskAccessException(
          String.format("Failed saving data serializer to disk because: %s",
              ex),
          this.parent);
    } finally {
      unlock(true);
    }
  }

  private void saveDataSerializers() {
    DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers();
    for (int i = 0; i < dataSerializers.length; i++) {
      saveDataSerializer(dataSerializers[i]);
    }
  }

  private void saveGemfireVersion() {
    if (this.gfversion == null) {
      this.gfversion = KnownVersion.CURRENT;
    }
    writeGemfireVersion(this.gfversion);
  }

  private void stopListeningForDataSerializerChanges() {
    if (this.regListener != null) {
      InternalDataSerializer.removeRegistrationListener(this.regListener);
    }
  }

  public long getMaxRecoveredClearEntryId() {
    return clearOplogEntryIdHWM;
  }

  private ByteBuffer getIFWriteBuffer(int size) {
    return ByteBuffer.allocate(size);
  }

  private void writeIFRecord(ByteBuffer bb) throws IOException {
    writeIFRecord(bb, true);
  }

  private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {
    assert lock.isHeldByCurrentThread();
    if (this.closed) {
      throw new DiskAccessException("The disk store is closed", parent);
    }

    this.ifRAF.write(bb.array(), 0, bb.position());
    if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
      logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord bb[0] = {}",
          bb.array()[0]);
    }
    if (doStats) {
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    }
    compactIfNeeded();
  }

  private void writeIFRecord(HeapDataOutputStream hdos, boolean doStats) throws IOException {
    assert lock.isHeldByCurrentThread();
    if (this.closed) {
      throw new DiskAccessException("The disk store is closed", parent);
    }
    hdos.sendTo(this.ifRAF);
    if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
      logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord HDOS");
    }
    if (doStats) {
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    }
    compactIfNeeded();
  }

  /**
   * If the file is smaller than this constant then it does not need to be compacted.
   */
  private static final long MIN_SIZE_BEFORE_COMPACT = 1024 * 1024;
  /**
   * If the ratio of live vs. dead is not less than this constant then no need to compact.
   */
  private static final double COMPACT_RATIO = 0.5;

  private void compactIfNeeded() {
    lock(true);
    try {
      if (this.compactInProgress)
        return;
      if (this.ifTotalRecordCount == 0)
        return;
      if (this.ifTotalRecordCount == this.ifLiveRecordCount)
        return;
      if (this.ifRAF.length() <= MIN_SIZE_BEFORE_COMPACT)
        return;
      if ((double) this.ifLiveRecordCount / (double) this.ifTotalRecordCount > COMPACT_RATIO)
        return;
      compact();
    } catch (IOException ignore) {
      return;
    } finally {
      unlock(true);
    }
  }

  private File getTempFile() {
    return new File(this.ifFile.getAbsolutePath() + "tmp");
  }

  public File getIFFile() {
    return this.ifFile;
  }

  private void compact() {
    lock(true);
    this.compactInProgress = true;
    try {
      try {
        this.ifRAF.close();
      } catch (IOException ignore) {
      }
      // rename the old file to tmpFile
      File tmpFile = getTempFile();
      if (this.ifFile.renameTo(tmpFile)) {
        boolean success = false;
        try {
          // create the new file
          openRAF();
          // fill the new file with data
          writeLiveData();
          success = true;

          // delete the old file
          if (!tmpFile.delete()) {
            throw new DiskAccessException("could not delete temporary file " + tmpFile,
                this.parent);
          }
        } catch (DiskAccessException ignore) {
          if (logger.isDebugEnabled()) {
            logger.debug("Exception compacting init file {}", this, ignore);
          }
        } finally {
          if (!success) {
            // if we failed
            // close the new one and delete it
            try {
              this.ifRAF.close();
            } catch (IOException ignore2) {
            }
            if (!this.ifFile.delete()) {
              throw new DiskAccessException("could not delete file " + this.ifFile, this.parent);
            }
            if (!tmpFile.renameTo(this.ifFile)) {
              throw new DiskAccessException(
                  "could not rename file " + tmpFile + " to " + this.ifFile, this.parent);
            }
            // reopen the old file since we couldn't write the new one
            openRAF();
            // reset the counts to 0 so we will try a compaction again
            // in the future but not right away.
            this.ifLiveRecordCount = 0;
            this.ifTotalRecordCount = 0;
          }
        }
      } else {
        // reopen the old file since we couldn't rename it
        openRAF();
        // reset the counts to 0 so we will try a compaction again
        // in the future but not right away.
        this.ifLiveRecordCount = 0;
        this.ifTotalRecordCount = 0;
      }
    } finally {
      this.compactInProgress = false;
      unlock(true);
    }
  }

  public void copyTo(File targetDir) throws IOException {
    lock(false);
    try {
      FileUtils.copyFileToDirectory(this.ifFile, targetDir);
    } finally {
      unlock(false);
    }
  }

  private void openRAF() {
    if (DiskStoreImpl.PREALLOCATE_IF) {
      openRAF2();
      return;
    }

    try {
      this.ifRAF = new RandomAccessFile(this.ifFile, getFileMode());
      long len = this.ifRAF.length();
      if (len != 0) {
        this.ifRAF.seek(len);
      }
    } catch (IOException ex) {
      throw new DiskAccessException(
          String.format("Could not open %s.", this.ifFile.getPath()), ex,
          this.parent);
    }
  }

  protected String getFileMode() {
    return DiskStoreImpl.SYNC_IF_WRITES ? "rwd" : "rw";
  }

  private void openRAF2() {
    try {
      this.ifRAF = new RandomAccessFile(this.ifFile, getFileMode());
      long len = this.ifRAF.length();
      if (len != 0) {
        // this.ifRAF.seek(len);
        if (this.gotEOF) {
          this.ifRAF.seek(this.nextSeekPosition - 1);
        } else {
          this.ifRAF.seek(this.nextSeekPosition);
        }
      } else {
        // pre-allocate the if file using some percentage of max Oplog size but
        // with max of 10M and min of 1M
        long maxSizeInMB = Math.min(Math.max(this.parent.getMaxOplogSize() / 200L, 1L), 10L);
        byte[] buffer = new byte[(1024 * 1024)];
        for (int i = 0; i < maxSizeInMB; i++) {
          this.ifRAF.write(buffer);
        }
        this.ifRAF.seek(0L);
      }
    } catch (IOException ex) {
      throw new DiskAccessException(
          String.format("Could not open %s.", this.ifFile.getPath()), ex,
          this.parent);
    }
  }

  /**
   * Write all live data to the init file
   */
  private void writeLiveData() {
    lock(true);
    try {
      this.ifLiveRecordCount = 0;
      this.ifTotalRecordCount = 0;
      writeDiskStoreId();
      saveGemfireVersion();
      saveInstantiators();
      saveDataSerializers();
      saveCrfIds();
      saveDrfIds();
      saveKrfIds();
      for (DiskRegionView drv : this.drMap.values()) {
        writeLiveData(drv);
      }
      for (DiskRegionView drv : this.parent.getDiskRegions()) {
        writeLiveData(drv);
      }
      savePRConfigs();
      saveCanonicalIds();
      saveRevokedMembers();
      if (logger.isDebugEnabled()) {
        logger.debug("After compacting init file lrc={} trc={}", this.ifLiveRecordCount,
            this.ifTotalRecordCount);
      }
    } finally {
      unlock(true);
    }
  }

  private void saveCrfIds() {
    for (LongIterator i = this.crfIds.iterator(); i.hasNext();) {
      writeIFRecord(IFREC_CRF_CREATE, i.next());
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    }
  }

  private void saveDrfIds() {
    for (LongIterator i = this.drfIds.iterator(); i.hasNext();) {
      writeIFRecord(IFREC_DRF_CREATE, i.next());
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    }
  }

  private void saveKrfIds() {
    for (Iterator<Long> i = this.krfIds.iterator(); i.hasNext();) {
      writeIFRecord(IFREC_KRF_CREATE, i.next());
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    }
  }

  private void savePRConfigs() {
    for (Map.Entry<String, PRPersistentConfig> entry : prMap.entrySet()) {
      writePRCreate(entry.getKey(), entry.getValue());
      this.ifLiveRecordCount++;
      this.ifTotalRecordCount++;
    }
  }

  private void saveCanonicalIds() {
    Int2ObjectOpenHashMap mappings = canonicalIdHolder.getAllMappings();
    for (ObjectIterator<Int2ObjectMap.Entry<?>> i = mappings.int2ObjectEntrySet().fastIterator(); i
        .hasNext();) {
      Int2ObjectMap.Entry<?> entry = i.next();
      writeCanonicalId(entry.getIntKey(), entry.getValue());
    }
  }

  private void saveRevokedMembers() {
    for (PersistentMemberPattern revoked : revokedMembers) {
      writeRevokedMember(revoked);
    }
  }

  private void writeDiskStoreId() {
    lock(true);
    try {
      ByteBuffer bb = getIFWriteBuffer(1 + 6 + 1);
      bb.put(OPLOG_MAGIC_SEQ_ID);
      bb.put(Oplog.OPLOG_TYPE.IF.getBytes(), 0, Oplog.OPLOG_TYPE.getLen());
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb, false); // don't do stats for these small records

      bb = getIFWriteBuffer(1 + 8 + 8 + 1);
      bb.put(IFREC_DISKSTORE_ID);
      bb.putLong(parent.getDiskStoreID().getLeastSignificantBits());
      bb.putLong(parent.getDiskStoreID().getMostSignificantBits());
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    } finally {
      unlock(true);
    }
  }

  private void writeRevokedMember(PersistentMemberPattern revoked) {
    try (HeapDataOutputStream hdos = new HeapDataOutputStream(32, KnownVersion.CURRENT)) {
      hdos.write(IFREC_REVOKE_DISK_STORE_ID);
      InternalDataSerializer.invokeToData(revoked, hdos);
      hdos.write(END_OF_RECORD_ID);
      writeIFRecord(hdos, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeRegionConfig(DiskRegionView drv) {
    int len = estimateByteSize(drv.getPartitionName());
    try (HeapDataOutputStream bb = new HeapDataOutputStream(
        1 + DR_ID_MAX_BYTES + 1 + 1 + 4 + 4 + 4 + 1 + 1 + 4 + len + 4 + 1 + 1 + 1,
        KnownVersion.CURRENT)) {
      int comprLen = estimateByteSize(drv.getCompressorClassName());
      bb.write(IFREC_REGION_CONFIG_ID_90);
      writeDiskRegionID(bb, drv.getId());
      bb.write(drv.getLruAlgorithm());
      bb.write(drv.getLruAction());
      bb.writeInt(drv.getLruLimit());
      bb.writeInt(drv.getConcurrencyLevel());
      bb.writeInt(drv.getInitialCapacity());
      bb.writeFloat(drv.getLoadFactor());
      bb.write((byte) (drv.getStatisticsEnabled() ? 1 : 0));
      bb.write((byte) (drv.isBucket() ? 1 : 0));
      final EnumSet<DiskRegionFlag> flags = drv.getFlags();
      bb.writeUTF(drv.getPartitionName());
      bb.writeInt(drv.getStartingBucketId());
      bb.writeUTF(drv.getCompressorClassName() == null ? "" : drv.getCompressorClassName());
      bb.writeBoolean(flags.contains(DiskRegionFlag.IS_WITH_VERSIONING));
      // TODO the offheap flag needs to be in a new version
      bb.writeBoolean(drv.getOffHeap());
      bb.write(END_OF_RECORD_ID);
      writeIFRecord(bb, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writePRCreate(String name, PRPersistentConfig config) {
    int nameLength = estimateByteSize(name);
    String colocatedWith = config.getColocatedWith();
    colocatedWith = colocatedWith == null ? "" : colocatedWith;
    int colocatedLength = estimateByteSize(colocatedWith);
    try (HeapDataOutputStream hdos =
        new HeapDataOutputStream(1 + nameLength + 4 + colocatedLength + 1, KnownVersion.CURRENT)) {
      hdos.write(IFREC_PR_CREATE);
      hdos.writeUTF(name);
      hdos.writeInt(config.getTotalNumBuckets());
      hdos.writeUTF(colocatedWith);
      hdos.write(END_OF_RECORD_ID);
      writeIFRecord(hdos, false);
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writePRDestroy(String name) {
    int nameLength = estimateByteSize(name);
    try (HeapDataOutputStream hdos =
        new HeapDataOutputStream(1 + nameLength + 4 + 1, KnownVersion.CURRENT)) {
      hdos.write(IFREC_PR_DESTROY);
      hdos.writeUTF(name);
      hdos.write(END_OF_RECORD_ID);
      writeIFRecord(hdos, false);
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeCanonicalId(int id, Object object) {
    try (HeapDataOutputStream hdos = new HeapDataOutputStream(32, KnownVersion.CURRENT)) {
      hdos.write(IFREC_ADD_CANONICAL_MEMBER_ID);
      hdos.writeInt(id);
      DataSerializer.writeObject(object, hdos);
      hdos.write(END_OF_RECORD_ID);
      writeIFRecord(hdos, true);
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }
  }

  private void writeLiveData(DiskRegionView drv) {
    writeIFRecord(IFREC_CREATE_REGION_ID, drv, drv.getName());
    writeRegionConfig(drv);
    if (drv.wasAboutToDestroy()) {
      writeIFRecord(IFREC_BEGIN_DESTROY_REGION_ID, drv);
    } else if (drv.wasAboutToDestroyDataStorage()) {
      writeIFRecord(IFREC_BEGIN_PARTIAL_DESTROY_REGION_ID, drv);
    }
    if (drv.getClearOplogEntryId() != DiskStoreImpl.INVALID_ID) {
      writeIFRecord(IFREC_CLEAR_REGION_ID, drv, drv.getClearOplogEntryId());
      this.ifTotalRecordCount++;
      this.ifLiveRecordCount++;
    }
    if (drv.getClearRVV() != null) {
      writeClearRecord(drv, drv.getClearRVV());
    }
    for (PersistentMemberID pmid : drv.getOnlineMembers()) {
      writePMIDRecord(IFREC_ONLINE_MEMBER_ID, drv, pmid, true);
    }
    for (PersistentMemberID pmid : drv.getOfflineMembers()) {
      writePMIDRecord(IFREC_OFFLINE_MEMBER_ID, drv, pmid, true);
    }
    for (PersistentMemberID pmid : drv.getOfflineAndEqualMembers()) {
      writePMIDRecord(IFREC_OFFLINE_AND_EQUAL_MEMBER_ID, drv, pmid, true);
    }
    if (drv.getMyPersistentID() != null) {
      writePMIDRecord(IFREC_MY_MEMBER_INITIALIZING_ID, drv, drv.getMyPersistentID(), true);
      writeIFRecord(IFREC_MY_MEMBER_INITIALIZED_ID, drv);
    }
    if (drv.getMyInitializingID() != null) {
      writePMIDRecord(IFREC_MY_MEMBER_INITIALIZING_ID, drv, drv.getMyInitializingID(), true);
    }
  }

  void forceCompaction() {
    compact();
  }

  private byte[] pmidToBytes(PersistentMemberID id) {
    try {
      HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT);
      InternalDataSerializer.invokeToData(id, hdos);
      return hdos.toByteArray();
    } catch (IOException ex) {
      throw new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
    }
  }

  private PersistentMemberID bytesToPMID(byte[] bytes) {
    try {
      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
      DataInputStream dis = new DataInputStream(bais);
      PersistentMemberID result = new PersistentMemberID();
      InternalDataSerializer.invokeFromData(result, dis);
      return result;
    } catch (IOException io) {
      throw new DiskAccessException(
          String.format("Failed to read file during recovery from %s",
              this.ifFile.getPath()),
          io, this.parent);
    } catch (ClassNotFoundException cnf) {
      throw new DiskAccessException(
          String.format("Failed to read file during recovery from %s",
              this.ifFile.getPath()),
          cnf, this.parent);
    }
  }

  // non-private methods

  DiskInitFile(String name, DiskStoreImpl parent, boolean shouldExist, Set<File> oplogs) {
    this.parent = parent;
    File f = new File(this.parent.getInfoFileDir().getDir(), "BACKUP" + name + IF_FILE_EXT);
    final boolean didNotExist = !f.exists();
    if (shouldExist && didNotExist) {
      String msg = String.format("The init file %s does not exist.",
          new Object[] {f});
      if (!oplogs.isEmpty()) {
        Set<File> allOplogs = new LinkedHashSet(oplogs);
        msg +=
            String.format(
                "If it no longer exists then delete the following files to be able to create this disk store. Existing oplogs are: %s",
                new Object[] {allOplogs});
      }
      throw new IllegalStateException(msg);
    }
    this.ifFile = f;
    this.dsIds = new IntOpenHashSet();
    this.instIds = new IntOpenHashSet();
    this.crfIds = new LongOpenHashSet();
    this.drfIds = new LongOpenHashSet();

    // krfIds uses a concurrent impl because backup
    // can call hasKrf concurrently with cmnKrfCreate
    this.krfIds = ConcurrentHashMap.newKeySet();
    recover();
    if (this.parent.isOffline() && !this.parent.isOfflineCompacting()
        && !this.parent.isOfflineModify()) {
      dump();
    }
    openRAF();
    if (!this.parent.isOffline() || this.parent.isOfflineCompacting()) {
      if (didNotExist) {
        this.parent.setDiskStoreID(DiskStoreID.random());
        writeDiskStoreId();
        saveGemfireVersion(); // normal create diskstore
      }
      this.regListener = new InternalDataSerializer.RegistrationListener() {
        @Override
        public void newInstantiator(Instantiator i) {
          saveInstantiator(i);
        }

        @Override
        public void newDataSerializer(DataSerializer ds) {
          saveDataSerializer(ds);
        }
      };
      InternalDataSerializer.addRegistrationListener(this.regListener);
      // do this after the listener is registered to make sure we don't
      // miss any registrations.
      saveInstantiators();
      saveDataSerializers();
    } else {
      this.regListener = null;
    }
  }

  void closeRegion(DiskRegionView dr) {
    lock(true);
    try {
      this.parent.rmById(dr.getId()); // fix for bug 41334
      PlaceHolderDiskRegion phdr = new PlaceHolderDiskRegion(dr);
      this.drMap.put(dr.getId(), phdr);
      this.drMapByName.put(dr.getName(), phdr);
      // @todo make sure we only have one instance of the region for this name
    } finally {
      unlock(true);
    }
  }

  void clearRegion(DiskRegionView dr, long clearOplogEntryId) {
    lock(true);
    try {
      if (clearOplogEntryId != DiskStoreImpl.INVALID_ID) {
        this.ifTotalRecordCount++;
        if (dr.getClearOplogEntryId() == DiskStoreImpl.INVALID_ID) {
          this.ifLiveRecordCount++;
        } else {
          // we now have one record to gc (the previous clear).
        }
        dr.setClearOplogEntryId(clearOplogEntryId);
        if (clearOplogEntryId > clearOplogEntryIdHWM) {
          clearOplogEntryIdHWM = clearOplogEntryId;
        }
        writeIFRecord(IFREC_CLEAR_REGION_ID, dr, clearOplogEntryId);
      }
    } finally {
      unlock(true);
    }
  }

  /**
   * Clear the region using an RVV.
   */
  void clearRegion(DiskRegion dr, RegionVersionVector rvv) {
    lock(true);
    try {
      this.ifTotalRecordCount++;
      if (dr.getClearRVV() == null) {
        this.ifLiveRecordCount++;
      } else {
        // we now have one record to gc (the previous clear).
      }
      dr.setClearRVV(rvv);
      writeClearRecord(dr, rvv);
    } finally {
      unlock(true);
    }

  }

  /**
   * Write a clear with an RVV record.
   */
  private void writeClearRecord(DiskRegionView dr, RegionVersionVector rvv) {
    try (HeapDataOutputStream hdos = new HeapDataOutputStream(32, KnownVersion.CURRENT)) {
      hdos.write(IFREC_CLEAR_REGION_WITH_RVV_ID);
      writeDiskRegionID(hdos, dr.getId());
      // We only need the memberToVersionMap for clear purposes
      Map<DiskStoreID, RegionVersionHolder> memberToVersion = rvv.getMemberToVersion();
      hdos.writeInt(memberToVersion.size());
      for (Map.Entry<DiskStoreID, RegionVersionHolder> entry : memberToVersion.entrySet()) {
        InternalDataSerializer.invokeToData(entry.getKey(), hdos);
        synchronized (entry.getValue()) {
          InternalDataSerializer.invokeToData(entry.getValue(), hdos);
        }
      }
      hdos.write(END_OF_RECORD_ID);
      writeIFRecord(hdos, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    }

  }

  void createRegion(DiskRegionView drv) {
    lock(true);
    try {
      if (!drv.isRecreated()) {
        writeIFRecord(IFREC_CREATE_REGION_ID, drv, drv.getName());
        this.liveRegions++;
        writeRegionConfig(drv);
        // no need to add to drMap since it will be in the DiskStore drMap
      } else {
        if (drv.hasConfigChanged()) {
          writeRegionConfig(drv);
          drv.setConfigChanged(false);
        }
      }
    } finally {
      unlock(true);
    }
  }

  void beginDestroyRegion(DiskRegionView dr) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        cmnBeginDestroyRegion(dr);
        writeIFRecord(IFREC_BEGIN_DESTROY_REGION_ID, dr);
      }
    } finally {
      unlock(true);
    }
  }

  void endDestroyRegion(DiskRegionView dr) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        cmnEndDestroyRegion(dr);
        writeIFRecord(IFREC_END_DESTROY_REGION_ID, dr);
        if (logger.isDebugEnabled()) {
          logger.trace(LogMarker.PERSIST_WRITES_VERBOSE,
              "DiskInitFile IFREC_END_DESTROY_REGION_ID drId={}", dr.getId());
        }
      }
    } finally {
      unlock(true);
    }
  }

  void beginDestroyDataStorage(DiskRegionView dr) {
    lock(true);
    try {
      assert regionStillCreated(dr);
      cmnBeginPartialDestroyRegion(dr);
      writeIFRecord(IFREC_BEGIN_PARTIAL_DESTROY_REGION_ID, dr);
    } finally {
      unlock(true);
    }
  }

  void endDestroyDataStorage(DiskRegionView dr) {
    lock(true);
    try {
      assert regionStillCreated(dr);
      cmnEndPartialDestroyRegion(dr);
      writeIFRecord(IFREC_END_PARTIAL_DESTROY_REGION_ID, dr);
    } finally {
      unlock(true);
    }
  }

  public void createPersistentPR(String name, PRPersistentConfig config) {
    lock(true);
    try {
      if (cmnPRCreate(name, config)) {
        writePRCreate(name, config);
      }
    } finally {
      unlock(true);
    }
  }

  public void destroyPersistentPR(String name) {
    lock(true);
    try {
      if (cmnPRDestroy(name)) {
        writePRDestroy(name);
      }
    } finally {
      unlock(true);
    }
  }

  public PRPersistentConfig getPersistentPR(String name) {
    lock(false);
    try {
      return prMap.get(name);
    } finally {
      unlock(false);
    }
  }

  public Map<String, PRPersistentConfig> getAllPRs() {
    lock(false);
    try {
      return new HashMap<String, PRPersistentConfig>(prMap);
    } finally {
      unlock(false);
    }
  }

  void crfCreate(long oplogId) {
    lock(true);
    try {
      cmnCrfCreate(oplogId);
      writeIFRecord(IFREC_CRF_CREATE, oplogId);
    } finally {
      unlock(true);
    }
  }

  void drfCreate(long oplogId) {
    lock(true);
    try {
      cmnDrfCreate(oplogId);
      writeIFRecord(IFREC_DRF_CREATE, oplogId);
    } finally {
      unlock(true);
    }
  }

  void krfCreate(long oplogId) {
    lock(true);
    try {
      cmnKrfCreate(oplogId);
      writeIFRecord(IFREC_KRF_CREATE, oplogId);
    } finally {
      unlock(true);
    }
  }

  void crfDelete(long oplogId) {
    lock(true);
    try {
      if (cmnCrfDelete(oplogId)) {
        // call writeIFRecord AFTER cmnCrfDelete to fix bug 41505
        writeIFRecord(IFREC_CRF_DELETE, oplogId);
      }
    } finally {
      unlock(true);
    }
  }

  void drfDelete(long oplogId) {
    lock(true);
    try {
      if (cmnDrfDelete(oplogId)) {
        writeIFRecord(IFREC_DRF_DELETE, oplogId);
      }
    } finally {
      unlock(true);
    }
  }

  int getOrCreateCanonicalId(Object object) {
    lock(true);
    try {
      int id = canonicalIdHolder.getId(object);
      if (id <= 0) {
        id = canonicalIdHolder.createId(object);
        writeCanonicalId(id, object);
      }
      return id;
    } finally {
      unlock(true);
    }
  }

  Object getCanonicalObject(int id) {
    lock(false);
    try {
      return canonicalIdHolder.getObject(id);
    } finally {
      unlock(false);
    }
  }

  void close() {
    lock(true);
    try {
      if (this.closed)
        return;
      this.closed = true;
      stopListeningForDataSerializerChanges();
      try {
        this.ifRAF.close();
      } catch (IOException ignore) {
      }
      for (DiskRegionView k : this.getKnown()) {
        k.close();
      }
      if (this.liveRegions == 0 && !parent.isValidating()) {
        basicDestroy();
      }
    } finally {
      unlock(true);
    }
  }

  void destroy() {
    lock(true);
    try {
      close();
      basicDestroy();
    } finally {
      unlock(true);
    }
  }

  private void basicDestroy() {
    if (this.ifFile.exists()) {
      if (!this.ifFile.delete()) {
        if (logger.isDebugEnabled()) {
          logger.debug("could not delete file {}", this.ifFile);
        }
      }
    }
  }


  void addMyInitializingPMID(DiskRegionView dr, PersistentMemberID pmid) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        cmnAddMyInitializingPMID(dr, pmid);
        writePMIDRecord(IFREC_MY_MEMBER_INITIALIZING_ID, dr, pmid, false);
      }
    } finally {
      unlock(true);
    }
  }

  void markInitialized(DiskRegionView dr) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        writeIFRecord(IFREC_MY_MEMBER_INITIALIZED_ID, dr);
        cmnMarkInitialized(dr);
      }
    } finally {
      unlock(true);
    }
  }

  void addOnlinePMID(DiskRegionView dr, PersistentMemberID pmid) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        if (dr.addOnlineMember(pmid)) {
          if (dr.rmOfflineMember(pmid) || dr.rmEqualMember(pmid)) {
            this.ifLiveRecordCount--;
          }
          writePMIDRecord(IFREC_ONLINE_MEMBER_ID, dr, pmid, true);
        }
      }
    } finally {
      unlock(true);
    }
  }

  void addOfflinePMID(DiskRegionView dr, PersistentMemberID pmid) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        if (dr.addOfflineMember(pmid)) {
          if (dr.rmOnlineMember(pmid) || dr.rmEqualMember(pmid)) {
            this.ifLiveRecordCount--;
          }
          writePMIDRecord(IFREC_OFFLINE_MEMBER_ID, dr, pmid, true);
        }
      }
    } finally {
      unlock(true);
    }
  }

  void addOfflineAndEqualPMID(DiskRegionView dr, PersistentMemberID pmid) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        if (dr.addOfflineAndEqualMember(pmid)) {
          if (dr.rmOnlineMember(pmid) || dr.rmOfflineMember(pmid)) {
            this.ifLiveRecordCount--;
          }
          writePMIDRecord(IFREC_OFFLINE_AND_EQUAL_MEMBER_ID, dr, pmid, true);
        }
      }
    } finally {
      unlock(true);
    }
  }

  void rmPMID(DiskRegionView dr, PersistentMemberID pmid) {
    lock(true);
    try {
      if (regionStillCreated(dr)) {
        if (dr.rmOnlineMember(pmid) || dr.rmOfflineMember(pmid) || dr.rmEqualMember(pmid)) {
          // we now have two records to gc (this one and the live one we removed).
          this.ifLiveRecordCount--;
          this.ifTotalRecordCount++;
          writePMIDRecord(IFREC_RM_MEMBER_ID, dr, pmid, false);
        }
      }
    } finally {
      unlock(true);
    }
  }

  public void revokeMember(PersistentMemberPattern revokedPattern) {
    // We're only going to record members revoked with the new API -
    // using the UUID
    if (revokedPattern.getUUID() == null) {
      return;
    }

    lock(true);
    try {
      if (cmnRevokeDiskStoreId(revokedPattern)) {
        // we now have two records to gc (this one and the live one we removed).
        this.ifLiveRecordCount++;
        this.ifTotalRecordCount++;
        writeRevokedMember(revokedPattern);
      }
    } finally {
      unlock(true);
    }
  }

  /**
   * Get the set of members known to be revoked
   */
  public Set<PersistentMemberPattern> getRevokedIDs() {
    lock(false);
    try {
      // Return a copy of the set, because we modify it in place.
      return new HashSet<PersistentMemberPattern>(this.revokedMembers);
    } finally {
      unlock(false);
    }

  }

  /**
   * Return true if the given dr is still created in this IF.
   */
  boolean regionStillCreated(DiskRegionView dr) {
    lock(false);
    try {
      return getDiskRegionById(dr.getId()) != null;
    } finally {
      unlock(false);
    }
  }

  boolean regionExists(long drId) {
    lock(false);
    try {
      // @todo make drMap concurrent so this call can be fast
      return this.drMap.containsKey(drId);
    } finally {
      unlock(false);
    }
  }

  Collection<DiskRegionView> getKnown() {
    lock(false);
    try {
      return new ArrayList<DiskRegionView>(this.drMap.values());
    } finally {
      unlock(false);
    }
  }

  @Override
  public void cmnAddMyInitializingPMID(long drId, PersistentMemberID pmid) {
    cmnAddMyInitializingPMID(getDiskRegionById(drId), pmid);

  }

  @Override
  public void cmnBeginDestroyRegion(long drId) {
    cmnBeginDestroyRegion(getDiskRegionById(drId));
  }

  @Override
  public void cmnBeginPartialDestroyRegion(long drId) {
    cmnBeginPartialDestroyRegion(getDiskRegionById(drId));
  }

  @Override
  public void cmnEndDestroyRegion(long drId) {
    cmnEndDestroyRegion(getDiskRegionById(drId));
  }

  @Override
  public void cmnEndPartialDestroyRegion(long drId) {
    cmnEndPartialDestroyRegion(getDiskRegionById(drId));
  }

  @Override
  public void cmnMarkInitialized(long drId) {
    cmnMarkInitialized(getDiskRegionById(drId));
  }

  @Override
  public void cmnDiskStoreID(DiskStoreID diskStoreID) {
    if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
      logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "diskStoreId={}", diskStoreID);
    }
    this.parent.setDiskStoreID(diskStoreID);
  }

  @Override
  public boolean cmnRevokeDiskStoreId(PersistentMemberPattern revokedPattern) {
    return this.revokedMembers.add(revokedPattern);
  }

  @Override
  public String getNameForError() {
    return this.parent.toString();
  }

  @Override
  public boolean isClosing() {
    return parent.isClosing();
  }

  public void dump() {
    if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
      System.out.println("expectedCrfs=" + Arrays.toString(this.crfIds.toArray()));
      System.out.println("expectedDrfs=" + Arrays.toString(this.drfIds.toArray()));
      System.out.println("dataSerializerIds=" + Arrays.toString(this.dsIds.toArray()));
      System.out.println("instantiatorIds=  " + Arrays.toString(this.instIds.toArray()));
    }
  }

  /**
   * Returns a map of region_name->(pr_buckets|replicated_region)
   */
  private Map<String, List<PlaceHolderDiskRegion>> getRegionsToDump(String regName) {
    if (regName == null) {
      Map<String, List<PlaceHolderDiskRegion>> regions =
          new HashMap<String, List<PlaceHolderDiskRegion>>();
      for (DiskRegionView drv : this.drMap.values()) {
        if (drv instanceof PlaceHolderDiskRegion) {
          PlaceHolderDiskRegion dr = (PlaceHolderDiskRegion) drv;
          if (dr.isBucket()) {
            List<PlaceHolderDiskRegion> buckets = regions.get(dr.getPrName());
            if (buckets == null) {
              buckets = new ArrayList<PlaceHolderDiskRegion>();
              regions.put(dr.getPrName(), buckets);
            }
            buckets.add(dr);
          } else {
            regions.put(drv.getName(), Collections.singletonList(dr));
          }
        }
      }
      return regions;
    } else {
      DiskRegionView drv = getDiskRegionByName(regName);
      if (drv == null) {
        List<PlaceHolderDiskRegion> buckets = new ArrayList<PlaceHolderDiskRegion>();
        for (PlaceHolderDiskRegion dr : this.drMapByName.values()) {
          if (dr.isBucket()) {
            if (dr.getName().equals(dr.getPrName())) {
              buckets.add(dr);
            }
          }
        }
        if (buckets.isEmpty()) {
          throw new IllegalArgumentException(
              "The disk store does not contain a region named " + regName);
        } else {
          return Collections.singletonMap(regName, buckets);
        }
      } else if (drv instanceof PlaceHolderDiskRegion) {
        return Collections.singletonMap(regName,
            Collections.singletonList((PlaceHolderDiskRegion) drv));
      } else {
        return Collections.emptyMap();
      }
    }
  }

  public void dumpRegionInfo(PrintStream printStream, String regName) {
    printStream.println("Regions in the disk store:");
    for (Map.Entry<String, List<PlaceHolderDiskRegion>> regionEntry : getRegionsToDump(regName)
        .entrySet()) {
      printStream.print("  ");
      List<PlaceHolderDiskRegion> regions = regionEntry.getValue();
      if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
        for (PlaceHolderDiskRegion region : regions) {
          region.dump(printStream);
        }
      } else {
        // NOTE, regions will always have at least 1 item.
        regions.get(0).dump(printStream);
      }
    }
  }

  public void dumpRegionMetadata(boolean showBuckets) {
    System.out.println("Disk Store ID: " + getDiskStore().getDiskStoreID());
    System.out.println("Regions in the disk store:");
    for (Map.Entry<String, List<PlaceHolderDiskRegion>> regionEntry : getRegionsToDump(null)
        .entrySet()) {
      System.out.print("  ");
      List<PlaceHolderDiskRegion> regions = regionEntry.getValue();
      PlaceHolderDiskRegion region0 = regions.get(0);
      if (region0.isBucket()) {
        dumpPRMetaData(showBuckets, regions);

      } else {
        region0.dumpMetadata();
      }
    }
  }

  /**
   * Dump the metadata for a partitioned region, optionally dumping the meta data for individual
   * buckets.
   */
  private void dumpPRMetaData(boolean showBuckets, List<PlaceHolderDiskRegion> regions) {
    StringBuilder msg = new StringBuilder(regions.get(0).getPrName());
    regions.get(0).dumpCommonAttributes(msg);

    if (showBuckets) {
      for (PlaceHolderDiskRegion region : regions) {
        msg.append("\n");
        msg.append("\n");
        msg.append(region.getName());
        region.dumpPersistentView(msg);
      }
    } else {
      Map<DiskStoreID, String> online = new HashMap<DiskStoreID, String>();
      Map<DiskStoreID, String> offline = new HashMap<DiskStoreID, String>();
      Map<DiskStoreID, String> equal = new HashMap<DiskStoreID, String>();
      for (PlaceHolderDiskRegion region : regions) {
        for (PersistentMemberID mem : region.getOnlineMembers()) {
          online.put(mem.getDiskStoreId(), mem.getHost() + ":" + mem.getDirectory());
        }
        for (PersistentMemberID mem : region.getOfflineMembers()) {
          offline.put(mem.getDiskStoreId(), mem.getHost() + ":" + mem.getDirectory());
        }
        for (PersistentMemberID mem : region.getOfflineAndEqualMembers()) {
          equal.put(mem.getDiskStoreId(), mem.getHost() + ":" + mem.getDirectory());
        }
      }

      msg.append("\n\tonlineMembers:");
      for (Map.Entry<DiskStoreID, String> id : online.entrySet()) {
        msg.append("\n\t\t").append(id.getKey()).append(" ").append(id.getValue());
      }

      msg.append("\n\tofflineMembers:");
      for (Map.Entry<DiskStoreID, String> id : offline.entrySet()) {
        msg.append("\n\t\t").append(id.getKey()).append(" ").append(id.getValue());
      }

      msg.append("\n\tequalsMembers:");
      for (Map.Entry<DiskStoreID, String> id : equal.entrySet()) {
        msg.append("\n\t\t").append(id.getKey()).append(" ").append(id.getValue());
      }
    }

    System.out.println(msg);
  }

  public void destroyPRRegion(String prName) {
    ArrayList<PlaceHolderDiskRegion> buckets = new ArrayList<PlaceHolderDiskRegion>();
    lock(true);
    try {
      for (PlaceHolderDiskRegion dr : this.drMapByName.values()) {
        if (dr.isBucket()) {
          if (prName.equals(dr.getPrName())) {
            buckets.add(dr);
          }
        }
      }
    } finally {
      unlock(true);
    }
    for (PlaceHolderDiskRegion dr : buckets) {
      endDestroyRegion(dr);
    }

    // Remove the partitioned region record
    // for this disk store.
    destroyPersistentPR(prName);
  }

  public String modifyPRRegion(String prName, String lruOption, String lruActionOption,
      String lruLimitOption, String concurrencyLevelOption, String initialCapacityOption,
      String loadFactorOption, String compressorClassNameOption, String statisticsEnabledOption,
      String offHeapOption, boolean printToConsole) {
    StringBuffer sb = new StringBuffer();
    ArrayList<PlaceHolderDiskRegion> buckets = new ArrayList<PlaceHolderDiskRegion>();
    lock(true);
    try {
      for (PlaceHolderDiskRegion dr : this.drMapByName.values()) {
        if (dr.isBucket()) {
          if (prName.equals(dr.getPrName())) {
            buckets.add(dr);
          }
        }
      }

      // only print info on the first bucket to fix bug 41735
      boolean printInfo = true;
      for (PlaceHolderDiskRegion dr : buckets) {
        String message = basicModifyRegion(printInfo, dr, lruOption, lruActionOption,
            lruLimitOption, concurrencyLevelOption, initialCapacityOption, loadFactorOption,
            compressorClassNameOption, statisticsEnabledOption, offHeapOption, printToConsole);
        if (printInfo)
          sb.append(message);
        printInfo = false;
      }
    } finally {
      unlock(true);
    }
    return sb.toString();
  }

  public String modifyRegion(DiskRegionView drv, String lruOption, String lruActionOption,
      String lruLimitOption, String concurrencyLevelOption, String initialCapacityOption,
      String loadFactorOption, String compressorClassNameOption, String statisticsEnabledOption,
      String offHeapOption, boolean printToConsole) {
    lock(true);
    try {
      return basicModifyRegion(false, drv, lruOption, lruActionOption, lruLimitOption,
          concurrencyLevelOption, initialCapacityOption, loadFactorOption,
          compressorClassNameOption, statisticsEnabledOption, offHeapOption, printToConsole);
    } finally {
      unlock(true);
    }
  }

  private String basicModifyRegion(boolean printInfo, DiskRegionView drv, String lruOption,
      String lruActionOption, String lruLimitOption, String concurrencyLevelOption,
      String initialCapacityOption, String loadFactorOption, String compressorClassNameOption,
      String statisticsEnabledOption, String offHeapOption, boolean printToConsole) {
    byte lruAlgorithm = drv.getLruAlgorithm();
    byte lruAction = drv.getLruAction();
    int lruLimit = drv.getLruLimit();
    int concurrencyLevel = drv.getConcurrencyLevel();
    int initialCapacity = drv.getInitialCapacity();
    float loadFactor = drv.getLoadFactor();
    String compressorClassName = drv.getCompressorClassName();
    boolean statisticsEnabled = drv.getStatisticsEnabled();
    boolean offHeap = drv.getOffHeap();
    StringBuffer sb = new StringBuffer();
    final String lineSeparator = System.getProperty("line.separator");

    if (lruOption != null) {
      EvictionAlgorithm ea = EvictionAlgorithm.parseAction(lruOption);
      if (ea != null) {
        lruAlgorithm = (byte) ea.getValue();
      } else {
        throw new IllegalArgumentException(
            "Expected lru to be one of the following: \"none\", \"lru-entry-count\", \"lru-heap-percentage\", or \"lru-memory-size\"");
      }
      if (ea.isNone()) {
        lruAction = (byte) EvictionAction.NONE.getValue();
        lruLimit = 0;
      } else if (ea.isLRUHeap()) {
        lruLimit = 0;
      }
    }
    if (lruActionOption != null) {
      EvictionAction ea = EvictionAction.parseAction(lruActionOption);
      if (ea != null) {
        lruAction = (byte) ea.getValue();
      } else {
        throw new IllegalArgumentException(
            "Expected lruAction to be one of the following: \"none\", \"overflow-to-disk\", or \"local-destroy\"");
      }
    }
    if (lruLimitOption != null) {
      lruLimit = Integer.parseInt(lruLimitOption);
      if (lruLimit < 0) {
        throw new IllegalArgumentException("Expected lruLimit to be greater than or equal to zero");
      }
    }
    if (concurrencyLevelOption != null) {
      concurrencyLevel = Integer.parseInt(concurrencyLevelOption);
      if (concurrencyLevel < 0) {
        throw new IllegalArgumentException(
            "Expected concurrencyLevel to be greater than or equal to zero");
      }
    }
    if (initialCapacityOption != null) {
      initialCapacity = Integer.parseInt(initialCapacityOption);
      if (initialCapacity < 0) {
        throw new IllegalArgumentException(
            "Expected initialCapacity to be greater than or equal to zero");
      }
    }
    if (loadFactorOption != null) {
      loadFactor = Float.parseFloat(loadFactorOption);
      if (loadFactor < 0.0) {
        throw new IllegalArgumentException(
            "Expected loadFactor to be greater than or equal to zero");
      }
    }
    if (compressorClassNameOption != null) {
      compressorClassName =
          (compressorClassNameOption.isEmpty() ? null : compressorClassNameOption);
    }
    if (statisticsEnabledOption != null) {
      statisticsEnabled = Boolean.parseBoolean(statisticsEnabledOption);
      if (!statisticsEnabled) {
        // make sure it is "false"
        if (!statisticsEnabledOption.equalsIgnoreCase("false")) {
          throw new IllegalArgumentException(
              "Expected statisticsEnabled to be \"true\" or \"false\"");
        }
      }
    }
    if (offHeapOption != null) {
      offHeap = Boolean.parseBoolean(offHeapOption);
      if (!offHeap) {
        // make sure it is "false"
        if (!offHeapOption.equalsIgnoreCase("false")) {
          throw new IllegalArgumentException("Expected offHeap to be \"true\" or \"false\"");
        }
      }
    }

    sb.append("Before modification: ");
    sb.append(lineSeparator);
    sb.append(((PlaceHolderDiskRegion) drv).dump2());
    sb.append(lineSeparator);

    drv.setConfig(lruAlgorithm, lruAction, lruLimit, concurrencyLevel, initialCapacity, loadFactor,
        statisticsEnabled, drv.isBucket(), drv.getFlags(), drv.getPartitionName(),
        drv.getStartingBucketId(), compressorClassName, offHeap);

    // Make sure the combined lru args can still produce a legal eviction attributes
    // before writing them to disk.
    ((PlaceHolderDiskRegion) drv).getEvictionAttributes();
    writeRegionConfig(drv);

    sb.append("After modification: ");
    sb.append(lineSeparator);
    sb.append(((PlaceHolderDiskRegion) drv).dump2());
    sb.append(lineSeparator);

    String message = sb.toString();

    if (printInfo && printToConsole) {
      System.out.println(message);
    }
    return message;
  }

  private void writeGemfireVersion(KnownVersion version) {
    lock(true);
    try {
      ByteBuffer bb = getIFWriteBuffer(1 + 3 + 1);
      bb.put(IFREC_GEMFIRE_VERSION);
      VersioningIO.writeOrdinal(bb, version.ordinal(), false);
      bb.put(END_OF_RECORD_ID);
      writeIFRecord(bb, false); // don't do stats for these small records
    } catch (IOException ex) {
      DiskAccessException dae = new DiskAccessException(
          String.format("Failed writing data to initialization file because: %s", ex),
          this.parent);
      if (!this.compactInProgress) {
        this.parent.handleDiskAccessException(dae);
      }
      throw dae;
    } finally {
      unlock(true);
    }
  }

  /**
   * Additional flags for a disk region that are persisted in its meta-data.
   *
   * @since GemFire 7.0
   */
  public enum DiskRegionFlag {
    /**
     * True if this disk region has entries with versioning enabled. Depending on this flag, the
     * appropriate RegionEntryFactory gets instantiated.
     */
    IS_WITH_VERSIONING
  }
}
