blob: 77f8361289c4a630ca727b13b96e7626b665562e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.redis.internal.executor.key;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.exceptions.JedisDataException;
import org.apache.geode.redis.internal.RedisConstants;
import org.apache.geode.redis.internal.data.ByteArrayWrapper;
import org.apache.geode.redis.internal.executor.StripedExecutor;
import org.apache.geode.redis.internal.executor.SynchronizedStripedExecutor;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.RedisPortSupplier;
public abstract class AbstractRenameIntegrationTest implements RedisPortSupplier {
private Jedis jedis;
private Jedis jedis2;
private Jedis jedis3;
private static final int REDIS_CLIENT_TIMEOUT =
Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
private static Random rand;
@Before
public void setUp() {
rand = new Random();
jedis = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis3 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
}
@After
public void tearDown() {
jedis.flushAll();
jedis.close();
jedis2.close();
jedis3.close();
}
@Test
public void testTooFewArgs() {
assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.RENAME, "foo"))
.hasMessageContaining("wrong number of arguments");
}
@Test
public void testTooManyArgs() {
assertThatThrownBy(() -> jedis.sendCommand(Protocol.Command.RENAME, "foo", "newfoo", "bluefoo"))
.hasMessageContaining("wrong number of arguments");
}
@Test
public void testNewKey() {
jedis.set("foo", "bar");
jedis.rename("foo", "newfoo");
assertThat(jedis.get("newfoo")).isEqualTo("bar");
}
@Test
public void testOldKeyIsDeleted() {
jedis.set("foo", "bar");
jedis.rename("foo", "newfoo");
assertThat(jedis.get("foo")).isNull();
}
@Test
public void testRenameKeyThatDoesNotExist() {
try {
jedis.rename("foo", "newfoo");
} catch (JedisDataException e) {
assertThat(e.getMessage()).contains(RedisConstants.ERROR_NO_SUCH_KEY);
}
}
@Test
public void testHashMap() {
jedis.hset("foo", "field", "va");
jedis.rename("foo", "newfoo");
assertThat(jedis.hget("newfoo", "field")).isEqualTo("va");
}
@Test
public void testSet() {
jedis.sadd("foo", "data");
jedis.rename("foo", "newfoo");
assertThat(jedis.smembers("newfoo")).contains("data");
}
@Test
public void testRenameSameKey() {
jedis.set("blue", "moon");
assertThat(jedis.rename("blue", "blue")).isEqualTo("OK");
assertThat(jedis.get("blue")).isEqualTo("moon");
}
@Test
public void testConcurrentSets() throws ExecutionException, InterruptedException {
Set<String> stringsForK1 = new HashSet<String>();
Set<String> stringsForK2 = new HashSet<String>();
int numOfStrings = 500000;
Callable<Long> callable1 =
() -> addStringsToKeys(stringsForK1, "k1", numOfStrings, jedis);
int numOfStringsForSecondKey = 30000;
Callable<Long> callable2 =
() -> addStringsToKeys(stringsForK2, "k2", numOfStringsForSecondKey, jedis2);
Callable<String> callable3 = () -> renameKeys(jedis3);
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<Long> future1 = pool.submit(callable1);
Future<Long> future2 = pool.submit(callable2);
Thread.sleep(rand.nextInt(1000));
Future<String> future3 = pool.submit(callable3);
future1.get();
future2.get();
try {
future3.get();
assertThat(jedis.scard("k2")).isEqualTo(numOfStrings);
assertThat(jedis.get("k1")).isEqualTo(null);
} catch (Exception e) {
assertThat(e.getMessage()).contains(RedisConstants.ERROR_NO_SUCH_KEY);
assertThat(jedis.scard("k1")).isEqualTo(numOfStrings);
assertThat(jedis.scard("k2")).isEqualTo(numOfStringsForSecondKey);
}
jedis2.close();
jedis3.close();
}
@Test
public void should_succeed_givenTwoKeysOnDifferentStripes() {
List<String> listOfKeys = getKeysOnDifferentStripes();
String oldKey = listOfKeys.get(0);
String newKey = listOfKeys.get(1);
jedis.sadd(oldKey, "value1");
jedis.sadd(newKey, "value2");
assertThat(jedis.rename(oldKey, newKey)).isEqualTo("OK");
assertThat(jedis.smembers(newKey)).containsExactly("value1");
assertThat(jedis.exists(oldKey)).isFalse();
}
@Test
public void should_succeed_givenTwoKeysOnSameStripe() {
List<String> listOfKeys = new ArrayList<>(getKeysOnSameRandomStripe(2));
String oldKey = listOfKeys.get(0);
String newKey = listOfKeys.get(1);
jedis.sadd(oldKey, "value1");
jedis.sadd(newKey, "value2");
assertThat(jedis.rename(oldKey, newKey)).isEqualTo("OK");
assertThat(jedis.smembers(newKey)).containsExactly("value1");
assertThat(jedis.exists(oldKey)).isFalse();
}
@Test
public void shouldNotDeadlock_concurrentRenames_givenStripeContention()
throws ExecutionException, InterruptedException {
List<String> keysOnStripe1 = new ArrayList<>(getKeysOnSameRandomStripe(2));
List<String> keysOnStripe2 = getKeysOnSameRandomStripe(2, keysOnStripe1.get(0));
for (int i = 0; i < 5; i++) {
doConcurrentRenamesDifferentKeys(
Arrays.asList(keysOnStripe1.get(0), keysOnStripe2.get(0)),
Arrays.asList(keysOnStripe2.get(1), keysOnStripe1.get(1)));
}
}
@Test
public void shouldThrowError_givenKeyDeletedDuringRename()
throws ExecutionException, InterruptedException {
CyclicBarrier startCyclicBarrier = new CyclicBarrier(2);
ExecutorService pool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 100; i++) {
jedis.set("oldKey", "foo");
Runnable renameOldKeyToNewKey = () -> {
cyclicBarrierAwait(startCyclicBarrier);
jedis.rename("oldKey", "newKey");
};
Runnable deleteOldKey = () -> {
cyclicBarrierAwait(startCyclicBarrier);
jedis2.del("oldKey");
};
Future<?> future1 = pool.submit(renameOldKeyToNewKey);
Future<?> future2 = pool.submit(deleteOldKey);
try {
future1.get();
assertThat(jedis.get("newKey")).isEqualTo("foo");
} catch (Exception e) {
assertThat(e).hasMessageContaining("no such key");
}
future2.get();
assertThat(jedis.get("oldKey")).isNull();
}
}
@Test
public void shouldNotDeadlock_concurrentRenames_givenTwoKeysOnDifferentStripe()
throws ExecutionException, InterruptedException {
doConcurrentRenamesSameKeys(getKeysOnDifferentStripes());
}
@Test
public void shouldNotDeadlock_concurrentRenames_givenTwoKeysOnSameStripe()
throws ExecutionException, InterruptedException {
doConcurrentRenamesSameKeys(new ArrayList<>(getKeysOnSameRandomStripe(2)));
}
private List<String> getKeysOnDifferentStripes() {
String key1 = "keyz" + new Random().nextInt();
ByteArrayWrapper key1ByteArrayWrapper = new ByteArrayWrapper(key1.getBytes());
StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
int iterator = 0;
String key2;
do {
key2 = "key" + iterator;
iterator++;
} while (stripedExecutor.compareStripes(key1ByteArrayWrapper,
new ByteArrayWrapper(key2.getBytes())) == 0);
return Arrays.asList(key1, key2);
}
private Set<String> getKeysOnSameRandomStripe(int numKeysNeeded) {
Random random = new Random();
String key1 = "keyz" + random.nextInt();
ByteArrayWrapper key1ByteArrayWrapper = new ByteArrayWrapper(key1.getBytes());
StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
Set<String> keys = new HashSet<>();
keys.add(key1);
do {
String key2 = "key" + random.nextInt();
if (stripedExecutor.compareStripes(key1ByteArrayWrapper,
new ByteArrayWrapper(key2.getBytes())) == 0) {
keys.add(key2);
}
} while (keys.size() < numKeysNeeded);
return keys;
}
public void doConcurrentRenamesDifferentKeys(List<String> listOfKeys1, List<String> listOfKeys2)
throws ExecutionException, InterruptedException {
CyclicBarrier startCyclicBarrier = new CyclicBarrier(2);
String oldKey1 = listOfKeys1.get(0);
String newKey1 = listOfKeys1.get(1);
String oldKey2 = listOfKeys2.get(0);
String newKey2 = listOfKeys2.get(1);
jedis.sadd(oldKey1, "foo", "bar");
jedis.sadd(oldKey2, "bar3", "back3");
ExecutorService pool = Executors.newFixedThreadPool(2);
Runnable renameOldKey1ToNewKey1 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
jedis.rename(oldKey1, newKey1);
};
Runnable renameOldKey2ToNewKey2 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
jedis2.rename(oldKey2, newKey2);
};
Future<?> future1 = pool.submit(renameOldKey1ToNewKey1);
Future<?> future2 = pool.submit(renameOldKey2ToNewKey2);
future1.get();
future2.get();
}
private void cyclicBarrierAwait(CyclicBarrier startCyclicBarrier) {
try {
startCyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
private List<String> getKeysOnSameRandomStripe(int numKeysNeeded, Object toAvoid) {
StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
List<String> keys = new ArrayList<>();
String key1;
ByteArrayWrapper key1ByteArrayWrapper;
do {
key1 = "keyz" + new Random().nextInt();
key1ByteArrayWrapper = new ByteArrayWrapper(key1.getBytes());
} while (stripedExecutor.compareStripes(key1ByteArrayWrapper, toAvoid) == 0 && keys.add(key1));
do {
String key2 = "key" + new Random().nextInt();
if (stripedExecutor.compareStripes(key1ByteArrayWrapper,
new ByteArrayWrapper(key2.getBytes())) == 0) {
keys.add(key2);
}
} while (keys.size() < numKeysNeeded);
return keys;
}
public void doConcurrentRenamesSameKeys(List<String> listOfKeys)
throws ExecutionException, InterruptedException {
String key1 = listOfKeys.get(0);
String key2 = listOfKeys.get(1);
CyclicBarrier startCyclicBarrier = new CyclicBarrier(2);
jedis.sadd(key1, "foo", "bar");
jedis.sadd(key2, "bar", "back");
ExecutorService pool = Executors.newFixedThreadPool(2);
Runnable renameKey1ToKey2 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
jedis.rename(key1, key2);
};
Runnable renameKey2ToKey1 = () -> {
cyclicBarrierAwait(startCyclicBarrier);
jedis2.rename(key2, key1);
};
Future<?> future1 = pool.submit(renameKey1ToKey2);
Future<?> future2 = pool.submit(renameKey2ToKey1);
future1.get();
future2.get();
}
private Long addStringsToKeys(
Set<String> strings,
String key,
int numOfStrings,
Jedis client) {
generateStrings(numOfStrings, strings);
String[] stringArray = strings.toArray(new String[strings.size()]);
return client.sadd(key, stringArray);
}
private String renameKeys(Jedis client) {
return client.rename("k1", "k2");
}
private Set<String> generateStrings(int elements, Set<String> strings) {
for (int i = 0; i < elements; i++) {
String elem = String.valueOf(i);
strings.add(elem);
}
return strings;
}
}