/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.redis;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.redis.RedisAbstractConfig.ClientMode;
import org.apache.pulsar.io.redis.sink.RedisSinkConfig;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
/**
 * Bundles a Lettuce Redis client, the connection opened on it and the asynchronous
 * command interface obtained from that connection.
 *
 * <p>Instances are usually obtained through {@link #create(RedisSinkConfig)}, which
 * supports the {@code STANDALONE} and {@code CLUSTER} client modes. Call
 * {@link #close()} when the session is no longer needed.
 */
public class RedisSession implements AutoCloseable {

    private final AbstractRedisClient client;

    private final StatefulConnection connection;

    private final RedisClusterAsyncCommands<byte[], byte[]> asyncCommands;

    /**
     * Wraps already-created Redis resources. No validation is performed; {@code client}
     * and {@code connection} may be {@code null}, in which case {@link #close()} skips them.
     *
     * @param client        the Redis client that owns the connection
     * @param connection    the connection opened on {@code client}
     * @param asyncCommands the asynchronous command interface of {@code connection}
     */
    public RedisSession(AbstractRedisClient client,
                        StatefulConnection connection,
                        RedisClusterAsyncCommands<byte[], byte[]> asyncCommands) {
        this.client = client;
        this.connection = connection;
        this.asyncCommands = asyncCommands;
    }

    /**
     * @return the underlying Redis client
     */
    public AbstractRedisClient client() {
        return this.client;
    }

    /**
     * @return the connection held by this session
     */
    public StatefulConnection connection() {
        return this.connection;
    }

    /**
     * @return the asynchronous command interface of the connection
     */
    public RedisClusterAsyncCommands<byte[], byte[]> asyncCommands() {
        return this.asyncCommands;
    }

    /**
     * Closes the connection and then shuts the client down.
     *
     * @throws Exception if closing the connection or shutting the client down fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (null != this.connection) {
                this.connection.close();
            }
        } finally {
            // Shut the client down even when closing the connection failed; otherwise
            // a failing close() would leave the client running.
            if (null != this.client) {
                this.client.shutdown();
            }
        }
    }

    /**
     * Creates a session for the client mode named in {@code config}.
     *
     * <p>In {@code STANDALONE} mode the session connects to the first configured host;
     * in {@code CLUSTER} mode all configured hosts are handed to the cluster client.
     * The socket options (TCP no-delay, connect timeout, keep-alive), request queue size
     * and auto-reconnect flag from {@code config} are applied in both modes.
     *
     * @param config the sink configuration to read connection settings from
     * @return a session holding the connected client
     * @throws IllegalArgumentException      if the client mode is missing or unknown, or if
     *                                       standalone mode is requested without any host
     * @throws UnsupportedOperationException if the client mode is known but not handled here
     */
    public static RedisSession create(RedisSinkConfig config) {
        final RedisCodec<byte[], byte[]> codec = new ByteArrayCodec();

        final SocketOptions socketOptions = SocketOptions.builder()
                .tcpNoDelay(config.isTcpNoDelay())
                .connectTimeout(Duration.ofMillis(config.getConnectTimeout()))
                .keepAlive(config.isKeepAlive())
                .build();

        final ClientMode clientMode = parseClientMode(config.getClientMode());
        final List<RedisURI> redisURIs = redisURIs(config.getHostAndPorts(), config);

        if (clientMode == ClientMode.STANDALONE) {
            if (redisURIs.isEmpty()) {
                // Fail with a readable message instead of an IndexOutOfBoundsException below.
                throw new IllegalArgumentException(
                        "At least one Redis host is required for standalone mode");
            }
            final ClientOptions clientOptions = ClientOptions.builder()
                    .socketOptions(socketOptions)
                    .requestQueueSize(config.getRequestQueue())
                    .autoReconnect(config.isAutoReconnect())
                    .build();
            return connectStandalone(redisURIs.get(0), clientOptions, codec);
        }

        if (clientMode == ClientMode.CLUSTER) {
            final ClusterClientOptions clientOptions = ClusterClientOptions.builder()
                    // Previously omitted: without this the socket settings were ignored
                    // for cluster connections.
                    .socketOptions(socketOptions)
                    .requestQueueSize(config.getRequestQueue())
                    .autoReconnect(config.isAutoReconnect())
                    .build();
            return connectCluster(redisURIs, clientOptions, codec);
        }

        throw new UnsupportedOperationException(
                String.format("%s is not supported", config.getClientMode())
        );
    }

    /** Resolves the configured client mode name, case-insensitively, to a {@link ClientMode}. */
    private static ClientMode parseClientMode(String rawMode) {
        final String message = "Illegal Redis client mode, valid values are: "
                + Arrays.asList(ClientMode.values());
        if (rawMode == null) {
            throw new IllegalArgumentException(message);
        }
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
            return ClientMode.valueOf(rawMode.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(message, e);
        }
    }

    /** Creates a standalone client for {@code redisURI} and connects it; shuts the client down on failure. */
    private static RedisSession connectStandalone(RedisURI redisURI,
                                                  ClientOptions clientOptions,
                                                  RedisCodec<byte[], byte[]> codec) {
        final RedisClient client = RedisClient.create(redisURI);
        try {
            client.setOptions(clientOptions);
            final StatefulRedisConnection<byte[], byte[]> connection = client.connect(codec);
            return new RedisSession(client, connection, connection.async());
        } catch (RuntimeException e) {
            shutdownAfterFailure(client, e);
            throw e;
        }
    }

    /** Creates a cluster client for {@code redisURIs} and connects it; shuts the client down on failure. */
    private static RedisSession connectCluster(List<RedisURI> redisURIs,
                                               ClusterClientOptions clientOptions,
                                               RedisCodec<byte[], byte[]> codec) {
        final RedisClusterClient client = RedisClusterClient.create(redisURIs);
        try {
            client.setOptions(clientOptions);
            final StatefulRedisClusterConnection<byte[], byte[]> connection = client.connect(codec);
            return new RedisSession(client, connection, connection.async());
        } catch (RuntimeException e) {
            shutdownAfterFailure(client, e);
            throw e;
        }
    }

    /** Shuts {@code client} down after {@code failure}, recording any shutdown error as suppressed. */
    private static void shutdownAfterFailure(AbstractRedisClient client, RuntimeException failure) {
        try {
            client.shutdown();
        } catch (RuntimeException shutdownFailure) {
            failure.addSuppressed(shutdownFailure);
        }
    }

    /**
     * Builds one {@link RedisURI} per host/port pair, each using the database index from
     * {@code config} and, when a non-blank password is configured, that password.
     */
    private static List<RedisURI> redisURIs(List<HostAndPort> hostAndPorts, RedisSinkConfig config) {
        List<RedisURI> redisURIs = Lists.newArrayList();
        for (HostAndPort hostAndPort : hostAndPorts) {
            RedisURI.Builder builder = RedisURI.builder();
            builder.withHost(hostAndPort.getHost());
            builder.withPort(hostAndPort.getPort());
            builder.withDatabase(config.getRedisDatabase());
            if (StringUtils.isNotBlank(config.getRedisPassword())) {
                builder.withPassword(config.getRedisPassword());
            }
            redisURIs.add(builder.build());
        }
        return redisURIs;
    }
}