/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;

import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.RowDataResolver;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;

public class SliceFromReadCommand extends ReadCommand
{
private static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer();
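// The slice predicate to apply to the partition: start/finish bounds, the
// reversed flag, and the maximum number of live cells to return (count).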
public final SliceQueryFilter filter;
public SliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
{
super(keyspaceName, key, cfName, timestamp, Type.GET_SLICES);
this.filter = filter;
}
public ReadCommand copy()
{
return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
}
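/**
 * Executes the slice read locally against the given keyspace. A reversed
 * filter that also covers the static columns is split into two reads; see
 * the comment below and CASSANDRA-8502.
 */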
public Row getRow(Keyspace keyspace)
{
CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
// If we're doing a reversed query and the filter includes static columns, we need to issue two separate
// reads in order to guarantee that the static columns are fetched. See CASSANDRA-8502 for more details.
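// (Likely because static cells sort before all regular cells, a reversed,
// count-limited slice starting from the end of the row could be exhausted
// before ever reaching them.)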
if (filter.reversed && filter.hasStaticSlice(cfm))
{
logger.trace("Splitting reversed slice with static columns into two reads");
Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm);
Row normalResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp));
Row staticResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp));
// add the static results to the start of the normal results
if (normalResults.cf == null)
return staticResults;
if (staticResults.cf != null)
for (Cell cell : staticResults.cf.getReverseSortedColumns())
normalResults.cf.addColumn(cell);
return normalResults;
}
return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
@Override
public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
{
int maxLiveColumns = resolver.getMaxLiveCount();
int count = filter.count;
// We generate a retry if at least one node replied with count live columns, but after the merge we have fewer
// than the total number of columns we are interested in (which may be < count on a retry).
// In particular, if no host returned count live columns, we know this is not a short read.
if (maxLiveColumns < count)
return null;
int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp);
if (liveCountInRow < getOriginalRequestedCount())
{
// We asked for t (= count) live columns and got l (= liveCountInRow) of them.
// From that, we can estimate that on this row, out of x requested columns,
// only a fraction l/t end up live after reconciliation. So for the next round
// we want to ask for x columns such that x * (l/t) == t, i.e. x = t^2/l.
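// For example (illustrative numbers only): with count = 100 and
// liveCountInRow = 40, retryCount = (100 * 100) / 40 + 1 = 251.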
int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
}
return null;
}
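/**
 * Trims the row back down to the number of cells the user originally asked
 * for; a retried command may have fetched more (see maybeGenerateRetryCommand).
 */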
@Override
public Row maybeTrim(Row row)
{
if ((row == null) || (row.cf == null))
return row;
return new Row(row.key, filter.trim(row.cf, getOriginalRequestedCount(), timestamp));
}
public IDiskAtomFilter filter()
{
return filter;
}
public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
{
return new SliceFromReadCommand(ksName, key, cfName, timestamp, newFilter);
}
/**
 * Returns the original number of columns requested by the user.
 * This can differ from filter.count when the slice command is a retry
 * (see RetriedSliceFromReadCommand).
 */
protected int getOriginalRequestedCount()
{
return filter.count;
}
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("ksName", ksName)
.add("cfName", cfName)
.add("key", ByteBufferUtil.bytesToHex(key))
.add("filter", filter)
.add("timestamp", timestamp)
.toString();
}
}

class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand>
{
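// Wire format, in order: digest flag (boolean), keyspace name (UTF),
// partition key (short length + bytes), column family name (UTF),
// timestamp (long), then the comparator-specific slice filter.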
public void serialize(ReadCommand rm, DataOutputPlus out, int version) throws IOException
{
SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
out.writeBoolean(realRM.isDigestQuery());
out.writeUTF(realRM.ksName);
ByteBufferUtil.writeWithShortLength(realRM.key, out);
out.writeUTF(realRM.cfName);
out.writeLong(realRM.timestamp);
CFMetaData metadata = Schema.instance.getCFMetaData(realRM.ksName, realRM.cfName);
metadata.comparator.sliceQueryFilterSerializer().serialize(realRM.filter, out, version);
}
public ReadCommand deserialize(DataInput in, int version) throws IOException
{
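// Fields are read back in exactly the order serialize() wrote them.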
boolean isDigest = in.readBoolean();
String keyspaceName = in.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
String cfName = in.readUTF();
long timestamp = in.readLong();
CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
if (metadata == null)
{
String message = String.format("Got slice command for nonexistent table %s.%s. If the table was just " +
"created, this is likely due to the schema not being fully propagated. Please wait for schema " +
"agreement on table creation.", keyspaceName, cfName);
throw new UnknownColumnFamilyException(message, null);
}
SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
}
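// Must mirror serialize(): account for every field's size, in the same order.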
public long serializedSize(ReadCommand cmd, int version)
{
TypeSizes sizes = TypeSizes.NATIVE;
SliceFromReadCommand command = (SliceFromReadCommand) cmd;
int keySize = command.key.remaining();
CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
size += sizes.sizeof(command.ksName);
size += sizes.sizeof((short) keySize) + keySize;
size += sizes.sizeof(command.cfName);
size += sizes.sizeof(cmd.timestamp);
size += metadata.comparator.sliceQueryFilterSerializer().serializedSize(command.filter, version);
return size;
}
}