blob: f05c513c93c89c6cec8cc79dac19b6b66eca04dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.stress.util;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import com.google.common.net.HostAndPort;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.WhiteListPolicy;
import com.datastax.shaded.netty.channel.socket.SocketChannel;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.stress.settings.StressSettings;
public class JavaDriverClient
{
static
{
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
}
public final List<String> hosts;
public final int port;
public final String username;
public final String password;
public final AuthProvider authProvider;
public final int maxPendingPerConnection;
public final int connectionsPerHost;
private final ProtocolVersion protocolVersion;
private final EncryptionOptions encryptionOptions;
private Cluster cluster;
private Session session;
private final LoadBalancingPolicy loadBalancingPolicy;
private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>();
public JavaDriverClient(StressSettings settings, String host, int port)
{
this(settings, Collections.singletonList(host), port, new EncryptionOptions());
}
public JavaDriverClient(StressSettings settings, List<String> hosts, int port)
{
this(settings, hosts, port, new EncryptionOptions());
}
public JavaDriverClient(StressSettings settings, List<String> hosts, int port, EncryptionOptions encryptionOptions)
{
this.protocolVersion = settings.mode.protocolVersion;
this.hosts = hosts;
this.port = port;
this.username = settings.mode.username;
this.password = settings.mode.password;
this.authProvider = settings.mode.authProvider;
this.encryptionOptions = new EncryptionOptions(encryptionOptions).applyConfig();
this.loadBalancingPolicy = loadBalancingPolicy(settings);
this.connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost;
int maxThreadCount = 0;
if (settings.rate.auto)
maxThreadCount = settings.rate.maxThreads;
else
maxThreadCount = settings.rate.threadCount;
//Always allow enough pending requests so every thread can have a request pending
//See https://issues.apache.org/jira/browse/CASSANDRA-7217
int requestsPerConnection = (maxThreadCount / connectionsPerHost) + connectionsPerHost;
maxPendingPerConnection = settings.mode.maxPendingPerConnection == null ? Math.max(128, requestsPerConnection ) : settings.mode.maxPendingPerConnection;
}
private LoadBalancingPolicy loadBalancingPolicy(StressSettings settings)
{
DCAwareRoundRobinPolicy.Builder policyBuilder = DCAwareRoundRobinPolicy.builder();
if (settings.node.datacenter != null)
policyBuilder.withLocalDc(settings.node.datacenter);
LoadBalancingPolicy ret = null;
if (settings.node.datacenter != null)
ret = policyBuilder.build();
if (settings.node.isWhiteList)
ret = new WhiteListPolicy(ret == null ? policyBuilder.build() : ret, settings.node.resolveAll(settings.port.nativePort));
return new TokenAwarePolicy(ret == null ? policyBuilder.build() : ret);
}
public PreparedStatement prepare(String query)
{
PreparedStatement stmt = stmts.get(query);
if (stmt != null)
return stmt;
synchronized (stmts)
{
stmt = stmts.get(query);
if (stmt != null)
return stmt;
stmt = getSession().prepare(query);
stmts.put(query, stmt);
}
return stmt;
}
public void connect(ProtocolOptions.Compression compression) throws Exception
{
PoolingOptions poolingOpts = new PoolingOptions()
.setConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost, connectionsPerHost)
.setMaxRequestsPerConnection(HostDistance.LOCAL, maxPendingPerConnection)
.setNewConnectionThreshold(HostDistance.LOCAL, 100);
List<InetSocketAddress> contacts = new ArrayList<>();
for (String host : hosts)
{
HostAndPort hap = HostAndPort.fromString(host).withDefaultPort(port);
InetSocketAddress contact = new InetSocketAddress(InetAddress.getByName(hap.getHost()), hap.getPort());
contacts.add(contact);
}
Cluster.Builder clusterBuilder = Cluster.builder()
.addContactPointsWithPorts(contacts)
.withPoolingOptions(poolingOpts)
.withoutJMXReporting()
.withProtocolVersion(protocolVersion)
.withoutMetrics(); // The driver uses metrics 3 with conflict with our version
if (loadBalancingPolicy != null)
clusterBuilder.withLoadBalancingPolicy(loadBalancingPolicy);
clusterBuilder.withCompression(compression);
if (encryptionOptions.getEnabled())
{
SSLContext sslContext;
sslContext = SSLFactory.createSSLContext(encryptionOptions, true);
// Temporarily override newSSLEngine to set accepted protocols until it is added to
// RemoteEndpointAwareJdkSSLOptions. See CASSANDRA-13325 and CASSANDRA-16362.
RemoteEndpointAwareJdkSSLOptions sslOptions = new RemoteEndpointAwareJdkSSLOptions(sslContext, null)
{
protected SSLEngine newSSLEngine(SocketChannel channel, InetSocketAddress remoteEndpoint)
{
SSLEngine engine = super.newSSLEngine(channel, remoteEndpoint);
String[] acceptedProtocols = encryptionOptions.acceptedProtocolsArray();
if (acceptedProtocols != null && acceptedProtocols.length > 0)
engine.setEnabledProtocols(acceptedProtocols);
return engine;
}
};
clusterBuilder.withSSL(sslOptions);
}
if (authProvider != null)
{
clusterBuilder.withAuthProvider(authProvider);
}
else if (username != null)
{
clusterBuilder.withCredentials(username, password);
}
cluster = clusterBuilder.build();
Metadata metadata = cluster.getMetadata();
System.out.printf(
"Connected to cluster: %s, max pending requests per connection %d, max connections per host %d%n",
metadata.getClusterName(),
maxPendingPerConnection,
connectionsPerHost);
for (Host host : metadata.getAllHosts())
{
System.out.printf("Datacenter: %s; Host: %s; Rack: %s%n",
host.getDatacenter(), host.getAddress() + ":" + host.getSocketAddress().getPort(), host.getRack());
}
session = cluster.connect();
}
public Cluster getCluster()
{
return cluster;
}
public Session getSession()
{
return session;
}
public ResultSet execute(String query, org.apache.cassandra.db.ConsistencyLevel consistency)
{
SimpleStatement stmt = new SimpleStatement(query);
if (consistency.isSerialConsistency())
stmt.setSerialConsistencyLevel(from(consistency));
else
stmt.setConsistencyLevel(from(consistency));
return getSession().execute(stmt);
}
public ResultSet executePrepared(PreparedStatement stmt, List<Object> queryParams, org.apache.cassandra.db.ConsistencyLevel consistency)
{
if (consistency.isSerialConsistency())
{
stmt.setSerialConsistencyLevel(from(consistency));
}
else
{
stmt.setConsistencyLevel(from(consistency));
}
BoundStatement bstmt = stmt.bind((Object[]) queryParams.toArray(new Object[queryParams.size()]));
return getSession().execute(bstmt);
}
/**
* Get ConsistencyLevel from a C* ConsistencyLevel. This exists in the Java Driver ConsistencyLevel,
* but it is not public.
*
* @param cl
* @return
*/
public static ConsistencyLevel from(org.apache.cassandra.db.ConsistencyLevel cl)
{
switch (cl)
{
case ANY:
return com.datastax.driver.core.ConsistencyLevel.ANY;
case ONE:
return com.datastax.driver.core.ConsistencyLevel.ONE;
case TWO:
return com.datastax.driver.core.ConsistencyLevel.TWO;
case THREE:
return com.datastax.driver.core.ConsistencyLevel.THREE;
case QUORUM:
return com.datastax.driver.core.ConsistencyLevel.QUORUM;
case ALL:
return com.datastax.driver.core.ConsistencyLevel.ALL;
case LOCAL_QUORUM:
return com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
case EACH_QUORUM:
return com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
case LOCAL_ONE:
return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
case SERIAL:
return com.datastax.driver.core.ConsistencyLevel.SERIAL;
case LOCAL_SERIAL:
return com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL;
}
throw new AssertionError();
}
public void disconnect()
{
cluster.close();
}
}