blob: 7e129987dc0fa8b1fc7c5e4e3b659440c839b80d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.redis;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import redis.clients.jedis.Jedis;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.categories.RedisTest;
@Category({RedisTest.class})
public class RedisDistDUnitTest implements Serializable {
@ClassRule
public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(5);
private static String LOCALHOST = "localhost";
public static final String KEY = "key";
private static VM client1;
private static VM client2;
private static int server1Port;
private static int server2Port;
private static final int JEDIS_TIMEOUT =
Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
private abstract static class ClientTestBase extends SerializableRunnable {
int port;
protected ClientTestBase(int port) {
this.port = port;
}
}
@BeforeClass
public static void setup() {
MemberVM locator = cluster.startLocatorVM(0);
cluster.startRedisVM(1, locator.getPort());
cluster.startRedisVM(2, locator.getPort());
server1Port = cluster.getRedisPort(1);
server2Port = cluster.getRedisPort(2);
client1 = cluster.getVM(3);
client2 = cluster.getVM(4);
}
class ConcurrentSADDOperation extends ClientTestBase {
private final Collection<String> strings;
private final String key;
protected ConcurrentSADDOperation(int port, String key, Collection<String> strings) {
super(port);
this.strings = strings;
this.key = key;
}
@Override
public void run() {
Jedis jedis = new Jedis(LOCALHOST, port, JEDIS_TIMEOUT);
for (String member : strings) {
jedis.sadd(key, member);
}
}
}
@After
public void cleanup() {
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
jedis.flushAll();
}
@Test
public void testConcurrentSaddOperations_runWithoutException_orDataLoss()
throws InterruptedException {
List<String> set1 = new ArrayList<>();
List<String> set2 = new ArrayList<>();
int setSize = populateSetValueArrays(set1, set2);
final String setName = "keyset";
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
AsyncInvocation<Void> remoteSaddInvocation =
client1.invokeAsync(new ConcurrentSADDOperation(server1Port, setName, set1));
client2.invoke(new ConcurrentSADDOperation(server2Port, setName, set2));
remoteSaddInvocation.await();
Set<String> smembers = jedis.smembers(setName);
assertThat(smembers).hasSize(setSize * 2);
assertThat(smembers).contains(set1.toArray(new String[] {}));
assertThat(smembers).contains(set2.toArray(new String[] {}));
}
private int populateSetValueArrays(List<String> set1, List<String> set2) {
int setSize = 5000;
for (int i = 0; i < setSize; i++) {
set1.add("SETA-" + i);
set2.add("SETB-" + i);
}
return setSize;
}
@Test
public void testConcCreateDestroy() throws Exception {
final int ops = 1000;
final String hKey = KEY + "hash";
final String sKey = KEY + "set";
final String key = KEY + "string";
class ConcCreateDestroy extends ClientTestBase {
protected ConcCreateDestroy(int port) {
super(port);
}
@Override
public void run() {
Jedis jedis = new Jedis(LOCALHOST, port, JEDIS_TIMEOUT);
Random r = new Random();
for (int i = 0; i < ops; i++) {
int n = r.nextInt(3);
switch (n) {
// hashes
case 0:
jedis.hset(hKey, randString(), randString());
jedis.del(hKey);
break;
case 1:
jedis.sadd(sKey, randString());
jedis.del(sKey);
break;
case 2:
jedis.set(key, randString());
jedis.del(key);
break;
}
}
}
}
// Expect to run with no exception
AsyncInvocation<Void> i = client1.invokeAsync(new ConcCreateDestroy(server1Port));
client2.invoke(new ConcCreateDestroy(server2Port));
i.await();
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
assertThat(jedis.keys("*")).isEmpty();
}
@Test
public void testConcurrentDel_iteratingOverEachKey() {
int iterations = 1000;
String keyBaseName = "DELBASE";
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
Jedis jedis2 = new Jedis(LOCALHOST, server2Port, JEDIS_TIMEOUT);
new ConcurrentLoopingThreads(
iterations,
(i) -> jedis.set(keyBaseName + i, "value" + i))
.run();
AtomicLong deletedCount = new AtomicLong();
new ConcurrentLoopingThreads(iterations,
(i) -> deletedCount.addAndGet(jedis.del(keyBaseName + i)),
(i) -> deletedCount.addAndGet(jedis2.del(keyBaseName + i)))
.run();
assertThat(deletedCount.get()).isEqualTo(iterations);
for (int i = 0; i < iterations; i++) {
assertThat(jedis.get(keyBaseName + i)).isNull();
}
}
@Test
public void testConcurrentDel_bulk() {
int iterations = 1000;
String keyBaseName = "DELBASE";
Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT);
Jedis jedis2 = new Jedis(LOCALHOST, server2Port, JEDIS_TIMEOUT);
new ConcurrentLoopingThreads(
iterations,
(i) -> jedis.set(keyBaseName + i, "value" + i))
.run();
String[] keys = new String[iterations];
for (int i = 0; i < iterations; i++) {
keys[i] = keyBaseName + i;
}
AtomicLong deletedCount = new AtomicLong();
new ConcurrentLoopingThreads(2,
(i) -> deletedCount.addAndGet(jedis.del(keys)),
(i) -> deletedCount.addAndGet(jedis2.del(keys)))
.run();
assertThat(deletedCount.get()).isEqualTo(iterations);
for (int i = 0; i < iterations; i++) {
assertThat(jedis.get(keyBaseName + i)).isNull();
}
}
/**
* Just make sure there are no unexpected server crashes
*/
@Test
public void testConcOps() throws Exception {
final int ops = 100;
final String hKey = KEY + "hash";
final String sKey = KEY + "set";
class ConcOps extends ClientTestBase {
protected ConcOps(int port) {
super(port);
}
@Override
public void run() {
Jedis jedis = new Jedis(LOCALHOST, port, JEDIS_TIMEOUT);
Random r = new Random();
for (int i = 0; i < ops; i++) {
int n = r.nextInt(4);
if (n == 0) {
jedis.hset(hKey, randString(), randString());
jedis.hgetAll(hKey);
jedis.hvals(hKey);
} else {
jedis.sadd(sKey, randString());
jedis.smembers(sKey);
jedis.sdiff(sKey, "afd");
jedis.sunionstore("dst", sKey, "afds");
}
}
}
}
// Expect to run with no exception
AsyncInvocation<Void> i = client1.invokeAsync(new ConcOps(server1Port));
client2.invoke(new ConcOps(server2Port));
i.await();
}
private String randString() {
return Long.toHexString(Double.doubleToLongBits(Math.random()));
}
}