blob: 5bcd84bec00e08c2b8d6928929fa7c0712a0e7d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
/**
 * Holds the resources a local read needs for its whole duration and releases them in {@link #close()}.
 * <p>
 * Every controller created by {@link #forCommand} holds an {@link OpOrder.Group} started on the base table's read
 * ordering. When the command reads through an index that has a backing table, it additionally holds a nested
 * controller over the index table's read ordering and a {@link WriteContext} obtained from the keyspace's write
 * handler. Controllers returned by {@link #empty()} hold nothing.
 */
public class ReadExecutionController implements AutoCloseable
{
    /** Sentinel value of {@link #createdAtNanos} meaning this read's latency is not being sampled. */
    private static final long NO_SAMPLING = Long.MIN_VALUE;

    // For all reads
    private final OpOrder.Group baseOp;
    private final TableMetadata baseMetadata; // kept to sanity check that we have taken the op order on the right table

    // For index reads
    private final ReadExecutionController indexController;
    private final WriteContext writeContext;

    private final ReadCommand command;
    // Non-final and package-private; NOTE(review): presumably so tests can substitute the clock — confirm.
    static MonotonicClock clock = preciseTime;

    private final long createdAtNanos; // Only used while sampling

    // NO_OP_REPAIRED_DATA_INFO unless repaired-status tracking was requested at construction time.
    private final RepairedDataInfo repairedDataInfo;
    // Minimum of all values passed to updateMinOldestUnrepairedTombstone; Integer.MAX_VALUE if none yet.
    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;

    ReadExecutionController(ReadCommand command,
                            OpOrder.Group baseOp,
                            TableMetadata baseMetadata,
                            ReadExecutionController indexController,
                            WriteContext writeContext,
                            long createdAtNanos,
                            boolean trackRepairedStatus)
    {
        // We can have baseOp == null, but only when empty() is called, in which case the controller will never really
        // be used (which validForReadOn should ensure). But if it's not null, we should have the proper metadata too.
        assert (baseOp == null) == (baseMetadata == null);
        this.baseOp = baseOp;
        this.baseMetadata = baseMetadata;
        this.indexController = indexController;
        this.writeContext = writeContext;
        this.command = command;
        this.createdAtNanos = createdAtNanos;

        if (trackRepairedStatus)
        {
            // Read the metadata directly rather than through the overridable metadata() accessor, since the
            // object is still under construction here.
            DataLimits.Counter repairedReadCount = command.limits().newCounter(command.nowInSec(),
                                                                              false,
                                                                              command.selectsFullPartition(),
                                                                              baseMetadata.enforceStrictLiveness()).onlyCount();
            repairedDataInfo = new RepairedDataInfo(repairedReadCount);
        }
        else
        {
            repairedDataInfo = RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO;
        }
    }

    /**
     * @return the controller over the index's backing table, or {@code null} if this is not an index read.
     */
    public ReadExecutionController indexReadController()
    {
        return indexController;
    }

    /**
     * @return the write context held for index reads, or {@code null} if this is not an index read.
     */
    public WriteContext getWriteContext()
    {
        return writeContext;
    }

    /**
     * @return the smallest value passed to {@link #updateMinOldestUnrepairedTombstone}, or
     *         {@code Integer.MAX_VALUE} if it was never called.
     */
    int oldestUnrepairedTombstone()
    {
        return oldestUnrepairedTombstone;
    }

    /**
     * Lowers the tracked oldest-unrepaired-tombstone value to {@code candidate} if it is smaller.
     */
    void updateMinOldestUnrepairedTombstone(int candidate)
    {
        oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, candidate);
    }

    /**
     * @return {@code true} if this controller holds a base op order group and was created for the table whose id
     *         matches {@code cfs}; always {@code false} for {@link #empty()} controllers.
     */
    boolean validForReadOn(ColumnFamilyStore cfs)
    {
        return baseOp != null && cfs.metadata.id.equals(baseMetadata.id);
    }

    /**
     * @return a controller holding no resources. It is not valid for reads on any table, does not sample latency,
     *         and closing it releases nothing.
     */
    public static ReadExecutionController empty()
    {
        return new ReadExecutionController(null, null, null, null, null, NO_SAMPLING, false);
    }

    /**
     * Creates an execution controller for the provided command.
     * <p>
     * Note: no code should use this method outside of {@link ReadCommand#executionController} (for
     * consistency sake) and you should use that latter method if you need an execution controller.
     *
     * @param command the command for which to create a controller.
     * @param trackRepairedStatus whether the controller should track repaired data (creating a
     *                            {@link RepairedDataInfo}) instead of using the no-op instance.
     * @return the created execution controller, which must always be closed.
     */
    @SuppressWarnings("resource") // ops closed during controller close
    static ReadExecutionController forCommand(ReadCommand command, boolean trackRepairedStatus)
    {
        ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(command.metadata());
        ColumnFamilyStore indexCfs = maybeGetIndexCfs(baseCfs, command);

        // Only pay for reading the clock when the top-local-read-query-time metric is enabled.
        long createdAtNanos = baseCfs.metric.topLocalReadQueryTime.isEnabled() ? clock.now() : NO_SAMPLING;

        if (indexCfs == null)
            return new ReadExecutionController(command, baseCfs.readOrdering.start(), baseCfs.metadata(), null, null, createdAtNanos, trackRepairedStatus);

        OpOrder.Group baseOp = null;
        WriteContext writeContext = null;
        ReadExecutionController indexController = null;
        // OpOrder.start() shouldn't fail, but better safe than sorry.
        try
        {
            baseOp = baseCfs.readOrdering.start();
            indexController = new ReadExecutionController(command, indexCfs.readOrdering.start(), indexCfs.metadata(), null, null, NO_SAMPLING, false);
            /*
             * TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try*
             * to delete stale entries, without blocking if there's no room
             * as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
             */
            writeContext = baseCfs.keyspace.getWriteHandler().createContextForRead();
            return new ReadExecutionController(command, baseOp, baseCfs.metadata(), indexController, writeContext, createdAtNanos, trackRepairedStatus);
        }
        catch (RuntimeException e)
        {
            // Release whatever was acquired before the failure. writeContext can be non-null here: the final
            // constructor call (e.g. building the repaired-data counter) may throw after it was created.
            try
            {
                try
                {
                    if (writeContext != null)
                        writeContext.close();
                }
                finally
                {
                    try
                    {
                        if (baseOp != null)
                            baseOp.close();
                    }
                    finally
                    {
                        if (indexController != null)
                            indexController.close();
                    }
                }
            }
            catch (RuntimeException cleanupFailure)
            {
                // Don't let a cleanup failure hide the original cause.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /**
     * @return the backing table of the index {@code command} uses on {@code baseCfs}, or {@code null} if the
     *         command uses no index or the index has no backing table.
     */
    private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
    {
        Index index = command.getIndex(baseCfs);
        return index == null ? null : index.getBackingTable().orElse(null);
    }

    /**
     * @return the metadata of the table this controller was created for, or {@code null} for {@link #empty()}.
     */
    public TableMetadata metadata()
    {
        return baseMetadata;
    }

    /**
     * Releases the base op order group, the nested index controller and the write context (each only if present).
     * If latency sampling was enabled when this controller was created, it then records a sample.
     */
    @Override
    public void close()
    {
        try
        {
            if (baseOp != null)
                baseOp.close();
        }
        finally
        {
            try
            {
                if (indexController != null)
                    indexController.close();
            }
            finally
            {
                // forCommand always pairs the index controller with a write context, but close each independently
                // so a directly constructed controller can neither NPE here nor leak its write context.
                if (writeContext != null)
                    writeContext.close();
            }
        }

        if (createdAtNanos != NO_SAMPLING)
            addSample();
    }

    /**
     * @return {@code true} if this controller was created with repaired-status tracking enabled.
     */
    public boolean isTrackingRepairedStatus()
    {
        return repairedDataInfo != RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO;
    }

    /**
     * @return the digest reported by this controller's {@link RepairedDataInfo}.
     */
    @VisibleForTesting
    public ByteBuffer getRepairedDataDigest()
    {
        return repairedDataInfo.getDigest();
    }

    /**
     * @return whether this controller's {@link RepairedDataInfo} reports its digest as conclusive.
     */
    @VisibleForTesting
    public boolean isRepairedDataDigestConclusive()
    {
        return repairedDataInfo.isConclusive();
    }

    /**
     * @return the repaired data info for this read; the shared no-op instance when tracking is disabled.
     */
    public RepairedDataInfo getRepairedDataInfo()
    {
        return repairedDataInfo;
    }

    /**
     * Records this read's elapsed time in microseconds, capped at {@code Integer.MAX_VALUE}, together with its CQL
     * text in the base table's {@code topLocalReadQueryTime} metric. Nothing is recorded if
     * {@link ColumnFamilyStore#getIfExists} no longer finds the table.
     */
    private void addSample()
    {
        // Read the clock first so the cost of rendering the CQL string is not included in the sample.
        long elapsedNanos = clock.now() - createdAtNanos;
        int timeMicros = (int) Math.min(TimeUnit.NANOSECONDS.toMicros(elapsedNanos), Integer.MAX_VALUE);

        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(baseMetadata.id);
        if (cfs == null)
            return; // nothing to record against; skip rendering the CQL string

        cfs.metric.topLocalReadQueryTime.addSample(command.toCQLString(), timeMicros);
    }
}