| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.cassandra.db; |
| |
| import static org.junit.Assert.*; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.config.Schema; |
| import org.apache.cassandra.cql3.ColumnIdentifier; |
| import org.apache.cassandra.cql3.QueryProcessor; |
| import org.apache.cassandra.cql3.UntypedResultSet; |
| import org.apache.cassandra.db.filter.AbstractClusteringIndexFilter; |
| import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; |
| import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; |
| import org.apache.cassandra.db.filter.ColumnFilter; |
| import org.apache.cassandra.db.filter.DataLimits; |
| import org.apache.cassandra.db.filter.RowFilter; |
| import org.apache.cassandra.db.marshal.Int32Type; |
| import org.apache.cassandra.db.marshal.IntegerType; |
| import org.apache.cassandra.db.marshal.UTF8Type; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; |
| import org.apache.cassandra.db.rows.Cell; |
| import org.apache.cassandra.db.rows.RangeTombstoneMarker; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.rows.Unfiltered; |
| import org.apache.cassandra.db.rows.UnfilteredRowIterator; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.io.util.DataInputBuffer; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputBuffer; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.schema.KeyspaceParams; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.btree.BTreeSet; |
| |
| public class SinglePartitionSliceCommandTest |
| { |
| private static final Logger logger = LoggerFactory.getLogger(SinglePartitionSliceCommandTest.class); |
| |
| private static final String KEYSPACE = "ks"; |
| private static final String TABLE = "tbl"; |
| |
| private static CFMetaData cfm; |
| private static ColumnDefinition v; |
| private static ColumnDefinition s; |
| |
| private static final String TABLE_SCLICES = "tbl_slices"; |
| private static CFMetaData CFM_SLICES; |
| |
| @BeforeClass |
| public static void defineSchema() throws ConfigurationException |
| { |
| DatabaseDescriptor.daemonInitialization(); |
| |
| cfm = CFMetaData.Builder.create(KEYSPACE, TABLE) |
| .addPartitionKey("k", UTF8Type.instance) |
| .addStaticColumn("s", UTF8Type.instance) |
| .addClusteringColumn("i", IntegerType.instance) |
| .addRegularColumn("v", UTF8Type.instance) |
| .build(); |
| |
| CFM_SLICES = CFMetaData.Builder.create(KEYSPACE, TABLE_SCLICES) |
| .addPartitionKey("k", UTF8Type.instance) |
| .addClusteringColumn("c1", Int32Type.instance) |
| .addClusteringColumn("c2", Int32Type.instance) |
| .addRegularColumn("v", IntegerType.instance) |
| .build(); |
| |
| SchemaLoader.prepareServer(); |
| SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm, CFM_SLICES); |
| |
| cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE); |
| v = cfm.getColumnDefinition(new ColumnIdentifier("v", true)); |
| s = cfm.getColumnDefinition(new ColumnIdentifier("s", true)); |
| |
| CFM_SLICES = Schema.instance.getCFMetaData(KEYSPACE, TABLE_SCLICES); |
| } |
| |
| @Before |
| public void truncate() |
| { |
| Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).truncateBlocking(); |
| Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).truncateBlocking(); |
| } |
| |
| @Test |
| public void staticColumnsAreFiltered() throws IOException |
| { |
| DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k")); |
| |
| UntypedResultSet rows; |
| |
| QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s, i, v) VALUES ('k', 's', 0, 'v')"); |
| QueryProcessor.executeInternal("DELETE v FROM ks.tbl WHERE k='k' AND i=0"); |
| QueryProcessor.executeInternal("DELETE FROM ks.tbl WHERE k='k' AND i=0"); |
| rows = QueryProcessor.executeInternal("SELECT * FROM ks.tbl WHERE k='k' AND i=0"); |
| |
| for (UntypedResultSet.Row row: rows) |
| { |
| logger.debug("Current: k={}, s={}, v={}", (row.has("k") ? row.getString("k") : null), (row.has("s") ? row.getString("s") : null), (row.has("v") ? row.getString("v") : null)); |
| } |
| |
| assert rows.isEmpty(); |
| |
| ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v)); |
| ByteBuffer zero = ByteBufferUtil.bytes(0); |
| Slices slices = Slices.with(cfm.comparator, Slice.make(ClusteringBound.inclusiveStartOf(zero), ClusteringBound.inclusiveEndOf(zero))); |
| ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false); |
| ReadCommand cmd = SinglePartitionReadCommand.create(true, |
| cfm, |
| FBUtilities.nowInSeconds(), |
| columnFilter, |
| RowFilter.NONE, |
| DataLimits.NONE, |
| key, |
| sliceFilter); |
| |
| DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21)); |
| ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21); |
| DataInputPlus in = new DataInputBuffer(out.buffer(), true); |
| cmd = ReadCommand.legacyReadCommandSerializer.deserialize(in, MessagingService.VERSION_21); |
| |
| logger.debug("ReadCommand: {}", cmd); |
| try (ReadExecutionController controller = cmd.executionController(); |
| UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(controller)) |
| { |
| ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd); |
| |
| logger.debug("creating response: {}", response); |
| try (UnfilteredPartitionIterator pIter = response.makeIterator(cmd)) |
| { |
| assert pIter.hasNext(); |
| try (UnfilteredRowIterator partition = pIter.next()) |
| { |
| LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition); |
| Assert.assertEquals(Collections.emptyList(), rowIter.cells); |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testMultiNamesCommandWithFlush() |
| { |
| testMultiNamesOrSlicesCommand(true, false); |
| } |
| |
| @Test |
| public void testMultiNamesCommandWithoutFlush() |
| { |
| testMultiNamesOrSlicesCommand(false, false); |
| } |
| |
| @Test |
| public void testMultiSlicesCommandWithFlush() |
| { |
| testMultiNamesOrSlicesCommand(true, true); |
| } |
| |
| @Test |
| public void testMultiSlicesCommandWithoutFlush() |
| { |
| testMultiNamesOrSlicesCommand(false, true); |
| } |
| |
| private AbstractClusteringIndexFilter createClusteringFilter(int uniqueCk1, int uniqueCk2, boolean isSlice) |
| { |
| Slices.Builder slicesBuilder = new Slices.Builder(CFM_SLICES.comparator); |
| BTreeSet.Builder<Clustering> namesBuilder = BTreeSet.builder(CFM_SLICES.comparator); |
| |
| for (int ck1 = 0; ck1 < uniqueCk1; ck1++) |
| { |
| for (int ck2 = 0; ck2 < uniqueCk2; ck2++) |
| { |
| if (isSlice) |
| slicesBuilder.add(Slice.make(Util.clustering(CFM_SLICES.comparator, ck1, ck2))); |
| else |
| namesBuilder.add(Util.clustering(CFM_SLICES.comparator, ck1, ck2)); |
| } |
| } |
| if (isSlice) |
| return new ClusteringIndexSliceFilter(slicesBuilder.build(), false); |
| return new ClusteringIndexNamesFilter(namesBuilder.build(), false); |
| } |
| |
| private void testMultiNamesOrSlicesCommand(boolean flush, boolean isSlice) |
| { |
| boolean isTombstone = flush || isSlice; |
| int deletionTime = 5; |
| int ck1 = 1; |
| int uniqueCk1 = 2; |
| int uniqueCk2 = 3; |
| |
| DecoratedKey key = CFM_SLICES.decorateKey(ByteBufferUtil.bytes("k")); |
| QueryProcessor.executeInternal(String.format("DELETE FROM ks.tbl_slices USING TIMESTAMP %d WHERE k='k' AND c1=%d", |
| deletionTime, |
| ck1)); |
| |
| if (flush) |
| Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE_SCLICES).forceBlockingFlush(); |
| |
| AbstractClusteringIndexFilter clusteringFilter = createClusteringFilter(uniqueCk1, uniqueCk2, isSlice); |
| ReadCommand cmd = SinglePartitionReadCommand.create(CFM_SLICES, |
| FBUtilities.nowInSeconds(), |
| ColumnFilter.all(CFM_SLICES), |
| RowFilter.NONE, |
| DataLimits.NONE, |
| key, |
| clusteringFilter); |
| |
| UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(cmd.executionController()); |
| assert partitionIterator.hasNext(); |
| UnfilteredRowIterator partition = partitionIterator.next(); |
| |
| int count = 0; |
| boolean open = true; |
| while (partition.hasNext()) |
| { |
| Unfiltered unfiltered = partition.next(); |
| if (isTombstone) |
| { |
| assertTrue(unfiltered.isRangeTombstoneMarker()); |
| RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered; |
| |
| // check if it's open-close pair |
| assertTrue(marker.isOpen(false) == open); |
| // check deletion time same as Range Deletion |
| if (open) |
| assertEquals(deletionTime, marker.openDeletionTime(false).markedForDeleteAt()); |
| else |
| assertEquals(deletionTime, marker.closeDeletionTime(false).markedForDeleteAt()); |
| |
| // check clustering values |
| Clustering clustering = Util.clustering(CFM_SLICES.comparator, ck1, count / 2); |
| for (int i = 0; i < CFM_SLICES.comparator.size(); i++) |
| { |
| int cmp = CFM_SLICES.comparator.compareComponent(i, |
| clustering.getRawValues()[i], |
| marker.clustering().values[i]); |
| assertEquals(0, cmp); |
| } |
| open = !open; |
| } |
| else |
| { |
| // deleted row |
| assertTrue(unfiltered.isRow()); |
| Row row = (Row) unfiltered; |
| assertEquals(deletionTime, row.deletion().time().markedForDeleteAt()); |
| assertEquals(0, row.size()); // no btree |
| } |
| count++; |
| } |
| if (isTombstone) |
| assertEquals(uniqueCk2 * 2, count); // open and close range tombstones |
| else |
| assertEquals(uniqueCk2, count); |
| } |
| |
| private void checkForS(UnfilteredPartitionIterator pi) |
| { |
| Assert.assertTrue(pi.toString(), pi.hasNext()); |
| UnfilteredRowIterator ri = pi.next(); |
| Assert.assertTrue(ri.columns().contains(s)); |
| Row staticRow = ri.staticRow(); |
| Iterator<Cell> cellIterator = staticRow.cells().iterator(); |
| Assert.assertTrue(staticRow.toString(cfm, true), cellIterator.hasNext()); |
| Cell cell = cellIterator.next(); |
| Assert.assertEquals(s, cell.column()); |
| Assert.assertEquals(ByteBufferUtil.bytesToHex(cell.value()), ByteBufferUtil.bytes("s"), cell.value()); |
| Assert.assertFalse(cellIterator.hasNext()); |
| } |
| |
| @Test |
| public void staticColumnsAreReturned() throws IOException |
| { |
| DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1")); |
| |
| QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k1', 's')"); |
| Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k1'").isEmpty()); |
| |
| ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s)); |
| ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false); |
| ReadCommand cmd = SinglePartitionReadCommand.create(true, |
| cfm, |
| FBUtilities.nowInSeconds(), |
| columnFilter, |
| RowFilter.NONE, |
| DataLimits.NONE, |
| key, |
| sliceFilter); |
| |
| // check raw iterator for static cell |
| try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController)) |
| { |
| checkForS(pi); |
| } |
| |
| ReadResponse response; |
| DataOutputBuffer out; |
| DataInputPlus in; |
| ReadResponse dst; |
| |
| // check (de)serialized iterator for memtable static cell |
| try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController)) |
| { |
| response = ReadResponse.createDataResponse(pi, cmd); |
| } |
| |
| out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); |
| ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); |
| in = new DataInputBuffer(out.buffer(), true); |
| dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); |
| try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd)) |
| { |
| checkForS(pi); |
| } |
| |
| // check (de)serialized iterator for sstable static cell |
| Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush(); |
| try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController)) |
| { |
| response = ReadResponse.createDataResponse(pi, cmd); |
| } |
| out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); |
| ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); |
| in = new DataInputBuffer(out.buffer(), true); |
| dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); |
| try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd)) |
| { |
| checkForS(pi); |
| } |
| } |
| |
| @Test |
| public void toCQLStringIsSafeToCall() throws IOException |
| { |
| DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1")); |
| |
| ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s)); |
| Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.inclusiveEndOf(ByteBufferUtil.bytes("i1"))); |
| ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false); |
| ReadCommand cmd = SinglePartitionReadCommand.create(true, |
| cfm, |
| FBUtilities.nowInSeconds(), |
| columnFilter, |
| RowFilter.NONE, |
| DataLimits.NONE, |
| key, |
| sliceFilter); |
| |
| String ret = cmd.toCQLString(); |
| Assert.assertNotNull(ret); |
| Assert.assertFalse(ret.isEmpty()); |
| } |
| } |