/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.internal.FileUtil;
import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl.OplogEntryIdSet;
import com.gemstone.gemfire.internal.cache.persistence.DiskRecoveryStore;
import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView;
import com.gemstone.gemfire.internal.cache.persistence.DiskStoreFilter;
import com.gemstone.gemfire.internal.cache.persistence.OplogType;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
public class PersistentOplogSet implements OplogSet {
private static final Logger logger = LogService.getLogger();
/** The active oplog */
protected volatile Oplog child;
/** variable used to generate sequential unique oplogEntryIds */
private final AtomicLong oplogEntryId = new AtomicLong(DiskStoreImpl.INVALID_ID);
/** counter used for round-robin directory selection */
int dirCounter = -1;
/**
* Contains all the oplogs that only have a drf (i.e. the crf has been deleted).
*/
final Map<Long, Oplog> drfOnlyOplogs = new LinkedHashMap<Long, Oplog>();
/** oplogs that are ready to compact */
final Map<Long, Oplog> oplogIdToOplog = new LinkedHashMap<Long, Oplog>();
/** oplogs that are done being written to but not yet ready to compact */
private final Map<Long, Oplog> inactiveOplogs = new LinkedHashMap<Long, Oplog>(16, 0.75f, true);
private final DiskStoreImpl parent;
final AtomicInteger inactiveOpenCount = new AtomicInteger();
private final Map<Long, DiskRecoveryStore> pendingRecoveryMap
= new HashMap<Long, DiskRecoveryStore>();
private final Map<Long, DiskRecoveryStore> currentRecoveryMap
= new HashMap<Long, DiskRecoveryStore>();
final AtomicBoolean alreadyRecoveredOnce = new AtomicBoolean(false);
/**
* The maximum oplog id we saw while recovering
*/
private volatile long maxRecoveredOplogId = 0;
public PersistentOplogSet(DiskStoreImpl parent) {
this.parent = parent;
}
/**
 * Returns the active child.
 */
public final Oplog getChild() {
return this.child;
}
/**
 * Sets the child to a new oplog.
 */
void setChild(Oplog oplog) {
this.child = oplog;
// oplogSetAdd(oplog);
}
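/**
 * Returns a snapshot of every oplog in this set: the active child first,
 * followed by the compactable, inactive, and drf-only oplogs. Returns an
 * empty array if no oplogs exist at all; otherwise the first element may be
 * null if there is no active child.
 */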
public Oplog[] getAllOplogs() {
synchronized (this.oplogIdToOplog) {
int rollNum = this.oplogIdToOplog.size();
int inactiveNum = this.inactiveOplogs.size();
int drfOnlyNum = this.drfOnlyOplogs.size();
int num = rollNum + inactiveNum + drfOnlyNum + 1;
Oplog[] oplogs = new Oplog[num];
oplogs[0] = getChild();
{
Iterator<Oplog> itr = this.oplogIdToOplog.values().iterator();
for (int i = 1; i <= rollNum; i++) {
oplogs[i] = itr.next();
}
}
{
Iterator<Oplog> itr = this.inactiveOplogs.values().iterator();
for (int i = 1; i <= inactiveNum; i++) {
oplogs[i+rollNum] = itr.next();
}
}
{
Iterator<Oplog> itr = this.drfOnlyOplogs.values().iterator();
for (int i = 1; i <= drfOnlyNum; i++) {
oplogs[i+rollNum+inactiveNum] = itr.next();
}
}
//Special case - no oplogs found
if(oplogs.length == 1 && oplogs[0] == null) {
return new Oplog[0];
}
return oplogs;
}
}
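/**
 * Returns all oplogs sorted by descending oplog id, so the newest oplog
 * comes first.
 */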
private TreeSet<Oplog> getSortedOplogs() {
TreeSet<Oplog> result = new TreeSet<Oplog>(new Comparator<Oplog>() {
public int compare(Oplog o1, Oplog o2) {
// sort by descending oplog id so the newest oplog comes first
return Long.compare(o2.getOplogId(), o1.getOplogId());
}
});
for (Oplog oplog: getAllOplogs()) {
if (oplog != null) {
result.add(oplog);
}
}
return result;
}
/**
 * Gets the oplog with the specified id.
 *
 * @param id
 *          the oplogId of the oplog to get
 * @return the Oplog corresponding to the given oplogId, or null if none is found
 */
public Oplog getChild(long id) {
Oplog localOplog = this.child;
if (localOplog != null && id == localOplog.getOplogId()) {
return localOplog;
} else {
Long key = Long.valueOf(id);
synchronized (this.oplogIdToOplog) {
Oplog result = oplogIdToOplog.get(key);
if (result == null) {
result = inactiveOplogs.get(key);
}
return result;
}
}
}
@Override
public void create(LocalRegion region, DiskEntry entry, ValueWrapper value,
boolean async) {
getChild().create(region, entry, value, async);
}
@Override
public void modify(LocalRegion region, DiskEntry entry, ValueWrapper value,
boolean async) {
getChild().modify(region, entry, value, async);
}
public void offlineModify(DiskRegionView drv, DiskEntry entry, byte[] value, boolean isSerializedObject) {
getChild().offlineModify(drv, entry, value, isSerializedObject);
}
@Override
public void remove(LocalRegion region, DiskEntry entry, boolean async,
boolean isClear) {
getChild().remove(region, entry, async, isClear);
}
public void forceRoll(DiskRegion dr) {
Oplog child = getChild();
if (child != null) {
child.forceRolling(dr);
}
}
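/**
 * Scans every directory of this disk store for backup oplog files matching
 * the given partial file name and returns a map from each matching file to
 * the directory that holds it.
 */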
public Map<File, DirectoryHolder> findFiles(String partialFileName) {
this.dirCounter = 0;
Map<File, DirectoryHolder> backupFiles = new HashMap<File, DirectoryHolder>();
FilenameFilter backupFileFilter = getFileNameFilter(partialFileName);
for (DirectoryHolder dh: parent.directories) {
File dir = dh.getDir();
File[] backupList = FileUtil.listFiles(dir, backupFileFilter);
for (File f: backupList) {
backupFiles.put(f, dh);
}
}
return backupFiles;
}
protected FilenameFilter getFileNameFilter(String partialFileName) {
return new DiskStoreFilter(OplogType.BACKUP, false, partialFileName);
}
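/**
 * Builds the set of recovered oplogs from the files found on disk. The
 * oplog id is parsed from each file name and tracked in maxRecoveredOplogId.
 * Files whose ids are no longer present in the disk init file are leftovers
 * from an earlier failed delete and are removed. If oplogs are needed, the
 * crf/drf ids that were found are then verified against the disk init file.
 */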
public void createOplogs(boolean needsOplogs,
Map<File, DirectoryHolder> backupFiles) {
LongOpenHashSet foundCrfs = new LongOpenHashSet();
LongOpenHashSet foundDrfs = new LongOpenHashSet();
for (Map.Entry<File, DirectoryHolder> entry: backupFiles.entrySet()) {
File file = entry.getKey();
String absolutePath = file.getAbsolutePath();
int underscorePosition = absolutePath.lastIndexOf("_");
int pointPosition = absolutePath.lastIndexOf(".");
String opid = absolutePath.substring(underscorePosition + 1,
pointPosition);
long oplogId = Long.parseLong(opid);
maxRecoveredOplogId = Math.max(maxRecoveredOplogId, oplogId);
// Check the disk init file to see whether this oplog id was already
// deleted; if it was, don't process the file.
if (Oplog.isCRFFile(file.getName())) {
if (!isCrfOplogIdPresent(oplogId)) {
deleteFileOnRecovery(file);
try {
String krfFileName = Oplog.getKRFFilenameFromCRFFilename(file.getAbsolutePath());
File krfFile = new File(krfFileName);
deleteFileOnRecovery(krfFile);
} catch (Exception ex) {
// ignore
}
continue; // we were unable to delete this file earlier
}
} else if (Oplog.isDRFFile(file.getName())) {
if (!isDrfOplogIdPresent(oplogId)) {
deleteFileOnRecovery(file);
continue; // we were unable to delete this file earlier
}
}
Oplog oplog = getChild(oplogId);
if (oplog == null) {
oplog = new Oplog(oplogId, this);
//oplogSet.add(oplog);
addRecoveredOplog(oplog);
}
if (oplog.addRecoveredFile(file, entry.getValue())) {
foundCrfs.add(oplogId);
} else {
foundDrfs.add(oplogId);
}
}
if(needsOplogs) {
verifyOplogs(foundCrfs, foundDrfs);
}
}
protected boolean isDrfOplogIdPresent(long oplogId) {
return parent.getDiskInitFile().isDRFOplogIdPresent(oplogId);
}
protected boolean isCrfOplogIdPresent(long oplogId) {
return parent.getDiskInitFile().isCRFOplogIdPresent(oplogId);
}
protected void verifyOplogs(LongOpenHashSet foundCrfs, LongOpenHashSet foundDrfs) {
parent.getDiskInitFile().verifyOplogs(foundCrfs, foundDrfs);
}
private void deleteFileOnRecovery(File f) {
try {
f.delete();
} catch (Exception e) {
// ignore; one more attempt to delete the file failed
}
}
void addRecoveredOplog(Oplog oplog) {
basicAddToBeCompacted(oplog);
// don't schedule a compaction here. Wait for recovery to complete
}
/**
 * Adds the oplog to the to-be-compacted set and, if the compactor thread is
 * active and recovery is not in progress, notifies the compactor of the
 * addition. A lock is taken on the oplogIdToOplog LinkedHashMap because
 * adding an Oplog to the map and notifying the compactor thread must be an
 * atomic operation.
 */
void addToBeCompacted(Oplog oplog) {
basicAddToBeCompacted(oplog);
parent.scheduleCompaction();
}
private void basicAddToBeCompacted(Oplog oplog) {
if (!oplog.isRecovering() && oplog.hasNoLiveValues()) {
oplog.cancelKrf();
oplog.close(); // fix for bug 41687
oplog.deleteFiles(oplog.getHasDeletes());
} else {
int inactivePromotedCount = 0;
parent.getStats().incCompactableOplogs(1);
Long key = Long.valueOf(oplog.getOplogId());
synchronized (this.oplogIdToOplog) {
if (this.inactiveOplogs.remove(key) != null) {
if (oplog.isRAFOpen()) {
inactiveOpenCount.decrementAndGet();
}
inactivePromotedCount++;
}
this.oplogIdToOplog.put(key, oplog);
}
if (inactivePromotedCount > 0) {
parent.getStats().incInactiveOplogs(-inactivePromotedCount);
}
}
}
public final void recoverRegionsThatAreReady() {
// The following sync also prevents concurrent recoveries by multiple regions
// which is needed currently.
synchronized (this.alreadyRecoveredOnce) {
// need to take a snapshot of DiskRecoveryStores we will recover
synchronized (this.pendingRecoveryMap) {
this.currentRecoveryMap.clear();
this.currentRecoveryMap.putAll(this.pendingRecoveryMap);
this.pendingRecoveryMap.clear();
}
if (this.currentRecoveryMap.isEmpty() && this.alreadyRecoveredOnce.get()) {
// no recovery needed
return;
}
for (DiskRecoveryStore drs: this.currentRecoveryMap.values()) {
// Call prepare early to fix bug 41119.
drs.getDiskRegionView().prepareForRecovery();
}
if (!this.alreadyRecoveredOnce.get()) {
initOplogEntryId();
//Fix for #43026 - make sure we don't reuse an entry
//id that has been marked as cleared.
updateOplogEntryId(parent.getDiskInitFile().getMaxRecoveredClearEntryId());
}
final long start = parent.getStats().startRecovery();
long byteCount = 0;
EntryLogger.setSource(parent.getDiskStoreID(), "recovery");
try {
byteCount = recoverOplogs(byteCount);
} finally {
Map<String, Integer> prSizes = null;
Map<String, Integer> prBuckets = null;
if (parent.isValidating()) {
prSizes = new HashMap<String, Integer>();
prBuckets = new HashMap<String, Integer>();
}
for (DiskRecoveryStore drs: this.currentRecoveryMap.values()) {
for (Oplog oplog: getAllOplogs()) {
if (oplog != null) {
// Need to do this AFTER recovery to protect from concurrent compactions
// trying to remove the oplogs.
// We can't remove a dr from the oplog's unrecoveredRegionCount
// until it is fully recovered.
// This fixes bug 41119.
oplog.checkForRecoverableRegion(drs.getDiskRegionView());
}
}
if (parent.isValidating()) {
if (drs instanceof ValidatingDiskRegion) {
ValidatingDiskRegion vdr = ((ValidatingDiskRegion)drs);
if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY)) {
vdr.dump(System.out);
}
if (vdr.isBucket()) {
String prName = vdr.getPrName();
if (prSizes.containsKey(prName)) {
int oldSize = prSizes.get(prName);
oldSize += vdr.size();
prSizes.put(prName, oldSize);
int oldBuckets = prBuckets.get(prName);
oldBuckets++;
prBuckets.put(prName, oldBuckets);
} else {
prSizes.put(prName, vdr.size());
prBuckets.put(prName, 1);
}
} else {
parent.incLiveEntryCount(vdr.size());
System.out.println(vdr.getName() + ": entryCount=" + vdr.size());
}
}
}
}
if (parent.isValidating()) {
for (Map.Entry<String, Integer> me: prSizes.entrySet()) {
parent.incLiveEntryCount(me.getValue());
System.out.println(me.getKey() + " entryCount=" + me.getValue()
+ " bucketCount=" + prBuckets.get(me.getKey()));
}
}
parent.getStats().endRecovery(start, byteCount);
this.alreadyRecoveredOnce.set(true);
this.currentRecoveryMap.clear();
EntryLogger.clearSource();
}
}
}
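/**
 * Recovers the sorted oplogs in two passes: the drf pass collects the entry
 * ids of destroyed entries, then the crf pass recovers the live entries
 * while skipping those ids. Unless the store is being offline compacted,
 * the recovered regions are then initialized; on the first recovery of an
 * online store, krfs are created for oplogs missing them and compaction is
 * scheduled. Returns the accumulated byte count read during recovery.
 */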
private long recoverOplogs(long byteCount) {
OplogEntryIdSet deletedIds = new OplogEntryIdSet();
TreeSet<Oplog> oplogSet = getSortedOplogs();
Set<Oplog> oplogsNeedingValueRecovery = new HashSet<Oplog>();
if (!this.alreadyRecoveredOnce.get()) {
if (getChild() != null && !getChild().hasBeenUsed()) {
// Then remove the current child since it is empty
// and does not need to be recovered from
// and it is important to not call initAfterRecovery on it.
oplogSet.remove(getChild());
}
}
if (oplogSet.size() > 0) {
long startOpLogRecovery = System.currentTimeMillis();
// first figure out all entries that have been destroyed
boolean latestOplog = true;
for (Oplog oplog: oplogSet) {
byteCount += oplog.recoverDrf(deletedIds,
this.alreadyRecoveredOnce.get(),
latestOplog);
latestOplog = false;
if (!this.alreadyRecoveredOnce.get()) {
updateOplogEntryId(oplog.getMaxRecoveredOplogEntryId());
}
}
parent.incDeadRecordCount(deletedIds.size());
// now figure out live entries
latestOplog = true;
for (Oplog oplog: oplogSet) {
long startOpLogRead = parent.getStats().startOplogRead();
long bytesRead = oplog.recoverCrf(deletedIds,
// @todo make recoverValues per region
recoverValues(),
recoverValuesSync(),
this.alreadyRecoveredOnce.get(),
oplogsNeedingValueRecovery,
latestOplog);
latestOplog = false;
if (!this.alreadyRecoveredOnce.get()) {
updateOplogEntryId(oplog.getMaxRecoveredOplogEntryId());
}
byteCount += bytesRead;
parent.getStats().endOplogRead(startOpLogRead, bytesRead);
//Callback to the disk regions to indicate the oplog is recovered
//Used for offline export
for (DiskRecoveryStore drs: this.currentRecoveryMap.values()) {
drs.getDiskRegionView().oplogRecovered(oplog.oplogId);
}
}
long endOpLogRecovery = System.currentTimeMillis();
long elapsed = endOpLogRecovery - startOpLogRecovery;
logger.info(LocalizedMessage.create(LocalizedStrings.DiskRegion_OPLOG_LOAD_TIME, elapsed));
}
if (!parent.isOfflineCompacting()) {
long startRegionInit = System.currentTimeMillis();
// create the oplogs now so that loadRegionData can have them available
//Create an array of Oplogs so that we are able to add it in a single shot
// to the map
for (DiskRecoveryStore drs: this.currentRecoveryMap.values()) {
drs.getDiskRegionView().initRecoveredEntryCount();
}
if (!this.alreadyRecoveredOnce.get()) {
for (Oplog oplog: oplogSet) {
if (oplog != getChild()) {
oplog.initAfterRecovery(parent.isOffline());
}
}
if (getChild() == null) {
setFirstChild(getSortedOplogs(), false);
}
}
if (!parent.isOffline()) {
if(recoverValues() && !recoverValuesSync()) {
//TODO DAN - should we defer compaction until after
//value recovery is complete? Or at least until after
//value recovery for a given oplog is complete?
//Right now, that's effectively what we're doing
//because this uses up the compactor thread.
parent.scheduleValueRecovery(oplogsNeedingValueRecovery, this.currentRecoveryMap);
}
if(!this.alreadyRecoveredOnce.get()) {
//Create krfs for oplogs that are missing them
for(Oplog oplog: oplogSet) {
if(oplog.needsKrf()) {
oplog.createKrfAsync();
}
}
parent.scheduleCompaction();
}
long endRegionInit = System.currentTimeMillis();
logger.info(LocalizedMessage.create(LocalizedStrings.DiskRegion_REGION_INIT_TIME, endRegionInit - startRegionInit));
}
}
return byteCount;
}
protected boolean recoverValuesSync() {
return parent.RECOVER_VALUES_SYNC;
}
protected boolean recoverValues() {
return parent.RECOVER_VALUES;
}
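/**
 * Creates the initial active oplog, unless the store is offline and not
 * being compacted or modified. The new oplog gets an id one past the
 * largest recovered oplog id and is placed in the directory after the one
 * used by the oplog with that id (see bug 41822). When force is false,
 * nothing is created unless at least one oplog was recovered.
 */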
private void setFirstChild(TreeSet<Oplog> oplogSet, boolean force) {
if (parent.isOffline() && !parent.isOfflineCompacting() && !parent.isOfflineModify()) return;
if (!oplogSet.isEmpty()) {
Oplog first = oplogSet.first();
DirectoryHolder dh = first.getDirectoryHolder();
dirCounter = dh.getArrayIndex();
dirCounter = (dirCounter + 1) % parent.dirLength;
// we want the first child to go in the directory after the directory
// used by the existing oplog with the max id.
// This fixes bug 41822.
}
if (force || maxRecoveredOplogId > 0) {
setChild(new Oplog(maxRecoveredOplogId + 1, this, getNextDir()));
}
}
private final void initOplogEntryId() {
this.oplogEntryId.set(DiskStoreImpl.INVALID_ID);
}
/**
* Sets the last created oplogEntryId to the given value
* if and only if the given value is greater than the current
* last created oplogEntryId
*/
private final void updateOplogEntryId(long v) {
long curVal;
do {
curVal = this.oplogEntryId.get();
if (curVal >= v) {
// no need to set
return;
}
} while (!this.oplogEntryId.compareAndSet(curVal, v));
}
/**
* Returns the last created oplogEntryId.
* Returns INVALID_ID if no oplogEntryId has been created.
*/
final long getOplogEntryId() {
parent.initializeIfNeeded();
return this.oplogEntryId.get();
}
/**
* Creates and returns a new oplogEntryId for the given key. An oplogEntryId
* is needed when storing a key/value pair on disk. A new one is only needed
* if the key is new. Otherwise the oplogEntryId already allocated for a key
* can be reused for the same key.
*
* @return A disk id that can be used to access this key/value pair on disk
*/
final long newOplogEntryId() {
long result = this.oplogEntryId.incrementAndGet();
return result;
}
/**
 * Returns the next available DirectoryHolder which has space. If no dir has
 * space then it will return one anyway if compaction is enabled.
 *
 * @param minAvailableSpace
 *          the minimum amount of space we need in this directory
 * @param checkForWarning
 *          if true, directories whose disk usage has crossed the warning
 *          threshold are skipped on the first pass
 */
DirectoryHolder getNextDir(int minAvailableSpace, boolean checkForWarning) {
DirectoryHolder dirHolder = null;
DirectoryHolder selectedHolder = null;
synchronized (parent.directories) {
for (int i = 0; i < parent.dirLength; ++i) {
dirHolder = parent.directories[this.dirCounter];
// Advance the directory counter to the next position so that the next
// invocation checks directory space in a cyclic fashion.
dirCounter = (dirCounter + 1) % parent.dirLength;
// if the current directory has some space, then quit and
// return the dir
if (dirHolder.getAvailableSpace() >= minAvailableSpace) {
if (checkForWarning && !parent.isDirectoryUsageNormal(dirHolder)) {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring directory {} due to insufficient disk space", dirHolder);
}
} else {
selectedHolder = dirHolder;
break;
}
}
}
if (selectedHolder == null) {
// we didn't find a warning-free directory, try again ignoring the check
if (checkForWarning) {
return getNextDir(minAvailableSpace, false);
}
if (parent.isCompactionEnabled()) {
/*
* try { this.isThreadWaitingForSpace = true;
* this.directories.wait(MAX_WAIT_FOR_SPACE); } catch
* (InterruptedException ie) { throw new
* DiskAccessException(LocalizedStrings.
* DiskRegion_UNABLE_TO_GET_FREE_SPACE_FOR_CREATING_AN_OPLOG_AS_THE_THREAD_ENCOUNETERD_EXCEPTION_WHILE_WAITING_FOR_COMPACTOR_THREAD_TO_FREE_SPACE
* .toLocalizedString(), ie); }
*/
selectedHolder = parent.directories[this.dirCounter];
// Increment the directory counter to next position
this.dirCounter = (this.dirCounter + 1) % parent.dirLength;
if (selectedHolder.getAvailableSpace() < minAvailableSpace) {
/*
* throw new DiskAccessException(LocalizedStrings.
* DiskRegion_UNABLE_TO_GET_FREE_SPACE_FOR_CREATING_AN_OPLOG_AFTER_WAITING_FOR_0_1_2_SECONDS
* .toLocalizedString(new Object[] {MAX_WAIT_FOR_SPACE, /,
* (1000)}));
*/
logger.warn(LocalizedMessage.create(
LocalizedStrings.DiskRegion_COMPLEXDISKREGIONGETNEXTDIR_MAX_DIRECTORY_SIZE_WILL_GET_VIOLATED__GOING_AHEAD_WITH_THE_SWITCHING_OF_OPLOG_ANY_WAYS_CURRENTLY_AVAILABLE_SPACE_IN_THE_DIRECTORY_IS__0__THE_CAPACITY_OF_DIRECTORY_IS___1,
new Object[] { Long.valueOf(selectedHolder.getUsedSpace()), Long.valueOf(selectedHolder.getCapacity()) }));
}
} else {
throw new DiskAccessException(
LocalizedStrings.DiskRegion_DISK_IS_FULL_COMPACTION_IS_DISABLED_NO_SPACE_CAN_BE_CREATED
.toLocalizedString(), parent);
}
}
}
return selectedHolder;
}
DirectoryHolder getNextDir() {
return getNextDir(DiskStoreImpl.MINIMUM_DIR_SIZE, true);
}
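/** Registers an oplog whose crf has been deleted, leaving only its drf. */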
void addDrf(Oplog oplog) {
synchronized (this.oplogIdToOplog) {
this.drfOnlyOplogs.put(Long.valueOf(oplog.getOplogId()), oplog);
}
}
void removeDrf(Oplog oplog) {
synchronized (this.oplogIdToOplog) {
this.drfOnlyOplogs.remove(Long.valueOf(oplog.getOplogId()));
}
}
/**
 * Returns true if id is less than all the ids in the oplogIdToOplog map and
 * all the ids in the inactiveOplogs map. Every id in both maps is checked;
 * since the inactiveOplogs map is access ordered (an LRU), its first entry
 * is not necessarily the oldest.
 */
boolean isOldestExistingOplog(long id) {
synchronized (this.oplogIdToOplog) {
Iterator<Long> it = this.oplogIdToOplog.keySet().iterator();
while (it.hasNext()) {
long otherId = it.next().longValue();
if (id > otherId) {
return false;
}
}
// since the inactiveOplogs map is an LRU we need to check each one
it = this.inactiveOplogs.keySet().iterator();
while (it.hasNext()) {
long otherId = it.next().longValue();
if (id > otherId) {
return false;
}
}
}
return true;
}
/**
 * Destroys the drf-only oplogs that are empty (have no live values) and
 * older (based on smallest oplog id) than the oldest live oplog. If there
 * is no live oplog, all drf-only oplogs are destroyed.
 */
void destroyOldestReadyToCompact() {
synchronized (this.oplogIdToOplog) {
if (this.drfOnlyOplogs.isEmpty())
return;
}
Oplog oldestLiveOplog = getOldestLiveOplog();
ArrayList<Oplog> toDestroy = new ArrayList<Oplog>();
if (oldestLiveOplog == null) {
// remove all oplogs that are empty
synchronized (this.oplogIdToOplog) {
toDestroy.addAll(this.drfOnlyOplogs.values());
}
} else {
// remove all empty oplogs that are older than the oldest live one
synchronized (this.oplogIdToOplog) {
for (Oplog oplog : this.drfOnlyOplogs.values()) {
if (oplog.getOplogId() < oldestLiveOplog.getOplogId()) {
toDestroy.add(oplog);
// } else {
// // since drfOnlyOplogs is sorted any other ones will be even
// bigger
// // so we can break out of this loop
// break;
}
}
}
}
for (Oplog oplog : toDestroy) {
oplog.destroy();
}
}
/**
* Returns the oldest oplog that is ready to compact. Returns null if no
* oplogs are ready to compact. Age is based on the oplog id.
*/
private Oplog getOldestReadyToCompact() {
Oplog oldest = null;
synchronized (this.oplogIdToOplog) {
Iterator<Oplog> it = this.oplogIdToOplog.values().iterator();
while (it.hasNext()) {
Oplog oldestCompactable = it.next();
if (oldest == null
|| oldestCompactable.getOplogId() < oldest.getOplogId()) {
oldest = oldestCompactable;
}
}
it = this.drfOnlyOplogs.values().iterator();
while (it.hasNext()) {
Oplog oldestDrfOnly = it.next();
if (oldest == null || oldestDrfOnly.getOplogId() < oldest.getOplogId()) {
oldest = oldestDrfOnly;
}
}
}
return oldest;
}
private Oplog getOldestLiveOplog() {
Oplog result = null;
synchronized (this.oplogIdToOplog) {
Iterator<Oplog> it = this.oplogIdToOplog.values().iterator();
while (it.hasNext()) {
Oplog n = it.next();
if (result == null || n.getOplogId() < result.getOplogId()) {
result = n;
}
}
// since the inactiveOplogs map is an LRU we need to check each one
it = this.inactiveOplogs.values().iterator();
while (it.hasNext()) {
Oplog n = it.next();
if (result == null || n.getOplogId() < result.getOplogId()) {
result = n;
}
}
}
return result;
}
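/**
 * Records an access to an inactive oplog. The get on the access-ordered
 * inactiveOplogs LinkedHashMap moves the oplog to the most recently used
 * position.
 */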
void inactiveAccessed(Oplog oplog) {
Long key = Long.valueOf(oplog.getOplogId());
synchronized (this.oplogIdToOplog) {
// update last access time
this.inactiveOplogs.get(key);
}
}
void inactiveReopened(Oplog oplog) {
addInactive(oplog, true);
}
void addInactive(Oplog oplog) {
addInactive(oplog, false);
}
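/**
 * Adds (or, on reopen, re-registers) an inactive oplog. If the number of
 * inactive oplogs with an open random access file exceeds
 * DiskStoreImpl.MAX_OPEN_INACTIVE_OPLOGS, open files of other inactive
 * oplogs are closed until the count is back under the limit.
 */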
private void addInactive(Oplog oplog, boolean reopen) {
Long key = Long.valueOf(oplog.getOplogId());
ArrayList<Oplog> openlist = null;
synchronized (this.oplogIdToOplog) {
boolean isInactive = true;
if (reopen) {
// It is possible that 'oplog' is compactable rather than inactive,
// so check whether it is actually present in the inactive map.
isInactive = this.inactiveOplogs.get(key) != null;
} else {
this.inactiveOplogs.put(key, oplog);
}
if ((reopen && isInactive) || oplog.isRAFOpen()) {
if (inactiveOpenCount.incrementAndGet() > DiskStoreImpl.MAX_OPEN_INACTIVE_OPLOGS) {
openlist = new ArrayList<Oplog>();
for (Oplog o : this.inactiveOplogs.values()) {
if (o.isRAFOpen()) {
// add to my list
openlist.add(o);
}
}
}
}
}
if (openlist != null) {
for (Oplog o : openlist) {
if (o.closeRAF()) {
if (inactiveOpenCount.decrementAndGet() <= DiskStoreImpl.MAX_OPEN_INACTIVE_OPLOGS) {
break;
}
}
}
}
if (!reopen) {
parent.getStats().incInactiveOplogs(1);
}
}
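/**
 * Clears the given disk region from every oplog. A snapshot of the oplogs
 * is taken under the lock (see bug 44336) and each oplog is cleared outside
 * it; the clear is then recorded in the disk init file, either with the
 * region version vector or with the last created oplog entry id.
 */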
public void clear(DiskRegion dr, RegionVersionVector rvv) {
// call clear on each oplog
// to fix bug 44336 put them in another collection
ArrayList<Oplog> oplogsToClear = new ArrayList<Oplog>();
synchronized (this.oplogIdToOplog) {
for (Oplog oplog : this.oplogIdToOplog.values()) {
oplogsToClear.add(oplog);
}
for (Oplog oplog : this.inactiveOplogs.values()) {
oplogsToClear.add(oplog);
}
{
Oplog child = getChild();
if (child != null) {
oplogsToClear.add(child);
}
}
}
for (Oplog oplog : oplogsToClear) {
oplog.clear(dr, rvv);
}
if (rvv != null) {
parent.getDiskInitFile().clearRegion(dr, rvv);
} else {
long clearedOplogEntryId = getOplogEntryId();
parent.getDiskInitFile().clearRegion(dr, clearedOplogEntryId);
}
}
public RuntimeException close() {
RuntimeException rte = null;
try {
closeOtherOplogs();
} catch (RuntimeException e) {
// remember the first exception but keep closing
if (rte == null) {
rte = e;
}
}
if (this.child != null) {
try {
this.child.finishKrf();
} catch (RuntimeException e) {
if (rte == null) {
rte = e;
}
}
try {
this.child.close();
} catch (RuntimeException e) {
if (rte == null) {
rte = e;
}
}
}
return rte;
}
/** Closes all the oplogs except the current one. */
private void closeOtherOplogs() {
// get a snapshot to prevent CME
Oplog[] oplogs = getAllOplogs();
// if there are oplogs which are to be compacted, destroy them
// do not do oplogs[0]
for (int i = 1; i < oplogs.length; i++) {
oplogs[i].finishKrf();
oplogs[i].close();
removeOplog(oplogs[i].getOplogId());
}
}
/**
 * Removes the oplog from the map given the oplogId.
 *
 * @param id
 *          id of the oplog to be removed
 * @return the Oplog that was removed, or null if no oplog had the given id
 */
Oplog removeOplog(long id) {
return removeOplog(id, false, null);
}
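/**
 * Removes the oplog with the given id from whichever map currently holds it
 * (compactable, inactive, or drf-only) and updates the matching statistics.
 * If the oplog is not being deleted and its files still hold data, its size
 * is added to the store's undeleted oplog size. Optionally registers
 * another oplog as drf-only under the same lock.
 */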
Oplog removeOplog(long id, boolean deleting, Oplog oplogToAddToDrfOnly) {
Oplog oplog = null;
boolean drfOnly = false;
boolean inactive = false;
Long key = Long.valueOf(id);
synchronized (this.oplogIdToOplog) {
oplog = this.oplogIdToOplog.remove(key);
if (oplog == null) {
oplog = this.inactiveOplogs.remove(key);
if (oplog != null) {
if (oplog.isRAFOpen()) {
inactiveOpenCount.decrementAndGet();
}
inactive = true;
} else {
oplog = this.drfOnlyOplogs.remove(key);
if (oplog != null) {
drfOnly = true;
}
}
}
if (oplogToAddToDrfOnly != null) {
addDrf(oplogToAddToDrfOnly);
}
}
if (oplog != null) {
if (!drfOnly) {
if (inactive) {
parent.getStats().incInactiveOplogs(-1);
} else {
parent.getStats().incCompactableOplogs(-1);
}
}
if (!deleting && !oplog.isOplogEmpty()) {
// we are removing an oplog whose files are not deleted
parent.undeletedOplogSize.addAndGet(oplog.getOplogSize());
}
}
return oplog;
}
public void basicClose(DiskRegion dr) {
ArrayList<Oplog> oplogsToClose = new ArrayList<Oplog>();
synchronized (this.oplogIdToOplog) {
oplogsToClose.addAll(this.oplogIdToOplog.values());
oplogsToClose.addAll(this.inactiveOplogs.values());
oplogsToClose.addAll(this.drfOnlyOplogs.values());
{
Oplog child = getChild();
if (child != null) {
oplogsToClose.add(child);
}
}
}
for (Oplog oplog : oplogsToClose) {
oplog.close(dr);
}
}
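/** Prepares every oplog, including the active child, to be closed. */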
public void prepareForClose() {
ArrayList<Oplog> oplogsToPrepare = new ArrayList<Oplog>();
synchronized (this.oplogIdToOplog) {
oplogsToPrepare.addAll(this.oplogIdToOplog.values());
oplogsToPrepare.addAll(this.inactiveOplogs.values());
}
boolean childPreparedForClose = false;
long child_oplogid = this.getChild() == null ? -1 : this.getChild().oplogId;
for (Oplog oplog : oplogsToPrepare) {
oplog.prepareForClose();
if (child_oplogid != -1 && oplog.oplogId == child_oplogid) {
childPreparedForClose = true;
}
}
if (!childPreparedForClose && this.getChild() != null) {
this.getChild().prepareForClose();
}
}
public void basicDestroy(DiskRegion dr) {
ArrayList<Oplog> oplogsToDestroy = new ArrayList<Oplog>();
synchronized (this.oplogIdToOplog) {
for (Oplog oplog : this.oplogIdToOplog.values()) {
oplogsToDestroy.add(oplog);
}
for (Oplog oplog : this.inactiveOplogs.values()) {
oplogsToDestroy.add(oplog);
}
for (Oplog oplog : this.drfOnlyOplogs.values()) {
oplogsToDestroy.add(oplog);
}
{
Oplog child = getChild();
if (child != null) {
oplogsToDestroy.add(child);
}
}
}
for (Oplog oplog : oplogsToDestroy) {
oplog.destroy(dr);
}
}
public void destroyAllOplogs() {
// get a snapshot to prevent CME
for (Oplog oplog : getAllOplogs()) {
if (oplog != null) {
oplog.destroy();
removeOplog(oplog.getOplogId());
}
}
}
/**
 * Adds compactable oplogs to the given list, up to the maximum count.
 *
 * @param l the list to fill with oplogs that need compaction
 * @param max the maximum number of oplogs the list may contain
 */
public void getCompactableOplogs(List<CompactableOplog> l, int max) {
synchronized (this.oplogIdToOplog) {
// Sort this list so we compact the oldest first instead of the one
// that was compactable first.
// ArrayList<CompactableOplog> l = new
// ArrayList<CompactableOplog>(this.oplogIdToOplog.values());
// Collections.sort(l);
// Iterator<Oplog> itr = l.iterator();
{
Iterator<Oplog> itr = this.oplogIdToOplog.values().iterator();
while (itr.hasNext() && l.size() < max) {
Oplog oplog = itr.next();
if (oplog.needsCompaction()) {
l.add(oplog);
}
}
}
}
}
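/**
 * Schedules the given store's disk region for recovery. Only recreated
 * regions that have a persistent or initializing id are scheduled; other
 * regions have no recoverable data in the oplogs, so the scan cost is
 * skipped for them.
 */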
public void scheduleForRecovery(DiskRecoveryStore drs) {
DiskRegionView dr = drs.getDiskRegionView();
if (dr.isRecreated()
&& (dr.getMyPersistentID() != null || dr.getMyInitializingID() != null)) {
// If a region does not have either id then don't pay the cost
// of scanning the oplogs for recovered data.
synchronized (this.pendingRecoveryMap) {
this.pendingRecoveryMap.put(dr.getId(), drs);
}
}
}
/**
* Returns null if we are not currently recovering the DiskRegion with the
* given drId.
*/
public DiskRecoveryStore getCurrentlyRecovering(long drId) {
return this.currentRecoveryMap.get(drId);
}
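/** Creates the initial active oplog if one does not already exist. */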
public void initChild() {
if (getChild() == null) {
setFirstChild(getSortedOplogs(), true);
}
}
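/**
 * Performs the oplog-level part of an offline compaction: closes the krf of
 * the active oplog and destroys it if it is empty, then destroys any oplogs
 * that only have a drf (see bug 42036).
 */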
public void offlineCompact() {
if (getChild() != null) {
// check active oplog and if it is empty delete it
getChild().krfClose();
if (getChild().isOplogEmpty()) {
getChild().destroy();
}
}
{ // remove any oplogs that only have a drf to fix bug 42036
ArrayList<Oplog> toDestroy = new ArrayList<Oplog>();
synchronized (this.oplogIdToOplog) {
Iterator<Oplog> it = this.oplogIdToOplog.values().iterator();
while (it.hasNext()) {
Oplog n = it.next();
if (n.isDrfOnly()) {
toDestroy.add(n);
}
}
}
for (Oplog oplog : toDestroy) {
oplog.destroy();
}
destroyOldestReadyToCompact();
}
}
public DiskStoreImpl getParent() {
return parent;
}
public void updateDiskRegion(AbstractDiskRegion dr) {
for (Oplog oplog: getAllOplogs()) {
if (oplog != null) {
oplog.updateDiskRegion(dr);
}
}
}
public void flushChild() {
Oplog oplog = getChild();
if (oplog != null) {
oplog.flushAll();
}
}
public String getPrefix() {
return OplogType.BACKUP.getPrefix();
}
public void crfCreate(long oplogId) {
getParent().getDiskInitFile().crfCreate(oplogId);
}
public void drfCreate(long oplogId) {
getParent().getDiskInitFile().drfCreate(oplogId);
}
public void crfDelete(long oplogId) {
getParent().getDiskInitFile().crfDelete(oplogId);
}
public void drfDelete(long oplogId) {
getParent().getDiskInitFile().drfDelete(oplogId);
}
public boolean couldHaveKrf() {
return getParent().couldHaveKrf();
}
public boolean isCompactionPossible() {
return getParent().isCompactionPossible();
}
}