| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.streaming; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.function.Function; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.SerializationHeader; |
| import org.apache.cassandra.db.TypeSizes; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.io.IVersionedSerializer; |
| import org.apache.cassandra.io.sstable.format.SSTableFormat; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.sstable.format.Version; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| public class CassandraStreamHeader |
| { |
| /** SSTable version */ |
| public final Version version; |
| |
| /** SSTable format **/ |
| public final SSTableFormat.Type format; |
| public final long estimatedKeys; |
| public final List<SSTableReader.PartitionPositionBounds> sections; |
| public final CompressionInfo compressionInfo; |
| public final int sstableLevel; |
| public final SerializationHeader.Component serializationHeader; |
| |
| /* flag indicating whether this is a partial or entire sstable transfer */ |
| public final boolean isEntireSSTable; |
| /* first token of the sstable required for faster streaming */ |
| public final DecoratedKey firstKey; |
| public final TableId tableId; |
| public final ComponentManifest componentManifest; |
| |
| /* cached size value */ |
| private final long size; |
| |
| private CassandraStreamHeader(Builder builder) |
| { |
| version = builder.version; |
| format = builder.format; |
| estimatedKeys = builder.estimatedKeys; |
| sections = builder.sections; |
| compressionInfo = builder.compressionInfo; |
| sstableLevel = builder.sstableLevel; |
| serializationHeader = builder.serializationHeader; |
| tableId = builder.tableId; |
| isEntireSSTable = builder.isEntireSSTable; |
| componentManifest = builder.componentManifest; |
| firstKey = builder.firstKey; |
| size = calculateSize(); |
| } |
| |
| public static Builder builder() |
| { |
| return new Builder(); |
| } |
| |
| public boolean isCompressed() |
| { |
| return compressionInfo != null; |
| } |
| |
| /** |
| * @return total file size to transfer in bytes |
| */ |
| public long size() |
| { |
| return size; |
| } |
| |
| @VisibleForTesting |
| public long calculateSize() |
| { |
| if (isEntireSSTable) |
| return componentManifest.totalSize(); |
| |
| if (compressionInfo != null) |
| return compressionInfo.getTotalSize(); |
| |
| long transferSize = 0; |
| for (SSTableReader.PartitionPositionBounds section : sections) |
| transferSize += section.upperPosition - section.lowerPosition; |
| return transferSize; |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| CassandraStreamHeader that = (CassandraStreamHeader) o; |
| return estimatedKeys == that.estimatedKeys && |
| sstableLevel == that.sstableLevel && |
| isEntireSSTable == that.isEntireSSTable && |
| Objects.equals(version, that.version) && |
| format == that.format && |
| Objects.equals(sections, that.sections) && |
| Objects.equals(compressionInfo, that.compressionInfo) && |
| Objects.equals(serializationHeader, that.serializationHeader) && |
| Objects.equals(componentManifest, that.componentManifest) && |
| Objects.equals(firstKey, that.firstKey) && |
| Objects.equals(tableId, that.tableId); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, serializationHeader, componentManifest, |
| isEntireSSTable, firstKey, tableId); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "CassandraStreamHeader{" + |
| "version=" + version + |
| ", format=" + format + |
| ", estimatedKeys=" + estimatedKeys + |
| ", sections=" + sections + |
| ", sstableLevel=" + sstableLevel + |
| ", header=" + serializationHeader + |
| ", isEntireSSTable=" + isEntireSSTable + |
| ", firstKey=" + firstKey + |
| ", tableId=" + tableId + |
| '}'; |
| } |
| |
| public static final IVersionedSerializer<CassandraStreamHeader> serializer = new CassandraStreamHeaderSerializer(); |
| |
| public static class CassandraStreamHeaderSerializer implements IVersionedSerializer<CassandraStreamHeader> |
| { |
| public void serialize(CassandraStreamHeader header, DataOutputPlus out, int version) throws IOException |
| { |
| out.writeUTF(header.version.toString()); |
| out.writeUTF(header.format.name); |
| |
| out.writeLong(header.estimatedKeys); |
| out.writeInt(header.sections.size()); |
| for (SSTableReader.PartitionPositionBounds section : header.sections) |
| { |
| out.writeLong(section.lowerPosition); |
| out.writeLong(section.upperPosition); |
| } |
| CompressionInfo.serializer.serialize(header.compressionInfo, out, version); |
| out.writeInt(header.sstableLevel); |
| |
| SerializationHeader.serializer.serialize(header.version, header.serializationHeader, out); |
| |
| header.tableId.serialize(out); |
| out.writeBoolean(header.isEntireSSTable); |
| |
| if (header.isEntireSSTable) |
| { |
| ComponentManifest.serializer.serialize(header.componentManifest, out, version); |
| ByteBufferUtil.writeWithVIntLength(header.firstKey.getKey(), out); |
| } |
| } |
| |
| public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException |
| { |
| return deserialize(in, version, tableId -> { |
| ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); |
| if (cfs != null) |
| return cfs.getPartitioner(); |
| |
| return null; |
| }); |
| } |
| |
| @VisibleForTesting |
| public CassandraStreamHeader deserialize(DataInputPlus in, int version, Function<TableId, IPartitioner> partitionerMapper) throws IOException |
| { |
| Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF()); |
| SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF()); |
| |
| long estimatedKeys = in.readLong(); |
| int count = in.readInt(); |
| List<SSTableReader.PartitionPositionBounds> sections = new ArrayList<>(count); |
| for (int k = 0; k < count; k++) |
| sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); |
| CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); |
| int sstableLevel = in.readInt(); |
| |
| SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); |
| |
| TableId tableId = TableId.deserialize(in); |
| boolean isEntireSSTable = in.readBoolean(); |
| ComponentManifest manifest = null; |
| DecoratedKey firstKey = null; |
| |
| if (isEntireSSTable) |
| { |
| manifest = ComponentManifest.serializer.deserialize(in, version); |
| ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in); |
| IPartitioner partitioner = partitionerMapper.apply(tableId); |
| if (partitioner == null) |
| throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId %s", tableId)); |
| firstKey = partitioner.decorateKey(keyBuf); |
| } |
| |
| return builder().withSSTableFormat(format) |
| .withSSTableVersion(sstableVersion) |
| .withSSTableLevel(sstableLevel) |
| .withEstimatedKeys(estimatedKeys) |
| .withSections(sections) |
| .withCompressionInfo(compressionInfo) |
| .withSerializationHeader(header) |
| .withComponentManifest(manifest) |
| .isEntireSSTable(isEntireSSTable) |
| .withFirstKey(firstKey) |
| .withTableId(tableId) |
| .build(); |
| } |
| |
| public long serializedSize(CassandraStreamHeader header, int version) |
| { |
| long size = 0; |
| size += TypeSizes.sizeof(header.version.toString()); |
| size += TypeSizes.sizeof(header.format.name); |
| size += TypeSizes.sizeof(header.estimatedKeys); |
| |
| size += TypeSizes.sizeof(header.sections.size()); |
| for (SSTableReader.PartitionPositionBounds section : header.sections) |
| { |
| size += TypeSizes.sizeof(section.lowerPosition); |
| size += TypeSizes.sizeof(section.upperPosition); |
| } |
| |
| size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); |
| size += TypeSizes.sizeof(header.sstableLevel); |
| |
| size += SerializationHeader.serializer.serializedSize(header.version, header.serializationHeader); |
| |
| size += header.tableId.serializedSize(); |
| size += TypeSizes.sizeof(header.isEntireSSTable); |
| |
| if (header.isEntireSSTable) |
| { |
| size += ComponentManifest.serializer.serializedSize(header.componentManifest, version); |
| size += ByteBufferUtil.serializedSizeWithVIntLength(header.firstKey.getKey()); |
| } |
| return size; |
| } |
| } |
| |
| public static final class Builder |
| { |
| private Version version; |
| private SSTableFormat.Type format; |
| private long estimatedKeys; |
| private List<SSTableReader.PartitionPositionBounds> sections; |
| private CompressionInfo compressionInfo; |
| private int sstableLevel; |
| private SerializationHeader.Component serializationHeader; |
| private ComponentManifest componentManifest; |
| private boolean isEntireSSTable; |
| private DecoratedKey firstKey; |
| private TableId tableId; |
| |
| public Builder withSSTableFormat(SSTableFormat.Type format) |
| { |
| this.format = format; |
| return this; |
| } |
| |
| public Builder withSSTableVersion(Version version) |
| { |
| this.version = version; |
| return this; |
| } |
| |
| public Builder withSSTableLevel(int sstableLevel) |
| { |
| this.sstableLevel = sstableLevel; |
| return this; |
| } |
| |
| public Builder withEstimatedKeys(long estimatedKeys) |
| { |
| this.estimatedKeys = estimatedKeys; |
| return this; |
| } |
| |
| public Builder withSections(List<SSTableReader.PartitionPositionBounds> sections) |
| { |
| this.sections = sections; |
| return this; |
| } |
| |
| public Builder withCompressionInfo(CompressionInfo compressionInfo) |
| { |
| this.compressionInfo = compressionInfo; |
| return this; |
| } |
| |
| public Builder withSerializationHeader(SerializationHeader.Component header) |
| { |
| this.serializationHeader = header; |
| return this; |
| } |
| |
| public Builder withTableId(TableId tableId) |
| { |
| this.tableId = tableId; |
| return this; |
| } |
| |
| public Builder isEntireSSTable(boolean isEntireSSTable) |
| { |
| this.isEntireSSTable = isEntireSSTable; |
| return this; |
| } |
| |
| public Builder withComponentManifest(ComponentManifest componentManifest) |
| { |
| this.componentManifest = componentManifest; |
| return this; |
| } |
| |
| public Builder withFirstKey(DecoratedKey firstKey) |
| { |
| this.firstKey = firstKey; |
| return this; |
| } |
| |
| public CassandraStreamHeader build() |
| { |
| checkNotNull(version); |
| checkNotNull(format); |
| checkNotNull(sections); |
| checkNotNull(serializationHeader); |
| checkNotNull(tableId); |
| |
| if (isEntireSSTable) |
| { |
| checkNotNull(componentManifest); |
| checkNotNull(firstKey); |
| } |
| |
| return new CassandraStreamHeader(this); |
| } |
| } |
| } |