blob: 0604df67fb5c8d304d95d7f73d4588cb7741084b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.fql;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.wire.ValueOut;
import net.openhft.chronicle.wire.WireOut;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryEvents;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.binlog.BinLog;
import org.apache.cassandra.utils.binlog.BinLogOptions;
import org.apache.cassandra.utils.concurrent.WeightedQueue;
import org.github.jamm.MemoryLayoutSpecification;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A logger that logs entire query contents after the query finishes (or times out).
*/
public class FullQueryLogger implements QueryEvents.Listener
{
protected static final Logger logger = LoggerFactory.getLogger(FullQueryLogger.class);
public static final long CURRENT_VERSION = 0; // encode a dummy version, to prevent pain in decoding in the future
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String PROTOCOL_VERSION = "protocol-version";
public static final String QUERY_OPTIONS = "query-options";
public static final String QUERY_START_TIME = "query-start-time";
public static final String GENERATED_TIMESTAMP = "generated-timestamp";
public static final String GENERATED_NOW_IN_SECONDS = "generated-now-in-seconds";
public static final String KEYSPACE = "keyspace";
public static final String BATCH = "batch";
public static final String SINGLE_QUERY = "single-query";
public static final String QUERY = "query";
public static final String BATCH_TYPE = "batch-type";
public static final String QUERIES = "queries";
public static final String VALUES = "values";
private static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList<>(0)));
private static final int EMPTY_BYTEBUF_SIZE;
private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
private static final int OBJECT_REFERENCE_SIZE = MemoryLayoutSpecification.SPEC.getReferenceSize();
public static final FullQueryLogger instance = new FullQueryLogger();
volatile BinLog binLog;
public synchronized void enable(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries)
{
if (this.binLog != null)
throw new IllegalStateException("Binlog is already configured");
this.binLog = new BinLog.Builder().path(path)
.rollCycle(rollCycle)
.blocking(blocking)
.maxQueueWeight(maxQueueWeight)
.maxLogSize(maxLogSize)
.archiveCommand(archiveCommand)
.maxArchiveRetries(maxArchiveRetries)
.build(true);
QueryEvents.instance.registerListener(this);
}
public synchronized void enableWithoutClean(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries)
{
if (this.binLog != null)
throw new IllegalStateException("Binlog is already configured");
this.binLog = new BinLog.Builder().path(path)
.rollCycle(rollCycle)
.blocking(blocking)
.maxQueueWeight(maxQueueWeight)
.maxLogSize(maxLogSize)
.archiveCommand(archiveCommand)
.maxArchiveRetries(maxArchiveRetries)
.build(false);
QueryEvents.instance.registerListener(this);
}
static
{
ByteBuf buf = CBUtil.allocator.buffer(0, 0);
try
{
EMPTY_BYTEBUF_SIZE = Ints.checkedCast(ObjectSizes.measure(buf));
}
finally
{
buf.release();
}
}
public FullQueryLoggerOptions getFullQueryLoggerOptions()
{
if (isEnabled())
{
final FullQueryLoggerOptions options = new FullQueryLoggerOptions();
final BinLogOptions binLogOptions = binLog.getBinLogOptions();
options.archive_command = binLogOptions.archive_command;
options.roll_cycle = binLogOptions.roll_cycle;
options.block = binLogOptions.block;
options.max_archive_retries = binLogOptions.max_archive_retries;
options.max_queue_weight = binLogOptions.max_queue_weight;
options.max_log_size = binLogOptions.max_log_size;
options.log_dir = binLog.path.toString();
return options;
}
else
{
// otherwise get what database is configured with from cassandra.yaml
return DatabaseDescriptor.getFullQueryLogOptions();
}
}
public synchronized void stop()
{
try
{
BinLog binLog = this.binLog;
if (binLog != null)
{
logger.info("Stopping full query logging to {}", binLog.path);
binLog.stop();
}
else
{
logger.info("Full query log already stopped");
}
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
finally
{
QueryEvents.instance.unregisterListener(this);
this.binLog = null;
}
}
/**
* Need the path as a parameter as well because if the process is restarted the config file might be the only
* location for retrieving the path to the full query log files, but JMX also allows you to specify a path
* that isn't persisted anywhere so we have to clean that one as well.
*/
public synchronized void reset(String fullQueryLogPath)
{
try
{
Set<File> pathsToClean = Sets.newHashSet();
//First decide whether to clean the path configured in the YAML
if (fullQueryLogPath != null)
{
File fullQueryLogPathFile = new File(fullQueryLogPath);
if (fullQueryLogPathFile.exists())
{
pathsToClean.add(fullQueryLogPathFile);
}
}
//Then decide whether to clean the last used path, possibly configured by JMX
if (binLog != null && binLog.path != null)
{
File pathFile = binLog.path.toFile();
if (pathFile.exists())
{
pathsToClean.add(pathFile);
}
}
logger.info("Reset (and deactivation) of full query log requested.");
if (binLog != null)
{
logger.info("Stopping full query log. Cleaning {}.", pathsToClean);
binLog.stop();
binLog = null;
}
else
{
logger.info("Full query log already deactivated. Cleaning {}.", pathsToClean);
}
Throwable accumulate = null;
for (File f : pathsToClean)
{
accumulate = BinLog.cleanDirectory(f, accumulate);
}
if (accumulate != null)
{
throw new RuntimeException(accumulate);
}
}
catch (Exception e)
{
if (e instanceof RuntimeException)
{
throw (RuntimeException)e;
}
throw new RuntimeException(e);
}
finally
{
QueryEvents.instance.unregisterListener(this);
}
}
public boolean isEnabled()
{
return this.binLog != null;
}
/**
* Log an invocation of a batch of queries
* @param type The type of the batch
* @param statements the prepared cql statements (unused here)
* @param queries CQL text of the queries
* @param values Values to bind to as parameters for the queries
* @param queryOptions Options associated with the query invocation
* @param queryState Timestamp state associated with the query invocation
* @param batchTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked
* @param response the response from the batch query
*/
public void batchSuccess(BatchStatement.Type type,
List<? extends CQLStatement> statements,
List<String> queries,
List<List<ByteBuffer>> values,
QueryOptions queryOptions,
QueryState queryState,
long batchTimeMillis,
Message.Response response)
{
checkNotNull(type, "type was null");
checkNotNull(queries, "queries was null");
checkNotNull(values, "value was null");
checkNotNull(queryOptions, "queryOptions was null");
checkNotNull(queryState, "queryState was null");
Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must be > 0");
//Don't construct the wrapper if the log is disabled
BinLog binLog = this.binLog;
if (binLog == null)
{
return;
}
Batch wrappedBatch = new Batch(type, queries, values, queryOptions, queryState, batchTimeMillis);
binLog.logRecord(wrappedBatch);
}
/**
* Log a single CQL query
* @param query CQL query text
* @param queryOptions Options associated with the query invocation
* @param queryState Timestamp state associated with the query invocation
* @param queryTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked
* @param response the response from this query
*/
public void querySuccess(CQLStatement statement,
String query,
QueryOptions queryOptions,
QueryState queryState,
long queryTimeMillis,
Message.Response response)
{
checkNotNull(query, "query was null");
checkNotNull(queryOptions, "queryOptions was null");
checkNotNull(queryState, "queryState was null");
Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must be > 0");
//Don't construct the wrapper if the log is disabled
BinLog binLog = this.binLog;
if (binLog == null)
return;
Query wrappedQuery = new Query(query, queryOptions, queryState, queryTimeMillis);
binLog.logRecord(wrappedQuery);
}
public void executeSuccess(CQLStatement statement, String query, QueryOptions options, QueryState state, long queryTime, Message.Response response)
{
querySuccess(statement, query, options, state, queryTime, response);
}
public static class Query extends AbstractLogEntry
{
private final String query;
public Query(String query, QueryOptions queryOptions, QueryState queryState, long queryStartTime)
{
super(queryOptions, queryState, queryStartTime);
this.query = query;
}
@Override
protected String type()
{
return SINGLE_QUERY;
}
@Override
public void writeMarshallablePayload(WireOut wire)
{
super.writeMarshallablePayload(wire);
wire.write(QUERY).text(query);
}
@Override
public int weight()
{
return Ints.checkedCast(ObjectSizes.sizeOf(query)) + super.weight();
}
}
public static class Batch extends AbstractLogEntry
{
private final int weight;
private final BatchStatement.Type batchType;
private final List<String> queries;
private final List<List<ByteBuffer>> values;
public Batch(BatchStatement.Type batchType,
List<String> queries,
List<List<ByteBuffer>> values,
QueryOptions queryOptions,
QueryState queryState,
long batchTimeMillis)
{
super(queryOptions, queryState, batchTimeMillis);
this.queries = queries;
this.values = values;
this.batchType = batchType;
int weight = super.weight();
// weight, queries, values, batch type
weight += Integer.BYTES + // cached weight
2 * EMPTY_LIST_SIZE + // queries + values lists
3 * OBJECT_REFERENCE_SIZE; // batchType and two lists references
for (String query : queries)
weight += ObjectSizes.sizeOf(checkNotNull(query)) + OBJECT_REFERENCE_SIZE;
for (List<ByteBuffer> subValues : values)
{
weight += EMPTY_LIST_SIZE + OBJECT_REFERENCE_SIZE;
for (ByteBuffer value : subValues)
weight += ObjectSizes.sizeOnHeapOf(value) + OBJECT_REFERENCE_SIZE;
}
this.weight = weight;
}
@Override
protected String type()
{
return BATCH;
}
@Override
public void writeMarshallablePayload(WireOut wire)
{
super.writeMarshallablePayload(wire);
wire.write(BATCH_TYPE).text(batchType.name());
ValueOut valueOut = wire.write(QUERIES);
valueOut.int32(queries.size());
for (String query : queries)
{
valueOut.text(query);
}
valueOut = wire.write(VALUES);
valueOut.int32(values.size());
for (List<ByteBuffer> subValues : values)
{
valueOut.int32(subValues.size());
for (ByteBuffer value : subValues)
{
valueOut.bytes(BytesStore.wrap(value));
}
}
}
@Override
public int weight()
{
return weight;
}
}
private static abstract class AbstractLogEntry extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
{
private final long queryStartTime;
private final int protocolVersion;
private final ByteBuf queryOptionsBuffer;
private final long generatedTimestamp;
private final int generatedNowInSeconds;
@Nullable
private final String keyspace;
AbstractLogEntry(QueryOptions queryOptions, QueryState queryState, long queryStartTime)
{
this.queryStartTime = queryStartTime;
this.protocolVersion = queryOptions.getProtocolVersion().asInt();
int optionsSize = QueryOptions.codec.encodedSize(queryOptions, queryOptions.getProtocolVersion());
queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize);
this.generatedTimestamp = queryState.generatedTimestamp();
this.generatedNowInSeconds = queryState.generatedNowInSeconds();
this.keyspace = queryState.getClientState().getRawKeyspace();
/*
* Struggled with what tradeoff to make in terms of query options which is potentially large and complicated
* There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the
* query options into binary format.
*
* I went with the lowest risk most predictable option which is allocator overhead and CPU overhead
* rather then keep the original query message around so I could just serialize that as a memcpy. It's more
* instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use
* in terms of query volume. The CPU overhead is spread out across producers so we should at least get
* some scaling.
*
*/
try
{
QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, queryOptions.getProtocolVersion());
}
catch (Throwable e)
{
queryOptionsBuffer.release();
throw e;
}
}
@Override
protected long version()
{
return CURRENT_VERSION;
}
@Override
public void writeMarshallablePayload(WireOut wire)
{
wire.write(QUERY_START_TIME).int64(queryStartTime);
wire.write(PROTOCOL_VERSION).int32(protocolVersion);
wire.write(QUERY_OPTIONS).bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
wire.write(GENERATED_TIMESTAMP).int64(generatedTimestamp);
wire.write(GENERATED_NOW_IN_SECONDS).int32(generatedNowInSeconds);
wire.write(KEYSPACE).text(keyspace);
}
@Override
public void release()
{
queryOptionsBuffer.release();
}
@Override
public int weight()
{
return OBJECT_HEADER_SIZE
+ Long.BYTES // queryStartTime
+ Integer.BYTES // protocolVersion
+ OBJECT_REFERENCE_SIZE + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity() // queryOptionsBuffer
+ Long.BYTES // generatedTimestamp
+ Integer.BYTES // generatedNowInSeconds
+ OBJECT_REFERENCE_SIZE + Ints.checkedCast(ObjectSizes.sizeOf(keyspace)); // keyspace
}
}
}