blob: afdc0b189081b09ad3bc243c3ce832c294bcad8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.stress.operations.predefined;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.google.common.base.Function;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.ConnectionStyle;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.ThriftConversion;
import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.TException;
public abstract class CqlOperation<V> extends PredefinedOperation
{
protected abstract List<Object> getQueryParameters(byte[] key);
protected abstract String buildQuery();
protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key);
public CqlOperation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
super(type, timer, generator, seedManager, settings);
if (settings.columns.variableColumnCount)
throw new IllegalStateException("Variable column counts are not implemented for CQL");
}
protected CqlRunOp<V> run(final ClientWrapper client, final List<Object> queryParams, final ByteBuffer key) throws IOException
{
final CqlRunOp<V> op;
if (settings.mode.style == ConnectionStyle.CQL_PREPARED)
{
final Object id;
Object idobj = getCqlCache();
if (idobj == null)
{
try
{
id = client.createPreparedStatement(buildQuery());
} catch (TException e)
{
throw new RuntimeException(e);
}
storeCqlCache(id);
}
else
id = idobj;
op = buildRunOp(client, null, id, queryParams, key);
}
else
{
final String query;
Object qobj = getCqlCache();
if (qobj == null)
storeCqlCache(query = buildQuery());
else
query = qobj.toString();
op = buildRunOp(client, query, null, queryParams, key);
}
timeWithRetry(op);
return op;
}
protected void run(final ClientWrapper client) throws IOException
{
final byte[] key = getKey().array();
final List<Object> queryParams = getQueryParameters(key);
run(client, queryParams, ByteBuffer.wrap(key));
}
// Classes to process Cql results
// Always succeeds so long as the query executes without error; provides a keyCount to increment on instantiation
protected final class CqlRunOpAlwaysSucceed extends CqlRunOp<Integer>
{
final int keyCount;
protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key, int keyCount)
{
super(client, query, queryId, RowCountHandler.INSTANCE, params, key);
this.keyCount = keyCount;
}
@Override
public boolean validate(Integer result)
{
return true;
}
@Override
public int partitionCount()
{
return keyCount;
}
@Override
public int rowCount()
{
return keyCount;
}
}
// Succeeds so long as the result set is nonempty, and the query executes without error
protected final class CqlRunOpTestNonEmpty extends CqlRunOp<Integer>
{
protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
{
super(client, query, queryId, RowCountHandler.INSTANCE, params, key);
}
@Override
public boolean validate(Integer result)
{
return result > 0;
}
@Override
public int partitionCount()
{
return result;
}
@Override
public int rowCount()
{
return result;
}
}
protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]>
{
final List<List<ByteBuffer>> expect;
// a null value for an item in expect means we just check the row is present
protected CqlRunOpMatchResults(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key, List<List<ByteBuffer>> expect)
{
super(client, query, queryId, RowsHandler.INSTANCE, params, key);
this.expect = expect;
}
@Override
public int partitionCount()
{
return result == null ? 0 : result.length;
}
@Override
public int rowCount()
{
return result == null ? 0 : result.length;
}
public boolean validate(ByteBuffer[][] result)
{
if (result.length != expect.size())
return false;
for (int i = 0 ; i < result.length ; i++)
if (expect.get(i) != null && !expect.get(i).equals(Arrays.asList(result[i])))
return false;
return true;
}
}
// Cql
protected abstract class CqlRunOp<V> implements RunOp
{
final ClientWrapper client;
final String query;
final Object queryId;
final List<Object> params;
final ByteBuffer key;
final ResultHandler<V> handler;
V result;
private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<Object> params, ByteBuffer key)
{
this.client = client;
this.query = query;
this.queryId = queryId;
this.handler = handler;
this.params = params;
this.key = key;
}
@Override
public boolean run() throws Exception
{
return queryId != null
? validate(result = client.execute(queryId, key, params, handler))
: validate(result = client.execute(query, key, params, handler));
}
public abstract boolean validate(V result);
}
/// LOTS OF WRAPPING/UNWRAPPING NONSENSE
@Override
public void run(final ThriftClient client) throws IOException
{
run(wrap(client));
}
@Override
public void run(SimpleClient client) throws IOException
{
run(wrap(client));
}
@Override
public void run(JavaDriverClient client) throws IOException
{
run(wrap(client));
}
public ClientWrapper wrap(ThriftClient client)
{
return new Cql3CassandraClientWrapper(client);
}
public ClientWrapper wrap(JavaDriverClient client)
{
return new JavaDriverWrapper(client);
}
public ClientWrapper wrap(SimpleClient client)
{
return new SimpleClientWrapper(client);
}
protected interface ClientWrapper
{
Object createPreparedStatement(String cqlQuery) throws TException;
<V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
<V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
}
private final class JavaDriverWrapper implements ClientWrapper
{
final JavaDriverClient client;
private JavaDriverWrapper(JavaDriverClient client)
{
this.client = client;
}
@Override
public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
String formattedQuery = formatCqlQuery(query, queryParams);
return handler.javaDriverHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
}
@Override
public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
return handler.javaDriverHandler().apply(
client.executePrepared(
(PreparedStatement) preparedStatementId,
queryParams,
ThriftConversion.fromThrift(settings.command.consistencyLevel)));
}
@Override
public Object createPreparedStatement(String cqlQuery)
{
return client.prepare(cqlQuery);
}
}
private final class SimpleClientWrapper implements ClientWrapper
{
final SimpleClient client;
private SimpleClientWrapper(SimpleClient client)
{
this.client = client;
}
@Override
public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
String formattedQuery = formatCqlQuery(query, queryParams);
return handler.thriftHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
}
@Override
public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
return handler.thriftHandler().apply(
client.executePrepared(
(byte[]) preparedStatementId,
toByteBufferParams(queryParams),
ThriftConversion.fromThrift(settings.command.consistencyLevel)));
}
@Override
public Object createPreparedStatement(String cqlQuery)
{
return client.prepare(cqlQuery).statementId.bytes;
}
}
// client wrapper for Cql3
private final class Cql3CassandraClientWrapper implements ClientWrapper
{
final ThriftClient client;
private Cql3CassandraClientWrapper(ThriftClient client)
{
this.client = client;
}
@Override
public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
{
String formattedQuery = formatCqlQuery(query, queryParams);
return handler.simpleNativeHandler().apply(
client.execute_cql3_query(formattedQuery, key, Compression.NONE, settings.command.consistencyLevel)
);
}
@Override
public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
{
Integer id = (Integer) preparedStatementId;
return handler.simpleNativeHandler().apply(
client.execute_prepared_cql3_query(id, key, toByteBufferParams(queryParams), settings.command.consistencyLevel)
);
}
@Override
public Object createPreparedStatement(String cqlQuery) throws TException
{
return client.prepare_cql3_query(cqlQuery, Compression.NONE);
}
}
// interface for building functions to standardise results from each client
protected static interface ResultHandler<V>
{
Function<ResultSet, V> javaDriverHandler();
Function<ResultMessage, V> thriftHandler();
Function<CqlResult, V> simpleNativeHandler();
}
protected static class RowCountHandler implements ResultHandler<Integer>
{
static final RowCountHandler INSTANCE = new RowCountHandler();
@Override
public Function<ResultSet, Integer> javaDriverHandler()
{
return new Function<ResultSet, Integer>()
{
@Override
public Integer apply(ResultSet rows)
{
if (rows == null)
return 0;
return rows.all().size();
}
};
}
@Override
public Function<ResultMessage, Integer> thriftHandler()
{
return new Function<ResultMessage, Integer>()
{
@Override
public Integer apply(ResultMessage result)
{
return result instanceof ResultMessage.Rows ? ((ResultMessage.Rows) result).result.size() : 0;
}
};
}
@Override
public Function<CqlResult, Integer> simpleNativeHandler()
{
return new Function<CqlResult, Integer>()
{
@Override
public Integer apply(CqlResult result)
{
switch (result.getType())
{
case ROWS:
return result.getRows().size();
default:
return 1;
}
}
};
}
}
// Processes results from each client into an array of all key bytes returned
protected static final class RowsHandler implements ResultHandler<ByteBuffer[][]>
{
static final RowsHandler INSTANCE = new RowsHandler();
@Override
public Function<ResultSet, ByteBuffer[][]> javaDriverHandler()
{
return new Function<ResultSet, ByteBuffer[][]>()
{
@Override
public ByteBuffer[][] apply(ResultSet result)
{
if (result == null)
return new ByteBuffer[0][];
List<Row> rows = result.all();
ByteBuffer[][] r = new ByteBuffer[rows.size()][];
for (int i = 0 ; i < r.length ; i++)
{
Row row = rows.get(i);
r[i] = new ByteBuffer[row.getColumnDefinitions().size()];
for (int j = 0 ; j < row.getColumnDefinitions().size() ; j++)
r[i][j] = row.getBytes(j);
}
return r;
}
};
}
@Override
public Function<ResultMessage, ByteBuffer[][]> thriftHandler()
{
return new Function<ResultMessage, ByteBuffer[][]>()
{
@Override
public ByteBuffer[][] apply(ResultMessage result)
{
if (!(result instanceof ResultMessage.Rows))
return new ByteBuffer[0][];
ResultMessage.Rows rows = ((ResultMessage.Rows) result);
ByteBuffer[][] r = new ByteBuffer[rows.result.size()][];
for (int i = 0 ; i < r.length ; i++)
{
List<ByteBuffer> row = rows.result.rows.get(i);
r[i] = new ByteBuffer[row.size()];
for (int j = 0 ; j < row.size() ; j++)
r[i][j] = row.get(j);
}
return r;
}
};
}
@Override
public Function<CqlResult, ByteBuffer[][]> simpleNativeHandler()
{
return new Function<CqlResult, ByteBuffer[][]>()
{
@Override
public ByteBuffer[][] apply(CqlResult result)
{
ByteBuffer[][] r = new ByteBuffer[result.getRows().size()][];
for (int i = 0 ; i < r.length ; i++)
{
CqlRow row = result.getRows().get(i);
r[i] = new ByteBuffer[row.getColumns().size()];
for (int j = 0 ; j < r[i].length ; j++)
r[i][j] = ByteBuffer.wrap(row.getColumns().get(j).getValue());
}
return r;
}
};
}
}
// Processes results from each client into an array of all key bytes returned
protected static final class KeysHandler implements ResultHandler<byte[][]>
{
static final KeysHandler INSTANCE = new KeysHandler();
@Override
public Function<ResultSet, byte[][]> javaDriverHandler()
{
return new Function<ResultSet, byte[][]>()
{
@Override
public byte[][] apply(ResultSet result)
{
if (result == null)
return new byte[0][];
List<Row> rows = result.all();
byte[][] r = new byte[rows.size()][];
for (int i = 0 ; i < r.length ; i++)
r[i] = rows.get(i).getBytes(0).array();
return r;
}
};
}
@Override
public Function<ResultMessage, byte[][]> thriftHandler()
{
return new Function<ResultMessage, byte[][]>()
{
@Override
public byte[][] apply(ResultMessage result)
{
if (result instanceof ResultMessage.Rows)
{
ResultMessage.Rows rows = ((ResultMessage.Rows) result);
byte[][] r = new byte[rows.result.size()][];
for (int i = 0 ; i < r.length ; i++)
r[i] = rows.result.rows.get(i).get(0).array();
return r;
}
return null;
}
};
}
@Override
public Function<CqlResult, byte[][]> simpleNativeHandler()
{
return new Function<CqlResult, byte[][]>()
{
@Override
public byte[][] apply(CqlResult result)
{
byte[][] r = new byte[result.getRows().size()][];
for (int i = 0 ; i < r.length ; i++)
r[i] = result.getRows().get(i).getKey();
return r;
}
};
}
}
private static String getUnQuotedCqlBlob(ByteBuffer term)
{
return "0x" + ByteBufferUtil.bytesToHex(term);
}
/**
* Constructs a CQL query string by replacing instances of the character
* '?', with the corresponding parameter.
*
* @param query base query string to format
* @param parms sequence of string query parameters
* @return formatted CQL query string
*/
private static String formatCqlQuery(String query, List<Object> parms)
{
int marker, position = 0;
StringBuilder result = new StringBuilder();
if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
return query;
for (Object parm : parms)
{
result.append(query.substring(position, marker));
if (parm instanceof ByteBuffer)
result.append(getUnQuotedCqlBlob((ByteBuffer) parm));
else if (parm instanceof Long)
result.append(parm);
else throw new AssertionError();
position = marker + 1;
if (-1 == (marker = query.indexOf('?', position + 1)))
break;
}
if (position < query.length())
result.append(query.substring(position));
return result.toString();
}
private static List<ByteBuffer> toByteBufferParams(List<Object> params)
{
List<ByteBuffer> r = new ArrayList<>();
for (Object param : params)
{
if (param instanceof ByteBuffer)
r.add((ByteBuffer) param);
else if (param instanceof Long)
r.add(ByteBufferUtil.bytes((Long) param));
else throw new AssertionError();
}
return r;
}
protected String wrapInQuotes(String string)
{
return "\"" + string + "\"";
}
}