blob: 22e88326911207fadaf10bb2375ffbaec8261819 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog.HoplogDescriptor;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import org.apache.logging.log4j.Logger;
public abstract class AbstractHoplogOrganizer<T extends PersistedEventImpl> implements HoplogOrganizer<T> {
public static final String MINOR_HOPLOG_EXTENSION = ".ihop";
public static final String MAJOR_HOPLOG_EXTENSION = ".chop";
public static final String EXPIRED_HOPLOG_EXTENSION = ".exp";
public static final String TEMP_HOPLOG_EXTENSION = ".tmp";
public static final String FLUSH_HOPLOG_EXTENSION = ".hop";
public static final String SEQ_HOPLOG_EXTENSION = ".shop";
// all valid hoplogs will follow the following name pattern
public static final String HOPLOG_NAME_REGEX = "(.+?)-(\\d+?)-(\\d+?)";
public static final Pattern HOPLOG_NAME_PATTERN = Pattern.compile(HOPLOG_NAME_REGEX
+ "\\.(.*)");
public static boolean JUNIT_TEST_RUN = false;
protected static final boolean ENABLE_INTEGRITY_CHECKS = Boolean
.getBoolean("gemfire.HdfsSortedOplogOrganizer.ENABLE_INTEGRITY_CHECKS")
|| assertionsEnabled();
private static boolean assertionsEnabled() {
boolean enabled = false;
assert enabled = true;
return enabled;
}
protected HdfsRegionManager regionManager;
// name or id of bucket managed by this organizer
protected final String regionFolder;
protected final int bucketId;
// path of the region directory
protected final Path basePath;
// identifies path of directory containing a bucket's oplog files
protected final Path bucketPath;
protected final HDFSStoreImpl store;
// assigns a unique increasing number to each oplog file
protected AtomicInteger sequence;
//logger instance
protected static final Logger logger = LogService.getLogger();
protected final String logPrefix;
protected SortedOplogStatistics stats;
AtomicLong bucketDiskUsage = new AtomicLong(0);
// creation of new files and expiration of files will be synchronously
// notified to the listener
protected HoplogListener listener;
private volatile boolean closed = false;
protected Object changePrimarylockObject = new Object();
public AbstractHoplogOrganizer(HdfsRegionManager region, int bucketId) {
assert region != null;
this.regionManager = region;
this.regionFolder = region.getRegionFolder();
this.store = region.getStore();
this.listener = region.getListener();
this.stats = region.getHdfsStats();
this.bucketId = bucketId;
this.basePath = new Path(store.getHomeDir());
this.bucketPath = new Path(basePath, regionFolder + "/" + bucketId);
this.logPrefix = "<" + getRegionBucketStr() + "> ";
}
@Override
public boolean isClosed() {
return closed || regionManager.isClosed();
}
@Override
public void close() throws IOException {
closed = true;
// this bucket is closed and may be owned by a new node. So reduce the store
// usage stat, as the new owner adds the usage metric
incrementDiskUsage((-1) * bucketDiskUsage.get());
}
@Override
public abstract void flush(Iterator<? extends QueuedPersistentEvent> bufferIter,
int count) throws IOException, ForceReattemptException;
@Override
public abstract void clear() throws IOException;
protected abstract Hoplog getHoplog(Path hoplogPath) throws IOException;
@Override
public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
@Override
public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs)
throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
@Override
public void compactionCompleted(String region, int bucket, boolean isMajor) {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
@Override
public T read(byte[] key) throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
@Override
public HoplogIterator<byte[], T> scan() throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
@Override
public HoplogIterator<byte[], T> scan(byte[] from, byte[] to)
throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
@Override
public HoplogIterator<byte[], T> scan(byte[] from,
boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
@Override
public long sizeEstimate() {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
}
/**
* @return returns an oplogs full path after prefixing bucket path to the file
* name
*/
protected String getPathStr(Hoplog oplog) {
return bucketPath.toString() + "/" + oplog.getFileName();
}
protected String getRegionBucketStr() {
return regionFolder + "/" + bucketId;
}
protected SortedHoplogPersistedEvent deserializeValue(byte[] val) throws IOException {
try {
return SortedHoplogPersistedEvent.fromBytes(val);
} catch (ClassNotFoundException e) {
logger
.error(
LocalizedStrings.GetMessage_UNABLE_TO_DESERIALIZE_VALUE_CLASSNOTFOUNDEXCEPTION,
e);
return null;
}
}
/**
* @return true if the entry belongs to an destroy event
*/
protected boolean isDeletedEntry(byte[] value, int offset) throws IOException {
// Read only the first byte of PersistedEventImpl for the operation
assert value != null && value.length > 0 && offset >= 0 && offset < value.length;
Operation op = Operation.fromOrdinal(value[offset]);
if (op.isDestroy() || op.isInvalidate()) {
return true;
}
return false;
}
/**
* @param seqNum
* desired sequence number of the hoplog. If null a highest number is
* choosen
* @param extension
* file extension representing the type of file, e.g. ihop for
* intermediate hoplog
* @return a new temporary file for a new sorted oplog. The name consists of
* bucket name, a sequence number for ordering the files followed by a
* timestamp
*/
Hoplog getTmpSortedOplog(Integer seqNum, String extension) throws IOException {
if (seqNum == null) {
seqNum = sequence.incrementAndGet();
}
String name = bucketId + "-" + System.currentTimeMillis() + "-" + seqNum
+ extension;
Path soplogPath = new Path(bucketPath, name + TEMP_HOPLOG_EXTENSION);
return getHoplog(soplogPath);
}
/**
* renames a temporary hoplog file to a legitimate name.
*/
static void makeLegitimate(Hoplog so) throws IOException {
String name = so.getFileName();
assert name.endsWith(TEMP_HOPLOG_EXTENSION);
int index = name.lastIndexOf(TEMP_HOPLOG_EXTENSION);
name = name.substring(0, index);
so.rename(name);
}
/**
* creates a expiry marker for a file on file system
*
* @param hoplog
* @throws IOException
*/
protected void addExpiryMarkerForAFile(Hoplog hoplog) throws IOException {
FileSystem fs = store.getFileSystem();
// TODO optimization needed here. instead of creating expired marker
// file per file, create a meta file. the main thing to worry is
// compaction of meta file itself
Path expiryMarker = getExpiryMarkerPath(hoplog.getFileName());
// uh-oh, why are we trying to expire an already expired file?
if (ENABLE_INTEGRITY_CHECKS) {
Assert.assertTrue(!fs.exists(expiryMarker),
"Expiry marker already exists: " + expiryMarker);
}
FSDataOutputStream expiryMarkerFile = fs.create(expiryMarker);
expiryMarkerFile.close();
if (logger.isDebugEnabled())
logger.debug("Hoplog marked expired: " + getPathStr(hoplog));
}
protected Path getExpiryMarkerPath(String name) {
return new Path(bucketPath, name + EXPIRED_HOPLOG_EXTENSION);
}
protected String truncateExpiryExtension(String name) {
if (name.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
return name.substring(0, name.length() - EXPIRED_HOPLOG_EXTENSION.length());
}
return name;
}
/**
* updates region stats and a local copy of bucket level store usage metric.
*
* @param delta
*/
protected void incrementDiskUsage(long delta) {
long newSize = bucketDiskUsage.addAndGet(delta);
if (newSize < 0 && delta < 0) {
if (logger.isDebugEnabled()){
logger.debug("{}Invalid diskUsage size:" + newSize + " caused by delta:"
+ delta + ", parallel del & close?" + isClosed(), logPrefix);
}
if (isClosed()) {
// avoid corrupting disk usage size during close by reducing residue
// size only
delta = delta + (-1 * newSize);
}
}
stats.incStoreUsageBytes(delta);
}
/**
* Utility method to remove a file from valid file list if a expired marker
* for the file exists
*
* @param valid
* list of valid files
* @param expired
* list of expired file markers
* @return list f valid files that do not have a expired file marker
*/
public static FileStatus[] filterValidHoplogs(FileStatus[] valid,
FileStatus[] expired) {
if (valid == null) {
return null;
}
if (expired == null) {
return valid;
}
ArrayList<FileStatus> result = new ArrayList<FileStatus>();
for (FileStatus vs : valid) {
boolean found = false;
for (FileStatus ex : expired) {
if (ex
.getPath()
.getName()
.equals(
vs.getPath().getName()
+ HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
found = true;
}
}
if (!found) {
result.add(vs);
}
}
return result.toArray(new FileStatus[result.size()]);
}
protected void pingSecondaries() throws ForceReattemptException {
if (JUNIT_TEST_RUN)
return;
BucketRegion br = ((PartitionedRegion)this.regionManager.getRegion()).getDataStore().getLocalBucketById(this.bucketId);
boolean secondariesPingable = false;
try {
secondariesPingable = br.areSecondariesPingable();
} catch (Throwable e) {
throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()), e);
}
if (!secondariesPingable)
throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()));
}
/**
* A comparator for ordering soplogs based on the file name. The file names
* are assigned incrementally and hint at the age of the file
*/
public static final class HoplogComparator implements
Comparator<TrackedReference<Hoplog>> {
/**
* a file with a higher sequence or timestamp is the younger and hence the
* smaller
*/
@Override
public int compare(TrackedReference<Hoplog> o1, TrackedReference<Hoplog> o2) {
return o1.get().compareTo(o2.get());
}
/**
* Compares age of files based on file names and returns 1 if name1 is
* older, -1 if name1 is yonger and 0 if the two files are same age
*/
public static int compareByName(String name1, String name2) {
HoplogDescriptor hd1 = new HoplogDescriptor(name1);
HoplogDescriptor hd2 = new HoplogDescriptor(name2);
return hd1.compareTo(hd2);
}
}
/**
* @param matcher
* A preinitialized / matched regex pattern
* @return Timestamp of the
*/
public static long getHoplogTimestamp(Matcher matcher) {
return Long.valueOf(matcher.group(2));
}
}