blob: 8d2fae309d21a24c6a72753a7b043aec820abb80 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog.SequenceFileIterator;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
import com.gemstone.gemfire.test.junit.categories.HoplogTest;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
/**
* Test class to test hoplog functionality for streaming ingest
*
* @author hemantb
*
*/
@Category({IntegrationTest.class, HoplogTest.class})
public class HDFSUnsortedHoplogOrganizerJUnitTest extends BaseHoplogTestCase {
/**
* Tests flush operation
*/
public void testFlush() throws Exception {
int count = 10;
int bucketId = (int) System.nanoTime();
HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucketId);
// flush and create hoplog
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < count; i++) {
items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
}
organizer.flush(items.iterator(), count);
organizer.closeCurrentWriter();
// check file existence in bucket directory
FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
// only one hoplog should exists
assertEquals(1, hoplogs.length);
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
}
public void testAlterRollOverInterval() throws Exception {
HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, 0);
// flush 4 times with small delays. Only one seq file will be created
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int j = 0; j < 3; j++) {
items.clear();
for (int i = 0; i < 10; i++) {
items.add(new TestEvent(("key-" + (i + 10 * j)), ("value-" + System.nanoTime())));
}
organizer.flush(items.iterator(), 10);
TimeUnit.MILLISECONDS.sleep(1100);
}
organizer.closeCurrentWriter();
FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + 0,
HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
// only one hoplog should exists
assertEquals(1, hoplogs.length);
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
mutator.setFileRolloverInterval(1);
hdfsStore.alter(mutator);
TimeUnit.MILLISECONDS.sleep(1100);
for (int j = 0; j < 2; j++) {
items.clear();
for (int i = 0; i < 10; i++) {
items.add(new TestEvent(("key-" + (i + 10 * j)), ("value-" + System.nanoTime())));
}
organizer.flush(items.iterator(), 10);
TimeUnit.MILLISECONDS.sleep(1100);
}
organizer.closeCurrentWriter();
hoplogs = getBucketHoplogs(getName() + "/" + 0,
HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
assertEquals(3, hoplogs.length);
}
public void testSequenceFileScan() throws Exception {
int count = 10000;
int bucketId = (int) System.nanoTime();
HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucketId);
// flush and create hoplog
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < count; i++) {
items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
}
organizer.flush(items.iterator(), count);
organizer.closeCurrentWriter();
// check file existence in bucket directory
FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
// only one hoplog should exists
assertEquals(1, hoplogs.length);
SequenceFileDetails sfd = getSequenceFileDetails(hdfsStore.getFileSystem(), hoplogs[0].getPath());
// End position is before a sync. Should read until sync.
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, sfd.indexOfKeyBeforeSecondSync ,
0, sfd.posBeforeSecondSync);
// Start position is inside header. Should start from first key and go to next sync point.
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, sfd.indexOfKeyBeforeSecondSync,
10, sfd.posAfterFirstSync);
// Start and end position are between two sync markers. Should not read any keys.
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 29, 28,
sfd.posAfterFirstSync, sfd.posBeforeSecondSync - sfd.posAfterFirstSync);
// Start position is after a sync and End position is beyond the file size.
//Should read all the records after the next sync.
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), sfd.indexOfKeyAfterFirstSync, 9999,
sfd.posBeforeFirstSync, 10000000);
// Should read all the records.
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, 9999, 0, -1);
}
class SequenceFileDetails {
public int posBeforeFirstSync;
public int indexOfKeyBeforeFirstSync;
public int posAfterFirstSync;
public int indexOfKeyAfterFirstSync;
public int posBeforeSecondSync;
public int indexOfKeyBeforeSecondSync;
}
public SequenceFileDetails getSequenceFileDetails(FileSystem inputFS, Path sequenceFileName) throws Exception {
SequenceFileDetails fd = new SequenceFileDetails();
SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
SequenceFileIterator iter = (SequenceFileIterator)hoplog.getReader().scan();;
int currentkeyStartPos = 0;
int cursorPos = 0;
String currentKey = null;
boolean firstSyncSeen = false;
try {
while (iter.hasNext()) {
iter.next();
currentkeyStartPos = cursorPos;
currentKey = ((String)CacheServerHelper.deserialize(iter.getKey()));
cursorPos = (int)iter.getPosition();
if (iter.syncSeen()){
if (firstSyncSeen) {
fd.posBeforeSecondSync = currentkeyStartPos;
fd.indexOfKeyBeforeSecondSync = Integer.parseInt(currentKey.substring(4));
break;
} else {
fd.posBeforeFirstSync = currentkeyStartPos;
fd.indexOfKeyBeforeFirstSync = Integer.parseInt(currentKey.substring(4));
fd.posAfterFirstSync = cursorPos;
fd.indexOfKeyAfterFirstSync = Integer.parseInt(currentKey.substring(4)) + 1;
firstSyncSeen = true;
}
}
}
} catch (Exception e) {
assertTrue(e.toString(), false);
}
iter.close();
hoplog.close();
return fd;
}
public void testClear() throws Exception {
int count = 10;
int bucketId = (int) System.nanoTime();
HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucketId);
// flush and create hoplog
ArrayList<TestEvent> items = new ArrayList<TestEvent>();
for (int i = 0; i < count; i++) {
items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
}
organizer.flush(items.iterator(), count);
organizer.closeCurrentWriter();
// check file existence in bucket directory
FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
assertEquals(1, hoplogs.length);
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
// write another batch but do not close the data.
organizer.flush(items.iterator(), count);
organizer.clear();
hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
// check file existence in bucket directory
FileStatus[] expiredhoplogs = getBucketHoplogs(getName() + "/" + bucketId,
AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
// two expired hoplog should exists
assertEquals(2, expiredhoplogs.length);
assertEquals(2, hoplogs.length);
// check the expired hops name should be same
assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
expiredhoplogs[1].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
expiredhoplogs[1].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
// Test that second time clear should be harmless and should not result in extra files.
organizer.clear();
hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
// check file existence in bucket directory
expiredhoplogs = getBucketHoplogs(getName() + "/" + bucketId,
AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
// two expired hoplog should exists
assertEquals(2, expiredhoplogs.length);
assertEquals(2, hoplogs.length);
// check the expired hops name should be same
assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
expiredhoplogs[1].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
expiredhoplogs[1].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
readSequenceFile(hdfsStore.getFileSystem(), hoplogs[1].getPath(), 0);
}
public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index) throws IOException{
readSequenceFile(inputFS, sequenceFileName, index, -1, 0, -1);
}
/**
* Reads the sequence file assuming that it has keys and values starting from index that
* is specified as parameter.
*
*/
public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index, int endIndex,
int startoffset, int length) throws IOException {
SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
HoplogIterator<byte[], byte[]> iter = null;
if (length == -1){
iter = hoplog.getReader().scan();
}
else {
iter = hoplog.getReader().scan(startoffset, length);
}
try {
while (iter.hasNext()) {
iter.next();
PersistedEventImpl te = UnsortedHoplogPersistedEvent.fromBytes(iter.getValue());
String stringkey = ((String)CacheServerHelper.deserialize(iter.getKey()));
assertTrue("Expected key: key-" + index + ". Actual key: " + stringkey , ((String)stringkey).equals("key-" + index));
index++;
}
if (endIndex != -1)
assertTrue ("The keys should have been until key-"+ endIndex + " but they are until key-"+ (index-1), index == endIndex + 1) ;
} catch (Exception e) {
assertTrue(e.toString(), false);
}
iter.close();
hoplog.close();
}
}