blob: 60d76415e72a22bcc9790c49b6e29e90343faaef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.hints;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nullable;
import com.google.common.primitives.Ints;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.UUIDSerializer;
/**
* The message we use to dispatch and forward hints.
*
* Encodes the host id the hint is meant for and the hint itself.
* We use the host id to determine whether we should store or apply the hint:
* 1. If host id equals to the receiving node host id, then we apply the hint
* 2. If host id is different from the receiving node's host id, then we store the hint
*
* Scenario (1) means that we are dealing with regular hint dispatch.
* Scenario (2) means that we got a hint from a node that's going through decommissioning and is streaming its hints
* elsewhere first.
*/
public final class HintMessage implements SerializableHintMessage
{
public static final IVersionedAsymmetricSerializer<SerializableHintMessage, HintMessage> serializer = new Serializer();
final UUID hostId;
@Nullable // can be null if we fail do decode the hint because of an unknown table id in it
final Hint hint;
@Nullable // will usually be null, unless a hint deserialization fails due to an unknown table id
final TableId unknownTableID;
HintMessage(UUID hostId, Hint hint)
{
assert hint != null;
this.hostId = hostId;
this.hint = hint;
this.unknownTableID = null;
}
HintMessage(UUID hostId, TableId unknownTableID)
{
this.hostId = hostId;
this.hint = null;
this.unknownTableID = unknownTableID;
}
public static class Serializer implements IVersionedAsymmetricSerializer<SerializableHintMessage, HintMessage>
{
public long serializedSize(SerializableHintMessage obj, int version)
{
if (obj instanceof HintMessage)
{
HintMessage message = (HintMessage) obj;
Objects.requireNonNull(message.hint); // we should never *send* a HintMessage with null hint
long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
long hintSize = Hint.serializer.serializedSize(message.hint, version);
size += TypeSizes.sizeofUnsignedVInt(hintSize);
size += hintSize;
return size;
}
else if (obj instanceof Encoded)
{
Encoded message = (Encoded) obj;
if (version != message.version)
throw new IllegalArgumentException("serializedSize() called with non-matching version " + version);
long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
size += TypeSizes.sizeofUnsignedVInt(message.hint.remaining());
size += message.hint.remaining();
return size;
}
else
{
throw new IllegalStateException("Unexpected type: " + obj);
}
}
public void serialize(SerializableHintMessage obj, DataOutputPlus out, int version) throws IOException
{
if (obj instanceof HintMessage)
{
HintMessage message = (HintMessage) obj;
Objects.requireNonNull(message.hint); // we should never *send* a HintMessage with null hint
UUIDSerializer.serializer.serialize(message.hostId, out, version);
/*
* We are serializing the hint size so that the receiver of the message could gracefully handle
* deserialize failure when a table had been dropped, by simply skipping the unread bytes.
*/
out.writeUnsignedVInt(Hint.serializer.serializedSize(message.hint, version));
Hint.serializer.serialize(message.hint, out, version);
}
else if (obj instanceof Encoded)
{
Encoded message = (Encoded) obj;
if (version != message.version)
throw new IllegalArgumentException("serialize() called with non-matching version " + version);
UUIDSerializer.serializer.serialize(message.hostId, out, version);
out.writeUnsignedVInt(message.hint.remaining());
out.write(message.hint);
}
else
{
throw new IllegalStateException("Unexpected type: " + obj);
}
}
/*
* It's not an exceptional scenario to have a hints file streamed that have partition updates for tables
* that don't exist anymore. We want to handle that case gracefully instead of dropping the connection for every
* one of them.
*/
public HintMessage deserialize(DataInputPlus in, int version) throws IOException
{
UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
long hintSize = in.readUnsignedVInt();
TrackedDataInputPlus countingIn = new TrackedDataInputPlus(in);
try
{
return new HintMessage(hostId, Hint.serializer.deserialize(countingIn, version));
}
catch (UnknownTableException e)
{
in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead()));
return new HintMessage(hostId, e.id);
}
}
}
/**
* A specialized version of {@link HintMessage} that takes an already encoded in a bytebuffer hint and sends it verbatim.
*
* An optimization for when dispatching a hint file of the current messaging version to a node of the same messaging version,
* which is the most common case. Saves on extra ByteBuffer allocations one redundant hint deserialization-serialization cycle.
*
* Never deserialized as an HintMessage.Encoded - the receiving side will always deserialize the message as vanilla
* {@link HintMessage}.
*/
static final class Encoded implements SerializableHintMessage
{
private final UUID hostId;
private final ByteBuffer hint;
private final int version;
Encoded(UUID hostId, ByteBuffer hint, int version)
{
this.hostId = hostId;
this.hint = hint;
this.version = version;
}
public long getHintCreationTime()
{
return Hint.serializer.getHintCreationTime(hint, version);
}
}
}