| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.streaming; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.io.compress.CompressionMetadata; |
| import org.apache.cassandra.io.sstable.Component; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.util.ChannelProxy; |
| import org.apache.cassandra.io.util.DataOutputStreamPlus; |
| import org.apache.cassandra.net.AsyncStreamingOutputPlus; |
| import org.apache.cassandra.streaming.ProgressInfo; |
| import org.apache.cassandra.streaming.StreamSession; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| /** |
| * CassandraStreamWriter for compressed SSTable. |
| */ |
| public class CassandraCompressedStreamWriter extends CassandraStreamWriter |
| { |
| private static final int CHUNK_SIZE = 1 << 16; |
| private static final int CRC_LENGTH = 4; |
| |
| private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class); |
| |
| private final CompressionInfo compressionInfo; |
| private final long totalSize; |
| |
| public CassandraCompressedStreamWriter(SSTableReader sstable, CassandraStreamHeader header, StreamSession session) |
| { |
| super(sstable, header, session); |
| this.compressionInfo = header.compressionInfo; |
| this.totalSize = header.size(); |
| } |
| |
| @Override |
| public void write(DataOutputStreamPlus output) throws IOException |
| { |
| AsyncStreamingOutputPlus out = (AsyncStreamingOutputPlus) output; |
| long totalSize = totalSize(); |
| logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), |
| sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); |
| try (ChannelProxy fc = sstable.getDataChannel().newChannel()) |
| { |
| long progress = 0L; |
| |
| // we want to send continuous chunks together to minimise reads from disk and network writes |
| List<Section> sections = fuseAdjacentChunks(compressionInfo.chunks()); |
| |
| int sectionIdx = 0; |
| |
| // stream each of the required sections of the file |
| for (Section section : sections) |
| { |
| // length of the section to stream |
| long length = section.end - section.start; |
| |
| logger.debug("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length); |
| |
| // tracks write progress |
| long bytesTransferred = 0; |
| while (bytesTransferred < length) |
| { |
| int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); |
| long position = section.start + bytesTransferred; |
| |
| out.writeToChannel(bufferSupplier -> { |
| ByteBuffer outBuffer = bufferSupplier.get(toTransfer); |
| long read = fc.read(outBuffer, position); |
| assert read == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", read, toTransfer); |
| outBuffer.flip(); |
| }, limiter); |
| |
| bytesTransferred += toTransfer; |
| progress += toTransfer; |
| session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); |
| } |
| } |
| logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", |
| session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)); |
| } |
| } |
| |
| @Override |
| protected long totalSize() |
| { |
| return totalSize; |
| } |
| |
| // chunks are assumed to be sorted by offset |
| private List<Section> fuseAdjacentChunks(CompressionMetadata.Chunk[] chunks) |
| { |
| if (chunks.length == 0) |
| return Collections.emptyList(); |
| |
| long start = chunks[0].offset; |
| long end = start + chunks[0].length + CRC_LENGTH; |
| |
| List<Section> sections = new ArrayList<>(); |
| |
| for (int i = 1; i < chunks.length; i++) |
| { |
| CompressionMetadata.Chunk chunk = chunks[i]; |
| |
| if (chunk.offset == end) |
| { |
| end += (chunk.length + CRC_LENGTH); |
| } |
| else |
| { |
| sections.add(new Section(start, end)); |
| |
| start = chunk.offset; |
| end = start + chunk.length + CRC_LENGTH; |
| } |
| } |
| sections.add(new Section(start, end)); |
| |
| return sections; |
| } |
| |
| // [start, end) positions in the compressed sstable file that we want to stream; |
| // each section contains 1..n adjacent compressed chunks in it. |
| private static class Section |
| { |
| private final long start; |
| private final long end; |
| |
| private Section(long start, long end) |
| { |
| this.start = start; |
| this.end = end; |
| } |
| } |
| } |