/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;

import junit.framework.TestCase;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.io.hfile.BlockCache;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.SerializedCacheValue;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.hdfs.HDFSStore;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory.HDFSCompactionConfigFactory;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl.FileSystemFactory;
import com.gemstone.gemfire.cache.hdfs.internal.SortedHDFSQueuePersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
import com.gemstone.gemfire.internal.util.BlobHelper;

import dunit.DistributedTestCase;
import dunit.DistributedTestCase.ExpectedException;
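
/**
 * Base class for hoplog JUnit tests. {@link #setUp()} creates a cache, a
 * local HDFS store named {@code hdfs} and an HDFS partitioned region named
 * after the test, and exposes them through protected fields;
 * {@link #tearDown()} destroys them and wipes the test data directory.
 * <p>
 * Illustrative subclass sketch (the class and test names below are
 * hypothetical):
 * <pre>
 * public class MyHoplogJUnitTest extends BaseHoplogTestCase {
 *   public void testWriteRead() throws Exception {
 *     // hdfsStore, region, regionManager, stats etc. are ready to use here
 *   }
 * }
 * </pre>
 */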
public abstract class BaseHoplogTestCase extends TestCase {
  public static final String HDFS_STORE_NAME = "hdfs";
  public static final Random rand = new Random(System.currentTimeMillis());

  protected Path testDataDir;
  protected Cache cache;

  protected HDFSRegionDirector director;
  protected HdfsRegionManager regionManager;
  protected HDFSStoreFactory hsf;
  protected HDFSStoreImpl hdfsStore;
  protected RegionFactory<Object, Object> regionfactory;
  protected Region<Object, Object> region;
  protected SortedOplogStatistics stats;
  protected HFileStoreStatistics storeStats;
  protected BlockCache blockCache;

  Set<ExpectedException> exceptions = new HashSet<ExpectedException>();
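
  /**
   * Creates the cache, the local HDFS store and the test region, registers
   * the exceptions HDFS is expected to log on shutdown, and resets the
   * compaction manager so compactions queued by earlier tests are cleared.
   */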
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");

    // this is logged by HDFS when it is stopped
    exceptions.add(DistributedTestCase.addExpectedException("sleep interrupted"));
    exceptions.add(DistributedTestCase.addExpectedException("java.io.InterruptedIOException"));

    testDataDir = new Path("test-case");

    cache = createCache();
    configureHdfsStoreFactory();
    hdfsStore = (HDFSStoreImpl) hsf.create(HDFS_STORE_NAME);

    regionfactory = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
    regionfactory.setHDFSStoreName(HDFS_STORE_NAME);
    region = regionfactory.create(getName());

    // disable compaction by default and clear existing queues
    HDFSCompactionManager compactionManager = HDFSCompactionManager.getInstance(hdfsStore);
    compactionManager.reset();

    director = HDFSRegionDirector.getInstance();
    director.setCache(cache);
    regionManager = ((LocalRegion) region).getHdfsRegionManager();
    stats = director.getHdfsRegionStats("/" + getName());
    storeStats = hdfsStore.getStats();
    blockCache = hdfsStore.getBlockCache();
    AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
  }
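
  /**
   * Creates the HDFS store factory, points it at {@link #testDataDir} and
   * disables both minor and auto major compaction so that tests trigger
   * compaction explicitly.
   */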
  protected void configureHdfsStoreFactory() throws Exception {
    hsf = this.cache.createHDFSStoreFactory();
    hsf.setHomeDir(testDataDir.toString());

    HDFSCompactionConfigFactory cc = hsf.createCompactionConfigFactory(null);
    hsf.setMinorCompaction(false);
    cc.setAutoMajorCompaction(false);
    hsf.setHDFSCompactionConfig(cc.create());
  }
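
  /**
   * Creates a standalone cache with multicast disabled and info-level
   * logging.
   */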
  protected Cache createCache() {
    CacheFactory cf = new CacheFactory()
        .set("mcast-port", "0")
        .set("log-level", "info");
    cache = cf.create();
    return cache;
  }
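
  /**
   * Destroys the region, deletes the store's test data directory, destroys
   * the store, closes the cache, and removes the expected exceptions
   * registered in {@link #setUp()}.
   */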
  @Override
  protected void tearDown() throws Exception {
    if (region != null) {
      region.destroyRegion();
    }

    if (hdfsStore != null) {
      hdfsStore.getFileSystem().delete(testDataDir, true);
      hdfsStore.destroy();
    }

    if (cache != null) {
      cache.close();
    }
    super.tearDown();

    for (ExpectedException ex : exceptions) {
      ex.remove();
    }
  }

  /**
   * Creates a hoplog file with numKeys records. Keys follow the key-X
   * pattern, where X runs from offset to offset + numKeys - 1 (offset is
   * 100000 when numKeys > 10, otherwise 0); values are unique
   * value-&lt;nanotime&gt; strings.
   *
   * @return the sorted map of inserted KVs
   */
  protected TreeMap<String, String> createHoplog(int numKeys, Hoplog oplog) throws IOException {
    int offset = (numKeys > 10 ? 100000 : 0);

    HoplogWriter writer = oplog.createWriter(numKeys);
    TreeMap<String, String> map = new TreeMap<String, String>();
    for (int i = offset; i < (numKeys + offset); i++) {
      String key = ("key-" + i);
      String value = ("value-" + System.nanoTime());
      writer.append(key.getBytes(), value.getBytes());
      map.put(key, value);
    }
    writer.close();
    return map;
  }
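
  /**
   * Lists the hoplog files under regionAndBucket, in the store's (or the
   * given) file system, whose names end with the given type suffix.
   */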
  protected FileStatus[] getBucketHoplogs(String regionAndBucket, final String type)
      throws IOException {
    return getBucketHoplogs(hdfsStore.getFileSystem(), regionAndBucket, type);
  }

  protected FileStatus[] getBucketHoplogs(FileSystem fs, String regionAndBucket, final String type)
      throws IOException {
    FileStatus[] hoplogs = fs.listStatus(
        new Path(testDataDir, regionAndBucket), new PathFilter() {
          @Override
          public boolean accept(Path file) {
            return file.getName().endsWith(type);
          }
        });
    return hoplogs;
  }
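
  /**
   * Returns a unique hoplog file name of the form
   * hoplog-&lt;nanotime&gt;-&lt;random&gt;.hop.
   */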
  protected String getRandomHoplogName() {
    String hoplogName = "hoplog-" + System.nanoTime() + "-" + rand.nextInt(10000) + ".hop";
    return hoplogName;
  }

//  public static MiniDFSCluster initMiniCluster(int port, int numDN) throws Exception {
//    HashMap<String, String> map = new HashMap<String, String>();
//    map.put(DFSConfigKeys.DFS_REPLICATION_KEY, "1");
//    return initMiniCluster(port, numDN, map);
//  }
//
//  public static MiniDFSCluster initMiniCluster(int port, int numDN, HashMap<String, String> map) throws Exception {
//    System.setProperty("test.build.data", "hdfs-test-cluster");
//    Configuration hconf = new HdfsConfiguration();
//    for (Entry<String, String> entry : map.entrySet()) {
//      hconf.set(entry.getKey(), entry.getValue());
//    }
//
//    hconf.set("dfs.namenode.fs-limits.min-block-size", "1024");
//
//    Builder builder = new MiniDFSCluster.Builder(hconf);
//    builder.numDataNodes(numDN);
//    builder.nameNodePort(port);
//    MiniDFSCluster cluster = builder.build();
//    return cluster;
//  }
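
  /**
   * Writes config to configFile and registers that file as the factory's
   * HDFS client configuration file.
   */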
  public static void setConfigFile(HDFSStoreFactory factory, File configFile, String config)
      throws Exception {
    BufferedWriter bw = new BufferedWriter(new FileWriter(configFile));
    bw.write(config);
    bw.close();
    factory.setHDFSClientConfigFile(configFile.getName());
  }
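
  /**
   * Toggles auto major compaction on a live store through a store mutator.
   */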
  public static void alterMajorCompaction(HDFSStoreImpl store, boolean enable) {
    HDFSStoreMutator mutator = store.createHdfsStoreMutator();
    mutator.getCompactionConfigMutator().setAutoMajorCompaction(enable);
    store.alter(mutator);
  }
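
  /**
   * Toggles minor compaction on a live store through a store mutator.
   */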
  public static void alterMinorCompaction(HDFSStoreImpl store, boolean enable) {
    HDFSStoreMutator mutator = store.createHdfsStoreMutator();
    mutator.setMinorCompaction(enable);
    store.alter(mutator);
  }
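
  /**
   * Deletes the directory used by the mini DFS cluster, if present.
   */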
  public void deleteMiniClusterDir() throws Exception {
    File clusterDir = new File("hdfs-test-cluster");
    if (clusterDir.exists()) {
      FileUtils.deleteDirectory(clusterDir);
    }
  }
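
  /**
   * Minimal persisted event for feeding hoplog writers and organizers in
   * tests; everything beyond the key, value and operation returns a fixed
   * default.
   */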
  public static class TestEvent extends SortedHDFSQueuePersistedEvent {
    Object key;

    public TestEvent(String k, String v) throws Exception {
      this(k, v, Operation.PUT_IF_ABSENT);
    }

    public TestEvent(String k, String v, Operation op) throws Exception {
      super(v, op, (byte) 0x02, false, new DiskVersionTag(), BlobHelper.serializeToBlob(k), 0);
      this.key = k;
    }

    public Object getKey() {
      return key;
    }

    public Object getNewValue() {
      return valueObject;
    }

    public Operation getOperation() {
      return op;
    }

    public Region<Object, Object> getRegion() {
      return null;
    }

    public Object getCallbackArgument() {
      return null;
    }

    public boolean isCallbackArgumentAvailable() {
      return false;
    }

    public boolean isOriginRemote() {
      return false;
    }

    public DistributedMember getDistributedMember() {
      return null;
    }

    public boolean isExpiration() {
      return false;
    }

    public boolean isDistributed() {
      return false;
    }

    public Object getOldValue() {
      return null;
    }

    public SerializedCacheValue<Object> getSerializedOldValue() {
      return null;
    }

    public SerializedCacheValue<Object> getSerializedNewValue() {
      return null;
    }

    public boolean isLocalLoad() {
      return false;
    }

    public boolean isNetLoad() {
      return false;
    }

    public boolean isLoad() {
      return false;
    }

    public boolean isNetSearch() {
      return false;
    }

    public TransactionId getTransactionId() {
      return null;
    }

    public boolean isBridgeEvent() {
      return false;
    }

    public boolean hasClientOrigin() {
      return false;
    }

    public boolean isOldValueAvailable() {
      return false;
    }
  }
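
  /**
   * No-op {@link Compactor} base; tests override only the methods they
   * exercise.
   */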
  public abstract class AbstractCompactor implements Compactor {
    @Override
    public HDFSStore getHdfsStore() {
      return hdfsStore;
    }

    public void suspend() {
    }

    public void resume() {
    }

    public boolean isBusy(boolean isMajor) {
      return false;
    }
  }
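
  /**
   * Returns a store factory whose stores use a {@link LocalFileSystem}
   * subclass that actually honors close(): once closed, getFileStatus()
   * throws IOException. This lets tests assert behavior against a closed
   * file system, which the default local FS does not simulate.
   */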
  public HDFSStoreFactoryImpl getCloseableLocalHdfsStoreFactory() {
    final FileSystemFactory fsFactory = new FileSystemFactory() {
      // by default a local FS instance is not disabled by close(); hence
      // this customization
      class CustomFileSystem extends LocalFileSystem {
        boolean isClosed = false;

        @Override
        public void close() throws IOException {
          isClosed = true;
          super.close();
        }

        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
          if (isClosed) {
            throw new IOException("filesystem is closed");
          }
          return super.getFileStatus(f);
        }
      }

      @Override
      public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException {
        CustomFileSystem fs = new CustomFileSystem();
        fs.initialize(namenode, conf);
        return fs;
      }
    };

    HDFSStoreFactoryImpl storeFactory = new HDFSStoreFactoryImpl(cache) {
      @Override
      public HDFSStore create(String name) {
        return new HDFSStoreImpl(name, this.configHolder) {
          @Override
          public FileSystemFactory getFileSystemFactory() {
            return fsFactory;
          }
        };
      }
    };
    return storeFactory;
  }
}