blob: a93084c814268198e242ec59757135643f1a8389 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.redis.internal.executor.string;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.params.SetParams;
import org.apache.geode.redis.ConcurrentLoopingThreads;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class StringsDUnitTest {
@ClassRule
public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
@Rule
public ExecutorServiceRule executor = new ExecutorServiceRule();
private static final int LIST_SIZE = 1000;
private static final int NUM_ITERATIONS = 1000;
private static JedisCluster jedisCluster;
private static MemberVM locator;
@BeforeClass
public static void classSetup() {
locator = clusterStartUp.startLocatorVM(0);
clusterStartUp.startRedisVM(1, locator.getPort());
clusterStartUp.startRedisVM(2, locator.getPort());
clusterStartUp.startRedisVM(3, locator.getPort());
int redisServerPort1 = clusterStartUp.getRedisPort(1);
jedisCluster =
new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort1), REDIS_CLIENT_TIMEOUT);
}
@After
public void after() {
clusterStartUp.flushAll();
}
@AfterClass
public static void tearDown() {
jedisCluster.close();
}
@Test
public void get_shouldAllowClientToLocateDataForGivenKey() {
List<String> keys = makeStringList(LIST_SIZE, "key1-");
List<String> values = makeStringList(LIST_SIZE, "values1-");
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> jedisCluster.set(keys.get(i), values.get(i))).run();
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> assertThat(jedisCluster.get(keys.get(i))).isEqualTo(values.get(i))).run();
}
@Test
public void setnx_shouldOnlySucceedOnceForAParticularKey_givenMultipleClientsSettingSameKey() {
List<String> keys = makeStringList(LIST_SIZE, "key1-");
List<String> values = makeStringList(LIST_SIZE, "values1-");
AtomicInteger successes1 = new AtomicInteger(0);
AtomicInteger successes2 = new AtomicInteger(0);
new ConcurrentLoopingThreads(LIST_SIZE,
makeSetNXConsumer(keys, values, successes1, jedisCluster),
makeSetNXConsumer(keys, values, successes2, jedisCluster)).run();
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> assertThat(jedisCluster.get(keys.get(i))).isEqualTo(values.get(i))).run();
assertThat(successes1.get())
.as("Apparently ConcurrentLoopingThread did not run")
.isGreaterThan(0);
assertThat(successes2.get())
.as("Apparently ConcurrentLoopingThread did not run")
.isGreaterThan(0);
assertThat(successes1.get() + successes2.get()).isEqualTo(LIST_SIZE);
}
@Test
public void setxx_shouldAlwaysSucceed_givenMultipleClientsSettingSameKeyThatAlreadyExists() {
List<String> keys = makeStringList(LIST_SIZE, "key1-");
List<String> values = makeStringList(LIST_SIZE, "values1-");
AtomicLong successes1 = new AtomicLong(0);
AtomicLong successes2 = new AtomicLong(0);
for (int i = 0; i < LIST_SIZE; i++) {
jedisCluster.set(keys.get(i), values.get(i));
}
new ConcurrentLoopingThreads(LIST_SIZE,
makeSetXXConsumer(keys, values, successes1, jedisCluster),
makeSetXXConsumer(keys, values, successes2, jedisCluster)).run();
assertThat(successes2.get() + successes1.get()).isEqualTo(LIST_SIZE * 2);
}
@Test
public void set_shouldAllowMultipleClientsToSetValuesOnDifferentKeysConcurrently() {
List<String> keys1 = makeStringList(LIST_SIZE, "key1-");
List<String> values1 = makeStringList(LIST_SIZE, "values1-");
List<String> keys2 = makeStringList(LIST_SIZE, "key2-");
List<String> values2 = makeStringList(LIST_SIZE, "values2-");
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> jedisCluster.set(keys1.get(i), values1.get(i)),
(i) -> jedisCluster.set(keys2.get(i), values2.get(i))).run();
for (int i = 0; i < LIST_SIZE; i++) {
assertThat(jedisCluster.get(keys1.get(i))).isEqualTo(values1.get(i));
assertThat(jedisCluster.get(keys2.get(i))).isEqualTo(values2.get(i));
}
}
@Test
public void set_shouldAllowMultipleClientsToSetValuesOnTheSameKeysConcurrently() {
List<String> keys = makeStringList(LIST_SIZE, "key1-");
List<String> values = makeStringList(LIST_SIZE, "values1-");
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> jedisCluster.set(keys.get(i), values.get(i)),
(i) -> jedisCluster.set(keys.get(i), values.get(i))).run();
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> assertThat(jedisCluster.get(keys.get(i))).isEqualTo(values.get(i))).run();
}
@Test
public void append_shouldAllowMultipleClientsToAppendDifferentValueToSameKeyConcurrently() {
List<String> keys = makeStringList(LIST_SIZE, "key1-");
List<String> values1 = makeStringList(LIST_SIZE, "values1-");
List<String> values2 = makeStringList(LIST_SIZE, "values2-");
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> jedisCluster.append(keys.get(i), values1.get(i)),
(i) -> jedisCluster.append(keys.get(i), values2.get(i))).runInLockstep();
for (int i = 0; i < LIST_SIZE; i++) {
assertThat(jedisCluster.get(keys.get(i))).contains(values1.get(i));
assertThat(jedisCluster.get(keys.get(i))).contains(values2.get(i));
}
}
@Test
public void decr_shouldDecrementWhileDoingConcurrentDecr() {
String key = "key";
int initialValue = NUM_ITERATIONS * 2;
jedisCluster.set(key, String.valueOf(initialValue));
new ConcurrentLoopingThreads(NUM_ITERATIONS,
(i) -> jedisCluster.decr(key),
(i) -> jedisCluster.decr(key))
.run();
assertThat(jedisCluster.get(key)).isEqualTo("0");
}
@Test
public void decrby_shouldDecrementReliably_givenConcurrentThreadsPerformingDecrby() {
String key = "key";
int initialValue = NUM_ITERATIONS * 6;
jedisCluster.set(key, String.valueOf(initialValue));
new ConcurrentLoopingThreads(NUM_ITERATIONS,
(i) -> jedisCluster.decrBy(key, 4),
(i) -> jedisCluster.decrBy(key, 2)).runInLockstep();
assertThat(jedisCluster.get(key)).isEqualTo("0");
}
@Test
public void strLen_returnsStringLengthWhileConcurrentlyUpdatingValues() {
for (int i = 0; i < LIST_SIZE; i++) {
jedisCluster.set("key-" + i, "value-" + i);
}
new ConcurrentLoopingThreads(LIST_SIZE,
(i) -> jedisCluster.set("key-" + i, "changedValue-" + i),
(i) -> {
long stringLength = jedisCluster.strlen("key-" + i);
assertThat(
stringLength == ("changedValue-" + i).length()
|| stringLength == ("value-" + i).length()).isTrue();
}).runInLockstep();
for (int i = 0; i < LIST_SIZE; i++) {
String key = "key-" + i;
String value = jedisCluster.get(key);
String expectedValue = "changedValue-" + i;
assertThat(value.length()).isEqualTo(expectedValue.length());
assertThat(value).isEqualTo(expectedValue);
}
}
@Test
public void givenBucketsMoveDuringAppend_thenDataIsNotLost() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
List<String> hashtags = new ArrayList<>();
hashtags.add(clusterStartUp.getKeyOnServer("append", 1));
hashtags.add(clusterStartUp.getKeyOnServer("append", 2));
hashtags.add(clusterStartUp.getKeyOnServer("append", 3));
Runnable task1 = () -> appendPerformAndVerify(1, 10000, hashtags.get(0), running);
Runnable task2 = () -> appendPerformAndVerify(2, 10000, hashtags.get(1), running);
Runnable task3 = () -> appendPerformAndVerify(3, 10000, hashtags.get(2), running);
Future<Void> future1 = executor.runAsync(task1);
Future<Void> future2 = executor.runAsync(task2);
Future<Void> future3 = executor.runAsync(task3);
for (int i = 0; i < 100 && running.get(); i++) {
clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
}
for (int i = 0; i < 100 && running.get(); i++) {
clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size()));
GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true);
}
running.set(false);
future1.get();
future2.get();
future3.get();
}
private void appendPerformAndVerify(int index, int minimumIterations, String hashtag,
AtomicBoolean isRunning) {
String key = "{" + hashtag + "}-key-" + index;
int iterationCount = 0;
while (iterationCount < minimumIterations || isRunning.get()) {
String appendString = "-" + key + "-" + iterationCount + "-";
try {
jedisCluster.append(key, appendString);
} catch (Exception ex) {
isRunning.set(false);
throw new RuntimeException("Exception performing APPEND " + appendString, ex);
}
iterationCount += 1;
}
String storedString = jedisCluster.get(key);
int idx = 0;
int i = 0;
while (i < iterationCount) {
String expectedValue = "-" + key + "-" + i + "-";
String foundValue = storedString.substring(idx, idx + expectedValue.length());
if (!foundValue.equals(expectedValue)) {
Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount="
+ iterationCount + " in string "
+ storedString);
break;
}
idx += expectedValue.length();
i++;
}
}
private List<String> makeStringList(int setSize, String baseString) {
List<String> strings = new ArrayList<>();
for (int i = 0; i < setSize; i++) {
strings.add(baseString + i);
}
return strings;
}
private Consumer<Integer> makeSetXXConsumer(List<String> keys, List<String> values,
AtomicLong counter, JedisCluster jedis) {
return (i) -> {
SetParams setParams = new SetParams();
setParams.xx();
String result = jedis.set(keys.get(i), values.get(i), setParams);
if (result != null) {
counter.getAndIncrement();
}
};
}
private Consumer<Integer> makeSetNXConsumer(List<String> keys, List<String> values,
AtomicInteger counter, JedisCluster jedis) {
return (i) -> {
SetParams setParams = new SetParams();
setParams.nx();
String result = jedis.set(keys.get(i), values.get(i), setParams);
if (result != null) {
counter.getAndIncrement();
}
};
}
}