blob: 7f4e09725eeeb756eb7a40605fd8744c4cc1b95d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.redis.internal.executor;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
public class MovedDUnitTest {
@ClassRule
public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule();
private static Jedis jedis1;
private static Jedis jedis2;
private static RedisAdvancedClusterCommands<String, String> lettuce;
private static RedisClusterClient clusterClient;
private static MemberVM locator;
private static MemberVM server1;
private static int redisServerPort1;
private static final int ENTRIES = 200;
@BeforeClass
public static void classSetup() {
locator = clusterStartUp.startLocatorVM(0);
server1 = clusterStartUp.startRedisVM(1, locator.getPort());
clusterStartUp.startRedisVM(2, locator.getPort());
redisServerPort1 = clusterStartUp.getRedisPort(1);
int redisServerPort2 = clusterStartUp.getRedisPort(2);
jedis1 = new Jedis(BIND_ADDRESS, redisServerPort1, REDIS_CLIENT_TIMEOUT);
jedis2 = new Jedis(BIND_ADDRESS, redisServerPort2, REDIS_CLIENT_TIMEOUT);
clusterClient = RedisClusterClient.create("redis://localhost:" + redisServerPort1);
ClusterTopologyRefreshOptions refreshOptions =
ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers()
.build();
clusterClient.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(refreshOptions)
.autoReconnect(true)
.validateClusterNodeMembership(false)
.build());
lettuce = clusterClient.connect().sync();
}
@AfterClass
public static void cleanup() {
try {
clusterClient.shutdown();
} catch (Exception ignored) {
// https://github.com/lettuce-io/lettuce-core/issues/1800
}
jedis1.close();
jedis2.close();
}
@Before
public void testSetup() {
clusterStartUp.flushAll();
}
@Test
public void testMovedResponse_fromWrongServer() {
int movedResponses = 0;
Jedis jedis;
for (int i = 0; i < ENTRIES; i++) {
String key = "key-" + i;
String value = "value-" + i;
// Always pick the wrong connection to use
RedisMemberInfo memberInfo = clusterStartUp.getMemberInfo(key);
jedis = memberInfo.getRedisPort() == redisServerPort1 ? jedis2 : jedis1;
try {
jedis.set(key, value);
} catch (JedisMovedDataException mex) {
movedResponses++;
}
}
assertThat(movedResponses).isEqualTo(ENTRIES);
}
@Test
public void testNoMovedResponse_fromCorrectServer() {
Jedis jedis;
for (int i = 0; i < ENTRIES; i++) {
String key = "key-" + i;
String value = "value-" + i;
// Always pick the right connection to use
RedisMemberInfo memberInfo = clusterStartUp.getMemberInfo(key);
jedis = memberInfo.getRedisPort() == redisServerPort1 ? jedis1 : jedis2;
assertThat(jedis.set(key, value)).isEqualTo("OK");
}
}
@Test
public void failedServerProducesMovedResponseAfterRestart() {
clusterStartUp.startRedisVM(3, locator.getPort());
rebalanceAllRegions(server1);
for (int i = 0; i < ENTRIES; i++) {
lettuce.set("key-" + i, "value-" + i);
}
clusterStartUp.crashVM(3);
rebalanceAllRegions(server1);
clusterStartUp.startRedisVM(3, locator.getPort());
Jedis jedis3 = new Jedis(BIND_ADDRESS, clusterStartUp.getRedisPort(3), REDIS_CLIENT_TIMEOUT);
for (int i = 0; i < ENTRIES; i++) {
String key = "key-" + i;
assertThatThrownBy(() -> jedis3.get(key))
.as("Key '" + key + "' did not produce MOVED response")
.hasMessageContaining("MOVED");
}
}
private static void rebalanceAllRegions(MemberVM vm) {
vm.invoke("Running rebalance", () -> {
ResourceManager manager = ClusterStartupRule.getCache().getResourceManager();
RebalanceFactory factory = manager.createRebalanceFactory();
try {
factory.start().getResults();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}