blob: 799484fc92e0c10d49580ac7a81304baf638a1f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.fql;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.cassandra.io.util.File;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import io.netty.buffer.Unpooled;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireOut;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.fql.FullQueryLogger.Query;
import org.apache.cassandra.fql.FullQueryLogger.Batch;
import org.apache.cassandra.cql3.statements.BatchStatement.Type;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.binlog.BinLogTest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.apache.cassandra.fql.FullQueryLogger.BATCH;
import static org.apache.cassandra.fql.FullQueryLogger.BATCH_TYPE;
import static org.apache.cassandra.fql.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
import static org.apache.cassandra.fql.FullQueryLogger.GENERATED_TIMESTAMP;
import static org.apache.cassandra.fql.FullQueryLogger.PROTOCOL_VERSION;
import static org.apache.cassandra.fql.FullQueryLogger.QUERIES;
import static org.apache.cassandra.fql.FullQueryLogger.QUERY;
import static org.apache.cassandra.fql.FullQueryLogger.QUERY_OPTIONS;
import static org.apache.cassandra.fql.FullQueryLogger.QUERY_START_TIME;
import static org.apache.cassandra.fql.FullQueryLogger.SINGLE_QUERY;
import static org.apache.cassandra.fql.FullQueryLogger.TYPE;
import static org.apache.cassandra.fql.FullQueryLogger.VALUES;
import static org.apache.cassandra.fql.FullQueryLogger.VERSION;
import static org.junit.Assert.fail;
public class FullQueryLoggerTest extends CQLTester
{
private static Path tempDir;
@BeforeClass
public static void beforeClass() throws Exception
{
tempDir = BinLogTest.tempDir();
}
@After
public void tearDown()
{
FullQueryLogger.instance.stop();
FullQueryLogger.instance.reset(tempDir.toString());
}
@Test(expected = NullPointerException.class)
public void testConfigureNullPath() throws Exception
{
FullQueryLogger.instance.enable(null, "", true, 1, 1, StringUtils.EMPTY, 10);
}
@Test(expected = NullPointerException.class)
public void testConfigureNullRollCycle() throws Exception
{
FullQueryLogger.instance.enable(BinLogTest.tempDir(), null, true, 1, 1, StringUtils.EMPTY, 10);
}
@Test(expected = IllegalArgumentException.class)
public void testConfigureInvalidRollCycle() throws Exception
{
FullQueryLogger.instance.enable(BinLogTest.tempDir(), "foobar", true, 1, 1, StringUtils.EMPTY, 10);
}
@Test(expected = IllegalArgumentException.class)
public void testConfigureInvalidMaxQueueWeight() throws Exception
{
FullQueryLogger.instance.enable(BinLogTest.tempDir(), "DAILY", true, 0, 1, StringUtils.EMPTY, 10);
}
@Test(expected = IllegalArgumentException.class)
public void testConfigureInvalidMaxQueueLogSize() throws Exception
{
FullQueryLogger.instance.enable(BinLogTest.tempDir(), "DAILY", true, 1, 0, StringUtils.EMPTY, 10);
}
@Test(expected = IllegalArgumentException.class)
public void testConfigureOverExistingFile()
{
File f = FileUtils.createTempFile("foo", "bar");
f.deleteOnExit();
FullQueryLogger.instance.enable(f.toPath(), "TEST_SECONDLY", true, 1, 1, StringUtils.EMPTY, 10);
}
@Test(expected = IllegalArgumentException.class)
public void testCanRead() throws Exception
{
new File(tempDir).trySetReadable(false);
try
{
configureFQL();
}
finally
{
new File(tempDir).trySetReadable(true);
}
}
@Test(expected = IllegalArgumentException.class)
public void testCanWrite() throws Exception
{
new File(tempDir).trySetWritable(false);
try
{
configureFQL();
}
finally
{
new File(tempDir).trySetWritable(true);
}
}
@Test(expected = IllegalArgumentException.class)
public void testCanExecute() throws Exception
{
new File(tempDir).trySetExecutable(false);
try
{
configureFQL();
}
finally
{
new File(tempDir).trySetExecutable(true);
}
}
@Test
public void testResetWithoutConfigure() throws Exception
{
FullQueryLogger.instance.reset(tempDir.toString());
FullQueryLogger.instance.reset(tempDir.toString());
}
@Test
public void stopWithoutConfigure() throws Exception
{
FullQueryLogger.instance.stop();
FullQueryLogger.instance.stop();
}
/**
* Both the last used and supplied directory should get cleaned
*/
@Test
public void testResetCleansPaths() throws Exception
{
configureFQL();
File tempA = FileUtils.createTempFile("foo", "bar", new File(tempDir));
assertTrue(tempA.exists());
File tempB = FileUtils.createTempFile("foo", "bar", new File(BinLogTest.tempDir()));
FullQueryLogger.instance.reset(tempB.parentPath());
assertFalse(tempA.exists());
assertFalse(tempB.exists());
}
/**
* The last used and configured directory are the same and it shouldn't be an issue
*/
@Test
public void testResetSamePath() throws Exception
{
configureFQL();
File tempA = FileUtils.createTempFile("foo", "bar", new File(tempDir));
assertTrue(tempA.exists());
FullQueryLogger.instance.reset(tempA.parentPath());
assertFalse(tempA.exists());
}
@Test(expected = IllegalStateException.class)
public void testDoubleConfigure() throws Exception
{
configureFQL();
configureFQL();
}
@Test
public void testCleansDirectory() throws Exception
{
assertTrue(new File(tempDir, "foobar").createFileIfNotExists());
configureFQL();
assertEquals(new File(tempDir).tryList().length, 1);
assertEquals("metadata.cq4t", new File(tempDir).tryList()[0].name());
}
@Test
public void testEnabledReset() throws Exception
{
assertFalse(FullQueryLogger.instance.isEnabled());
configureFQL();
assertTrue(FullQueryLogger.instance.isEnabled());
FullQueryLogger.instance.reset(tempDir.toString());
assertFalse(FullQueryLogger.instance.isEnabled());
}
@Test
public void testEnabledStop() throws Exception
{
assertFalse(FullQueryLogger.instance.isEnabled());
configureFQL();
assertTrue(FullQueryLogger.instance.isEnabled());
FullQueryLogger.instance.stop();
assertFalse(FullQueryLogger.instance.isEnabled());
}
/**
* Test that a thread will block if the FQL is over weight, and unblock once the backup is cleared
*/
@Test
public void testBlocking() throws Exception
{
configureFQL();
//Prevent the bin log thread from making progress, causing the task queue to block
Semaphore blockBinLog = new Semaphore(0);
try
{
//Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
Semaphore binLogBlocked = new Semaphore(0);
FullQueryLogger.instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, queryState(), 1)
{
public void writeMarshallablePayload(WireOut wire)
{
//Notify that the bin log is blocking now
binLogBlocked.release();
try
{
//Block the bin log thread so the task queue can be filled
blockBinLog.acquire();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
super.writeMarshallablePayload(wire);
}
public void release()
{
super.release();
}
});
//Wait for the bin log thread to block so it can't batch drain tasks
Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
//Now fill the task queue
logQuery("foo2");
//Start a thread to block waiting on the bin log queue
Thread t = new Thread(() ->
{
logQuery("foo3");
//Should be able to log another query without an issue
logQuery("foo4");
});
t.start();
Thread.sleep(500);
//If thread state is terminated then the thread started, finished, and didn't block on the full task queue
assertTrue(t.getState() != Thread.State.TERMINATED);
}
finally
{
//Unblock the binlog thread
blockBinLog.release();
}
Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60);
}
private boolean checkForQueries(List<String> queries)
{
try (ChronicleQueue queue = SingleChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
{
ExcerptTailer tailer = queue.createTailer();
List<String> expectedQueries = new LinkedList<>(queries);
while (!expectedQueries.isEmpty())
{
if (!tailer.readDocument(wire -> {
assertEquals(expectedQueries.get(0), wire.read("query").text());
expectedQueries.remove(0);
}))
{
return false;
}
}
assertFalse(tailer.readDocument(wire -> {}));
return true;
}
}
@Test
public void testNonBlocking() throws Exception
{
FullQueryLogger.instance.enable(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256, StringUtils.EMPTY, 10);
//Prevent the bin log thread from making progress, causing the task queue to refuse tasks
Semaphore blockBinLog = new Semaphore(0);
try
{
//Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
Semaphore binLogBlocked = new Semaphore(0);
FullQueryLogger.instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, queryState(), 1)
{
public void writeMarshallablePayload(WireOut wire)
{
//Notify that the bin log is blocking now
binLogBlocked.release();
try
{
//Block the bin log thread so the task queue can be filled
blockBinLog.acquire();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
super.writeMarshallablePayload(wire);
}
public void release()
{
super.release();
}
});
//Wait for the bin log thread to block so it can't batch drain tasks
Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
//Now fill the task queue
logQuery("foo2");
//This sample should get dropped AKA released without being written
AtomicInteger releasedCount = new AtomicInteger(0);
AtomicInteger writtenCount = new AtomicInteger(0);
FullQueryLogger.instance.binLog.logRecord(new Query("foo3", QueryOptions.DEFAULT, queryState(), 1) {
public void writeMarshallablePayload(WireOut wire)
{
writtenCount.incrementAndGet();
super.writeMarshallablePayload(wire);
}
public void release()
{
releasedCount.incrementAndGet();
super.release();
}
});
Util.spinAssertEquals(1, releasedCount::get, 60);
assertEquals(0, writtenCount.get());
}
finally
{
blockBinLog.release();
}
//Wait for tasks to drain so there should be space in the queue
Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2")), 60);
//Should be able to log again
logQuery("foo4");
Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60);
}
@Test
public void testRoundTripQuery() throws Exception
{
configureFQL();
logQuery("foo");
Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60);
assertRoundTripQuery(null);
}
@Test
public void testRoundTripQueryWithKeyspace() throws Exception
{
configureFQL();
logQuery("foo", "abcdefg");
Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60);
assertRoundTripQuery("abcdefg");
}
private void assertRoundTripQuery(@Nullable String keyspace)
{
try (ChronicleQueue queue = SingleChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
{
ExcerptTailer tailer = queue.createTailer();
assertTrue(tailer.readDocument(wire ->
{
assertEquals(0, wire.read(VERSION).int16());
assertEquals(SINGLE_QUERY, wire.read(TYPE).text());
assertEquals(1L, wire.read(QUERY_START_TIME).int64());
ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read(PROTOCOL_VERSION).int32(), true);
assertEquals(ProtocolVersion.CURRENT, protocolVersion);
QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read(QUERY_OPTIONS).bytes()), protocolVersion);
compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
String wireKeyspace = wire.read(FullQueryLogger.KEYSPACE).text();
assertEquals(keyspace, wireKeyspace);
assertEquals("foo", wire.read(QUERY).text());
}));
}
}
@Test
public void testRoundTripBatchWithKeyspace() throws Exception
{
configureFQL();
logBatch(Type.UNLOGGED,
Arrays.asList("foo1", "foo2"),
Arrays.asList(Arrays.asList(ByteBuffer.allocate(1),
ByteBuffer.allocateDirect(2)),
Collections.emptyList()),
QueryOptions.DEFAULT,
queryState("abcdefgh"),
1);
Util.spinAssertEquals(true, () ->
{
try (ChronicleQueue queue = SingleChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
{
return queue.createTailer().readingDocument().isPresent();
}
}, 60);
assertRoundTripBatch("abcdefgh");
}
@Test
public void testRoundTripBatchWithKeyspaceNull() throws Exception
{
configureFQL();
logBatch(Type.UNLOGGED,
Arrays.asList("foo1", "foo2"),
Arrays.asList(Arrays.asList(ByteBuffer.allocate(1),
ByteBuffer.allocateDirect(2)),
Collections.emptyList()),
QueryOptions.DEFAULT,
queryState(),
1);
Util.spinAssertEquals(true, () ->
{
try (ChronicleQueue queue = SingleChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
{
return queue.createTailer().readingDocument().isPresent();
}
}, 60);
assertRoundTripBatch(null);
}
private void assertRoundTripBatch(@Nullable String keyspace)
{
try (ChronicleQueue queue = SingleChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
{
ExcerptTailer tailer = queue.createTailer();
assertTrue(tailer.readDocument(wire -> {
assertEquals(0, wire.read(VERSION).int16());
assertEquals(BATCH, wire.read(TYPE).text());
assertEquals(1L, wire.read(QUERY_START_TIME).int64());
ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read(PROTOCOL_VERSION).int32(), true);
assertEquals(ProtocolVersion.CURRENT, protocolVersion);
QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read(QUERY_OPTIONS).bytes()), protocolVersion);
compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
assertEquals(Long.MIN_VALUE, wire.read(GENERATED_TIMESTAMP).int64());
assertEquals(Integer.MIN_VALUE, wire.read(GENERATED_NOW_IN_SECONDS).int32());
assertEquals(keyspace, wire.read(FullQueryLogger.KEYSPACE).text());
assertEquals("UNLOGGED", wire.read(BATCH_TYPE).text());
ValueIn in = wire.read(QUERIES);
assertEquals(2, in.int32());
assertEquals("foo1", in.text());
assertEquals("foo2", in.text());
in = wire.read(VALUES);
assertEquals(2, in.int32());
assertEquals(2, in.int32());
assertTrue(Arrays.equals(new byte[1], in.bytes()));
assertTrue(Arrays.equals(new byte[2], in.bytes()));
assertEquals(0, in.int32());
}));
}
}
@Test
public void testQueryWeight()
{
//Empty query should have some weight
Query query = new Query("", QueryOptions.DEFAULT, queryState(), 1);
assertTrue(query.weight() >= 95);
StringBuilder sb = new StringBuilder();
for (int ii = 0; ii < 1024 * 1024; ii++)
{
sb.append('a');
}
query = new Query(sb.toString(), QueryOptions.DEFAULT, queryState(), 1);
//A large query should be reflected in the size, * 2 since characters are still two bytes
assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString()));
//Large query options should be reflected
QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
query = new Query("", largeOptions, queryState(), 1);
assertTrue(query.weight() > 1024 * 1024);
System.out.printf("weight %d%n", query.weight());
}
@Test
public void testBatchWeight()
{
//An empty batch should have weight
Batch batch = new Batch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
assertTrue(batch.weight() > 0);
// make sure that a batch with keyspace set has a higher weight
Batch batch2 = new Batch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, queryState("ABABA"), 1);
assertTrue(batch.weight() < batch2.weight());
StringBuilder sb = new StringBuilder();
for (int ii = 0; ii < 1024 * 1024; ii++)
{
sb.append('a');
}
//The weight of the list containing queries should be reflected
List<String> bigList = new ArrayList(100000);
for (int ii = 0; ii < 100000; ii++)
{
bigList.add("");
}
batch = new Batch(Type.UNLOGGED, bigList, new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
//The size of the query should be reflected
bigList = new ArrayList(1);
bigList.add(sb.toString());
batch = new Batch(Type.UNLOGGED, bigList, new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
bigList = null;
//The size of the list of values should be reflected
List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
for (int ii = 0; ii < 100000; ii++)
{
bigValues.add(new ArrayList<>(0));
}
bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
batch = new Batch(Type.UNLOGGED, new ArrayList<>(), bigValues, QueryOptions.DEFAULT, queryState(), 1);
assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
//As should the size of the values
QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
batch = new Batch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), largeOptions, queryState(), 1);
assertTrue(batch.weight() > 1024 * 1024);
}
@Test(expected = NullPointerException.class)
public void testLogBatchNullType() throws Exception
{
logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
}
@Test(expected = NullPointerException.class)
public void testLogBatchNullQueries() throws Exception
{
logBatch(Type.UNLOGGED, null, new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
}
@Test(expected = NullPointerException.class)
public void testLogBatchNullQueriesQuery() throws Exception
{
configureFQL();
logBatch(Type.UNLOGGED, Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
}
@Test(expected = NullPointerException.class)
public void testLogBatchNullValues() throws Exception
{
logBatch(Type.UNLOGGED, new ArrayList<>(), null, QueryOptions.DEFAULT, queryState(), 1);
}
@Test(expected = NullPointerException.class)
public void testLogBatchNullValuesValue() throws Exception
{
logBatch(Type.UNLOGGED, new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, queryState(), 1);
}
@Test(expected = NullPointerException.class)
public void testLogBatchNullQueryOptions() throws Exception
{
logBatch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), null, queryState(), 1);
}
@Test(expected = IllegalArgumentException.class)
public void testLogBatchNegativeTime() throws Exception
{
logBatch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), -1);
}
@Test(expected = NullPointerException.class)
public void testLogQueryNullQuery() throws Exception
{
logQuery(null, QueryOptions.DEFAULT, queryState(), 1);
}
@Test(expected = NullPointerException.class)
public void testLogQueryNullQueryOptions() throws Exception
{
logQuery("", null, queryState(), 1);
}
@Test(expected = IllegalArgumentException.class)
public void testLogQueryNegativeTime() throws Exception
{
logQuery("", QueryOptions.DEFAULT, queryState(), -1);
}
@Test
public void testJMXArchiveCommand()
{
FullQueryLoggerOptions options = new FullQueryLoggerOptions();
try
{
DatabaseDescriptor.getFullQueryLogOptions().allow_nodetool_archive_command = false;
StorageService.instance.enableFullQueryLogger(options.log_dir, options.roll_cycle, false, 1000, 1000, "/xyz/not/null", 0);
fail("not allowed");
}
catch (ConfigurationException e)
{
assertTrue(e.getMessage().contains("Can't enable full query log archiving via nodetool"));
}
options.allow_nodetool_archive_command = true;
options.archive_command = "/xyz/not/null";
options.log_dir = "/tmp/abc";
DatabaseDescriptor.setFullQueryLogOptions(options);
StorageService.instance.enableFullQueryLogger(options.log_dir, options.roll_cycle, false, 1000, 1000, null, 0);
assertTrue(FullQueryLogger.instance.isEnabled());
assertEquals("/xyz/not/null", FullQueryLogger.instance.getFullQueryLoggerOptions().archive_command);
}
private static void compareQueryOptions(QueryOptions a, QueryOptions b)
{
assertEquals(a.getClass(), b.getClass());
assertEquals(a.getProtocolVersion(), b.getProtocolVersion());
assertEquals(a.getPageSize(), b.getPageSize());
assertEquals(a.getConsistency(), b.getConsistency());
assertEquals(a.getPagingState(), b.getPagingState());
assertEquals(a.getValues(), b.getValues());
assertEquals(a.getSerialConsistency(), b.getSerialConsistency());
}
private void configureFQL() throws Exception
{
FullQueryLogger.instance.enable(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256, StringUtils.EMPTY, 10);
}
private void logQuery(String query)
{
FullQueryLogger.instance.querySuccess(null, query, QueryOptions.DEFAULT, queryState(), 1, null);
}
private void logQuery(String query, String keyspace)
{
logQuery(query, keyspace, 1);
}
private void logQuery(String query, String keyspace, long time)
{
FullQueryLogger.instance.querySuccess(null, query, QueryOptions.DEFAULT, queryState(keyspace), time, null);
}
private void logQuery(String query, QueryOptions options, QueryState state, long time)
{
FullQueryLogger.instance.querySuccess(null, query, options, state, time, null);
}
private void logBatch(BatchStatement.Type type,
List<String> queries,
List<List<ByteBuffer>> values,
QueryOptions options,
QueryState queryState,
long time)
{
FullQueryLogger.instance.batchSuccess(type,
Collections.emptyList(),
queries,
values,
options,
queryState,
time,
null);
}
private QueryState queryState(String keyspace)
{
ClientState clientState = ClientState.forInternalCalls(keyspace);
return new QueryState(clientState);
}
private QueryState queryState()
{
return new QueryState(ClientState.forInternalCalls());
}
}