blob: e1ebbcb8eb26669bc8412f66949e32b0115705ca [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.cassandra.client;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy.Builder;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.FallthroughRetryPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.LoggingRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.google.common.base.Objects;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
/**
* Configuration used by cassandra storm components.
*/
public class CassandraConf implements Serializable {
public static final String CASSANDRA_USERNAME = "cassandra.username";
public static final String CASSANDRA_PASSWORD = "cassandra.password";
public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace";
public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.output.consistencyLevel";
public static final String CASSANDRA_NODES = "cassandra.nodes";
public static final String CASSANDRA_PORT = "cassandra.port";
public static final String CASSANDRA_BATCH_SIZE_ROWS = "cassandra.batch.size.rows";
public static final String CASSANDRA_RETRY_POLICY = "cassandra.retryPolicy";
public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS = "cassandra.reconnectionPolicy.baseDelayMs";
public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS = "cassandra.reconnectionPolicy.maxDelayMs";
public static final String CASSANDRA_POOL_MAX_SIZE = "cassandra.pool.max.size";
public static final String CASSANDRA_LOAD_BALANCING_POLICY = "cassandra.loadBalancingPolicy";
public static final String CASSANDRA_DATACENTER_NAME = "cassandra.datacenter.name";
public static final String CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL = "cassandra.max.requests.per.con.local";
public static final String CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE = "cassandra.max.requests.per.con.remote";
public static final String CASSANDRA_HEARTBEAT_INTERVAL_SEC = "cassandra.heartbeat.interval.sec";
public static final String CASSANDRA_IDLE_TIMEOUT_SEC = "cassandra.idle.timeout.sec";
public static final String CASSANDRA_SOCKET_READ_TIMEOUT_MS = "cassandra.socket.read.timeout.millis";
public static final String CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS = "cassandra.socket.connect.timeout.millis";
/**
* The authorized cassandra username.
*/
private String username;
/**
* The authorized cassandra password.
*/
private String password;
/**
* The cassandra keyspace.
*/
private String keyspace;
/**
* List of contacts nodes.
*/
private String[] nodes = { "localhost" };
/**
* The port used to connect to nodes.
*/
private int port = 9092;
/**
* Consistency level used to write statements.
*/
private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
/**
* The maximal numbers of rows per batch.
*/
private int batchSizeRows = 100;
/**
* The retry policy to use for the new cluster.
*/
private String retryPolicyName;
/**
* The base delay in milliseconds to use for the reconnection policy.
*/
private long reconnectionPolicyBaseMs;
/**
* The maximum delay to wait between two attempts.
*/
private long reconnectionPolicyMaxMs;
/**
* The maximum queue for connection pool.
*/
private int poolMaxQueueSize;
private String loadBalancingPolicyName;
private String datacenterName;
private int maxRequestPerConnectionLocal;
private int maxRequestPerConnectionRemote;
private int heartbeatIntervalSeconds;
private int idleTimeoutSeconds;
/**
* The timeout for read for socket options.
*/
private long socketReadTimeoutMillis;
/**
* The timeout for connect for socket options.
*/
private long socketConnectTimeoutMillis;
/**
* Creates a new {@link CassandraConf} instance.
*/
public CassandraConf() {
super();
}
/**
* Creates a new {@link CassandraConf} instance.
*
* @param conf The storm configuration.
*/
public CassandraConf(Map<String, Object> conf) {
this.username = (String) Utils.get(conf, CASSANDRA_USERNAME, null);
this.password = (String) Utils.get(conf, CASSANDRA_PASSWORD, null);
this.keyspace = get(conf, CASSANDRA_KEYSPACE);
this.consistencyLevel =
ConsistencyLevel.valueOf((String) Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
this.nodes = ((String) Utils.get(conf, CASSANDRA_NODES, "localhost")).split(",");
this.batchSizeRows = ObjectReader.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
this.port = ObjectReader.getInt(conf.get(CASSANDRA_PORT), 9042);
this.retryPolicyName = (String) Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L);
this.reconnectionPolicyMaxMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1));
this.poolMaxQueueSize = getInt(conf.get(CASSANDRA_POOL_MAX_SIZE), 256);
this.loadBalancingPolicyName = (String) Utils.get(conf, CASSANDRA_LOAD_BALANCING_POLICY, TokenAwarePolicy.class.getSimpleName());
this.datacenterName = (String) Utils.get(conf, CASSANDRA_DATACENTER_NAME, null);
this.maxRequestPerConnectionLocal = getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL), 1024);
this.maxRequestPerConnectionRemote = getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE), 256);
this.heartbeatIntervalSeconds = getInt(conf.get(CASSANDRA_HEARTBEAT_INTERVAL_SEC), 30);
this.idleTimeoutSeconds = getInt(conf.get(CASSANDRA_IDLE_TIMEOUT_SEC), 60);
this.socketReadTimeoutMillis =
getLong(conf.get(CASSANDRA_SOCKET_READ_TIMEOUT_MS), (long) SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS);
this.socketConnectTimeoutMillis =
getLong(conf.get(CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS), (long) SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS);
}
public static Integer getInt(Object o, Integer defaultValue) {
if (null == o) {
return defaultValue;
}
if (o instanceof Number) {
return ((Number) o).intValue();
} else if (o instanceof String) {
return Integer.parseInt((String) o);
}
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
public static Long getLong(Object o, Long defaultValue) {
if (null == o) {
return defaultValue;
}
if (o instanceof Number) {
return ((Number) o).longValue();
} else if (o instanceof String) {
return Long.parseLong((String) o);
}
throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getKeyspace() {
return keyspace;
}
public String[] getNodes() {
return nodes;
}
public ConsistencyLevel getConsistencyLevel() {
return consistencyLevel;
}
public int getBatchSizeRows() {
return batchSizeRows;
}
public int getPort() {
return this.port;
}
public long getReconnectionPolicyBaseMs() {
return reconnectionPolicyBaseMs;
}
public long getReconnectionPolicyMaxMs() {
return reconnectionPolicyMaxMs;
}
public RetryPolicy getRetryPolicy() {
if (this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName())) {
return new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
}
if (this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName())) {
return FallthroughRetryPolicy.INSTANCE;
}
if (this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName())) {
return DefaultRetryPolicy.INSTANCE;
}
throw new IllegalArgumentException("Unknown cassandra retry policy " + this.retryPolicyName);
}
public LoadBalancingPolicy getLoadBalancingPolicy() {
if (this.loadBalancingPolicyName.equals(TokenAwarePolicy.class.getSimpleName())) {
return new TokenAwarePolicy(new RoundRobinPolicy());
}
if (this.loadBalancingPolicyName.equals(DCAwareRoundRobinPolicy.class.getSimpleName())) {
Builder builder = DCAwareRoundRobinPolicy.builder();
if (StringUtils.isNotBlank(datacenterName)) {
builder = builder.withLocalDc(this.datacenterName);
}
return new TokenAwarePolicy(builder.build());
}
throw new IllegalArgumentException("Unknown cassandra load balancing policy " + this.loadBalancingPolicyName);
}
public int getPoolMaxQueueSize() {
return poolMaxQueueSize;
}
public String getDatacenterName() {
return datacenterName;
}
public int getMaxRequestPerConnectionLocal() {
return maxRequestPerConnectionLocal;
}
public int getMaxRequestPerConnectionRemote() {
return maxRequestPerConnectionRemote;
}
public int getHeartbeatIntervalSeconds() {
return heartbeatIntervalSeconds;
}
public int getIdleTimeoutSeconds() {
return idleTimeoutSeconds;
}
public long getSocketReadTimeoutMillis() {
return socketReadTimeoutMillis;
}
public long getSocketConnectTimeoutMillis() {
return socketConnectTimeoutMillis;
}
private <T> T get(Map<String, Object> conf, String key) {
Object o = conf.get(key);
if (o == null) {
throw new IllegalArgumentException("No '" + key + "' value found in configuration!");
}
return (T) o;
}
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("username", username)
.add("password", password)
.add("keyspace", keyspace)
.add("nodes", nodes)
.add("port", port)
.add("consistencyLevel", consistencyLevel)
.add("batchSizeRows", batchSizeRows)
.add("retryPolicyName", retryPolicyName)
.add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs)
.add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs)
.add("poolMaxQueueSize", poolMaxQueueSize)
.add("datacenterName", datacenterName)
.add("maxRequestPerConnectionLocal", maxRequestPerConnectionLocal)
.add("maxRequestPerConnectionRemote", maxRequestPerConnectionRemote)
.add("heartbeatIntervalSeconds", heartbeatIntervalSeconds)
.add("idleTimeoutSeconds", idleTimeoutSeconds)
.add("socketReadTimeoutMillis", socketReadTimeoutMillis)
.toString();
}
}