blob: 13b476a9ea75a7e02ba3ff112f419fcffa7aa762 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
import static java.util.stream.Stream.of;
import static org.apache.cassandra.utils.Throwables.perform;
public class MmappedRegions extends SharedCloseableImpl
{
/** In a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size */
public static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
/** When we need to grow the arrays, we add this number of region slots */
static final int REGION_ALLOC_SIZE = 15;
/** The original state, which is shared with the tidier and
* contains all the regions mapped so far. It also
* does the actual mapping. */
private final State state;
/** A copy of the latest state. We update this each time the original state is
* updated and we share this with copies. If we are a copy, then this
* is null. Copies can only access existing regions, they cannot create
* new ones. This is for thread safety and because MmappedRegions is
* reference counted, only the original state will be cleaned-up,
* therefore only the original state should create new mapped regions.
*/
private volatile State copy;
private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length)
{
this(new State(channel), metadata, length);
}
private MmappedRegions(State state, CompressionMetadata metadata, long length)
{
super(new Tidier(state));
this.state = state;
if (metadata != null)
{
assert length == 0 : "expected no length with metadata";
updateState(metadata);
}
else if (length > 0)
{
updateState(length);
}
this.copy = new State(state);
}
private MmappedRegions(MmappedRegions original)
{
super(original);
this.state = original.copy;
}
public static MmappedRegions empty(ChannelProxy channel)
{
return new MmappedRegions(channel, null, 0);
}
/**
* @param channel file to map. the MmappedRegions instance will hold shared copy of given channel.
* @param metadata
* @return new instance
*/
public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metadata)
{
if (metadata == null)
throw new IllegalArgumentException("metadata cannot be null");
return new MmappedRegions(channel, metadata, 0);
}
public static MmappedRegions map(ChannelProxy channel, long length)
{
if (length <= 0)
throw new IllegalArgumentException("Length must be positive");
return new MmappedRegions(channel, null, length);
}
/**
* @return a snapshot of the memory mapped regions. The snapshot can
* only use existing regions, it cannot create new ones.
*/
public MmappedRegions sharedCopy()
{
return new MmappedRegions(this);
}
private boolean isCopy()
{
return copy == null;
}
public void extend(long length)
{
if (length < 0)
throw new IllegalArgumentException("Length must not be negative");
assert !isCopy() : "Copies cannot be extended";
if (length <= state.length)
return;
updateState(length);
copy = new State(state);
}
private void updateState(long length)
{
state.length = length;
long pos = state.getPosition();
while (pos < length)
{
long size = Math.min(MAX_SEGMENT_SIZE, length - pos);
state.add(pos, size);
pos += size;
}
}
private void updateState(CompressionMetadata metadata)
{
long offset = 0;
long lastSegmentOffset = 0;
long segmentSize = 0;
while (offset < metadata.dataLength)
{
CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
//Reached a new mmap boundary
if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
{
if (segmentSize > 0)
{
state.add(lastSegmentOffset, segmentSize);
lastSegmentOffset += segmentSize;
segmentSize = 0;
}
}
segmentSize += chunk.length + 4; //checksum
offset += metadata.chunkLength();
}
if (segmentSize > 0)
state.add(lastSegmentOffset, segmentSize);
state.length = lastSegmentOffset + segmentSize;
}
public boolean isValid(ChannelProxy channel)
{
return state.isValid(channel);
}
public boolean isEmpty()
{
return state.isEmpty();
}
public Region floor(long position)
{
assert !isCleanedUp() : "Attempted to use closed region";
return state.floor(position);
}
public void closeQuietly()
{
Throwable err = close(null);
if (err != null)
{
JVMStabilityInspector.inspectThrowable(err);
// This is not supposed to happen
LoggerFactory.getLogger(getClass()).error("Error while closing mmapped regions", err);
}
}
public static final class Region implements Rebufferer.BufferHolder
{
public final long offset;
public final ByteBuffer buffer;
public Region(long offset, ByteBuffer buffer)
{
this.offset = offset;
this.buffer = buffer;
}
public ByteBuffer buffer()
{
return buffer.duplicate();
}
public long offset()
{
return offset;
}
public long end()
{
return offset + buffer.capacity();
}
public void release()
{
// only released after no readers are present
}
}
private static final class State
{
/** The file channel */
private final ChannelProxy channel;
/** An array of region buffers, synchronized with offsets */
private ByteBuffer[] buffers;
/** An array of region offsets, synchronized with buffers */
private long[] offsets;
/** The maximum file length we have mapped */
private long length;
/** The index to the last region added */
private int last;
private State(ChannelProxy channel)
{
this.channel = channel.sharedCopy();
this.buffers = new ByteBuffer[REGION_ALLOC_SIZE];
this.offsets = new long[REGION_ALLOC_SIZE];
this.length = 0;
this.last = -1;
}
private State(State original)
{
this.channel = original.channel;
this.buffers = original.buffers;
this.offsets = original.offsets;
this.length = original.length;
this.last = original.last;
}
private boolean isEmpty()
{
return last < 0;
}
private boolean isValid(ChannelProxy channel)
{
return this.channel.filePath().equals(channel.filePath());
}
private Region floor(long position)
{
assert 0 <= position && position <= length : String.format("%d > %d", position, length);
int idx = Arrays.binarySearch(offsets, 0, last +1, position);
assert idx != -1 : String.format("Bad position %d for regions %s, last %d in %s", position, Arrays.toString(offsets), last, channel);
if (idx < 0)
idx = -(idx + 2); // round down to entry at insertion point
return new Region(offsets[idx], buffers[idx]);
}
private long getPosition()
{
return last < 0 ? 0 : offsets[last] + buffers[last].capacity();
}
private void add(long pos, long size)
{
ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, pos, size);
++last;
if (last == offsets.length)
{
offsets = Arrays.copyOf(offsets, offsets.length + REGION_ALLOC_SIZE);
buffers = Arrays.copyOf(buffers, buffers.length + REGION_ALLOC_SIZE);
}
offsets[last] = pos;
buffers[last] = buffer;
}
private Throwable close(Throwable accumulate)
{
accumulate = channel.close(accumulate);
/*
* Try forcing the unmapping of segments using undocumented unsafe sun APIs.
* If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
* If this works and a thread tries to access any segment, hell will unleash on earth.
*/
if (!FileUtils.isCleanerAvailable)
return accumulate;
return perform(accumulate, channel.filePath(), Throwables.FileOpType.READ,
of(buffers)
.map((buffer) ->
() ->
{
if (buffer != null)
FileUtils.clean(buffer);
}));
}
}
public static final class Tidier implements RefCounted.Tidy
{
final State state;
Tidier(State state)
{
this.state = state;
}
public String name()
{
return state.channel.filePath();
}
public void tidy()
{
try
{
Throwables.maybeFail(state.close(null));
}
catch (Exception e)
{
throw new FSReadError(e, state.channel.filePath());
}
}
}
}