// blob: 4068be732b18cd749d5d6ef88fe605871600a730 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.compress;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Optional;
import java.util.zip.CRC32;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.schema.CompressionParams;
import static org.apache.cassandra.utils.Throwables.merge;
/**
 * A {@link SequentialWriter} that compresses its buffer one chunk at a time before writing it out.
 * <p>
 * Each flushed chunk is stored in the data file as the compressed bytes immediately followed by a
 * 4-byte checksum. The file offset at which every chunk starts is recorded through a
 * {@link CompressionMetadata.Writer}, and running totals of the compressed and uncompressed sizes are
 * reported to the {@link MetadataCollector} when the writer is prepared for commit.
 */
public class CompressedSequentialWriter extends SequentialWriter
{
    // number of bytes taken by the checksum (an int) that follows every compressed chunk on disk
    private static final int CHUNK_CHECKSUM_BYTES = 4;

    private final ChecksumWriter crcMetadata;

    // holds offset in the file where current chunk should be written
    // advanced by flushData() each time the data buffer gets compressed and stored to the file,
    // and moved back by resetAndTruncate() when rolling back to an earlier mark
    private long chunkOffset = 0;

    // index file writer (random I/O)
    private final CompressionMetadata.Writer metadataWriter;
    private final ICompressor compressor;

    // used to store compressed data
    private ByteBuffer compressed;

    // holds a number of already written chunks
    private int chunkCount = 0;

    // running totals over the chunks currently in the file (checksum bytes are not included in compressedSize)
    private long uncompressedSize = 0, compressedSize = 0;

    private final MetadataCollector sstableMetadataCollector;

    // scratch buffer used by resetAndTruncate() to read back a chunk's stored checksum
    private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(CHUNK_CHECKSUM_BYTES);

    // empty when the constructor was given a null digest file
    private final Optional<File> digestFile;

    /**
     * Create a CompressedSequentialWriter, optionally with a digest file.
     *
     * @param file File to write
     * @param offsetsPath File name to write compression metadata
     * @param digestFile File to write digest to; may be {@code null}, in which case no digest is written
     * @param option Write option (buffer size and type will be set the same as compression params)
     * @param parameters Compression parameters
     * @param sstableMetadataCollector Metadata collector
     */
    public CompressedSequentialWriter(File file,
                                      String offsetsPath,
                                      File digestFile,
                                      SequentialWriterOption option,
                                      CompressionParams parameters,
                                      MetadataCollector sstableMetadataCollector)
    {
        // The chunk length and the compressor's preferred buffer type are set after the values copied
        // from option; presumably the builder keeps the last value set, so the compression params win.
        super(file, SequentialWriterOption.newBuilder()
                                          .bufferSize(option.bufferSize())
                                          .bufferType(option.bufferType())
                                          .bufferSize(parameters.chunkLength())
                                          .bufferType(parameters.getSstableCompressor().preferredBufferType())
                                          .finishOnClose(option.finishOnClose())
                                          .build());
        this.compressor = parameters.getSstableCompressor();
        this.digestFile = Optional.ofNullable(digestFile);

        // buffer for compression should be the same size as buffer itself
        compressed = compressor.preferredBufferType().allocate(compressor.initialCompressedBufferLength(buffer.capacity()));

        /* Index File (-CompressionInfo.db component) and its header */
        metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);

        this.sstableMetadataCollector = sstableMetadataCollector;
        crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
    }

    /**
     * Returns the current position of the underlying file channel.
     *
     * @throws FSReadError if the position cannot be obtained from the channel
     */
    @Override
    public long getOnDiskFilePointer()
    {
        try
        {
            return fchannel.position();
        }
        catch (IOException e)
        {
            throw new FSReadError(e, getPath());
        }
    }

    /**
     * Get a quick estimation on how many bytes have been written to disk
     *
     * It should for the most part be exactly the same as getOnDiskFilePointer()
     */
    @Override
    public long getEstimatedOnDiskBytesWritten()
    {
        return chunkOffset;
    }

    /**
     * Not supported by this writer.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void flush()
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Compresses the current buffer and appends it to the data file as one chunk followed by its checksum,
     * recording the chunk's start offset in the compression metadata and updating the size totals.
     *
     * @throws FSWriteError if recording the offset or writing the chunk or checksum fails
     */
    @Override
    protected void flushData()
    {
        seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation

        try
        {
            // compressing data with buffer re-use
            buffer.flip();
            compressed.clear();
            compressor.compress(buffer, compressed);
        }
        catch (IOException e)
        {
            throw new RuntimeException("Compression exception", e); // shouldn't happen
        }

        int compressedLength = compressed.position();
        uncompressedSize += buffer.position();
        compressedSize += compressedLength;

        try
        {
            // write an offset of the newly written chunk to the index file
            metadataWriter.addOffset(chunkOffset);
            chunkCount++;

            // write out the compressed data
            compressed.flip();
            channel.write(compressed);

            // write corresponding checksum
            compressed.rewind();
            crcMetadata.appendDirect(compressed, true);
            lastFlushOffset = uncompressedSize;
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, getPath());
        }

        // next chunk should be written right after current + length of the checksum (int)
        chunkOffset += compressedLength + CHUNK_CHECKSUM_BYTES;
        if (runPostFlush != null)
            runPostFlush.run();
    }

    /**
     * Opens the compression metadata written so far.
     *
     * @param overrideLength uncompressed data length to report; when zero or negative, the total
     *                       uncompressed size flushed by this writer is used instead
     * @return the metadata opened with that length and the current end offset of the compressed data
     */
    public CompressionMetadata open(long overrideLength)
    {
        if (overrideLength <= 0)
            overrideLength = uncompressedSize;
        return metadataWriter.open(overrideLength, chunkOffset);
    }

    /**
     * Captures the current write position so that {@link #resetAndTruncate(DataPosition)} can return to it.
     * A completely full buffer is flushed first, so the mark always points into the chunk being filled.
     */
    @Override
    public DataPosition mark()
    {
        if (!buffer.hasRemaining())
            doFlush(0);
        return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1);
    }

    /**
     * Rolls the writer back to a position previously obtained from {@link #mark()}.
     * <p>
     * If no chunk has been flushed since the mark was taken, only the buffer position is moved back.
     * Otherwise the marked chunk is read back from disk, verified against its stored checksum and
     * decompressed into the buffer; the data file, the compression metadata and the size totals are then
     * truncated back to the start of that chunk.
     *
     * @param mark a mark created by this writer's {@link #mark()}
     * @throws CorruptSSTableException if the chunk cannot be decompressed, its checksum does not match,
     *                                 or the file ends before the chunk and checksum have been read
     * @throws FSReadError if reading the chunk back fails for any other I/O reason
     * @throws FSWriteError if truncating the data file fails
     */
    @Override
    public synchronized void resetAndTruncate(DataPosition mark)
    {
        assert mark instanceof CompressedFileWriterMark;

        CompressedFileWriterMark realMark = (CompressedFileWriterMark) mark;

        // reset position
        long truncateTarget = realMark.uncDataOffset;

        if (realMark.chunkOffset == chunkOffset)
        {
            // simply drop bytes to the right of our mark
            buffer.position(realMark.validBufferBytes);
            return;
        }

        // synchronize current buffer with disk - we don't want any data loss
        syncInternal();

        chunkOffset = realMark.chunkOffset;

        // compressed chunk size (- 4 bytes reserved for checksum)
        int chunkSize = (int) (metadataWriter.chunkOffsetBy(realMark.nextChunkIndex) - chunkOffset - CHUNK_CHECKSUM_BYTES);
        if (compressed.capacity() < chunkSize)
        {
            FileUtils.clean(compressed);
            compressed = compressor.preferredBufferType().allocate(chunkSize);
        }

        try
        {
            compressed.clear();
            compressed.limit(chunkSize);
            fchannel.position(chunkOffset);
            // a single read() may return fewer bytes than requested, so keep reading until the chunk is complete
            fillFromChannel(compressed);

            try
            {
                // Repopulate buffer from compressed data
                buffer.clear();
                compressed.flip();
                compressor.uncompress(compressed, buffer);
            }
            catch (IOException e)
            {
                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize, e);
            }

            CRC32 checksum = new CRC32();
            compressed.rewind();
            checksum.update(compressed);

            // the stored checksum directly follows the chunk; read all of it before decoding the int
            crcCheckBuffer.clear();
            fillFromChannel(crcCheckBuffer);
            crcCheckBuffer.flip();
            if (crcCheckBuffer.getInt() != (int) checksum.getValue())
                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
        }
        catch (CorruptBlockException e)
        {
            throw new CorruptSSTableException(e, getPath());
        }
        catch (EOFException e)
        {
            throw new CorruptSSTableException(new CorruptBlockException(getPath(), chunkOffset, chunkSize), getPath());
        }
        catch (IOException e)
        {
            throw new FSReadError(e, getPath());
        }

        // Mark as dirty so we can guarantee the newly buffered bytes won't be lost on a rebuffer
        buffer.position(realMark.validBufferBytes);

        bufferOffset = truncateTarget - buffer.position();
        chunkCount = realMark.nextChunkIndex - 1;

        // Roll the size totals back to the chunks that remain in the file. Otherwise the next flushData()
        // would set lastFlushOffset from a total that still counts the truncated chunks, and open() and the
        // reported compression ratio would be computed from stale sizes.
        // bufferOffset is now the uncompressed offset at which the restored chunk starts, and every
        // remaining chunk contributes its compressed bytes plus one checksum to chunkOffset.
        uncompressedSize = bufferOffset;
        compressedSize = chunkOffset - (long) CHUNK_CHECKSUM_BYTES * chunkCount;

        // NOTE(review): crcMetadata is not rewound here; if it accumulates a whole-file digest, that digest
        // may still cover the truncated chunks - confirm against ChecksumWriter.

        // truncate data and index file
        truncate(chunkOffset, bufferOffset);
        metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1);
    }

    /**
     * Reads from the file channel, starting at its current position, until {@code dst} has no space remaining.
     *
     * @param dst buffer to fill up to its limit
     * @throws EOFException if the channel reports end-of-file before {@code dst} is full
     * @throws IOException if the underlying read fails
     */
    private void fillFromChannel(ByteBuffer dst) throws IOException
    {
        while (dst.hasRemaining())
        {
            if (fchannel.read(dst) < 0)
                throw new EOFException();
        }
    }

    /**
     * Truncates the data file and records the matching uncompressed flush offset.
     *
     * @param toFileSize size, in bytes, to truncate the compressed data file to
     * @param toBufferOffset uncompressed offset to store as the last flushed offset
     * @throws FSWriteError if the truncation fails
     */
    private void truncate(long toFileSize, long toBufferOffset)
    {
        try
        {
            fchannel.truncate(toFileSize);
            lastFlushOffset = toBufferOffset;
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, getPath());
        }
    }

    /**
     * Seek to the offset where next compressed data chunk should be stored.
     */
    private void seekToChunkStart()
    {
        if (getOnDiskFilePointer() != chunkOffset)
        {
            try
            {
                fchannel.position(chunkOffset);
            }
            catch (IOException e)
            {
                throw new FSReadError(e, getPath());
            }
        }
    }

    /**
     * Transactional proxy that commits, aborts and prepares the compression metadata writer together
     * with the data file, and releases the compression buffer during cleanup.
     */
    protected class TransactionalProxy extends SequentialWriter.TransactionalProxy
    {
        @Override
        protected Throwable doCommit(Throwable accumulate)
        {
            // the metadata writer commits first; its outcome is passed on to the parent's commit
            return super.doCommit(metadataWriter.commit(accumulate));
        }

        @Override
        protected Throwable doAbort(Throwable accumulate)
        {
            // the metadata writer aborts first; its outcome is passed on to the parent's abort
            return super.doAbort(metadataWriter.abort(accumulate));
        }

        @Override
        protected void doPrepare()
        {
            // push any buffered data to disk so the totals and chunk count below are final
            syncInternal();
            // only written when a digest file was supplied to the constructor
            digestFile.ifPresent(crcMetadata::writeFullChecksum);
            sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
            metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
        }

        @Override
        protected Throwable doPreCleanup(Throwable accumulate)
        {
            accumulate = super.doPreCleanup(accumulate);
            if (compressed != null)
            {
                // a failure to clean the buffer is accumulated rather than thrown so cleanup can continue
                try { FileUtils.clean(compressed); }
                catch (Throwable t) { accumulate = merge(accumulate, t); }
                compressed = null;
            }
            return accumulate;
        }
    }

    @Override
    protected SequentialWriter.TransactionalProxy txnProxy()
    {
        return new TransactionalProxy();
    }

    /**
     * Class to hold a mark to the position of the file
     */
    protected static class CompressedFileWriterMark implements DataPosition
    {
        // chunk offset in the compressed file
        final long chunkOffset;
        // uncompressed data offset (real data offset)
        final long uncDataOffset;

        // position within the uncompressed buffer at the time the mark was taken
        final int validBufferBytes;
        // number of chunks already written when the mark was taken, plus one
        final int nextChunkIndex;

        public CompressedFileWriterMark(long chunkOffset, long uncDataOffset, int validBufferBytes, int nextChunkIndex)
        {
            this.chunkOffset = chunkOffset;
            this.uncDataOffset = uncDataOffset;
            this.validBufferBytes = validBufferBytes;
            this.nextChunkIndex = nextChunkIndex;
        }
    }
}