blob: 8e05112336fe0df3ae4b9db1b2e53f4262b3ccb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.SyncUtil;
/*
* Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
* section of the buffer and writes it to the destination channel.
*/
public class CompressedSegment extends CommitLogSegment
{
private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue()
{
return ByteBuffer.allocate(0);
}
};
static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
/**
* The number of buffers in use
*/
private static AtomicInteger usedBuffers = new AtomicInteger(0);
/**
* Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
* (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
* more, depending on how soon the sync policy stops all writing threads.
*/
static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
final ICompressor compressor;
final Runnable onClose;
volatile long lastWrittenPos = 0;
/**
* Constructs a new segment file.
*/
CompressedSegment(CommitLog commitLog, Runnable onClose)
{
super(commitLog);
this.compressor = commitLog.configuration.getCompressor();
this.onClose = onClose;
try
{
channel.write((ByteBuffer) buffer.duplicate().flip());
commitLog.allocator.addSize(lastWrittenPos = buffer.position());
}
catch (IOException e)
{
throw new FSWriteError(e, getPath());
}
}
ByteBuffer allocate(int size)
{
return compressor.preferredBufferType().allocate(size);
}
ByteBuffer createBuffer(CommitLog commitLog)
{
usedBuffers.incrementAndGet();
ByteBuffer buf = bufferPool.poll();
if (buf == null)
{
// this.compressor is not yet set, so we must use the commitLog's one.
buf = commitLog.configuration.getCompressor()
.preferredBufferType()
.allocate(DatabaseDescriptor.getCommitLogSegmentSize());
} else
buf.clear();
return buf;
}
static long startMillis = System.currentTimeMillis();
@Override
void write(int startMarker, int nextMarker)
{
int contentStart = startMarker + SYNC_MARKER_SIZE;
int length = nextMarker - contentStart;
// The length may be 0 when the segment is being closed.
assert length > 0 || length == 0 && !isStillAllocating();
try
{
int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
ByteBuffer compressedBuffer = compressedBufferHolder.get();
if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
compressedBuffer.capacity() < neededBufferSize)
{
FileUtils.clean(compressedBuffer);
compressedBuffer = allocate(neededBufferSize);
compressedBufferHolder.set(compressedBuffer);
}
ByteBuffer inputBuffer = buffer.duplicate();
inputBuffer.limit(contentStart + length).position(contentStart);
compressedBuffer.limit(compressedBuffer.capacity()).position(COMPRESSED_MARKER_SIZE);
compressor.compress(inputBuffer, compressedBuffer);
compressedBuffer.flip();
compressedBuffer.putInt(SYNC_MARKER_SIZE, length);
// Only one thread can be here at a given time.
// Protected by synchronization on CommitLogSegment.sync().
writeSyncMarker(id, compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
commitLog.allocator.addSize(compressedBuffer.limit());
channel.write(compressedBuffer);
assert channel.position() - lastWrittenPos == compressedBuffer.limit();
lastWrittenPos = channel.position();
}
catch (Exception e)
{
throw new FSWriteError(e, getPath());
}
}
@Override
protected void flush(int startMarker, int nextMarker)
{
try
{
SyncUtil.force(channel, true);
}
catch (Exception e)
{
throw new FSWriteError(e, getPath());
}
}
@Override
protected void internalClose()
{
usedBuffers.decrementAndGet();
try {
if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
bufferPool.add(buffer);
else
FileUtils.clean(buffer);
super.internalClose();
}
finally
{
onClose.run();
}
}
/**
* Checks if the number of buffers in use is greater or equals to the maximum number of buffers allowed in the pool.
*
* @return <code>true</code> if the number of buffers in use is greater or equals to the maximum number of buffers
* allowed in the pool, <code>false</code> otherwise.
*/
static boolean hasReachedPoolLimit()
{
return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE;
}
static void shutdown()
{
bufferPool.clear();
}
@Override
public long onDiskSize()
{
return lastWrittenPos;
}
}