// blob: 335ca9212909f4983f57f85aa0958c1b850ea0eb [file] [log] [blame]
package org.apache.cassandra.stress.settings;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
import java.io.Serializable;
import java.util.*;
import com.datastax.driver.core.Metadata;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.SimpleThriftClient;
import org.apache.cassandra.stress.util.SmartThriftClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.transport.SimpleClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
/**
 * Holds every option group parsed from the stress command line and builds the
 * client connections (Thrift, simple native protocol, Java driver) that those
 * options describe.
 *
 * Instances are normally obtained through {@link #parse(String[])} or
 * {@link #get(Map)}.
 */
public class StressSettings implements Serializable
{
    // One field per option group; all are assigned once in the constructor.
    public final SettingsCommand command;
    public final SettingsRate rate;
    public final SettingsPopulation generate;
    public final SettingsInsert insert;
    public final SettingsColumn columns;
    public final SettingsSamples samples;
    public final SettingsErrors errors;
    public final SettingsLog log;
    public final SettingsMode mode;
    public final SettingsNode node;
    public final SettingsSchema schema;
    public final SettingsTransport transport;
    public final SettingsPort port;
    public final String sendToDaemon;

    /**
     * Stores the supplied option groups verbatim; no validation is performed here.
     */
    public StressSettings(SettingsCommand command, SettingsRate rate, SettingsPopulation generate, SettingsInsert insert, SettingsColumn columns, SettingsSamples samples, SettingsErrors errors, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon)
    {
        this.command = command;
        this.rate = rate;
        this.insert = insert;
        this.generate = generate;
        this.columns = columns;
        this.samples = samples;
        this.errors = errors;
        this.log = log;
        this.mode = mode;
        this.node = node;
        this.schema = schema;
        this.transport = transport;
        this.port = port;
        this.sendToDaemon = sendToDaemon;
    }

    // Lazily created by getThriftClient() and reused for the lifetime of this instance.
    private SmartThriftClient tclient;

    /**
     * Returns a Thrift client for the configured API mode.
     *
     * When {@code mode.api} is {@code THRIFT_SMART} a single {@link SmartThriftClient}
     * is created on first use and returned on every later call; for any other mode
     * a new {@link SimpleThriftClient} is built on each call.
     *
     * @return cassandra client connection
     */
    public synchronized ThriftClient getThriftClient()
    {
        if (mode.api != ConnectionAPI.THRIFT_SMART)
            return getSimpleThriftClient();

        if (tclient == null)
            tclient = getSmartThriftClient();

        return tclient;
    }

    /**
     * Builds a smart Thrift client from the cluster metadata exposed by the
     * shared Java driver client.
     */
    private SmartThriftClient getSmartThriftClient()
    {
        Metadata metadata = getJavaDriverClient().getCluster().getMetadata();
        return new SmartThriftClient(this, schema.keyspace, metadata);
    }

    /**
     * Wraps a new raw Thrift connection to a node chosen by {@code node.randomNode()},
     * with the keyspace already selected.
     *
     * @return cassandra client connection
     */
    private SimpleThriftClient getSimpleThriftClient()
    {
        return new SimpleThriftClient(getRawThriftClient(node.randomNode(), true));
    }

    /**
     * Opens a raw Thrift connection to a node chosen by {@code node.randomNode()}.
     *
     * @param setKeyspace whether to select {@code schema.keyspace} on the connection
     */
    public Cassandra.Client getRawThriftClient(boolean setKeyspace)
    {
        return getRawThriftClient(node.randomNode(), setKeyspace);
    }

    /**
     * Opens a raw Thrift connection to {@code host} and selects {@code schema.keyspace}.
     */
    public Cassandra.Client getRawThriftClient(String host)
    {
        return getRawThriftClient(host, true);
    }

    /**
     * Opens a raw Thrift connection to {@code host} on {@code port.thriftPort}, then
     * applies the CQL version (when the mode is CQL), the keyspace (when requested)
     * and the login (when a username is configured), in that order.
     *
     * If any of those steps fails the transport is closed before the failure is
     * rethrown, so a failed attempt does not leave an open connection behind.
     *
     * @param host        node to connect to
     * @param setKeyspace whether to select {@code schema.keyspace} on the connection
     * @return the configured client
     * @throws RuntimeException wrapping whatever went wrong; the original exception
     *                          is always available as the cause
     */
    public Cassandra.Client getRawThriftClient(String host, boolean setKeyspace)
    {
        TTransport connection = null;
        try
        {
            connection = this.transport.getFactory().openTransport(host, port.thriftPort);
            Cassandra.Client thriftClient = new Cassandra.Client(new TBinaryProtocol(connection));
            if (mode.cqlVersion.isCql())
                thriftClient.set_cql_version(mode.cqlVersion.connectVersion);
            if (setKeyspace)
                thriftClient.set_keyspace(schema.keyspace);
            if (mode.username != null)
                thriftClient.login(new AuthenticationRequest(ImmutableMap.of("username", mode.username, "password", mode.password)));
            return thriftClient;
        }
        catch (InvalidRequestException e)
        {
            closeQuietly(connection);
            // getWhy() carries the server's explanation; keep e as the cause for the stack trace
            throw new RuntimeException(e.getWhy(), e);
        }
        catch (Exception e)
        {
            closeQuietly(connection);
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes a transport that is being abandoned because of an earlier failure.
     * A runtime failure while closing is ignored so that the original error is
     * the one reported to the caller.
     */
    private static void closeQuietly(TTransport connection)
    {
        if (connection == null)
            return;
        try
        {
            connection.close();
        }
        catch (RuntimeException ignored)
        {
            // the caller is already propagating the failure that matters
        }
    }

    /**
     * Opens a new native-protocol {@link SimpleClient} to a node chosen by
     * {@code node.randomNode()} on {@code port.nativePort} and issues a
     * {@code USE} statement for {@code schema.keyspace}.
     *
     * @throws RuntimeException carrying the original message, with the original
     *                          exception as its cause
     */
    public SimpleClient getSimpleNativeClient()
    {
        try
        {
            String currentNode = node.randomNode();
            SimpleClient nativeClient = new SimpleClient(currentNode, port.nativePort);
            nativeClient.connect(false);
            nativeClient.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
            return nativeClient;
        }
        catch (Exception e)
        {
            // keep the message callers already see, but do not lose the stack trace
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    // Shared by every StressSettings instance in the JVM; cleared by disconnect().
    // NOTE(review): this field is static but is guarded by a per-instance monitor, so
    // two different StressSettings instances are not mutually excluded - confirm that
    // only one instance ever connects at a time.
    private static volatile JavaDriverClient client;

    /**
     * Returns the shared Java driver client, connecting (and selecting the keyspace)
     * on first use.
     */
    public JavaDriverClient getJavaDriverClient()
    {
        return getJavaDriverClient(true);
    }

    /**
     * Returns the shared Java driver client, creating and connecting it if it does
     * not exist yet.
     *
     * {@code setKeyspace} is only honoured by the call that actually creates the
     * client; once a client is cached it is returned as-is.
     *
     * @param setKeyspace whether a newly created client should issue a {@code USE}
     *                    for {@code schema.keyspace}
     * @throws RuntimeException wrapping any failure while connecting
     */
    public JavaDriverClient getJavaDriverClient(boolean setKeyspace)
    {
        if (client != null)
            return client;

        try
        {
            synchronized (this)
            {
                // re-check under the lock before doing any work
                if (client != null)
                    return client;

                String currentNode = node.randomNode();
                EncryptionOptions.ClientEncryptionOptions encOptions = transport.getEncryptionOptions();
                JavaDriverClient c = new JavaDriverClient(this, currentNode, port.nativePort, encOptions);
                c.connect(mode.compression());
                if (setKeyspace)
                    c.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE);

                // publish only after the client is fully connected
                return client = c;
            }
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the schema needed by the selected command: the stress keyspaces for
     * {@code WRITE} and {@code COUNTER_WRITE}, or the user profile's schema for
     * {@code USER}. Other command types do nothing.
     */
    public void maybeCreateKeyspaces()
    {
        if (command.type == Command.WRITE || command.type == Command.COUNTER_WRITE)
            schema.createKeySpaces(this);
        else if (command.type == Command.USER)
            ((SettingsCommandUser) command).profile.maybeCreateSchema(this);
    }

    /**
     * Parses a raw command line into settings.
     *
     * A {@code legacy} command is handed to {@code Legacy.build} with the first
     * argument dropped. If {@code SettingsMisc.maybeDoSpecial} handles the arguments,
     * the JVM exits with status 1. An {@link IllegalArgumentException} raised while
     * parsing is reported on standard output and the JVM exits with status 1.
     *
     * @param args command line arguments as received by {@code main}
     * @return the parsed settings
     */
    public static StressSettings parse(String[] args)
    {
        try
        {
            args = repairParams(args);
            final Map<String, String[]> clArgs = parseMap(args);
            if (clArgs.containsKey("legacy"))
                return Legacy.build(Arrays.copyOfRange(args, 1, args.length));
            if (SettingsMisc.maybeDoSpecial(clArgs))
                System.exit(1);
            return get(clArgs);
        }
        catch (IllegalArgumentException e)
        {
            System.out.println(e.getMessage());
            System.exit(1);
            // unreachable; satisfies the compiler's return analysis
            throw new AssertionError();
        }
    }

    /**
     * Re-tokenises the command line so that whitespace around {@code , = ( )} does
     * not split a single option across several tokens.
     *
     * The arguments are joined with single spaces, whitespace before any of
     * {@code , = ( )} and after any of {@code , = (} is removed, and the result is
     * split on runs of spaces.
     *
     * @return the repaired tokens; an empty array when there is nothing to parse
     */
    private static String[] repairParams(String[] args)
    {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (String arg : args)
        {
            if (!first)
                joined.append(" ");
            joined.append(arg);
            first = false;
        }

        String normalized = joined.toString()
                                  .replaceAll("\\s+([,=()])", "$1")
                                  .replaceAll("([,=(])\\s+", "$1");

        // "".split(" +") yields [""] rather than [], which would hide the
        // "no command" case from parseMap; report emptiness explicitly instead
        if (normalized.isEmpty())
            return new String[0];

        return normalized.split(" +");
    }

    /**
     * Builds settings from an already tokenised option map.
     *
     * Each {@code Settings*.get} call is handed the map in turn; any entries still
     * left in the map afterwards are reported as ignored, help is printed and the
     * JVM exits with status 1.
     *
     * @param clArgs option name to parameters, as produced by the command line parser
     * @throws IllegalArgumentException if no command is present
     */
    public static StressSettings get(Map<String, String[]> clArgs)
    {
        SettingsCommand command = SettingsCommand.get(clArgs);
        if (command == null)
            throw new IllegalArgumentException("No command specified");

        String sendToDaemon = SettingsMisc.getSendToDaemon(clArgs);
        SettingsPort port = SettingsPort.get(clArgs);
        SettingsRate rate = SettingsRate.get(clArgs, command);
        SettingsPopulation generate = SettingsPopulation.get(clArgs, command);
        SettingsInsert insert = SettingsInsert.get(clArgs);
        SettingsColumn columns = SettingsColumn.get(clArgs);
        SettingsSamples samples = SettingsSamples.get(clArgs);
        SettingsErrors errors = SettingsErrors.get(clArgs);
        SettingsLog log = SettingsLog.get(clArgs);
        SettingsMode mode = SettingsMode.get(clArgs);
        SettingsNode node = SettingsNode.get(clArgs);
        SettingsSchema schema = SettingsSchema.get(clArgs, command);
        SettingsTransport transport = SettingsTransport.get(clArgs);

        if (!clArgs.isEmpty())
        {
            printHelp();
            System.out.println("Error processing command line arguments. The following were ignored:");
            for (Map.Entry<String, String[]> e : clArgs.entrySet())
            {
                System.out.print(e.getKey());
                for (String v : e.getValue())
                {
                    System.out.print(" ");
                    System.out.print(v);
                }
                System.out.println();
            }
            System.exit(1);
        }

        return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon);
    }

    /**
     * Groups tokens into an insertion-ordered map of option name to parameters.
     *
     * The first token is always a key (the command); after that, every token that
     * starts with {@code -} begins a new key and the tokens that follow it are its
     * parameters. Keys are lower-cased.
     *
     * @throws IllegalArgumentException if a key appears more than once
     */
    private static Map<String, String[]> parseMap(String[] args)
    {
        // first is the main command/operation, so specified without a -
        if (args.length == 0)
        {
            System.out.println("No command provided");
            printHelp();
            System.exit(1);
        }

        final LinkedHashMap<String, String[]> r = new LinkedHashMap<>();
        String key = null;
        List<String> params = new ArrayList<>();
        for (int i = 0 ; i < args.length ; i++)
        {
            if (i == 0 || args[i].startsWith("-"))
            {
                // flush the previous key before starting a new one
                if (i > 0)
                    putParam(key, params.toArray(new String[0]), r);

                // Locale.ROOT keeps option matching independent of the default locale
                // (e.g. Turkish, where 'I' does not lower-case to 'i')
                key = args[i].toLowerCase(Locale.ROOT);
                params.clear();
            }
            else
                params.add(args[i]);
        }
        putParam(key, params.toArray(new String[0]), r);
        return r;
    }

    /**
     * Records one option, rejecting duplicates.
     *
     * @throws IllegalArgumentException if {@code key} was already present
     */
    private static void putParam(String key, String[] args, Map<String, String[]> clArgs)
    {
        String[] prev = clArgs.put(key, args);
        if (prev != null)
            throw new IllegalArgumentException(key + " is defined multiple times. Each option/command can be specified at most once.");
    }

    /**
     * Prints the general help text; delegates to {@code SettingsMisc.printHelp()}.
     */
    public static void printHelp()
    {
        SettingsMisc.printHelp();
    }

    /**
     * Disconnects and forgets the shared Java driver client, if one exists.
     */
    public synchronized void disconnect()
    {
        if (client == null)
            return;

        client.disconnect();
        client = null;
    }
}