blob: d5e61137d8420fa3a714bba4a8471d1fffc83568 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.nio.ByteBuffer;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.ICompressor;
/**
* Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
* section of the buffer and writes it to the destination channel.
*
* The format of the compressed commit log is as follows:
* - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
* - a series of 'sync segments' that are written every time the commit log is sync()'ed
* -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
* -- total plain text length for this section
* -- a block of compressed data
*/
public class CompressedSegment extends FileDirectSegment
{
static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
final ICompressor compressor;
/**
* Constructs a new segment file.
*/
CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
super(commitLog, manager);
this.compressor = commitLog.configuration.getCompressor();
manager.getBufferPool().setPreferredReusableBufferType(compressor.preferredBufferType());
}
ByteBuffer createBuffer(CommitLog commitLog)
{
return manager.getBufferPool().createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
}
@Override
void write(int startMarker, int nextMarker)
{
int contentStart = startMarker + SYNC_MARKER_SIZE;
int length = nextMarker - contentStart;
// The length may be 0 when the segment is being closed.
assert length > 0 || length == 0 && !isStillAllocating();
try
{
int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
ByteBuffer compressedBuffer = manager.getBufferPool().getThreadLocalReusableBuffer(neededBufferSize);
ByteBuffer inputBuffer = buffer.duplicate();
inputBuffer.limit(contentStart + length).position(contentStart);
compressedBuffer.limit(compressedBuffer.capacity()).position(COMPRESSED_MARKER_SIZE);
compressor.compress(inputBuffer, compressedBuffer);
compressedBuffer.flip();
compressedBuffer.putInt(SYNC_MARKER_SIZE, length);
// Only one thread can be here at a given time.
// Protected by synchronization on CommitLogSegment.sync().
writeSyncMarker(id, compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
manager.addSize(compressedBuffer.limit());
channel.write(compressedBuffer);
assert channel.position() - lastWrittenPos == compressedBuffer.limit();
lastWrittenPos = channel.position();
}
catch (Exception e)
{
throw new FSWriteError(e, getPath());
}
}
@Override
public long onDiskSize()
{
return lastWrittenPos;
}
}