blob: 3ea6a5c0cc490c45b0a31effb2bc7400c6756ec5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.benchmark.redis.tasks;
import static java.lang.Thread.currentThread;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import io.vavr.Function3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;
import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
public final class JedisClientManager implements RedisClientManager {
private static final Logger logger = LoggerFactory.getLogger(RedisClientManager.class);
private static JedisCluster jedisCluster;
private static final RedisClient redisClient = new RedisClient() {
@Override
public String get(final String key) {
return jedisCluster.get(key);
}
@Override
public String set(final String key, final String value) {
return jedisCluster.set(key, value);
}
@Override
public String hget(final String key, final String field) {
return jedisCluster.hget(key, field);
}
@Override
public boolean hset(final String key, final String field, final String value) {
return 1 == jedisCluster.hset(key, field, value);
}
@Override
public long zadd(final String key, final double score, final String value) {
return jedisCluster.zadd(key, score, value);
}
@Override
public long zrem(final String key, final String value) {
return jedisCluster.zrem(key, value);
}
@Override
public List<String> zrange(final String key, final long start, final long stop) {
return jedisCluster.zrange(key, start, stop);
}
@Override
public List<String> zrangeByScore(final String key, final long start, final long stop) {
return jedisCluster.zrangeByScore(key, start, stop);
}
@Override
public SubscriptionListener createSubscriptionListener(
final PubSubBenchmarkConfiguration pubSubConfig,
final Function3<String, String, Unsubscriber, Void> channelMessageConsumer) {
return new JedisSubscriptionListener(new JedisPubSub() {
@Override
public void onPMessage(final String pattern, final String channel, final String message) {
super.onPMessage(pattern, channel, message);
final Unsubscriber unsubscriber =
channels -> punsubscribe(channels.toArray(new String[] {}));
channelMessageConsumer.apply(channel, message, unsubscriber);
}
@Override
public void onMessage(final String channel, final String message) {
super.onMessage(channel, message);
final Unsubscriber unsubscriber =
channels -> unsubscribe(channels.toArray(new String[] {}));
channelMessageConsumer.apply(channel, message, unsubscriber);
}
});
}
@Override
public void subscribe(final SubscriptionListener listener, final String... channels) {
jedisCluster.subscribe(((JedisSubscriptionListener) listener).getJedisPubSub(), channels);
}
@Override
public void psubscribe(final SubscriptionListener listener, final String... channelPatterns) {
jedisCluster.psubscribe(((JedisSubscriptionListener) listener).getJedisPubSub(),
channelPatterns);
}
@Override
public void publish(final String channel, final String message) {
jedisCluster.publish(channel, message);
}
@Override
public void flushdb() {
jedisCluster.getClusterNodes()
.forEach((nodeKey, nodePool) -> new Jedis(nodePool.getResource()).flushDB());
}
};
@Override
public void connect(final Collection<InetSocketAddress> servers) {
logger.info("Connect RedisClient on thread {}.", currentThread());
final Set<HostAndPort> nodes = servers.stream()
.map(i -> new HostAndPort(i.getHostString(), i.getPort())).collect(Collectors.toSet());
final ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(-1);
poolConfig.setMaxIdle(-1);
poolConfig.setLifo(false);
final JedisCluster jedisCluster = new JedisCluster(nodes, Integer.MAX_VALUE, poolConfig);
final long start = System.nanoTime();
while (true) {
try (final Jedis jedis = new Jedis(jedisCluster.getConnectionFromSlot(0))) {
logger.info("Waiting for cluster to come up.");
final String clusterInfo = jedis.clusterInfo();
if (clusterInfo.contains("cluster_state:ok")) {
break;
}
logger.debug(clusterInfo);
} catch (final Exception e) {
if (System.nanoTime() - start > CONNECT_TIMEOUT.toNanos()) {
throw e;
}
try {
Thread.sleep(50);
} catch (final InterruptedException interruptedException) {
throw new RuntimeException(e);
}
logger.info("Failed connecting.", e);
}
}
JedisClientManager.jedisCluster = jedisCluster;
}
@Override
public void close() {
logger.info("Close RedisClient on thread {}.", currentThread());
jedisCluster.close();
}
@Override
public RedisClient get() {
logger.info("Getting RedisClient on thread {}.", currentThread());
return redisClient;
}
static class JedisSubscriptionListener implements RedisClient.SubscriptionListener {
private final JedisPubSub jedisPubSub;
public JedisSubscriptionListener(final JedisPubSub jedisPubSub) {
this.jedisPubSub = jedisPubSub;
}
JedisPubSub getJedisPubSub() {
return jedisPubSub;
}
}
}