/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import dunit.AsyncInvocation;
import dunit.SerializableCallable;
import dunit.VM;
/**
 * A DUnit test of basic HDFS functionality for colocated partitioned regions:
 * two HDFS-backed regions are created, with the second colocated with the
 * first, and every operation and verification is applied to both.
 *
 * @author Hemant Bhanawat
 */
public class ColocatedRegionWithHDFSDUnitTest extends RegionWithHDFSTestBase {
public ColocatedRegionWithHDFSDUnitTest(String name) {
super(name);
}
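  /**
   * Creates two colocated HDFS partitioned regions, uniqueName + "-r1" and
   * uniqueName + "-r2", backed by the same HDFS store. r2 is colocated with
   * r1 so that corresponding buckets are hosted on the same members.
   */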
@Override
  protected SerializableCallable getCreateRegionCallable(
      final int totalNumOfBuckets, final int batchSizeMB,
      final int maximumEntries, final String folderPath,
      final String uniqueName, final int batchInterval,
      final boolean queuePersistent, final boolean writeOnly,
      final long timeForRollover, final long maxFileSize) {
SerializableCallable createRegion = new SerializableCallable() {
public Object call() throws Exception {
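        // Configure the queue that batches events before flushing them to HDFS.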
HDFSEventQueueAttributesFactory hqf = new HDFSEventQueueAttributesFactory();
hqf.setBatchSizeMB(batchSizeMB);
hqf.setPersistent(queuePersistent);
hqf.setMaximumQueueMemory(3);
hqf.setBatchTimeInterval(batchInterval);
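        // Create the HDFS store under tmpDir/folderPath; both regions share it.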
HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
String homeDir = tmpDir + "/" + folderPath;
hsf.setHomeDir(homeDir);
hsf.setHDFSEventQueueAttributes(hqf.create());
hsf.create(uniqueName);
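        // r1: an HDFS partitioned region with one redundant copy; LRU eviction
        // destroys in-memory entries once maximumEntries is reached.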
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.HDFS_PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setTotalNumBuckets(totalNumOfBuckets);
paf.setRedundantCopies(1);
af.setHDFSStoreName(uniqueName);
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(
maximumEntries, EvictionAction.LOCAL_DESTROY));
        af.setHDFSWriteOnly(writeOnly);
Region r1 = createRootRegion(uniqueName + "-r1", af.create());
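        // r2 is colocated with r1 so matching buckets land on the same members.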
paf.setColocatedWith(uniqueName + "-r1");
af.setPartitionAttributes(paf.create());
af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(
maximumEntries, EvictionAction.LOCAL_DESTROY));
Region r2 = createRootRegion(uniqueName + "-r2", af.create());
((LocalRegion) r1).setIsTest();
((LocalRegion) r2).setIsTest();
return 0;
}
};
return createRegion;
}
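  /**
   * Puts the same key/value pairs into both colocated regions.
   */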
@Override
protected void doPuts(String uniqueName, int start, int end) {
Region r1 = getRootRegion(uniqueName + "-r1");
Region r2 = getRootRegion(uniqueName + "-r2");
for (int i = start; i < end; i++) {
r1.put("K" + i, "V" + i);
r2.put("K" + i, "V" + i);
}
}
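  /**
   * Asynchronously puts values with the given suffix into both colocated
   * regions from the given VM.
   */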
protected AsyncInvocation doAsyncPuts(VM vm, final String regionName,
final int start, final int end, final String suffix) throws Exception {
return vm.invokeAsync(new SerializableCallable() {
public Object call() throws Exception {
Region r1 = getRootRegion(regionName + "-r1");
Region r2 = getRootRegion(regionName + "-r2");
        getCache().getLogger().info("Putting entries " + start + " to " + end);
for (int i = start; i < end; i++) {
r1.put("K" + i, "V" + i + suffix);
r2.put("K" + i, "V" + i + suffix);
}
return null;
}
});
}
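  /**
   * Applies the same putAll map to both colocated regions.
   */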
protected void doPutAll(final String uniqueName, Map map) {
Region r1 = getRootRegion(uniqueName + "-r1");
Region r2 = getRootRegion(uniqueName + "-r2");
r1.putAll(map);
r2.putAll(map);
}
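  /**
   * Destroys the given key range in both colocated regions.
   */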
@Override
protected void doDestroys(String uniqueName, int start, int end) {
Region r1 = getRootRegion(uniqueName + "-r1");
Region r2 = getRootRegion(uniqueName + "-r2");
for (int i = start; i < end; i++) {
r1.destroy("K" + i);
r2.destroy("K" + i);
}
}
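  /**
   * Verifies via get() that both regions contain (or, if expectValue is
   * false, no longer contain) the expected values for the given key range.
   */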
@Override
protected void checkWithGet(String uniqueName, int start, int end,
boolean expectValue) {
Region r1 = getRootRegion(uniqueName + "-r1");
Region r2 = getRootRegion(uniqueName + "-r2");
for (int i = start; i < end; i++) {
String expected = expectValue ? "V" + i : null;
assertEquals("Mismatch on key " + i, expected, r1.get("K" + i));
assertEquals("Mismatch on key " + i, expected, r2.get("K" + i));
}
}
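  /**
   * Verifies via getAll() that both regions return the expected value for
   * every requested key.
   */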
  protected void checkWithGetAll(String uniqueName, ArrayList keys) {
    Region r1 = getRootRegion(uniqueName + "-r1");
    Region r2 = getRootRegion(uniqueName + "-r2");
    Map map1 = r1.getAll(keys);
    Map map2 = r2.getAll(keys);
for (Object e : map1.keySet()) {
String v = e.toString().replaceFirst("K", "V");
assertTrue("Reading entries failed for key " + e + " where value = "
+ map1.get(e), v.equals(map1.get(e)));
assertTrue("Reading entries failed for key " + e + " where value = "
+ map2.get(e), v.equals(map2.get(e)));
}
}
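  /**
   * Collects the entries flushed to HDFS for each colocated region and checks
   * that the values written by each VM are present; the overlapping key
   * ranges presumably mirror the puts issued by vm0 through vm3 in the base
   * test.
   */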
@Override
protected void verifyHDFSData(VM vm, String uniqueName) throws Exception {
HashMap<String, HashMap<String, String>> filesToEntriesMap = createFilesAndEntriesMap(
vm, uniqueName, uniqueName + "-r1");
HashMap<String, String> entriesMap = new HashMap<String, String>();
for (Map.Entry<String, HashMap<String, String>> e : filesToEntriesMap
.entrySet()) {
entriesMap.putAll(e.getValue());
}
verifyInEntriesMap(entriesMap, 1, 50, "vm0");
verifyInEntriesMap(entriesMap, 40, 100, "vm1");
verifyInEntriesMap(entriesMap, 40, 100, "vm2");
verifyInEntriesMap(entriesMap, 90, 150, "vm3");
filesToEntriesMap = createFilesAndEntriesMap(vm, uniqueName, uniqueName
+ "-r2");
entriesMap = new HashMap<String, String>();
for (Map.Entry<String, HashMap<String, String>> e : filesToEntriesMap
.entrySet()) {
entriesMap.putAll(e.getValue());
}
verifyInEntriesMap(entriesMap, 1, 50, "vm0");
verifyInEntriesMap(entriesMap, 40, 100, "vm1");
verifyInEntriesMap(entriesMap, 40, 100, "vm2");
verifyInEntriesMap(entriesMap, 90, 150, "vm3");
}
}