blob: 5a41271474dd383adb8585063623c6e61639a51a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.TeeDataInputPlus;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.concurrent.Future;
import static org.apache.cassandra.net.MessagingService.VERSION_40;
import static org.apache.cassandra.net.MessagingService.VERSION_50;
import static org.apache.cassandra.net.MessagingService.VERSION_51;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
public class Mutation implements IMutation, Supplier<Mutation>
{
public static final MutationSerializer serializer = new MutationSerializer();
// todo this is redundant
// when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test
private final String keyspaceName;
private final DecoratedKey key;
// map of column family id to mutations for that column family.
private final ImmutableMap<TableId, PartitionUpdate> modifications;
// Time at which this mutation or the builder that built it was instantiated
final long approxCreatedAtNanos;
// keep track of when mutation has started waiting for a MV partition lock
final AtomicLong viewLockAcquireStart = new AtomicLong(0);
private final boolean cdcEnabled;
private static final int SERIALIZATION_VERSION_COUNT = MessagingService.Version.values().length;
// Contains serialized representations of this mutation.
// Note: there is no functionality to clear/remove serialized instances, because a mutation must never
// be modified (e.g. calling add(PartitionUpdate)) when it's being serialized.
private final Serialization[] cachedSerializations = new Serialization[SERIALIZATION_VERSION_COUNT];
/** @see CassandraRelevantProperties#CACHEABLE_MUTATION_SIZE_LIMIT */
private static final long CACHEABLE_MUTATION_SIZE_LIMIT = CassandraRelevantProperties.CACHEABLE_MUTATION_SIZE_LIMIT.getLong();
public Mutation(PartitionUpdate update)
{
this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now(), update.metadata().params.cdc);
}
public Mutation(String keyspaceName, DecoratedKey key, ImmutableMap<TableId, PartitionUpdate> modifications, long approxCreatedAtNanos)
{
this(keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled(modifications.values()));
}
public Mutation(String keyspaceName, DecoratedKey key, ImmutableMap<TableId, PartitionUpdate> modifications, long approxCreatedAtNanos, boolean cdcEnabled)
{
this.keyspaceName = keyspaceName;
this.key = key;
this.modifications = modifications;
this.cdcEnabled = cdcEnabled;
this.approxCreatedAtNanos = approxCreatedAtNanos;
}
private static boolean cdcEnabled(Iterable<PartitionUpdate> modifications)
{
boolean cdc = false;
for (PartitionUpdate pu : modifications)
cdc |= pu.metadata().params.cdc;
return cdc;
}
public Mutation without(Set<TableId> tableIds)
{
if (tableIds.isEmpty())
return this;
ImmutableMap.Builder<TableId, PartitionUpdate> builder = new ImmutableMap.Builder<>();
for (Map.Entry<TableId, PartitionUpdate> update : modifications.entrySet())
{
if (!tableIds.contains(update.getKey()))
{
builder.put(update);
}
}
return new Mutation(keyspaceName, key, builder.build(), approxCreatedAtNanos);
}
public Mutation without(TableId tableId)
{
return without(Collections.singleton(tableId));
}
public String getKeyspaceName()
{
return keyspaceName;
}
public Collection<TableId> getTableIds()
{
return modifications.keySet();
}
public DecoratedKey key()
{
return key;
}
public ImmutableCollection<PartitionUpdate> getPartitionUpdates()
{
return modifications.values();
}
@Override
public Supplier<Mutation> hintOnFailure()
{
return this;
}
@Override
public Mutation get()
{
return this;
}
public void validateSize(int version, int overhead)
{
long totalSize = serializedSize(version) + overhead;
if(totalSize > MAX_MUTATION_SIZE)
{
CommitLog.instance.metrics.oversizedMutations.mark();
throw new MutationExceededMaxSizeException(this, version, totalSize);
}
}
public PartitionUpdate getPartitionUpdate(TableMetadata table)
{
return table == null ? null : modifications.get(table.id);
}
public boolean isEmpty()
{
return modifications.isEmpty();
}
/**
* Creates a new mutation that merges all the provided mutations.
*
* @param mutations the mutations to merge together. All mutation must be
* on the same keyspace and partition key. There should also be at least one
* mutation.
* @return a mutation that contains all the modifications contained in {@code mutations}.
*
* @throws IllegalArgumentException if not all the mutations are on the same
* keyspace and key.
*/
public static Mutation merge(List<Mutation> mutations)
{
assert !mutations.isEmpty();
if (mutations.size() == 1)
return mutations.get(0);
Set<TableId> updatedTables = new HashSet<>();
String ks = null;
DecoratedKey key = null;
for (Mutation mutation : mutations)
{
updatedTables.addAll(mutation.modifications.keySet());
if (ks != null && !ks.equals(mutation.keyspaceName))
throw new IllegalArgumentException();
if (key != null && !key.equals(mutation.key))
throw new IllegalArgumentException();
ks = mutation.keyspaceName;
key = mutation.key;
}
List<PartitionUpdate> updates = new ArrayList<>(mutations.size());
ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
for (TableId table : updatedTables)
{
for (Mutation mutation : mutations)
{
PartitionUpdate upd = mutation.modifications.get(table);
if (upd != null)
updates.add(upd);
}
if (updates.isEmpty())
continue;
modifications.put(table, updates.size() == 1 ? updates.get(0) : PartitionUpdate.merge(updates));
updates.clear();
}
return new Mutation(ks, key, modifications.build(), approxTime.now());
}
public Future<?> applyFuture()
{
Keyspace ks = Keyspace.open(keyspaceName);
return ks.applyFuture(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites, true);
}
private void apply(Keyspace keyspace, boolean durableWrites, boolean isDroppable)
{
keyspace.apply(this, durableWrites, true, isDroppable);
}
public void apply(boolean durableWrites, boolean isDroppable)
{
apply(Keyspace.open(keyspaceName), durableWrites, isDroppable);
}
public void apply(boolean durableWrites)
{
apply(durableWrites, true);
}
/*
* This is equivalent to calling commit. Applies the changes to
* to the keyspace that is obtained by calling Keyspace.open().
*/
public void apply()
{
Keyspace keyspace = Keyspace.open(keyspaceName);
apply(keyspace, keyspace.getMetadata().params.durableWrites, true);
}
public void applyUnsafe()
{
apply(false);
}
public long getTimeout(TimeUnit unit)
{
return DatabaseDescriptor.getWriteRpcTimeout(unit);
}
public int smallestGCGS()
{
int gcgs = Integer.MAX_VALUE;
for (PartitionUpdate update : getPartitionUpdates())
gcgs = Math.min(gcgs, update.metadata().params.gcGraceSeconds);
return gcgs;
}
public boolean trackedByCDC()
{
return cdcEnabled;
}
public String toString()
{
return toString(false);
}
public String toString(boolean shallow)
{
StringBuilder buff = new StringBuilder("Mutation(");
buff.append("keyspace='").append(keyspaceName).append('\'');
buff.append(", key='").append(ByteBufferUtil.bytesToHex(key.getKey())).append('\'');
buff.append(", modifications=[");
if (shallow)
{
List<String> cfnames = new ArrayList<>(modifications.size());
for (TableId tableId : modifications.keySet())
{
TableMetadata cfm = Schema.instance.getTableMetadata(tableId);
cfnames.add(cfm == null ? "-dropped-" : cfm.name);
}
buff.append(StringUtils.join(cfnames, ", "));
}
else
{
buff.append("\n ").append(StringUtils.join(modifications.values(), "\n ")).append('\n');
}
return buff.append("])").toString();
}
private int serializedSize40;
private int serializedSize50;
private int serializedSize51;
public int serializedSize(int version)
{
switch (version)
{
case VERSION_40:
if (serializedSize40 == 0)
serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
return serializedSize40;
case VERSION_50:
if (serializedSize50 == 0)
serializedSize50 = (int) serializer.serializedSize(this, VERSION_50);
return serializedSize50;
case VERSION_51:
if (serializedSize51 == 0)
serializedSize51 = (int) serializer.serializedSize(this, VERSION_51);
return serializedSize51;
default:
throw new IllegalStateException("Unknown serialization version: " + version);
}
}
/**
* Creates a new simple mutuation builder.
*
* @param keyspaceName the name of the keyspace this is a mutation for.
* @param partitionKey the key of partition this if a mutation for.
* @return a newly created builder.
*/
public static SimpleBuilder simpleBuilder(String keyspaceName, DecoratedKey partitionKey)
{
return new SimpleBuilders.MutationBuilder(keyspaceName, partitionKey);
}
/**
* Interface for building mutations geared towards human.
* <p>
* This should generally not be used when performance matters too much, but provides a more convenient interface to
* build a mutation than using the class constructor when performance is not of the utmost importance.
*/
public interface SimpleBuilder
{
/**
* Sets the timestamp to use for the following additions to this builder or any derived (update or row) builder.
*
* @param timestamp the timestamp to use for following additions. If that timestamp hasn't been set, the current
* time in microseconds will be used.
* @return this builder.
*/
public SimpleBuilder timestamp(long timestamp);
/**
* Sets the ttl to use for the following additions to this builder or any derived (update or row) builder.
* <p>
* Note that the for non-compact tables, this method must be called before any column addition for this
* ttl to be used for the row {@code LivenessInfo}.
*
* @param ttl the ttl to use for following additions. If that ttl hasn't been set, no ttl will be used.
* @return this builder.
*/
public SimpleBuilder ttl(int ttl);
/**
* Adds an update for table identified by the provided metadata and return a builder for that partition.
*
* @param metadata the metadata of the table for which to add an update.
* @return a builder for the partition identified by {@code metadata} (and the partition key for which this is a
* mutation of).
*/
public PartitionUpdate.SimpleBuilder update(TableMetadata metadata);
/**
* Adds an update for table identified by the provided name and return a builder for that partition.
*
* @param tableName the name of the table for which to add an update.
* @return a builder for the partition identified by {@code metadata} (and the partition key for which this is a
* mutation of).
*/
public PartitionUpdate.SimpleBuilder update(String tableName);
/**
* Build the mutation represented by this builder.
*
* @return the built mutation.
*/
public Mutation build();
}
public static class MutationSerializer implements IVersionedSerializer<Mutation>
{
public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
{
serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
}
/**
* Called early during request processing to prevent that {@link #serialization(Mutation, int)} is
* called concurrently.
* See {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
*/
@SuppressWarnings("JavadocReference")
public void prepareSerializedBuffer(Mutation mutation, int version)
{
serialization(mutation, version);
}
/**
* Retrieve the cached serialization of this mutation, or compute and cache said serialization if it doesn't
* exist yet. Note that this method is _not_ synchronized even though it may (and will often) be called
* concurrently. Concurrent calls are still safe however, the only risk is that the value is not cached yet,
* multiple concurrent calls may compute it multiple times instead of just once. This is ok as in practice
* as we make sure this doesn't happen in the hot path by forcing the initial caching in
* {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
* via {@link #prepareSerializedBuffer(Mutation)}, which is the only caller that passes
* {@code isPrepare==true}.
*/
@SuppressWarnings("JavadocReference")
private Serialization serialization(Mutation mutation, int version)
{
int versionOrdinal = MessagingService.getVersionOrdinal(version);
// Retrieves the cached version, or build+cache it if it's not cached already.
Serialization serialization = mutation.cachedSerializations[versionOrdinal];
if (serialization == null)
{
serialization = new SizeOnlyCacheableSerialization();
long serializedSize = serialization.serializedSize(PartitionUpdate.serializer, mutation, version);
// Excessively large mutation objects cause GC pressure and huge allocations when serialized.
// so we only cache serialized mutations when they are below the defined limit.
if (serializedSize < CACHEABLE_MUTATION_SIZE_LIMIT)
{
try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
{
serializeInternal(PartitionUpdate.serializer, mutation, dob, version);
serialization = new CachedSerialization(dob.toByteArray());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
mutation.cachedSerializations[versionOrdinal] = serialization;
}
return serialization;
}
static void serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,
Mutation mutation,
DataOutputPlus out,
int version) throws IOException
{
Map<TableId, PartitionUpdate> modifications = mutation.modifications;
/* serialize the modifications in the mutation */
int size = modifications.size();
out.writeUnsignedVInt32(size);
assert size > 0;
for (PartitionUpdate partitionUpdate : modifications.values())
{
serializer.serialize(partitionUpdate, out, version);
}
}
public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
{
Mutation m;
TeeDataInputPlus teeIn;
try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
{
teeIn = new TeeDataInputPlus(in, dob, CACHEABLE_MUTATION_SIZE_LIMIT);
int size = teeIn.readUnsignedVInt32();
assert size > 0;
PartitionUpdate update = PartitionUpdate.serializer.deserialize(teeIn, version, flag);
if (size == 1)
{
m = new Mutation(update);
}
else
{
ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
DecoratedKey dk = update.partitionKey();
modifications.put(update.metadata().id, update);
for (int i = 1; i < size; ++i)
{
update = PartitionUpdate.serializer.deserialize(teeIn, version, flag);
modifications.put(update.metadata().id, update);
}
m = new Mutation(update.metadata().keyspace, dk, modifications.build(), approxTime.now());
}
//Only cache serializations that don't hit the limit
if (!teeIn.isLimitReached())
m.cachedSerializations[MessagingService.getVersionOrdinal(version)] = new CachedSerialization(dob.toByteArray());
return m;
}
}
public Mutation deserialize(DataInputPlus in, int version) throws IOException
{
return deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE);
}
public long serializedSize(Mutation mutation, int version)
{
return serialization(mutation, version).serializedSize(PartitionUpdate.serializer, mutation, version);
}
}
/**
* There are two implementations of this class. One that keeps the serialized representation on-heap for later
* reuse and one that doesn't. Keeping all sized mutations around may lead to "bad" GC pressure (G1 GC) due to humongous objects.
* By default serialized mutations up to 2MB are kept on-heap - see {@link org.apache.cassandra.config.CassandraRelevantProperties#CACHEABLE_MUTATION_SIZE_LIMIT}.
*/
private static abstract class Serialization
{
abstract void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException;
abstract long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version);
}
/**
* Represents the cached serialization of a {@link Mutation} as a {@code byte[]}.
*/
private static final class CachedSerialization extends Serialization
{
private final byte[] serialized;
CachedSerialization(byte[] serialized)
{
this.serialized = Preconditions.checkNotNull(serialized);
}
@Override
void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException
{
out.write(serialized);
}
@Override
long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version)
{
return serialized.length;
}
}
/**
* Represents a non-cacheable serialization of a {@link Mutation}, only the size of the mutation is lazily cached.
*/
private static final class SizeOnlyCacheableSerialization extends Serialization
{
private volatile long size;
@Override
void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException
{
MutationSerializer.serializeInternal(serializer, mutation, out, version);
}
@Override
long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version)
{
long size = this.size;
if (size == 0L)
{
size = TypeSizes.sizeofUnsignedVInt(mutation.modifications.size());
for (PartitionUpdate partitionUpdate : mutation.modifications.values())
size += serializer.serializedSize(partitionUpdate, version);
this.size = size;
}
return size;
}
}
/**
* Collects finalized partition updates
*/
public static class PartitionUpdateCollector
{
private final ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
private final String keyspaceName;
private final DecoratedKey key;
private final long approxCreatedAtNanos = approxTime.now();
private boolean empty = true;
public PartitionUpdateCollector(String keyspaceName, DecoratedKey key)
{
this.keyspaceName = keyspaceName;
this.key = key;
}
public PartitionUpdateCollector add(PartitionUpdate partitionUpdate)
{
assert partitionUpdate != null;
assert partitionUpdate.partitionKey().getPartitioner() == key.getPartitioner();
// note that ImmutableMap.Builder only allows put:ing the same key once, it will fail during build() below otherwise
modifications.put(partitionUpdate.metadata().id, partitionUpdate);
empty = false;
return this;
}
public DecoratedKey key()
{
return key;
}
public String getKeyspaceName()
{
return keyspaceName;
}
public boolean isEmpty()
{
return empty;
}
public Mutation build()
{
return new Mutation(keyspaceName, key, modifications.build(), approxCreatedAtNanos);
}
}
}