/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ShutdownHookManager;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
import com.gemstone.gemfire.cache.hdfs.HDFSStore;
import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.cardinality.CardinalityMergeException;
import com.gemstone.gemfire.cache.hdfs.internal.cardinality.HyperLogLog;
import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
import com.gemstone.gemfire.cache.hdfs.internal.cardinality.MurmurHash;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReaderActivityListener;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.Meta;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.IOOperation;
import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* Manages sorted oplog (hoplog) files for a bucket. One instance exists per
* bucket in each partitioned region (PR).
*
* @author ashvina
*/
public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHoplogPersistedEvent> {
public static final int AVG_NUM_KEYS_PER_INDEX_BLOCK = 200;
// all valid sorted hoplogs follow this name pattern
public static final String SORTED_HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
+ FLUSH_HOPLOG_EXTENSION + "|" + MINOR_HOPLOG_EXTENSION + "|"
+ MAJOR_HOPLOG_EXTENSION + ")";
public static final Pattern SORTED_HOPLOG_PATTERN = Pattern.compile(SORTED_HOPLOG_REGEX);
//Amount of time before deleting old temporary files
final long TMP_FILE_EXPIRATION_TIME_MS = Long.getLong(HoplogConfig.TMP_FILE_EXPIRATION, HoplogConfig.TMP_FILE_EXPIRATION_DEFAULT);
static float RATIO = HoplogConfig.COMPACTION_FILE_RATIO_DEFAULT;
// Compacter for this bucket
private Compactor compacter;
private final HoplogReadersController hoplogReadersController;
private AtomicLong previousCleanupTimestamp = new AtomicLong(Long.MIN_VALUE);
/**
* The default HLL constant; gives an accuracy of about 3.25%.
* Public only for testing the upgrade from 1.3 to 1.4.
*/
public static double HLL_CONSTANT = 0.03;
/**
* This estimator keeps track of this bucket's entry count. The value is
* affected by flush and compaction cycles.
*/
private volatile ICardinality bucketSize = new HyperLogLog(HLL_CONSTANT);
//A set of tmp files that existed when this bucket organizer was originally
//created. These may still be open by the old primary, or they may be
//abandoned files.
private LinkedList<FileStatus> oldTmpFiles;
private ConcurrentMap<Hoplog, Boolean> tmpFiles = new ConcurrentHashMap<Hoplog, Boolean>();
protected volatile boolean organizerClosed = false;
/**
* For the 1.4 release we are changing HLL_CONSTANT, which makes the old
* persisted HLLs incompatible with the new ones. To fix this we force a
* major compaction when the system starts up so that only new HLLs remain
* in the system (see bug 51403).
*/
private boolean startCompactionOnStartup = false;
/**
* @param region
* Region manager instance. The HDFS event listener, stats collector,
* file system, etc. are shared by all buckets of a region and are
* provided by the region manager.
* @param bucketId bucket id to be managed by this organizer
* @throws IOException
*/
public HdfsSortedOplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
super(region, bucketId);
String val = System.getProperty(HoplogConfig.COMPACTION_FILE_RATIO);
try {
RATIO = Float.parseFloat(val);
} catch (Exception e) {
// ignore a missing or malformed system property and keep the default ratio
}
hoplogReadersController = new HoplogReadersController();
// initialize with all the files in the directory
List<Hoplog> hoplogs = identifyAndLoadSortedOplogs(true);
if (logger.isDebugEnabled()) {
logger.debug("{}Initializing bucket with existing hoplogs, count = " + hoplogs.size(), logPrefix);
}
for (Hoplog hoplog : hoplogs) {
addSortedOplog(hoplog, false, true);
}
// initialize sequence to the current maximum
sequence = new AtomicInteger(findMaxSequenceNumber(hoplogs));
initOldTmpFiles();
FileSystem fs = store.getFileSystem();
Path cleanUpIntervalPath = new Path(store.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
if (!fs.exists(cleanUpIntervalPath)) {
HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
long intervalDurationMillis = compactionConf.getOldFilesCleanupIntervalMins() * 60 * 1000;
HoplogUtil.exposeCleanupIntervalMillis(fs, cleanUpIntervalPath, intervalDurationMillis);
}
if (startCompactionOnStartup) {
forceCompactionOnVersionUpgrade();
if (logger.isInfoEnabled()) {
logger.info(LocalizedStrings.HOPLOG_MAJOR_COMPACTION_SCHEDULED_FOR_BETTER_ESTIMATE);
}
}
}
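// Illustrative usage sketch (editorial, not part of the original source): a
// primary bucket typically drives this organizer through the public methods
// defined below, roughly as
//
//   HdfsSortedOplogOrganizer organizer =
//       new HdfsSortedOplogOrganizer(regionManager, bucketId);
//   organizer.flush(eventIterator, eventCount);   // persist queued events
//   SortedHoplogPersistedEvent value = organizer.read(keyBytes);
//   organizer.performMaintenance();               // compaction and cleanup
//   organizer.close();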
/**
* Iterates on the input buffer and persists it in a new sorted oplog. This operation is
* synchronous and blocks the thread.
*/
@Override
public void flush(Iterator<? extends QueuedPersistentEvent> iterator, final int count)
throws IOException, ForceReattemptException {
assert iterator != null;
if (logger.isDebugEnabled())
logger.debug("{}Initializing flush operation", logPrefix);
final Hoplog so = getTmpSortedOplog(null, FLUSH_HOPLOG_EXTENSION);
HoplogWriter writer = null;
ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
// variables for updating stats
long start = stats.getFlush().begin();
int byteCount = 0;
try {
/**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
//HeapDataOutputStream out = new HeapDataOutputStream();
try {
writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
@Override
public HoplogWriter call() throws Exception {
return so.createWriter(count);
}
});
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException)e;
}
throw new IOException(e);
}
while (iterator.hasNext() && !this.organizerClosed) {
HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
QueuedPersistentEvent item = iterator.next();
item.toHoplogEventBytes(out);
byte[] valueBytes = out.toByteArray();
writer.append(item.getRawKey(), valueBytes);
// add key length and value length to stats byte counter
byteCount += (item.getRawKey().length + valueBytes.length);
// increment size only if entry is not deleted
if (!isDeletedEntry(valueBytes, 0)) {
int hash = MurmurHash.hash(item.getRawKey());
localHLL.offerHashed(hash);
}
/**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
//out.clearForReuse();
}
if (organizerClosed)
throw new BucketMovedException("The current bucket is moved BucketID: "+
this.bucketId + " Region name: " + this.regionManager.getRegion().getName());
// append completed. provide cardinality and close writer
writer.close(buildMetaData(localHLL));
writer = null;
} catch (IOException e) {
stats.getFlush().error(start);
try {
e = handleWriteHdfsIOError(writer, so, e);
} finally {
//Set the writer to null because handleWriteHDFSIOError has
//already closed the writer.
writer = null;
}
throw e;
} catch (BucketMovedException e) {
stats.getFlush().error(start);
deleteTmpFile(writer, so);
writer = null;
throw e;
} finally {
if (writer != null) {
writer.close();
}
}
try{
// ping secondaries before making the file a legitimate file to ensure
// that in case of split brain, no other VM has taken over as primary. #50110.
pingSecondaries();
// rename file and check if renaming was successful
synchronized (changePrimarylockObject) {
if (!organizerClosed)
makeLegitimate(so);
else
throw new BucketMovedException("The current bucket is moved BucketID: "+
this.bucketId + " Region name: " + this.regionManager.getRegion().getName());
}
try {
so.getSize();
} catch (IllegalStateException e) {
throw new IOException("Failed to rename hoplog file:" + so.getFileName());
}
//Disabling this assertion due to bug 49740
// check to make sure the sequence number is correct
// if (ENABLE_INTEGRITY_CHECKS) {
// Assert.assertTrue(getSequenceNumber(so) == findMaxSequenceNumber(identifyAndLoadSortedOplogs(false)),
// "Invalid sequence number detected for " + so.getFileName());
// }
// record the file for future maintenance and reads
addSortedOplog(so, false, true);
stats.getFlush().end(byteCount, start);
incrementDiskUsage(so.getSize());
} catch (BucketMovedException e) {
stats.getFlush().error(start);
deleteTmpFile(writer, so);
writer = null;
throw e;
} catch (IOException e) {
stats.getFlush().error(start);
logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
throw e;
}
submitCompactionRequests();
}
/**
* store cardinality information in metadata
* @param localHLL the hll estimate for this hoplog only
*/
private EnumMap<Meta, byte[]> buildMetaData(ICardinality localHLL) throws IOException {
EnumMap<Meta, byte[]> map = new EnumMap<Hoplog.Meta, byte[]>(Meta.class);
map.put(Meta.LOCAL_CARDINALITY_ESTIMATE_V2, localHLL.getBytes());
return map;
}
private void submitCompactionRequests() throws IOException {
CompactionRequest req;
// determine if a major compaction is needed and create a compaction request
// with compaction manager
HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
if (compactionConf.getAutoMajorCompaction()) {
if (isMajorCompactionNeeded()) {
req = new CompactionRequest(regionFolder, bucketId, getCompactor(), true);
HDFSCompactionManager.getInstance(store).submitRequest(req);
}
}
// submit a minor compaction task. It will be ignored if there is no work to
// be done.
if (store.getMinorCompaction()) {
req = new CompactionRequest(regionFolder, bucketId, getCompactor(), false);
HDFSCompactionManager.getInstance(store).submitRequest(req);
}
}
/**
* @return true if the oldest hoplog was created at least one major compaction interval ago
*/
private boolean isMajorCompactionNeeded() throws IOException {
// major compaction interval in milliseconds
HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
long majorCInterval = ((long)compactionConf.getMajorCompactionIntervalMins()) * 60 * 1000;
Hoplog oplog = hoplogReadersController.getOldestHoplog();
if (oplog == null) {
return false;
}
long oldestFileTime = oplog.getModificationTimeStamp();
long now = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("{}Checking oldest hop " + oplog.getFileName()
+ " for majorCompactionInterval=" + majorCInterval
+ " + now=" + now, logPrefix);
}
if (oldestFileTime > 0L && oldestFileTime < (now - majorCInterval)) {
return true;
}
return false;
}
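// Worked example (illustrative): with majorCompactionIntervalMins = 720,
// majorCInterval is 720 * 60 * 1000 = 43,200,000 ms, so this check requests
// a major compaction only when the oldest hoplog's modification timestamp is
// more than 12 hours in the past.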
@Override
public SortedHoplogPersistedEvent read(byte[] key) throws IOException {
long startTime = stats.getRead().begin();
String user = logger.isDebugEnabled() ? "Read" : null;
// collect snapshot of hoplogs
List<TrackedReference<Hoplog>> hoplogs = null;
hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
try {
// search for the key in order starting with the youngest oplog
for (TrackedReference<Hoplog> hoplog : hoplogs) {
HoplogReader reader = hoplog.get().getReader();
byte[] val = reader.read(key);
if (val != null) {
// value found in a younger hoplog. stop iteration
SortedHoplogPersistedEvent eventObj = deserializeValue(val);
stats.getRead().end(val.length, startTime);
return eventObj;
}
}
} catch (IllegalArgumentException e) {
if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
throw handleIOError((IOException) e.getCause());
} else {
throw e;
}
} catch (IOException e) {
throw handleIOError(e);
} catch (HDFSIOException e) {
throw handleIOError(e);
} finally {
hoplogReadersController.releaseHoplogs(hoplogs, user);
}
stats.getRead().end(0, startTime);
return null;
}
protected IOException handleIOError(IOException e) {
// expose the error wrapped inside remote exception
if (e instanceof RemoteException) {
return ((RemoteException) e).unwrapRemoteException();
}
checkForSafeError(e);
// it is not a safe error. let the caller handle it
return e;
}
protected HDFSIOException handleIOError(HDFSIOException e) {
checkForSafeError(e);
return e;
}
protected void checkForSafeError(Exception e) {
boolean safeError = ShutdownHookManager.get().isShutdownInProgress();
if (safeError) {
// IOException because of closed file system. This happens when member is
// shutting down
if (logger.isDebugEnabled())
logger.debug("IO error caused by filesystem shutdown", e);
throw new CacheClosedException("IO error caused by filesystem shutdown", e);
}
if(isClosed()) {
//If the hoplog organizer is closed, throw an exception to indicate the
//caller should retry on the new primary.
throw new PrimaryBucketException(e);
}
}
protected IOException handleWriteHdfsIOError(HoplogWriter writer, Hoplog so, IOException e)
throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Handle write error:" + so, logPrefix);
}
closeWriter(writer);
// add to the janitor queue
tmpFiles.put(so, Boolean.TRUE);
return handleIOError(e);
}
private void deleteTmpFile(HoplogWriter writer, Hoplog so) {
closeWriter(writer);
// delete the temporary hoplog
try {
if (so != null) {
so.delete();
}
} catch (IOException e1) {
logger.info(e1);
}
}
private void closeWriter(HoplogWriter writer) {
if (writer != null) {
// close writer before deleting it
try {
writer.close();
} catch (Throwable e1) {
// an error closing the hoplog will happen if no connections to the datanode
// are available. The caller still tries to delete the file on the namenode
if(!isClosed()) {
logger.info(e1);
}
}
}
}
/**
* Closes the hoplog and suppresses IO errors during reader close. Suppressing IO errors
* when the organizer is closing or a hoplog becomes inactive lets the system
* continue freeing other resources. It could potentially lead to socket
* leaks though.
*/
private void closeReaderAndSuppressError(Hoplog hoplog, boolean clearCache) {
try {
hoplog.close();
} catch (IOException e) {
// expose the error wrapped inside remote exception
if (e instanceof RemoteException) {
e = ((RemoteException) e).unwrapRemoteException();
}
logger.info(e);
}
}
@Override
public BucketIterator scan() throws IOException {
String user = logger.isDebugEnabled() ? "Scan" : null;
List<TrackedReference<Hoplog>> hoplogs = null;
BucketIterator iter = null;
try {
hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
iter = new BucketIterator(hoplogs);
return iter;
} finally {
// Normally the hoplogs will be released when the iterator is closed. The
// hoplogs must be released only if creating the iterator has failed.
if (iter == null) {
hoplogReadersController.releaseHoplogs(hoplogs, user);
}
}
}
@Override
public BucketIterator scan(byte[] from, byte[] to) throws IOException {
throw new NotImplementedException();
}
@Override
public BucketIterator scan(byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
throw new NotImplementedException();
}
@Override
public HoplogIterator<byte[], SortedHoplogPersistedEvent> scan(
long startOffset, long length) throws IOException {
throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
}
@Override
public void close() throws IOException {
super.close();
synchronized (changePrimarylockObject) {
organizerClosed = true;
}
//Suspend compaction
getCompactor().suspend();
//Close the readers controller.
hoplogReadersController.close();
previousCleanupTimestamp.set(Long.MIN_VALUE);
}
/**
* This method is invoked on the secondary node, which needs to update its data
* structures.
*/
@Override
public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
throws IOException {
for (Hoplog oplog : oplogs) {
addSortedOplog(oplog, false, true);
}
}
@Override
public long sizeEstimate() {
return this.bucketSize.cardinality();
}
private void addSortedOplog(Hoplog so, boolean notify, boolean addsToBucketSize)
throws IOException {
if (!hoplogReadersController.addSortedOplog(so)) {
so.close();
throw new InternalGemFireException("Failed to add " + so);
}
String user = logger.isDebugEnabled() ? "Add" : null;
if (addsToBucketSize) {
TrackedReference<Hoplog> ref = null;
try {
ref = hoplogReadersController.trackHoplog(so, user);
synchronized (bucketSize) {
ICardinality localHLL = ref.get().getEntryCountEstimate();
if (localHLL != null) {
bucketSize = mergeHLL(bucketSize, localHLL);
}
}
} finally {
if (ref != null) {
hoplogReadersController.releaseHoplog(ref, user);
}
}
}
if (notify && listener != null) {
listener.hoplogCreated(regionFolder, bucketId, so);
}
}
private void reEstimateBucketSize() throws IOException {
ICardinality global = null;
String user = logger.isDebugEnabled() ? "HLL" : null;
List<TrackedReference<Hoplog>> hoplogs = null;
try {
hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
global = new HyperLogLog(HLL_CONSTANT);
for (TrackedReference<Hoplog> hop : hoplogs) {
global = mergeHLL(global, hop.get().getEntryCountEstimate());
}
} finally {
hoplogReadersController.releaseHoplogs(hoplogs, user);
}
bucketSize = global;
}
protected ICardinality mergeHLL(ICardinality global, ICardinality local)
/*throws IOException*/ {
try {
return global.merge(local);
} catch (CardinalityMergeException e) {
// uncomment this after the 1.4 release
//throw new InternalGemFireException(e.getLocalizedMessage(), e);
startCompactionOnStartup = true;
return global;
}
}
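// Editorial note (hedged): merging two HyperLogLog estimators approximates
// the cardinality of the union of the underlying key sets, so a key flushed
// into several hoplogs is not double counted. A minimal sketch of the merge
// pattern used above, assuming only the ICardinality/HyperLogLog/MurmurHash
// APIs already imported by this class:
//
//   ICardinality a = new HyperLogLog(HLL_CONSTANT);
//   ICardinality b = new HyperLogLog(HLL_CONSTANT);
//   a.offerHashed(MurmurHash.hash("k1".getBytes()));
//   b.offerHashed(MurmurHash.hash("k1".getBytes())); // same key, counted once
//   b.offerHashed(MurmurHash.hash("k2".getBytes()));
//   ICardinality union = a.merge(b); // may throw CardinalityMergeException; estimate ~2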
private void removeSortedOplog(TrackedReference<Hoplog> so, boolean notify) throws IOException {
hoplogReadersController.removeSortedOplog(so);
// release lock before notifying listeners
if (notify && listener != null) {
listener.hoplogDeleted(regionFolder, bucketId, so.get());
}
}
private void notifyCompactionListeners(boolean isMajor) {
listener.compactionCompleted(regionFolder, bucketId, isMajor);
}
/**
* This method is invoked on the secondary node, which needs to update its data
* structures.
* @throws IOException
*/
@Override
public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs) throws IOException {
throw new NotImplementedException();
}
@Override
public synchronized Compactor getCompactor() {
if (compacter == null) {
compacter = new HoplogCompactor();
}
return compacter;
}
@Override
protected Hoplog getHoplog(Path hoplogPath) throws IOException {
Hoplog so = new HFileSortedOplog(store, hoplogPath, store.getBlockCache(), stats, store.getStats());
return so;
}
/**
* locks sorted oplogs collection, removes oplog and renames for deletion later
* @throws IOException
*/
void markSortedOplogForDeletion(List<TrackedReference<Hoplog>> targets, boolean notify) throws IOException {
for (int i = targets.size(); i > 0; i--) {
TrackedReference<Hoplog> so = targets.get(i - 1);
removeSortedOplog(so, true);
if (!store.getFileSystem().exists(new Path(bucketPath, so.get().getFileName()))) {
// the hoplog does not even exist on file system. Skip remaining steps
continue;
}
addExpiryMarkerForAFile(so.get());
}
}
/**
* Deletes expired hoplogs and expiry markers from the file system. Calculates
* a target timestamp based on the cleanup interval, then gets the list of target
* hoplogs. It also updates the disk usage state.
*
* @return number of files deleted
*/
synchronized int initiateCleanup() throws IOException {
int conf = store.getHDFSCompactionConfig().getOldFilesCleanupIntervalMins();
// minutes to milliseconds
long intervalDurationMillis = conf * 60 * 1000;
// Any expired hoplog with timestamp less than targetTS is a delete
// candidate.
long targetTS = System.currentTimeMillis() - intervalDurationMillis;
if (logger.isDebugEnabled()) {
logger.debug("Target timestamp for expired hoplog deletion " + targetTS, logPrefix);
}
// avoid too frequent cleanup invocations. Exit cleanup invocation if the
// previous cleanup was executed within 10% range of cleanup interval
if (previousCleanupTimestamp.get() > targetTS
&& (previousCleanupTimestamp.get() - targetTS) < (intervalDurationMillis / 10)) {
if (logger.isDebugEnabled()) {
logger.debug("Skip cleanup, previous " + previousCleanupTimestamp.get(), logPrefix);
}
return 0;
}
List<FileStatus> targets = getOptimizationTargets(targetTS);
return deleteExpiredFiles(targets);
}
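// Worked example (illustrative, not from the original source): with
// getOldFilesCleanupIntervalMins() returning 30, intervalDurationMillis is
// 30 * 60 * 1000 = 1,800,000 ms and targetTS is now minus 30 minutes, so only
// expiry markers older than 30 minutes become delete candidates.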
protected int deleteExpiredFiles(List<FileStatus> targets) throws IOException {
if (targets == null) {
return 0;
}
for (FileStatus file : targets) {
if (logger.isDebugEnabled()) {
logger.debug("{}Deleting file: " + file.getPath(), logPrefix);
}
store.getFileSystem().delete(file.getPath(), false);
if (isClosed()) {
if (logger.isDebugEnabled())
logger.debug("{}Expiry file cleanup interupted by bucket close", logPrefix);
return 0;
}
incrementDiskUsage(-1 * file.getLen());
}
previousCleanupTimestamp.set(System.currentTimeMillis());
return targets.size();
}
/**
* @param ts
* target timestamp
* @return list of hoplogs whose expiry markers were created before the target
* timestamp, along with the expiry markers themselves.
* @throws IOException
*/
protected List<FileStatus> getOptimizationTargets(long ts) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Identifying optimization targets " + ts, logPrefix);
}
List<FileStatus> deleteTargets = new ArrayList<FileStatus>();
FileStatus[] markers = getExpiryMarkers();
if (markers != null) {
for (FileStatus marker : markers) {
String name = truncateExpiryExtension(marker.getPath().getName());
long timestamp = marker.getModificationTime();
// expired minor compacted files are not being used anywhere. These can
// be removed immediately. All the other expired files should be removed
// when the files have aged
boolean isTarget = false;
if (name.endsWith(MINOR_HOPLOG_EXTENSION)) {
isTarget = true;
} else if (timestamp < ts && name.endsWith(FLUSH_HOPLOG_EXTENSION)) {
isTarget = true;
} else if (timestamp < ts && name.endsWith(MAJOR_HOPLOG_EXTENSION)) {
HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
long majorCInterval = ((long)compactionConf.getMajorCompactionIntervalMins()) * 60 * 1000;
if (timestamp < (System.currentTimeMillis() - majorCInterval)) {
isTarget = true;
}
}
if (!isTarget) {
continue;
}
// if the file is still being read, do not delete or rename it
TrackedReference<Hoplog> used = hoplogReadersController.getInactiveHoplog(name);
if (used != null) {
if (used.inUse() && logger.isDebugEnabled()) {
logger.debug("{}Optimizer: found active expired hoplog:" + name, logPrefix);
} else if (logger.isDebugEnabled()) {
logger.debug("{}Optimizer: found open expired hoplog:" + name, logPrefix);
}
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("{}Delete target identified " + marker.getPath(), logPrefix);
}
deleteTargets.add(marker);
Path hoplogPath = new Path(bucketPath, name);
if (store.getFileSystem().exists(hoplogPath)) {
FileStatus hoplog = store.getFileSystem().getFileStatus(hoplogPath);
deleteTargets.add(hoplog);
}
}
}
return deleteTargets;
}
/**
* Returns a list of hoplogs present in the bucket's directory; expected to be called during
* hoplog set initialization.
*/
List<Hoplog> identifyAndLoadSortedOplogs(boolean countSize) throws IOException {
FileSystem fs = store.getFileSystem();
if (! fs.exists(bucketPath)) {
return new ArrayList<Hoplog>();
}
FileStatus allFiles[] = fs.listStatus(bucketPath);
ArrayList<FileStatus> validFiles = new ArrayList<FileStatus>();
for (FileStatus file : allFiles) {
// All hoplog files contribute to disk usage
Matcher matcher = HOPLOG_NAME_PATTERN.matcher(file.getPath().getName());
if (! matcher.matches()) {
// not a hoplog
continue;
}
// account for the disk used by this file
if (countSize) {
incrementDiskUsage(file.getLen());
}
// All valid hoplog files must match the regex
matcher = SORTED_HOPLOG_PATTERN.matcher(file.getPath().getName());
if (matcher.matches()) {
validFiles.add(file);
}
}
FileStatus[] markers = getExpiryMarkers();
FileStatus[] validHoplogs = filterValidHoplogs(
validFiles.toArray(new FileStatus[validFiles.size()]), markers);
ArrayList<Hoplog> results = new ArrayList<Hoplog>();
if (validHoplogs == null || validHoplogs.length == 0) {
return results;
}
for (int i = 0; i < validHoplogs.length; i++) {
// Skip directories
if (validHoplogs[i].isDirectory()) {
continue;
}
final Path p = validHoplogs[i].getPath();
// skip empty file
if (fs.getFileStatus(p).getLen() <= 0) {
continue;
}
Hoplog hoplog = new HFileSortedOplog(store, p, store.getBlockCache(), stats, store.getStats());
results.add(hoplog);
}
return results;
}
private static int findMaxSequenceNumber(List<Hoplog> hoplogs) throws IOException {
int maxSeq = 0;
for (Hoplog hoplog : hoplogs) {
maxSeq = Math.max(maxSeq, getSequenceNumber(hoplog));
}
return maxSeq;
}
/**
* @return the sequence number associated with a hoplog file
*/
static int getSequenceNumber(Hoplog hoplog) {
Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(hoplog.getFileName());
boolean matched = matcher.find();
assert matched;
return Integer.valueOf(matcher.group(3));
}
protected FileStatus[] getExpiryMarkers() throws IOException {
FileSystem fs = store.getFileSystem();
if (hoplogReadersController.hoplogs == null
|| hoplogReadersController.hoplogs.size() == 0) {
// there are no hoplogs in the system. Maybe the bucket does not exist
// at all.
if (!fs.exists(bucketPath)) {
if (logger.isDebugEnabled())
logger.debug("{}This bucket is unused, skipping expired hoplog check", logPrefix);
return null;
}
}
FileStatus files[] = FSUtils.listStatus(fs, bucketPath, new PathFilter() {
@Override
public boolean accept(Path file) {
// All expired hoplogs end with the expired extension and must match the valid file regex
String fileName = file.getName();
if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
return false;
}
fileName = truncateExpiryExtension(fileName);
Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(fileName);
return matcher.find();
}
});
return files;
}
@Override
public void clear() throws IOException {
//Suspend compaction while we are doing the clear. This
//aborts the currently in progress compaction.
getCompactor().suspend();
// while compaction is suspended, clear method marks hoplogs for deletion
// only. Files will be removed by cleanup thread after active gets and
// iterations are completed
String user = logger.isDebugEnabled() ? "clear" : null;
List<TrackedReference<Hoplog>> oplogs = null;
try {
oplogs = hoplogReadersController.getTrackedSortedOplogList(user);
markSortedOplogForDeletion(oplogs, true);
} finally {
if (oplogs != null) {
hoplogReadersController.releaseHoplogs(oplogs, user);
}
//Resume compaction
getCompactor().resume();
}
}
/**
* Performs the following activities
* <UL>
* <LI>Submits compaction requests as needed
* <LI>Deletes tmp files which the system failed to remove earlier
* </UL>
*/
@Override
public void performMaintenance() throws IOException {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled())
logger.debug("{}Executing bucket maintenance", logPrefix);
submitCompactionRequests();
hoplogReadersController.closeInactiveHoplogs();
initiateCleanup();
cleanupTmpFiles();
if (logger.isDebugEnabled()) {
logger.debug("{}Time spent in bucket maintenance (in ms): "
+ (System.currentTimeMillis() - startTime), logPrefix);
}
}
@Override
public Future<CompactionStatus> forceCompaction(boolean isMajor) {
CompactionRequest request = new CompactionRequest(regionFolder, bucketId,
getCompactor(), isMajor, true/*force*/);
return HDFSCompactionManager.getInstance(store).submitRequest(request);
}
private Future<CompactionStatus> forceCompactionOnVersionUpgrade() {
CompactionRequest request = new CompactionRequest(regionFolder, bucketId, getCompactor(), true, true, true);
return HDFSCompactionManager.getInstance(store).submitRequest(request);
}
@Override
public long getLastMajorCompactionTimestamp() {
long ts = 0;
String user = logger.isDebugEnabled() ? "StoredProc" : null;
List<TrackedReference<Hoplog>> hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
try {
for (TrackedReference<Hoplog> hoplog : hoplogs) {
String fileName = hoplog.get().getFileName();
Matcher file = HOPLOG_NAME_PATTERN.matcher(fileName);
if (file.matches() && fileName.endsWith(MAJOR_HOPLOG_EXTENSION)) {
ts = getHoplogTimestamp(file);
break;
}
}
} finally {
hoplogReadersController.releaseHoplogs(hoplogs, user);
}
if (logger.isDebugEnabled()) {
logger.debug("{}HDFS: for bucket:"+getRegionBucketStr()+" returning last major compaction timestamp "+ts, logPrefix);
}
return ts;
}
private void initOldTmpFiles() throws IOException {
FileSystem fs = store.getFileSystem();
if (! fs.exists(bucketPath)) {
return;
}
oldTmpFiles = new LinkedList<FileStatus>(Arrays.asList(fs.listStatus(bucketPath, new TmpFilePathFilter())));
}
private void cleanupTmpFiles() throws IOException {
if(oldTmpFiles == null && tmpFiles == null) {
return;
}
if (oldTmpFiles != null) {
FileSystem fs = store.getFileSystem();
long now = System.currentTimeMillis();
for (Iterator<FileStatus> itr = oldTmpFiles.iterator(); itr.hasNext();) {
FileStatus file = itr.next();
// delete only tmp files that have passed the expiration window; newer
// files may still be in use by the old primary
if (file.getModificationTime() + TMP_FILE_EXPIRATION_TIME_MS < now) {
if (logger.isDebugEnabled()) {
logger.debug("{}Deleting temporary file:" + file.getPath(), logPrefix);
}
fs.delete(file.getPath(), false);
itr.remove();
}
}
}
if (tmpFiles != null) {
for (Hoplog so : tmpFiles.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("{}Deleting temporary file:" + so.getFileName(), logPrefix);
}
deleteTmpFile(null, so);
}
}
}
/**
* Executes tiered compaction of hoplog files. One compactor instance exists per bucket.
*/
protected class HoplogCompactor implements Compactor {
private volatile boolean suspend = false;
// the following boolean will be used to synchronize minor compaction
private AtomicBoolean isMinorCompactionActive = new AtomicBoolean(false);
// the following boolean will be used to synchronize major compaction
private AtomicBoolean isMajorCompactionActive = new AtomicBoolean(false);
// the following integer tracks the max sequence number amongst the
// target files being major compacted. This value will be used to prevent
// concurrent MajorC and minorC. MinorC is preempted in case of an
// overlap. This object is also used as a lock. The lock is acquired before
// identifying compaction targets and before marking targets for expiry
final AtomicInteger maxMajorCSeqNum = new AtomicInteger(-1);
@Override
public void suspend() {
long wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
this.suspend=true;
//this forces the compact method to finish.
while (isMajorCompactionActive.get() || isMinorCompactionActive.get()) {
if (wait < 0) {
wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
String act = isMajorCompactionActive.get() ? "MajorC" : "MinorC";
logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_SUSPEND_OF_0_FAILED_IN_1, new Object[] {act, wait}));
break;
}
try {
TimeUnit.MILLISECONDS.sleep(50);
wait -= 50;
} catch (InterruptedException e) {
break;
}
}
}
@Override
public void resume() {
this.suspend = false;
}
@Override
public boolean isBusy(boolean isMajor) {
if (isMajor) {
return isMajorCompactionActive.get();
} else {
return isMinorCompactionActive.get();
}
}
/**
* Compacts hoplogs. The method takes a minor or major compaction "lock" to
* prevent concurrent execution of compaction cycles. A possible improvement
* is to allow parallel execution of minor compaction if the sets of
* hoplogs being compacted are disjoint.
*/
@Override
public boolean compact(boolean isMajor, boolean isForced) throws IOException {
if(suspend) {
return false;
}
String extension = null;
IOOperation compactionStats = null;
long startTime = 0;
final AtomicBoolean lock;
Hoplog compactedHoplog = null;
List<TrackedReference<Hoplog>> targets = null;
String user = logger.isDebugEnabled() ? (isMajor ? "MajorC" : "MinorC") : null;
if (isMajor) {
lock = isMajorCompactionActive;
extension = MAJOR_HOPLOG_EXTENSION;
compactionStats = stats.getMajorCompaction();
} else {
lock = isMinorCompactionActive;
extension = MINOR_HOPLOG_EXTENSION;
compactionStats = stats.getMinorCompaction();
}
// final check before beginning compaction. Return if compaction is active
if (! lock.compareAndSet(false, true)) {
if (isMajor) {
if (logger.isDebugEnabled())
logger.debug("{}Major compaction already active. Ignoring new request", logPrefix);
} else {
if (logger.isDebugEnabled())
logger.debug("Minor compaction already active. Ignoring new request", logPrefix);
}
return false;
}
try {
if(suspend) {
return false;
}
// variables for updating stats
startTime = compactionStats.begin();
int seqNum = -1;
int lastKnownMajorCSeqNum;
synchronized (maxMajorCSeqNum) {
lastKnownMajorCSeqNum = maxMajorCSeqNum.get();
targets = hoplogReadersController.getTrackedSortedOplogList(user);
getCompactionTargets(isMajor, targets, lastKnownMajorCSeqNum);
if (targets != null && targets.size() > 0) {
targets = Collections.unmodifiableList(targets);
seqNum = getSequenceNumber(targets.get(0).get());
if (isMajor) {
maxMajorCSeqNum.set(seqNum);
}
}
}
if (targets == null || targets.isEmpty() || (!isMajor && targets.size() == 1 && !isForced)) {
if (logger.isDebugEnabled()){
logger.debug("{}Skipping compaction, too few hoplops to compact. Major?" + isMajor, logPrefix);
}
compactionStats.end(0, startTime);
return true;
}
// If there is only one major compacted file, we don't need to run major compaction
// just to generate a copy of the same content
if (targets.size() == 1 && !isForced) {
String hoplogName = targets.get(0).get().getFileName();
if (hoplogName.endsWith(MAJOR_HOPLOG_EXTENSION)){
if (logger.isDebugEnabled()){
logger.debug("{}Skipping compaction, no need to compact a major compacted file. Major?" + isMajor, logPrefix);
}
compactionStats.end(0, startTime);
return true;
}
}
if (logger.isDebugEnabled()) {
for (TrackedReference<Hoplog> target : targets) {
fineLog("Target:", target, " size:", target.get().getSize());
}
}
// Create a temporary hoplog for compacted hoplog. The compacted hoplog
// will have the seq number same as that of youngest target file. Any
// hoplog younger than target hoplogs will have a higher sequence number
compactedHoplog = getTmpSortedOplog(seqNum, extension);
long byteCount;
try {
byteCount = fillCompactionHoplog(isMajor, targets, compactedHoplog, lastKnownMajorCSeqNum);
compactionStats.end(byteCount, startTime);
} catch (InterruptedException e) {
if (logger.isDebugEnabled())
logger.debug("{}Compaction execution suspended", logPrefix);
compactionStats.error(startTime);
return false;
} catch (ForceReattemptException e) {
if (logger.isDebugEnabled())
logger.debug("{}Compaction execution suspended", logPrefix);
compactionStats.error(startTime);
return false;
}
// creation of compacted hoplog completed, it is time to use it for
// reading. Before using it, make sure minorC and majorC were not
// executing on overlapping sets of files. All targets can be marked for
// expiration. Notify listener if configured. Update bucket size
synchronized (maxMajorCSeqNum) {
if (!isMajor && isMinorMajorOverlap(targets, maxMajorCSeqNum.get())) {
// MajorC is higher priority. In case of any overlap kill minorC
if (logger.isDebugEnabled())
logger.debug("{}Interrupting MinorC for a concurrent MajorC", logPrefix);
compactionStats.error(startTime);
return false;
}
addSortedOplog(compactedHoplog, true, false);
markSortedOplogForDeletion(targets, true);
}
} catch (IOException e) {
compactionStats.error(startTime);
throw e;
} finally {
if (isMajor) {
maxMajorCSeqNum.set(-1);
}
lock.set(false);
hoplogReadersController.releaseHoplogs(targets, user);
}
incrementDiskUsage(compactedHoplog.getSize());
reEstimateBucketSize();
notifyCompactionListeners(isMajor);
return true;
}
/**
* Major compaction compacts all files. The seq number of the youngest file
* being MajorCed is known. If MinorC is operating on any file with a seq
* number less than or equal to this number, there is an overlap.
* @param num max sequence number among the files being major compacted
*/
boolean isMinorMajorOverlap(List<TrackedReference<Hoplog>> targets, int num) {
if (num < 0 || targets == null || targets.isEmpty()) {
return false;
}
for (TrackedReference<Hoplog> hop : targets) {
if (getSequenceNumber(hop.get()) <= num) {
return true;
}
}
return false;
}
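// Worked example (illustrative): if a major compaction cycle covers files
// with sequence numbers 1 through 5, num is 5. A minor cycle whose targets
// have sequence numbers {7, 6, 4} overlaps because 4 <= 5, so this method
// returns true and the minor cycle is abandoned.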
/**
* Iterates over targets and writes eligible targets to the output hoplog.
* Handles creation of the iterators and the writer, and closes them in case of
* errors.
*/
public long fillCompactionHoplog(boolean isMajor,
List<TrackedReference<Hoplog>> targets, Hoplog output, int majorCSeqNum)
throws IOException, InterruptedException, ForceReattemptException {
HoplogWriter writer = null;
ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
HoplogSetIterator mergedIter = null;
int byteCount = 0;
try {
// create a merged iterator over the targets and write entries into
// output hoplog
mergedIter = new HoplogSetIterator(targets);
writer = output.createWriter(mergedIter.getRemainingEntryCount());
boolean interrupted = false;
for (; mergedIter.hasNext(); ) {
if (suspend) {
interrupted = true;
break;
} else if (!isMajor && maxMajorCSeqNum.get() > majorCSeqNum) {
// A new major compaction cycle is starting, quit minorC to avoid
// duplicate work and missing deletes
if (logger.isDebugEnabled())
logger.debug("{}Preempting MinorC, new MajorC cycle detected ", logPrefix);
interrupted = true;
break;
}
mergedIter.nextBB();
ByteBuffer k = mergedIter.getKeyBB();
ByteBuffer v = mergedIter.getValueBB();
boolean isDeletedEntry = isDeletedEntry(v.array(), v.arrayOffset());
if (isMajor && isDeletedEntry) {
// it is major compaction, time to ignore deleted entries
continue;
}
if (!isDeletedEntry) {
int hash = MurmurHash.hash(k.array(), k.arrayOffset(), k.remaining(), -1);
localHLL.offerHashed(hash);
}
writer.append(k, v);
byteCount += (k.remaining() + v.remaining());
}
mergedIter.close();
mergedIter = null;
writer.close(buildMetaData(localHLL));
writer = null;
if (interrupted) {
// If we suspended compaction operations, delete the partially written
// file and return.
output.delete();
throw new InterruptedException();
}
// ping secondaries before making the file a legitimate file to ensure
// that in case of split brain, no other VM has taken over as primary. #50110.
pingSecondaries();
makeLegitimate(output);
return byteCount;
} catch (IOException e) {
e = handleWriteHdfsIOError(writer, output, e);
writer = null;
throw e;
} catch (ForceReattemptException e) {
output.delete();
throw e;
} finally {
if (mergedIter != null) {
mergedIter.close();
}
if (writer != null) {
writer.close();
}
}
}
/**
* Identifies compaction targets. For major compaction all sorted oplogs are
* included. For minor compaction a smaller, policy-driven set of targets is
* selected.
*/
protected void getCompactionTargets(boolean major,
List<TrackedReference<Hoplog>> targets, int majorCSeqNum) {
if (!major) {
getMinorCompactionTargets(targets, majorCSeqNum);
}
}
/**
* Selects the oplogs most suitable for minor compaction. The algorithm picks the
* smallest oplogs that are not bigger than the configured maximum input file size.
* The target list is emptied if no valid candidates are found.
*/
void getMinorCompactionTargets(List<TrackedReference<Hoplog>> targets, int majorCSeqNum)
{
List<TrackedReference<Hoplog>> omittedHoplogs = new ArrayList<TrackedReference<Hoplog>>();
// reverse the order of hoplogs in list. the oldest file becomes the first file.
Collections.reverse(targets);
// hoplog greater than this size will not be minor-compacted
final long MAX_COMPACTION_FILE_SIZE;
// maximum number of files to be included in any compaction cycle
final int MAX_FILE_COUNT_COMPACTION;
// minimum number of files that must be present for compaction to be worth
final int MIN_FILE_COUNT_COMPACTION;
HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
MAX_COMPACTION_FILE_SIZE = ((long)compactionConf.getMaxInputFileSizeMB()) * 1024 *1024;
MAX_FILE_COUNT_COMPACTION = compactionConf.getMaxInputFileCount();
MIN_FILE_COUNT_COMPACTION = compactionConf.getMinInputFileCount();
try {
// skip files until the first one smaller than the max compaction file size is
// found. If MajorC is active, also move to a file that is outside the
// scope of MajorC
for (Iterator<TrackedReference<Hoplog>> iterator = targets.iterator(); iterator.hasNext();) {
TrackedReference<Hoplog> oplog = iterator.next();
if (majorCSeqNum >= getSequenceNumber(oplog.get())) {
iterator.remove();
omittedHoplogs.add(oplog);
if (logger.isDebugEnabled()){
fineLog("Overlap with MajorC, excluding hoplog " + oplog.get());
}
continue;
}
if (oplog.get().getSize() > MAX_COMPACTION_FILE_SIZE || oplog.get().getFileName().endsWith(MAJOR_HOPLOG_EXTENSION)) {
// big file will not be included for minor compaction
// major compacted file will not be converted to minor compacted file
iterator.remove();
omittedHoplogs.add(oplog);
if (logger.isDebugEnabled()) {
fineLog("Excluding big hoplog from minor cycle:",
oplog.get(), " size:", oplog.get().getSize(), " limit:",
MAX_COMPACTION_FILE_SIZE);
}
} else {
// first small hoplog found, stop scanning
break;
}
}
// If there are too few files no need to perform compaction
if (targets.size() < MIN_FILE_COUNT_COMPACTION) {
if (logger.isDebugEnabled()){
logger.debug("{}Too few hoplogs for minor cycle:" + targets.size(), logPrefix);
}
omittedHoplogs.addAll(targets);
targets.clear();
return;
}
float maxGain = Float.MIN_VALUE;
int bestFrom = -1;
int bestTo = -1;
// for listSize=5 list, minFile=3; maxIndex=5-3.
// so from takes values 0,1,2
int maxIndexForFrom = targets.size() - MIN_FILE_COUNT_COMPACTION;
for (int from = 0; from <= maxIndexForFrom ; from++) {
// for listSize=6 list, minFile=3, maxFile=5; minTo=0+3-1, maxTo=0+5-1
// so to takes values 2,3,4
int minIndexForTo = from + MIN_FILE_COUNT_COMPACTION - 1;
int maxIndexForTo = Math.min(from + MAX_FILE_COUNT_COMPACTION, targets.size());
for (int i = minIndexForTo; i < maxIndexForTo; i++) {
Float gain = computeGain(from, i, targets);
if (gain == null) {
continue;
}
if (gain > maxGain) {
maxGain = gain;
bestFrom = from;
bestTo = i;
}
}
}
if (bestFrom == -1) {
if (logger.isDebugEnabled())
logger.debug("{}Failed to find optimal target set for MinorC", logPrefix);
omittedHoplogs.addAll(targets);
targets.clear();
return;
}
if (logger.isDebugEnabled()) {
fineLog("MinorCTarget optimal result from:", bestFrom, " to:", bestTo);
}
// remove hoplogs that do not fall in the bestFrom-bestTo range
int i = 0;
for (Iterator<TrackedReference<Hoplog>> iter = targets.iterator(); iter.hasNext();) {
TrackedReference<Hoplog> hop = iter.next();
if (i < bestFrom || i > bestTo) {
iter.remove();
omittedHoplogs.add(hop);
}
i++;
}
} finally {
// release readers of targets not included in the compaction cycle
String user = logger.isDebugEnabled() ? "MinorC" : null;
hoplogReadersController.releaseHoplogs(omittedHoplogs, user);
}
// restore the order, youngest file is the first file again
Collections.reverse(targets);
}
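// Worked example of the window search above (illustrative): with 6 candidate
// hoplogs left after filtering, MIN_FILE_COUNT_COMPACTION = 3 and
// MAX_FILE_COUNT_COMPACTION = 5, maxIndexForFrom is 3 and the index windows
// evaluated by computeGain are [0..2], [0..3], [0..4], [1..3], [1..4],
// [1..5], [2..4], [2..5] and [3..5]; the window with the highest gain wins.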
@Override
public HDFSStore getHdfsStore() {
return store;
}
}
Float computeGain(int from, int to, List<TrackedReference<Hoplog>> targets) {
double SIZE_64K = 64.0 * 1024;
// TODO the base for log should depend on the average number of keys an index block will contain
double LOG_BASE = Math.log(AVG_NUM_KEYS_PER_INDEX_BLOCK);
long totalSize = 0;
double costBefore = 0f;
for (int i = from; i <= to; i++) {
long size = targets.get(i).get().getSize();
if (size == 0) {
continue;
}
totalSize += size;
// For each hoplog file, read cost is number of index block reads and 1
// data block read. Index blocks on average contain N keys and are
// organized in an N-ary tree structure. Hence the number of index block
// reads will be logBaseN(number of data blocks)
costBefore += Math.ceil(Math.max(1.0, Math.log(size / SIZE_64K) / LOG_BASE)) + 1;
}
// if the first file is relatively too large this set is bad for compaction
long firstFileSize = targets.get(from).get().getSize();
if (firstFileSize > (totalSize - firstFileSize) * RATIO) {
if (logger.isDebugEnabled()){
fineLog("First file too big:", firstFileSize, " totalSize:", totalSize);
}
return null;
}
// compute size in MB so that the value of gain has a few decimals
long totalSizeInMb = totalSize / 1024 / 1024;
if (totalSizeInMb == 0) {
// the files are too small, just return the cost. The more we compact
// the better it is
if (logger.isDebugEnabled()) {
logger.debug("{}Total size too small:" +totalSize, logPrefix);
}
return (float) costBefore;
}
double costAfter = Math.ceil(Math.log(totalSize / SIZE_64K) / LOG_BASE) + 1;
return (float) ((costBefore - costAfter) / totalSizeInMb);
}
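// Worked example (illustrative, assuming the first-file ratio check passes):
// for three 64 MB hoplogs, size / SIZE_64K = 1024 and
// log(1024) / log(200) ~= 1.31, so each file costs ceil(1.31) + 1 = 3 reads
// and costBefore = 9. The compacted output would be ~192 MB, giving
// costAfter = ceil(log(3072) / log(200)) + 1 = 3, and with
// totalSizeInMb = 192 the gain is (9 - 3) / 192 ~= 0.031.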
/**
* Hoplog readers are accessed asynchronously. There could be a window in
* which, while a hoplog is being iterated on, it gets compacted and becomes
* expired or inactive. The reader of the hoplog must not be closed till the
* iterator completes. All such scenarios are managed by this class. It
* keeps all the readers, active and inactive, along with a reference counter
* for each reader. An inactive reader is closed when its reference count goes
* down to 0.
*
* One important point: only the compaction process makes a hoplog inactive.
* Compaction in a bucket is single threaded, so compaction itself does not
* face a race condition. Read and scan operations on the bucket are
* affected, so the reference counter is incremented for each read and scan.
*
* @author ashvina
*/
private class HoplogReadersController implements HoplogReaderActivityListener {
private Integer maxOpenFilesLimit;
// sorted collection of all the active oplog files associated with this bucket. Instead of a
// queue, a set is used. New files created as part of compaction may be inserted after a few
//hoplogs were created. The compacted file is such a case but should not be treated as the newest.
private final ConcurrentSkipListSet<TrackedReference<Hoplog>> hoplogs;
// list of all the hoplogs that have been compacted and need to be closed
// once the reference count reduces to 0
private final ConcurrentHashSet<TrackedReference<Hoplog>> inactiveHoplogs;
// ReadWriteLock on list of oplogs to allow for consistent reads and scans
// while hoplog set changes. A write lock is needed on completion of
// compaction, addition of a new hoplog or on receiving updates message from
// other GF nodes
private final ReadWriteLock hoplogRWLock = new ReentrantReadWriteLock(true);
// tracks the number of active readers for hoplogs of this bucket
private AtomicInteger activeReaderCount = new AtomicInteger(0);
public HoplogReadersController() {
HoplogComparator comp = new HoplogComparator();
hoplogs = new ConcurrentSkipListSet<TrackedReference<Hoplog>>(comp) {
private static final long serialVersionUID = 1L;
@Override
public boolean add(TrackedReference<Hoplog> e) {
// increment number of hoplogs active for this bucket
boolean result = super.add(e);
if (result)
stats.incActiveFiles(1);
return result;
}
@Override
public boolean remove(Object o) {
// decrement the number of hoplogs active for this bucket
boolean result = super.remove(o);
if (result)
stats.incActiveFiles(-1);
return result;
}
};
inactiveHoplogs = new ConcurrentHashSet<TrackedReference<Hoplog>>() {
private static final long serialVersionUID = 1L;
@Override
public boolean add(TrackedReference<Hoplog> e) {
boolean result = super.add(e);
if (result)
stats.incInactiveFiles(1);
return result;
}
@Override
public boolean remove(Object o) {
boolean result = super.remove(o);
if (result)
stats.incInactiveFiles(-1);
return result;
}
};
maxOpenFilesLimit = Integer.getInteger(
HoplogConfig.BUCKET_MAX_OPEN_HFILES_CONF,
HoplogConfig.BUCKET_MAX_OPEN_HFILES_DEFAULT);
}
Hoplog getOldestHoplog() {
if (hoplogs.isEmpty()) {
return null;
}
return hoplogs.last().get();
}
/**
* Locks the sorted oplogs collection and performs the add operation.
* @return true if the addition was successful
*/
private boolean addSortedOplog(Hoplog so) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Try add " + so, logPrefix);
}
hoplogRWLock.writeLock().lock();
try {
int size = hoplogs.size();
boolean result = hoplogs.add(new TrackedReference<Hoplog>(so));
so.setReaderActivityListener(this);
if (logger.isDebugEnabled()){
fineLog("Added: ", so, " Before:", size, " After:", hoplogs.size());
}
return result;
} finally {
hoplogRWLock.writeLock().unlock();
}
}
/**
* Locks the sorted oplogs collection, performs the remove operation, and updates the readers as well.
*/
private void removeSortedOplog(TrackedReference<Hoplog> so) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("Try remove " + so, logPrefix);
}
hoplogRWLock.writeLock().lock();
try {
int size = hoplogs.size();
boolean result = hoplogs.remove(so);
if (result) {
inactiveHoplogs.add(so);
if (logger.isDebugEnabled()) {
fineLog("Removed: ", so, " Before:", size, " After:", hoplogs.size());
}
} else {
if (inactiveHoplogs.contains(so)) {
if (logger.isDebugEnabled()) {
logger.debug("{}Found a missing active hoplog in inactive list." + so, logPrefix);
}
} else {
so.get().close();
logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_MISSING_IN_BUCKET_FORCED_CLOSED, so.get()));
}
}
} finally {
hoplogRWLock.writeLock().unlock();
}
}
private void closeInactiveHoplogs() throws IOException {
hoplogRWLock.writeLock().lock();
try {
for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
if (logger.isDebugEnabled()){
logger.debug("{}Try close inactive " + hoplog, logPrefix);
}
if (!hoplog.inUse()) {
int size = inactiveHoplogs.size();
inactiveHoplogs.remove(hoplog);
closeReaderAndSuppressError(hoplog.get(), true);
if (logger.isDebugEnabled()){
fineLog("Closed inactive: ", hoplog.get(), " Before:", size,
" After:", inactiveHoplogs.size());
}
}
}
} finally {
hoplogRWLock.writeLock().unlock();
}
}
/**
* @param target
* name of the hoplog file
* @return trackedReference if target exists in inactive hoplog list.
* @throws IOException
*/
TrackedReference<Hoplog> getInactiveHoplog(String target) throws IOException {
hoplogRWLock.writeLock().lock();
try {
for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
if (hoplog.get().getFileName().equals(target)) {
if (logger.isDebugEnabled()) {
logger.debug("{}Target found in inactive hoplogs list: " + hoplog, logPrefix);
}
return hoplog;
}
}
if (logger.isDebugEnabled()){
logger.debug("{}Target not found in inactive hoplogs list: " + target, logPrefix);
}
return null;
} finally {
hoplogRWLock.writeLock().unlock();
}
}
/**
* force closes all readers
*/
public void close() throws IOException {
hoplogRWLock.writeLock().lock();
try {
for (TrackedReference<Hoplog> hoplog : hoplogs) {
closeReaderAndSuppressError(hoplog.get(), true);
}
for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
closeReaderAndSuppressError(hoplog.get(), true);
}
} finally {
hoplogs.clear();
inactiveHoplogs.clear();
hoplogRWLock.writeLock().unlock();
}
}
/**
* Locks hoplogs to create a snapshot of active hoplogs. The reference count of each
* reader is incremented to keep it from getting closed.
*
* @return ordered list of sorted oplogs
*/
private List<TrackedReference<Hoplog>> getTrackedSortedOplogList(String user) {
List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
hoplogRWLock.readLock().lock();
try {
for (TrackedReference<Hoplog> oplog : hoplogs) {
oplog.increment(user);
oplogs.add(oplog);
if (logger.isDebugEnabled()) {
logger.debug("{}Track ref " + oplog, logPrefix);
}
}
} finally {
hoplogRWLock.readLock().unlock();
}
return oplogs;
}
private TrackedReference<Hoplog> trackHoplog(Hoplog hoplog, String user) {
hoplogRWLock.readLock().lock();
try {
for (TrackedReference<Hoplog> oplog : hoplogs) {
if (oplog.get().getFileName().equals(hoplog.getFileName())) {
oplog.increment(user);
if (logger.isDebugEnabled()) {
logger.debug("{}Track " + oplog, logPrefix);
}
return oplog;
}
}
} finally {
hoplogRWLock.readLock().unlock();
}
throw new NoSuchElementException(hoplog.getFileName());
}
public void releaseHoplogs(List<TrackedReference<Hoplog>> targets, String user) {
if (targets == null) {
return;
}
for (int i = targets.size() - 1; i >= 0; i--) {
TrackedReference<Hoplog> hoplog = targets.get(i);
releaseHoplog(hoplog, user);
}
}
public void releaseHoplog(TrackedReference<Hoplog> target, String user) {
if (target == null) {
return;
}
target.decrement(user);
if (logger.isDebugEnabled()) {
logger.debug("{}Try release " + target, logPrefix);
}
if (target.inUse()) {
return;
}
// there are no users of this hoplog. if it is inactive close it.
hoplogRWLock.writeLock().lock();
try {
if (!target.inUse()) {
if (inactiveHoplogs.contains(target) ) {
int sizeBefore = inactiveHoplogs.size();
inactiveHoplogs.remove(target);
closeReaderAndSuppressError(target.get(), true);
if (logger.isDebugEnabled()) {
fineLog("Closed inactive: ", target, " totalBefore:", sizeBefore,
" totalAfter:", inactiveHoplogs.size());
}
} else if (hoplogs.contains(target)) {
closeExcessReaders();
}
}
} catch (IOException e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR,
"Close reader: " + target.get().getFileName()), e);
} finally {
hoplogRWLock.writeLock().unlock();
}
}
/*
* detects if the total number of open HDFS readers exceeds the configured
* max file limit. If the limit is exceeded, some readers need to be
* closed to avoid a datanode receiver overflow error.
*/
private void closeExcessReaders() throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Close excess readers. Size:" + hoplogs.size()
+ " activeReaders:" + activeReaderCount.get() + " limit:"
+ maxOpenFilesLimit, logPrefix);
}
if (hoplogs.size() <= maxOpenFilesLimit) {
return;
}
if (activeReaderCount.get() <= maxOpenFilesLimit) {
return;
}
for (TrackedReference<Hoplog> hoplog : hoplogs.descendingSet()) {
if (!hoplog.inUse() && !hoplog.get().isClosed()) {
hoplog.get().close(false);
if (logger.isDebugEnabled()) {
logger.debug("{}Excess reader closed " + hoplog, logPrefix);
}
}
if (activeReaderCount.get() <= maxOpenFilesLimit) {
return;
}
}
}
@Override
public void readerCreated() {
activeReaderCount.incrementAndGet();
stats.incActiveReaders(1);
if (logger.isDebugEnabled())
logger.debug("{}ActiveReader++", logPrefix);
}
@Override
public void readerClosed() {
activeReaderCount.decrementAndGet();
stats.incActiveReaders(-1);
if (logger.isDebugEnabled())
logger.debug("{}ActiveReader--", logPrefix);
}
}
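// Illustrative reader-tracking pattern (editorial sketch, mirroring what
// read(), scan() and clear() above already do): callers that need a stable
// snapshot of readers follow the increment/release protocol, e.g.
//
//   String user = logger.isDebugEnabled() ? "Scan" : null;
//   List<TrackedReference<Hoplog>> snap =
//       hoplogReadersController.getTrackedSortedOplogList(user);
//   try {
//     // iterate over snap; compaction cannot close these readers meanwhile
//   } finally {
//     hoplogReadersController.releaseHoplogs(snap, user);
//   }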
/**
* returns an ordered list of oplogs, FOR TESTING ONLY
*/
public List<TrackedReference<Hoplog>> getSortedOplogs() throws IOException {
List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
for (TrackedReference<Hoplog> oplog : hoplogReadersController.hoplogs) {
oplogs.add(oplog);
}
return oplogs;
}
/**
* Merged iterator on a list of hoplogs.
*/
public class BucketIterator implements HoplogIterator<byte[], SortedHoplogPersistedEvent> {
// list of hoplogs to be iterated on.
final List<TrackedReference<Hoplog>> hoplogList;
HoplogSetIterator mergedIter;
public BucketIterator(List<TrackedReference<Hoplog>> hoplogs) throws IOException {
this.hoplogList = hoplogs;
try {
mergedIter = new HoplogSetIterator(this.hoplogList);
if (logger.isDebugEnabled()) {
for (TrackedReference<Hoplog> hoplog : hoplogs) {
logger.debug("{}BucketIter target hop:" + hoplog.get().getFileName(), logPrefix);
}
}
} catch (IllegalArgumentException e) {
if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
throw handleIOError((IOException) e.getCause());
} else {
throw e;
}
} catch (IOException e) {
throw handleIOError(e);
} catch (HDFSIOException e) {
throw handleIOError(e);
}
}
@Override
public boolean hasNext() {
return mergedIter.hasNext();
}
@Override
public byte[] next() throws IOException {
try {
return HFileSortedOplog.byteBufferToArray(mergedIter.next());
} catch (IllegalArgumentException e) {
if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
throw handleIOError((IOException) e.getCause());
} else {
throw e;
}
} catch (IOException e) {
throw handleIOError(e);
}
}
@Override
public byte[] getKey() {
// merged iterator returns a byte[]. This needs to be deserialized to the object which was
// provided during flush operation
return HFileSortedOplog.byteBufferToArray(mergedIter.getKey());
}
@Override
public SortedHoplogPersistedEvent getValue() {
// merged iterator returns a byte[]. This needs to be deserialized to the
// object which was provided during flush operation
try {
return deserializeValue(HFileSortedOplog.byteBufferToArray(mergedIter.getValue()));
} catch (IOException e) {
throw new HDFSIOException("Failed to deserialize byte while iterating on partition", e);
}
}
@Override
public void remove() {
mergedIter.remove();
}
@Override
public void close() {
// TODO release the closed iterators early
String user = logger.isDebugEnabled() ? "Scan" : null;
hoplogReadersController.releaseHoplogs(hoplogList, user);
}
}
/**
* This utility class is used to filter temporary hoplogs in a bucket
* directory
*
* @author ashvina
*/
private static class TmpFilePathFilter implements PathFilter {
@Override
public boolean accept(Path path) {
Matcher matcher = HOPLOG_NAME_PATTERN.matcher(path.getName());
if (matcher.matches() && path.getName().endsWith(TEMP_HOPLOG_EXTENSION)) {
return true;
}
return false;
}
}
private void fineLog(Object... strings) {
if (logger.isDebugEnabled()) {
StringBuffer sb = concatString(strings);
logger.debug(logPrefix + sb.toString());
}
}
private StringBuffer concatString(Object... strings) {
StringBuffer sb = new StringBuffer();
for (Object str : strings) {
sb.append(str.toString());
}
return sb;
}
@Override
public void compactionCompleted(String region, int bucket, boolean isMajor) {
// do nothing for compaction events. Hoplog Organizer depends on addition
// and deletion of hoplogs only
}
}