| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.net; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Objects; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.ByteBufAllocator; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.io.compress.BufferType; |
| import org.apache.cassandra.io.util.DataInputBuffer; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputBufferFixed; |
| import org.apache.cassandra.locator.InetAddressAndPort; |
| import org.apache.cassandra.utils.memory.BufferPools; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; |
| import static org.apache.cassandra.net.MessagingService.VERSION_30; |
| import static org.apache.cassandra.net.MessagingService.VERSION_40; |
| import static org.apache.cassandra.net.Message.validateLegacyProtocolMagic; |
| import static org.apache.cassandra.net.Crc.*; |
| import static org.apache.cassandra.net.Crc.computeCrc32; |
| import static org.apache.cassandra.net.OutboundConnectionSettings.*; |
| |
| /** |
| * Messages for the handshake phase of the internode protocol. |
| * |
| * The modern handshake is composed of 2 messages: Initiate and Accept |
| * <p> |
| * The legacy handshake is composed of 3 messages, the first being sent by the initiator of the connection. The other |
| * side then answers with the 2nd message. At that point, if a version mismatch is detected by the connection initiator, |
| * it will simply disconnect and reconnect with a more appropriate version. But if the version is acceptable, the connection |
| * initiator sends the third message of the protocol, after which it considers the connection ready. |
| */ |
| class HandshakeProtocol |
| { |
| static final long TIMEOUT_MILLIS = 3 * DatabaseDescriptor.getRpcTimeout(MILLISECONDS); |
| |
| /** |
| * The initial message sent when a node creates a new connection to a remote peer. This message contains: |
| * 1) the {@link Message#PROTOCOL_MAGIC} number (4 bytes). |
| * 2) the connection flags (4 bytes), which encodes: |
| * - the version the initiator thinks should be used for the connection (in practice, either the initiator |
| * version if it's the first time we connect to that remote since startup, or the last version known for that |
| * peer otherwise). |
| * - the "mode" of the connection: whether it is for streaming or for messaging. |
| * - whether compression should be used or not (if it is, compression is enabled _after_ the last message of the |
| * handshake has been sent). |
| * 3) the connection initiator's broadcast address |
| * 4) a CRC protecting the message from corruption |
| * <p> |
| * More precisely, connection flags: |
| * <pre> |
| * {@code |
| * 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 |
| * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| * |C C C M C | REQUEST | MIN | MAX | |
| * |A A M O R | VERSION | SUPPORTED | SUPPORTED | |
| * |T T P D C | (DEPRECATED) | VERSION | VERSION | |
| * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| * } |
| * </pre> |
| * CAT - QOS category, 2 bits: SMALL, LARGE, URGENT, or LEGACY (unset) |
| * CMP - compression enabled bit |
| * MOD - connection mode; if the bit is on, the connection is for streaming; if the bit is off, it is for inter-node messaging. |
| * CRC - crc enabled bit |
| * VERSION - {@link org.apache.cassandra.net.MessagingService#current_version} |
| */ |
| static class Initiate |
| { |
| /** Contains the PROTOCOL_MAGIC (int) and the flags (int). */ |
| private static final int MIN_LENGTH = 8; |
| private static final int MAX_LENGTH = 12 + InetAddressAndPort.Serializer.MAXIMUM_SIZE; |
| |
| @Deprecated // this is ignored by post40 nodes, i.e. if maxMessagingVersion is set |
| final int requestMessagingVersion; |
| // the messagingVersion bounds the sender will accept to initiate a connection; |
| // if the remote peer supports any, the newest supported version will be selected; otherwise the nearest supported version |
| final AcceptVersions acceptVersions; |
| final ConnectionType type; |
| final Framing framing; |
| final InetAddressAndPort from; |
| |
| Initiate(int requestMessagingVersion, AcceptVersions acceptVersions, ConnectionType type, Framing framing, InetAddressAndPort from) |
| { |
| this.requestMessagingVersion = requestMessagingVersion; |
| this.acceptVersions = acceptVersions; |
| this.type = type; |
| this.framing = framing; |
| this.from = from; |
| } |
| |
| @VisibleForTesting |
| int encodeFlags() |
| { |
| int flags = 0; |
| if (type.isMessaging()) |
| flags |= type.twoBitID(); |
| if (type.isStreaming()) |
| flags |= 1 << 3; |
| |
| // framing id is split over 2nd and 4th bits, for backwards compatibility |
| flags |= ((framing.id & 1) << 2) | ((framing.id & 2) << 3); |
| flags |= (requestMessagingVersion << 8); |
| |
| if (requestMessagingVersion < VERSION_40 || acceptVersions.max < VERSION_40) |
| return flags; // for testing, permit serializing as though we are pre40 |
| |
| flags |= (acceptVersions.min << 16); |
| flags |= (acceptVersions.max << 24); |
| return flags; |
| } |
| |
| ByteBuf encode() |
| { |
| ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP); |
| try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer)) |
| { |
| out.writeInt(Message.PROTOCOL_MAGIC); |
| out.writeInt(encodeFlags()); |
| |
| if (requestMessagingVersion >= VERSION_40 && acceptVersions.max >= VERSION_40) |
| { |
| inetAddressAndPortSerializer.serialize(from, out, requestMessagingVersion); |
| out.writeInt(computeCrc32(buffer, 0, buffer.position())); |
| } |
| buffer.flip(); |
| return GlobalBufferPoolAllocator.wrap(buffer); |
| } |
| catch (IOException e) |
| { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| static Initiate maybeDecode(ByteBuf buf) throws IOException |
| { |
| if (buf.readableBytes() < MIN_LENGTH) |
| return null; |
| |
| ByteBuffer nio = buf.nioBuffer(); |
| int start = nio.position(); |
| try (DataInputBuffer in = new DataInputBuffer(nio, false)) |
| { |
| validateLegacyProtocolMagic(in.readInt()); |
| int flags = in.readInt(); |
| |
| int requestedMessagingVersion = getBits(flags, 8, 8); |
| int minMessagingVersion = getBits(flags, 16, 8); |
| int maxMessagingVersion = getBits(flags, 24, 8); |
| int framingBits = getBits(flags, 2, 1) | (getBits(flags, 4, 1) << 1); |
| Framing framing = Framing.forId(framingBits); |
| |
| boolean isStream = getBits(flags, 3, 1) == 1; |
| |
| ConnectionType type = isStream |
| ? ConnectionType.STREAMING |
| : ConnectionType.fromId(getBits(flags, 0, 2)); |
| |
| InetAddressAndPort from = null; |
| |
| if (requestedMessagingVersion >= VERSION_40 && maxMessagingVersion >= MessagingService.VERSION_40) |
| { |
| from = inetAddressAndPortSerializer.deserialize(in, requestedMessagingVersion); |
| |
| int computed = computeCrc32(nio, start, nio.position()); |
| int read = in.readInt(); |
| if (read != computed) |
| throw new InvalidCrc(read, computed); |
| } |
| |
| buf.skipBytes(nio.position() - start); |
| return new Initiate(requestedMessagingVersion, |
| minMessagingVersion == 0 && maxMessagingVersion == 0 |
| ? null : new AcceptVersions(minMessagingVersion, maxMessagingVersion), |
| type, framing, from); |
| |
| } |
| catch (EOFException e) |
| { |
| return null; |
| } |
| } |
| |
| @VisibleForTesting |
| @Override |
| public boolean equals(Object other) |
| { |
| if (!(other instanceof Initiate)) |
| return false; |
| |
| Initiate that = (Initiate)other; |
| return this.type == that.type |
| && this.framing == that.framing |
| && this.requestMessagingVersion == that.requestMessagingVersion |
| && Objects.equals(this.acceptVersions, that.acceptVersions); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("Initiate(request: %d, min: %d, max: %d, type: %s, framing: %b, from: %s)", |
| requestMessagingVersion, |
| acceptVersions == null ? requestMessagingVersion : acceptVersions.min, |
| acceptVersions == null ? requestMessagingVersion : acceptVersions.max, |
| type, framing, from); |
| } |
| } |
| |
| |
| /** |
| * The second message of the handshake, sent by the node receiving the {@link Initiate} back to the |
| * connection initiator. |
| * |
| * This message contains |
| * 1) the messaging version of the peer sending this message |
| * 2) the negotiated messaging version if one could be accepted by both peers, |
| * or if not the closest version that this peer could support to the ones requested |
| * 3) a CRC protectingn the integrity of the message |
| * |
| * Note that the pre40 equivalent of this message contains ONLY the messaging version of the peer. |
| */ |
| static class Accept |
| { |
| /** The messaging version sent by the receiving peer (int). */ |
| private static final int MAX_LENGTH = 12; |
| |
| final int useMessagingVersion; |
| final int maxMessagingVersion; |
| |
| Accept(int useMessagingVersion, int maxMessagingVersion) |
| { |
| this.useMessagingVersion = useMessagingVersion; |
| this.maxMessagingVersion = maxMessagingVersion; |
| } |
| |
| ByteBuf encode(ByteBufAllocator allocator) |
| { |
| ByteBuf buffer = allocator.directBuffer(MAX_LENGTH); |
| buffer.clear(); |
| buffer.writeInt(maxMessagingVersion); |
| buffer.writeInt(useMessagingVersion); |
| buffer.writeInt(computeCrc32(buffer, 0, 8)); |
| return buffer; |
| } |
| |
| /** |
| * Respond to pre40 nodes only with our current messagingVersion |
| */ |
| static ByteBuf respondPre40(int messagingVersion, ByteBufAllocator allocator) |
| { |
| ByteBuf buffer = allocator.directBuffer(4); |
| buffer.clear(); |
| buffer.writeInt(messagingVersion); |
| return buffer; |
| } |
| |
| static Accept maybeDecode(ByteBuf in, int handshakeMessagingVersion) throws InvalidCrc |
| { |
| int readerIndex = in.readerIndex(); |
| if (in.readableBytes() < 4) |
| return null; |
| int maxMessagingVersion = in.readInt(); |
| int useMessagingVersion = 0; |
| |
| // if the other node is pre-4.0, it will respond only with its maxMessagingVersion |
| if (maxMessagingVersion < VERSION_40 || handshakeMessagingVersion < VERSION_40) |
| return new Accept(useMessagingVersion, maxMessagingVersion); |
| |
| if (in.readableBytes() < 8) |
| { |
| in.readerIndex(readerIndex); |
| return null; |
| } |
| useMessagingVersion = in.readInt(); |
| |
| // verify crc |
| int computed = computeCrc32(in, readerIndex, readerIndex + 8); |
| int read = in.readInt(); |
| if (read != computed) |
| throw new InvalidCrc(read, computed); |
| |
| return new Accept(useMessagingVersion, maxMessagingVersion); |
| } |
| |
| @VisibleForTesting |
| @Override |
| public boolean equals(Object other) |
| { |
| return other instanceof Accept |
| && this.useMessagingVersion == ((Accept) other).useMessagingVersion |
| && this.maxMessagingVersion == ((Accept) other).maxMessagingVersion; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("Accept(use: %d, max: %d)", useMessagingVersion, maxMessagingVersion); |
| } |
| } |
| |
| /** |
| * The third message of the handshake, sent by pre40 nodes on reception of {@link Accept}. |
| * This message contains: |
| * 1) The connection initiator's {@link org.apache.cassandra.net.MessagingService#current_version} (4 bytes). |
| * This indicates the max messaging version supported by this node. |
| * 2) The connection initiator's broadcast address as encoded by {@link InetAddressAndPort.Serializer}. |
| * This can be either 7 bytes for an IPv4 address, or 19 bytes for an IPv6 one, post40. |
| * This can be either 5 bytes for an IPv4 address, or 17 bytes for an IPv6 one, pre40. |
| * <p> |
| * This message concludes the legacy handshake protocol. |
| */ |
| static class ConfirmOutboundPre40 |
| { |
| private static final int MAX_LENGTH = 4 + InetAddressAndPort.Serializer.MAXIMUM_SIZE; |
| |
| final int maxMessagingVersion; |
| final InetAddressAndPort from; |
| |
| ConfirmOutboundPre40(int maxMessagingVersion, InetAddressAndPort from) |
| { |
| this.maxMessagingVersion = maxMessagingVersion; |
| this.from = from; |
| } |
| |
| ByteBuf encode() |
| { |
| ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP); |
| try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer)) |
| { |
| out.writeInt(maxMessagingVersion); |
| // pre-4.0 nodes should only receive the address, never port, and it's ok to hardcode VERSION_30 |
| inetAddressAndPortSerializer.serialize(from, out, VERSION_30); |
| buffer.flip(); |
| return GlobalBufferPoolAllocator.wrap(buffer); |
| } |
| catch (IOException e) |
| { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| @SuppressWarnings("resource") |
| static ConfirmOutboundPre40 maybeDecode(ByteBuf in) |
| { |
| ByteBuffer nio = in.nioBuffer(); |
| int start = nio.position(); |
| DataInputPlus input = new DataInputBuffer(nio, false); |
| try |
| { |
| int version = input.readInt(); |
| InetAddressAndPort address = inetAddressAndPortSerializer.deserialize(input, version); |
| in.skipBytes(nio.position() - start); |
| return new ConfirmOutboundPre40(version, address); |
| } |
| catch (EOFException e) |
| { |
| // makes the assumption we didn't have enough bytes to deserialize an IPv6 address, |
| // as we only check the MIN_LENGTH of the buf. |
| return null; |
| } |
| catch (IOException e) |
| { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| @VisibleForTesting |
| @Override |
| public boolean equals(Object other) |
| { |
| if (!(other instanceof ConfirmOutboundPre40)) |
| return false; |
| |
| ConfirmOutboundPre40 that = (ConfirmOutboundPre40) other; |
| return this.maxMessagingVersion == that.maxMessagingVersion |
| && Objects.equals(this.from, that.from); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("ConfirmOutboundPre40(maxMessagingVersion: %d; address: %s)", maxMessagingVersion, from); |
| } |
| } |
| |
| private static int getBits(int packed, int start, int count) |
| { |
| return (packed >>> start) & ~(-1 << count); |
| } |
| |
| } |