| /* | 
 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 |  * or more contributor license agreements.  See the NOTICE file | 
 |  * distributed with this work for additional information | 
 |  * regarding copyright ownership.  The ASF licenses this file | 
 |  * to you under the Apache License, Version 2.0 (the | 
 |  * "License"); you may not use this file except in compliance | 
 |  * with the License.  You may obtain a copy of the License at | 
 |  * | 
 |  *     http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, software | 
 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 |  * See the License for the specific language governing permissions and | 
 |  * limitations under the License. | 
 |  */ | 
 | package org.apache.cassandra.db; | 
 |  | 
 | import java.io.DataInput; | 
 | import java.io.IOException; | 
 | import java.nio.ByteBuffer; | 
 |  | 
 | import com.google.common.base.Objects; | 
 | import org.slf4j.Logger; | 
 | import org.slf4j.LoggerFactory; | 
 |  | 
 | import org.apache.cassandra.config.CFMetaData; | 
 | import org.apache.cassandra.config.Schema; | 
 | import org.apache.cassandra.db.filter.IDiskAtomFilter; | 
 | import org.apache.cassandra.db.filter.QueryFilter; | 
 | import org.apache.cassandra.db.filter.SliceQueryFilter; | 
 | import org.apache.cassandra.io.IVersionedSerializer; | 
 | import org.apache.cassandra.io.util.DataOutputPlus; | 
 | import org.apache.cassandra.service.RowDataResolver; | 
 | import org.apache.cassandra.service.StorageService; | 
 | import org.apache.cassandra.utils.ByteBufferUtil; | 
 | import org.apache.cassandra.utils.Pair; | 
 |  | 
 | public class SliceFromReadCommand extends ReadCommand | 
 | { | 
 |     private static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class); | 
 |  | 
 |     static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer(); | 
 |  | 
 |     public final SliceQueryFilter filter; | 
 |  | 
 |     public SliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter) | 
 |     { | 
 |         super(keyspaceName, key, cfName, timestamp, Type.GET_SLICES); | 
 |         this.filter = filter; | 
 |     } | 
 |  | 
 |     public ReadCommand copy() | 
 |     { | 
 |         return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery()); | 
 |     } | 
 |  | 
 |     public Row getRow(Keyspace keyspace) | 
 |     { | 
 |         CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName); | 
 |         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); | 
 |  | 
 |         // If we're doing a reversed query and the filter includes static columns, we need to issue two separate | 
 |         // reads in order to guarantee that the static columns are fetched.  See CASSANDRA-8502 for more details. | 
 |         if (filter.reversed && filter.hasStaticSlice(cfm)) | 
 |         { | 
 |             logger.trace("Splitting reversed slice with static columns into two reads"); | 
 |             Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm); | 
 |  | 
 |             Row normalResults =  keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp)); | 
 |             Row staticResults =  keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp)); | 
 |  | 
 |             // add the static results to the start of the normal results | 
 |             if (normalResults.cf == null) | 
 |                 return staticResults; | 
 |  | 
 |             if (staticResults.cf != null) | 
 |                 for (Cell cell : staticResults.cf.getReverseSortedColumns()) | 
 |                     normalResults.cf.addColumn(cell); | 
 |  | 
 |             return normalResults; | 
 |         } | 
 |  | 
 |         return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp)); | 
 |     } | 
 |  | 
 |     @Override | 
 |     public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) | 
 |     { | 
 |         int maxLiveColumns = resolver.getMaxLiveCount(); | 
 |  | 
 |         int count = filter.count; | 
 |         // We generate a retry if at least one node reply with count live columns but after merge we have less | 
 |         // than the total number of column we are interested in (which may be < count on a retry). | 
 |         // So in particular, if no host returned count live columns, we know it's not a short read. | 
 |         if (maxLiveColumns < count) | 
 |             return null; | 
 |  | 
 |         int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp); | 
 |         if (liveCountInRow < getOriginalRequestedCount()) | 
 |         { | 
 |             // We asked t (= count) live columns and got l (=liveCountInRow) ones. | 
 |             // From that, we can estimate that on this row, for x requested | 
 |             // columns, only l/t end up live after reconciliation. So for next | 
 |             // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l. | 
 |             int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1; | 
 |             SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount); | 
 |             return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, newFilter, getOriginalRequestedCount()); | 
 |         } | 
 |  | 
 |         return null; | 
 |     } | 
 |  | 
 |     @Override | 
 |     public Row maybeTrim(Row row) | 
 |     { | 
 |         if ((row == null) || (row.cf == null)) | 
 |             return row; | 
 |  | 
 |         return new Row(row.key, filter.trim(row.cf, getOriginalRequestedCount(), timestamp)); | 
 |     } | 
 |  | 
 |     public IDiskAtomFilter filter() | 
 |     { | 
 |         return filter; | 
 |     } | 
 |  | 
 |     public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter) | 
 |     { | 
 |         return new SliceFromReadCommand(ksName, key, cfName, timestamp, newFilter); | 
 |     } | 
 |  | 
 |     /** | 
 |      * The original number of columns requested by the user. | 
 |      * This can be different from count when the slice command is a retry (see | 
 |      * RetriedSliceFromReadCommand) | 
 |      */ | 
 |     protected int getOriginalRequestedCount() | 
 |     { | 
 |         return filter.count; | 
 |     } | 
 |  | 
 |     @Override | 
 |     public String toString() | 
 |     { | 
 |         return Objects.toStringHelper(this) | 
 |                       .add("ksName", ksName) | 
 |                       .add("cfName", cfName) | 
 |                       .add("key", ByteBufferUtil.bytesToHex(key)) | 
 |                       .add("filter", filter) | 
 |                       .add("timestamp", timestamp) | 
 |                       .toString(); | 
 |     } | 
 | } | 
 |  | 
 | class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand> | 
 | { | 
 |     public void serialize(ReadCommand rm, DataOutputPlus out, int version) throws IOException | 
 |     { | 
 |         SliceFromReadCommand realRM = (SliceFromReadCommand)rm; | 
 |         out.writeBoolean(realRM.isDigestQuery()); | 
 |         out.writeUTF(realRM.ksName); | 
 |         ByteBufferUtil.writeWithShortLength(realRM.key, out); | 
 |         out.writeUTF(realRM.cfName); | 
 |         out.writeLong(realRM.timestamp); | 
 |         CFMetaData metadata = Schema.instance.getCFMetaData(realRM.ksName, realRM.cfName); | 
 |         metadata.comparator.sliceQueryFilterSerializer().serialize(realRM.filter, out, version); | 
 |     } | 
 |  | 
 |     public ReadCommand deserialize(DataInput in, int version) throws IOException | 
 |     { | 
 |         boolean isDigest = in.readBoolean(); | 
 |         String keyspaceName = in.readUTF(); | 
 |         ByteBuffer key = ByteBufferUtil.readWithShortLength(in); | 
 |         String cfName = in.readUTF(); | 
 |         long timestamp = in.readLong(); | 
 |         CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); | 
 |         if (metadata == null) | 
 |         { | 
 |             String message = String.format("Got slice command for nonexistent table %s.%s.  If the table was just " + | 
 |                     "created, this is likely due to the schema not being fully propagated.  Please wait for schema " + | 
 |                     "agreement on table creation.", keyspaceName, cfName); | 
 |             throw new UnknownColumnFamilyException(message, null); | 
 |         } | 
 |         SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version); | 
 |         return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest); | 
 |     } | 
 |  | 
 |     public long serializedSize(ReadCommand cmd, int version) | 
 |     { | 
 |         TypeSizes sizes = TypeSizes.NATIVE; | 
 |         SliceFromReadCommand command = (SliceFromReadCommand) cmd; | 
 |         int keySize = command.key.remaining(); | 
 |  | 
 |         CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName); | 
 |  | 
 |         int size = sizes.sizeof(cmd.isDigestQuery()); // boolean | 
 |         size += sizes.sizeof(command.ksName); | 
 |         size += sizes.sizeof((short) keySize) + keySize; | 
 |         size += sizes.sizeof(command.cfName); | 
 |         size += sizes.sizeof(cmd.timestamp); | 
 |         size += metadata.comparator.sliceQueryFilterSerializer().serializedSize(command.filter, version); | 
 |  | 
 |         return size; | 
 |     } | 
 | } |