blob: 6b09e6ba298e5d461e8869b762f7a57175f17963 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.compression;
import java.io.IOException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.compression.Compressor;
import com.gemstone.gemfire.compression.SnappyCompressor;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
import dunit.standalone.DUnitLauncher;
/**
* Sanity checks on a number of basic cluster configurations with compression turned on.
*
* @author rholmes
*/
public class CompressionRegionConfigDUnitTest extends CacheTestCase {
/**
* The name of our test region.
*/
public static final String REGION_NAME = "compressedRegion";
/**
* Name of the common disk store used by all tests.
*/
public static final String DISK_STORE = "diskStore";
/**
* A key.
*/
public static final String KEY_1 = "key1";
/**
* A value.
*/
public static final String VALUE_1 = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam auctor bibendum tempus. Suspendisse potenti. Ut enim neque, mattis et mattis ac, vulputate quis leo. Cras a metus metus, eget cursus ipsum. Aliquam sagittis condimentum massa aliquet rhoncus. Aliquam sed luctus neque. In hac habitasse platea dictumst.";
/**
* Creates a new CompressionRegionOperationsDUnitTest.
* @param name a test name.
*/
public CompressionRegionConfigDUnitTest(String name) {
super(name);
}
@Override
public void setUp() throws Exception {
super.setUp();
}
@Override
public void tearDown2() throws Exception {
super.tearDown2();
}
/**
* Sanity check using two peers sharing a replicated region.
* @throws Exception
*/
public void testReplicateRegion() throws Exception {
Compressor compressor = null;
try {
compressor = SnappyCompressor.getDefaultInstance();
} catch (Throwable t) {
// Not a supported OS
return;
}
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertNull(putUsingVM(getVM(0),KEY_1,VALUE_1));
waitOnPut(getVM(1),KEY_1);
assertEquals(VALUE_1,getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for two peers sharing a persisted replicated region.
* @throws Exception
*/
public void testReplicatePersistentRegion() throws Exception {
Compressor compressor = null;
try {
compressor = SnappyCompressor.getDefaultInstance();
} catch (Throwable t) {
// Not a supported OS
return;
}
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.PERSISTENT_REPLICATE, compressor, DISK_STORE));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_REPLICATE, compressor));
assertNull(putUsingVM(getVM(0),KEY_1,VALUE_1));
waitOnPut(getVM(1),KEY_1);
flushDiskStoreOnVM(getVM(0), DISK_STORE);
closeRegionOnVM(getVM(1), REGION_NAME);
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_REPLICATE, compressor));
assertEquals(VALUE_1,getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for two peers hosting a partitioned region.
*/
public void testPartitionedRegion() {
Compressor compressor = null;
try {
compressor = SnappyCompressor.getDefaultInstance();
} catch (Throwable t) {
// Not a supported OS
return;
}
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.PARTITION, compressor));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PARTITION, compressor));
assertNull(putUsingVM(getVM(0),KEY_1,VALUE_1));
waitOnPut(getVM(1),KEY_1);
assertEquals(VALUE_1,getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for two peers hosting a persistent partitioned region.
*/
public void testPartitionedPersistentRegion() {
Compressor compressor = null;
try {
compressor = SnappyCompressor.getDefaultInstance();
} catch (Throwable t) {
// Not a supported OS
return;
}
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.PERSISTENT_PARTITION, compressor, DISK_STORE));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_PARTITION, compressor));
assertNull(putUsingVM(getVM(0),KEY_1,VALUE_1));
waitOnPut(getVM(1),KEY_1);
flushDiskStoreOnVM(getVM(0), DISK_STORE);
closeRegionOnVM(getVM(1), REGION_NAME);
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_PARTITION, compressor));
assertEquals(VALUE_1,getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for a non caching client and a cache server.
*/
public void testClientProxyRegion() {
Compressor compressor = null;
try {
compressor = SnappyCompressor.getDefaultInstance();
} catch (Throwable t) {
// Not a supported OS
return;
}
assertTrue(createCompressedServerRegionOnVm(getVM(0), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertTrue(createCompressedClientRegionOnVm(getVM(1), REGION_NAME, compressor, ClientRegionShortcut.PROXY));
assertNull(putUsingClientVM(getVM(1),KEY_1,VALUE_1));
assertEquals(VALUE_1,getUsingClientVM(getVM(1), KEY_1));
assertEquals(VALUE_1,getUsingVM(getVM(0),KEY_1));
cleanupClient(getVM(1));
cleanup(getVM(0));
}
/**
* Sanity check for a caching client and a cache server.
*/
public void testCachingClientProxyRegion() {
Compressor compressor = null;
try {
compressor = SnappyCompressor.getDefaultInstance();
} catch (Throwable t) {
// Not a supported OS
return;
}
assertTrue(createCompressedServerRegionOnVm(getVM(0), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertTrue(createCompressedClientRegionOnVm(getVM(1), REGION_NAME, compressor, ClientRegionShortcut.CACHING_PROXY));
assertNull(putUsingClientVM(getVM(1),KEY_1,VALUE_1));
assertEquals(VALUE_1,getUsingClientVM(getVM(1), KEY_1));
assertEquals(VALUE_1,getUsingVM(getVM(0),KEY_1));
cleanupClient(getVM(1));
cleanup(getVM(0));
}
/**
* closes a region on a virtual machine.
* @param vm a virtual machine.
* @param region the region to close.
*/
private void closeRegionOnVM(final VM vm, final String region) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
getCache().getRegion(region).close();
}
});
}
/**
* Flushes a disk store on a vm causing region entries to be written to disk.
* @param vm the virtual machine to perform the flush on.
* @param diskStore the disk store to flush.
*/
private void flushDiskStoreOnVM(final VM vm, final String diskStore) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
getCache().findDiskStore(diskStore).flush();
}
});
}
/**
* Returns when a put has been replicated to a peer.
* @param vm a peer.
* @param key the key to wait on.
*/
private void waitOnPut(final VM vm, final String key) {
DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion() {
@Override
public boolean done() {
return (getUsingVM(vm, key) != null);
}
@Override
public String description() {
return "Waiting on " + key + " to replicate.";
}
},2000,500,true);
}
/**
* Performs a put operation on a client virtual machine.
* @param vm a client.
* @param key the key to put.
* @param value the value to put.
* @return the old value.
*/
private String putUsingClientVM(final VM vm,final String key,final String value) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCache cache = getClientCache(getClientCacheFactory(getLocatorPort()));
Region<String,String> region = cache.getRegion(REGION_NAME);
assertNotNull(region);
return region.put(key, value);
}
});
}
/**
* Performs a put operation on a peer.
* @param vm a peer.
* @param key the key to put.
* @param value the value to put.
* @return the old value.
*/
private String putUsingVM(final VM vm,final String key,final String value) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<String,String> region = getCache().getRegion(REGION_NAME);
assertNotNull(region);
return region.put(key, value);
}
});
}
/**
* Performs a get operation on a client.
* @param vm a client.
* @param key a region entry key.
* @return the value.
*/
private String getUsingClientVM(final VM vm,final String key) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCache cache = getClientCache(getClientCacheFactory(getLocatorPort()));
Region<String,String> region = cache.getRegion(REGION_NAME);
assertNotNull(region);
return region.get(key);
}
});
}
/**
* Performs a get operation on a peer.
* @param vm the peer.
* @param key a region entry key.
* @return the value.
*/
private String getUsingVM(final VM vm,final String key) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<String,String> region = getCache().getRegion(REGION_NAME);
assertNotNull(region);
return region.get(key);
}
});
}
/**
* Returns the VM for a given identifier.
* @param vm a virtual machine identifier.
*/
private VM getVM(int vm) {
return Host.getHost(0).getVM(vm);
}
/**
* Closes opened regions on a client.
* @param vm a client.
*/
private void cleanupClient(final VM vm) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
getClientCache(getClientCacheFactory(getLocatorPort())).getRegion(REGION_NAME).close();
}
});
}
/**
* Removes created regions from a VM.
* @param vm the virtual machine to cleanup.
*/
private void cleanup(final VM vm) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
Region<String,String> region = getCache().getRegion(REGION_NAME);
assertNotNull(region);
region.destroyRegion();
}
});
}
/**
* Creates a region and assigns a compressor..
* @param vm a virtual machine to create the region on.
* @param name a region name.
* @param compressor a compressor.
* @return true if successfully created, otherwise false.
*/
private boolean createCompressedServerRegionOnVm(final VM vm,final String name,final DataPolicy dataPolicy,final Compressor compressor) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createServerRegion(name,dataPolicy,compressor));
} catch(Exception e) {
getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a region and assigns a compressor.
* @param vm a virtual machine to create the region on.
* @param name a region name.
* @param compressor a compressor.
* @return true if successfully created, otherwise false.
*/
private boolean createCompressedRegionOnVm(final VM vm,final String name,final DataPolicy dataPolicy,final Compressor compressor) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createRegion(name,dataPolicy,compressor));
} catch(Exception e) {
getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a region and assigns a compressor.
* @param vm a virtual machine to create the region on.
* @param name a region name.
* @param compressor a compressor.
* @return true if successfully created, otherwise false.
*/
private boolean createCompressedRegionOnVm(final VM vm,final String name,final DataPolicy dataPolicy,final Compressor compressor,final String diskStoreName) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createRegion(name,dataPolicy,compressor,diskStoreName));
} catch(Exception e) {
getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a compressed region on a client.
* @param vm the client.
* @param name the region.
* @param compressor a compressor.
* @param shortcut type of client.
* @return true if created, false otherwise.
*/
private boolean createCompressedClientRegionOnVm(final VM vm, final String name, final Compressor compressor, final ClientRegionShortcut shortcut) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createClientRegion(name,compressor,shortcut));
} catch(Exception e) {
getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a compressed region for a client.
* @param name a region.
* @param compressor a compressor.
* @param shortcut the type of client.
* @return the newly created region.
*/
private Region<String,String> createClientRegion(String name,Compressor compressor,ClientRegionShortcut shortcut) {
ClientCacheFactory factory = getClientCacheFactory(getLocatorPort());
return getClientCache(factory).<String,String>createClientRegionFactory(shortcut).setCloningEnabled(true).setCompressor(compressor).create(name);
}
/**
* Creates a region and assigns a compressor.
* @param name the region name.
* @param dataPolicy the type of peer.
* @param compressor a compressor.
* @return the newly created region.
*/
private Region<String,String> createRegion(String name,DataPolicy dataPolicy,Compressor compressor) {
return getCache().<String,String>createRegionFactory().setDataPolicy(dataPolicy).setCloningEnabled(true).setCompressor(compressor).create(name);
}
/**
* Creates a compressed region and adds a cache server to a peer.
* @param name the region name.
* @param dataPolicy the type of peer.
* @param compressor a compressor
* @return the newly created region.
* @throws IOException a problem occurred while created the cache server.
*/
private Region<String,String> createServerRegion(String name,DataPolicy dataPolicy,Compressor compressor) throws IOException {
Region<String,String> region = getCache().<String,String>createRegionFactory().setDataPolicy(dataPolicy).setCloningEnabled(true).setCompressor(compressor).create(name);
CacheServer server = getCache().addCacheServer();
server.start();
return region;
}
/**
* Creates a region and assigns a compressor.
* @param name the region name.
* @param dataPolicy the type of peer.
* @param compressor a compressor.
* @return the newly created region.
*/
private Region<String,String> createRegion(String name,DataPolicy dataPolicy,Compressor compressor,String diskStoreName) {
getCache().createDiskStoreFactory().create(diskStoreName);
return getCache().<String,String>createRegionFactory().setDataPolicy(dataPolicy).setDiskStoreName(diskStoreName).setCloningEnabled(true).setCompressor(compressor).create(name);
}
/**
* Creates a new ClientCacheFactory.
* @param dunitLocatorPort a locator port.
* @return the newly created ClientCacheFactory.
*/
private ClientCacheFactory getClientCacheFactory(int dunitLocatorPort) {
return new ClientCacheFactory().addPoolLocator("localhost", dunitLocatorPort).setPoolSubscriptionEnabled(true);
}
/**
* Returns the locator port.
*/
private int getLocatorPort() {
// Running from eclipse
if(DUnitLauncher.isLaunched()) {
String locatorString = DUnitLauncher.getLocatorString();
int index = locatorString.indexOf("[");
return Integer.parseInt(locatorString.substring(index+1, locatorString.length()-1));
}
// Running in hydra
else {
return getDUnitLocatorPort();
}
}
}