| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.cql3.statements; |
| |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| import com.google.common.collect.HashMultiset; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Maps; |
| |
| import org.apache.cassandra.db.commitlog.CommitLogSegment; |
| import org.apache.cassandra.db.virtual.VirtualMutation; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| |
| import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; |
| |
| /** |
| * Utility class to collect updates. |
| * |
| * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when |
| * applying multiple batch to the same partition (see #6737). </p> |
| * |
| */ |
| final class BatchUpdatesCollector implements UpdatesCollector |
| { |
| /** |
| * The columns that will be updated for each table (keyed by the table ID). |
| */ |
| private final Map<TableId, RegularAndStaticColumns> updatedColumns; |
| |
| /** |
| * The number of updated rows per table and key. |
| */ |
| private final Map<TableId, HashMultiset<ByteBuffer>> perPartitionKeyCounts; |
| |
| /** |
| * The mutations per keyspace. |
| * |
| * optimised for the common single-keyspace case |
| * |
| * Key is keyspace name, then we have an IMutationBuilder for each touched partition key in that keyspace |
| * |
| * MutationBuilder holds a PartitionUpdate.Builder |
| */ |
| private final Map<String, Map<ByteBuffer, IMutationBuilder>> mutationBuilders = Maps.newHashMapWithExpectedSize(1); |
| |
| |
| BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, Map<TableId, HashMultiset<ByteBuffer>> perPartitionKeyCounts) |
| { |
| super(); |
| this.updatedColumns = updatedColumns; |
| this.perPartitionKeyCounts = perPartitionKeyCounts; |
| } |
| |
| /** |
| * Gets the <code>PartitionUpdate.Builder</code> for the specified column family and key. If the builder does not |
| * exist it will be created. |
| * |
| * @param metadata the column family meta data |
| * @param dk the partition key |
| * @param consistency the consistency level |
| * @return the <code>PartitionUpdate.Builder</code> for the specified column family and key |
| */ |
| public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) |
| { |
| IMutationBuilder mut = getMutationBuilder(metadata, dk, consistency); |
| PartitionUpdate.Builder upd = mut.get(metadata.id); |
| if (upd == null) |
| { |
| RegularAndStaticColumns columns = updatedColumns.get(metadata.id); |
| assert columns != null; |
| upd = new PartitionUpdate.Builder(metadata, dk, columns, perPartitionKeyCounts.get(metadata.id).count(dk.getKey())); |
| mut.add(upd); |
| } |
| return upd; |
| } |
| |
| private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) |
| { |
| Map<ByteBuffer, IMutationBuilder> ksMap = keyspaceMap(metadata.keyspace); |
| IMutationBuilder mutationBuilder = ksMap.get(dk.getKey()); |
| if (mutationBuilder == null) |
| { |
| mutationBuilder = makeMutationBuilder(metadata, dk, consistency); |
| ksMap.put(dk.getKey(), mutationBuilder); |
| } |
| return mutationBuilder; |
| } |
| |
| private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKey partitionKey, ConsistencyLevel cl) |
| { |
| if (metadata.isVirtual()) |
| { |
| return new VirtualMutationBuilder(metadata.keyspace, partitionKey); |
| } |
| else |
| { |
| MutationBuilder builder = new MutationBuilder(metadata.keyspace, partitionKey, 1); |
| return metadata.isCounter() ? new CounterMutationBuilder(builder, cl) : builder; |
| } |
| } |
| |
| /** |
| * Returns a collection containing all the mutations. |
| * @return a collection containing all the mutations. |
| */ |
| public List<IMutation> toMutations() |
| { |
| List<IMutation> ms = new ArrayList<>(); |
| for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values()) |
| { |
| for (IMutationBuilder builder : ksMap.values()) |
| { |
| IMutation mutation = builder.build(); |
| mutation.validateIndexedColumns(); |
| mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE); |
| ms.add(mutation); |
| } |
| } |
| return ms; |
| } |
| |
| /** |
| * Returns the key-mutation mappings for the specified keyspace. |
| * |
| * @param ksName the keyspace name |
| * @return the key-mutation mappings for the specified keyspace. |
| */ |
| private Map<ByteBuffer, IMutationBuilder> keyspaceMap(String ksName) |
| { |
| Map<ByteBuffer, IMutationBuilder> ksMap = mutationBuilders.get(ksName); |
| if (ksMap == null) |
| { |
| ksMap = Maps.newHashMapWithExpectedSize(1); |
| mutationBuilders.put(ksName, ksMap); |
| } |
| return ksMap; |
| } |
| |
| private interface IMutationBuilder |
| { |
| /** |
| * Add a new PartitionUpdate builder to this mutation builder |
| * @param builder the builder to add |
| * @return this |
| */ |
| IMutationBuilder add(PartitionUpdate.Builder builder); |
| |
| /** |
| * Build the immutable mutation |
| */ |
| IMutation build(); |
| |
| /** |
| * Get the builder for the given tableId |
| */ |
| PartitionUpdate.Builder get(TableId tableId); |
| } |
| |
| private static class MutationBuilder implements IMutationBuilder |
| { |
| private final Map<TableId, PartitionUpdate.Builder> modifications; |
| private final DecoratedKey key; |
| private final String keyspaceName; |
| private final long createdAt = approxTime.now(); |
| |
| private MutationBuilder(String keyspaceName, DecoratedKey key, int initialSize) |
| { |
| this.keyspaceName = keyspaceName; |
| this.key = key; |
| this.modifications = Maps.newHashMapWithExpectedSize(initialSize); |
| } |
| |
| public MutationBuilder add(PartitionUpdate.Builder updateBuilder) |
| { |
| assert updateBuilder != null; |
| assert updateBuilder.partitionKey().getPartitioner() == key.getPartitioner(); |
| PartitionUpdate.Builder prev = modifications.put(updateBuilder.metadata().id, updateBuilder); |
| if (prev != null) |
| // developer error |
| throw new IllegalArgumentException("Table " + updateBuilder.metadata().name + " already has modifications in this mutation: " + prev); |
| return this; |
| } |
| |
| public Mutation build() |
| { |
| ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>(); |
| for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : modifications.entrySet()) |
| { |
| PartitionUpdate update = updateEntry.getValue().build(); |
| updates.put(updateEntry.getKey(), update); |
| } |
| return new Mutation(keyspaceName, key, updates.build(), createdAt); |
| } |
| |
| public PartitionUpdate.Builder get(TableId tableId) |
| { |
| return modifications.get(tableId); |
| } |
| |
| public DecoratedKey key() |
| { |
| return key; |
| } |
| |
| public boolean isEmpty() |
| { |
| return modifications.isEmpty(); |
| } |
| |
| public String getKeyspaceName() |
| { |
| return keyspaceName; |
| } |
| } |
| |
| private static class CounterMutationBuilder implements IMutationBuilder |
| { |
| private final MutationBuilder mutationBuilder; |
| private final ConsistencyLevel cl; |
| |
| private CounterMutationBuilder(MutationBuilder mutationBuilder, ConsistencyLevel cl) |
| { |
| this.mutationBuilder = mutationBuilder; |
| this.cl = cl; |
| } |
| |
| public IMutationBuilder add(PartitionUpdate.Builder builder) |
| { |
| return mutationBuilder.add(builder); |
| } |
| |
| public IMutation build() |
| { |
| return new CounterMutation(mutationBuilder.build(), cl); |
| } |
| |
| public PartitionUpdate.Builder get(TableId id) |
| { |
| return mutationBuilder.get(id); |
| } |
| } |
| |
| private static class VirtualMutationBuilder implements IMutationBuilder |
| { |
| private final String keyspaceName; |
| private final DecoratedKey partitionKey; |
| |
| private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>(); |
| |
| private VirtualMutationBuilder(String keyspaceName, DecoratedKey partitionKey) |
| { |
| this.keyspaceName = keyspaceName; |
| this.partitionKey = partitionKey; |
| } |
| |
| @Override |
| public VirtualMutationBuilder add(PartitionUpdate.Builder builder) |
| { |
| PartitionUpdate.Builder prev = modifications.put(builder.metadata().id, builder); |
| if (null != prev) |
| throw new IllegalStateException(); |
| return this; |
| } |
| |
| @Override |
| public VirtualMutation build() |
| { |
| ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>(); |
| modifications.forEach((tableId, updateBuilder) -> updates.put(tableId, updateBuilder.build())); |
| return new VirtualMutation(keyspaceName, partitionKey, updates.build()); |
| } |
| |
| @Override |
| public PartitionUpdate.Builder get(TableId tableId) |
| { |
| return modifications.get(tableId); |
| } |
| } |
| } |