/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
public class MmappedSegmentedFile extends SegmentedFile
{
private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
// in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane.
public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
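    /*
     * The ~2GiB cap comes from FileChannel.map, which rejects any size above Integer.MAX_VALUE.
     * A minimal sketch of the failure mode (illustrative only, not part of this class):
     *
     *   try (FileChannel ch = FileChannel.open(Paths.get("big.db"), StandardOpenOption.READ))
     *   {
     *       // throws IllegalArgumentException for any size greater than Integer.MAX_VALUE
     *       ch.map(FileChannel.MapMode.READ_ONLY, 0, Integer.MAX_VALUE + 1L);
     *   }
     */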
/**
* Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the
* segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back
* to a RandomAccessFile.
*/
private final Segment[] segments;
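    /*
     * Example layout (hypothetical numbers): a 5GiB file might be split at offsets
     * { 0, 2GiB, 4GiB }, each segment paired with its MappedByteBuffer; any segment with a
     * null buffer is instead served by a RandomAccessReader in getSegment below.
     */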
public MmappedSegmentedFile(ChannelProxy channel, long length, Segment[] segments)
{
super(new Cleanup(channel, segments), channel, length);
this.segments = segments;
}
private MmappedSegmentedFile(MmappedSegmentedFile copy)
{
super(copy);
this.segments = copy.segments;
}
public MmappedSegmentedFile sharedCopy()
{
return new MmappedSegmentedFile(this);
}
/**
* @return The segment entry for the given position.
*/
private Segment floor(long position)
{
assert 0 <= position && position < length: String.format("%d >= %d in %s", position, length, path());
Segment seg = new Segment(position, null);
int idx = Arrays.binarySearch(segments, seg);
assert idx != -1 : String.format("Bad position %d for segments %s in %s", position, Arrays.toString(segments), path());
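        /*
         * On a miss, Arrays.binarySearch returns -(insertionPoint) - 1, so -(idx + 2) is
         * insertionPoint - 1: the last segment starting at or before position. For example,
         * with segment offsets { 0, 100, 200 }, position 150 misses with insertion point 2,
         * so idx == -3 and -(idx + 2) == 1, selecting the segment that starts at 100.
         */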
if (idx < 0)
// round down to entry at insertion point
idx = -(idx + 2);
return segments[idx];
}
/**
* @return The segment containing the given position: must be closed after use.
*/
public FileDataInput getSegment(long position)
{
Segment segment = floor(position);
if (segment.right != null)
{
// segment is mmap'd
return new ByteBufferDataInput(segment.right, path(), segment.left, (int) (position - segment.left));
}
        // we can have single cells or partitions larger than 2GiB, which is our maximum addressable range in a single segment;
        // in this case we open as a normal random access reader
        // FIXME: RandomAccessReader is unbounded, so this segment will cover the rest of the file, rather than just the row
RandomAccessReader file = RandomAccessReader.open(channel);
file.seek(position);
return file;
}
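    /*
     * Illustrative use of the contract above (hypothetical caller): the returned
     * FileDataInput must be closed once the read completes, e.g.
     *
     *   try (FileDataInput in = file.getSegment(rowPosition))
     *   {
     *       ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
     *   }
     */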
@Override
public long[] copyReadableBounds()
{
long[] bounds = new long[segments.length + 1];
for (int i = 0; i < segments.length; i++)
bounds[i] = segments[i].left;
bounds[segments.length] = length;
return bounds;
}
private static final class Cleanup extends SegmentedFile.Cleanup
{
final Segment[] segments;
protected Cleanup(ChannelProxy channel, Segment[] segments)
{
super(channel);
this.segments = segments;
}
public void tidy()
{
super.tidy();
if (!FileUtils.isCleanerAvailable())
return;
            /*
             * Try forcing the unmapping of segments using undocumented, unsafe sun APIs.
             * If this fails (non-Sun JVM), we'll have to wait for the GC to finalize the mappings.
             * If this works and a thread then tries to access any segment, all hell will break loose.
             */
try
{
for (Segment segment : segments)
{
if (segment.right == null)
continue;
FileUtils.clean(segment.right);
}
logger.trace("All segments have been unmapped successfully");
}
catch (Exception e)
{
JVMStabilityInspector.inspectThrowable(e);
// This is not supposed to happen
logger.error("Error while unmapping segments", e);
}
}
}
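    /*
     * For reference, FileUtils.clean relies on the JVM-specific direct buffer cleaner; on
     * pre-Java 9 Sun/Oracle JVMs the idea reduces to roughly the following sketch (not
     * necessarily the exact code used by this codebase):
     *
     *   ((sun.nio.ch.DirectBuffer) buffer).cleaner().clean();
     */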
// see CASSANDRA-10357
public static boolean maybeRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
boolean mayNeedRepair = false;
if (ibuilder instanceof Builder)
mayNeedRepair = ((Builder) ibuilder).mayNeedRepair(descriptor.filenameFor(Component.PRIMARY_INDEX));
if (dbuilder instanceof Builder)
mayNeedRepair |= ((Builder) dbuilder).mayNeedRepair(descriptor.filenameFor(Component.DATA));
if (mayNeedRepair)
forceRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder);
return mayNeedRepair;
}
    // if either the index or data file has boundaries larger than we can mmap, and it was written by a version
    // that did not guarantee correct boundaries were saved, rebuild the boundaries and save them again
private static void forceRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
if (ibuilder instanceof Builder)
((Builder) ibuilder).boundaries.clear();
if (dbuilder instanceof Builder)
((Builder) dbuilder).boundaries.clear();
RowIndexEntry.IndexSerializer rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
        try (RandomAccessFile raf = new RandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r"))
{
long iprev = 0, dprev = 0;
for (int i = 0; i < indexSummary.size(); i++)
{
// first read the position in the summary, and read the corresponding position in the data file
long icur = indexSummary.getPosition(i);
raf.seek(icur);
ByteBufferUtil.readWithShortLength(raf);
RowIndexEntry rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
long dcur = rie.position;
                // if these positions are close enough to the previous boundaries to map out a segment
                // (i.e. no more than 2GiB away), just add them as boundaries and proceed to the next index
                // summary record; most scenarios will be served by this, keeping the cost of the rebuild to a minimum
                if (Math.max(icur - iprev, dcur - dprev) > MAX_SEGMENT_SIZE)
                {
                    // otherwise, loop over the whole index block, providing each RowIndexEntry as a potential
                    // boundary for both files
raf.seek(iprev);
while (raf.getFilePointer() < icur)
{
// add the position of this record in the index file as an index file boundary
ibuilder.addPotentialBoundary(raf.getFilePointer());
// then read the RIE, and add its data file position as a boundary for the data file
ByteBufferUtil.readWithShortLength(raf);
rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
dbuilder.addPotentialBoundary(rie.position);
}
}
ibuilder.addPotentialBoundary(icur);
dbuilder.addPotentialBoundary(dcur);
iprev = icur;
dprev = dcur;
}
}
catch (IOException e)
{
logger.error("Failed to recalculate boundaries for {}; mmap access may degrade to buffered for this file", descriptor);
}
}
/**
* Overrides the default behaviour to create segments of a maximum size.
*/
public static class Builder extends SegmentedFile.Builder
{
@VisibleForTesting
public static class Boundaries
{
private long[] boundaries;
// number of boundaries we have "fixed" (i.e. have determined the final value of)
private int fixedCount;
public Boundaries()
{
// we always have a boundary of zero, so we start with a fixedCount of 1
this(new long[8], 1);
}
public Boundaries(long[] boundaries, int fixedCount)
{
init(boundaries, fixedCount);
}
void init(long[] boundaries, int fixedCount)
{
this.boundaries = boundaries;
this.fixedCount = fixedCount;
}
public void addCandidate(long candidate)
{
// we make sure we have room before adding another element, so that we can share the addCandidate logic statically
boundaries = ensureCapacity(boundaries, fixedCount);
fixedCount = addCandidate(boundaries, fixedCount, candidate);
}
private static int addCandidate(long[] boundaries, int fixedCount, long candidate)
{
// check how far we are from the last fixed boundary
long delta = candidate - boundaries[fixedCount - 1];
assert delta >= 0;
if (delta != 0)
{
if (delta <= MAX_SEGMENT_SIZE)
// overwrite the unfixed (potential) boundary if the resultant segment would still be mmappable
boundaries[fixedCount] = candidate;
else if (boundaries[fixedCount] == 0)
// or, if it is not initialised, we cannot make an mmapped segment here, so this is the fixed boundary
boundaries[fixedCount++] = candidate;
else
// otherwise, fix the prior boundary and initialise our unfixed boundary
boundaries[++fixedCount] = candidate;
}
return fixedCount;
}
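            /*
             * Worked example of the logic above, assuming MAX_SEGMENT_SIZE = 100 and a fresh
             * Boundaries (fixed = { 0 }, no unfixed candidate):
             *
             *   addCandidate(40);  // delta 40 <= 100: 40 becomes the unfixed candidate
             *   addCandidate(90);  // delta 90 <= 100: the unfixed candidate is overwritten with 90
             *   addCandidate(150); // delta from the last fixed boundary (0) is 150 > 100: 90 is fixed
             *                      // (the span [0, 90) is mmappable) and 150 becomes the new unfixed candidate
             */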
// ensures there is room for another fixed boundary AND an unfixed candidate boundary, i.e. fixedCount + 2 items
private static long[] ensureCapacity(long[] boundaries, int fixedCount)
{
if (fixedCount + 1 >= boundaries.length)
return Arrays.copyOf(boundaries, Math.max(fixedCount + 2, boundaries.length * 2));
return boundaries;
}
void clear()
{
fixedCount = 1;
Arrays.fill(boundaries, 0);
}
// returns the fixed boundaries, truncated to a correctly sized long[]
public long[] truncate()
{
return Arrays.copyOf(boundaries, fixedCount);
}
// returns the finished boundaries for the provided length, truncated to a correctly sized long[]
public long[] finish(long length, boolean isFinal)
{
assert length > 0;
// ensure there's room for the length to be added
boundaries = ensureCapacity(boundaries, fixedCount);
// clone our current contents, so we don't corrupt them
int fixedCount = this.fixedCount;
long[] boundaries = this.boundaries.clone();
// if we're finishing early, our length may be before some of our boundaries,
// so walk backwards until our boundaries are <= length
while (boundaries[fixedCount - 1] >= length)
boundaries[fixedCount--] = 0;
if (boundaries[fixedCount] >= length)
boundaries[fixedCount] = 0;
// add our length as a boundary
fixedCount = addCandidate(boundaries, fixedCount, length);
// if we have any unfixed boundary at the end, it's now fixed, since we're done
if (boundaries[fixedCount] != 0)
fixedCount++;
boundaries = Arrays.copyOf(boundaries, fixedCount);
if (isFinal)
{
// if this is the final one, save it
this.boundaries = boundaries;
this.fixedCount = fixedCount;
}
return boundaries;
}
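            /*
             * Continuing the example above (fixed = { 0, 90 }, unfixed candidate 150), an early
             * finish(120, false) discards the candidate 150 (it lies beyond the length), adds 120
             * as the final boundary, and returns { 0, 90, 120 }: segments [0, 90) and [90, 120).
             */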
}
private final Boundaries boundaries = new Boundaries();
public Builder()
{
super();
}
public long[] boundaries()
{
return boundaries.truncate();
}
        // indicates if we may need to repair the mmapped file boundaries. this is a cheap check to see if there
        // are any spans larger than an mmap segment size, which should be rare in practice.
boolean mayNeedRepair(String path)
{
// old boundaries were created without the length, so add it as a candidate
long length = new File(path).length();
boundaries.addCandidate(length);
long[] boundaries = this.boundaries.truncate();
long prev = 0;
for (long boundary : boundaries)
{
if (boundary - prev > MAX_SEGMENT_SIZE)
return true;
prev = boundary;
}
return false;
}
public void addPotentialBoundary(long boundary)
{
boundaries.addCandidate(boundary);
}
public SegmentedFile complete(ChannelProxy channel, long overrideLength)
{
long length = overrideLength > 0 ? overrideLength : channel.size();
// create the segments
long[] boundaries = this.boundaries.finish(length, overrideLength <= 0);
int segcount = boundaries.length - 1;
Segment[] segments = new Segment[segcount];
for (int i = 0; i < segcount; i++)
{
long start = boundaries[i];
long size = boundaries[i + 1] - start;
MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
? channel.map(FileChannel.MapMode.READ_ONLY, start, size)
: null;
segments[i] = new Segment(start, segment);
}
return new MmappedSegmentedFile(channel, length, segments);
}
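        /*
         * Illustrative builder flow (hypothetical values):
         *
         *   MmappedSegmentedFile.Builder builder = new MmappedSegmentedFile.Builder();
         *   while (...)                                         // e.g. once per index entry written
         *       builder.addPotentialBoundary(position);
         *   SegmentedFile file = builder.complete(channel, -1); // <= 0 means use channel.size()
         */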
@Override
public void serializeBounds(DataOutput out) throws IOException
{
super.serializeBounds(out);
long[] boundaries = this.boundaries.truncate();
out.writeInt(boundaries.length);
for (long boundary : boundaries)
out.writeLong(boundary);
}
@Override
public void deserializeBounds(DataInput in) throws IOException
{
super.deserializeBounds(in);
int size = in.readInt();
long[] boundaries = new long[size];
for (int i = 0; i < size; i++)
boundaries[i] = in.readLong();
this.boundaries.init(boundaries, size);
}
}
}