blob: 70ac77aac1fd47d1206c3e57564810e82448d745 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
public class MmappedSegmentedFile extends SegmentedFile
private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
// in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane.
public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
/**
 * Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the
 * segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back
 * to a RandomAccessFile.
 */
private final Segment[] segments;
/**
 * Creates a segmented file over {@code channel} backed by the supplied segments.
 *
 * @param channel  the channel the segments belong to; also handed to the {@link Cleanup} and the superclass
 * @param length   the readable length passed to the superclass
 * @param segments the segment array searched by {@code floor(long)}; stored by reference, not copied
 */
public MmappedSegmentedFile(ChannelProxy channel, long length, Segment[] segments)
{
    // the cleanup object receives the same segment array that this instance keeps
    super(new Cleanup(channel, segments), channel, length);
    this.segments = segments;
}
// Copy constructor used by sharedCopy(): the new instance shares the source's Segment[] rather than duplicating it.
private MmappedSegmentedFile(MmappedSegmentedFile copy)
// NOTE(review): no explicit super(...) call is visible in this view - confirm how the superclass state is
// initialised from the source instance.
this.segments = copy.segments;
public MmappedSegmentedFile sharedCopy()
return new MmappedSegmentedFile(this);
/**
 * @return The segment entry for the given position.
 */
private Segment floor(long position)
assert 0 <= position && position < length: String.format("%d >= %d in %s", position, length, path());
Segment seg = new Segment(position, null);
int idx = Arrays.binarySearch(segments, seg);
assert idx != -1 : String.format("Bad position %d for segments %s in %s", position, Arrays.toString(segments), path());
if (idx < 0)
// round down to entry at insertion point
idx = -(idx + 2);
return segments[idx];
/**
 * @return The segment containing the given position: must be closed after use.
 */
public FileDataInput getSegment(long position)
// locate the segment whose start offset is the floor of the requested position
Segment segment = floor(position);
// a non-null right-hand value is the mapped buffer for this segment
if (segment.right != null)
// segment is mmap'd
// the reader starts at the position's offset relative to the segment start (segment.left)
return new ByteBufferDataInput(segment.right, path(), segment.left, (int) (position - segment.left));
// we can have single cells or partitions larger than 2Gb, which is our maximum addressable range in a single segment;
// in this case we open as a normal random access reader
// FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
// NOTE(review): the initialiser on the next line is incomplete in this view (nothing between '=' and ';;').
// The comments above suggest a random access reader is opened and positioned - confirm against the full source.
RandomAccessReader file =;;
return file;
public long[] copyReadableBounds()
long[] bounds = new long[segments.length + 1];
for (int i = 0; i < segments.length; i++)
bounds[i] = segments[i].left;
bounds[segments.length] = length;
return bounds;
// Cleanup handler for an MmappedSegmentedFile; tidy() deals with the mapped segments it was given.
private static final class Cleanup extends SegmentedFile.Cleanup
// the same segment array the owning MmappedSegmentedFile holds
final Segment[] segments;
protected Cleanup(ChannelProxy channel, Segment[] segments)
// NOTE(review): no explicit super(...) call is visible in this view - confirm how the channel reaches the superclass
this.segments = segments;
public void tidy()
// NOTE(review): the statement guarded by this check is not visible in this view; presumably tidy() stops here
// when no cleaner is available - confirm
if (!FileUtils.isCleanerAvailable())
* Try forcing the unmapping of segments using undocumented unsafe sun APIs.
* If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
* If this works and a thread tries to access any segment, hell will unleash on earth.
// NOTE(review): the 'try' matching the catch below, the action taken for unmapped (null) segments and the
// unmapping call itself are not visible in this view
for (Segment segment : segments)
if (segment.right == null)
logger.trace("All segments have been unmapped successfully");
catch (Exception e)
// This is not supposed to happen
logger.error("Error while unmapping segments", e);
// see CASSANDRA-10357
public static boolean maybeRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
boolean mayNeedRepair = false;
if (ibuilder instanceof Builder)
mayNeedRepair = ((Builder) ibuilder).mayNeedRepair(descriptor.filenameFor(Component.PRIMARY_INDEX));
if (dbuilder instanceof Builder)
mayNeedRepair |= ((Builder) dbuilder).mayNeedRepair(descriptor.filenameFor(Component.DATA));
if (mayNeedRepair)
forceRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder);
return mayNeedRepair;
// if one of the index/data files has boundaries larger than we can mmap, and it was written by a version that
// did not guarantee correct boundaries were saved, rebuild the boundaries and save them again
// Discards the boundaries held by any mmap Builder passed in, then walks the index summary and the primary
// index file to recalculate them.
// NOTE(review): several statements of this method are not visible in this view (file seeks, key reads and the
// calls that register boundaries with the builders); the notes below mark the gaps.
private static void forceRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
// only mmap builders carry a Boundaries instance to reset
if (ibuilder instanceof Builder)
((Builder) ibuilder).boundaries.clear();
if (dbuilder instanceof Builder)
((Builder) dbuilder).boundaries.clear();
RowIndexEntry.IndexSerializer rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
// the primary index file is opened read-only and closed by the try-with-resources
try (RandomAccessFile raf = new RandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r");)
// index-file and data-file positions of the previous summary entry
long iprev = 0, dprev = 0;
for (int i = 0; i < indexSummary.size(); i++)
// first read the position in the summary, and read the corresponding position in the data file
// NOTE(review): the doubled ';' suggests a statement was lost here, presumably positioning raf at icur - confirm
long icur = indexSummary.getPosition(i);;
RowIndexEntry rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
long dcur = rie.position;
// if these positions are small enough to map out a segment from the prior version (i.e. less than 2Gb),
// just add these as a boundary and proceed to the next index summary record; most scenarios will be
// served by this, keeping the cost of rebuild to a minimum.
if (Math.max(icur - iprev , dcur - dprev) > MAX_SEGMENT_SIZE)
// otherwise, loop over its index block, providing each RIE as a potential boundary for both files;
while (raf.getFilePointer() < icur)
// add the position of this record in the index file as an index file boundary
// then read the RIE, and add its data file position as a boundary for the data file
// NOTE(review): the statements that add these boundaries to ibuilder/dbuilder are not visible in this view
rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
iprev = icur;
dprev = dcur;
catch (IOException e)
// NOTE(review): the caught exception is not passed to the logger, so its message and stack trace are lost
logger.error("Failed to recalculate boundaries for {}; mmap access may degrade to buffered for this file", descriptor);
/**
 * Overrides the default behaviour to create segments of a maximum size.
 */
public static class Builder extends SegmentedFile.Builder
public static class Boundaries
private long[] boundaries;
// number of boundaries we have "fixed" (i.e. have determined the final value of)
private int fixedCount;
public Boundaries()
// we always have a boundary of zero, so we start with a fixedCount of 1
this(new long[8], 1);
public Boundaries(long[] boundaries, int fixedCount)
init(boundaries, fixedCount);
void init(long[] boundaries, int fixedCount)
this.boundaries = boundaries;
this.fixedCount = fixedCount;
public void addCandidate(long candidate)
// we make sure we have room before adding another element, so that we can share the addCandidate logic statically
boundaries = ensureCapacity(boundaries, fixedCount);
fixedCount = addCandidate(boundaries, fixedCount, candidate);
private static int addCandidate(long[] boundaries, int fixedCount, long candidate)
// check how far we are from the last fixed boundary
long delta = candidate - boundaries[fixedCount - 1];
assert delta >= 0;
if (delta != 0)
if (delta <= MAX_SEGMENT_SIZE)
// overwrite the unfixed (potential) boundary if the resultant segment would still be mmappable
boundaries[fixedCount] = candidate;
else if (boundaries[fixedCount] == 0)
// or, if it is not initialised, we cannot make an mmapped segment here, so this is the fixed boundary
boundaries[fixedCount++] = candidate;
// otherwise, fix the prior boundary and initialise our unfixed boundary
boundaries[++fixedCount] = candidate;
return fixedCount;
// ensures there is room for another fixed boundary AND an unfixed candidate boundary, i.e. fixedCount + 2 items
private static long[] ensureCapacity(long[] boundaries, int fixedCount)
if (fixedCount + 1 >= boundaries.length)
return Arrays.copyOf(boundaries, Math.max(fixedCount + 2, boundaries.length * 2));
return boundaries;
void clear()
fixedCount = 1;
Arrays.fill(boundaries, 0);
// returns the fixed boundaries, truncated to a correctly sized long[]
public long[] truncate()
return Arrays.copyOf(boundaries, fixedCount);
// returns the finished boundaries for the provided length, truncated to a correctly sized long[]
public long[] finish(long length, boolean isFinal)
assert length > 0;
// ensure there's room for the length to be added
boundaries = ensureCapacity(boundaries, fixedCount);
// clone our current contents, so we don't corrupt them
int fixedCount = this.fixedCount;
long[] boundaries = this.boundaries.clone();
// if we're finishing early, our length may be before some of our boundaries,
// so walk backwards until our boundaries are <= length
while (boundaries[fixedCount - 1] >= length)
boundaries[fixedCount--] = 0;
if (boundaries[fixedCount] >= length)
boundaries[fixedCount] = 0;
// add our length as a boundary
fixedCount = addCandidate(boundaries, fixedCount, length);
// if we have any unfixed boundary at the end, it's now fixed, since we're done
if (boundaries[fixedCount] != 0)
boundaries = Arrays.copyOf(boundaries, fixedCount);
if (isFinal)
// if this is the final one, save it
this.boundaries = boundaries;
this.fixedCount = fixedCount;
return boundaries;
// accumulator for this builder's segment boundaries; starts with only the zero boundary fixed
private final Boundaries boundaries = new Boundaries();
// NOTE(review): no constructor body is visible in this view
public Builder()
public long[] boundaries()
return boundaries.truncate();
// indicates if we may need to repair the mmapped file boundaries. this is a cheap check to see if there
// are any spans larger than an mmap segment size, which should be rare to occur in practice.
boolean mayNeedRepair(String path)
// old boundaries were created without the length, so add it as a candidate
long length = new File(path).length();
long[] boundaries = this.boundaries.truncate();
long prev = 0;
for (long boundary : boundaries)
if (boundary - prev > MAX_SEGMENT_SIZE)
return true;
prev = boundary;
return false;
public void addPotentialBoundary(long boundary)
// Builds the MmappedSegmentedFile for the channel from the boundaries collected so far.
public SegmentedFile complete(ChannelProxy channel, long overrideLength)
// a positive overrideLength takes precedence over the channel's own size
long length = overrideLength > 0 ? overrideLength : channel.size();
// create the segments
// the finished boundaries are saved back into the builder only when no override length was supplied
long[] boundaries = this.boundaries.finish(length, overrideLength <= 0);
// N boundaries delimit N - 1 segments
int segcount = boundaries.length - 1;
Segment[] segments = new Segment[segcount];
for (int i = 0; i < segcount; i++)
long start = boundaries[i];
long size = boundaries[i + 1] - start;
// spans larger than MAX_SEGMENT_SIZE get a null buffer
// NOTE(review): the expression after '?' is truncated in this view; presumably it maps the range starting
// at 'start' of length 'size' from the channel - confirm against the full source
MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
?, start, size)
: null;
segments[i] = new Segment(start, segment);
return new MmappedSegmentedFile(channel, length, segments);
public void serializeBounds(DataOutput out) throws IOException
long[] boundaries = this.boundaries.truncate();
for (long boundary : boundaries)
public void deserializeBounds(DataInput in) throws IOException
int size = in.readInt();
long[] boundaries = new long[size];
for (int i = 0; i < size; i++)
boundaries[i] = in.readLong();
this.boundaries.init(boundaries, size);