blob: 007e73bc5541895c52511d3bd926714d218bc11d [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.hdfs.HDFSStore;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory.HDFSCompactionConfigFactory;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HdfsSortedOplogOrganizer.HoplogCompactor;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
import com.gemstone.gemfire.internal.util.BlobHelper;
import com.gemstone.gemfire.test.junit.categories.HoplogTest;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest
;
@Category({IntegrationTest.class, HoplogTest.class})
public class TieredCompactionJUnitTest extends BaseHoplogTestCase {
static long ONE_MB = 1024 * 1024;
static long TEN_MB = 10 * ONE_MB;
@Override
protected void configureHdfsStoreFactory() throws Exception {
super.configureHdfsStoreFactory();
HDFSCompactionConfigFactory factory = hsf.createCompactionConfigFactory(null);
factory.setMinInputFileCount(3);
hsf.setMinorCompaction(false);
factory.setAutoMajorCompaction(false);
hsf.setHDFSCompactionConfig(factory.create());
}
public void testMinorCompaction() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
// #1
ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
items.add(new TestEvent("1", "1"));
items.add(new TestEvent("2", "1"));
items.add(new TestEvent("3", "1"));
items.add(new TestEvent("4", "1"));
organizer.flush(items.iterator(), items.size());
// #2
items.clear();
items.add(new TestEvent("2", "1"));
items.add(new TestEvent("4", "1"));
items.add(new TestEvent("6", "1"));
items.add(new TestEvent("8", "1"));
organizer.flush(items.iterator(), items.size());
// #3
items.clear();
items.add(new TestEvent("1", "1"));
items.add(new TestEvent("3", "1"));
items.add(new TestEvent("5", "1"));
items.add(new TestEvent("7", "1"));
items.add(new TestEvent("9", "1"));
organizer.flush(items.iterator(), items.size());
// #4
items.clear();
items.add(new TestEvent("0", "1"));
items.add(new TestEvent("1", "1"));
items.add(new TestEvent("4", "1"));
items.add(new TestEvent("5", "1"));
organizer.flush(items.iterator(), items.size());
// check file existence in bucket directory, expect 4 hoplgos
FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
assertEquals(4, hoplogs.length);
// After compaction expect 1 hoplog only. It should have the same sequence number as that of the
// youngest file compacted, which should be 4 in this case
organizer.getCompactor().compact(false, false);
hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
assertEquals(1, hoplogs.length);
assertEquals(1, organizer.getSortedOplogs().size());
Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
// iterate on oplogs to validate data in files
HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
// the iteration pattern for this test should be 0-9:
// 0 1 4 5 oplog #4
// 1 3 5 7 9 oplog #3
// 2 4 6 8 oplog #2
// 1 2 3 4 oplog #1
int count = 0;
for (ByteBuffer keyBB = null; iter.hasNext();) {
keyBB = iter.next();
byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
count++;
}
assertEquals(10, count);
// there must be 4 expired hoplogs now
hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
assertEquals(4, hoplogs.length);
organizer.close();
}
public void testIterativeMinorCompaction() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
// #1
ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
items.add(new TestEvent("1", "1"));
items.add(new TestEvent("2", "1"));
organizer.flush(items.iterator(), items.size());
items.clear();
items.add(new TestEvent("1", "2"));
items.add(new TestEvent("3", "2"));
organizer.flush(items.iterator(), items.size());
items.clear();
items.add(new TestEvent("4", "3"));
items.add(new TestEvent("5", "3"));
organizer.flush(items.iterator(), items.size());
// check file existence in bucket directory
FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
assertEquals(3, hoplogs.length);
organizer.getCompactor().compact(false, false);
FileStatus[] expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
assertEquals(3, expired.length);
FileStatus[] valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
assertEquals(0, valids.length);
// After compaction expect 1 hoplog only.
hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
assertEquals(1, hoplogs.length);
items.clear();
items.add(new TestEvent("4", "4"));
items.add(new TestEvent("6", "4"));
organizer.flush(items.iterator(), items.size());
items.clear();
items.add(new TestEvent("7", "5"));
items.add(new TestEvent("8", "5"));
organizer.flush(items.iterator(), items.size());
hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
assertEquals(5, hoplogs.length);
organizer.getCompactor().compact(false, false);
expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
assertEquals(6, expired.length);
valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
assertEquals(0, valids.length);
hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
assertEquals(2, hoplogs.length);
valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
assertEquals(1, valids.length);
assertEquals("2", organizer.read(BlobHelper.serializeToBlob("1")).getValue());
assertEquals("1", organizer.read(BlobHelper.serializeToBlob("2")).getValue());
assertEquals("2", organizer.read(BlobHelper.serializeToBlob("3")).getValue());
assertEquals("4", organizer.read(BlobHelper.serializeToBlob("4")).getValue());
assertEquals("3", organizer.read(BlobHelper.serializeToBlob("5")).getValue());
assertEquals("4", organizer.read(BlobHelper.serializeToBlob("6")).getValue());
assertEquals("5", organizer.read(BlobHelper.serializeToBlob("7")).getValue());
organizer.close();
}
public void testMajorCompactionWithDelete() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
// #1
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
items.add(new TestEvent("1", "1"));
items.add(new TestEvent("2", "1"));
items.add(new TestEvent("3", "1"));
items.add(new TestEvent("4", "1"));
items.add(new TestEvent("4", "10", Operation.DESTROY));
organizer.flush(items.iterator(), items.size());
// #2
items.clear();
items.add(new TestEvent("2", "1", Operation.DESTROY));
items.add(new TestEvent("4", "1", Operation.DESTROY));
items.add(new TestEvent("6", "1", Operation.INVALIDATE));
items.add(new TestEvent("8", "1"));
organizer.flush(items.iterator(), items.size());
// #3
items.clear();
items.add(new TestEvent("1", "1"));
items.add(new TestEvent("3", "1"));
items.add(new TestEvent("5", "1"));
items.add(new TestEvent("7", "1"));
items.add(new TestEvent("9", "1", Operation.DESTROY));
organizer.flush(items.iterator(), items.size());
// #4
items.clear();
items.add(new TestEvent("0", "1", Operation.DESTROY));
items.add(new TestEvent("1", "1"));
items.add(new TestEvent("4", "1"));
items.add(new TestEvent("5", "1"));
organizer.flush(items.iterator(), items.size());
// check file existence in bucket directory, expect 4 hoplgos
FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
assertEquals(4, hoplogs.length);
// After compaction expect 1 hoplog only. It should have the same sequence number as that of the
// youngest file compacted, which should be 4 in this case
organizer.getCompactor().compact(true, false);
hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
assertEquals(1, hoplogs.length);
assertEquals(1, organizer.getSortedOplogs().size());
Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
// iterate on oplogs to validate data in files
HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
int count = 0;
// entries in () are destroyed or invalidated
// 1, 2, 3, 4, (11)
// (2), (4), (6), 8
// 1, 3, 5, 7, (9)
// (0), 1, 4, 5
String[] expectedValues = { "1", "3", "4", "5", "7", "8" };
for (ByteBuffer keyBB = null; iter.hasNext();) {
keyBB = iter.next();
byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
assertEquals(expectedValues[count], BlobHelper.deserializeBlob(key));
count++;
}
assertEquals(6, count);
// there must be 4 expired hoplogs now
hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
assertEquals(4, hoplogs.length);
organizer.close();
}
public void testGainComputation() throws Exception {
HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
for (int i = 0; i < 10; i++) {
targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
}
// each read has cost 3. Four files read cost is 3 * 4. Reduce read cost of
// file after compaction
float expect = (float) ((3 * 4.0 - 3) / (20 + 30 + 40 + 50));
float result = bucket.computeGain(2, 5, targets);
assertTrue(Math.abs(expect - result) < (expect/1000));
// each read has cost 3 except 10MB file with read cost 2. 9 files read cost
// is 3 * 9. Reduce read cost of file after compaction.
expect = (float) ((3 * 9 - 3 - 1.0) / (10 + 20 + 30 + 40 + 50 + 60 + 70 + 80 + 90));
result = bucket.computeGain(0, 9, targets);
assertTrue(Math.abs(expect - result) < (expect/1000));
}
public void testGainComputeSmallFile() throws Exception {
HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
for (int i = 0; i < 10; i++) {
targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
}
float result = bucket.computeGain(2, 5, targets);
assertTrue(Math.abs(8.0 - result) < (1.0/1000));
}
public void testGainComputeMixedFiles() throws Exception {
HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
for (int i = 0; i < 10; i++) {
targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
}
TestHoplog midHop = (TestHoplog) targets.get(4).get();
// one more than other files
midHop.size = 5 * TEN_MB;
float expect = (float) ((4 * 2 - 3 + 1.0) / 50);
float result = bucket.computeGain(2, 5, targets);
System.out.println(expect);
System.out.println(result);
assertTrue(Math.abs(expect - result) < (expect/1000));
}
public void testGainComputeBadRatio() throws Exception {
HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
for (int i = 0; i < 10; i++) {
targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
}
TestHoplog firstHop = (TestHoplog) targets.get(2).get();
// one more than other files
firstHop.size = (1 + 30 + 40 + 50) * TEN_MB;
Float result = bucket.computeGain(2, 5, targets);
assertNull(result);
}
public void testMinorCompactionTargetMaxSize() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
for (int i = 0; i < 5; i++) {
TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
hop.increment();
targets.add(hop);
}
TrackedReference<TestHoplog> oldestHop = targets.get(targets.size() - 1);
TestHoplog thirdHop = (TestHoplog) targets.get(2).get();
// oldest is more than max size is ignored
oldestHop.get().size = HDFSStore.HDFSCompactionConfig.DEFAULT_MAX_INPUT_FILE_SIZE_MB * ONE_MB + 100;
List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
compactor.getMinorCompactionTargets(list, -1);
assertEquals(4, list.size());
for (TrackedReference<Hoplog> ref : list) {
assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 5 );
}
// third is more than max size but is not ignored
thirdHop.size = HDFSStore.HDFSCompactionConfig.DEFAULT_MAX_INPUT_FILE_SIZE_MB * ONE_MB + 100;
oldestHop.increment();
list = (List<TrackedReference<Hoplog>>) targets.clone();
compactor.getMinorCompactionTargets(list, -1);
assertEquals(4, list.size());
int i = 0;
for (TrackedReference<Hoplog> ref : list) {
if (i != 2) {
assertTrue(((TestHoplog) ref.get()).size - TEN_MB < 5);
} else {
assertTrue(((TestHoplog) ref.get()).size > HDFSStore.HDFSCompactionConfig.DEFAULT_MAX_INPUT_FILE_SIZE_MB * ONE_MB);
}
i++;
}
}
public void testAlterMaxInputFileSize() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
assertTrue(TEN_MB * 2 < hdfsStore.getHDFSCompactionConfig().getMaxInputFileSizeMB() * ONE_MB);
ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
for (int i = 0; i < 5; i++) {
TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
hop.increment();
targets.add(hop);
}
List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
compactor.getMinorCompactionTargets(list, -1);
assertEquals(targets.size(), list.size());
HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
mutator.getCompactionConfigMutator().setMaxInputFileSizeMB(1);
hdfsStore.alter(mutator);
compactor.getMinorCompactionTargets(list, -1);
assertEquals(0, list.size());
}
public void testAlterInputFileCount() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
assertTrue(2 < hdfsStore.getHDFSCompactionConfig().getMaxInputFileCount());
ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
for (int i = 0; i < 5; i++) {
TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
hop.increment();
targets.add(hop);
}
List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
compactor.getMinorCompactionTargets(list, -1);
assertEquals(targets.size(), list.size());
HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
mutator.getCompactionConfigMutator().setMaxInputFileCount(2);
mutator.getCompactionConfigMutator().setMinInputFileCount(2);
hdfsStore.alter(mutator);
compactor.getMinorCompactionTargets(list, -1);
assertEquals(2, list.size());
}
public void testAlterMajorCompactionInterval() throws Exception {
final AtomicInteger majorCReqCount = new AtomicInteger(0);
final Compactor compactor = new AbstractCompactor() {
@Override
public boolean compact(boolean isMajor, boolean isForced) throws IOException {
majorCReqCount.incrementAndGet();
return true;
}
};
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
@Override
public synchronized Compactor getCompactor() {
return compactor;
}
};
// create hoplog in the past, 90 seconds before current time
organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
TimeUnit.MILLISECONDS.sleep(50);
organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
alterMajorCompaction(hdfsStore, true);
List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
assertEquals(2, hoplogs.size());
organizer.performMaintenance();
TimeUnit.MILLISECONDS.sleep(100);
assertEquals(0, majorCReqCount.get());
HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
mutator.getCompactionConfigMutator().setMajorCompactionIntervalMins(1);
hdfsStore.alter(mutator);
organizer.performMaintenance();
TimeUnit.MILLISECONDS.sleep(100);
assertEquals(1, majorCReqCount.get());
}
public void testMinorCompactionTargetMinCount() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
for (int i = 0; i < 2; i++) {
TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
hop.increment();
targets.add(hop);
}
compactor.getMinorCompactionTargets(targets, -1);
assertEquals(0, targets.size());
}
public void testMinorCompactionLessTargetsStatsUpdate() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
items.add(new TestEvent("1", "1"));
organizer.flush(items.iterator(), items.size());
items.clear();
items.add(new TestEvent("2", "2", Operation.DESTROY));
organizer.flush(items.iterator(), items.size());
TimeUnit.SECONDS.sleep(1);
List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
assertEquals(2, hoplogs.size());
organizer.performMaintenance();
hoplogs = organizer.getSortedOplogs();
assertEquals(2, hoplogs.size());
}
public void testMinorCompactionTargetsOptimizer() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
for (int i = 0; i < 6; i++) {
TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
hop.increment();
targets.add(hop);
}
List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
compactor.getMinorCompactionTargets(list, -1);
assertEquals(6, list.size());
TestHoplog fifthHop = (TestHoplog) targets.get(4).get();
// fifth hop needs additional block read as it has more than max keys size
fifthHop.size = (HdfsSortedOplogOrganizer.AVG_NUM_KEYS_PER_INDEX_BLOCK * 5 + 1) * 64 * 1024;
list = (List<TrackedReference<Hoplog>>) targets.clone();
compactor.getMinorCompactionTargets(list, -1);
assertEquals(4, list.size());
for (TrackedReference<Hoplog> ref : list) {
assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 4 );
}
}
public void testTargetsReleasedBadRatio() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
for (int i = 0; i < 3; i++) {
TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
hop.increment();
targets.add(hop);
}
TestHoplog oldestHop = (TestHoplog) targets.get(2).get();
oldestHop.size = (1 + 30) * TEN_MB;
List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
compactor.getMinorCompactionTargets(list, -1);
assertEquals(0, list.size());
assertEquals(3, targets.size());
for (TrackedReference<Hoplog> ref : targets) {
assertEquals(0, ref.uses());
}
}
public void testMinorCTargetsIgnoreMajorC() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < 7; i++) {
items.clear();
items.add(new TestEvent("1" + i, "1" + i));
organizer.flush(items.iterator(), items.size());
}
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
compactor.getMinorCompactionTargets(targets, -1);
assertEquals(7, targets.size());
targets = organizer.getSortedOplogs();
for (TrackedReference<Hoplog> ref : targets) {
ref.increment();
}
compactor.getMinorCompactionTargets(targets, 2);
assertEquals((7 - 2), targets.size());
targets = organizer.getSortedOplogs();
for (int i = 0; i < targets.size(); i++) {
if (i + 1 <= (7 - 2)) {
assertEquals(1, targets.get(i).uses());
} else {
assertEquals(0, targets.get(i).uses());
}
}
targets = organizer.getSortedOplogs();
for (TrackedReference<Hoplog> ref : targets) {
if (ref.uses() == 0) {
ref.increment();
}
assertEquals(1, ref.uses());
}
compactor.getMinorCompactionTargets(targets, 7);
assertEquals(0, targets.size());
targets = organizer.getSortedOplogs();
for (int i = 0; i < targets.size(); i++) {
assertEquals(0, targets.get(i).uses());
}
}
public void testTargetOverlap() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < 7; i++) {
items.clear();
items.add(new TestEvent("1" + i, "1" + i));
organizer.flush(items.iterator(), items.size());
}
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
assertTrue(compactor.isMinorMajorOverlap(targets, 8));
assertTrue(compactor.isMinorMajorOverlap(targets, 7));
assertTrue(compactor.isMinorMajorOverlap(targets, 6));
assertTrue(compactor.isMinorMajorOverlap(targets, 1));
assertFalse(compactor.isMinorMajorOverlap(targets, 0));
assertFalse(compactor.isMinorMajorOverlap(targets, -1));
targets.remove(targets.size() -1); // remove the last one
targets.remove(targets.size() -1); // remove the last one again
assertFalse(compactor.isMinorMajorOverlap(targets, 1));
assertFalse(compactor.isMinorMajorOverlap(targets, 2));
assertTrue(compactor.isMinorMajorOverlap(targets, 3));
targets.remove(3); // remove from the middle, seq num 4
assertTrue(compactor.isMinorMajorOverlap(targets, 4));
assertTrue(compactor.isMinorMajorOverlap(targets, 3));
}
public void testSuspendMinorByMajor() throws Exception {
HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < 5; i++) {
items.clear();
items.add(new TestEvent("1" + i, "1" + i));
organizer.flush(items.iterator(), items.size());
}
HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
Hoplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir + "/"
+ getName() + "-" + System.currentTimeMillis() + "-1.ihop.tmp"), blockCache, stats, storeStats);
compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
cache.getLogger().info("<ExpectedException action=add>java.lang.InterruptedException</ExpectedException>");
try {
compactor.maxMajorCSeqNum.set(3);
compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
fail();
} catch (InterruptedException e) {
// expected
}
cache.getLogger().info("<ExpectedException action=remove>java.lang.InterruptedException</ExpectedException>");
organizer.close();
}
public void testMajorCompactionSetsSeqNum() throws Exception {
final CountDownLatch compactionStartedLatch = new CountDownLatch(1);
final CountDownLatch waitLatch = new CountDownLatch(1);
class MyOrganizer extends HdfsSortedOplogOrganizer {
final HoplogCompactor compactor = new MyCompactor();
public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
super(region, bucketId);
}
public synchronized Compactor getCompactor() {
return compactor;
}
class MyCompactor extends HoplogCompactor {
@Override
public long fillCompactionHoplog(boolean isMajor,
List<TrackedReference<Hoplog>> targets, Hoplog output,
int majorCSeqNum) throws IOException, InterruptedException {
compactionStartedLatch.countDown();
waitLatch.await();
long byteCount = 0;
try {
byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
} catch (ForceReattemptException e) {
// we do not expect this in a unit test.
}
return byteCount;
}
}
}
final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < 3; i++) {
items.clear();
items.add(new TestEvent("1" + i, "1" + i));
organizer.flush(items.iterator(), items.size());
}
Thread t = new Thread(new Runnable() {
public void run() {
try {
organizer.getCompactor().compact(true, false);
} catch (IOException e) {
e.printStackTrace();
}
}
});
t.start();
compactionStartedLatch.await();
assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
waitLatch.countDown();
t.join();
}
public void testMinorWatchesMajorsSeqNum() throws Exception {
final CountDownLatch majorCStartedLatch = new CountDownLatch(1);
final CountDownLatch majorCWaitLatch = new CountDownLatch(1);
final CountDownLatch minorCStartedLatch = new CountDownLatch(1);
final List<TrackedReference<Hoplog>> minorTargets = new ArrayList<TrackedReference<Hoplog>>();
class MyOrganizer extends HdfsSortedOplogOrganizer {
final HoplogCompactor compactor = new MyCompactor();
public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
super(region, bucketId);
}
public synchronized Compactor getCompactor() {
return compactor;
}
class MyCompactor extends HoplogCompactor {
@Override
public long fillCompactionHoplog(boolean isMajor,
List<TrackedReference<Hoplog>> targets, Hoplog output,
int majorCSeqNum) throws IOException, InterruptedException {
if (isMajor) {
majorCStartedLatch.countDown();
majorCWaitLatch.await();
} else {
minorCStartedLatch.countDown();
minorTargets.addAll(targets);
}
long byteCount =0;
try {
byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
} catch (ForceReattemptException e) {
// we do not expect this in a unit test.
}
return byteCount;
}
}
}
final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < 3; i++) {
items.clear();
items.add(new TestEvent("1" + i, "1" + i));
organizer.flush(items.iterator(), items.size());
}
Thread majorCThread = new Thread(new Runnable() {
public void run() {
try {
organizer.getCompactor().compact(true, false);
} catch (IOException e) {
e.printStackTrace();
}
}
});
majorCThread.start();
majorCStartedLatch.await();
assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
// create more files for minor C
for (int i = 0; i < 4; i++) {
items.clear();
items.add(new TestEvent("1" + i, "1" + i));
organizer.flush(items.iterator(), items.size());
}
Thread minorCThread = new Thread(new Runnable() {
public void run() {
try {
organizer.getCompactor().compact(false, false);
} catch (IOException e) {
e.printStackTrace();
}
}
});
minorCThread.start();
minorCThread.join();
assertEquals(4, minorTargets.size());
for (TrackedReference<Hoplog> ref : minorTargets) {
assertTrue(organizer.getSequenceNumber(ref.get()) >= 4);
}
majorCWaitLatch.countDown();
majorCThread.join();
}
public void testTimeBoundedSuspend() throws Exception {
final AtomicBoolean barrier = new AtomicBoolean(true);
class MyOrganizer extends HdfsSortedOplogOrganizer {
public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
super(region, bucketId);
}
public synchronized Compactor getCompactor() {
return new MyCompactor();
}
class MyCompactor extends HoplogCompactor {
public long fillCompactionHoplog(boolean isMajor, List<TrackedReference<Hoplog>> targets, Hoplog output)
throws IOException, InterruptedException {
barrier.set(false);
TimeUnit.SECONDS.sleep(5 * HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
long byteCount =0;
try {
byteCount = super.fillCompactionHoplog(isMajor, targets, output, -1);
} catch (ForceReattemptException e) {
// we do not expect this in a unit test.
}
return byteCount;
}
}
}
HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < 4; i++) {
items.clear();
items.add(new TestEvent("1" + i, "1" + i));
organizer.flush(items.iterator(), items.size());
}
final HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Runnable() {
public void run() {
try {
compactor.compact(false, false);
} catch (Exception e) {
}
}
});
final AtomicLong start = new AtomicLong(0);
final AtomicLong end = new AtomicLong(0);
service.execute(new Runnable() {
public void run() {
while (barrier.get()) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
start.set(System.currentTimeMillis());
compactor.suspend();
end.set(System.currentTimeMillis());
}
});
for (long i = 0; i < 5; i++) {
if (end.get() == 0) {
TimeUnit.MILLISECONDS.sleep(HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT / 2);
} else {
break;
}
}
assertTrue(end.get() - start.get() < 100 + HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
}
public static class TestHoplog extends AbstractHoplog {
long size;
long creationTime;
TestHoplog(HDFSStoreImpl store, long size) throws IOException {
this(store, size, 0);
}
TestHoplog(HDFSStoreImpl store, long size, long creationTime) throws IOException {
super(store, new Path("1-" + creationTime + "-1.hop"), null);
this.size = size;
this.creationTime = creationTime;
}
@Override
public long getSize() {
return size;
}
@Override
public long getModificationTimeStamp() {
if (creationTime > 0) {
return creationTime;
}
return super.getModificationTimeStamp();
}
@Override
public String toString() {
long name = size - TEN_MB;
if (name < 0) name = size - (TEN_MB / 1024);
return name + "";
}
public boolean isClosed() {
return false;
}
public void close() throws IOException {
}
public HoplogReader getReader() throws IOException {
return null;
}
public HoplogWriter createWriter(int keys) throws IOException {
return null;
}
public void close(boolean clearCache) throws IOException {
}
}
}