/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.stress.util;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.net.ssl.SSLContext;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.WhiteListPolicy;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.stress.settings.StressSettings;
/**
 * Thin wrapper around the DataStax Java driver used by the stress tool: it builds a
 * {@link Cluster} from the stress settings, opens a {@link Session}, and offers helpers to
 * prepare and execute statements with a Cassandra-side consistency level.
 */
public class JavaDriverClient
{
    static
    {
        // Make Netty log through SLF4J.
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
    }

    public final String host;
    public final int port;
    public final String username;
    public final String password;
    public final AuthProvider authProvider;
    public final int maxPendingPerConnection;
    public final int connectionsPerHost;

    private final ProtocolVersion protocolVersion;
    private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
    private Cluster cluster;
    private Session session;
    private final WhiteListPolicy whitelist;

    // Static: the cache is keyed by query text only and shared by every JavaDriverClient instance.
    private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>();

    /**
     * Creates a client with default (freshly constructed) client encryption options.
     *
     * @param settings stress settings supplying protocol, credentials, pooling and rate options
     * @param host     contact point to connect to
     * @param port     native protocol port of the contact point
     */
    public JavaDriverClient(StressSettings settings, String host, int port)
    {
        this(settings, host, port, new EncryptionOptions.ClientEncryptionOptions());
    }

    /**
     * Creates a client; no connection is made until {@link #connect} is called.
     *
     * @param settings          stress settings supplying protocol, credentials, pooling and rate options
     * @param host              contact point to connect to
     * @param port              native protocol port of the contact point
     * @param encryptionOptions client encryption options; SSL is configured only when they are enabled
     */
    public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions)
    {
        this.protocolVersion = settings.mode.protocolVersion;
        this.host = host;
        this.port = port;
        this.username = settings.mode.username;
        this.password = settings.mode.password;
        this.authProvider = settings.mode.authProvider;
        this.encryptionOptions = encryptionOptions;

        if (settings.node.isWhiteList)
            whitelist = new WhiteListPolicy(DCAwareRoundRobinPolicy.builder().build(), settings.node.resolveAll(settings.port.nativePort));
        else
            whitelist = null;

        // Default to 8 connections per host when the setting is absent.
        connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost;

        int maxThreadCount = settings.rate.auto ? settings.rate.maxThreads : settings.rate.threadCount;

        //Always allow enough pending requests so every thread can have a request pending
        //See https://issues.apache.org/jira/browse/CASSANDRA-7217
        int requestsPerConnection = (maxThreadCount / connectionsPerHost) + connectionsPerHost;
        maxPendingPerConnection = settings.mode.maxPendingPerConnection == null
                                  ? Math.max(128, requestsPerConnection)
                                  : settings.mode.maxPendingPerConnection;
    }

    /**
     * Returns the prepared statement for {@code query}, preparing it on the current session
     * the first time the query text is seen and caching it afterwards.
     *
     * @param query CQL text to prepare; also used as the cache key
     * @return the cached or newly prepared statement
     */
    public PreparedStatement prepare(String query)
    {
        PreparedStatement stmt = stmts.get(query);
        if (stmt != null)
            return stmt;

        // Re-check under the lock so that a given query text is prepared only once.
        synchronized (stmts)
        {
            stmt = stmts.get(query);
            if (stmt != null)
                return stmt;
            stmt = getSession().prepare(query);
            stmts.put(query, stmt);
        }
        return stmt;
    }

    /**
     * Builds the driver {@link Cluster} from this client's configuration, prints the cluster
     * name and known hosts to standard output, and opens the {@link Session}.
     * <p>
     * If querying the metadata or opening the session fails, the cluster built here is closed
     * before the failure is rethrown, so its resources are not left behind.
     *
     * @param compression protocol compression to request from the driver
     * @throws Exception if the SSL context cannot be created or the driver fails to connect
     */
    public void connect(ProtocolOptions.Compression compression) throws Exception
    {
        PoolingOptions poolingOpts = new PoolingOptions()
                                     .setConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost, connectionsPerHost)
                                     .setMaxRequestsPerConnection(HostDistance.LOCAL, maxPendingPerConnection)
                                     .setNewConnectionThreshold(HostDistance.LOCAL, 100);

        Cluster.Builder clusterBuilder = Cluster.builder()
                                                .addContactPoint(host)
                                                .withPort(port)
                                                .withPoolingOptions(poolingOpts)
                                                .withoutJMXReporting()
                                                .withProtocolVersion(protocolVersion)
                                                .withoutMetrics(); // The driver uses Metrics 3, which conflicts with our version

        if (whitelist != null)
            clusterBuilder.withLoadBalancingPolicy(whitelist);

        clusterBuilder.withCompression(compression);

        if (encryptionOptions.enabled)
        {
            SSLContext sslContext = SSLFactory.createSSLContext(encryptionOptions, true);
            SSLOptions sslOptions = JdkSSLOptions.builder()
                                                 .withSSLContext(sslContext)
                                                 .withCipherSuites(encryptionOptions.cipher_suites)
                                                 .build();
            clusterBuilder.withSSL(sslOptions);
        }

        // An explicit auth provider takes precedence over plain username/password credentials.
        if (authProvider != null)
        {
            clusterBuilder.withAuthProvider(authProvider);
        }
        else if (username != null)
        {
            clusterBuilder.withCredentials(username, password);
        }

        cluster = clusterBuilder.build();
        try
        {
            Metadata metadata = cluster.getMetadata();
            System.out.printf(
                    "Connected to cluster: %s, max pending requests per connection %d, max connections per host %d%n",
                    metadata.getClusterName(),
                    maxPendingPerConnection,
                    connectionsPerHost);

            // Named "node" so it does not shadow the "host" field.
            for (Host node : metadata.getAllHosts())
            {
                System.out.printf("Datacenter: %s; Host: %s; Rack: %s%n",
                                  node.getDatacenter(), node.getAddress(), node.getRack());
            }

            session = cluster.connect();
        }
        catch (Throwable t)
        {
            // Release the half-initialised cluster; keep the original failure as the primary error.
            try
            {
                cluster.close();
            }
            catch (RuntimeException closeFailure)
            {
                t.addSuppressed(closeFailure);
            }
            throw t;
        }
    }

    /**
     * @return the driver cluster, or {@code null} if {@link #connect} has not been called
     */
    public Cluster getCluster()
    {
        return cluster;
    }

    /**
     * @return the driver session, or {@code null} if {@link #connect} has not completed
     */
    public Session getSession()
    {
        return session;
    }

    /**
     * Executes a CQL string as a simple (unprepared) statement.
     *
     * @param query       CQL text to execute
     * @param consistency Cassandra consistency level, translated with {@link #from}
     * @return the driver result set
     */
    public ResultSet execute(String query, org.apache.cassandra.db.ConsistencyLevel consistency)
    {
        SimpleStatement stmt = new SimpleStatement(query);
        stmt.setConsistencyLevel(from(consistency));
        return getSession().execute(stmt);
    }

    /**
     * Binds {@code queryParams} to a prepared statement and executes it.
     * <p>
     * Note that the consistency level is set on {@code stmt} itself, so the passed-in
     * prepared statement is modified by this call.
     *
     * @param stmt        prepared statement to bind
     * @param queryParams values to bind, in positional order
     * @param consistency Cassandra consistency level, translated with {@link #from}
     * @return the driver result set
     */
    public ResultSet executePrepared(PreparedStatement stmt, List<Object> queryParams, org.apache.cassandra.db.ConsistencyLevel consistency)
    {
        stmt.setConsistencyLevel(from(consistency));
        // toArray() already yields an Object[], which is passed as the varargs array.
        BoundStatement bstmt = stmt.bind(queryParams.toArray());
        return getSession().execute(bstmt);
    }

    /**
     * Get ConsistencyLevel from a C* ConsistencyLevel. This exists in the Java Driver ConsistencyLevel,
     * but it is not public.
     *
     * @param cl the Cassandra consistency level to translate
     * @return the Java driver consistency level with the same name
     * @throws AssertionError if {@code cl} has no mapping here
     */
    public static ConsistencyLevel from(org.apache.cassandra.db.ConsistencyLevel cl)
    {
        switch (cl)
        {
            case ANY:
                return com.datastax.driver.core.ConsistencyLevel.ANY;
            case ONE:
                return com.datastax.driver.core.ConsistencyLevel.ONE;
            case TWO:
                return com.datastax.driver.core.ConsistencyLevel.TWO;
            case THREE:
                return com.datastax.driver.core.ConsistencyLevel.THREE;
            case QUORUM:
                return com.datastax.driver.core.ConsistencyLevel.QUORUM;
            case ALL:
                return com.datastax.driver.core.ConsistencyLevel.ALL;
            case LOCAL_QUORUM:
                return com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
            case EACH_QUORUM:
                return com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
            case SERIAL:
                return com.datastax.driver.core.ConsistencyLevel.SERIAL;
            case LOCAL_SERIAL:
                return com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL;
            case LOCAL_ONE:
                return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
        }
        throw new AssertionError("Unsupported consistency level: " + cl);
    }

    /**
     * Closes the driver cluster. Does nothing if {@link #connect} was never called.
     */
    public void disconnect()
    {
        if (cluster != null)
            cluster.close();
    }
}