/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static java.lang.System.lineSeparator;
import java.io.PrintStream;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import joptsimple.internal.Strings;
import org.apache.logging.log4j.Logger;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAlgorithm;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.compression.Compressor;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.cache.DiskInitFile.DiskRegionFlag;
import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberPattern;
import org.apache.geode.internal.cache.versions.DiskRegionVersionVector;
import org.apache.geode.internal.cache.versions.RegionVersionHolder;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.classloader.ClassPathLoader;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Code shared by both DiskRegion and RecoveredDiskRegion.
*
* @since GemFire prPersistSprint2
*/
public abstract class AbstractDiskRegion implements DiskRegionView {
private static final Logger logger = LogService.getLogger();
private final DiskStoreImpl ds;
private final long id;
private long clearOplogEntryId = DiskStoreImpl.INVALID_ID;
private RegionVersionVector clearRVV;
private byte lruAlgorithm;
private byte lruAction;
private int lruLimit;
private int concurrencyLevel = 16;
private int initialCapacity = 16;
private float loadFactor = 0.75f;
private boolean statisticsEnabled;
private boolean isBucket;
/** True if a persistent backup is needed */
private boolean backup;
/** Additional flags that are persisted to the meta-data. */
private final EnumSet<DiskRegionFlag> flags;
/**
* A flag used to indicate that this disk region is being recreated using already existing data on
* the disk.
*/
private boolean isRecreated;
private boolean configChanged;
private boolean aboutToDestroy;
private boolean aboutToDestroyDataStorage;
private String partitionName;
private int startingBucketId;
private String compressorClassName;
private Compressor compressor;
private boolean offHeap;
/**
* Records the version vector of what has been persisted to disk. This may lag behind the version
* vector of what is in memory, because updates may be written asynchronously to disk. We need to
* keep track of exactly what has been written to disk so that we can record a version vector at
* the beginning of each oplog.
*
* The version vector of what is in memory is held in LocalRegion.versionVector.
*/
private RegionVersionVector versionVector;
/**
* A flag indicating whether the current version vector accurately represents what has been
* written to this member's disk.
*/
private volatile boolean rvvTrusted = true;
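/**
* Creates the disk region for the given name. If the disk store's init file already knows a
* region by that name, this instance takes over its state (configuration, membership, statistics
* counters and recovered entry map); otherwise a new region id is generated with empty state.
*/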
protected AbstractDiskRegion(DiskStoreImpl ds, String name) {
DiskRegionView drv = ds.getDiskInitFile().takeDiskRegionByName(name);
if (drv != null) {
// If we found one in the initFile then we take it out of the initFile, and the one
// we are constructing will replace it in the diskStore drMap.
this.ds = drv.getDiskStore();
id = drv.getId();
backup = drv.isBackup();
clearOplogEntryId = drv.getClearOplogEntryId();
clearRVV = drv.getClearRVV();
lruAlgorithm = drv.getLruAlgorithm();
lruAction = drv.getLruAction();
lruLimit = drv.getLruLimit();
concurrencyLevel = drv.getConcurrencyLevel();
initialCapacity = drv.getInitialCapacity();
loadFactor = drv.getLoadFactor();
statisticsEnabled = drv.getStatisticsEnabled();
isBucket = drv.isBucket();
flags = drv.getFlags();
partitionName = drv.getPartitionName();
startingBucketId = drv.getStartingBucketId();
myInitializingId = drv.getMyInitializingID();
myInitializedId = drv.getMyPersistentID();
aboutToDestroy = drv.wasAboutToDestroy();
aboutToDestroyDataStorage = drv.wasAboutToDestroyDataStorage();
onlineMembers = new CopyOnWriteHashSet<>(drv.getOnlineMembers());
offlineMembers = new CopyOnWriteHashSet<>(drv.getOfflineMembers());
equalMembers =
new CopyOnWriteHashSet<>(drv.getOfflineAndEqualMembers());
isRecreated = true;
// Use the same atomic counters as the previous disk region. This ensures that
// updates from threads with a reference to the old region update this disk region.
// See 49943
numOverflowOnDisk = ((AbstractDiskRegion) drv).numOverflowOnDisk;
numEntriesInVM = ((AbstractDiskRegion) drv).numEntriesInVM;
numOverflowBytesOnDisk = ((AbstractDiskRegion) drv).numOverflowBytesOnDisk;
entries = drv.getRecoveredEntryMap();
readyForRecovery = drv.isReadyForRecovery();
recoveredEntryCount = drv.getRecoveredEntryCount();
recoveryCompleted = ((AbstractDiskRegion) drv).recoveryCompleted;
versionVector = drv.getRegionVersionVector();
compressorClassName = drv.getCompressorClassName();
compressor = drv.getCompressor();
offHeap = drv.getOffHeap();
if (drv instanceof PlaceHolderDiskRegion) {
setRVVTrusted(((PlaceHolderDiskRegion) drv).getRVVTrusted());
}
} else {
// This is a brand new disk region.
this.ds = ds;
// {
// DiskRegion existingDr = ds.getByName(name);
// if (existingDr != null) {
// throw new IllegalStateException("DiskRegion named " + name + " already exists with id=" +
// existingDr.getId());
// }
// }
id = ds.generateRegionId();
flags = EnumSet.noneOf(DiskRegionFlag.class);
onlineMembers = new CopyOnWriteHashSet<>();
offlineMembers = new CopyOnWriteHashSet<>();
equalMembers = new CopyOnWriteHashSet<>();
isRecreated = false;
versionVector = new DiskRegionVersionVector(ds.getDiskStoreID());
numOverflowOnDisk = new AtomicLong();
numEntriesInVM = new AtomicLong();
numOverflowBytesOnDisk = new AtomicLong();
}
}
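/**
* Creates a disk region known only by its id, as happens when a recovered disk region is first
* created during disk store recovery.
*/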
protected AbstractDiskRegion(DiskStoreImpl ds, long id) {
this.ds = ds;
this.id = id;
flags = EnumSet.noneOf(DiskRegionFlag.class);
onlineMembers = new CopyOnWriteHashSet<>();
offlineMembers = new CopyOnWriteHashSet<>();
equalMembers = new CopyOnWriteHashSet<>();
isRecreated = true;
backup = true;
versionVector = new DiskRegionVersionVector(ds.getDiskStoreID());
numOverflowOnDisk = new AtomicLong();
numEntriesInVM = new AtomicLong();
numOverflowBytesOnDisk = new AtomicLong();
// We do not initialize the soplog set here. The soplog set needs
// to be handed the complete set of recovered soplogs, which is not available
// at the time a recovered disk region is first created.
}
/**
* Used to initialize a PlaceHolderDiskRegion for a region that is being closed
*
* @param drv the region that is being closed
*/
protected AbstractDiskRegion(DiskRegionView drv) {
ds = drv.getDiskStore();
id = drv.getId();
backup = drv.isBackup();
clearOplogEntryId = drv.getClearOplogEntryId();
clearRVV = drv.getClearRVV();
lruAlgorithm = drv.getLruAlgorithm();
lruAction = drv.getLruAction();
lruLimit = drv.getLruLimit();
concurrencyLevel = drv.getConcurrencyLevel();
initialCapacity = drv.getInitialCapacity();
loadFactor = drv.getLoadFactor();
statisticsEnabled = drv.getStatisticsEnabled();
isBucket = drv.isBucket();
flags = drv.getFlags();
partitionName = drv.getPartitionName();
startingBucketId = drv.getStartingBucketId();
myInitializingId = null; // fixes 43650
myInitializedId = drv.getMyPersistentID();
aboutToDestroy = false;
aboutToDestroyDataStorage = false;
onlineMembers = new CopyOnWriteHashSet<>(drv.getOnlineMembers());
offlineMembers = new CopyOnWriteHashSet<>(drv.getOfflineMembers());
equalMembers = new CopyOnWriteHashSet<>(drv.getOfflineAndEqualMembers());
isRecreated = true;
numOverflowOnDisk = new AtomicLong();
numEntriesInVM = new AtomicLong();
numOverflowBytesOnDisk = new AtomicLong();
entries = drv.getRecoveredEntryMap();
readyForRecovery = drv.isReadyForRecovery();
recoveredEntryCount = 0; // fix for bug 41570
recoveryCompleted = ((AbstractDiskRegion) drv).recoveryCompleted;
versionVector = drv.getRegionVersionVector();
compressorClassName = drv.getCompressorClassName();
compressor = drv.getCompressor();
offHeap = drv.getOffHeap();
}
@Override
public abstract String getName();
@Override
public DiskStoreImpl getDiskStore() {
return ds;
}
abstract void beginDestroyRegion(LocalRegion region);
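/** Replaces the region version vector with a fresh, empty vector for this disk store. */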
public void resetRVV() {
versionVector = new DiskRegionVersionVector(ds.getDiskStoreID());
}
@Override
public long getId() {
return id;
}
@Override
public long getClearOplogEntryId() {
return clearOplogEntryId;
}
@Override
public void setClearOplogEntryId(long v) {
clearOplogEntryId = v;
}
@Override
public RegionVersionVector getClearRVV() {
return clearRVV;
}
@Override
public void setClearRVV(RegionVersionVector rvv) {
clearRVV = rvv;
}
@Override
public void setConfig(byte lruAlgorithm, byte lruAction, int lruLimit, int concurrencyLevel,
int initialCapacity, float loadFactor, boolean statisticsEnabled, boolean isBucket,
EnumSet<DiskRegionFlag> flags, String partitionName, int startingBucketId,
String compressorClassName, boolean offHeap) {
this.lruAlgorithm = lruAlgorithm;
this.lruAction = lruAction;
this.lruLimit = lruLimit;
this.concurrencyLevel = concurrencyLevel;
this.initialCapacity = initialCapacity;
this.loadFactor = loadFactor;
this.statisticsEnabled = statisticsEnabled;
this.isBucket = isBucket;
if (flags != null && flags != this.flags) {
this.flags.clear();
this.flags.addAll(flags);
}
this.partitionName = partitionName;
this.startingBucketId = startingBucketId;
this.compressorClassName = compressorClassName;
this.offHeap = offHeap;
if (!ds.isOffline()) {
createCompressorFromClassName();
}
}
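/**
* Instantiates the configured compressor by reflection, or clears it if no compressor class name
* is set. Throws IllegalArgumentException if the class cannot be loaded or instantiated.
*/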
public void createCompressorFromClassName() {
if (Strings.isNullOrEmpty(compressorClassName)) {
compressor = null;
} else {
try {
@SuppressWarnings("unchecked")
Class<Compressor> compressorClass =
(Class<Compressor>) ClassPathLoader.getLatest().forName(compressorClassName);
compressor = compressorClass.newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException(
String.format("Unknown Compressor %s found in disk initialization file.",
compressorClassName),
e);
}
}
}
@Override
public EvictionAttributes getEvictionAttributes() {
return new EvictionAttributesImpl().setAlgorithm(getActualLruAlgorithm())
.setAction(getActualLruAction()).setMaximum(getLruLimit());
}
@Override
public byte getLruAlgorithm() {
return lruAlgorithm;
}
public EvictionAlgorithm getActualLruAlgorithm() {
return EvictionAlgorithm.parseValue(getLruAlgorithm());
}
@Override
public byte getLruAction() {
return lruAction;
}
public EvictionAction getActualLruAction() {
return EvictionAction.parseValue(getLruAction());
}
@Override
public int getLruLimit() {
return lruLimit;
}
@Override
public int getConcurrencyLevel() {
return concurrencyLevel;
}
@Override
public int getInitialCapacity() {
return initialCapacity;
}
@Override
public float getLoadFactor() {
return loadFactor;
}
@Override
public boolean getStatisticsEnabled() {
return statisticsEnabled;
}
@Override
public boolean isBucket() {
return isBucket;
}
@Override
public EnumSet<DiskRegionFlag> getFlags() {
return flags;
}
@Override
public String getPartitionName() {
return partitionName;
}
@Override
public int getStartingBucketId() {
return startingBucketId;
}
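/**
* Returns the path of the partitioned region that this bucket's disk region belongs to. Only
* valid when isBucket() is true.
*/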
public String getPrName() {
assert isBucket();
String bn = PartitionedRegionHelper.getBucketName(getName());
return PartitionedRegionHelper.getPRPath(bn);
}
private PersistentMemberID myInitializingId = null;
private PersistentMemberID myInitializedId = null;
private final CopyOnWriteHashSet<PersistentMemberID> onlineMembers;
private final CopyOnWriteHashSet<PersistentMemberID> offlineMembers;
private final CopyOnWriteHashSet<PersistentMemberID> equalMembers;
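/**
* Records pmid as this member's initializing persistent id. Any previous initializing id becomes
* the initialized id. Returns the previous initializing id, or null if there was none.
*/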
@Override
public PersistentMemberID addMyInitializingPMID(PersistentMemberID pmid) {
PersistentMemberID result = myInitializingId;
myInitializingId = pmid;
if (result != null) {
myInitializedId = result;
}
return result;
}
@Override
public void markInitialized() {
assert myInitializingId != null;
myInitializedId = myInitializingId;
myInitializingId = null;
}
@Override
public boolean addOnlineMember(PersistentMemberID pmid) {
return onlineMembers.add(pmid);
}
@Override
public boolean addOfflineMember(PersistentMemberID pmid) {
return offlineMembers.add(pmid);
}
@Override
public boolean addOfflineAndEqualMember(PersistentMemberID pmid) {
return equalMembers.add(pmid);
}
@Override
public boolean rmOnlineMember(PersistentMemberID pmid) {
return onlineMembers.remove(pmid);
}
@Override
public boolean rmOfflineMember(PersistentMemberID pmid) {
return offlineMembers.remove(pmid);
}
@Override
public boolean rmEqualMember(PersistentMemberID pmid) {
return equalMembers.remove(pmid);
}
@Override
public void markBeginDestroyRegion() {
aboutToDestroy = true;
}
@Override
public void markBeginDestroyDataStorage() {
aboutToDestroyDataStorage = true;
}
@Override
public void markEndDestroyRegion() {
onlineMembers.clear();
offlineMembers.clear();
equalMembers.clear();
myInitializedId = null;
myInitializingId = null;
aboutToDestroy = false;
isRecreated = false;
}
@Override
public void markEndDestroyDataStorage() {
myInitializedId = null;
myInitializingId = null;
aboutToDestroyDataStorage = false;
}
// PersistentMemberView methods
@Override
public PersistentMemberID getMyInitializingID() {
DiskInitFile dif = ds.getDiskInitFile();
if (dif == null) {
return myInitializingId;
}
synchronized (dif) {
return myInitializingId;
}
}
@Override
public PersistentMemberID getMyPersistentID() {
DiskInitFile dif = ds.getDiskInitFile();
if (dif == null) {
return myInitializedId;
}
synchronized (dif) {
return myInitializedId;
}
}
@Override
public Set<PersistentMemberID> getOnlineMembers() {
DiskInitFile dif = ds.getDiskInitFile();
if (dif == null) {
return onlineMembers.getSnapshot();
}
synchronized (dif) {
return onlineMembers.getSnapshot();
}
}
@Override
public Set<PersistentMemberID> getOfflineMembers() {
DiskInitFile dif = ds.getDiskInitFile();
if (dif == null) {
return offlineMembers.getSnapshot();
}
synchronized (dif) {
return offlineMembers.getSnapshot();
}
}
@Override
public Set<PersistentMemberID> getOfflineAndEqualMembers() {
DiskInitFile dif = ds.getDiskInitFile();
if (dif == null) {
return equalMembers.getSnapshot();
}
synchronized (dif) {
return equalMembers.getSnapshot();
}
}
@Override
public Set<PersistentMemberPattern> getRevokedMembers() {
return ds.getRevokedMembers();
}
@Override
public void memberOffline(PersistentMemberID persistentID) {
ds.memberOffline(this, persistentID);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - member offline {}",
getDiskStoreID().abbrev(), getName(), persistentID);
}
}
@Override
public void memberOfflineAndEqual(PersistentMemberID persistentID) {
ds.memberOfflineAndEqual(this, persistentID);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE,
"PersistentView {} - {} - member offline and equal {}", getDiskStoreID().abbrev(),
getName(), persistentID);
}
}
@Override
public void memberOnline(PersistentMemberID persistentID) {
ds.memberOnline(this, persistentID);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - member online {}",
getDiskStoreID().abbrev(), getName(), persistentID);
}
}
@Override
public void memberRemoved(PersistentMemberID persistentID) {
ds.memberRemoved(this, persistentID);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - member removed {}",
getDiskStoreID().abbrev(), getName(), persistentID);
}
}
@Override
public void memberRevoked(PersistentMemberPattern revokedPattern) {
ds.memberRevoked(revokedPattern);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - member revoked {}",
getDiskStoreID().abbrev(), getName(), revokedPattern);
}
}
@Override
public void setInitializing(PersistentMemberID newId) {
ds.setInitializing(this, newId);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - initializing local id: {}",
getDiskStoreID().abbrev(), getName(), getMyInitializingID());
}
}
@Override
public void setInitialized() {
ds.setInitialized(this);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - initialized local id: {}",
getDiskStoreID().abbrev(), getName(), getMyPersistentID());
}
}
@Override
public PersistentMemberID generatePersistentID() {
return ds.generatePersistentID();
}
@Override
public boolean isRecreated() {
return isRecreated;
}
@Override
public boolean hasConfigChanged() {
return configChanged;
}
@Override
public void setConfigChanged(boolean v) {
configChanged = v;
}
@Override
public void endDestroy(LocalRegion region) {
// Clean up the state if we were ready to recover this region
if (isReadyForRecovery()) {
ds.updateDiskRegion(this);
entriesMapIncompatible = false;
if (entries != null) {
ConcurrentMapWithReusableEntries<Object, Object> other =
entries.getCustomEntryConcurrentHashMap();
for (Map.Entry<Object, Object> me : other.entrySetWithReusableEntries()) {
RegionEntry oldRe = (RegionEntry) me.getValue();
if (oldRe instanceof OffHeapRegionEntry) {
((OffHeapRegionEntry) oldRe).release();
} else {
// no need to keep iterating; they are all either off heap or on heap.
break;
}
}
}
entries = null;
readyForRecovery = false;
}
if (aboutToDestroyDataStorage) {
ds.endDestroyDataStorage(region, (DiskRegion) this);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE,
"PersistentView {} - {} - endDestroyDataStorage: {}", getDiskStoreID().abbrev(),
getName(), getMyPersistentID());
}
} else {
ds.endDestroyRegion(region, (DiskRegion) this);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - endDestroy: {}",
getDiskStoreID().abbrev(), getName(), getMyPersistentID());
}
}
}
/**
* Begin the destroy of everything related to this disk region.
*/
@Override
public void beginDestroy(LocalRegion region) {
beginDestroyRegion(region);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE, "PersistentView {} - {} - beginDestroy: {}",
getDiskStoreID().abbrev(), getName(), getMyPersistentID());
}
if (myInitializedId == null) {
endDestroy(region);
}
}
/**
* Destroy the data storage of this disk region. Destroying the data storage leaves the
* persistent view, but removes the data.
*/
@Override
public void beginDestroyDataStorage() {
ds.beginDestroyDataStorage((DiskRegion) this);
if (logger.isTraceEnabled(LogMarker.PERSIST_VERBOSE)) {
logger.trace(LogMarker.PERSIST_VERBOSE,
"PersistentView {} - {} - beginDestroyDataStorage: {}", getDiskStoreID().abbrev(),
getName(), getMyPersistentID());
}
}
public void createDataStorage() {}
@Override
public boolean wasAboutToDestroy() {
return aboutToDestroy;
}
@Override
public boolean wasAboutToDestroyDataStorage() {
return aboutToDestroyDataStorage;
}
/**
* Set to true once this DiskRegion is ready to be recovered.
*/
private boolean readyForRecovery;
/**
* Total number of entries recovered by restoring from backup. It is initialized right after a
* recovery but may be updated later as recovered entries go away due to updates and destroys.
*/
protected int recoveredEntryCount;
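/**
* True if the recovered entry map cannot be reused by the region being initialized, so its
* entries must be copied into the region's new map instead of the map being handed over directly.
*/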
private boolean entriesMapIncompatible;
private RegionMap entries;
private AtomicBoolean recoveryCompleted;
public void setEntriesMapIncompatible(boolean v) {
entriesMapIncompatible = v;
}
@Override
public boolean isEntriesMapIncompatible() {
return entriesMapIncompatible;
}
public RegionMap useExistingRegionMap(LocalRegion lr) {
RegionMap result = null;
if (!entriesMapIncompatible) {
result = entries;
// if (result != null) {
// result.changeOwner(lr);
// }
}
return result;
}
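/**
* Blocks until initRecoveredEntryCount() signals that recovery of the entry map has completed,
* restoring the thread's interrupt status if the wait is interrupted.
*/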
private void waitForRecoveryCompletion() {
boolean interrupted = Thread.interrupted();
synchronized (recoveryCompleted) {
try {
// @todo also check for shutdown of diskstore?
while (!recoveryCompleted.get()) {
try {
recoveryCompleted.wait();
} catch (InterruptedException ex) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
@Override
public void copyExistingRegionMap(LocalRegion lr) {
waitForRecoveryCompletion();
if (entriesMapIncompatible) {
// Reset the numEntriesInVM. It will be incremented when the copy to the new map is done,
// down in DiskEntry.Helper.initialize. However, the other stats can't be updated
// there because we don't have the value length at that point. So leave
// those stats alone.
numEntriesInVM.set(0);
lr.initializeStats(getNumEntriesInVM(), getNumOverflowOnDisk(),
getNumOverflowBytesOnDisk());
lr.copyRecoveredEntries(entries);
} else {
entries.changeOwner(lr);
lr.initializeStats(getNumEntriesInVM(), getNumOverflowOnDisk(),
getNumOverflowBytesOnDisk());
lr.copyRecoveredEntries(null);
}
entries = null;
}
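/**
* Installs the entry map produced during recovery and creates a fresh recovery-completed flag
* that initRecoveredEntryCount() will later set.
*/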
public void setRecoveredEntryMap(RegionMap rm) {
recoveryCompleted = new AtomicBoolean();
entries = rm;
}
@Override
public RegionMap getRecoveredEntryMap() {
return entries;
}
public void releaseRecoveryData() {
readyForRecovery = false;
}
@Override
public boolean isReadyForRecovery() {
// better name for this method would be isRecovering
return readyForRecovery;
}
@Override
public void prepareForRecovery() {
readyForRecovery = true;
}
/**
* gets the number of entries recovered
*
* @since GemFire 3.2.1
*/
@Override
public int getRecoveredEntryCount() {
return recoveredEntryCount;
}
@Override
public void incRecoveredEntryCount() {
recoveredEntryCount++;
}
/**
* Signals that entry recovery has completed, waking any threads waiting in
* waitForRecoveryCompletion().
*/
@Override
public void initRecoveredEntryCount() {
if (recoveryCompleted != null) {
synchronized (recoveryCompleted) {
recoveryCompleted.set(true);
recoveryCompleted.notifyAll();
}
}
}
protected final AtomicLong numOverflowOnDisk;
@Override
public long getNumOverflowOnDisk() {
return numOverflowOnDisk.get();
}
@Override
public void incNumOverflowOnDisk(long delta) {
numOverflowOnDisk.addAndGet(delta);
}
protected final AtomicLong numOverflowBytesOnDisk;
@Override
public long getNumOverflowBytesOnDisk() {
return numOverflowBytesOnDisk.get();
}
@Override
public void incNumOverflowBytesOnDisk(long delta) {
numOverflowBytesOnDisk.addAndGet(delta);
}
protected final AtomicLong numEntriesInVM;
@Override
public long getNumEntriesInVM() {
return numEntriesInVM.get();
}
@Override
public void incNumEntriesInVM(long delta) {
numEntriesInVM.addAndGet(delta);
}
/**
* Returns true if this region maintains a backup of all its keys and values on disk. Returns
* false if only values that will not fit in memory are written to disk.
*/
@Override
public boolean isBackup() {
return backup;
}
protected void setBackup(boolean v) {
backup = v;
}
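/**
* Writes a one-line summary of this disk region's configuration to the given stream, including
* the persistent view details when verbose recovery tracing is enabled.
*/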
public void dump(PrintStream printStream) {
String name = getName();
if (isBucket() && !logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
name = getPrName();
}
String msg = name + ":" + " -lru=" + getEvictionAttributes().getAlgorithm();
if (!getEvictionAttributes().getAlgorithm().isNone()) {
msg += " -lruAction=" + getEvictionAttributes().getAction();
if (!getEvictionAttributes().getAlgorithm().isLRUHeap()) {
msg += " -lruLimit=" + getEvictionAttributes().getMaximum();
}
}
msg += " -concurrencyLevel=" + getConcurrencyLevel() + " -initialCapacity="
+ getInitialCapacity() + " -loadFactor=" + getLoadFactor() + " -offHeap=" + getOffHeap()
+ " -compressor=" + (getCompressorClassName() == null ? "none" : getCompressorClassName())
+ " -statisticsEnabled=" + getStatisticsEnabled();
if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
msg += " drId=" + getId() + " isBucket=" + isBucket() + " clearEntryId="
+ getClearOplogEntryId() + " MyInitializingID=<" + getMyInitializingID() + ">"
+ " MyPersistentID=<" + getMyPersistentID() + ">" + " onlineMembers=" + getOnlineMembers()
+ " offlineMembers=" + getOfflineMembers() + " equalsMembers="
+ getOfflineAndEqualMembers();
}
printStream.println(msg);
}
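/**
* Returns this disk region's configuration summary as a multi-line string, including the
* persistent view details when verbose recovery tracing is enabled.
*/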
public String dump2() {
final String lineSeparator = lineSeparator();
StringBuilder sb = new StringBuilder();
String name = getName();
if (isBucket() && logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
name = getPrName();
}
sb.append(name);
sb.append(lineSeparator);
sb.append("lru=" + getEvictionAttributes().getAlgorithm());
sb.append(lineSeparator);
if (!getEvictionAttributes().getAlgorithm().isNone()) {
sb.append("lruAction=" + getEvictionAttributes().getAction());
sb.append(lineSeparator);
if (!getEvictionAttributes().getAlgorithm().isLRUHeap()) {
sb.append("lruAction=" + getEvictionAttributes().getAction());
sb.append(lineSeparator);
}
}
sb.append("-concurrencyLevel=" + getConcurrencyLevel());
sb.append(lineSeparator);
sb.append("-initialCapacity=" + getInitialCapacity());
sb.append(lineSeparator);
sb.append("-loadFactor=" + getLoadFactor());
sb.append(lineSeparator);
sb.append("-offHeap=" + getOffHeap());
sb.append(lineSeparator);
sb.append(
"-compressor=" + (getCompressorClassName() == null ? "none" : getCompressorClassName()));
sb.append(lineSeparator);
sb.append("-statisticsEnabled=" + getStatisticsEnabled());
sb.append(lineSeparator);
if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
sb.append("drId=" + getId());
sb.append(lineSeparator);
sb.append("isBucket=" + isBucket());
sb.append(lineSeparator);
sb.append("clearEntryId=" + getClearOplogEntryId());
sb.append(lineSeparator);
sb.append("MyInitializingID=<" + getMyInitializingID() + ">");
sb.append(lineSeparator);
sb.append("MyPersistentID=<" + getMyPersistentID() + ">");
sb.append(lineSeparator);
sb.append("onlineMembers=" + getOnlineMembers());
sb.append(lineSeparator);
sb.append("offlineMembers=" + getOfflineMembers());
sb.append(lineSeparator);
sb.append("equalsMembers=" + getOfflineAndEqualMembers());
sb.append(lineSeparator);
sb.append("flags=").append(getFlags());
sb.append(lineSeparator);
}
return sb.toString();
}
public void dumpMetadata() {
String name = getName();
StringBuilder msg = new StringBuilder(name);
dumpCommonAttributes(msg);
dumpPersistentView(msg);
System.out.println(msg);
}
/**
* Dump the (bucket-specific) persistent view to the string builder.
*/
public void dumpPersistentView(StringBuilder msg) {
msg.append("\n\tMyInitializingID=<").append(getMyInitializingID()).append(">");
msg.append("\n\tMyPersistentID=<").append(getMyPersistentID()).append(">");
msg.append("\n\tonlineMembers:");
for (PersistentMemberID id : getOnlineMembers()) {
msg.append("\n\t\t").append(id);
}
msg.append("\n\tofflineMembers:");
for (PersistentMemberID id : getOfflineMembers()) {
msg.append("\n\t\t").append(id);
}
msg.append("\n\tequalsMembers:");
for (PersistentMemberID id : getOfflineAndEqualMembers()) {
msg.append("\n\t\t").append(id);
}
}
/**
* Dump the attributes which are common across the PR to the string builder.
*/
public void dumpCommonAttributes(StringBuilder msg) {
msg.append("\n\tlru=").append(getEvictionAttributes().getAlgorithm());
if (!getEvictionAttributes().getAlgorithm().isNone()) {
msg.append("\n\tlruAction=").append(getEvictionAttributes().getAction());
if (!getEvictionAttributes().getAlgorithm().isLRUHeap()) {
msg.append("\n\tlruLimit=").append(getEvictionAttributes().getMaximum());
}
}
msg.append("\n\tconcurrencyLevel=").append(getConcurrencyLevel());
msg.append("\n\tinitialCapacity=").append(getInitialCapacity());
msg.append("\n\tloadFactor=").append(getLoadFactor());
msg.append("\n\toffHeap=").append(getOffHeap());
msg.append("\n\tstatisticsEnabled=").append(getStatisticsEnabled());
msg.append("\n\tdrId=").append(getId());
msg.append("\n\tisBucket=").append(isBucket());
msg.append("\n\tclearEntryId=").append(getClearOplogEntryId());
msg.append("\n\tflags=").append(getFlags());
}
/**
* This method was added to fix bug 40192. It is like getBytesAndBits except it will return
* Token.REMOVED_PHASE1 if the htreeReference has changed (which means a clear was done).
*
* @return an instance of BytesAndBits or Token.REMOVED_PHASE1
*/
@Override
public Object getRaw(DiskId id) {
acquireReadLock();
try {
return getDiskStore().getRaw(this, id);
} finally {
releaseReadLock();
}
}
@Override
public RegionVersionVector getRegionVersionVector() {
return versionVector;
}
public long getVersionForMember(VersionSource member) {
return versionVector.getVersionForMember(member);
}
public void recordRecoveredGCVersion(VersionSource member, long gcVersion) {
versionVector.recordGCVersion(member, gcVersion);
}
public void recordRecoveredVersionHolder(VersionSource member, RegionVersionHolder versionHolder,
boolean latestOplog) {
versionVector.initRecoveredVersion(member, versionHolder, latestOplog);
}
public void recordRecoveredVersionTag(VersionTag tag) {
versionVector.recordVersion(tag.getMemberID(), tag.getRegionVersion());
}
/**
* Indicates whether the current RVV for this disk region accurately reflects what has been
* recorded on disk. The RVV is not trusted while we are in the middle of a GII, because we
* record the new RVV at the beginning of the GII. If we recover in this state, we need to know
* that the recovered RVV is not something we can use to do a delta GII.
*/
public void setRVVTrusted(boolean trusted) {
rvvTrusted = trusted;
}
public boolean getRVVTrusted() {
return rvvTrusted;
}
public PersistentOplogSet getOplogSet() {
return getDiskStore().getPersistentOplogSet(this);
}
@Override
public String getCompressorClassName() {
return compressorClassName;
}
@Override
public Compressor getCompressor() {
return compressor;
}
@Override
public boolean getOffHeap() {
return offHeap;
}
@Override
public CachePerfStats getCachePerfStats() {
return ds.getCache().getCachePerfStats();
}
@Override
public void oplogRecovered(long oplogId) {
// do nothing. Overridden in ExportDiskRegion
}
@Override
public String toString() {
return getClass().getSimpleName() + ":" + getName();
}
@Override
public void incRecentlyUsed() {
entries.incRecentlyUsed();
}
@Override
public StatisticsFactory getStatisticsFactory() {
return ds.getStatisticsFactory();
}
@Override
public String getNameForStats() {
if (isBucket()) {
return getPrName();
} else {
return getName();
}
}
@Override
public InternalCache getCache() {
return getDiskStore().getCache();
}
}