// blob: de5f0d3e19a5b6d69af467967eefd89d15a85f30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal.cli.commands;
import static java.lang.Math.abs;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Properties;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.DistributedRegionMXBean;
import org.apache.geode.management.ManagementService;
import org.apache.geode.management.internal.cli.result.model.ResultModel;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.GfshCommandRule;
/**
 * Distributed tests for the gfsh {@code rebalance} command.
 *
 * <p>
 * Each test starts a locator and two servers, connects gfsh to the locator, and creates three
 * partitioned regions: one created on both servers ({@link #SHARED_REGION_NAME}), one created only
 * on server 1 ({@link #REGION1_NAME}) and one created only on server 2 ({@link #REGION2_NAME}).
 * The local data size of every region is recorded after setup so that tests can check whether a
 * command moved data (shared region balanced) or left it alone (sizes unchanged).
 */
public class RebalanceCommandDUnitTest {
  private static final String SHARED_REGION_NAME = "GemfireDataCommandsDUnitTestRegion";
  private static final String REGION1_NAME = "GemfireDataCommandsDUnitTestRegion1";
  private static final String REGION2_NAME = "GemfireDataCommandsDUnitTestRegion2";

  // Number of entries each server puts into each region during setup.
  private static final int SERVER1_SHARED_REGION_SIZE = 10;
  private static final int SERVER2_SHARED_REGION_SIZE = 100;
  private static final int SERVER1_REGION1_SIZE = 100;
  private static final int SERVER2_REGION2_SIZE = 10;

  @Rule
  public GfshCommandRule gfsh = new GfshCommandRule();

  @Rule
  public ClusterStartupRule cluster = new ClusterStartupRule();

  private MemberVM server1;
  private MemberVM server2;

  // Local data sizes captured at the end of before(); the baseline for "unchanged" assertions.
  // Instance (not static) state: before() repopulates them for every test instance.
  private int server1SharedRegionInitialSize;
  private int server2SharedRegionInitialSize;
  private int server1Region1InitialSize;
  private int server2Region2InitialSize;

  /**
   * Starts the cluster, creates and populates the regions, waits for the manager's region MBean,
   * and records the initial local data size of each region on the server that created it.
   */
  @Before
  public void before() throws Exception {
    MemberVM locator = cluster.startLocatorVM(0, locatorProperties());
    server1 = cluster.startServerVM(1, locator.getPort());
    server2 = cluster.startServerVM(2, locator.getPort());
    gfsh.connectAndVerify(locator);

    // Server 1 populates its regions before server 2 creates the shared region.
    server1.invoke(() -> {
      createPartitionedRegionWithEntries(SHARED_REGION_NAME, SERVER1_SHARED_REGION_SIZE, 200);
      createPartitionedRegionWithEntries(REGION1_NAME, SERVER1_REGION1_SIZE, 200);
    });
    server2.invoke(() -> {
      createPartitionedRegionWithEntries(SHARED_REGION_NAME, SERVER2_SHARED_REGION_SIZE, 400);
      createPartitionedRegionWithEntries(REGION2_NAME, SERVER2_REGION2_SIZE, 200);
    });

    // check if DistributedRegionMXBean is available so that command will not fail
    locator.invoke(RebalanceCommandDUnitTest::waitForManagerMBean);

    server1SharedRegionInitialSize =
        server1.invoke(() -> getLocalDataSizeForRegion(SHARED_REGION_NAME));
    server2SharedRegionInitialSize =
        server2.invoke(() -> getLocalDataSizeForRegion(SHARED_REGION_NAME));
    server1Region1InitialSize = server1.invoke(() -> getLocalDataSizeForRegion(REGION1_NAME));
    server2Region2InitialSize = server2.invoke(() -> getLocalDataSizeForRegion(REGION2_NAME));
  }

  @Test
  public void testRegionNameInResultStartsWithSlash() {
    final String regionPath = SEPARATOR + SHARED_REGION_NAME;
    String command = "rebalance --include-region=" + regionPath;
    gfsh.executeAndAssertThat(command).statusIsSuccess();

    ResultModel result = gfsh.getCommandResult().getResultData();
    assertThat(result.getTableSections()).hasSize(1);
    assertThat(result.getTableSections().get(0).getHeader()).contains(regionPath);
  }

  @Test
  public void testWithTimeOut() {
    String command = "rebalance --time-out=1";
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
  }

  @Test
  public void testWithTimeOutAndRegion() {
    String command = "rebalance --time-out=1 --include-region=" + SEPARATOR + SHARED_REGION_NAME;
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
  }

  @Test
  public void testWithSimulateAndRegion() {
    String command = "rebalance --simulate=true --include-region=" + SEPARATOR + SHARED_REGION_NAME;
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertAllRegionsUnchanged();
  }

  @Test
  public void testWithSimulate() {
    String command = "rebalance --simulate=true";
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertAllRegionsUnchanged();
  }

  @Test
  public void testWithTwoRegions() {
    String command =
        "rebalance --include-region=" + SEPARATOR + SHARED_REGION_NAME + "," + SEPARATOR
            + REGION2_NAME;
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
    assertRegion2UnchangedOnServer2();
  }

  @Test
  public void testWithTwoSharedRegions() {
    // Also create REGION2 on server 1 so that it is present on both servers for this test.
    server1.invoke(() -> createPartitionedRegionWithEntries(REGION2_NAME, 15, 210));

    String command =
        "rebalance --include-region=" + SEPARATOR + SHARED_REGION_NAME + "," + SEPARATOR
            + REGION2_NAME;
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
    assertRegionBalanced(REGION2_NAME);
    assertRegion1UnchangedOnServer1();
  }

  @Test
  public void testWithBadRegionNames() {
    String command =
        "rebalance --include-region=" + SEPARATOR + "randomGarbageString" + "," + SEPARATOR
            + "otherRandomGarbage";
    gfsh.executeAndAssertThat(command).statusIsError();
    assertAllRegionsUnchanged();
  }

  @Test
  public void testWithOneGoodAndOneBadRegionName() {
    String command =
        "rebalance --include-region=" + SEPARATOR + SHARED_REGION_NAME + "," + SEPARATOR
            + "otherRandomGarbage";
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
    assertNonSharedRegionsUnchanged();
  }

  @Test
  public void testWithNonSharedRegions() {
    String command =
        "rebalance --include-region=" + SEPARATOR + REGION1_NAME + "," + SEPARATOR + REGION2_NAME;
    gfsh.executeAndAssertThat(command).statusIsError();
    assertAllRegionsUnchanged();
  }

  @Test
  public void testWithSimulateAndTimeout() {
    String command = "rebalance --simulate=true --time-out=1";
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertAllRegionsUnchanged();
  }

  @Test
  public void testWithNoArgs() {
    String command = "rebalance";
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
    assertNonSharedRegionsUnchanged();
  }

  @Test
  public void testWithExcludedRegion() {
    String command = "rebalance --exclude-region=" + SEPARATOR + REGION2_NAME;
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
    assertNonSharedRegionsUnchanged();
  }

  @Test
  public void testWithExcludedSharedRegion() {
    String command = "rebalance --exclude-region=" + SEPARATOR + SHARED_REGION_NAME;
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertAllRegionsUnchanged();
  }

  @Test
  public void testWithExcludedBadRegion() {
    String command = "rebalance --exclude-region=" + SEPARATOR + "asdf";
    gfsh.executeAndAssertThat(command).statusIsSuccess();
    assertRegionBalanced(SHARED_REGION_NAME);
    assertNonSharedRegionsUnchanged();
  }

  /**
   * Asserts that every region's local data size, on the server(s) measured during setup, still
   * equals the size recorded in {@link #before()}.
   */
  private void assertAllRegionsUnchanged() {
    assertThat(server1.invoke(() -> getLocalDataSizeForRegion(SHARED_REGION_NAME)))
        .isEqualTo(server1SharedRegionInitialSize);
    assertThat(server2.invoke(() -> getLocalDataSizeForRegion(SHARED_REGION_NAME)))
        .isEqualTo(server2SharedRegionInitialSize);
    assertNonSharedRegionsUnchanged();
  }

  /**
   * Asserts that REGION1 on server 1 and REGION2 on server 2 still have their initial local sizes.
   */
  private void assertNonSharedRegionsUnchanged() {
    assertRegion1UnchangedOnServer1();
    assertRegion2UnchangedOnServer2();
  }

  /** Asserts that server 1's local data size for REGION1 equals the size recorded in setup. */
  private void assertRegion1UnchangedOnServer1() {
    assertThat(server1.invoke(() -> getLocalDataSizeForRegion(REGION1_NAME)))
        .isEqualTo(server1Region1InitialSize);
  }

  /** Asserts that server 2's local data size for REGION2 equals the size recorded in setup. */
  private void assertRegion2UnchangedOnServer2() {
    assertThat(server2.invoke(() -> getLocalDataSizeForRegion(REGION2_NAME)))
        .isEqualTo(server2Region2InitialSize);
  }

  /**
   * Asserts that the local data sizes of the given region on the two servers differ by at most
   * one entry.
   *
   * @param regionName region name without the leading separator
   */
  private void assertRegionBalanced(String regionName) {
    Integer size1 = server1.invoke(() -> getLocalDataSizeForRegion(regionName));
    Integer size2 = server2.invoke(() -> getLocalDataSizeForRegion(regionName));
    assertThat(abs(size1 - size2)).isLessThanOrEqualTo(1);
  }

  /**
   * Creates a {@link RegionShortcut#PARTITION} region in the calling VM's cache and puts
   * {@code entryCount} entries into it. Entry {@code i} uses key {@code "key" + (i + keyOffset)}
   * and value {@code "value" + (i + keyOffset)}.
   *
   * <p>
   * Must be static: it is called from lambdas that are shipped to a server VM, which must not
   * capture the test instance.
   *
   * @param regionName name of the region to create (no leading separator)
   * @param entryCount number of entries to put
   * @param keyOffset number added to the loop index to form the key/value suffix
   */
  private static void createPartitionedRegionWithEntries(String regionName, int entryCount,
      int keyOffset) {
    Cache cache = ClusterStartupRule.getCache();
    RegionFactory<String, String> regionFactory =
        cache.createRegionFactory(RegionShortcut.PARTITION);
    Region<String, String> region = regionFactory.create(regionName);
    for (int i = 0; i < entryCount; i++) {
      region.put("key" + (i + keyOffset), "value" + (i + keyOffset));
    }
  }

  /**
   * Returns the number of entries of the given region held locally by the calling VM, as reported
   * by {@link PartitionRegionHelper#getLocalData(Region)}.
   *
   * @param regionName region name without the leading separator
   */
  private static Integer getLocalDataSizeForRegion(String regionName) {
    InternalCache cache = ClusterStartupRule.getCache();
    Region<?, ?> region = cache.getInternalRegionByPath(SEPARATOR + regionName);
    return PartitionRegionHelper.getLocalData(region).size();
  }

  /**
   * Blocks until the management service in the calling VM exposes a
   * {@link DistributedRegionMXBean} for the shared region that lists more than one member, and the
   * distributed system bean lists at least two regions.
   */
  private static void waitForManagerMBean() {
    await().until(() -> {
      final ManagementService service =
          ManagementService.getManagementService(ClusterStartupRule.getCache());
      final DistributedRegionMXBean bean =
          service.getDistributedRegionMXBean(SEPARATOR + SHARED_REGION_NAME);
      // NOTE(review): setup creates three regions but this only waits for two to be listed;
      // confirm whether ">= 2" is intentional before tightening it.
      return bean != null && bean.getMembers() != null && bean.getMembers().length > 1
          && bean.getMemberCount() > 0
          && service.getDistributedSystemMXBean().listRegions().length >= 2;
    });
  }

  /**
   * Builds the locator configuration: multicast disabled, {@code fine} log level, and a JMX
   * manager bound for clients on {@code localhost} at a randomly chosen available TCP port.
   */
  private Properties locatorProperties() {
    int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort();
    Properties props = new Properties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOG_LEVEL, "fine");
    props.setProperty(ConfigurationProperties.JMX_MANAGER_HOSTNAME_FOR_CLIENTS, "localhost");
    props.setProperty(ConfigurationProperties.JMX_MANAGER_PORT, String.valueOf(jmxPort));
    return props;
  }
}