blob: f1d78a1e1b5cdb74aed890bd75e40a6fbc547b47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal.cli.commands;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.apache.geode.cache.Region;
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.CachedDeserializableFactory;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.management.internal.configuration.domain.Configuration;
import org.apache.geode.management.internal.configuration.utils.XmlUtils;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.CompressionTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;
import org.apache.geode.test.junit.rules.VMProvider;
@Category({CompressionTest.class})
public class AlterCompressorDUnitTest {
// Locator VM (index 0); assigned once in setupCluster().
private static MemberVM locator;
// Servers started in VMs 1 and 2 as members of the "dataStore" group.
private static MemberVM server1;
private static MemberVM server2;
// Server started in VM 3 as a member of the "accessor" group.
private static MemberVM server3;
// Rule used to start the locator and server VMs of this test class.
@ClassRule
public static ClusterStartupRule cluster = new ClusterStartupRule();
// Rule used to run gfsh commands; it is connected to the locator in setupCluster().
@ClassRule
public static GfshCommandRule gfsh = new GfshCommandRule();
/**
 * Builds the cluster shared by the test: one locator, two servers in the "dataStore" group and
 * one server in the "accessor" group. It then creates the disk store and both flavours of
 * "testRegion" through gfsh and loads 100 entries.
 */
@BeforeClass
public static void setupCluster() throws Exception {
  locator = cluster.startLocatorVM(0);
  server1 = cluster.startServerVM(1, "dataStore", locator.getPort());
  server2 = cluster.startServerVM(2, "dataStore", locator.getPort());
  server3 = cluster.startServerVM(3, "accessor", locator.getPort());
  gfsh.connect(locator);

  final String createDiskStore =
      "create disk-store --name=diskStore --groups=dataStore --dir=diskStore";
  final String createPersistentRegion =
      "create region --name=testRegion --type=REPLICATE_PERSISTENT --group=dataStore --disk-store=diskStore";
  final String createProxyRegion =
      "create region --name=testRegion --type=REPLICATE_PROXY --group=accessor";

  // the disk store has to exist before the persistent region that refers to it
  gfsh.executeAndAssertThat(createDiskStore).statusIsSuccess();
  // persistent replicate on the dataStore group, proxy on the accessor group
  gfsh.executeAndAssertThat(createPersistentRegion).statusIsSuccess();
  gfsh.executeAndAssertThat(createProxyRegion).statusIsSuccess();

  // Load the data through server3 only; the region is replicated
  server3.invoke(() -> {
    Region<String, String> region = ClusterStartupRule.getCache().getRegion("testRegion");
    for (int i = 0; i < 100; i++) {
      region.put("key" + i, "value" + i);
    }
  });
}
/**
 * Validates that a persistent region can be altered to have a compressor, and back again.
 *
 * <ol>
 * <li>Start the cluster, create a region without a compressor and load some data (done in
 * {@code setupCluster}).</li>
 * <li>While the servers are offline, add a compressor by altering the disk-store and the cluster
 * configuration.</li>
 * <li>Bounce the servers and validate that the region is recovered and the data is
 * compressed.</li>
 * <li>While offline again, remove the compressor and assert that the region is recovered.</li>
 * </ol>
 */
@Test
public void alterDiskCompressor() {
  // verify no compressor to start with
  gfsh.executeAndAssertThat("describe region --name=testRegion").statusIsSuccess()
      .doesNotContainOutput("compressor");
  gfsh.executeAndAssertThat("describe disk-store --member=server-1 --name=diskStore")
      .statusIsSuccess().doesNotContainOutput("compressor");
  // verify that server's region has no compressor and the data is not compressed
  VMProvider.invokeInEveryMember(AlterCompressorDUnitTest::verifyRegionIsNotCompressed, server1,
      server2);

  // take the servers offline so the disk stores can be altered
  stopServers();

  // Alter disk-store & region to add compressor
  Stream.of(server1, server2).forEach(server -> {
    String diskDir = server.getWorkingDir() + "/diskStore";
    // make sure offline diskstore has no compressor
    assertOfflineDiskStoreCompressor(diskDir, "none");
    // alter diskstore to have compressor
    alterOfflineDiskStoreCompressor(diskDir, TestCompressor1.class.getName());
    // describe again: the compressor must now be recorded
    assertOfflineDiskStoreCompressor(diskDir, TestCompressor1.class.getName());
  });
  locator.invoke(() -> {
    // add the compressor to the region attributes and put it back in cluster config
    // this is just a hack to change the cache.xml so that when server restarts it restarts
    // with new region attributes
    InternalConfigurationPersistenceService ccService =
        ClusterStartupRule.getLocator().getConfigurationPersistenceService();
    Configuration configuration = ccService.getConfiguration("dataStore");
    String modifiedXml =
        addCompressor(configuration.getCacheXmlContent(), TestCompressor1.class.getName());
    configuration.setCacheXmlContent(modifiedXml);
    ccService.getConfigurationRegion().put("dataStore", configuration);
  });

  // now start the VMs
  startServers();
  // verify that server's region has compressor added and the data is compressed
  // NOTE(review): only server1 is checked here -- confirm that skipping server2 is intentional
  VMProvider.invokeInEveryMember(AlterCompressorDUnitTest::verifyRegionIsCompressed, server1);

  // take the servers offline again to undo the change
  stopServers();

  // Alter disk-store & region to remove the compressor
  Stream.of(server1, server2).forEach(server -> {
    String diskDir = server.getWorkingDir() + "/diskStore";
    // make sure offline diskstore still has the compressor
    assertOfflineDiskStoreCompressor(diskDir, TestCompressor1.class.getName());
    // alter diskstore to have no compressor
    alterOfflineDiskStoreCompressor(diskDir, "none");
    // describe again: the compressor must be gone
    assertOfflineDiskStoreCompressor(diskDir, "none");
  });
  locator.invoke(() -> {
    // remove the compressor from the region attributes and put it back in cluster config
    // this is just a hack to change the cache.xml so that when server restarts it restarts
    // with new region attributes
    InternalConfigurationPersistenceService ccService =
        ClusterStartupRule.getLocator().getConfigurationPersistenceService();
    Configuration configuration = ccService.getConfiguration("dataStore");
    String modifiedXml = removeCompressor(configuration.getCacheXmlContent());
    configuration.setCacheXmlContent(modifiedXml);
    ccService.getConfigurationRegion().put("dataStore", configuration);
  });

  startServers();
  // verify that server's region has no compressor and the data is not compressed
  // NOTE(review): only server2 is checked here -- confirm that skipping server1 is intentional
  VMProvider.invokeInEveryMember(AlterCompressorDUnitTest::verifyRegionIsNotCompressed, server2);
}

/**
 * Stops all three servers; the locator keeps running.
 *
 * <p>
 * NOTE(review): the {@code false} argument presumably keeps each server's working directory,
 * since the disk store directories are read right after the stop -- confirm against
 * {@code MemberVM.stop}.
 */
private static void stopServers() {
  server1.stop(false);
  server2.stop(false);
  server3.stop(false);
}

/**
 * Runs {@code describe offline-disk-store} on the given directory and asserts that the output
 * contains {@code -compressor=<expectedCompressor>}.
 *
 * @param diskDir directory passed as {@code --disk-dirs}
 * @param expectedCompressor compressor class name, or {@code "none"}
 */
private static void assertOfflineDiskStoreCompressor(String diskDir, String expectedCompressor) {
  gfsh.executeAndAssertThat(
      "describe offline-disk-store --name=diskStore --disk-dirs=" + diskDir).statusIsSuccess()
      .containsOutput("-compressor=" + expectedCompressor);
}

/**
 * Runs {@code alter disk-store} for region "testRegion" on the given directory, setting
 * {@code --compressor} to the given value, and asserts that the command succeeds.
 *
 * @param diskDir directory passed as {@code --disk-dirs}
 * @param compressor compressor class name, or {@code "none"}
 */
private static void alterOfflineDiskStoreCompressor(String diskDir, String compressor) {
  gfsh.executeAndAssertThat("alter disk-store --name=diskStore --region=testRegion --disk-dirs="
      + diskDir + " --compressor=" + compressor).statusIsSuccess();
}
/**
 * Restarts the two "dataStore" servers (VMs 1 and 2) and then the "accessor" server (VM 3).
 */
private void startServers() {
  // server1 and server2 are started in parallel because each of them has a replicate data
  // store on disk
  Arrays.asList(1, 2).parallelStream().forEach(vmIndex -> {
    cluster.startServerVM(vmIndex, "dataStore", locator.getPort());
  });
  // the accessor is started only after both data stores have been started
  cluster.startServerVM(3, "accessor", locator.getPort());
}
/**
 * Appends {@code <compressor><class-name>name</class-name></compressor>} to the first
 * {@code region-attributes} element of the given cache XML.
 *
 * @param cacheXmlContent cache XML text to modify
 * @param name text placed inside the {@code class-name} element
 * @return the modified XML as rendered by {@code XmlUtils.prettyXml} for the document's first
 *         child
 */
private static String addCompressor(String cacheXmlContent, String name) throws Exception {
  Document xmlDocument = XmlUtils.createDocumentFromXml(cacheXmlContent);

  // build the new element from the inside out
  Node classNameElement = xmlDocument.createElement("class-name");
  classNameElement.setTextContent(name);
  Node compressorElement = xmlDocument.createElement("compressor");
  compressorElement.appendChild(classNameElement);

  // only the first region-attributes element in the document is touched
  NodeList regionAttributes = xmlDocument.getElementsByTagName("region-attributes");
  regionAttributes.item(0).appendChild(compressorElement);

  return XmlUtils.prettyXml(xmlDocument.getFirstChild());
}
/**
 * Removes the first {@code <compressor>...</compressor>} element from the given cache XML.
 *
 * <p>
 * The match is purely textual: the text from the first {@code <compressor>} tag through the
 * first {@code </compressor>} tag that follows it is cut out. Whitespace around the element is
 * left in place.
 *
 * @param cacheXmlContent cache XML text to strip the compressor from
 * @return the XML without its first compressor element, or the input unchanged when it does not
 *         contain a complete compressor element
 */
private static String removeCompressor(String cacheXmlContent) {
  String beginTag = "<compressor>";
  String endTag = "</compressor>";
  int beginIndex = cacheXmlContent.indexOf(beginTag);
  if (beginIndex < 0) {
    // nothing to remove; substring(0, -1) would otherwise throw
    return cacheXmlContent;
  }
  // look for the closing tag only after the opening tag so that a stray closing tag earlier in
  // the text cannot produce a corrupted result
  int endIndex = cacheXmlContent.indexOf(endTag, beginIndex + beginTag.length());
  if (endIndex < 0) {
    // unterminated element: leave the text alone rather than dropping unrelated content
    return cacheXmlContent;
  }
  return cacheXmlContent.substring(0, beginIndex)
      + cacheXmlContent.substring(endIndex + endTag.length());
}
/**
 * Asserts that "testRegion" in this member is configured with {@code TestCompressor1} and that
 * the value stored for "key19" is the compressed form of the serialized "value19".
 */
private static void verifyRegionIsCompressed() {
  Region<String, String> region = ClusterStartupRule.getCache().getRegion("testRegion");

  // the region attributes must carry the test compressor
  assertThat(region.getAttributes().getCompressor()).isNotNull()
      .isInstanceOf(TestCompressor1.class);

  // the raw entry value must be a byte array ...
  RegionEntry regionEntry = ((LocalRegion) region).entries.getEntry("key19");
  Object storedValue = regionEntry.getValue();
  assertThat(storedValue).isInstanceOf(byte[].class);

  // ... equal to what the compressor produces for the serialized value
  byte[] uncompressed = EntryEventImpl
      .serialize(CachedDeserializableFactory.create(EntryEventImpl.serialize("value19"), null));
  byte[] expectedBytes = new TestCompressor1().compress(uncompressed);
  assertThat((byte[]) storedValue).isEqualTo(expectedBytes);
}
/**
 * Asserts that "testRegion" in this member has no compressor and that the value stored for
 * "key19" is a {@code CachedDeserializable} holding the serialized "value19".
 */
private static void verifyRegionIsNotCompressed() {
  Region<String, String> region = ClusterStartupRule.getCache().getRegion("testRegion");

  // no compressor may be configured on the region
  assertThat(region.getAttributes().getCompressor()).isNull();

  // the raw entry value must be stored in its uncompressed wrapper
  Object storedValue = ((LocalRegion) region).entries.getEntry("key19").getValue();
  assertThat(storedValue).isInstanceOf(CachedDeserializable.class);

  CachedDeserializable reference =
      CachedDeserializableFactory.create(EntryEventImpl.serialize("value19"), null);
  CachedDeserializable actual = (CachedDeserializable) storedValue;
  assertThat(actual.getSerializedValue()).isEqualTo(reference.getSerializedValue());
}
}