GEODE-9368: Changes to support better resiliency for HA scenarios (#6686)
- Wrap all StripedExecutor.execute calls with
PartitionedRegion.computeWithPrimaryLocked.
- Handle RegionDestroyedException as a result of trying to lock a bucket
while the bucket is moving.
- Improve race condition handling in *ScanExecutor calls
- Rename responds with MOVED when the bucket is moving during the call.
- Convert tests to use Lettuce's RedisClusterClient
- Add a retry mechanism to various lettuce calls since the client does
not handle all failure cases.
- Fix TCL test script to ensure that buckets are created before running
the tests.
- Ignore RedisSessionDUnitTest for now - fix in GEODE-9437
diff --git a/ci/scripts/execute_redis_tests.sh b/ci/scripts/execute_redis_tests.sh
index f39fb28..c34c861 100755
--- a/ci/scripts/execute_redis_tests.sh
+++ b/ci/scripts/execute_redis_tests.sh
@@ -35,7 +35,15 @@
--name=server1 \
--compatible-with-redis-port=6380 \
--compatible-with-redis-bind-address=127.0.0.1 \
- --compatible-with-redis-password=foobar
+ --compatible-with-redis-password=foobar \
+ --server-port=0 \
+ --J=-Dgemfire.jmx-manager=true \
+ --J=-Dgemfire.jmx-manager-start=true \
+ --J=-Dgemfire.jmx-manager-port=1099
+
+# This will cause all buckets to be created
+../geode-assembly/build/install/apache-geode/bin/gfsh -e "connect --jmx-manager=localhost[1099]" \
+ -e "query --query='select count(*) from /REDIS_DATA'"
failCount=0
@@ -43,13 +51,21 @@
((failCount += $?))
+../geode-assembly/build/install/apache-geode/bin/gfsh stop server --dir=server1
../geode-assembly/build/install/apache-geode/bin/gfsh start server \
--J=-Denable-unsupported-commands=true \
--name=server2 \
--server-port=0 \
--compatible-with-redis-port=6379 \
- --compatible-with-redis-bind-address=127.0.0.1
+ --compatible-with-redis-bind-address=127.0.0.1 \
+ --J=-Dgemfire.jmx-manager=true \
+ --J=-Dgemfire.jmx-manager-start=true \
+ --J=-Dgemfire.jmx-manager-port=1099
+
+# This will cause all buckets to be created
+../geode-assembly/build/install/apache-geode/bin/gfsh -e "connect --jmx-manager=localhost[1099]" \
+ -e "query --query='select count(*) from /REDIS_DATA'"
./runtest --host 127.0.0.1 --port 6379 \
--single unit/type/set \
diff --git a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java
index a0fc31b6..db53791 100644
--- a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java
+++ b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java
@@ -16,13 +16,11 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.redis.NativeRedisClusterTestRule;
import org.apache.geode.redis.session.RedisSessionDUnitTest;
-@Ignore("GEODE-9341")
public class NativeRedisSessionAcceptanceTest extends RedisSessionDUnitTest {
@ClassRule
diff --git a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java
index a47f95a..4fc8716 100644
--- a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java
+++ b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java
@@ -16,13 +16,11 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.redis.NativeRedisClusterTestRule;
import org.apache.geode.redis.session.SessionExpirationDUnitTest;
-@Ignore("GEODE-9341")
public class NativeRedisSessionExpirationAcceptanceTest extends SessionExpirationDUnitTest {
@ClassRule
public static NativeRedisClusterTestRule redis = new NativeRedisClusterTestRule();
diff --git a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java
index ccae2bb..454d09c 100644
--- a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java
+++ b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java
@@ -49,6 +49,12 @@
cache = cacheFactory.create();
server = new GeodeRedisServer("localhost", 0, (InternalCache) cache);
server.setAllowUnsupportedCommands(enableUnsupportedCommands);
+
+ // Ensure that buckets are created up front
+ try {
+ server.getRegionProvider().getSlotAdvisor().getBucketSlots();
+ } catch (InterruptedException ignored) {
+ }
}
public GeodeRedisServerRule withProperty(String property, String value) {
diff --git a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
index 017c8ed..bb382cf 100644
--- a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
+++ b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
@@ -21,6 +21,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
import java.util.Properties;
+import java.util.Set;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -28,12 +29,18 @@
import org.apache.logging.log4j.core.config.Configurator;
import redis.clients.jedis.Jedis;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.logging.internal.log4j.api.FastLogger;
import org.apache.geode.redis.ClusterNode;
import org.apache.geode.redis.ClusterNodes;
import org.apache.geode.redis.internal.GeodeRedisServer;
import org.apache.geode.redis.internal.GeodeRedisService;
+import org.apache.geode.redis.internal.RegionProvider;
import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.junit.rules.ServerStarterRule;
@@ -154,4 +161,58 @@
FastLogger.setDelegating(true);
});
}
+
+ /**
+ * Assuming a redundancy of 1, and at least 3 members, move the given key's primary bucket to a
+ * non-hosting member.
+ */
+ public DistributedMember moveBucketForKey(String key) {
+ return getMember(1).invoke("moveBucketForKey: " + key, () -> {
+ Region<RedisKey, RedisData> r = RedisClusterStartupRule.getCache()
+ .getRegion(RegionProvider.REDIS_DATA_REGION);
+
+ RedisKey redisKey = new RedisKey(key.getBytes());
+ DistributedMember primaryMember = PartitionRegionHelper.getPrimaryMemberForKey(r, redisKey);
+ Set<DistributedMember> allHosting = PartitionRegionHelper.getAllMembersForKey(r, redisKey);
+
+ // Returns all members, except the one calling.
+ Set<DistributedMember> allMembers = getCache().getMembers(r);
+ allMembers.add(getCache().getDistributedSystem().getDistributedMember());
+
+ allMembers.removeAll(allHosting);
+ DistributedMember targetMember = allMembers.stream().findFirst().orElseThrow(
+ () -> new IllegalStateException("No non-hosting member found for key: " + key));
+
+ PartitionRegionHelper.moveBucketByKey(r, primaryMember, targetMember, redisKey);
+
+ // Who is the primary now?
+ return PartitionRegionHelper.getPrimaryMemberForKey(r, redisKey);
+ });
+ }
+
+ /**
+ * Return some key of the form {@code prefix<N>}, (where {@code N} is an integer), for which the
+ * given VM is the primary bucket holder. This is useful in tests where one needs to ensure that
+ * a given key would be hosted on a given server.
+ */
+ public String getKeyOnServer(String keyPrefix, int vmId) {
+ return getMember(1).invoke("getKeyOnServer", () -> {
+ Region<RedisKey, RedisData> r = RedisClusterStartupRule.getCache()
+ .getRegion(RegionProvider.REDIS_DATA_REGION);
+
+ String server = "server-" + vmId;
+ String key;
+ int i = 0;
+ while (true) {
+ key = keyPrefix + i;
+ DistributedMember primaryMember =
+ PartitionRegionHelper.getPrimaryMemberForKey(r, new RedisKey(key.getBytes()));
+ if (primaryMember.getName().equals(server)) {
+ return key;
+ }
+ i++;
+ }
+ });
+ }
+
}
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java
index 02361a5..d679c9c 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java
@@ -16,6 +16,7 @@
package org.apache.geode.redis;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -82,7 +83,7 @@
}
}
- Runnable r = () -> {
+ Callable<Void> r = () -> {
if (!releaseLatchEarly) {
signalFunctionHasStarted.countDown();
}
@@ -97,6 +98,7 @@
e.printStackTrace();
result.lastResult(false);
}
+ return null;
};
LocalDataSet localDataSet = (LocalDataSet) localRegion;
@@ -105,6 +107,8 @@
partitionedRegion.computeWithPrimaryLocked(key, r);
} catch (PrimaryBucketLockException ex) {
throw new BucketMovedException(ex.toString());
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
}
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
deleted file mode 100644
index 3055021..0000000
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.geode.redis.internal.executor;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import io.lettuce.core.cluster.ClusterClientOptions;
-import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
-import io.lettuce.core.cluster.RedisClusterClient;
-import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.core.config.Configurator;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.geode.cache.control.RebalanceFactory;
-import org.apache.geode.cache.control.RebalanceResults;
-import org.apache.geode.cache.control.ResourceManager;
-import org.apache.geode.logging.internal.log4j.api.FastLogger;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.dunit.rules.MemberVM;
-import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
-import org.apache.geode.test.junit.rules.ExecutorServiceRule;
-
-@Ignore("GEODE-9368")
-public class CrashAndNoRepeatDUnitTest {
-
- private static final Logger logger = LogService.getLogger();
-
- @ClassRule
- public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule();
-
- private static MemberVM locator;
- private static MemberVM server1;
- private static MemberVM server2;
- private static MemberVM server3;
- private static RedisClusterClient clusterClient;
- private static RedisAdvancedClusterCommands<String, String> lettuce;
-
- @Rule
- public ExecutorServiceRule executor = new ExecutorServiceRule();
-
- @BeforeClass
- public static void classSetup() throws Exception {
- locator = clusterStartUp.startLocatorVM(0);
- server1 = clusterStartUp.startRedisVM(1, locator.getPort());
- server2 = clusterStartUp.startRedisVM(2, locator.getPort());
- server3 = clusterStartUp.startRedisVM(3, locator.getPort());
-
- server1.invoke("Set logging level to DEBUG", () -> {
- Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
- Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
- FastLogger.setDelegating(true);
- });
-
- server2.invoke("Set logging level to DEBUG", () -> {
- Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
- Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
- FastLogger.setDelegating(true);
- });
-
- server3.invoke("Set logging level to DEBUG", () -> {
- Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
- Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
- FastLogger.setDelegating(true);
- });
-
- int redisServerPort1 = clusterStartUp.getRedisPort(1);
- clusterClient = RedisClusterClient.create("redis://localhost:" + redisServerPort1);
-
- ClusterTopologyRefreshOptions refreshOptions =
- ClusterTopologyRefreshOptions.builder()
- .enableAllAdaptiveRefreshTriggers()
- .refreshTriggersReconnectAttempts(1)
- .build();
-
- clusterClient.setOptions(ClusterClientOptions.builder()
- .topologyRefreshOptions(refreshOptions)
- .validateClusterNodeMembership(false)
- .build());
-
- lettuce = clusterClient.connect().sync();
- }
-
- @AfterClass
- public static void cleanup() {
- clusterClient.shutdown();
- }
-
- @Test
- public void givenServerCrashesDuringAPPEND_thenDataIsNotLost() throws Exception {
- AtomicBoolean running1 = new AtomicBoolean(true);
- AtomicBoolean running2 = new AtomicBoolean(false);
-
- Runnable task1 = () -> appendPerformAndVerify(1, 20000, running1);
- Runnable task2 = () -> appendPerformAndVerify(2, 20000, running1);
- Runnable task3 = () -> appendPerformAndVerify(3, 20000, running1);
- Runnable task4 = () -> appendPerformAndVerify(4, 1000, running2);
-
- Future<Void> future1 = executor.runAsync(task1);
- Future<Void> future2 = executor.runAsync(task2);
- Future<Void> future3 = executor.runAsync(task3);
- Future<Void> future4 = executor.runAsync(task4);
-
- future4.get();
- clusterStartUp.crashVM(2);
- server2 = clusterStartUp.startRedisVM(2, locator.getPort());
- rebalanceAllRegions(server2);
-
- clusterStartUp.crashVM(3);
- server3 = clusterStartUp.startRedisVM(3, locator.getPort());
- rebalanceAllRegions(server3);
-
- clusterStartUp.crashVM(2);
- server2 = clusterStartUp.startRedisVM(2, locator.getPort());
- rebalanceAllRegions(server2);
-
- clusterStartUp.crashVM(3);
- server3 = clusterStartUp.startRedisVM(3, locator.getPort());
- rebalanceAllRegions(server3);
-
- running1.set(false);
-
- future1.get();
- future2.get();
- future3.get();
- }
-
- @Test
- public void givenServerCrashesDuringRename_thenDataIsNotLost() throws Exception {
- AtomicBoolean running1 = new AtomicBoolean(true);
- AtomicBoolean running2 = new AtomicBoolean(true);
- AtomicBoolean running3 = new AtomicBoolean(true);
- AtomicBoolean running4 = new AtomicBoolean(false);
-
- AtomicReference<String> phase = new AtomicReference<>("STARTUP");
- Runnable task1 = () -> renamePerformAndVerify(1, 20000, running1, phase);
- Runnable task2 = () -> renamePerformAndVerify(2, 20000, running2, phase);
- Runnable task3 = () -> renamePerformAndVerify(3, 20000, running3, phase);
- Runnable task4 = () -> renamePerformAndVerify(4, 1000, running4, phase);
-
- Future<Void> future1 = executor.runAsync(task1);
- Future<Void> future2 = executor.runAsync(task2);
- Future<Void> future3 = executor.runAsync(task3);
- Future<Void> future4 = executor.runAsync(task4);
-
- future4.get();
- phase.set("CRASH 1 SERVER2");
- clusterStartUp.crashVM(2);
- phase.set("RESTART 1 SERVER2");
- server2 = clusterStartUp.startRedisVM(2, locator.getPort());
- phase.set("REBALANCE 1 SERVER2");
- rebalanceAllRegions(server2);
-
- phase.set("CRASH 2 SERVER3");
- clusterStartUp.crashVM(3);
- phase.set("RESTART 2 SERVER3");
- server3 = clusterStartUp.startRedisVM(3, locator.getPort());
- phase.set("REBALANCE 2 SERVER3");
- rebalanceAllRegions(server3);
-
- phase.set("CRASH 3 SERVER2");
- clusterStartUp.crashVM(2);
- phase.set("RESTART 3 SERVER2");
- server2 = clusterStartUp.startRedisVM(2, locator.getPort());
- phase.set("REBALANCE 3 SERVER2");
- rebalanceAllRegions(server2);
-
- phase.set("CRASH 4 SERVER3");
- clusterStartUp.crashVM(3);
- phase.set("RESTART 4 SERVER3");
- server3 = clusterStartUp.startRedisVM(3, locator.getPort());
- phase.set("REBALANCE 4 SERVER3");
- rebalanceAllRegions(server3);
-
- running1.set(false);
- running2.set(false);
- running3.set(false);
-
- future1.get();
- future2.get();
- future3.get();
- }
-
- private void renamePerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning,
- AtomicReference<String> phase) {
- String newKey;
- String baseKey = "rename-key-" + index;
- lettuce.set(baseKey + "-0", "value");
- int iterationCount = 0;
-
- while (iterationCount < minimumIterations || isRunning.get()) {
- String oldKey = baseKey + "-" + iterationCount;
- newKey = baseKey + "-" + (iterationCount + 1);
- // This try/catch is left for debugging and should be removed as part of GEODE-9368
- try {
- lettuce.rename(oldKey, newKey);
- assertThat(lettuce.exists(newKey)).as("key " + newKey + " should exist").isEqualTo(1);
- } catch (Exception exception) {
- System.err.println("---||| Exception on key " + newKey + " during phase: " + phase.get());
- exception.printStackTrace();
- isRunning.set(false);
- throw exception;
- }
- iterationCount += 1;
- }
-
- logger.info("--->>> RENAME test ran {} iterations", iterationCount);
- }
-
- private void appendPerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
- String key = "append-key-" + index;
- int iterationCount = 0;
-
- while (iterationCount < minimumIterations || isRunning.get()) {
- String appendString = "-" + key + "-" + iterationCount + "-";
- lettuce.append(key, appendString);
- iterationCount += 1;
- }
-
- String storedString = lettuce.get(key);
- int idx = 0;
- for (int i = 0; i < iterationCount; i++) {
- String expectedValue = "-" + key + "-" + i + "-";
- String foundValue = storedString.substring(idx, idx + expectedValue.length());
- if (!expectedValue.equals(foundValue)) {
- Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount="
- + iterationCount + " in string "
- + storedString);
- break;
- }
- idx += expectedValue.length();
- }
-
- logger.info("--->>> APPEND test ran {} iterations", iterationCount);
- }
-
- private static void rebalanceAllRegions(MemberVM vm) {
- vm.invoke(() -> {
- ResourceManager manager = ClusterStartupRule.getCache().getResourceManager();
-
- RebalanceFactory factory = manager.createRebalanceFactory();
-
- try {
- RebalanceResults result = factory.start().getResults();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- }
-}
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java
index 95d29c3..7f4e097 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java
@@ -82,7 +82,11 @@
@AfterClass
public static void cleanup() {
- clusterClient.shutdown();
+ try {
+ clusterClient.shutdown();
+ } catch (Exception ignored) {
+ // https://github.com/lettuce-io/lettuce-core/issues/1800
+ }
jedis1.close();
jedis2.close();
}
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
index 712fc6a..7306064 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
@@ -23,12 +23,10 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import io.lettuce.core.MapScanCursor;
-import io.lettuce.core.RedisCommandExecutionException;
-import io.lettuce.core.RedisException;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
@@ -37,18 +35,15 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
-import org.apache.geode.cache.control.RebalanceFactory;
-import org.apache.geode.cache.control.ResourceManager;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
-@Ignore("GEODE-9368")
public class HScanDunitTest {
@ClassRule
@@ -62,17 +57,15 @@
private static MemberVM locator;
- static final String HASH_KEY = "key";
- static final String BASE_FIELD = "baseField_";
- static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+ private static final String HASH_KEY = "key";
+ private static final String BASE_FIELD = "baseField_";
+ private static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
@BeforeClass
public static void classSetup() {
locator = redisClusterStartupRule.startLocatorVM(0);
int locatorPort = locator.getPort();
- // note: due to rules around member weighting in split-brain scenarios,
- // vm1 (server1) should not be crashed or it will cause additional (unrelated) failures
redisClusterStartupRule.startRedisVM(1, locatorPort);
redisClusterStartupRule.startRedisVM(2, locatorPort);
redisClusterStartupRule.startRedisVM(3, locatorPort);
@@ -96,78 +89,61 @@
@AfterClass
public static void tearDown() {
- clusterClient.shutdown();
+ try {
+ clusterClient.shutdown();
+ } catch (Exception ignored) {
+ // https://github.com/lettuce-io/lettuce-core/issues/1800
+ }
}
@Test
- public void should_allowHscanIterationToCompleteSuccessfullyGivenServerCrashesDuringIteration()
+ public void should_allowHscanIterationToCompleteSuccessfullyGivenBucketIsMoving()
throws ExecutionException, InterruptedException {
- AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
- AtomicInteger numberOfTimesVMCrashed = new AtomicInteger(0);
+ AtomicBoolean running = new AtomicBoolean(true);
Future<Void> hScanFuture = executor.runAsync(
- () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs, numberOfTimesVMCrashed));
+ () -> doHScanContinuallyAndAssertOnResults(running));
- Future<Void> crashingVmFuture = executor.runAsync(
- () -> crashAlternatingVMS(keepCrashingVMs, numberOfTimesVMCrashed));
+ for (int i = 0; i < 100 && running.get(); i++) {
+ redisClusterStartupRule.moveBucketForKey(HASH_KEY);
+ GeodeAwaitility.await().during(200, TimeUnit.MILLISECONDS).until(() -> true);
+ }
+ running.set(false);
hScanFuture.get();
- crashingVmFuture.get();
}
- private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean keepCrashingVMs,
- AtomicInteger numberOfTimesServersCrashed) {
- int numberOfAssertionsCompleted = 0;
-
+ private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean running) {
ScanCursor scanCursor = new ScanCursor("0", false);
List<String> allEntries = new ArrayList<>();
MapScanCursor<String, String> result;
+ int i = 0;
- while (numberOfAssertionsCompleted < 3 || numberOfTimesServersCrashed.get() < 3) {
-
+ while (running.get()) {
allEntries.clear();
scanCursor.setCursor("0");
scanCursor.setFinished(false);
+ do {
+ result = commands.hscan(HASH_KEY, scanCursor);
+ scanCursor.setCursor(result.getCursor());
+ Map<String, String> resultEntries = result.getMap();
+
+ resultEntries.forEach((key, value) -> allEntries.add(key));
+ } while (!result.isFinished());
+
try {
- do {
- result = commands.hscan(HASH_KEY, scanCursor);
- scanCursor.setCursor(result.getCursor());
- Map<String, String> resultEntries = result.getMap();
-
- resultEntries.forEach((key, value) -> allEntries.add(key));
-
- } while (!result.isFinished());
-
assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
- numberOfAssertionsCompleted++;
-
- } catch (RedisCommandExecutionException ignore) {
- } catch (RedisException ex) {
- if (!ex.getMessage().contains("Connection reset by peer")) {
- throw ex;
- } // ignore error
-
+ } catch (AssertionError ex) {
+ running.set(false);
+ throw ex;
}
+
+ i++;
}
- keepCrashingVMs.set(false);
- }
- private void crashAlternatingVMS(AtomicBoolean keepCrashingVMs,
- AtomicInteger numberOfTimesServersCrashed) {
-
- int vmToCrashToggle = 3;
- MemberVM vm;
-
- do {
- redisClusterStartupRule.crashVM(vmToCrashToggle);
- vm = redisClusterStartupRule.startRedisVM(vmToCrashToggle, locator.getPort());
- rebalanceAllRegions(vm);
- numberOfTimesServersCrashed.incrementAndGet();
- vmToCrashToggle = (vmToCrashToggle == 2) ? 3 : 2;
-
- } while (keepCrashingVMs.get());
+ LogService.getLogger().info("--->>> Completed {} iterations of HSCAN", i);
}
private static Map<String, String> makeEntrySet(int sizeOfDataSet) {
@@ -178,17 +154,4 @@
return dataSet;
}
- private static void rebalanceAllRegions(MemberVM vm) {
- vm.invoke(() -> {
-
- ResourceManager manager = ClusterStartupRule.getCache().getResourceManager();
- RebalanceFactory factory = manager.createRebalanceFactory();
-
- try {
- factory.start().getResults();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- }
}
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
index caf3f86..7d62d51 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
@@ -92,7 +92,11 @@
@AfterClass
public static void cleanup() {
- clusterClient.shutdown();
+ try {
+ clusterClient.shutdown();
+ } catch (Exception ignored) {
+ // https://github.com/lettuce-io/lettuce-core/issues/1800
+ }
}
@Test
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java
index c206d3e..a80c36a 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java
@@ -22,13 +22,14 @@
import java.util.concurrent.atomic.AtomicLong;
import io.lettuce.core.RedisException;
+import io.lettuce.core.cluster.ClusterClientOptions;
+import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.redis.ConcurrentLoopingThreads;
@@ -36,7 +37,6 @@
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
-@Ignore("GEODE-9368")
public class HdelDUnitTest {
@ClassRule
@@ -47,8 +47,6 @@
private static final int HASH_SIZE = 50000;
private static MemberVM locator;
- private static MemberVM server1;
- private static MemberVM server2;
private static RedisAdvancedClusterCommands<String, String> lettuce;
private static RedisClusterClient clusterClient;
@@ -56,18 +54,33 @@
public static void classSetup() {
locator = cluster.startLocatorVM(0);
- server1 = cluster.startRedisVM(1, locator.getPort());
- server2 = cluster.startRedisVM(2, locator.getPort());
+ cluster.startRedisVM(1, locator.getPort());
+ cluster.startRedisVM(2, locator.getPort());
int redisServerPort1 = cluster.getRedisPort(1);
clusterClient = RedisClusterClient.create("redis://localhost:" + redisServerPort1);
+ ClusterTopologyRefreshOptions refreshOptions =
+ ClusterTopologyRefreshOptions.builder()
+ .enableAllAdaptiveRefreshTriggers()
+ .refreshTriggersReconnectAttempts(1)
+ .build();
+
+ clusterClient.setOptions(ClusterClientOptions.builder()
+ .topologyRefreshOptions(refreshOptions)
+ .validateClusterNodeMembership(false)
+ .build());
+
lettuce = clusterClient.connect().sync();
}
@AfterClass
public static void cleanup() {
- clusterClient.shutdown();
+ try {
+ clusterClient.shutdown();
+ } catch (Exception ignored) {
+ // https://github.com/lettuce-io/lettuce-core/issues/1800
+ }
}
@Before
@@ -119,7 +132,7 @@
.start();
cluster.crashVM(2);
- server2 = cluster.startRedisVM(2, locator.getPort());
+ cluster.startRedisVM(2, locator.getPort());
loopingThreads.await();
assertThat(lettuce.hgetall(key).size())
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
index 56a41c4..4768dc9 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
@@ -15,26 +15,34 @@
package org.apache.geode.redis.internal.executor.key;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisMovedDataException;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.executor.StripedExecutor;
@@ -42,30 +50,29 @@
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class RenameDUnitTest {
@ClassRule
public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(3);
- private static final String LOCAL_HOST = "127.0.0.1";
- private static final int JEDIS_TIMEOUT =
- Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
- private final ExecutorService pool = Executors.newCachedThreadPool();
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
private static JedisCluster jedisCluster;
private static MemberVM locator;
- private static MemberVM server1;
- private static MemberVM server2;
@BeforeClass
public static void setup() {
locator = clusterStartUp.startLocatorVM(0);
- server1 = clusterStartUp.startRedisVM(1, locator.getPort());
- server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+ clusterStartUp.startRedisVM(1, locator.getPort());
+ clusterStartUp.startRedisVM(2, locator.getPort());
+ clusterStartUp.startRedisVM(3, locator.getPort());
int redisServerPort1 = clusterStartUp.getRedisPort(1);
- jedisCluster = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort1), JEDIS_TIMEOUT);
+ jedisCluster =
+ new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort1), REDIS_CLIENT_TIMEOUT);
}
@Before
@@ -76,9 +83,6 @@
@AfterClass
public static void tearDown() {
jedisCluster.close();
-
- server1.stop();
- server2.stop();
}
@Test
@@ -116,6 +120,20 @@
}
}
+ @Test
+ public void testRenameWithKeysOnDifferentServers_shouldReturnMovedError() {
+ int port1 = clusterStartUp.getRedisPort(1);
+ Jedis jedis = new Jedis(BIND_ADDRESS, port1, REDIS_CLIENT_TIMEOUT);
+
+ String srcKey = clusterStartUp.getKeyOnServer("key-", 1);
+ String dstKey = clusterStartUp.getKeyOnServer("key-", 2);
+
+ jedis.set(srcKey, "Fancy that");
+
+ assertThatThrownBy(() -> jedis.rename(srcKey, dstKey))
+ .isInstanceOf(JedisMovedDataException.class);
+ }
+
private Set<String> getKeysOnSameRandomStripe(int numKeysNeeded) {
Random random = new Random();
String key1 = "{rename}keyz" + random.nextInt();
@@ -161,30 +179,30 @@
String oldKey4 = listOfKeys4.get(0);
String newKey4 = listOfKeys4.get(1);
- Runnable renameOldKey1ToNewKey1 = () -> {
+ Callable<String> renameOldKey1ToNewKey1 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
- jedisCluster.rename(oldKey1, newKey1);
+ return jedisCluster.rename(oldKey1, newKey1);
};
- Runnable renameOldKey2ToNewKey2 = () -> {
+ Callable<String> renameOldKey2ToNewKey2 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
- jedisCluster.rename(oldKey2, newKey2);
+ return jedisCluster.rename(oldKey2, newKey2);
};
- Runnable renameOldKey3ToNewKey3 = () -> {
+ Callable<String> renameOldKey3ToNewKey3 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
- jedisCluster.rename(oldKey3, newKey3);
+ return jedisCluster.rename(oldKey3, newKey3);
};
- Runnable renameOldKey4ToNewKey4 = () -> {
+ Callable<String> renameOldKey4ToNewKey4 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
- jedisCluster.rename(oldKey4, newKey4);
+ return jedisCluster.rename(oldKey4, newKey4);
};
- Future<?> future1 = pool.submit(renameOldKey1ToNewKey1);
- Future<?> future2 = pool.submit(renameOldKey2ToNewKey2);
- Future<?> future3 = pool.submit(renameOldKey3ToNewKey3);
- Future<?> future4 = pool.submit(renameOldKey4ToNewKey4);
+ Future<?> future1 = executor.submit(renameOldKey1ToNewKey1);
+ Future<?> future2 = executor.submit(renameOldKey2ToNewKey2);
+ Future<?> future3 = executor.submit(renameOldKey3ToNewKey3);
+ Future<?> future4 = executor.submit(renameOldKey4ToNewKey4);
future1.get();
future2.get();
@@ -192,6 +210,57 @@
future4.get();
}
+ @Test
+ public void givenBucketsMoveDuringRename_thenDataIsNotLost() throws Exception {
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ List<String> hashtags = new ArrayList<>();
+ hashtags.add(clusterStartUp.getKeyOnServer("rename", 1));
+ hashtags.add(clusterStartUp.getKeyOnServer("rename", 2));
+ hashtags.add(clusterStartUp.getKeyOnServer("rename", 3));
+
+ Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running);
+ Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running);
+ Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running);
+
+ Future<Void> future1 = executor.runAsync(task1);
+ Future<Void> future2 = executor.runAsync(task2);
+ Future<Void> future3 = executor.runAsync(task3);
+
+ for (int i = 0; i < 100 && running.get(); i++) {
+ clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
+ GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
+ }
+
+ running.set(false);
+
+ future1.get();
+ future2.get();
+ future3.get();
+ }
+
+ private void renamePerformAndVerify(int index, int minimumIterations, String hashtag,
+ AtomicBoolean isRunning) {
+ String baseKey = "{" + hashtag + "}-key-" + index;
+ jedisCluster.set(baseKey + "-0", "value");
+ int iterationCount = 0;
+
+ while (iterationCount < minimumIterations || isRunning.get()) {
+ String oldKey = baseKey + "-" + iterationCount;
+ String newKey = baseKey + "-" + (iterationCount + 1);
+ try {
+ jedisCluster.rename(oldKey, newKey);
+ } catch (Exception ex) {
+ isRunning.set(false);
+ throw new RuntimeException("Exception performing RENAME " + oldKey + " " + newKey, ex);
+ }
+
+ assertThat(jedisCluster.exists(newKey))
+ .as("key " + newKey + " should exist").isTrue();
+ iterationCount += 1;
+ }
+ }
+
private void cyclicBarrierAwait(CyclicBarrier startCyclicBarrier) {
try {
startCyclicBarrier.await();
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java
index e900f6e..a93084c 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java
@@ -15,18 +15,25 @@
package org.apache.geode.redis.internal.executor.string;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
@@ -36,32 +43,32 @@
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class StringsDUnitTest {
@ClassRule
public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
- private static final String LOCAL_HOST = "127.0.0.1";
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
private static final int LIST_SIZE = 1000;
private static final int NUM_ITERATIONS = 1000;
- private static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
private static JedisCluster jedisCluster;
private static MemberVM locator;
- private static MemberVM server1;
- private static MemberVM server2;
- private static MemberVM server3;
@BeforeClass
public static void classSetup() {
locator = clusterStartUp.startLocatorVM(0);
- server1 = clusterStartUp.startRedisVM(1, locator.getPort());
- server2 = clusterStartUp.startRedisVM(2, locator.getPort());
- server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+ clusterStartUp.startRedisVM(1, locator.getPort());
+ clusterStartUp.startRedisVM(2, locator.getPort());
+ clusterStartUp.startRedisVM(3, locator.getPort());
int redisServerPort1 = clusterStartUp.getRedisPort(1);
- jedisCluster = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort1), JEDIS_TIMEOUT);
+ jedisCluster =
+ new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort1), REDIS_CLIENT_TIMEOUT);
}
@After
@@ -72,10 +79,6 @@
@AfterClass
public static void tearDown() {
jedisCluster.close();
-
- server1.stop();
- server2.stop();
- server3.stop();
}
@Test
@@ -234,6 +237,73 @@
}
}
+ @Test
+ public void givenBucketsMoveDuringAppend_thenDataIsNotLost() throws Exception {
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ List<String> hashtags = new ArrayList<>();
+ hashtags.add(clusterStartUp.getKeyOnServer("append", 1));
+ hashtags.add(clusterStartUp.getKeyOnServer("append", 2));
+ hashtags.add(clusterStartUp.getKeyOnServer("append", 3));
+
+ Runnable task1 = () -> appendPerformAndVerify(1, 10000, hashtags.get(0), running);
+ Runnable task2 = () -> appendPerformAndVerify(2, 10000, hashtags.get(1), running);
+ Runnable task3 = () -> appendPerformAndVerify(3, 10000, hashtags.get(2), running);
+
+ Future<Void> future1 = executor.runAsync(task1);
+ Future<Void> future2 = executor.runAsync(task2);
+ Future<Void> future3 = executor.runAsync(task3);
+
+ for (int i = 0; i < 100 && running.get(); i++) {
+ clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
+ GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
+ }
+
+ for (int i = 0; i < 100 && running.get(); i++) {
+ clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
+ GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
+ }
+
+ running.set(false);
+
+ future1.get();
+ future2.get();
+ future3.get();
+ }
+
+ private void appendPerformAndVerify(int index, int minimumIterations, String hashtag,
+ AtomicBoolean isRunning) {
+ String key = "{" + hashtag + "}-key-" + index;
+ int iterationCount = 0;
+
+ while (iterationCount < minimumIterations || isRunning.get()) {
+ String appendString = "-" + key + "-" + iterationCount + "-";
+ try {
+ jedisCluster.append(key, appendString);
+ } catch (Exception ex) {
+ isRunning.set(false);
+ throw new RuntimeException("Exception performing APPEND " + appendString, ex);
+ }
+ iterationCount += 1;
+ }
+
+ String storedString = jedisCluster.get(key);
+
+ int idx = 0;
+ int i = 0;
+ while (i < iterationCount) {
+ String expectedValue = "-" + key + "-" + i + "-";
+ String foundValue = storedString.substring(idx, idx + expectedValue.length());
+ if (!foundValue.equals(expectedValue)) {
+ Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount="
+ + iterationCount + " in string "
+ + storedString);
+ break;
+ }
+ idx += expectedValue.length();
+ i++;
+ }
+ }
private List<String> makeStringList(int setSize, String baseString) {
List<String> strings = new ArrayList<>();
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
index b2e5acb..5b62ea3 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
@@ -29,7 +29,7 @@
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
-@Ignore("GEODE-9341")
+@Ignore("GEODE-9437")
public class RedisSessionDUnitTest extends SessionDUnitTest {
@Rule
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
index 3f62a3b..a13898a 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
@@ -16,7 +16,6 @@
package org.apache.geode.redis.session;
-
import java.net.HttpCookie;
import java.time.Duration;
import java.util.ArrayList;
@@ -29,15 +28,13 @@
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
+import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.core.config.Configurator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -54,7 +51,6 @@
import org.springframework.web.client.RestTemplate;
import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.logging.internal.log4j.api.FastLogger;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.session.springRedisTestApplication.RedisSpringTestApplication;
import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -103,7 +99,8 @@
protected static void setupRetry() {
RetryConfig config = RetryConfig.custom()
.maxAttempts(20)
- .retryExceptions(HttpServerErrorException.InternalServerError.class)
+ .retryExceptions(HttpServerErrorException.InternalServerError.class,
+ RedisConnectionException.class)
.build();
RetryRegistry registry = RetryRegistry.of(config);
retry = registry.retry("sessions");
@@ -140,7 +137,7 @@
redisClient.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(refreshOptions)
- .autoReconnect(true)
+ .validateClusterNodeMembership(false)
.build());
connection = redisClient.connect();
commands = connection.sync();
@@ -167,6 +164,7 @@
try {
redisClient.shutdown();
} catch (Exception ignored) {
+ // https://github.com/lettuce-io/lettuce-core/issues/1800
}
}
@@ -177,12 +175,7 @@
protected static void startRedisServer(int server) {
cluster.startRedisVM(server, cluster.getMember(LOCATOR).getPort());
-
- cluster.getVM(server).invoke("Set logging level to DEBUG", () -> {
- Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
- Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
- FastLogger.setDelegating(true);
- });
+ cluster.enableDebugLogging(server);
}
protected static void startSpringApp(int sessionApp, long sessionTimeout, int... serverPorts) {
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java
index 873fca9..2d937ff 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java
@@ -25,7 +25,6 @@
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@@ -39,7 +38,6 @@
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-@Ignore("GEODE-9341")
public class SessionExpirationDUnitTest extends SessionDUnitTest {
protected static final int SHORT_SESSION_TIMEOUT = 5;
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java
index 9788bc8..704b336 100755
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java
@@ -54,6 +54,7 @@
return LettuceClientConfiguration.builder()
.clientOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(refreshOptions)
+ .validateClusterNodeMembership(false)
.build())
.build();
}
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java
index f7c8cad..c54ea52 100644
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java
@@ -53,7 +53,7 @@
cache.close();
}
- public void setupCacheWithPassword() {
+ public void setupCacheWithPassword() throws Exception {
port = AvailablePortHelper.getRandomAvailableTCPPort();
CacheFactory cf = new CacheFactory();
cf.set(LOG_LEVEL, "error");
@@ -62,6 +62,7 @@
cf.set(ConfigurationProperties.REDIS_PASSWORD, PASSWORD);
cache = cf.create();
server = new GeodeRedisServer("localhost", port, (InternalCache) cache);
+ server.getRegionProvider().getSlotAdvisor().getBucketSlots();
this.jedis = new Jedis("localhost", port, 100000);
}
@@ -77,7 +78,7 @@
}
@Test
- public void testAuthIncorrectNumberOfArguments() {
+ public void testAuthIncorrectNumberOfArguments() throws Exception {
setupCacheWithPassword();
assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.AUTH))
.hasMessageContaining("wrong number of arguments");
@@ -86,14 +87,14 @@
}
@Test
- public void testAuthConfig() {
+ public void testAuthConfig() throws Exception {
setupCacheWithPassword();
InternalDistributedSystem iD = (InternalDistributedSystem) cache.getDistributedSystem();
assertThat(iD.getConfig().getRedisPassword()).isEqualTo(PASSWORD);
}
@Test
- public void testAuthRejectAccept() {
+ public void testAuthRejectAccept() throws Exception {
setupCacheWithPassword();
assertThatThrownBy(() -> jedis.auth("wrongpwd"))
@@ -111,7 +112,7 @@
}
@Test
- public void testAuthAcceptRequests() {
+ public void testAuthAcceptRequests() throws Exception {
setupCacheWithPassword();
assertThatThrownBy(() -> jedis.set("foo", "bar"))
@@ -124,7 +125,7 @@
}
@Test
- public void testSeparateClientRequests() {
+ public void testSeparateClientRequests() throws Exception {
setupCacheWithPassword();
Jedis nonAuthorizedJedis = new Jedis("localhost", getPort(), 100000);
Jedis authorizedJedis = new Jedis("localhost", getPort(), 100000);
@@ -141,7 +142,7 @@
}
@Test
- public void lettuceAuthClient_withLettuceVersion6() {
+ public void lettuceAuthClient_withLettuceVersion6() throws Exception {
setupCacheWithPassword();
RedisURI uri = RedisURI.create(String.format("redis://%s@localhost:%d", PASSWORD, getPort()));
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index bdf1b26..8e7d92f 100644
--- a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -3,7 +3,8 @@
org/apache/geode/redis/internal/collections/SizeableObjectOpenCustomHashSet
org/apache/geode/redis/internal/collections/IndexibleTreeSet
org/apache/geode/redis/internal/data/RedisDataMovedException
+org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
org/apache/geode/redis/internal/RedisException
-org/apache/geode/redis/internal/RedisRestoreKeyExistsException
+org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException
org/apache/geode/redis/internal/data/RedisSetCommandsFunctionExecutor$SetOp
org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor$BitOp
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java
index ef1ea59..718df77 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.redis.internal;
+
/**
* A general exception that can be thrown during command execution and will result in a Redis
* protocol exception message being returned to the client.
@@ -30,6 +31,10 @@
super(message);
}
+ public RedisException(Throwable throwable) {
+ super(throwable);
+ }
+
public RedisException(String message, Throwable throwable) {
super(message, throwable);
}
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index 46841e7..3728159 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -35,6 +35,7 @@
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.partition.PartitionMemberInfo;
import org.apache.geode.cache.partition.PartitionRegionHelper;
@@ -42,6 +43,9 @@
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionFactory;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketLockException;
+import org.apache.geode.internal.cache.execute.BucketMovedException;
import org.apache.geode.management.ManagementException;
import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
import org.apache.geode.redis.internal.data.NullRedisDataStructures;
@@ -84,6 +88,7 @@
private static final Map<RedisDataType, RedisData> NULL_TYPES = new HashMap<>();
private final Region<RedisKey, RedisData> dataRegion;
+ private final PartitionedRegion partitionedRegion;
private final RedisHashCommandsFunctionExecutor hashCommands;
private final RedisSetCommandsFunctionExecutor setCommands;
private final RedisStringCommandsFunctionExecutor stringCommands;
@@ -118,6 +123,7 @@
redisDataRegionFactory.setPartitionAttributes(attributesFactory.create());
dataRegion = redisDataRegionFactory.create(REDIS_DATA_REGION);
+ partitionedRegion = (PartitionedRegion) dataRegion;
stringCommands = new RedisStringCommandsFunctionExecutor(this);
setCommands = new RedisSetCommandsFunctionExecutor(this);
@@ -145,7 +151,16 @@
}
public <T> T execute(Object key, Callable<T> callable) {
- return stripedExecutor.execute(key, callable);
+ try {
+ return partitionedRegion.computeWithPrimaryLocked(key,
+ () -> stripedExecutor.execute(key, callable));
+ } catch (PrimaryBucketLockException | BucketMovedException | RegionDestroyedException ex) {
+ throw createRedisDataMovedException((RedisKey) key);
+ } catch (RedisException bex) {
+ throw bex;
+ } catch (Exception ex) {
+ throw new RedisException(ex);
+ }
}
public RedisData getRedisData(RedisKey key) {
@@ -153,7 +168,13 @@
}
public RedisData getRedisData(RedisKey key, RedisData notFoundValue) {
- RedisData result = getLocalDataRegion().get(key);
+ RedisData result;
+ try {
+ result = getLocalDataRegion().get(key);
+ } catch (RegionDestroyedException rex) {
+ throw createRedisDataMovedException(key);
+ }
+
if (result != null) {
if (result.hasExpired()) {
result.doExpiration(this, key);
@@ -161,10 +182,7 @@
}
} else {
if (!getSlotAdvisor().isLocal(key)) {
- RedisMemberInfo memberInfo = getRedisMemberInfo(key);
- Integer slot = key.getCrc16() & (REDIS_SLOTS - 1);
- throw new RedisDataMovedException(slot, memberInfo.getHostAddress(),
- memberInfo.getRedisPort());
+ throw createRedisDataMovedException(key);
}
}
if (result == null) {
@@ -174,6 +192,13 @@
}
}
+ private RedisDataMovedException createRedisDataMovedException(RedisKey key) {
+ RedisMemberInfo memberInfo = getRedisMemberInfo(key);
+ Integer slot = key.getCrc16() & (REDIS_SLOTS - 1);
+ return new RedisDataMovedException(slot, memberInfo.getHostAddress(),
+ memberInfo.getRedisPort());
+ }
+
private RedisMemberInfo getRedisMemberInfo(RedisKey key) {
try {
return getSlotAdvisor().getMemberInfo(key);
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java
index e0f72da..35fd22b 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java
@@ -19,6 +19,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Objects;
import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.DistributedMember;
@@ -85,4 +86,22 @@
public String toString() {
return member.toString() + " hostAddress: " + hostAddress + " redisPort: " + redisPort;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof RedisMemberInfo)) {
+ return false;
+ }
+ RedisMemberInfo that = (RedisMemberInfo) o;
+ return redisPort == that.redisPort && Objects.equals(member, that.member)
+ && Objects.equals(hostAddress, that.hostAddress);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(member, hostAddress, redisPort);
+ }
}
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index f37b2a6..90d4112 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -39,7 +39,6 @@
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.redis.internal.RedisRestoreKeyExistsException;
import org.apache.geode.redis.internal.RegionProvider;
import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
import org.apache.geode.redis.internal.delta.AppendDeltaInfo;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
index b10c458..3a64670 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
@@ -96,7 +96,9 @@
short ordinal = inputStream.readShort();
KnownVersion knownVersion = KnownVersion.getKnownVersion(ordinal);
if (knownVersion == null) {
- throw new RedisException("Unknown version ordinal: " + ordinal);
+ LogService.getLogger()
+ .info("Error restoring object - unknown version ordinal: {}", ordinal);
+ throw new RedisException(ERROR_RESTORE_INVALID_PAYLOAD);
}
obj = DataSerializer.readObject(new VersionedDataInputStream(bais, knownVersion));
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java
index 46193b3..7aeca4e 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java
@@ -15,7 +15,9 @@
package org.apache.geode.redis.internal.data;
-public class RedisDataMovedException extends RuntimeException {
+import org.apache.geode.redis.internal.RedisException;
+
+public class RedisDataMovedException extends RedisException {
private static final long serialVersionUID = 8138174496239512955L;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java
index ee3de2f..dfc557d 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java
@@ -15,11 +15,14 @@
*/
package org.apache.geode.redis.internal.data;
+
+import org.apache.geode.redis.internal.RedisException;
+
/**
* This exception is for the case that a client attempts to operate on a data structure of one
* {@link org.apache.geode.redis.internal.data.RedisDataType} with a command that is of another type
*/
-public class RedisDataTypeMismatchException extends RuntimeException {
+public class RedisDataTypeMismatchException extends RedisException {
private static final long serialVersionUID = -2451663685348513870L;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisRestoreKeyExistsException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException.java
similarity index 86%
rename from geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisRestoreKeyExistsException.java
rename to geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException.java
index 4add607..a6faf3b 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisRestoreKeyExistsException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException.java
@@ -12,12 +12,14 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package org.apache.geode.redis.internal;
+package org.apache.geode.redis.internal.data;
+
+import org.apache.geode.redis.internal.RedisException;
/**
* An exception thrown when the key being restored already exists.
*/
-public class RedisRestoreKeyExistsException extends RuntimeException {
+public class RedisRestoreKeyExistsException extends RedisException {
private static final long serialVersionUID = -7022501593522613782L;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
index bce6e74..32f2cde 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
@@ -35,6 +35,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -50,16 +51,12 @@
private static final Logger logger = LogService.getLogger();
@Override
- public RedisResponse executeCommand(Command command,
- ExecutionHandlerContext context) {
+ public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
String cursorString = bytesToString(commandElems.get(2));
int cursor;
- Pattern matchPattern;
- String globPattern = null;
- int count = DEFAULT_COUNT;
try {
cursor = Integer.parseInt(cursorString);
@@ -68,54 +65,67 @@
}
RedisKey key = command.getKey();
- if (!getDataRegion(context).containsKey(key)) {
- context.getRedisStats().incKeyspaceMisses();
- return RedisResponse.emptyScan();
- }
- if (getDataRegion(context).get(key).getType() != REDIS_HASH) {
- throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
- }
+ // Because we're trying to preserve the same semantics of error conditions, with native redis,
+ // the ordering of input validation is reflected here. To that end the first check ends up
+ // being an existence check of the key. That causes a race since the data value needs to be
+ // accessed again when the actual command does its work. If the relevant bucket doesn't get
+ // locked throughout the call, the bucket may move producing inconsistent results.
+ return context.getRegionProvider().execute(key, () -> {
+ String globPattern = null;
+ int count = DEFAULT_COUNT;
+ Pattern matchPattern;
- command.getCommandType().checkDeferredParameters(command, context);
+ RedisData value = context.getRegionProvider().getRedisData(key);
+ if (value.isNull()) {
+ context.getRedisStats().incKeyspaceMisses();
+ return RedisResponse.emptyScan();
+ }
- for (int i = 3; i < commandElems.size(); i = i + 2) {
- byte[] commandElemBytes = commandElems.get(i);
- if (equalsIgnoreCaseBytes(commandElemBytes, bMATCH)) {
- commandElemBytes = commandElems.get(i + 1);
- globPattern = bytesToString(commandElemBytes);
+ if (value.getType() != REDIS_HASH) {
+ throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
+ }
- } else if (equalsIgnoreCaseBytes(commandElemBytes, bCOUNT)) {
- commandElemBytes = commandElems.get(i + 1);
- try {
- count = narrowLongToInt(bytesToLong(commandElemBytes));
- } catch (NumberFormatException e) {
- return RedisResponse.error(ERROR_NOT_INTEGER);
- }
+ command.getCommandType().checkDeferredParameters(command, context);
- if (count < 1) {
+ for (int i = 3; i < commandElems.size(); i = i + 2) {
+ byte[] commandElemBytes = commandElems.get(i);
+ if (equalsIgnoreCaseBytes(commandElemBytes, bMATCH)) {
+ commandElemBytes = commandElems.get(i + 1);
+ globPattern = bytesToString(commandElemBytes);
+
+ } else if (equalsIgnoreCaseBytes(commandElemBytes, bCOUNT)) {
+ commandElemBytes = commandElems.get(i + 1);
+ try {
+ count = narrowLongToInt(bytesToLong(commandElemBytes));
+ } catch (NumberFormatException e) {
+ return RedisResponse.error(ERROR_NOT_INTEGER);
+ }
+
+ if (count < 1) {
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+
+ } else {
return RedisResponse.error(ERROR_SYNTAX);
}
-
- } else {
- return RedisResponse.error(ERROR_SYNTAX);
}
- }
- try {
- matchPattern = convertGlobToRegex(globPattern);
- } catch (PatternSyntaxException e) {
- logger.warn(
- "Could not compile the pattern: '{}' due to the following exception: '{}'. HSCAN will return an empty list.",
- globPattern, e.getMessage());
+ try {
+ matchPattern = convertGlobToRegex(globPattern);
+ } catch (PatternSyntaxException e) {
+ logger.warn(
+ "Could not compile the pattern: '{}' due to the following exception: '{}'. HSCAN will return an empty list.",
+ globPattern, e.getMessage());
- return RedisResponse.emptyScan();
- }
- RedisHashCommands redisHashCommands = context.getHashCommands();
+ return RedisResponse.emptyScan();
+ }
+ RedisHashCommands redisHashCommands = context.getHashCommands();
- Pair<Integer, List<byte[]>> scanResult =
- redisHashCommands.hscan(key, matchPattern, count, cursor);
+ Pair<Integer, List<byte[]>> scanResult =
+ redisHashCommands.hscan(key, matchPattern, count, cursor);
- return RedisResponse.scan(new BigInteger(String.valueOf(scanResult.getLeft())),
- scanResult.getRight());
+ return RedisResponse.scan(new BigInteger(String.valueOf(scanResult.getLeft())),
+ scanResult.getRight());
+ });
}
}
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
index 62b78e1..1d054ea 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
@@ -40,6 +40,12 @@
return RedisResponse.ok();
}
+ if (key.getBucketId() != newKey.getBucketId()) {
+ // Will produce MOVED exceptions here for whichever key is at fault
+ context.getRegionProvider().getRedisData(newKey);
+ context.getRegionProvider().getRedisData(key);
+ }
+
if (!redisKeyCommands.rename(key, newKey)) {
return RedisResponse.error(ERROR_NO_SUCH_KEY);
}
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java
index a54f07e..b78fcb3 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java
@@ -25,8 +25,8 @@
import java.util.List;
import org.apache.geode.redis.internal.RedisException;
-import org.apache.geode.redis.internal.RedisRestoreKeyExistsException;
import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.data.RedisRestoreKeyExistsException;
import org.apache.geode.redis.internal.executor.AbstractExecutor;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.netty.Coder;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
index f1099a1..5b20998 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
@@ -35,6 +35,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -66,12 +67,13 @@
RedisKey key = command.getKey();
- if (!getDataRegion(context).containsKey(key)) {
+ RedisData value = context.getRegionProvider().getRedisData(key);
+ if (value.isNull()) {
context.getRedisStats().incKeyspaceMisses();
return RedisResponse.emptyScan();
}
- if (getDataRegion(context).get(key).getType() != REDIS_SET) {
+ if (value.getType() != REDIS_SET) {
throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
}
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index e1fa74d..04d0a05 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -218,38 +218,45 @@
}
private RedisResponse getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
- RedisResponse response;
if (cause instanceof IOException) {
channelInactive(ctx);
return null;
}
- if (cause instanceof IllegalStateException
- || cause instanceof RedisParametersMismatchException
- || cause instanceof RedisException
- || cause instanceof NumberFormatException
- || cause instanceof ArithmeticException) {
- response = RedisResponse.error(cause.getMessage());
- } else if (cause instanceof RedisDataMovedException) {
- response = RedisResponse.moved(cause.getMessage());
- } else if (cause instanceof RedisDataTypeMismatchException) {
- response = RedisResponse.wrongType(cause.getMessage());
- } else if (cause instanceof LowMemoryException) {
- response = RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED);
- } else if (cause instanceof DecoderException
- && cause.getCause() instanceof RedisCommandParserException) {
- response =
- RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE + ": " + cause.getMessage());
- } else if (cause instanceof InterruptedException || cause instanceof CacheClosedException) {
- response = RedisResponse.error(RedisConstants.SERVER_ERROR_SHUTDOWN);
+ Throwable rootCause = getRootCause(cause);
+ if (rootCause instanceof RedisDataMovedException) {
+ return RedisResponse.moved(rootCause.getMessage());
+ } else if (rootCause instanceof RedisDataTypeMismatchException) {
+ return RedisResponse.wrongType(rootCause.getMessage());
+ } else if (rootCause instanceof IllegalStateException
+ || rootCause instanceof RedisParametersMismatchException
+ || rootCause instanceof RedisException
+ || rootCause instanceof NumberFormatException
+ || rootCause instanceof ArithmeticException) {
+ return RedisResponse.error(rootCause.getMessage());
+ } else if (rootCause instanceof LowMemoryException) {
+ return RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED);
+ } else if (rootCause instanceof DecoderException
+ && rootCause.getCause() instanceof RedisCommandParserException) {
+ return RedisResponse
+ .error(RedisConstants.PARSING_EXCEPTION_MESSAGE + ": " + rootCause.getMessage());
+ } else if (rootCause instanceof InterruptedException
+ || rootCause instanceof CacheClosedException) {
+ return RedisResponse.error(RedisConstants.SERVER_ERROR_SHUTDOWN);
} else {
if (logger.isErrorEnabled()) {
- logger.error("GeodeRedisServer-Unexpected error handler for {}", ctx.channel(), cause);
+ logger.error("GeodeRedisServer-Unexpected error handler for {}", ctx.channel(), rootCause);
}
- response = RedisResponse.error(RedisConstants.SERVER_ERROR_MESSAGE);
+ return RedisResponse.error(RedisConstants.SERVER_ERROR_MESSAGE);
}
+ }
- return response;
+ private Throwable getRootCause(Throwable cause) {
+ Throwable root = cause;
+ while (root.getCause() != null) {
+ root = root.getCause();
+ }
+ return root;
}
@Override
diff --git a/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt b/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt
index a485f29..4fde503 100755
--- a/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt
+++ b/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt
@@ -2,7 +2,6 @@
org/apache/geode/redis/internal/RedisCommandSupportLevel,false
org/apache/geode/redis/internal/RedisCommandType,false,deferredParameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,executor:org/apache/geode/redis/internal/executor/Executor,parameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,supportLevel:org/apache/geode/redis/internal/RedisCommandSupportLevel
org/apache/geode/redis/internal/data/RedisDataType,false,toStringValue:java/lang/String
-org/apache/geode/redis/internal/data/RedisDataTypeMismatchException,true,-2451663685348513870
org/apache/geode/redis/internal/delta/DeltaType,false
org/apache/geode/redis/internal/executor/BaseSetOptions$Exists,false
org/apache/geode/redis/internal/netty/CoderException,true,4707944288714910949
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index a41ad28..7289a6f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -609,7 +609,7 @@
return redundancyTracker;
}
- public void computeWithPrimaryLocked(Object key, Runnable r) throws PrimaryBucketLockException {
+ public <T> T computeWithPrimaryLocked(Object key, Callable<T> callable) throws Exception {
int bucketId = PartitionedRegionHelper.getHashKey(this, null, key, null, null);
BucketRegion br;
@@ -626,7 +626,7 @@
}
try {
- r.run();
+ return callable.call();
} finally {
br.doUnlockForPrimary();
}