blob: 22e88326911207fadaf10bb2375ffbaec8261819 [file] [log] [blame]
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog.HoplogDescriptor;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import org.apache.logging.log4j.Logger;
public abstract class AbstractHoplogOrganizer<T extends PersistedEventImpl> implements HoplogOrganizer<T> {
// File-name extensions used for the different kinds of hoplog files.
// "ihop": intermediate hoplog (see the extension description on getTmpSortedOplog)
public static final String MINOR_HOPLOG_EXTENSION = ".ihop";
// NOTE(review): presumably the output of a major compaction - confirm against the compactor
public static final String MAJOR_HOPLOG_EXTENSION = ".chop";
// suffix appended to a hoplog's file name to form the name of its expiry marker
// (see getExpiryMarkerPath / truncateExpiryExtension)
public static final String EXPIRED_HOPLOG_EXTENSION = ".exp";
// suffix carried by temporary hoplogs created by getTmpSortedOplog; makeLegitimate
// expects file names to end with it
public static final String TEMP_HOPLOG_EXTENSION = ".tmp";
// NOTE(review): presumably hoplogs written by flush - not referenced in this class, confirm
public static final String FLUSH_HOPLOG_EXTENSION = ".hop";
// NOTE(review): presumably sequential (unsorted) hoplogs - not referenced in this class, confirm
public static final String SEQ_HOPLOG_EXTENSION = ".shop";
// all valid hoplogs will follow the following name pattern
// three '-' separated groups: a lazy free-form prefix followed by two digit groups.
// getTmpSortedOplog builds names as <bucketId>-<timestamp>-<sequence><extension>.
public static final String HOPLOG_NAME_REGEX = "(.+?)-(\\d+?)-(\\d+?)";
// the name regex followed by a literal '.' and a fourth group capturing the rest
// of the file name (the extension)
public static final Pattern HOPLOG_NAME_PATTERN = Pattern.compile(HOPLOG_NAME_REGEX
+ "\\.(.*)");
// mutable, publicly writable flag; it is not read anywhere in this class.
// NOTE(review): presumably toggled by unit tests - confirm where it is consumed
public static boolean JUNIT_TEST_RUN = false;
// NOTE(review): this initializer is incomplete in this copy of the file - the left
// operand of '||' is a bare 'Boolean'. A call such as Boolean.getBoolean(<property>)
// was presumably lost; restore it from version control. The right operand enables
// the checks whenever Java assertions are enabled.
protected static final boolean ENABLE_INTEGRITY_CHECKS = Boolean
|| assertionsEnabled();
private static boolean assertionsEnabled() {
boolean enabled = false;
assert enabled = true;
return enabled;
// manager of the region this organizer belongs to; the constructor reads the region
// folder, store, listener and statistics from it, and isClosed() consults it
protected HdfsRegionManager regionManager;
// name or id of bucket managed by this organizer
protected final String regionFolder;
protected final int bucketId;
// path of the region directory
// (built from the store's home directory in the constructor)
protected final Path basePath;
// identifies path of directory containing a bucket's oplog files
// (basePath/<regionFolder>/<bucketId>)
protected final Path bucketPath;
// store obtained from the region manager; supplies the home directory and the
// file system handle used by this class
protected final HDFSStoreImpl store;
// assigns a unique increasing number to each oplog file
// NOTE(review): not initialized by this class's constructor although
// getTmpSortedOplog dereferences it - presumably assigned by subclasses, confirm
protected AtomicInteger sequence;
//logger instance
protected static final Logger logger = LogService.getLogger();
// "<regionFolder/bucketId> " - prefix for this organizer's log messages
protected final String logPrefix;
// statistics object obtained from the region manager
protected SortedOplogStatistics stats;
// bucket-local running total of disk usage; adjusted by incrementDiskUsage and
// subtracted again in close()
AtomicLong bucketDiskUsage = new AtomicLong(0);
// creation of new files and expiration of files will be synchronously
// notified to the listener
protected HoplogListener listener;
// set to true by close(); volatile so isClosed() observes it from other threads
private volatile boolean closed = false;
// NOTE(review): not used in the visible part of this class; presumably a monitor
// guarding primary-ownership changes in subclasses - confirm
protected Object changePrimarylockObject = new Object();
/**
 * Creates an organizer for one bucket of the region handled by {@code region}.
 *
 * <p>The region folder, store, listener and statistics are taken from the region
 * manager. The bucket directory is {@code <store home dir>/<region folder>/<bucketId>}.
 * The {@code sequence} counter is not initialized here.
 *
 * @param region manager of the owning region; must not be {@code null}
 * @param bucketId id of the bucket managed by this organizer
 */
public AbstractHoplogOrganizer(HdfsRegionManager region, int bucketId) {
  assert region != null;

  this.regionManager = region;
  this.regionFolder = region.getRegionFolder();
  // 'store' is final and is read a few lines below, so it has to be assigned here
  this.store = region.getStore();
  this.listener = region.getListener();
  this.stats = region.getHdfsStats();

  this.bucketId = bucketId;

  this.basePath = new Path(store.getHomeDir());
  this.bucketPath = new Path(basePath, regionFolder + "/" + bucketId);

  this.logPrefix = "<" + getRegionBucketStr() + "> ";
}
public boolean isClosed() {
return closed || regionManager.isClosed();
public void close() throws IOException {
closed = true;
// this bucket is closed and may be owned by a new node. So reduce the store
// usage stat, as the new owner adds the usage metric
incrementDiskUsage((-1) * bucketDiskUsage.get());
/**
 * Implemented by concrete organizers.
 * NOTE(review): presumably persists the buffered events into a new hoplog; the
 * exact meaning of {@code count} (likely the number of events supplied by the
 * iterator) is not visible here - confirm against the implementations.
 *
 * @param bufferIter iterator over the queued events to be flushed
 * @param count see note above
 * @throws IOException declared by the contract
 * @throws ForceReattemptException declared by the contract
 */
public abstract void flush(Iterator<? extends QueuedPersistentEvent> bufferIter,
int count) throws IOException, ForceReattemptException;
/**
 * Implemented by concrete organizers.
 * NOTE(review): presumably removes the data of this bucket - confirm against the
 * implementations.
 *
 * @throws IOException declared by the contract
 */
public abstract void clear() throws IOException;
/**
 * Supplies the {@code Hoplog} object for the file at the given path. Within this
 * class it is called by getTmpSortedOplog with the path of a new temporary file.
 *
 * @param hoplogPath path of the hoplog file
 * @return hoplog for {@code hoplogPath}
 * @throws IOException declared by the contract
 */
protected abstract Hoplog getHoplog(Path hoplogPath) throws IOException;
public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs)
throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
public void compactionCompleted(String region, int bucket, boolean isMajor) {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
public T read(byte[] key) throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
public HoplogIterator<byte[], T> scan() throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
public HoplogIterator<byte[], T> scan(byte[] from, byte[] to)
throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
public HoplogIterator<byte[], T> scan(byte[] from,
boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
public long sizeEstimate() {
throw new UnsupportedOperationException("Not supported for "
+ this.getClass().getSimpleName());
* @return returns an oplogs full path after prefixing bucket path to the file
* name
protected String getPathStr(Hoplog oplog) {
return bucketPath.toString() + "/" + oplog.getFileName();
protected String getRegionBucketStr() {
return regionFolder + "/" + bucketId;
protected SortedHoplogPersistedEvent deserializeValue(byte[] val) throws IOException {
try {
return SortedHoplogPersistedEvent.fromBytes(val);
} catch (ClassNotFoundException e) {
return null;
* @return true if the entry belongs to an destroy event
protected boolean isDeletedEntry(byte[] value, int offset) throws IOException {
// Read only the first byte of PersistedEventImpl for the operation
assert value != null && value.length > 0 && offset >= 0 && offset < value.length;
Operation op = Operation.fromOrdinal(value[offset]);
if (op.isDestroy() || op.isInvalidate()) {
return true;
return false;
* @param seqNum
* desired sequence number of the hoplog. If null a highest number is
* choosen
* @param extension
* file extension representing the type of file, e.g. ihop for
* intermediate hoplog
* @return a new temporary file for a new sorted oplog. The name consists of
* bucket name, a sequence number for ordering the files followed by a
* timestamp
Hoplog getTmpSortedOplog(Integer seqNum, String extension) throws IOException {
if (seqNum == null) {
seqNum = sequence.incrementAndGet();
String name = bucketId + "-" + System.currentTimeMillis() + "-" + seqNum
+ extension;
Path soplogPath = new Path(bucketPath, name + TEMP_HOPLOG_EXTENSION);
return getHoplog(soplogPath);
/**
 * Renames a temporary hoplog file to a legitimate name.
 */
// Derives the final file name of a temporary hoplog by dropping the trailing
// TEMP_HOPLOG_EXTENSION from its current file name.
static void makeLegitimate(Hoplog so) throws IOException {
String name = so.getFileName();
// only temporary hoplogs may be passed in
assert name.endsWith(TEMP_HOPLOG_EXTENSION);
// cut the name at the last occurrence of the temporary extension
int index = name.lastIndexOf(TEMP_HOPLOG_EXTENSION);
name = name.substring(0, index);
// NOTE(review): the computed name is not used by any statement visible here; the
// statement that performs the actual rename appears to be missing from this copy
// of the file - restore it from version control.
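/**
 * Creates an expiry marker for a file on the file system.
 *
 * @param hoplog the hoplog whose file is to be marked as expired
 * @throws IOException if the marker file cannot be created
 */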
// Marks the given hoplog as expired by creating a marker file in the bucket
// directory; the marker's name is the hoplog's file name plus EXPIRED_HOPLOG_EXTENSION.
protected void addExpiryMarkerForAFile(Hoplog hoplog) throws IOException {
FileSystem fs = store.getFileSystem();
// TODO optimization needed here. instead of creating expired marker
// file per file, create a meta file. the main thing to worry is
// compaction of meta file itself
Path expiryMarker = getExpiryMarkerPath(hoplog.getFileName());
// uh-oh, why are we trying to expire an already expired file?
// NOTE(review): the next line is only the tail of a statement; its beginning
// (presumably a check that the marker does not exist yet) is missing from this
// copy of the file - restore it from version control.
"Expiry marker already exists: " + expiryMarker);
// creating the (empty) marker file is what flags the hoplog as expired
FSDataOutputStream expiryMarkerFile = fs.create(expiryMarker);
// NOTE(review): no close() of expiryMarkerFile is visible here - confirm that the
// stream is closed in the complete source.
if (logger.isDebugEnabled())
logger.debug("Hoplog marked expired: " + getPathStr(hoplog));
protected Path getExpiryMarkerPath(String name) {
return new Path(bucketPath, name + EXPIRED_HOPLOG_EXTENSION);
protected String truncateExpiryExtension(String name) {
if (name.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
return name.substring(0, name.length() - EXPIRED_HOPLOG_EXTENSION.length());
return name;
/**
 * Updates region stats and a local copy of the bucket-level store usage metric.
 *
 * @param delta change in disk usage to apply; may be negative
 */
// Applies delta to the bucket-local usage counter and corrects the delta when a
// decrement on a closed organizer would drive the counter below zero.
protected void incrementDiskUsage(long delta) {
// the bucket-local counter is updated first, atomically
long newSize = bucketDiskUsage.addAndGet(delta);
// a negative total after a decrement means more usage was removed than recorded
if (newSize < 0 && delta < 0) {
if (logger.isDebugEnabled()){
logger.debug("{}Invalid diskUsage size:" + newSize + " caused by delta:"
+ delta + ", parallel del & close?" + isClosed(), logPrefix);
if (isClosed()) {
// avoid corrupting disk usage size during close by reducing residue
// size only
// newSize is negative here, so this moves the (negative) delta towards zero by
// the amount the counter overshot
delta = delta + (-1 * newSize);
// NOTE(review): the adjusted delta is not consumed by any statement visible here;
// the statement that applies it (presumably the region stats update mentioned in
// the method description) appears to be missing from this copy of the file.
/**
 * Utility method to remove a file from the valid file list if an expired marker
 * for the file exists.
 *
 * @param valid
 *          list of valid files
 * @param expired
 *          list of expired file markers
 * @return list of valid files that do not have an expired file marker
 */
// Filters the valid files against the expiry markers; both inputs may be null.
public static FileStatus[] filterValidHoplogs(FileStatus[] valid,
FileStatus[] expired) {
// nothing to filter: null in, null out
if (valid == null) {
return null;
// no expiry markers: the input array itself is returned, not a copy
if (expired == null) {
return valid;
ArrayList<FileStatus> result = new ArrayList<FileStatus>();
// every valid file is checked against every marker (nested scan)
for (FileStatus vs : valid) {
boolean found = false;
for (FileStatus ex : expired) {
// NOTE(review): the condition below is truncated in this copy of the file.
// Presumably it compares the marker's file name with the valid file's name plus
// the expired extension - restore it from version control.
if (ex
+ HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
found = true;
if (!found) {
// NOTE(review): the body of this branch is not visible; presumably the file is
// added to 'result' here - confirm.
return result.toArray(new FileStatus[result.size()]);
protected void pingSecondaries() throws ForceReattemptException {
BucketRegion br = ((PartitionedRegion)this.regionManager.getRegion()).getDataStore().getLocalBucketById(this.bucketId);
boolean secondariesPingable = false;
try {
secondariesPingable = br.areSecondariesPingable();
} catch (Throwable e) {
throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()), e);
if (!secondariesPingable)
throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()));
/**
 * A comparator for ordering soplogs based on the file name. The file names
 * are assigned incrementally and hint at the age of the file.
 */
public static final class HoplogComparator implements
Comparator<TrackedReference<Hoplog>> {
* a file with a higher sequence or timestamp is the younger and hence the
* smaller
public int compare(TrackedReference<Hoplog> o1, TrackedReference<Hoplog> o2) {
return o1.get().compareTo(o2.get());
* Compares age of files based on file names and returns 1 if name1 is
* older, -1 if name1 is yonger and 0 if the two files are same age
public static int compareByName(String name1, String name2) {
HoplogDescriptor hd1 = new HoplogDescriptor(name1);
HoplogDescriptor hd2 = new HoplogDescriptor(name2);
return hd1.compareTo(hd2);
* @param matcher
* A preinitialized / matched regex pattern
* @return Timestamp of the
public static long getHoplogTimestamp(Matcher matcher) {
return Long.valueOf(;