| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.triggers; |
| |
| import java.util.*; |
| |
| import org.junit.Test; |
| |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.db.marshal.UTF8Type; |
| import org.apache.cassandra.db.partitions.Partition; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.exceptions.InvalidRequestException; |
| import org.apache.cassandra.schema.TriggerMetadata; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static org.apache.cassandra.utils.ByteBufferUtil.bytes; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| |
| public class TriggerExecutorTest |
| { |
| @Test |
| public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName())); |
| PartitionUpdate mutated = TriggerExecutor.instance.execute(makeCf(metadata, "k1", "v1", null)); |
| |
| RowIterator rowIterator = UnfilteredRowIterators.filter(mutated.unfilteredIterator(), FBUtilities.nowInSeconds()); |
| |
| Iterator<Cell> cells = rowIterator.next().cells().iterator(); |
| assertEquals(bytes("trigger"), cells.next().value()); |
| |
| assertTrue(!rowIterator.hasNext()); |
| } |
| |
| @Test(expected = InvalidRequestException.class) |
| public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName())); |
| TriggerExecutor.instance.execute(makeCf(metadata, "k1", "v1", null)); |
| } |
| |
| @Test(expected = InvalidRequestException.class) |
| public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", DifferentKeyTrigger.class.getName())); |
| TriggerExecutor.instance.execute(makeCf(metadata, "k1", "v1", null)); |
| } |
| |
| @Test |
| public void noTriggerMutations() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", NoOpTrigger.class.getName())); |
| Mutation rm = new Mutation(makeCf(metadata, "k1", "v1", null)); |
| assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm))); |
| } |
| |
| @Test |
| public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName())); |
| PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null); |
| PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null); |
| Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1); |
| Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2); |
| |
| List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); |
| assertEquals(2, tmutations.size()); |
| Collections.sort(tmutations, new RmComparator()); |
| |
| List<PartitionUpdate> mutatedCFs = new ArrayList<>(tmutations.get(0).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| Row row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("k1v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| |
| mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("k2v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| } |
| |
| @Test |
| public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfPartialTrigger.class.getName())); |
| PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null); |
| PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null); |
| Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1); |
| Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2); |
| |
| List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); |
| assertEquals(2, tmutations.size()); |
| Collections.sort(tmutations, new RmComparator()); |
| |
| List<PartitionUpdate> mutatedCFs = new ArrayList<>(tmutations.get(0).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| Row row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("k1v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2")))); |
| |
| mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("k2v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| } |
| |
| @Test |
| public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName())); |
| PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null); |
| PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null); |
| Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1); |
| Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2); |
| |
| List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); |
| assertEquals(2, tmutations.size()); |
| Collections.sort(tmutations, new RmComparator()); |
| |
| List<PartitionUpdate> mutatedCFs = new ArrayList<>(tmutations.get(0).getPartitionUpdates()); |
| assertEquals(2, mutatedCFs.size()); |
| for (PartitionUpdate update : mutatedCFs) |
| { |
| if (update.metadata().cfName.equals("cf1")) |
| { |
| Row row = update.iterator().next(); |
| assertEquals(bytes("k1v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2")))); |
| } |
| else |
| { |
| Row row = update.iterator().next(); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1")))); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| } |
| } |
| |
| mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates()); |
| assertEquals(2, mutatedCFs.size()); |
| |
| for (PartitionUpdate update : mutatedCFs) |
| { |
| if (update.metadata().cfName.equals("cf1")) |
| { |
| Row row = update.iterator().next(); |
| assertEquals(bytes("k2v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2")))); |
| } |
| else |
| { |
| Row row = update.iterator().next(); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1")))); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| } |
| } |
| } |
| |
| @Test |
| public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException |
| { |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", SameKeyDifferentKsTrigger.class.getName())); |
| PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null); |
| PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null); |
| Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1); |
| Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2); |
| |
| List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); |
| assertEquals(4, tmutations.size()); |
| Collections.sort(tmutations, new RmComparator()); |
| |
| List<PartitionUpdate> mutatedCFs = new ArrayList<>(tmutations.get(0).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| Row row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("k1v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2")))); |
| |
| mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("k2v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2")))); |
| |
| mutatedCFs = new ArrayList<>(tmutations.get(2).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| row = mutatedCFs.get(0).iterator().next(); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1")))); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| |
| mutatedCFs = new ArrayList<>(tmutations.get(3).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| row = mutatedCFs.get(0).iterator().next(); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1")))); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| } |
| |
| @Test |
| public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException |
| { |
| |
| CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", DifferentKeyTrigger.class.getName())); |
| PartitionUpdate cf1 = makeCf(metadata, "k1", "v1", null); |
| Mutation rm = new Mutation("ks1", cf1.partitionKey()).add(cf1); |
| |
| List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm))); |
| assertEquals(2, tmutations.size()); |
| Collections.sort(tmutations, new RmComparator()); |
| |
| assertEquals(bytes("k1"), tmutations.get(0).key().getKey()); |
| assertEquals(bytes("otherKey"), tmutations.get(1).key().getKey()); |
| |
| List<PartitionUpdate> mutatedCFs = new ArrayList<>(tmutations.get(0).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| Row row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("v1"), row.getCell(metadata.getColumnDefinition(bytes("c1"))).value()); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2")))); |
| |
| mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates()); |
| assertEquals(1, mutatedCFs.size()); |
| row = mutatedCFs.get(0).iterator().next(); |
| assertEquals(bytes("trigger"), row.getCell(metadata.getColumnDefinition(bytes("c2"))).value()); |
| assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1")))); |
| } |
| |
| private static CFMetaData makeCfMetaData(String ks, String cf, TriggerMetadata trigger) |
| { |
| CFMetaData metadata = CFMetaData.Builder.create(ks, cf) |
| .addPartitionKey("pkey", UTF8Type.instance) |
| .addRegularColumn("c1", UTF8Type.instance) |
| .addRegularColumn("c2", UTF8Type.instance) |
| .build(); |
| |
| try |
| { |
| if (trigger != null) |
| metadata.triggers(metadata.getTriggers().with(trigger)); |
| } |
| catch (InvalidRequestException e) |
| { |
| throw new AssertionError(e); |
| } |
| |
| return metadata; |
| } |
| |
| private static PartitionUpdate makeCf(CFMetaData metadata, String key, String columnValue1, String columnValue2) |
| { |
| Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds()); |
| builder.newRow(Clustering.EMPTY); |
| long ts = FBUtilities.timestampMicros(); |
| if (columnValue1 != null) |
| builder.addCell(BufferCell.live(metadata, metadata.getColumnDefinition(bytes("c1")), ts, bytes(columnValue1))); |
| if (columnValue2 != null) |
| builder.addCell(BufferCell.live(metadata, metadata.getColumnDefinition(bytes("c2")), ts, bytes(columnValue2))); |
| |
| return PartitionUpdate.singleRowUpdate(metadata, Util.dk(key), builder.build()); |
| } |
| |
| public static class NoOpTrigger implements ITrigger |
| { |
| public Collection<Mutation> augment(Partition partition) |
| { |
| return null; |
| } |
| } |
| |
| public static class SameKeySameCfTrigger implements ITrigger |
| { |
| public Collection<Mutation> augment(Partition partition) |
| { |
| RowUpdateBuilder builder = new RowUpdateBuilder(partition.metadata(), FBUtilities.timestampMicros(), partition.partitionKey().getKey()); |
| builder.add("c2", bytes("trigger")); |
| return Collections.singletonList(builder.build()); |
| } |
| } |
| |
| public static class SameKeySameCfPartialTrigger implements ITrigger |
| { |
| public Collection<Mutation> augment(Partition partition) |
| { |
| if (!partition.partitionKey().getKey().equals(bytes("k2"))) |
| return null; |
| |
| RowUpdateBuilder builder = new RowUpdateBuilder(partition.metadata(), FBUtilities.timestampMicros(), partition.partitionKey().getKey()); |
| builder.add("c2", bytes("trigger")); |
| return Collections.singletonList(builder.build()); |
| } |
| } |
| |
| public static class SameKeyDifferentCfTrigger implements ITrigger |
| { |
| public Collection<Mutation> augment(Partition partition) |
| { |
| RowUpdateBuilder builder = new RowUpdateBuilder(makeCfMetaData(partition.metadata().ksName, "otherCf", null), FBUtilities.timestampMicros(), partition.partitionKey().getKey()); |
| builder.add("c2", bytes("trigger")); |
| return Collections.singletonList(builder.build()); |
| } |
| } |
| |
| public static class SameKeyDifferentKsTrigger implements ITrigger |
| { |
| public Collection<Mutation> augment(Partition partition) |
| { |
| RowUpdateBuilder builder = new RowUpdateBuilder(makeCfMetaData("otherKs", "otherCf", null), FBUtilities.timestampMicros(), partition.partitionKey().getKey()); |
| builder.add("c2", bytes("trigger")); |
| return Collections.singletonList(builder.build()); |
| } |
| } |
| |
| public static class DifferentKeyTrigger implements ITrigger |
| { |
| public Collection<Mutation> augment(Partition partition) |
| { |
| RowUpdateBuilder builder = new RowUpdateBuilder(makeCfMetaData("otherKs", "otherCf", null), FBUtilities.timestampMicros(), "otherKey"); |
| builder.add("c2", bytes("trigger")); |
| return Collections.singletonList(builder.build()); |
| } |
| } |
| |
| private static class RmComparator implements Comparator<IMutation> |
| { |
| public int compare(IMutation m1, IMutation m2) |
| { |
| int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName()); |
| return cmp != 0 ? cmp : m1.key().compareTo(m2.key()); |
| } |
| } |
| |
| private static class CfComparator implements Comparator<Partition> |
| { |
| public int compare(Partition cf1, Partition cf2) |
| { |
| return cf1.metadata().cfName.compareTo(cf2.metadata().cfName); |
| } |
| } |
| } |