blob: 7e129987dc0fa8b1fc7c5e4e3b659440c839b80d [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
package org.apache.geode.redis;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import redis.clients.jedis.Jedis;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.categories.RedisTest;
public class RedisDistDUnitTest implements Serializable {
public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(5);
private static String LOCALHOST = "localhost";
public static final String KEY = "key";
private static VM client1;
private static VM client2;
private static int server1Port;
private static int server2Port;
private static final int JEDIS_TIMEOUT =
private abstract static class ClientTestBase extends SerializableRunnable {
int port;
protected ClientTestBase(int port) {
this.port = port;
public static void setup() {
MemberVM locator = cluster.startLocatorVM(0);
cluster.startRedisVM(1, locator.getPort());
cluster.startRedisVM(2, locator.getPort());
server1Port = cluster.getRedisPort(1);
server2Port = cluster.getRedisPort(2);
client1 = cluster.getVM(3);
client2 = cluster.getVM(4);
class ConcurrentSADDOperation extends ClientTestBase {
private final Collection<String> strings;
private final String key;
protected ConcurrentSADDOperation(int port, String key, Collection<String> strings) {
this.strings = strings;
this.key = key;
public void run() {
Jedis jedis = new Jedis(LOCALHOST, port, JEDIS_TIMEOUT);
for (String member : strings) {
jedis.sadd(key, member);
public void cleanup() {
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
public void testConcurrentSaddOperations_runWithoutException_orDataLoss()
throws InterruptedException {
List<String> set1 = new ArrayList<>();
List<String> set2 = new ArrayList<>();
int setSize = populateSetValueArrays(set1, set2);
final String setName = "keyset";
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
AsyncInvocation<Void> remoteSaddInvocation =
client1.invokeAsync(new ConcurrentSADDOperation(server1Port, setName, set1));
client2.invoke(new ConcurrentSADDOperation(server2Port, setName, set2));
Set<String> smembers = jedis.smembers(setName);
assertThat(smembers).hasSize(setSize * 2);
assertThat(smembers).contains(set1.toArray(new String[] {}));
assertThat(smembers).contains(set2.toArray(new String[] {}));
private int populateSetValueArrays(List<String> set1, List<String> set2) {
int setSize = 5000;
for (int i = 0; i < setSize; i++) {
set1.add("SETA-" + i);
set2.add("SETB-" + i);
return setSize;
public void testConcCreateDestroy() throws Exception {
final int ops = 1000;
final String hKey = KEY + "hash";
final String sKey = KEY + "set";
final String key = KEY + "string";
class ConcCreateDestroy extends ClientTestBase {
protected ConcCreateDestroy(int port) {
public void run() {
Jedis jedis = new Jedis(LOCALHOST, port, JEDIS_TIMEOUT);
Random r = new Random();
for (int i = 0; i < ops; i++) {
int n = r.nextInt(3);
switch (n) {
// hashes
case 0:
jedis.hset(hKey, randString(), randString());
case 1:
jedis.sadd(sKey, randString());
case 2:
jedis.set(key, randString());
// Expect to run with no exception
AsyncInvocation<Void> i = client1.invokeAsync(new ConcCreateDestroy(server1Port));
client2.invoke(new ConcCreateDestroy(server2Port));
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
public void testConcurrentDel_iteratingOverEachKey() {
int iterations = 1000;
String keyBaseName = "DELBASE";
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
Jedis jedis2 = new Jedis(LOCALHOST, server2Port, JEDIS_TIMEOUT);
new ConcurrentLoopingThreads(
(i) -> jedis.set(keyBaseName + i, "value" + i))
AtomicLong deletedCount = new AtomicLong();
new ConcurrentLoopingThreads(iterations,
(i) -> deletedCount.addAndGet(jedis.del(keyBaseName + i)),
(i) -> deletedCount.addAndGet(jedis2.del(keyBaseName + i)))
for (int i = 0; i < iterations; i++) {
assertThat(jedis.get(keyBaseName + i)).isNull();
public void testConcurrentDel_bulk() {
int iterations = 1000;
String keyBaseName = "DELBASE";
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
Jedis jedis2 = new Jedis(LOCALHOST, server2Port, JEDIS_TIMEOUT);
new ConcurrentLoopingThreads(
(i) -> jedis.set(keyBaseName + i, "value" + i))
String[] keys = new String[iterations];
for (int i = 0; i < iterations; i++) {
keys[i] = keyBaseName + i;
AtomicLong deletedCount = new AtomicLong();
new ConcurrentLoopingThreads(2,
(i) -> deletedCount.addAndGet(jedis.del(keys)),
(i) -> deletedCount.addAndGet(jedis2.del(keys)))
for (int i = 0; i < iterations; i++) {
assertThat(jedis.get(keyBaseName + i)).isNull();
* Just make sure there are no unexpected server crashes
public void testConcOps() throws Exception {
final int ops = 100;
final String hKey = KEY + "hash";
final String sKey = KEY + "set";
class ConcOps extends ClientTestBase {
protected ConcOps(int port) {
public void run() {
Jedis jedis = new Jedis(LOCALHOST, port, JEDIS_TIMEOUT);
Random r = new Random();
for (int i = 0; i < ops; i++) {
int n = r.nextInt(4);
if (n == 0) {
jedis.hset(hKey, randString(), randString());
} else {
jedis.sadd(sKey, randString());
jedis.sdiff(sKey, "afd");
jedis.sunionstore("dst", sKey, "afds");
// Expect to run with no exception
AsyncInvocation<Void> i = client1.invokeAsync(new ConcOps(server1Port));
client2.invoke(new ConcOps(server2Port));
private String randString() {
return Long.toHexString(Double.doubleToLongBits(Math.random()));