blob: a6c48f70c22e75b55d4725f75aa12914da0fcd8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.benchmark.redis.tasks;
import static java.lang.Thread.currentThread;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import io.lettuce.core.Range;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.vavr.Function3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
public final class LettucePubSubClientManager implements RedisClientManager {
private static final Logger logger = LoggerFactory.getLogger(LettucePubSubClientManager.class);
private static RedisClusterClient redisClusterClient;
private static final ThreadLocal<RedisClusterPubSubCommands<String, String>> redisClusterCommands =
ThreadLocal.withInitial(() -> {
logger.info("Setup for thread {}", Thread.currentThread().getId());
final StatefulRedisClusterPubSubConnection<String, String> redisClusterPubSubConnection =
redisClusterClient.connectPubSub();
return redisClusterPubSubConnection.sync();
});
private static final RedisClient redisClient = new RedisClient() {
@Override
public String get(final String key) {
return LettucePubSubClientManager.redisClusterCommands.get().get(key);
}
@Override
public String set(final String key, final String value) {
return LettucePubSubClientManager.redisClusterCommands.get().set(key, value);
}
@Override
public String hget(final String key, final String field) {
return LettucePubSubClientManager.redisClusterCommands.get().hget(key, field);
}
@Override
public boolean hset(final String key, final String field, final String value) {
return LettucePubSubClientManager.redisClusterCommands.get().hset(key, field, value);
}
@Override
public long zadd(String key, double score, String value) {
return LettucePubSubClientManager.redisClusterCommands.get().zadd(key, score, value);
}
@Override
public long zrem(String key, String value) {
return LettucePubSubClientManager.redisClusterCommands.get().zrem(key, value);
}
@Override
public List<String> zrange(String key, long start, long stop) {
return LettucePubSubClientManager.redisClusterCommands.get().zrange(key, start, stop);
}
@Override
public List<String> zrangeByScore(String key, long start, long stop) {
return LettucePubSubClientManager.redisClusterCommands.get().zrangebyscore(key,
Range.create(start, stop));
}
@Override
public SubscriptionListener createSubscriptionListener(
final PubSubBenchmarkConfiguration pubSubConfig,
final Function3<String, String, Unsubscriber, Void> channelMessageConsumer) {
return new LettuceSubscriptionListener(new RedisPubSubAdapter<String, String>() {
@Override
public void message(final String pattern, final String channel, final String message) {
super.message(pattern, channel, message);
final Unsubscriber unsubscriber =
channels -> LettucePubSubClientManager.redisClusterCommands.get()
.punsubscribe(channels.toArray(new String[] {}));
channelMessageConsumer.apply(channel, message, unsubscriber);
}
@Override
public void message(final String channel, final String message) {
super.message(channel, message);
final Unsubscriber unsubscriber =
channels -> LettucePubSubClientManager.redisClusterCommands.get()
.unsubscribe(channels.toArray(new String[] {}));
channelMessageConsumer.apply(channel, message, unsubscriber);
}
});
}
@Override
public void subscribe(final SubscriptionListener listener, final String... channels) {
final StatefulRedisPubSubConnection<String, String> connection =
LettucePubSubClientManager.redisClusterCommands.get().getStatefulConnection();
connection.addListener(((LettuceSubscriptionListener) listener).getListener());
LettucePubSubClientManager.redisClusterCommands.get().subscribe(channels);
}
@Override
public void psubscribe(final SubscriptionListener listener, final String... channels) {
final StatefulRedisPubSubConnection<String, String> connection =
LettucePubSubClientManager.redisClusterCommands.get().getStatefulConnection();
connection.addListener(((LettuceSubscriptionListener) listener).getListener());
LettucePubSubClientManager.redisClusterCommands.get().psubscribe(channels);
}
@Override
public void publish(String channel, String message) {
LettucePubSubClientManager.redisClusterCommands.get().publish(channel, message);
}
@Override
public void flushdb() {
redisClusterCommands.get().flushdb();
}
};
@Override
public void connect(final Collection<InetSocketAddress> servers) {
logger.info("Connect RedisClient on thread {}.", currentThread());
final List<RedisURI> nodes = servers.stream()
.map(i -> RedisURI.create(i.getHostString(), i.getPort())).collect(Collectors.toList());
final RedisClusterClient redisClusterClient = RedisClusterClient.create(nodes);
long start = System.nanoTime();
while (true) {
try (final StatefulRedisClusterPubSubConnection<String, String> connection =
redisClusterClient.connectPubSub()) {
logger.info("Waiting for cluster to come up.");
final String clusterInfo = connection.sync().clusterInfo();
if (clusterInfo.contains("cluster_state:ok")) {
break;
}
logger.debug(clusterInfo);
} catch (Exception e) {
if (System.nanoTime() - start > CONNECT_TIMEOUT.toNanos()) {
throw e;
}
try {
Thread.sleep(50);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(e);
}
logger.info("Failed connecting.", e);
}
}
redisClusterClient.refreshPartitions();
LettucePubSubClientManager.redisClusterClient = redisClusterClient;
}
@Override
public void close() {
logger.info("Close RedisClient on thread {}.", currentThread());
redisClusterClient.shutdown();
}
@Override
public RedisClient get() {
logger.info("Getting RedisClient on thread {}.", currentThread());
return redisClient;
}
static class LettuceSubscriptionListener implements RedisClient.SubscriptionListener {
private final RedisPubSubListener<String, String> listener;
public LettuceSubscriptionListener(
RedisPubSubListener<String, String> listener) {
this.listener = listener;
}
RedisPubSubListener<String, String> getListener() {
return listener;
}
}
}