blob: c346eb96713e0e3165041fb87709452f95d16e50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* Utility class to collect updates.
*
* <p>In a batch statement we don't want to recreate mutations every time, as this is particularly inefficient when
* applying multiple batches to the same partition (see #6737).</p>
*
*/
final class BatchUpdatesCollector implements UpdatesCollector
{
/**
* The columns that will be updated for each table (keyed by the table ID).
* Looked up whenever a new <code>PartitionUpdate.Builder</code> has to be created for a table.
*/
private final Map<TableId, RegularAndStaticColumns> updatedColumns;
/**
* The number of updated rows per table and key.
* The count for a partition key is passed to every new <code>PartitionUpdate.Builder</code> created for
* that table and key (presumably as a sizing hint -- TODO confirm against PartitionUpdate.Builder).
*/
private final Map<TableId, HashMultiset<ByteBuffer>> perPartitionKeyCounts;
/**
* The mutation builders per keyspace, populated lazily.
*
* The outer map is keyed by keyspace name and is pre-sized for one entry, which optimises for the common
* single-keyspace case.
*
* Each inner map holds one IMutationBuilder for each touched partition key in that keyspace, keyed by
* <code>DecoratedKey.getKey()</code>.
*
* Each IMutationBuilder in turn holds one PartitionUpdate.Builder per table ID.
*/
private final Map<String, Map<ByteBuffer, IMutationBuilder>> mutationBuilders = Maps.newHashMapWithExpectedSize(1);
/**
 * Creates a collector backed by the given per-table lookups.
 *
 * <p>Both maps are stored by reference; no defensive copy is made.</p>
 *
 * @param updatedColumns the columns that will be updated for each table, keyed by table ID
 * @param perPartitionKeyCounts the number of updated rows per table and partition key
 */
BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, Map<TableId, HashMultiset<ByteBuffer>> perPartitionKeyCounts)
{
    // The superclass constructor is invoked implicitly by the compiler.
    this.perPartitionKeyCounts = perPartitionKeyCounts;
    this.updatedColumns = updatedColumns;
}
/**
 * Returns the <code>PartitionUpdate.Builder</code> for the given table and partition key, creating and
 * registering it with the partition's mutation builder on first request.
 *
 * @param metadata the table meta data
 * @param dk the partition key
 * @param consistency the consistency level (only used when a new mutation builder has to be created)
 * @return the <code>PartitionUpdate.Builder</code> for the specified table and key
 */
public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
{
    IMutationBuilder mutation = getMutationBuilder(metadata, dk, consistency);

    PartitionUpdate.Builder existing = mutation.get(metadata.id);
    if (existing != null)
        return existing;

    // First update for this table within the partition's mutation: create the builder and register it.
    RegularAndStaticColumns columns = updatedColumns.get(metadata.id);
    assert columns != null;
    int rowCount = perPartitionKeyCounts.get(metadata.id).count(dk.getKey());

    PartitionUpdate.Builder created = new PartitionUpdate.Builder(metadata, dk, columns, rowCount);
    mutation.add(created);
    return created;
}
/**
 * Returns the mutation builder registered for the given partition key in the table's keyspace,
 * creating and registering one if none exists yet.
 *
 * @param metadata the table meta data; supplies the keyspace and selects the kind of builder created
 * @param dk the partition key
 * @param consistency the consistency level handed to a newly created builder
 * @return the mutation builder for the partition
 */
private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
{
    Map<ByteBuffer, IMutationBuilder> buildersByKey = keyspaceMap(metadata.keyspace);
    return buildersByKey.computeIfAbsent(dk.getKey(), rawKey -> makeMutationBuilder(metadata, dk, consistency));
}
/**
 * Creates the mutation builder matching the kind of table: virtual, counter or regular.
 *
 * @param metadata the table meta data
 * @param partitionKey the partition key the mutation applies to
 * @param cl the consistency level; only used for counter tables
 * @return a new, empty mutation builder
 */
private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKey partitionKey, ConsistencyLevel cl)
{
    if (metadata.isVirtual())
        return new VirtualMutationBuilder(metadata.keyspace, partitionKey);

    MutationBuilder regular = new MutationBuilder(metadata.keyspace, partitionKey, 1);
    if (metadata.isCounter())
        return new CounterMutationBuilder(regular, cl);

    return regular;
}
/**
 * Builds every collected mutation, validating each one as it is built.
 *
 * @return a new list containing all the mutations.
 */
public List<IMutation> toMutations()
{
    List<IMutation> result = new ArrayList<>();
    for (Map<ByteBuffer, IMutationBuilder> buildersByKey : mutationBuilders.values())
        for (IMutationBuilder pending : buildersByKey.values())
            result.add(buildAndValidate(pending));
    return result;
}

/**
 * Builds a single mutation and runs its indexed-column and size validation, in that order.
 */
private static IMutation buildAndValidate(IMutationBuilder pending)
{
    IMutation built = pending.build();
    built.validateIndexedColumns();
    built.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
    return built;
}
/**
 * Returns the key-mutation mappings for the specified keyspace, registering an empty mapping
 * the first time the keyspace is seen.
 *
 * @param ksName the keyspace name
 * @return the key-mutation mappings for the specified keyspace.
 */
private Map<ByteBuffer, IMutationBuilder> keyspaceMap(String ksName)
{
    return mutationBuilders.computeIfAbsent(ksName, name -> Maps.newHashMapWithExpectedSize(1));
}
/**
 * Accumulates the per-table <code>PartitionUpdate.Builder</code>s of a single mutation.
 */
private interface IMutationBuilder
{
    /**
     * Looks up the update builder registered for a table.
     *
     * @param tableId the table ID
     * @return the builder for the given tableId
     */
    PartitionUpdate.Builder get(TableId tableId);

    /**
     * Registers a new PartitionUpdate builder with this mutation builder.
     *
     * @param builder the builder to add
     * @return this
     */
    IMutationBuilder add(PartitionUpdate.Builder builder);

    /**
     * Builds the immutable mutation from the registered update builders.
     *
     * @return the built mutation
     */
    IMutation build();
}
private static class MutationBuilder implements IMutationBuilder
{
private final Map<TableId, PartitionUpdate.Builder> modifications;
private final DecoratedKey key;
private final String keyspaceName;
private final long createdAt = approxTime.now();
private MutationBuilder(String keyspaceName, DecoratedKey key, int initialSize)
{
this.keyspaceName = keyspaceName;
this.key = key;
this.modifications = Maps.newHashMapWithExpectedSize(initialSize);
}
public MutationBuilder add(PartitionUpdate.Builder updateBuilder)
{
assert updateBuilder != null;
assert updateBuilder.partitionKey().getPartitioner() == key.getPartitioner();
PartitionUpdate.Builder prev = modifications.put(updateBuilder.metadata().id, updateBuilder);
if (prev != null)
// developer error
throw new IllegalArgumentException("Table " + updateBuilder.metadata().name + " already has modifications in this mutation: " + prev);
return this;
}
public Mutation build()
{
ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>();
for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : modifications.entrySet())
{
PartitionUpdate update = updateEntry.getValue().build();
updates.put(updateEntry.getKey(), update);
}
return new Mutation(keyspaceName, key, updates.build(), createdAt);
}
public PartitionUpdate.Builder get(TableId tableId)
{
return modifications.get(tableId);
}
public DecoratedKey key()
{
return key;
}
public boolean isEmpty()
{
return modifications.isEmpty();
}
public String getKeyspaceName()
{
return keyspaceName;
}
}
private static class CounterMutationBuilder implements IMutationBuilder
{
private final MutationBuilder mutationBuilder;
private final ConsistencyLevel cl;
private CounterMutationBuilder(MutationBuilder mutationBuilder, ConsistencyLevel cl)
{
this.mutationBuilder = mutationBuilder;
this.cl = cl;
}
public IMutationBuilder add(PartitionUpdate.Builder builder)
{
return mutationBuilder.add(builder);
}
public IMutation build()
{
return new CounterMutation(mutationBuilder.build(), cl);
}
public PartitionUpdate.Builder get(TableId id)
{
return mutationBuilder.get(id);
}
}
/**
 * Builder for a {@code VirtualMutation}: one <code>PartitionUpdate.Builder</code> per table, all for
 * the same keyspace and partition key.
 */
private static class VirtualMutationBuilder implements IMutationBuilder
{
    private final String keyspaceName;
    private final DecoratedKey partitionKey;
    private final Map<TableId, PartitionUpdate.Builder> modifications = new HashMap<>();

    private VirtualMutationBuilder(String keyspaceName, DecoratedKey partitionKey)
    {
        this.keyspaceName = keyspaceName;
        this.partitionKey = partitionKey;
    }

    /**
     * Registers the update builder under its table ID.
     *
     * @param builder the builder to add
     * @return this
     * @throws IllegalStateException if the table already had a builder registered
     */
    @Override
    public VirtualMutationBuilder add(PartitionUpdate.Builder builder)
    {
        PartitionUpdate.Builder prev = modifications.put(builder.metadata().id, builder);
        if (null != prev)
            // developer error; say which table was registered twice instead of throwing a bare exception
            throw new IllegalStateException("Table " + builder.metadata().name + " already has modifications in this mutation: " + prev);
        return this;
    }

    /**
     * Builds every registered partition update and wraps them in a {@code VirtualMutation}.
     */
    @Override
    public VirtualMutation build()
    {
        ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>();
        modifications.forEach((tableId, updateBuilder) -> updates.put(tableId, updateBuilder.build()));
        return new VirtualMutation(keyspaceName, partitionKey, updates.build());
    }

    /**
     * Returns the update builder registered for the table, or {@code null} if there is none.
     */
    @Override
    public PartitionUpdate.Builder get(TableId tableId)
    {
        return modifications.get(tableId);
    }
}
}