blob: 393a15fa83b6615632d9724b93ae3e3acf0a7d9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.compression;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.junit.Test;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.compression.Compressor;
import org.apache.geode.compression.SnappyCompressor;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.DUnitLauncher;
/**
* Sanity checks on a number of basic cluster configurations with compression turned on.
*
*/
public class CompressionRegionConfigDUnitTest extends JUnit4CacheTestCase {
/**
* The name of our test region.
*/
public static final String REGION_NAME = "compressedRegion";
/**
* Name of the common disk store used by all tests.
*/
public static final String DISK_STORE = "diskStore";
/**
* A key.
*/
public static final String KEY_1 = "key1";
/**
* A value.
*/
public static final String VALUE_1 =
"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam auctor bibendum tempus. Suspendisse potenti. Ut enim neque, mattis et mattis ac, vulputate quis leo. Cras a metus metus, eget cursus ipsum. Aliquam sagittis condimentum massa aliquet rhoncus. Aliquam sed luctus neque. In hac habitasse platea dictumst.";
public CompressionRegionConfigDUnitTest() {
super();
}
/**
* Sanity check using two peers sharing a replicated region.
*
*/
@Test
public void testReplicateRegion() throws Exception {
Compressor compressor = new SnappyCompressor();
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertNull(putUsingVM(getVM(0), KEY_1, VALUE_1));
waitOnPut(getVM(1), KEY_1);
assertEquals(VALUE_1, getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for two peers sharing a persisted replicated region.
*
*/
@Test
public void testReplicatePersistentRegion() throws Exception {
Compressor compressor = new SnappyCompressor();
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.PERSISTENT_REPLICATE,
compressor, DISK_STORE));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_REPLICATE,
compressor));
assertNull(putUsingVM(getVM(0), KEY_1, VALUE_1));
waitOnPut(getVM(1), KEY_1);
flushDiskStoreOnVM(getVM(0), DISK_STORE);
closeRegionOnVM(getVM(1), REGION_NAME);
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_REPLICATE,
compressor));
assertEquals(VALUE_1, getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for two peers hosting a partitioned region.
*/
@Test
public void testPartitionedRegion() {
Compressor compressor = new SnappyCompressor();
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.PARTITION, compressor));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PARTITION, compressor));
assertNull(putUsingVM(getVM(0), KEY_1, VALUE_1));
waitOnPut(getVM(1), KEY_1);
assertEquals(VALUE_1, getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for two peers hosting a persistent partitioned region.
*/
@Test
public void testPartitionedPersistentRegion() {
Compressor compressor = new SnappyCompressor();
assertTrue(createCompressedRegionOnVm(getVM(0), REGION_NAME, DataPolicy.PERSISTENT_PARTITION,
compressor, DISK_STORE));
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_PARTITION,
compressor));
assertNull(putUsingVM(getVM(0), KEY_1, VALUE_1));
waitOnPut(getVM(1), KEY_1);
flushDiskStoreOnVM(getVM(0), DISK_STORE);
closeRegionOnVM(getVM(1), REGION_NAME);
assertTrue(createCompressedRegionOnVm(getVM(1), REGION_NAME, DataPolicy.PERSISTENT_PARTITION,
compressor));
assertEquals(VALUE_1, getUsingVM(getVM(1), KEY_1));
cleanup(getVM(0));
}
/**
* Sanity check for a non caching client and a cache server.
*/
@Test
public void testClientProxyRegion() {
Compressor compressor = new SnappyCompressor();
assertTrue(
createCompressedServerRegionOnVm(getVM(0), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertTrue(createCompressedClientRegionOnVm(getVM(1), REGION_NAME, compressor,
ClientRegionShortcut.PROXY));
assertNull(putUsingClientVM(getVM(1), KEY_1, VALUE_1));
assertEquals(VALUE_1, getUsingClientVM(getVM(1), KEY_1));
assertEquals(VALUE_1, getUsingVM(getVM(0), KEY_1));
cleanupClient(getVM(1));
cleanup(getVM(0));
}
/**
* Sanity check for a caching client and a cache server.
*/
@Test
public void testCachingClientProxyRegion() {
Compressor compressor = new SnappyCompressor();
assertTrue(
createCompressedServerRegionOnVm(getVM(0), REGION_NAME, DataPolicy.REPLICATE, compressor));
assertTrue(createCompressedClientRegionOnVm(getVM(1), REGION_NAME, compressor,
ClientRegionShortcut.CACHING_PROXY));
assertNull(putUsingClientVM(getVM(1), KEY_1, VALUE_1));
assertEquals(VALUE_1, getUsingClientVM(getVM(1), KEY_1));
assertEquals(VALUE_1, getUsingVM(getVM(0), KEY_1));
cleanupClient(getVM(1));
cleanup(getVM(0));
}
/**
* closes a region on a virtual machine.
*
* @param vm a virtual machine.
* @param region the region to close.
*/
private void closeRegionOnVM(final VM vm, final String region) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
getCache().getRegion(region).close();
}
});
}
/**
* Flushes a disk store on a vm causing region entries to be written to disk.
*
* @param vm the virtual machine to perform the flush on.
* @param diskStore the disk store to flush.
*/
private void flushDiskStoreOnVM(final VM vm, final String diskStore) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
getCache().findDiskStore(diskStore).flush();
}
});
}
/**
* Returns when a put has been replicated to a peer.
*
* @param vm a peer.
* @param key the key to wait on.
*/
private void waitOnPut(final VM vm, final String key) {
GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
@Override
public boolean done() {
return (getUsingVM(vm, key) != null);
}
@Override
public String description() {
return "Waiting on " + key + " to replicate.";
}
});
}
/**
* Performs a put operation on a client virtual machine.
*
* @param vm a client.
* @param key the key to put.
* @param value the value to put.
* @return the old value.
*/
private String putUsingClientVM(final VM vm, final String key, final String value) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCache cache = getClientCache(getClientCacheFactory(getLocatorPort()));
Region<String, String> region = cache.getRegion(REGION_NAME);
assertNotNull(region);
return region.put(key, value);
}
});
}
/**
* Performs a put operation on a peer.
*
* @param vm a peer.
* @param key the key to put.
* @param value the value to put.
* @return the old value.
*/
private String putUsingVM(final VM vm, final String key, final String value) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<String, String> region = getCache().getRegion(REGION_NAME);
assertNotNull(region);
return region.put(key, value);
}
});
}
/**
* Performs a get operation on a client.
*
* @param vm a client.
* @param key a region entry key.
* @return the value.
*/
private String getUsingClientVM(final VM vm, final String key) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
ClientCache cache = getClientCache(getClientCacheFactory(getLocatorPort()));
Region<String, String> region = cache.getRegion(REGION_NAME);
assertNotNull(region);
return region.get(key);
}
});
}
/**
* Performs a get operation on a peer.
*
* @param vm the peer.
* @param key a region entry key.
* @return the value.
*/
private String getUsingVM(final VM vm, final String key) {
return (String) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region<String, String> region = getCache().getRegion(REGION_NAME);
assertNotNull(region);
return region.get(key);
}
});
}
/**
* Returns the VM for a given identifier.
*
* @param vm a virtual machine identifier.
*/
private VM getVM(int vm) {
return Host.getHost(0).getVM(vm);
}
/**
* Closes opened regions on a client.
*
* @param vm a client.
*/
private void cleanupClient(final VM vm) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
getClientCache(getClientCacheFactory(getLocatorPort())).getRegion(REGION_NAME).close();
}
});
}
/**
* Removes created regions from a VM.
*
* @param vm the virtual machine to cleanup.
*/
private void cleanup(final VM vm) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
Region<String, String> region = getCache().getRegion(REGION_NAME);
assertNotNull(region);
region.destroyRegion();
}
});
}
/**
* Creates a region and assigns a compressor..
*
* @param vm a virtual machine to create the region on.
* @param name a region name.
* @param compressor a compressor.
* @return true if successfully created, otherwise false.
*/
private boolean createCompressedServerRegionOnVm(final VM vm, final String name,
final DataPolicy dataPolicy, final Compressor compressor) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createServerRegion(name, dataPolicy, compressor));
} catch (Exception e) {
LogWriterUtils.getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a region and assigns a compressor.
*
* @param vm a virtual machine to create the region on.
* @param name a region name.
* @param compressor a compressor.
* @return true if successfully created, otherwise false.
*/
private boolean createCompressedRegionOnVm(final VM vm, final String name,
final DataPolicy dataPolicy, final Compressor compressor) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createRegion(name, dataPolicy, compressor));
} catch (Exception e) {
LogWriterUtils.getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a region and assigns a compressor.
*
* @param vm a virtual machine to create the region on.
* @param name a region name.
* @param compressor a compressor.
* @return true if successfully created, otherwise false.
*/
private boolean createCompressedRegionOnVm(final VM vm, final String name,
final DataPolicy dataPolicy, final Compressor compressor, final String diskStoreName) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createRegion(name, dataPolicy, compressor, diskStoreName));
} catch (Exception e) {
LogWriterUtils.getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a compressed region on a client.
*
* @param vm the client.
* @param name the region.
* @param compressor a compressor.
* @param shortcut type of client.
* @return true if created, false otherwise.
*/
private boolean createCompressedClientRegionOnVm(final VM vm, final String name,
final Compressor compressor, final ClientRegionShortcut shortcut) {
return (Boolean) vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
try {
assertNotNull(createClientRegion(name, compressor, shortcut));
} catch (Exception e) {
LogWriterUtils.getLogWriter().error("Could not create the compressed region", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
});
}
/**
* Creates a compressed region for a client.
*
* @param name a region.
* @param compressor a compressor.
* @param shortcut the type of client.
* @return the newly created region.
*/
private Region<String, String> createClientRegion(String name, Compressor compressor,
ClientRegionShortcut shortcut) {
ClientCacheFactory factory = getClientCacheFactory(getLocatorPort());
return getClientCache(factory).<String, String>createClientRegionFactory(shortcut)
.setCloningEnabled(true).setCompressor(compressor).create(name);
}
/**
* Creates a region and assigns a compressor.
*
* @param name the region name.
* @param dataPolicy the type of peer.
* @param compressor a compressor.
* @return the newly created region.
*/
private Region<String, String> createRegion(String name, DataPolicy dataPolicy,
Compressor compressor) {
return getCache().<String, String>createRegionFactory().setDataPolicy(dataPolicy)
.setCloningEnabled(true).setCompressor(compressor).create(name);
}
/**
* Creates a compressed region and adds a cache server to a peer.
*
* @param name the region name.
* @param dataPolicy the type of peer.
* @param compressor a compressor
* @return the newly created region.
* @throws IOException a problem occurred while created the cache server.
*/
private Region<String, String> createServerRegion(String name, DataPolicy dataPolicy,
Compressor compressor) throws IOException {
Region<String, String> region = getCache().<String, String>createRegionFactory()
.setDataPolicy(dataPolicy).setCloningEnabled(true).setCompressor(compressor).create(name);
CacheServer server = getCache().addCacheServer();
server.setPort(0);
server.start();
return region;
}
/**
* Creates a region and assigns a compressor.
*
* @param name the region name.
* @param dataPolicy the type of peer.
* @param compressor a compressor.
* @return the newly created region.
*/
private Region<String, String> createRegion(String name, DataPolicy dataPolicy,
Compressor compressor, String diskStoreName) {
getCache().createDiskStoreFactory().create(diskStoreName);
return getCache().<String, String>createRegionFactory().setDataPolicy(dataPolicy)
.setDiskStoreName(diskStoreName).setCloningEnabled(true).setCompressor(compressor)
.create(name);
}
/**
* Creates a new ClientCacheFactory.
*
* @param dunitLocatorPort a locator port.
* @return the newly created ClientCacheFactory.
*/
private ClientCacheFactory getClientCacheFactory(int dunitLocatorPort) {
return new ClientCacheFactory().addPoolLocator("localhost", dunitLocatorPort)
.setPoolSubscriptionEnabled(true);
}
/**
* Returns the locator port.
*/
private int getLocatorPort() {
// Running from eclipse
if (DUnitLauncher.isLaunched()) {
String locatorString = DUnitLauncher.getLocatorString();
int index = locatorString.indexOf("[");
return Integer.parseInt(locatorString.substring(index + 1, locatorString.length() - 1));
}
// Running in hydra
else {
return DistributedTestUtils.getDUnitLocatorPort();
}
}
}