blob: 19ebbe311e557bc809b9f354906d36a74573b995 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.store.cassandra.datasource;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.NettyOptions;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.AddressTranslator;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Data source abstraction to specify configuration of the Cassandra session to be used.
*/
public class DataSource implements Externalizable {
/** */
private static final long serialVersionUID = 0L;
/**
* Null object, used as a replacement for those Cassandra connection options which
* don't support serialization (RetryPolicy, LoadBalancingPolicy and etc).
*/
private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
/** Default expiration timeout for Cassandra driver session. */
public static final long DFLT_SESSION_EXPIRATION_TIMEOUT = 300000; // 5 minutes.
/** Number of rows to immediately fetch in CQL statement execution. */
private Integer fetchSize;
/** Consistency level for READ operations. */
private ConsistencyLevel readConsistency;
/** Consistency level for WRITE operations. */
private ConsistencyLevel writeConsistency;
/** Username to use for authentication. */
@GridToStringExclude
private String user;
/** Password to use for authentication. */
@GridToStringExclude
private String pwd;
/** Port to use for Cassandra connection. */
private Integer port;
/** List of contact points to connect to Cassandra cluster. */
private List<InetAddress> contactPoints;
/** List of contact points with ports to connect to Cassandra cluster. */
private List<InetSocketAddress> contactPointsWithPorts;
/** Maximum time to wait for schema agreement before returning from a DDL query. */
private Integer maxSchemaAgreementWaitSeconds;
/** The native protocol version to use. */
private Integer protoVer;
/** Compression to use for the transport. */
private String compression;
/** Use SSL for communications with Cassandra. */
private Boolean useSSL;
/** Enables metrics collection. */
private Boolean collectMetrix;
/** Enables JMX reporting of the metrics. */
private Boolean jmxReporting;
/** Credentials to use for authentication. */
private Credentials creds;
/** Load balancing policy to use. */
private LoadBalancingPolicy loadBalancingPlc;
/** Reconnection policy to use. */
private ReconnectionPolicy reconnectionPlc;
/** Retry policy to use. */
private RetryPolicy retryPlc;
/** Address translator to use. */
private AddressTranslator addrTranslator;
/** Speculative execution policy to use. */
private SpeculativeExecutionPolicy speculativeExecutionPlc;
/** Authentication provider to use. */
private AuthProvider authProvider;
/** SSL options to use. */
private SSLOptions sslOptions;
/** Connection pooling options to use. */
private PoolingOptions poolingOptions;
/** Socket options to use. */
private SocketOptions sockOptions;
/** Netty options to use for connection. */
private NettyOptions nettyOptions;
/** Expiration timeout for Cassandra driver session. */
private long sessionExpirationTimeout = DFLT_SESSION_EXPIRATION_TIMEOUT;
/** Cassandra session wrapper instance. */
private volatile CassandraSession ses;
/**
* Sets user name to use for authentication.
*
* @param user user name
*/
public void setUser(String user) {
this.user = user;
invalidate();
}
/**
* Sets password to use for authentication.
*
* @param pwd password
*/
public void setPassword(String pwd) {
this.pwd = pwd;
invalidate();
}
/**
* Sets port to use for Cassandra connection.
*
* @param port port
*/
public void setPort(int port) {
this.port = port;
invalidate();
}
/**
* Sets list of contact points to connect to Cassandra cluster.
*
* @param points contact points
*/
public void setContactPoints(String... points) {
if (points == null || points.length == 0)
return;
for (String point : points) {
if (point.contains(":")) {
if (contactPointsWithPorts == null)
contactPointsWithPorts = new LinkedList<>();
String[] chunks = point.split(":");
try {
contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim())));
}
catch (Throwable e) {
throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
}
}
else {
if (contactPoints == null)
contactPoints = new LinkedList<>();
try {
contactPoints.add(InetAddress.getByName(point));
}
catch (Throwable e) {
throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
}
}
}
invalidate();
}
/** @param seconds Maximum time to wait for schema agreement before returning from a DDL query. */
public void setMaxSchemaAgreementWaitSeconds(int seconds) {
maxSchemaAgreementWaitSeconds = seconds;
invalidate();
}
/**
* Sets the native protocol version to use.
*
* @param ver version number
*/
public void setProtocolVersion(int ver) {
protoVer = ver;
invalidate();
}
/**
* Sets compression algorithm to use for the transport.
*
* @param compression Compression algorithm.
*/
public void setCompression(String compression) {
this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim();
try {
if (this.compression != null)
ProtocolOptions.Compression.valueOf(this.compression);
}
catch (Throwable e) {
throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e);
}
invalidate();
}
/**
* Enables SSL for communications with Cassandra.
*
* @param use Flag to enable/disable SSL.
*/
public void setUseSSL(boolean use) {
useSSL = use;
invalidate();
}
/**
* Enables metrics collection.
*
* @param collect Flag to enable/disable metrics collection.
*/
public void setCollectMetrix(boolean collect) {
collectMetrix = collect;
invalidate();
}
/**
* Enables JMX reporting of the metrics.
*
* @param enableReporting Flag to enable/disable JMX reporting.
*/
public void setJmxReporting(boolean enableReporting) {
jmxReporting = enableReporting;
invalidate();
}
/**
* Sets number of rows to immediately fetch in CQL statement execution.
*
* @param size Number of rows to fetch.
*/
public void setFetchSize(int size) {
fetchSize = size;
invalidate();
}
/**
* Set consistency level for READ operations.
*
* @param level Consistency level.
*/
public void setReadConsistency(String level) {
readConsistency = parseConsistencyLevel(level);
invalidate();
}
/**
* Set consistency level for WRITE operations.
*
* @param level Consistency level.
*/
public void setWriteConsistency(String level) {
writeConsistency = parseConsistencyLevel(level);
invalidate();
}
/**
* Sets credentials to use for authentication.
*
* @param creds Credentials.
*/
public void setCredentials(Credentials creds) {
this.creds = creds;
invalidate();
}
/**
* Sets load balancing policy.
*
* @param plc Load balancing policy.
*/
public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
loadBalancingPlc = plc;
invalidate();
}
/**
* Sets reconnection policy.
*
* @param plc Reconnection policy.
*/
public void setReconnectionPolicy(ReconnectionPolicy plc) {
reconnectionPlc = plc;
invalidate();
}
/**
* Sets retry policy.
*
* @param plc Retry policy.
*/
public void setRetryPolicy(RetryPolicy plc) {
retryPlc = plc;
invalidate();
}
/**
* Sets address translator.
*
* @param translator Address translator.
*/
public void setAddressTranslator(AddressTranslator translator) {
addrTranslator = translator;
invalidate();
}
/**
* Sets speculative execution policy.
*
* @param plc Speculative execution policy.
*/
public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
speculativeExecutionPlc = plc;
invalidate();
}
/**
* Sets authentication provider.
*
* @param provider Authentication provider.
*/
public void setAuthProvider(AuthProvider provider) {
authProvider = provider;
invalidate();
}
/**
* Sets SSL options.
*
* @param options SSL options.
*/
public void setSslOptions(SSLOptions options) {
sslOptions = options;
invalidate();
}
/**
* Sets pooling options.
*
* @param options pooling options to use.
*/
public void setPoolingOptions(PoolingOptions options) {
poolingOptions = options;
invalidate();
}
/**
* Sets socket options to use.
*
* @param options Socket options.
*/
public void setSocketOptions(SocketOptions options) {
sockOptions = options;
invalidate();
}
/**
* Sets netty options to use.
*
* @param options netty options.
*/
public void setNettyOptions(NettyOptions options) {
nettyOptions = options;
invalidate();
}
/**
* Sets expiration timeout for Cassandra driver session. Idle sessions that are not
* used during this timeout value will be automatically closed and recreated later
* on demand.
* <p>
* If set to {@code 0}, timeout is disabled.
* <p>
* Default value is {@link #DFLT_SESSION_EXPIRATION_TIMEOUT}.
*
* @param sessionExpirationTimeout Expiration timeout for Cassandra driver session.
*/
public void setSessionExpirationTimeout(long sessionExpirationTimeout) {
this.sessionExpirationTimeout = sessionExpirationTimeout;
invalidate();
}
/**
* Creates Cassandra session wrapper if it wasn't created yet and returns it
*
* @param log logger
* @return Cassandra session wrapper
*/
public synchronized CassandraSession session(IgniteLogger log) {
if (ses != null)
return ses;
Cluster.Builder builder = Cluster.builder();
if (user != null)
builder = builder.withCredentials(user, pwd);
if (port != null)
builder = builder.withPort(port);
if (contactPoints != null)
builder = builder.addContactPoints(contactPoints);
if (contactPointsWithPorts != null)
builder = builder.addContactPointsWithPorts(contactPointsWithPorts);
if (maxSchemaAgreementWaitSeconds != null)
builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds);
if (protoVer != null)
builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer));
if (compression != null) {
try {
builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compression.trim().toLowerCase()));
}
catch (IllegalArgumentException e) {
throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e);
}
}
if (useSSL != null && useSSL)
builder = builder.withSSL();
if (sslOptions != null)
builder = builder.withSSL(sslOptions);
if (collectMetrix != null && !collectMetrix)
builder = builder.withoutMetrics();
if (jmxReporting != null && !jmxReporting)
builder = builder.withoutJMXReporting();
if (creds != null)
builder = builder.withCredentials(creds.getUser(), creds.getPassword());
if (loadBalancingPlc != null)
builder = builder.withLoadBalancingPolicy(loadBalancingPlc);
if (reconnectionPlc != null)
builder = builder.withReconnectionPolicy(reconnectionPlc);
if (retryPlc != null)
builder = builder.withRetryPolicy(retryPlc);
if (addrTranslator != null)
builder = builder.withAddressTranslator(addrTranslator);
if (speculativeExecutionPlc != null)
builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc);
if (authProvider != null)
builder = builder.withAuthProvider(authProvider);
if (poolingOptions != null)
builder = builder.withPoolingOptions(poolingOptions);
if (sockOptions != null)
builder = builder.withSocketOptions(sockOptions);
if (nettyOptions != null)
builder = builder.withNettyOptions(nettyOptions);
return ses = new CassandraSessionImpl(
builder, fetchSize, readConsistency, writeConsistency, sessionExpirationTimeout, log);
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(fetchSize);
out.writeObject(readConsistency);
out.writeObject(writeConsistency);
U.writeString(out, user);
U.writeString(out, pwd);
out.writeObject(port);
out.writeObject(contactPoints);
out.writeObject(contactPointsWithPorts);
out.writeObject(maxSchemaAgreementWaitSeconds);
out.writeObject(protoVer);
U.writeString(out, compression);
out.writeObject(useSSL);
out.writeObject(collectMetrix);
out.writeObject(jmxReporting);
out.writeObject(creds);
writeObject(out, loadBalancingPlc);
writeObject(out, reconnectionPlc);
writeObject(out, addrTranslator);
writeObject(out, speculativeExecutionPlc);
writeObject(out, authProvider);
writeObject(out, sslOptions);
writeObject(out, poolingOptions);
writeObject(out, sockOptions);
writeObject(out, nettyOptions);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
fetchSize = (Integer)in.readObject();
readConsistency = (ConsistencyLevel)in.readObject();
writeConsistency = (ConsistencyLevel)in.readObject();
user = U.readString(in);
pwd = U.readString(in);
port = (Integer)in.readObject();
contactPoints = (List<InetAddress>)in.readObject();
contactPointsWithPorts = (List<InetSocketAddress>)in.readObject();
maxSchemaAgreementWaitSeconds = (Integer)in.readObject();
protoVer = (Integer)in.readObject();
compression = U.readString(in);
useSSL = (Boolean)in.readObject();
collectMetrix = (Boolean)in.readObject();
jmxReporting = (Boolean)in.readObject();
creds = (Credentials)in.readObject();
loadBalancingPlc = (LoadBalancingPolicy)readObject(in);
reconnectionPlc = (ReconnectionPolicy)readObject(in);
addrTranslator = (AddressTranslator)readObject(in);
speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in);
authProvider = (AuthProvider)readObject(in);
sslOptions = (SSLOptions)readObject(in);
poolingOptions = (PoolingOptions)readObject(in);
sockOptions = (SocketOptions)readObject(in);
nettyOptions = (NettyOptions)readObject(in);
}
/**
* Helper method used to serialize class members
* @param out the stream to write the object to
* @param obj the object to be written
* @throws IOException Includes any I/O exceptions that may occur
*/
private void writeObject(ObjectOutput out, Object obj) throws IOException {
out.writeObject(obj == null || !(obj instanceof Serializable) ? NULL_OBJECT : obj);
}
/**
* Helper method used to deserialize class members
* @param in the stream to read data from in order to restore the object
* @throws IOException Includes any I/O exceptions that may occur
* @throws ClassNotFoundException If the class for an object being restored cannot be found
* @return deserialized object
*/
private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
Object obj = in.readObject();
return NULL_OBJECT.equals(obj) ? null : obj;
}
/**
* Parses consistency level provided as string.
*
* @param level consistency level string.
*
* @return consistency level.
*/
private ConsistencyLevel parseConsistencyLevel(String level) {
if (level == null)
return null;
try {
return ConsistencyLevel.valueOf(level.trim().toUpperCase());
}
catch (Throwable e) {
throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e);
}
}
/**
* Invalidates session.
*/
private synchronized void invalidate() {
ses = null;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataSource.class, this);
}
}