GEODE-9368: Changes to support better resiliency for HA scenarios (#6686)

- Wrap all StripedExecutor.execute calls with
  PartitionedRegion.computeWithPrimaryLocked (sketched after this list).
- Handle the RegionDestroyedException that can be thrown while trying to
  lock a bucket that is in the middle of moving.
- Improve race condition handling in *ScanExecutor calls.
- Make RENAME respond with MOVED when the bucket moves during the call.
- Convert tests to use Lettuce's RedisClusterClient (the shared client
  setup is sketched after this list).
- Add a retry mechanism to various Lettuce calls since the client does
  not handle all failure cases.
- Fix the TCL test script to ensure that buckets are created before the
  tests run.
- Ignore RedisSessionDUnitTest for now; to be fixed under GEODE-9437.
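The central pattern is visible in the RegionProvider and
CheckPrimaryBucketFunction hunks below: run the striped work while the
primary bucket for the key is locked, and translate lock failures into a
"bucket moved" error. The following is a minimal sketch of that wrapper,
not the exact source in this diff; it assumes a holder class around the
REDIS_DATA PartitionedRegion, a StripedExecutor with an
execute(stripeId, Callable) overload, and a computeWithPrimaryLocked that
accepts a Callable and returns its result (those signatures are not shown
here).

    import java.util.concurrent.Callable;

    import org.apache.geode.cache.RegionDestroyedException;
    import org.apache.geode.internal.cache.PartitionedRegion;
    import org.apache.geode.internal.cache.PrimaryBucketLockException;
    import org.apache.geode.internal.cache.execute.BucketMovedException;
    import org.apache.geode.redis.internal.data.RedisKey;
    import org.apache.geode.redis.internal.executor.StripedExecutor;

    class PrimaryLockedExecutor {
      private final PartitionedRegion dataRegion;     // the REDIS_DATA region
      private final StripedExecutor stripedExecutor;  // serializes work per key stripe

      PrimaryLockedExecutor(PartitionedRegion dataRegion, StripedExecutor stripedExecutor) {
        this.dataRegion = dataRegion;
        this.stripedExecutor = stripedExecutor;
      }

      // Hold the primary-bucket lock for the key while the striped callable runs,
      // so the bucket cannot move out from under the command. A failure to take
      // the lock, or a RegionDestroyedException raised because the bucket is
      // moving, is rethrown as BucketMovedException, which higher layers treat as
      // a retry/MOVED condition.
      <T> T execute(RedisKey key, Callable<T> work) {
        try {
          return dataRegion.computeWithPrimaryLocked(key,
              () -> stripedExecutor.execute(key, work));
        } catch (PrimaryBucketLockException | RegionDestroyedException ex) {
          throw new BucketMovedException(ex.toString());
        } catch (Exception ex) {
          throw new RuntimeException(ex);
        }
      }
    }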
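The converted DUnit tests share the RedisClusterClient configuration that
appears repeatedly in the hunks below: adaptive topology refresh so the
client re-discovers the cluster after a bucket moves, node-membership
validation disabled, and a guarded shutdown. A condensed, self-contained
version of that setup (the class and method names here are illustrative,
and the port value is whatever the test rule reports):

    import io.lettuce.core.cluster.ClusterClientOptions;
    import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
    import io.lettuce.core.cluster.RedisClusterClient;
    import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;

    class LettuceClusterClientSetup {
      static RedisAdvancedClusterCommands<String, String> connect(int redisPort) {
        RedisClusterClient clusterClient =
            RedisClusterClient.create("redis://localhost:" + redisPort);

        // Refresh the topology on adaptive triggers (redirects, reconnects) so
        // the client re-routes commands after a slot/bucket migrates.
        ClusterTopologyRefreshOptions refreshOptions =
            ClusterTopologyRefreshOptions.builder()
                .enableAllAdaptiveRefreshTriggers()
                .refreshTriggersReconnectAttempts(1)
                .build();

        clusterClient.setOptions(ClusterClientOptions.builder()
            .topologyRefreshOptions(refreshOptions)
            .validateClusterNodeMembership(false)
            .build());

        return clusterClient.connect().sync();
      }

      // Tests wrap shutdown() because of
      // https://github.com/lettuce-io/lettuce-core/issues/1800.
      static void shutdownQuietly(RedisClusterClient clusterClient) {
        try {
          clusterClient.shutdown();
        } catch (Exception ignored) {
        }
      }
    }
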
diff --git a/ci/scripts/execute_redis_tests.sh b/ci/scripts/execute_redis_tests.sh
index f39fb28..c34c861 100755
--- a/ci/scripts/execute_redis_tests.sh
+++ b/ci/scripts/execute_redis_tests.sh
@@ -35,7 +35,15 @@
   --name=server1 \
   --compatible-with-redis-port=6380 \
   --compatible-with-redis-bind-address=127.0.0.1 \
-  --compatible-with-redis-password=foobar
+  --compatible-with-redis-password=foobar \
+  --server-port=0 \
+  --J=-Dgemfire.jmx-manager=true \
+  --J=-Dgemfire.jmx-manager-start=true \
+  --J=-Dgemfire.jmx-manager-port=1099
+
+# This will cause all buckets to be created
+../geode-assembly/build/install/apache-geode/bin/gfsh -e "connect --jmx-manager=localhost[1099]" \
+  -e "query --query='select count(*) from /REDIS_DATA'"
 
 failCount=0
 
@@ -43,13 +51,21 @@
 
 ((failCount += $?))
 
+../geode-assembly/build/install/apache-geode/bin/gfsh stop server --dir=server1
 
 ../geode-assembly/build/install/apache-geode/bin/gfsh start server \
   --J=-Denable-unsupported-commands=true \
   --name=server2 \
   --server-port=0 \
   --compatible-with-redis-port=6379 \
-  --compatible-with-redis-bind-address=127.0.0.1
+  --compatible-with-redis-bind-address=127.0.0.1 \
+  --J=-Dgemfire.jmx-manager=true \
+  --J=-Dgemfire.jmx-manager-start=true \
+  --J=-Dgemfire.jmx-manager-port=1099
+
+# This will cause all buckets to be created
+../geode-assembly/build/install/apache-geode/bin/gfsh -e "connect --jmx-manager=localhost[1099]" \
+  -e "query --query='select count(*) from /REDIS_DATA'"
 
 ./runtest --host 127.0.0.1 --port 6379 \
 --single unit/type/set \
diff --git a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java
index a0fc31b6..db53791 100644
--- a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java
+++ b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionAcceptanceTest.java
@@ -16,13 +16,11 @@
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.geode.redis.NativeRedisClusterTestRule;
 import org.apache.geode.redis.session.RedisSessionDUnitTest;
 
-@Ignore("GEODE-9341")
 public class NativeRedisSessionAcceptanceTest extends RedisSessionDUnitTest {
 
   @ClassRule
diff --git a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java
index a47f95a..4fc8716 100644
--- a/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java
+++ b/geode-apis-compatible-with-redis/src/acceptanceTest/java/session/NativeRedisSessionExpirationAcceptanceTest.java
@@ -16,13 +16,11 @@
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.geode.redis.NativeRedisClusterTestRule;
 import org.apache.geode.redis.session.SessionExpirationDUnitTest;
 
-@Ignore("GEODE-9341")
 public class NativeRedisSessionExpirationAcceptanceTest extends SessionExpirationDUnitTest {
   @ClassRule
   public static NativeRedisClusterTestRule redis = new NativeRedisClusterTestRule();
diff --git a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java
index ccae2bb..454d09c 100644
--- a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java
+++ b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java
@@ -49,6 +49,12 @@
     cache = cacheFactory.create();
     server = new GeodeRedisServer("localhost", 0, (InternalCache) cache);
     server.setAllowUnsupportedCommands(enableUnsupportedCommands);
+
+    // Ensure that buckets are created up front
+    try {
+      server.getRegionProvider().getSlotAdvisor().getBucketSlots();
+    } catch (InterruptedException ignored) {
+    }
   }
 
   public GeodeRedisServerRule withProperty(String property, String value) {
diff --git a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
index 017c8ed..bb382cf 100644
--- a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
+++ b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
@@ -21,6 +21,7 @@
 import static org.apache.geode.distributed.ConfigurationProperties.REDIS_PORT;
 
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -28,12 +29,18 @@
 import org.apache.logging.log4j.core.config.Configurator;
 import redis.clients.jedis.Jedis;
 
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.logging.internal.log4j.api.FastLogger;
 import org.apache.geode.redis.ClusterNode;
 import org.apache.geode.redis.ClusterNodes;
 import org.apache.geode.redis.internal.GeodeRedisServer;
 import org.apache.geode.redis.internal.GeodeRedisService;
+import org.apache.geode.redis.internal.RegionProvider;
 import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.junit.rules.ServerStarterRule;
 
@@ -154,4 +161,58 @@
       FastLogger.setDelegating(true);
     });
   }
+
+  /**
+   * Assuming a redundancy of 1, and at least 3 members, move the given key's primary bucket to a
+   * non-hosting member.
+   */
+  public DistributedMember moveBucketForKey(String key) {
+    return getMember(1).invoke("moveBucketForKey: " + key, () -> {
+      Region<RedisKey, RedisData> r = RedisClusterStartupRule.getCache()
+          .getRegion(RegionProvider.REDIS_DATA_REGION);
+
+      RedisKey redisKey = new RedisKey(key.getBytes());
+      DistributedMember primaryMember = PartitionRegionHelper.getPrimaryMemberForKey(r, redisKey);
+      Set<DistributedMember> allHosting = PartitionRegionHelper.getAllMembersForKey(r, redisKey);
+
+      // Returns all members, except the one calling.
+      Set<DistributedMember> allMembers = getCache().getMembers(r);
+      allMembers.add(getCache().getDistributedSystem().getDistributedMember());
+
+      allMembers.removeAll(allHosting);
+      DistributedMember targetMember = allMembers.stream().findFirst().orElseThrow(
+          () -> new IllegalStateException("No non-hosting member found for key: " + key));
+
+      PartitionRegionHelper.moveBucketByKey(r, primaryMember, targetMember, redisKey);
+
+      // Who is the primary now?
+      return PartitionRegionHelper.getPrimaryMemberForKey(r, redisKey);
+    });
+  }
+
+  /**
+   * Return some key of the form {@code prefix<N>}, (where {@code N} is an integer), for which the
+   * given VM is the primary bucket holder. This is useful in tests where one needs to ensure that
+   * a given key would be hosted on a given server.
+   */
+  public String getKeyOnServer(String keyPrefix, int vmId) {
+    return getMember(1).invoke("getKeyOnServer", () -> {
+      Region<RedisKey, RedisData> r = RedisClusterStartupRule.getCache()
+          .getRegion(RegionProvider.REDIS_DATA_REGION);
+
+      String server = "server-" + vmId;
+      String key;
+      int i = 0;
+      while (true) {
+        key = keyPrefix + i;
+        DistributedMember primaryMember =
+            PartitionRegionHelper.getPrimaryMemberForKey(r, new RedisKey(key.getBytes()));
+        if (primaryMember.getName().equals(server)) {
+          return key;
+        }
+        i++;
+      }
+    });
+  }
+
 }
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java
index 02361a5..d679c9c 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/CheckPrimaryBucketFunction.java
@@ -16,6 +16,7 @@
 
 package org.apache.geode.redis;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -82,7 +83,7 @@
       }
     }
 
-    Runnable r = () -> {
+    Callable<Void> r = () -> {
       if (!releaseLatchEarly) {
         signalFunctionHasStarted.countDown();
       }
@@ -97,6 +98,7 @@
         e.printStackTrace();
         result.lastResult(false);
       }
+      return null;
     };
 
     LocalDataSet localDataSet = (LocalDataSet) localRegion;
@@ -105,6 +107,8 @@
       partitionedRegion.computeWithPrimaryLocked(key, r);
     } catch (PrimaryBucketLockException ex) {
       throw new BucketMovedException(ex.toString());
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
     }
   }
 
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
deleted file mode 100644
index 3055021..0000000
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.geode.redis.internal.executor;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import io.lettuce.core.cluster.ClusterClientOptions;
-import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
-import io.lettuce.core.cluster.RedisClusterClient;
-import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.core.config.Configurator;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.geode.cache.control.RebalanceFactory;
-import org.apache.geode.cache.control.RebalanceResults;
-import org.apache.geode.cache.control.ResourceManager;
-import org.apache.geode.logging.internal.log4j.api.FastLogger;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.dunit.rules.MemberVM;
-import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
-import org.apache.geode.test.junit.rules.ExecutorServiceRule;
-
-@Ignore("GEODE-9368")
-public class CrashAndNoRepeatDUnitTest {
-
-  private static final Logger logger = LogService.getLogger();
-
-  @ClassRule
-  public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule();
-
-  private static MemberVM locator;
-  private static MemberVM server1;
-  private static MemberVM server2;
-  private static MemberVM server3;
-  private static RedisClusterClient clusterClient;
-  private static RedisAdvancedClusterCommands<String, String> lettuce;
-
-  @Rule
-  public ExecutorServiceRule executor = new ExecutorServiceRule();
-
-  @BeforeClass
-  public static void classSetup() throws Exception {
-    locator = clusterStartUp.startLocatorVM(0);
-    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
-    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
-    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
-
-    server1.invoke("Set logging level to DEBUG", () -> {
-      Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
-      Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
-      FastLogger.setDelegating(true);
-    });
-
-    server2.invoke("Set logging level to DEBUG", () -> {
-      Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
-      Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
-      FastLogger.setDelegating(true);
-    });
-
-    server3.invoke("Set logging level to DEBUG", () -> {
-      Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
-      Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
-      FastLogger.setDelegating(true);
-    });
-
-    int redisServerPort1 = clusterStartUp.getRedisPort(1);
-    clusterClient = RedisClusterClient.create("redis://localhost:" + redisServerPort1);
-
-    ClusterTopologyRefreshOptions refreshOptions =
-        ClusterTopologyRefreshOptions.builder()
-            .enableAllAdaptiveRefreshTriggers()
-            .refreshTriggersReconnectAttempts(1)
-            .build();
-
-    clusterClient.setOptions(ClusterClientOptions.builder()
-        .topologyRefreshOptions(refreshOptions)
-        .validateClusterNodeMembership(false)
-        .build());
-
-    lettuce = clusterClient.connect().sync();
-  }
-
-  @AfterClass
-  public static void cleanup() {
-    clusterClient.shutdown();
-  }
-
-  @Test
-  public void givenServerCrashesDuringAPPEND_thenDataIsNotLost() throws Exception {
-    AtomicBoolean running1 = new AtomicBoolean(true);
-    AtomicBoolean running2 = new AtomicBoolean(false);
-
-    Runnable task1 = () -> appendPerformAndVerify(1, 20000, running1);
-    Runnable task2 = () -> appendPerformAndVerify(2, 20000, running1);
-    Runnable task3 = () -> appendPerformAndVerify(3, 20000, running1);
-    Runnable task4 = () -> appendPerformAndVerify(4, 1000, running2);
-
-    Future<Void> future1 = executor.runAsync(task1);
-    Future<Void> future2 = executor.runAsync(task2);
-    Future<Void> future3 = executor.runAsync(task3);
-    Future<Void> future4 = executor.runAsync(task4);
-
-    future4.get();
-    clusterStartUp.crashVM(2);
-    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
-    rebalanceAllRegions(server2);
-
-    clusterStartUp.crashVM(3);
-    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
-    rebalanceAllRegions(server3);
-
-    clusterStartUp.crashVM(2);
-    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
-    rebalanceAllRegions(server2);
-
-    clusterStartUp.crashVM(3);
-    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
-    rebalanceAllRegions(server3);
-
-    running1.set(false);
-
-    future1.get();
-    future2.get();
-    future3.get();
-  }
-
-  @Test
-  public void givenServerCrashesDuringRename_thenDataIsNotLost() throws Exception {
-    AtomicBoolean running1 = new AtomicBoolean(true);
-    AtomicBoolean running2 = new AtomicBoolean(true);
-    AtomicBoolean running3 = new AtomicBoolean(true);
-    AtomicBoolean running4 = new AtomicBoolean(false);
-
-    AtomicReference<String> phase = new AtomicReference<>("STARTUP");
-    Runnable task1 = () -> renamePerformAndVerify(1, 20000, running1, phase);
-    Runnable task2 = () -> renamePerformAndVerify(2, 20000, running2, phase);
-    Runnable task3 = () -> renamePerformAndVerify(3, 20000, running3, phase);
-    Runnable task4 = () -> renamePerformAndVerify(4, 1000, running4, phase);
-
-    Future<Void> future1 = executor.runAsync(task1);
-    Future<Void> future2 = executor.runAsync(task2);
-    Future<Void> future3 = executor.runAsync(task3);
-    Future<Void> future4 = executor.runAsync(task4);
-
-    future4.get();
-    phase.set("CRASH 1 SERVER2");
-    clusterStartUp.crashVM(2);
-    phase.set("RESTART 1 SERVER2");
-    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
-    phase.set("REBALANCE 1 SERVER2");
-    rebalanceAllRegions(server2);
-
-    phase.set("CRASH 2 SERVER3");
-    clusterStartUp.crashVM(3);
-    phase.set("RESTART 2 SERVER3");
-    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
-    phase.set("REBALANCE 2 SERVER3");
-    rebalanceAllRegions(server3);
-
-    phase.set("CRASH 3 SERVER2");
-    clusterStartUp.crashVM(2);
-    phase.set("RESTART 3 SERVER2");
-    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
-    phase.set("REBALANCE 3 SERVER2");
-    rebalanceAllRegions(server2);
-
-    phase.set("CRASH 4 SERVER3");
-    clusterStartUp.crashVM(3);
-    phase.set("RESTART 4 SERVER3");
-    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
-    phase.set("REBALANCE 4 SERVER3");
-    rebalanceAllRegions(server3);
-
-    running1.set(false);
-    running2.set(false);
-    running3.set(false);
-
-    future1.get();
-    future2.get();
-    future3.get();
-  }
-
-  private void renamePerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning,
-      AtomicReference<String> phase) {
-    String newKey;
-    String baseKey = "rename-key-" + index;
-    lettuce.set(baseKey + "-0", "value");
-    int iterationCount = 0;
-
-    while (iterationCount < minimumIterations || isRunning.get()) {
-      String oldKey = baseKey + "-" + iterationCount;
-      newKey = baseKey + "-" + (iterationCount + 1);
-      // This try/catch is left for debugging and should be removed as part of GEODE-9368
-      try {
-        lettuce.rename(oldKey, newKey);
-        assertThat(lettuce.exists(newKey)).as("key " + newKey + " should exist").isEqualTo(1);
-      } catch (Exception exception) {
-        System.err.println("---||| Exception on key " + newKey + " during phase: " + phase.get());
-        exception.printStackTrace();
-        isRunning.set(false);
-        throw exception;
-      }
-      iterationCount += 1;
-    }
-
-    logger.info("--->>> RENAME test ran {} iterations", iterationCount);
-  }
-
-  private void appendPerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
-    String key = "append-key-" + index;
-    int iterationCount = 0;
-
-    while (iterationCount < minimumIterations || isRunning.get()) {
-      String appendString = "-" + key + "-" + iterationCount + "-";
-      lettuce.append(key, appendString);
-      iterationCount += 1;
-    }
-
-    String storedString = lettuce.get(key);
-    int idx = 0;
-    for (int i = 0; i < iterationCount; i++) {
-      String expectedValue = "-" + key + "-" + i + "-";
-      String foundValue = storedString.substring(idx, idx + expectedValue.length());
-      if (!expectedValue.equals(foundValue)) {
-        Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount="
-            + iterationCount + " in string "
-            + storedString);
-        break;
-      }
-      idx += expectedValue.length();
-    }
-
-    logger.info("--->>> APPEND test ran {} iterations", iterationCount);
-  }
-
-  private static void rebalanceAllRegions(MemberVM vm) {
-    vm.invoke(() -> {
-      ResourceManager manager = ClusterStartupRule.getCache().getResourceManager();
-
-      RebalanceFactory factory = manager.createRebalanceFactory();
-
-      try {
-        RebalanceResults result = factory.start().getResults();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
-}
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java
index 95d29c3..7f4e097 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/MovedDUnitTest.java
@@ -82,7 +82,11 @@
 
   @AfterClass
   public static void cleanup() {
-    clusterClient.shutdown();
+    try {
+      clusterClient.shutdown();
+    } catch (Exception ignored) {
+      // https://github.com/lettuce-io/lettuce-core/issues/1800
+    }
     jedis1.close();
     jedis2.close();
   }
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
index 712fc6a..7306064 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
@@ -23,12 +23,10 @@
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import io.lettuce.core.MapScanCursor;
-import io.lettuce.core.RedisCommandExecutionException;
-import io.lettuce.core.RedisException;
 import io.lettuce.core.ScanCursor;
 import io.lettuce.core.cluster.ClusterClientOptions;
 import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
@@ -37,18 +35,15 @@
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
-import org.apache.geode.cache.control.RebalanceFactory;
-import org.apache.geode.cache.control.ResourceManager;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
-@Ignore("GEODE-9368")
 public class HScanDunitTest {
 
   @ClassRule
@@ -62,17 +57,15 @@
 
   private static MemberVM locator;
 
-  static final String HASH_KEY = "key";
-  static final String BASE_FIELD = "baseField_";
-  static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+  private static final String HASH_KEY = "key";
+  private static final String BASE_FIELD = "baseField_";
+  private static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
 
   @BeforeClass
   public static void classSetup() {
     locator = redisClusterStartupRule.startLocatorVM(0);
     int locatorPort = locator.getPort();
 
-    // note: due to rules around member weighting in split-brain scenarios,
-    // vm1 (server1) should not be crashed or it will cause additional (unrelated) failures
     redisClusterStartupRule.startRedisVM(1, locatorPort);
     redisClusterStartupRule.startRedisVM(2, locatorPort);
     redisClusterStartupRule.startRedisVM(3, locatorPort);
@@ -96,78 +89,61 @@
 
   @AfterClass
   public static void tearDown() {
-    clusterClient.shutdown();
+    try {
+      clusterClient.shutdown();
+    } catch (Exception ignored) {
+      // https://github.com/lettuce-io/lettuce-core/issues/1800
+    }
   }
 
   @Test
-  public void should_allowHscanIterationToCompleteSuccessfullyGivenServerCrashesDuringIteration()
+  public void should_allowHscanIterationToCompleteSuccessfullyGivenBucketIsMoving()
       throws ExecutionException, InterruptedException {
 
-    AtomicBoolean keepCrashingVMs = new AtomicBoolean(true);
-    AtomicInteger numberOfTimesVMCrashed = new AtomicInteger(0);
+    AtomicBoolean running = new AtomicBoolean(true);
 
     Future<Void> hScanFuture = executor.runAsync(
-        () -> doHScanContinuallyAndAssertOnResults(keepCrashingVMs, numberOfTimesVMCrashed));
+        () -> doHScanContinuallyAndAssertOnResults(running));
 
-    Future<Void> crashingVmFuture = executor.runAsync(
-        () -> crashAlternatingVMS(keepCrashingVMs, numberOfTimesVMCrashed));
+    for (int i = 0; i < 100 && running.get(); i++) {
+      redisClusterStartupRule.moveBucketForKey(HASH_KEY);
+      GeodeAwaitility.await().during(200, TimeUnit.MILLISECONDS).until(() -> true);
+    }
 
+    running.set(false);
     hScanFuture.get();
-    crashingVmFuture.get();
   }
 
-  private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean keepCrashingVMs,
-      AtomicInteger numberOfTimesServersCrashed) {
-    int numberOfAssertionsCompleted = 0;
-
+  private static void doHScanContinuallyAndAssertOnResults(AtomicBoolean running) {
     ScanCursor scanCursor = new ScanCursor("0", false);
     List<String> allEntries = new ArrayList<>();
     MapScanCursor<String, String> result;
+    int i = 0;
 
-    while (numberOfAssertionsCompleted < 3 || numberOfTimesServersCrashed.get() < 3) {
-
+    while (running.get()) {
       allEntries.clear();
       scanCursor.setCursor("0");
       scanCursor.setFinished(false);
 
+      do {
+        result = commands.hscan(HASH_KEY, scanCursor);
+        scanCursor.setCursor(result.getCursor());
+        Map<String, String> resultEntries = result.getMap();
+
+        resultEntries.forEach((key, value) -> allEntries.add(key));
+      } while (!result.isFinished());
+
       try {
-        do {
-          result = commands.hscan(HASH_KEY, scanCursor);
-          scanCursor.setCursor(result.getCursor());
-          Map<String, String> resultEntries = result.getMap();
-
-          resultEntries.forEach((key, value) -> allEntries.add(key));
-
-        } while (!result.isFinished());
-
         assertThat(allEntries).containsAll(INITIAL_DATA_SET.keySet());
-        numberOfAssertionsCompleted++;
-
-      } catch (RedisCommandExecutionException ignore) {
-      } catch (RedisException ex) {
-        if (!ex.getMessage().contains("Connection reset by peer")) {
-          throw ex;
-        } // ignore error
-
+      } catch (AssertionError ex) {
+        running.set(false);
+        throw ex;
       }
+
+      i++;
     }
-    keepCrashingVMs.set(false);
-  }
 
-  private void crashAlternatingVMS(AtomicBoolean keepCrashingVMs,
-      AtomicInteger numberOfTimesServersCrashed) {
-
-    int vmToCrashToggle = 3;
-    MemberVM vm;
-
-    do {
-      redisClusterStartupRule.crashVM(vmToCrashToggle);
-      vm = redisClusterStartupRule.startRedisVM(vmToCrashToggle, locator.getPort());
-      rebalanceAllRegions(vm);
-      numberOfTimesServersCrashed.incrementAndGet();
-      vmToCrashToggle = (vmToCrashToggle == 2) ? 3 : 2;
-
-    } while (keepCrashingVMs.get());
+    LogService.getLogger().info("--->>> Completed {} iterations of HSCAN", i);
   }
 
   private static Map<String, String> makeEntrySet(int sizeOfDataSet) {
@@ -178,17 +154,4 @@
     return dataSet;
   }
 
-  private static void rebalanceAllRegions(MemberVM vm) {
-    vm.invoke(() -> {
-
-      ResourceManager manager = ClusterStartupRule.getCache().getResourceManager();
-      RebalanceFactory factory = manager.createRebalanceFactory();
-
-      try {
-        factory.start().getResults();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
 }
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
index caf3f86..7d62d51 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
@@ -92,7 +92,11 @@
 
   @AfterClass
   public static void cleanup() {
-    clusterClient.shutdown();
+    try {
+      clusterClient.shutdown();
+    } catch (Exception ignored) {
+      // https://github.com/lettuce-io/lettuce-core/issues/1800
+    }
   }
 
   @Test
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java
index c206d3e..a80c36a 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HdelDUnitTest.java
@@ -22,13 +22,14 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import io.lettuce.core.RedisException;
+import io.lettuce.core.cluster.ClusterClientOptions;
+import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
 import io.lettuce.core.cluster.RedisClusterClient;
 import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.geode.redis.ConcurrentLoopingThreads;
@@ -36,7 +37,6 @@
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
-@Ignore("GEODE-9368")
 public class HdelDUnitTest {
 
   @ClassRule
@@ -47,8 +47,6 @@
 
   private static final int HASH_SIZE = 50000;
   private static MemberVM locator;
-  private static MemberVM server1;
-  private static MemberVM server2;
   private static RedisAdvancedClusterCommands<String, String> lettuce;
   private static RedisClusterClient clusterClient;
 
@@ -56,18 +54,33 @@
   public static void classSetup() {
     locator = cluster.startLocatorVM(0);
 
-    server1 = cluster.startRedisVM(1, locator.getPort());
-    server2 = cluster.startRedisVM(2, locator.getPort());
+    cluster.startRedisVM(1, locator.getPort());
+    cluster.startRedisVM(2, locator.getPort());
 
     int redisServerPort1 = cluster.getRedisPort(1);
     clusterClient = RedisClusterClient.create("redis://localhost:" + redisServerPort1);
 
+    ClusterTopologyRefreshOptions refreshOptions =
+        ClusterTopologyRefreshOptions.builder()
+            .enableAllAdaptiveRefreshTriggers()
+            .refreshTriggersReconnectAttempts(1)
+            .build();
+
+    clusterClient.setOptions(ClusterClientOptions.builder()
+        .topologyRefreshOptions(refreshOptions)
+        .validateClusterNodeMembership(false)
+        .build());
+
     lettuce = clusterClient.connect().sync();
   }
 
   @AfterClass
   public static void cleanup() {
-    clusterClient.shutdown();
+    try {
+      clusterClient.shutdown();
+    } catch (Exception ignored) {
+      // https://github.com/lettuce-io/lettuce-core/issues/1800
+    }
   }
 
   @Before
@@ -119,7 +132,7 @@
             .start();
 
     cluster.crashVM(2);
-    server2 = cluster.startRedisVM(2, locator.getPort());
+    cluster.startRedisVM(2, locator.getPort());
 
     loopingThreads.await();
     assertThat(lettuce.hgetall(key).size())
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
index 56a41c4..4768dc9 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
@@ -15,26 +15,34 @@
 package org.apache.geode.redis.internal.executor.key;
 
 
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisMovedDataException;
 
 import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.StripedExecutor;
@@ -42,30 +50,29 @@
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class RenameDUnitTest {
 
   @ClassRule
   public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(3);
 
-  private static final String LOCAL_HOST = "127.0.0.1";
-  private static final int JEDIS_TIMEOUT =
-      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
-  private final ExecutorService pool = Executors.newCachedThreadPool();
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
 
   private static JedisCluster jedisCluster;
   private static MemberVM locator;
-  private static MemberVM server1;
-  private static MemberVM server2;
 
   @BeforeClass
   public static void setup() {
     locator = clusterStartUp.startLocatorVM(0);
-    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
-    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(1, locator.getPort());
+    clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(3, locator.getPort());
 
     int redisServerPort1 = clusterStartUp.getRedisPort(1);
-    jedisCluster = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort1), JEDIS_TIMEOUT);
+    jedisCluster =
+        new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort1), REDIS_CLIENT_TIMEOUT);
   }
 
   @Before
@@ -76,9 +83,6 @@
   @AfterClass
   public static void tearDown() {
     jedisCluster.close();
-
-    server1.stop();
-    server2.stop();
   }
 
   @Test
@@ -116,6 +120,20 @@
     }
   }
 
+  @Test
+  public void testRenameWithKeysOnDifferentServers_shouldReturnMovedError() {
+    int port1 = clusterStartUp.getRedisPort(1);
+    Jedis jedis = new Jedis(BIND_ADDRESS, port1, REDIS_CLIENT_TIMEOUT);
+
+    String srcKey = clusterStartUp.getKeyOnServer("key-", 1);
+    String dstKey = clusterStartUp.getKeyOnServer("key-", 2);
+
+    jedis.set(srcKey, "Fancy that");
+
+    assertThatThrownBy(() -> jedis.rename(srcKey, dstKey))
+        .isInstanceOf(JedisMovedDataException.class);
+  }
+
   private Set<String> getKeysOnSameRandomStripe(int numKeysNeeded) {
     Random random = new Random();
     String key1 = "{rename}keyz" + random.nextInt();
@@ -161,30 +179,30 @@
     String oldKey4 = listOfKeys4.get(0);
     String newKey4 = listOfKeys4.get(1);
 
-    Runnable renameOldKey1ToNewKey1 = () -> {
+    Callable<String> renameOldKey1ToNewKey1 = () -> {
       cyclicBarrierAwait(startCyclicBarrier);
-      jedisCluster.rename(oldKey1, newKey1);
+      return jedisCluster.rename(oldKey1, newKey1);
     };
 
-    Runnable renameOldKey2ToNewKey2 = () -> {
+    Callable<String> renameOldKey2ToNewKey2 = () -> {
       cyclicBarrierAwait(startCyclicBarrier);
-      jedisCluster.rename(oldKey2, newKey2);
+      return jedisCluster.rename(oldKey2, newKey2);
     };
 
-    Runnable renameOldKey3ToNewKey3 = () -> {
+    Callable<String> renameOldKey3ToNewKey3 = () -> {
       cyclicBarrierAwait(startCyclicBarrier);
-      jedisCluster.rename(oldKey3, newKey3);
+      return jedisCluster.rename(oldKey3, newKey3);
     };
 
-    Runnable renameOldKey4ToNewKey4 = () -> {
+    Callable<String> renameOldKey4ToNewKey4 = () -> {
       cyclicBarrierAwait(startCyclicBarrier);
-      jedisCluster.rename(oldKey4, newKey4);
+      return jedisCluster.rename(oldKey4, newKey4);
     };
 
-    Future<?> future1 = pool.submit(renameOldKey1ToNewKey1);
-    Future<?> future2 = pool.submit(renameOldKey2ToNewKey2);
-    Future<?> future3 = pool.submit(renameOldKey3ToNewKey3);
-    Future<?> future4 = pool.submit(renameOldKey4ToNewKey4);
+    Future<?> future1 = executor.submit(renameOldKey1ToNewKey1);
+    Future<?> future2 = executor.submit(renameOldKey2ToNewKey2);
+    Future<?> future3 = executor.submit(renameOldKey3ToNewKey3);
+    Future<?> future4 = executor.submit(renameOldKey4ToNewKey4);
 
     future1.get();
     future2.get();
@@ -192,6 +210,57 @@
     future4.get();
   }
 
+  @Test
+  public void givenBucketsMoveDuringRename_thenDataIsNotLost() throws Exception {
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    List<String> hashtags = new ArrayList<>();
+    hashtags.add(clusterStartUp.getKeyOnServer("rename", 1));
+    hashtags.add(clusterStartUp.getKeyOnServer("rename", 2));
+    hashtags.add(clusterStartUp.getKeyOnServer("rename", 3));
+
+    Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running);
+    Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running);
+    Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running);
+
+    Future<Void> future1 = executor.runAsync(task1);
+    Future<Void> future2 = executor.runAsync(task2);
+    Future<Void> future3 = executor.runAsync(task3);
+
+    for (int i = 0; i < 100 && running.get(); i++) {
+      clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
+      GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
+    }
+
+    running.set(false);
+
+    future1.get();
+    future2.get();
+    future3.get();
+  }
+
+  private void renamePerformAndVerify(int index, int minimumIterations, String hashtag,
+      AtomicBoolean isRunning) {
+    String baseKey = "{" + hashtag + "}-key-" + index;
+    jedisCluster.set(baseKey + "-0", "value");
+    int iterationCount = 0;
+
+    while (iterationCount < minimumIterations || isRunning.get()) {
+      String oldKey = baseKey + "-" + iterationCount;
+      String newKey = baseKey + "-" + (iterationCount + 1);
+      try {
+        jedisCluster.rename(oldKey, newKey);
+      } catch (Exception ex) {
+        isRunning.set(false);
+        throw new RuntimeException("Exception performing RENAME " + oldKey + " " + newKey, ex);
+      }
+
+      assertThat(jedisCluster.exists(newKey))
+          .as("key " + newKey + " should exist").isTrue();
+      iterationCount += 1;
+    }
+  }
+
   private void cyclicBarrierAwait(CyclicBarrier startCyclicBarrier) {
     try {
       startCyclicBarrier.await();
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java
index e900f6e..a93084c 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/string/StringsDUnitTest.java
@@ -15,18 +15,25 @@
 
 package org.apache.geode.redis.internal.executor.string;
 
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.JedisCluster;
@@ -36,32 +43,32 @@
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class StringsDUnitTest {
 
   @ClassRule
   public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
 
-  private static final String LOCAL_HOST = "127.0.0.1";
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
   private static final int LIST_SIZE = 1000;
   private static final int NUM_ITERATIONS = 1000;
-  private static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
   private static JedisCluster jedisCluster;
 
   private static MemberVM locator;
-  private static MemberVM server1;
-  private static MemberVM server2;
-  private static MemberVM server3;
 
   @BeforeClass
   public static void classSetup() {
     locator = clusterStartUp.startLocatorVM(0);
-    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
-    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
-    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+    clusterStartUp.startRedisVM(1, locator.getPort());
+    clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(3, locator.getPort());
 
     int redisServerPort1 = clusterStartUp.getRedisPort(1);
-    jedisCluster = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort1), JEDIS_TIMEOUT);
+    jedisCluster =
+        new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort1), REDIS_CLIENT_TIMEOUT);
   }
 
   @After
@@ -72,10 +79,6 @@
   @AfterClass
   public static void tearDown() {
     jedisCluster.close();
-
-    server1.stop();
-    server2.stop();
-    server3.stop();
   }
 
   @Test
@@ -234,6 +237,73 @@
     }
   }
 
+  @Test
+  public void givenBucketsMoveDuringAppend_thenDataIsNotLost() throws Exception {
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    List<String> hashtags = new ArrayList<>();
+    hashtags.add(clusterStartUp.getKeyOnServer("append", 1));
+    hashtags.add(clusterStartUp.getKeyOnServer("append", 2));
+    hashtags.add(clusterStartUp.getKeyOnServer("append", 3));
+
+    Runnable task1 = () -> appendPerformAndVerify(1, 10000, hashtags.get(0), running);
+    Runnable task2 = () -> appendPerformAndVerify(2, 10000, hashtags.get(1), running);
+    Runnable task3 = () -> appendPerformAndVerify(3, 10000, hashtags.get(2), running);
+
+    Future<Void> future1 = executor.runAsync(task1);
+    Future<Void> future2 = executor.runAsync(task2);
+    Future<Void> future3 = executor.runAsync(task3);
+
+    for (int i = 0; i < 100 && running.get(); i++) {
+      clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
+      GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
+    }
+
+    for (int i = 0; i < 100 && running.get(); i++) {
+      clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
+      GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
+    }
+
+    running.set(false);
+
+    future1.get();
+    future2.get();
+    future3.get();
+  }
+
+  private void appendPerformAndVerify(int index, int minimumIterations, String hashtag,
+      AtomicBoolean isRunning) {
+    String key = "{" + hashtag + "}-key-" + index;
+    int iterationCount = 0;
+
+    while (iterationCount < minimumIterations || isRunning.get()) {
+      String appendString = "-" + key + "-" + iterationCount + "-";
+      try {
+        jedisCluster.append(key, appendString);
+      } catch (Exception ex) {
+        isRunning.set(false);
+        throw new RuntimeException("Exception performing APPEND " + appendString, ex);
+      }
+      iterationCount += 1;
+    }
+
+    String storedString = jedisCluster.get(key);
+
+    int idx = 0;
+    int i = 0;
+    while (i < iterationCount) {
+      String expectedValue = "-" + key + "-" + i + "-";
+      String foundValue = storedString.substring(idx, idx + expectedValue.length());
+      if (!foundValue.equals(expectedValue)) {
+        Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount="
+            + iterationCount + " in string "
+            + storedString);
+        break;
+      }
+      idx += expectedValue.length();
+      i++;
+    }
+  }
 
   private List<String> makeStringList(int setSize, String baseString) {
     List<String> strings = new ArrayList<>();
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
index b2e5acb..5b62ea3 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
@@ -29,7 +29,7 @@
 
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
-@Ignore("GEODE-9341")
+@Ignore("GEODE-9437")
 public class RedisSessionDUnitTest extends SessionDUnitTest {
 
   @Rule
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
index 3f62a3b..a13898a 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
@@ -16,7 +16,6 @@
 package org.apache.geode.redis.session;
 
 
-
 import java.net.HttpCookie;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -29,15 +28,13 @@
 import io.github.resilience4j.retry.Retry;
 import io.github.resilience4j.retry.RetryConfig;
 import io.github.resilience4j.retry.RetryRegistry;
+import io.lettuce.core.RedisConnectionException;
 import io.lettuce.core.cluster.ClusterClientOptions;
 import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
 import io.lettuce.core.cluster.RedisClusterClient;
 import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
 import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.core.config.Configurator;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -54,7 +51,6 @@
 import org.springframework.web.client.RestTemplate;
 
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.logging.internal.log4j.api.FastLogger;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.session.springRedisTestApplication.RedisSpringTestApplication;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -103,7 +99,8 @@
   protected static void setupRetry() {
     RetryConfig config = RetryConfig.custom()
         .maxAttempts(20)
-        .retryExceptions(HttpServerErrorException.InternalServerError.class)
+        .retryExceptions(HttpServerErrorException.InternalServerError.class,
+            RedisConnectionException.class)
         .build();
     RetryRegistry registry = RetryRegistry.of(config);
     retry = registry.retry("sessions");
@@ -140,7 +137,7 @@
 
     redisClient.setOptions(ClusterClientOptions.builder()
         .topologyRefreshOptions(refreshOptions)
-        .autoReconnect(true)
+        .validateClusterNodeMembership(false)
         .build());
     connection = redisClient.connect();
     commands = connection.sync();
@@ -167,6 +164,7 @@
     try {
       redisClient.shutdown();
     } catch (Exception ignored) {
+      // https://github.com/lettuce-io/lettuce-core/issues/1800
     }
   }
 
@@ -177,12 +175,7 @@
 
   protected static void startRedisServer(int server) {
     cluster.startRedisVM(server, cluster.getMember(LOCATOR).getPort());
-
-    cluster.getVM(server).invoke("Set logging level to DEBUG", () -> {
-      Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
-      Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
-      FastLogger.setDelegating(true);
-    });
+    cluster.enableDebugLogging(server);
   }
 
   protected static void startSpringApp(int sessionApp, long sessionTimeout, int... serverPorts) {
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java
index 873fca9..2d937ff 100644
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionExpirationDUnitTest.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.TimeUnit;
 
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
@@ -39,7 +38,6 @@
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 
-@Ignore("GEODE-9341")
 public class SessionExpirationDUnitTest extends SessionDUnitTest {
 
   protected static final int SHORT_SESSION_TIMEOUT = 5;
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java
index 9788bc8..704b336 100755
--- a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/session/springRedisTestApplication/config/WebMvcConfig.java
@@ -54,6 +54,7 @@
     return LettuceClientConfiguration.builder()
         .clientOptions(ClusterClientOptions.builder()
             .topologyRefreshOptions(refreshOptions)
+            .validateClusterNodeMembership(false)
             .build())
         .build();
   }
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java
index f7c8cad..c54ea52 100644
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/connection/AuthIntegrationTest.java
@@ -53,7 +53,7 @@
     cache.close();
   }
 
-  public void setupCacheWithPassword() {
+  public void setupCacheWithPassword() throws Exception {
     port = AvailablePortHelper.getRandomAvailableTCPPort();
     CacheFactory cf = new CacheFactory();
     cf.set(LOG_LEVEL, "error");
@@ -62,6 +62,7 @@
     cf.set(ConfigurationProperties.REDIS_PASSWORD, PASSWORD);
     cache = cf.create();
     server = new GeodeRedisServer("localhost", port, (InternalCache) cache);
+    server.getRegionProvider().getSlotAdvisor().getBucketSlots();
     this.jedis = new Jedis("localhost", port, 100000);
   }
 
@@ -77,7 +78,7 @@
   }
 
   @Test
-  public void testAuthIncorrectNumberOfArguments() {
+  public void testAuthIncorrectNumberOfArguments() throws Exception {
     setupCacheWithPassword();
     assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.AUTH))
         .hasMessageContaining("wrong number of arguments");
@@ -86,14 +87,14 @@
   }
 
   @Test
-  public void testAuthConfig() {
+  public void testAuthConfig() throws Exception {
     setupCacheWithPassword();
     InternalDistributedSystem iD = (InternalDistributedSystem) cache.getDistributedSystem();
     assertThat(iD.getConfig().getRedisPassword()).isEqualTo(PASSWORD);
   }
 
   @Test
-  public void testAuthRejectAccept() {
+  public void testAuthRejectAccept() throws Exception {
     setupCacheWithPassword();
 
     assertThatThrownBy(() -> jedis.auth("wrongpwd"))
@@ -111,7 +112,7 @@
   }
 
   @Test
-  public void testAuthAcceptRequests() {
+  public void testAuthAcceptRequests() throws Exception {
     setupCacheWithPassword();
 
     assertThatThrownBy(() -> jedis.set("foo", "bar"))
@@ -124,7 +125,7 @@
   }
 
   @Test
-  public void testSeparateClientRequests() {
+  public void testSeparateClientRequests() throws Exception {
     setupCacheWithPassword();
     Jedis nonAuthorizedJedis = new Jedis("localhost", getPort(), 100000);
     Jedis authorizedJedis = new Jedis("localhost", getPort(), 100000);
@@ -141,7 +142,7 @@
   }
 
   @Test
-  public void lettuceAuthClient_withLettuceVersion6() {
+  public void lettuceAuthClient_withLettuceVersion6() throws Exception {
     setupCacheWithPassword();
 
     RedisURI uri = RedisURI.create(String.format("redis://%s@localhost:%d", PASSWORD, getPort()));
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index bdf1b26..8e7d92f 100644
--- a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -3,7 +3,8 @@
 org/apache/geode/redis/internal/collections/SizeableObjectOpenCustomHashSet
 org/apache/geode/redis/internal/collections/IndexibleTreeSet
 org/apache/geode/redis/internal/data/RedisDataMovedException
+org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
 org/apache/geode/redis/internal/RedisException
-org/apache/geode/redis/internal/RedisRestoreKeyExistsException
+org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException
 org/apache/geode/redis/internal/data/RedisSetCommandsFunctionExecutor$SetOp
 org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor$BitOp
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java
index ef1ea59..718df77 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisException.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.redis.internal;
 
+
 /**
  * A general exception that can be thrown during command execution and will result in a Redis
  * protocol exception message being returned to the client.
@@ -30,6 +31,10 @@
     super(message);
   }
 
+  public RedisException(Throwable throwable) {
+    super(throwable);
+  }
+
   public RedisException(String message, Throwable throwable) {
     super(message, throwable);
   }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index 46841e7..3728159 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -35,6 +35,7 @@
 
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.partition.PartitionMemberInfo;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
@@ -42,6 +43,9 @@
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionFactory;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketLockException;
+import org.apache.geode.internal.cache.execute.BucketMovedException;
 import org.apache.geode.management.ManagementException;
 import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
 import org.apache.geode.redis.internal.data.NullRedisDataStructures;
@@ -84,6 +88,7 @@
   private static final Map<RedisDataType, RedisData> NULL_TYPES = new HashMap<>();
 
   private final Region<RedisKey, RedisData> dataRegion;
+  private final PartitionedRegion partitionedRegion;
   private final RedisHashCommandsFunctionExecutor hashCommands;
   private final RedisSetCommandsFunctionExecutor setCommands;
   private final RedisStringCommandsFunctionExecutor stringCommands;
@@ -118,6 +123,7 @@
     redisDataRegionFactory.setPartitionAttributes(attributesFactory.create());
 
     dataRegion = redisDataRegionFactory.create(REDIS_DATA_REGION);
+    partitionedRegion = (PartitionedRegion) dataRegion;
 
     stringCommands = new RedisStringCommandsFunctionExecutor(this);
     setCommands = new RedisSetCommandsFunctionExecutor(this);
@@ -145,7 +151,16 @@
   }
 
   public <T> T execute(Object key, Callable<T> callable) {
-    return stripedExecutor.execute(key, callable);
+    try {
+      return partitionedRegion.computeWithPrimaryLocked(key,
+          () -> stripedExecutor.execute(key, callable));
+    } catch (PrimaryBucketLockException | BucketMovedException | RegionDestroyedException ex) {
+      throw createRedisDataMovedException((RedisKey) key);
+    } catch (RedisException bex) {
+      throw bex;
+    } catch (Exception ex) {
+      throw new RedisException(ex);
+    }
   }
 
   public RedisData getRedisData(RedisKey key) {
@@ -153,7 +168,13 @@
   }
 
   public RedisData getRedisData(RedisKey key, RedisData notFoundValue) {
-    RedisData result = getLocalDataRegion().get(key);
+    RedisData result;
+    try {
+      result = getLocalDataRegion().get(key);
+    } catch (RegionDestroyedException rex) {
+      throw createRedisDataMovedException(key);
+    }
+
     if (result != null) {
       if (result.hasExpired()) {
         result.doExpiration(this, key);
@@ -161,10 +182,7 @@
       }
     } else {
       if (!getSlotAdvisor().isLocal(key)) {
-        RedisMemberInfo memberInfo = getRedisMemberInfo(key);
-        Integer slot = key.getCrc16() & (REDIS_SLOTS - 1);
-        throw new RedisDataMovedException(slot, memberInfo.getHostAddress(),
-            memberInfo.getRedisPort());
+        throw createRedisDataMovedException(key);
       }
     }
     if (result == null) {
@@ -174,6 +192,13 @@
     }
   }
 
+  private RedisDataMovedException createRedisDataMovedException(RedisKey key) {
+    RedisMemberInfo memberInfo = getRedisMemberInfo(key);
+    Integer slot = key.getCrc16() & (REDIS_SLOTS - 1);
+    return new RedisDataMovedException(slot, memberInfo.getHostAddress(),
+        memberInfo.getRedisPort());
+  }
+
   private RedisMemberInfo getRedisMemberInfo(RedisKey key) {
     try {
       return getSlotAdvisor().getMemberInfo(key);
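As an illustration of the shape of the execute() change above: take the primary-bucket lock for the key, run the per-key striped work while the lock is held, and translate a bucket-movement failure into a MOVED-style exception while letting other Redis exceptions pass through unchanged. The following is a minimal JDK-only analogue of that pattern, with hypothetical names; it is not the Geode implementation, which uses the partitioned region's own bucket lock and striped executor.

import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock;

public class PrimaryLockedExecutor {
  // Hypothetical stand-in for the primary-bucket lock held for the whole command.
  private final ReentrantLock primaryBucketLock = new ReentrantLock();

  public <T> T execute(Object key, Callable<T> work) {
    primaryBucketLock.lock(); // bucket cannot "move" while the lock is held
    try {
      return work.call();               // per-key striped work runs under the lock
    } catch (RuntimeException e) {
      throw e;                          // domain exceptions (e.g. a MOVED signal) pass through
    } catch (Exception e) {
      throw new RuntimeException(e);    // checked exceptions are wrapped, as execute() does
    } finally {
      primaryBucketLock.unlock();
    }
  }
}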
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java
index e0f72da..35fd22b 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfo.java
@@ -19,6 +19,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Objects;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.distributed.DistributedMember;
@@ -85,4 +86,22 @@
   public String toString() {
     return member.toString() + " hostAddress: " + hostAddress + " redisPort: " + redisPort;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof RedisMemberInfo)) {
+      return false;
+    }
+    RedisMemberInfo that = (RedisMemberInfo) o;
+    return redisPort == that.redisPort && Objects.equals(member, that.member)
+        && Objects.equals(hostAddress, that.hostAddress);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(member, hostAddress, redisPort);
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index f37b2a6..90d4112 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -39,7 +39,6 @@
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.redis.internal.RedisRestoreKeyExistsException;
 import org.apache.geode.redis.internal.RegionProvider;
 import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
 import org.apache.geode.redis.internal.delta.AppendDeltaInfo;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
index b10c458..3a64670 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
@@ -96,7 +96,9 @@
       short ordinal = inputStream.readShort();
       KnownVersion knownVersion = KnownVersion.getKnownVersion(ordinal);
       if (knownVersion == null) {
-        throw new RedisException("Unknown version ordinal: " + ordinal);
+        LogService.getLogger()
+            .info("Error restoring object - unknown version ordinal: {}", ordinal);
+        throw new RedisException(ERROR_RESTORE_INVALID_PAYLOAD);
       }
 
       obj = DataSerializer.readObject(new VersionedDataInputStream(bais, knownVersion));
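The RESTORE change above logs the unknown version ordinal on the server but hands the client only the generic invalid-payload error. A JDK-only sketch of that guard is shown below; the known-ordinal set and the error wording are assumptions, and the real code uses Geode's KnownVersion lookup and DataSerializer rather than these stand-ins.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Set;

public class RestoreGuard {
  private static final Set<Short> KNOWN_ORDINALS = Set.of((short) 1, (short) 2); // assumed values
  private static final String ERROR_RESTORE_INVALID_PAYLOAD =
      "DUMP payload version or checksum are wrong"; // wording assumed

  public static void checkPayloadVersion(byte[] payload) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
      short ordinal = in.readShort();
      if (!KNOWN_ORDINALS.contains(ordinal)) {
        // Keep the diagnostic detail server-side; return only the generic error to the client.
        System.err.println("Error restoring object - unknown version ordinal: " + ordinal);
        throw new IllegalArgumentException(ERROR_RESTORE_INVALID_PAYLOAD);
      }
    }
  }
}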
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java
index 46193b3..7aeca4e 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataMovedException.java
@@ -15,7 +15,9 @@
 
 package org.apache.geode.redis.internal.data;
 
-public class RedisDataMovedException extends RuntimeException {
+import org.apache.geode.redis.internal.RedisException;
+
+public class RedisDataMovedException extends RedisException {
 
   private static final long serialVersionUID = 8138174496239512955L;
 
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java
index ee3de2f..dfc557d 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataTypeMismatchException.java
@@ -15,11 +15,14 @@
  */
 package org.apache.geode.redis.internal.data;
 
+
+import org.apache.geode.redis.internal.RedisException;
+
 /**
  * This exception is for the case that a client attempts to operate on a data structure of one
  * {@link org.apache.geode.redis.internal.data.RedisDataType} with a command that is of another type
  */
-public class RedisDataTypeMismatchException extends RuntimeException {
+public class RedisDataTypeMismatchException extends RedisException {
 
   private static final long serialVersionUID = -2451663685348513870L;
 
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisRestoreKeyExistsException.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException.java
similarity index 86%
rename from geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisRestoreKeyExistsException.java
rename to geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException.java
index 4add607..a6faf3b 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisRestoreKeyExistsException.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException.java
@@ -12,12 +12,14 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.redis.internal;
+package org.apache.geode.redis.internal.data;
+
+import org.apache.geode.redis.internal.RedisException;
 
 /**
  * An exception thrown when the key being restored already exists.
  */
-public class RedisRestoreKeyExistsException extends RuntimeException {
+public class RedisRestoreKeyExistsException extends RedisException {
 
   private static final long serialVersionUID = -7022501593522613782L;
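RedisDataMovedException, RedisDataTypeMismatchException, and RedisRestoreKeyExistsException now all extend RedisException, which is what allows RegionProvider.execute() to rethrow them unchanged in its catch (RedisException bex) branch instead of wrapping them. A tiny JDK-only illustration of that dispatch, with hypothetical class names standing in for the Redis exception types:

import java.util.concurrent.Callable;

public class ExceptionHierarchyDemo {
  static class DomainException extends RuntimeException {}  // stands in for RedisException
  static class MovedException extends DomainException {}    // stands in for RedisDataMovedException

  static <T> T run(Callable<T> work) {
    try {
      return work.call();
    } catch (DomainException e) {
      throw e;                    // subtype preserved, so the handler can still map it to MOVED
    } catch (Exception e) {
      throw new DomainException(); // everything else collapses to the generic error
    }
  }

  public static void main(String[] args) {
    try {
      run(() -> { throw new MovedException(); });
    } catch (MovedException e) {
      System.out.println("MOVED surfaced intact");
    }
  }
}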
 
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
index bce6e74..32f2cde 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
@@ -35,6 +35,7 @@
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
 import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -50,16 +51,12 @@
   private static final Logger logger = LogService.getLogger();
 
   @Override
-  public RedisResponse executeCommand(Command command,
-      ExecutionHandlerContext context) {
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
 
     List<byte[]> commandElems = command.getProcessedCommand();
 
     String cursorString = bytesToString(commandElems.get(2));
     int cursor;
-    Pattern matchPattern;
-    String globPattern = null;
-    int count = DEFAULT_COUNT;
 
     try {
       cursor = Integer.parseInt(cursorString);
@@ -68,54 +65,67 @@
     }
 
     RedisKey key = command.getKey();
-    if (!getDataRegion(context).containsKey(key)) {
-      context.getRedisStats().incKeyspaceMisses();
-      return RedisResponse.emptyScan();
-    }
 
-    if (getDataRegion(context).get(key).getType() != REDIS_HASH) {
-      throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
-    }
+    // To preserve the same error-condition semantics as native Redis, the ordering of input
+    // validation is mirrored here. As a result, the first check is an existence check on the
+    // key. That introduces a race, because the value has to be read again when the command does
+    // its actual work. If the relevant bucket is not kept locked for the duration of the call,
+    // the bucket may move and produce inconsistent results.
+    return context.getRegionProvider().execute(key, () -> {
+      String globPattern = null;
+      int count = DEFAULT_COUNT;
+      Pattern matchPattern;
 
-    command.getCommandType().checkDeferredParameters(command, context);
+      RedisData value = context.getRegionProvider().getRedisData(key);
+      if (value.isNull()) {
+        context.getRedisStats().incKeyspaceMisses();
+        return RedisResponse.emptyScan();
+      }
 
-    for (int i = 3; i < commandElems.size(); i = i + 2) {
-      byte[] commandElemBytes = commandElems.get(i);
-      if (equalsIgnoreCaseBytes(commandElemBytes, bMATCH)) {
-        commandElemBytes = commandElems.get(i + 1);
-        globPattern = bytesToString(commandElemBytes);
+      if (value.getType() != REDIS_HASH) {
+        throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
+      }
 
-      } else if (equalsIgnoreCaseBytes(commandElemBytes, bCOUNT)) {
-        commandElemBytes = commandElems.get(i + 1);
-        try {
-          count = narrowLongToInt(bytesToLong(commandElemBytes));
-        } catch (NumberFormatException e) {
-          return RedisResponse.error(ERROR_NOT_INTEGER);
-        }
+      command.getCommandType().checkDeferredParameters(command, context);
 
-        if (count < 1) {
+      for (int i = 3; i < commandElems.size(); i = i + 2) {
+        byte[] commandElemBytes = commandElems.get(i);
+        if (equalsIgnoreCaseBytes(commandElemBytes, bMATCH)) {
+          commandElemBytes = commandElems.get(i + 1);
+          globPattern = bytesToString(commandElemBytes);
+
+        } else if (equalsIgnoreCaseBytes(commandElemBytes, bCOUNT)) {
+          commandElemBytes = commandElems.get(i + 1);
+          try {
+            count = narrowLongToInt(bytesToLong(commandElemBytes));
+          } catch (NumberFormatException e) {
+            return RedisResponse.error(ERROR_NOT_INTEGER);
+          }
+
+          if (count < 1) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+
+        } else {
           return RedisResponse.error(ERROR_SYNTAX);
         }
-
-      } else {
-        return RedisResponse.error(ERROR_SYNTAX);
       }
-    }
-    try {
-      matchPattern = convertGlobToRegex(globPattern);
-    } catch (PatternSyntaxException e) {
-      logger.warn(
-          "Could not compile the pattern: '{}' due to the following exception: '{}'. HSCAN will return an empty list.",
-          globPattern, e.getMessage());
+      try {
+        matchPattern = convertGlobToRegex(globPattern);
+      } catch (PatternSyntaxException e) {
+        logger.warn(
+            "Could not compile the pattern: '{}' due to the following exception: '{}'. HSCAN will return an empty list.",
+            globPattern, e.getMessage());
 
-      return RedisResponse.emptyScan();
-    }
-    RedisHashCommands redisHashCommands = context.getHashCommands();
+        return RedisResponse.emptyScan();
+      }
+      RedisHashCommands redisHashCommands = context.getHashCommands();
 
-    Pair<Integer, List<byte[]>> scanResult =
-        redisHashCommands.hscan(key, matchPattern, count, cursor);
+      Pair<Integer, List<byte[]>> scanResult =
+          redisHashCommands.hscan(key, matchPattern, count, cursor);
 
-    return RedisResponse.scan(new BigInteger(String.valueOf(scanResult.getLeft())),
-        scanResult.getRight());
+      return RedisResponse.scan(new BigInteger(String.valueOf(scanResult.getLeft())),
+          scanResult.getRight());
+    });
   }
 }
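HSCAN's MATCH argument is a glob that the executor compiles into a java.util.regex.Pattern via convertGlobToRegex before scanning, returning an empty scan if the pattern cannot be compiled. The following is a minimal JDK-only approximation of such a conversion, for illustration only; the actual Geode helper may handle more glob features.

import java.util.regex.Pattern;

public final class GlobToRegex {
  // Translate the common glob wildcards (* and ?) into regex, quoting everything else.
  public static Pattern convert(String glob) {
    StringBuilder regex = new StringBuilder("^");
    for (char c : glob.toCharArray()) {
      switch (c) {
        case '*': regex.append(".*"); break;
        case '?': regex.append('.'); break;
        default:  regex.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.compile(regex.append('$').toString());
  }

  public static void main(String[] args) {
    System.out.println(convert("h?llo*").matcher("hello-world").matches()); // prints true
  }
}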
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
index 62b78e1..1d054ea 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
@@ -40,6 +40,12 @@
       return RedisResponse.ok();
     }
 
+    if (key.getBucketId() != newKey.getBucketId()) {
+      // Reading the data here raises a MOVED exception for whichever key is not local
+      context.getRegionProvider().getRedisData(newKey);
+      context.getRegionProvider().getRedisData(key);
+    }
+
     if (!redisKeyCommands.rename(key, newKey)) {
       return RedisResponse.error(ERROR_NO_SUCH_KEY);
     }
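RENAME now pre-reads both keys when they map to different buckets, so whichever key is not hosted locally surfaces a MOVED error before any rename work starts. The slot reported in the MOVED reply is derived from the key's CRC16, as in createRedisDataMovedException above. A small JDK-only illustration of that slot derivation follows; the 16384 slot count is the standard Redis cluster value and is assumed here, and the bucket-to-slot mapping in Geode is more involved than this.

public final class SlotCheck {
  private static final int REDIS_SLOTS = 16384; // assumption: standard Redis cluster slot count

  static int slotOf(int crc16) {
    return crc16 & (REDIS_SLOTS - 1);
  }

  public static void main(String[] args) {
    int keySlot = slotOf(0x31C3);    // example CRC16 values, purely illustrative
    int newKeySlot = slotOf(0x0F2A);
    if (keySlot != newKeySlot) {
      // In the executor this is where both keys are read so a non-local one raises MOVED.
      System.out.printf("keys map to different slots: %d vs %d%n", keySlot, newKeySlot);
    }
  }
}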
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java
index a54f07e..b78fcb3 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RestoreExecutor.java
@@ -25,8 +25,8 @@
 import java.util.List;
 
 import org.apache.geode.redis.internal.RedisException;
-import org.apache.geode.redis.internal.RedisRestoreKeyExistsException;
 import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.data.RedisRestoreKeyExistsException;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
index f1099a1..5b20998 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
@@ -35,6 +35,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
 import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -66,12 +67,13 @@
 
     RedisKey key = command.getKey();
 
-    if (!getDataRegion(context).containsKey(key)) {
+    RedisData value = context.getRegionProvider().getRedisData(key);
+    if (value.isNull()) {
       context.getRedisStats().incKeyspaceMisses();
       return RedisResponse.emptyScan();
     }
 
-    if (getDataRegion(context).get(key).getType() != REDIS_SET) {
+    if (value.getType() != REDIS_SET) {
       throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
     }
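The SSCAN change replaces the containsKey()/get() pair with a single getRedisData() read, so the existence and type checks run against one snapshot of the value rather than two reads that a bucket move could interleave. The same check-then-act hazard applies to any shared map; a JDK-only illustration:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SingleReadCheck {
  public static void main(String[] args) {
    Map<String, String> region = new ConcurrentHashMap<>();
    region.put("key", "value");

    // Racy: the entry could disappear between containsKey() and get().
    if (region.containsKey("key")) {
      String racy = region.get("key"); // may already be null by now
      System.out.println("racy read: " + racy);
    }

    // Safer: read once and branch on the single result.
    String snapshot = region.get("key");
    if (snapshot != null) {
      System.out.println("single read: " + snapshot);
    }
  }
}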
 
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index e1fa74d..04d0a05 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -218,38 +218,45 @@
   }
 
   private RedisResponse getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
-    RedisResponse response;
     if (cause instanceof IOException) {
       channelInactive(ctx);
       return null;
     }
 
-    if (cause instanceof IllegalStateException
-        || cause instanceof RedisParametersMismatchException
-        || cause instanceof RedisException
-        || cause instanceof NumberFormatException
-        || cause instanceof ArithmeticException) {
-      response = RedisResponse.error(cause.getMessage());
-    } else if (cause instanceof RedisDataMovedException) {
-      response = RedisResponse.moved(cause.getMessage());
-    } else if (cause instanceof RedisDataTypeMismatchException) {
-      response = RedisResponse.wrongType(cause.getMessage());
-    } else if (cause instanceof LowMemoryException) {
-      response = RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED);
-    } else if (cause instanceof DecoderException
-        && cause.getCause() instanceof RedisCommandParserException) {
-      response =
-          RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE + ": " + cause.getMessage());
-    } else if (cause instanceof InterruptedException || cause instanceof CacheClosedException) {
-      response = RedisResponse.error(RedisConstants.SERVER_ERROR_SHUTDOWN);
+    Throwable rootCause = getRootCause(cause);
+    if (rootCause instanceof RedisDataMovedException) {
+      return RedisResponse.moved(rootCause.getMessage());
+    } else if (rootCause instanceof RedisDataTypeMismatchException) {
+      return RedisResponse.wrongType(rootCause.getMessage());
+    } else if (rootCause instanceof IllegalStateException
+        || rootCause instanceof RedisParametersMismatchException
+        || rootCause instanceof RedisException
+        || rootCause instanceof NumberFormatException
+        || rootCause instanceof ArithmeticException) {
+      return RedisResponse.error(rootCause.getMessage());
+    } else if (rootCause instanceof LowMemoryException) {
+      return RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED);
+    } else if (rootCause instanceof DecoderException
+        && rootCause.getCause() instanceof RedisCommandParserException) {
+      return RedisResponse
+          .error(RedisConstants.PARSING_EXCEPTION_MESSAGE + ": " + rootCause.getMessage());
+    } else if (rootCause instanceof InterruptedException
+        || rootCause instanceof CacheClosedException) {
+      return RedisResponse.error(RedisConstants.SERVER_ERROR_SHUTDOWN);
     } else {
       if (logger.isErrorEnabled()) {
-        logger.error("GeodeRedisServer-Unexpected error handler for {}", ctx.channel(), cause);
+        logger.error("GeodeRedisServer-Unexpected error handler for {}", ctx.channel(), rootCause);
       }
-      response = RedisResponse.error(RedisConstants.SERVER_ERROR_MESSAGE);
+      return RedisResponse.error(RedisConstants.SERVER_ERROR_MESSAGE);
     }
+  }
 
-    return response;
+  private Throwable getRootCause(Throwable cause) {
+    Throwable root = cause;
+    while (root.getCause() != null) {
+      root = root.getCause();
+    }
+    return root;
   }
 
   @Override
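getRootCause() above unwraps nested causes so that, for example, a RedisDataMovedException buried inside a wrapping exception still maps to a MOVED reply rather than a generic server error. The simple loop assumes cause chains terminate; the sketch below is a JDK-only, purely illustrative variant that also bounds the walk in case a chain ever references itself. It is not part of this patch.

public final class RootCause {
  public static Throwable of(Throwable cause) {
    Throwable root = cause;
    // Bound the walk so a self-referencing cause chain cannot loop forever.
    for (int depth = 0; depth < 64 && root.getCause() != null && root.getCause() != root; depth++) {
      root = root.getCause();
    }
    return root;
  }

  public static void main(String[] args) {
    Throwable wrapped = new RuntimeException(new IllegalStateException("moved"));
    System.out.println(of(wrapped).getMessage()); // prints "moved"
  }
}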
diff --git a/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt b/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt
index a485f29..4fde503 100755
--- a/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt
+++ b/geode-apis-compatible-with-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-apis-compatible-with-redis-serializables.txt
@@ -2,7 +2,6 @@
 org/apache/geode/redis/internal/RedisCommandSupportLevel,false
 org/apache/geode/redis/internal/RedisCommandType,false,deferredParameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,executor:org/apache/geode/redis/internal/executor/Executor,parameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,supportLevel:org/apache/geode/redis/internal/RedisCommandSupportLevel
 org/apache/geode/redis/internal/data/RedisDataType,false,toStringValue:java/lang/String
-org/apache/geode/redis/internal/data/RedisDataTypeMismatchException,true,-2451663685348513870
 org/apache/geode/redis/internal/delta/DeltaType,false
 org/apache/geode/redis/internal/executor/BaseSetOptions$Exists,false
 org/apache/geode/redis/internal/netty/CoderException,true,4707944288714910949
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index a41ad28..7289a6f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -609,7 +609,7 @@
     return redundancyTracker;
   }
 
-  public void computeWithPrimaryLocked(Object key, Runnable r) throws PrimaryBucketLockException {
+  public <T> T computeWithPrimaryLocked(Object key, Callable<T> callable) throws Exception {
     int bucketId = PartitionedRegionHelper.getHashKey(this, null, key, null, null);
 
     BucketRegion br;
@@ -626,7 +626,7 @@
     }
 
     try {
-      r.run();
+      return callable.call();
     } finally {
       br.doUnlockForPrimary();
     }
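computeWithPrimaryLocked now takes a Callable and returns its result instead of running a Runnable, which is what lets RegionProvider.execute() hand the command's response back through the lock. Callers that still have Runnable-shaped work can adapt with Executors.callable or a lambda that returns a value. A minimal sketch of that adaptation, using a hypothetical stand-in method rather than the real PartitionedRegion API:

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class CallableAdaptionDemo {
  // Stand-in for the new signature: run the work and return its result.
  static <T> T computeWithLock(Callable<T> work) throws Exception {
    return work.call();
  }

  public static void main(String[] args) throws Exception {
    Runnable legacy = () -> System.out.println("old Runnable-style work");

    // Option 1: wrap the Runnable and ignore the (null) result.
    computeWithLock(Executors.callable(legacy));

    // Option 2: a lambda that computes and returns a value directly.
    Integer size = computeWithLock(() -> 42);
    System.out.println("computed under lock: " + size);
  }
}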