blob: 664eeee35e4edb66861da4cfe7526ca51b7a0ac9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Objects;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.pager.Pageable;
/**
 * A range command over a single column family: it carries a key range, a column predicate, an optional
 * list of index expressions (the "row filter") and a result limit, and can either be shipped to another
 * node as a {@code RANGE_SLICE} message or executed against the local {@link ColumnFamilyStore}.
 *
 * All fields are final; {@link #forSubRange} and {@link #withUpdatedLimit} return new commands rather
 * than mutating this one.
 */
public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
{
    public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();

    /** Result limit; returned by {@link #limit()} and handed to the extended filter on local execution. */
    public final int maxResults;
    /** Forwarded unchanged to the extended filter built by {@link #executeLocally()}. */
    public final boolean countCQL3Rows;
    /** Forwarded unchanged to the extended filter built by {@link #executeLocally()}. */
    public final boolean isPaging;

    /**
     * Creates a command with no row filter, with {@code countCQL3Rows} and {@code isPaging} both false.
     */
    public RangeSliceCommand(String keyspace,
                             String columnFamily,
                             long timestamp,
                             IDiskAtomFilter predicate,
                             AbstractBounds<RowPosition> range,
                             int maxResults)
    {
        this(keyspace, columnFamily, timestamp, predicate, range, null, maxResults, false, false);
    }

    /**
     * Creates a command with the given row filter, with {@code countCQL3Rows} and {@code isPaging} both false.
     */
    public RangeSliceCommand(String keyspace,
                             String columnFamily,
                             long timestamp,
                             IDiskAtomFilter predicate,
                             AbstractBounds<RowPosition> range,
                             List<IndexExpression> rowFilter,
                             int maxResults)
    {
        this(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, false, false);
    }

    /**
     * Full constructor; the other constructors delegate here.
     *
     * @param rowFilter index expressions to apply; may be {@code null} (the serializer writes that as a
     *                  count of zero)
     */
    public RangeSliceCommand(String keyspace,
                             String columnFamily,
                             long timestamp,
                             IDiskAtomFilter predicate,
                             AbstractBounds<RowPosition> range,
                             List<IndexExpression> rowFilter,
                             int maxResults,
                             boolean countCQL3Rows,
                             boolean isPaging)
    {
        super(keyspace, columnFamily, timestamp, range, predicate, rowFilter);
        this.maxResults = maxResults;
        this.countCQL3Rows = countCQL3Rows;
        this.isPaging = isPaging;
    }

    /** Wraps this command in a {@code RANGE_SLICE} message using {@link #serializer}. */
    public MessageOut<RangeSliceCommand> createMessage()
    {
        return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
    }

    /**
     * Returns a copy of this command restricted to {@code subRange}. The predicate is shallow-cloned;
     * every other field is carried over as is.
     */
    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
    {
        return new RangeSliceCommand(keyspace,
                                     columnFamily,
                                     timestamp,
                                     predicate.cloneShallow(),
                                     subRange,
                                     rowFilter,
                                     maxResults,
                                     countCQL3Rows,
                                     isPaging);
    }

    /**
     * Returns a copy of this command whose limit is {@code newLimit}. The predicate is shallow-cloned;
     * every other field is carried over as is.
     */
    public AbstractRangeCommand withUpdatedLimit(int newLimit)
    {
        return new RangeSliceCommand(keyspace,
                                     columnFamily,
                                     timestamp,
                                     predicate.cloneShallow(),
                                     keyRange,
                                     rowFilter,
                                     newLimit,
                                     countCQL3Rows,
                                     isPaging);
    }

    /** @return {@link #maxResults} */
    public int limit()
    {
        return maxResults;
    }

    /** @return {@link #countCQL3Rows} */
    public boolean countCQL3Rows()
    {
        return countCQL3Rows;
    }

    /**
     * Runs this command against the local column family store.
     *
     * @return the result of an index search when the index manager reports an index for the row filter,
     *         otherwise the result of a plain range slice
     */
    public List<Row> executeLocally()
    {
        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp);
        if (cfs.indexManager.hasIndexFor(rowFilter))
            return cfs.search(exFilter);
        else
            return cfs.getRangeSlice(exFilter);
    }

    /**
     * Renders every field of the command. Each label matches the field name it reports, and
     * {@code isPaging} is included because it influences execution just like the other fields.
     */
    @Override
    public String toString()
    {
        return Objects.toStringHelper(this)
                      .add("keyspace", keyspace)
                      .add("columnFamily", columnFamily)
                      .add("predicate", predicate)
                      .add("keyRange", keyRange)
                      .add("rowFilter", rowFilter)
                      .add("maxResults", maxResults)
                      .add("countCQL3Rows", countCQL3Rows)
                      .add("isPaging", isPaging)
                      .add("timestamp", timestamp)
                      .toString();
    }
}
class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
{
public void serialize(RangeSliceCommand sliceCommand, DataOutputPlus out, int version) throws IOException
{
out.writeUTF(sliceCommand.keyspace);
out.writeUTF(sliceCommand.columnFamily);
out.writeLong(sliceCommand.timestamp);
CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.keyspace, sliceCommand.columnFamily);
metadata.comparator.diskAtomFilterSerializer().serialize(sliceCommand.predicate, out, version);
if (sliceCommand.rowFilter == null)
{
out.writeInt(0);
}
else
{
out.writeInt(sliceCommand.rowFilter.size());
for (IndexExpression expr : sliceCommand.rowFilter)
{
expr.writeTo(out);
}
}
MessagingService.validatePartitioner(sliceCommand.keyRange);
AbstractBounds.rowPositionSerializer.serialize(sliceCommand.keyRange, out, version);
out.writeInt(sliceCommand.maxResults);
out.writeBoolean(sliceCommand.countCQL3Rows);
out.writeBoolean(sliceCommand.isPaging);
}
public RangeSliceCommand deserialize(DataInput in, int version) throws IOException
{
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
long timestamp = in.readLong();
CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
if (metadata == null)
{
String message = String.format("Got range slice command for nonexistent table %s.%s. If the table was just " +
"created, this is likely due to the schema not being fully propagated. Please wait for schema " +
"agreement on table creation." , keyspace, columnFamily);
throw new UnknownColumnFamilyException(message, null);
}
IDiskAtomFilter predicate = metadata.comparator.diskAtomFilterSerializer().deserialize(in, version);
List<IndexExpression> rowFilter;
int filterCount = in.readInt();
rowFilter = new ArrayList<>(filterCount);
for (int i = 0; i < filterCount; i++)
{
rowFilter.add(IndexExpression.readFrom(in));
}
AbstractBounds<RowPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
int maxResults = in.readInt();
boolean countCQL3Rows = in.readBoolean();
boolean isPaging = in.readBoolean();
return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
}
public long serializedSize(RangeSliceCommand rsc, int version)
{
long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
IDiskAtomFilter filter = rsc.predicate;
size += metadata.comparator.diskAtomFilterSerializer().serializedSize(filter, version);
if (rsc.rowFilter == null)
{
size += TypeSizes.NATIVE.sizeof(0);
}
else
{
size += TypeSizes.NATIVE.sizeof(rsc.rowFilter.size());
for (IndexExpression expr : rsc.rowFilter)
{
size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column);
size += TypeSizes.NATIVE.sizeof(expr.operator.ordinal());
size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value);
}
}
size += AbstractBounds.rowPositionSerializer.serializedSize(rsc.keyRange, version);
size += TypeSizes.NATIVE.sizeof(rsc.maxResults);
size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows);
size += TypeSizes.NATIVE.sizeof(rsc.isPaging);
return size;
}
}