| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.jena.dboe.base.file; |
| |
| import static java.lang.String.format; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.MappedByteBuffer; |
| import java.nio.channels.FileChannel.MapMode; |
| import java.util.Arrays; |
| |
| import org.apache.jena.dboe.base.block.Block; |
| import org.apache.jena.dboe.sys.SystemIndex; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** FileAccess for a file, using memory mapped I/O */ |
| final |
| public class BlockAccessMapped extends BlockAccessBase |
| { |
| /* Blocks are addressed by positive ints - |
| * Is that a limit? |
| * One billion is 2^30 |
| * If a block is 8K, the 2^31*2^13 = 2^44 bits or 2^14 billion = 16K Billion. = 16 trillion bytes. |
| * No limit at the moment - later performance tuning will see what the cost of 48 or 63 bit addresses would be. |
| */ |
| |
| private static Logger log = LoggerFactory.getLogger(BlockAccessMapped.class); |
| private enum CopyContents { Overwrite, NoCopy } |
| |
| // Segmentation avoids over-mapping; allows file to grow (in chunks) |
| private final int GrowthFactor = 2; |
| private final int SegmentSize = SystemIndex.SegmentSize; |
| private final int blocksPerSegment; |
| |
| private int initialNumSegements = 1; |
| private MappedByteBuffer[] segments = new MappedByteBuffer[initialNumSegements]; |
| |
| // Unflushed segments. |
| private int segmentDirtyCount = 0; |
| private boolean[] segmentDirty = new boolean[initialNumSegements]; |
| |
| public BlockAccessMapped(String filename, int blockSize) { |
| super(filename, blockSize); |
| blocksPerSegment = SegmentSize/blockSize; |
| if ( SegmentSize%blockSize != 0 ) |
| getLog().warn(format("%s: Segment size(%d) not a multiple of blocksize (%d)", filename, SegmentSize, blockSize)); |
| |
| for ( int i = 0; i < initialNumSegements ; i++ ) |
| // Not strictly necessary - default value is false. |
| segmentDirty[i] = false; |
| segmentDirtyCount = 0; |
| |
| if ( getLog().isDebugEnabled() ) |
| getLog().debug(format("Segment:%d BlockSize=%d blocksPerSegment=%d", SegmentSize, blockSize, blocksPerSegment)); |
| } |
| |
| @Override |
| public Block allocate(int blkSize) { |
| if ( blkSize > 0 && blkSize != this.blockSize ) |
| throw new FileException("Fixed blocksize only: request= "+blkSize+"fixed size="+this.blockSize); |
| int id = allocateId(); |
| ByteBuffer bb = getByteBuffer(id); |
| bb.position(0); |
| Block block = new Block(id, bb); |
| return block; |
| } |
| |
| @Override |
| public Block read(long id) { |
| check(id); |
| checkIfClosed(); |
| ByteBuffer bb = getByteBuffer(id); |
| bb.position(0); |
| Block block = new Block(id, bb); |
| return block; |
| } |
| |
| @Override |
| public void write(Block block) { |
| write(block, CopyContents.NoCopy); |
| } |
| |
| @Override |
| public void overwrite(Block block) { |
| overwriteNotification(block); |
| write(block, CopyContents.Overwrite); |
| } |
| |
| private void write(Block block, CopyContents copyContents) { |
| check(block); |
| checkIfClosed(); |
| int id = block.getId().intValue(); |
| |
| if ( copyContents == CopyContents.Overwrite ) { |
| ByteBuffer bbDst = getByteBuffer(id); |
| bbDst.position(0); |
| ByteBuffer bbSrc = block.getByteBuffer(); |
| bbSrc.rewind(); |
| bbDst.put(bbSrc); |
| } |
| |
| // Assumed MRSW - no need to sync as we are the only Writer |
| segmentDirty[segment(id)] = true; |
| writeNotification(block); |
| } |
| |
| @Override |
| public void sync() { |
| checkIfClosed(); |
| force(); |
| } |
| |
| private ByteBuffer getByteBuffer(long _id) { |
| // Limitation: ids must be integers. |
| // ids are used to index into []-arrays. |
| int id = (int)_id; |
| if ( id < 0 ) { |
| String msg = String.format("%s: ByteBuffer index is negative: %d (long = %d)", label, id, _id); |
| getLog().error(msg); |
| throw new IllegalArgumentException(msg); |
| } |
| |
| int seg = segment(id); // Segment. |
| int segOff = byteOffset(id); // Byte offset in segment |
| |
| if ( getLog().isTraceEnabled() ) |
| getLog().trace(format("%d => [%d, %d]", id, seg, segOff)); |
| |
| synchronized (this) { |
| try { |
| // Need to put the alloc AND the slice/reset inside a sync. |
| ByteBuffer segBuffer = allocSegment(seg); |
| // Now slice the buffer to get the ByteBuffer to return |
| segBuffer.position(segOff); |
| segBuffer.limit(segOff+blockSize); |
| ByteBuffer dst = segBuffer.slice(); |
| |
| // And then reset limit to max for segment. |
| segBuffer.limit(segBuffer.capacity()); |
| // Extend block count when we allocate above end. |
| numFileBlocks = Math.max(numFileBlocks, id+1); |
| return dst; |
| } catch (IllegalArgumentException ex) { |
| // Shouldn't (ha!) happen because the second "limit" resets |
| log.error("Id: "+id); |
| log.error("Seg="+seg); |
| log.error("Segoff="+segOff); |
| log.error(ex.getMessage(), ex); |
| throw ex; |
| } |
| } |
| } |
| |
| private final int segment(int id) { return id/blocksPerSegment; } |
| private final int byteOffset(int id) { return (id%blocksPerSegment)*blockSize; } |
| private final long fileLocationForSegment(long segmentNumber) { return segmentNumber*SegmentSize; } |
| |
| // Even for MultipleReader this needs to be sync'ed.?? |
| private MappedByteBuffer allocSegment(int seg) { |
| // Auxiliary function for get - which holds the lock needed here. |
| // The MappedByteBuffer must be sliced and reset once found/allocated |
| // so as not to mess up the underlying MappedByteBuffer in segments[]. |
| |
| // Only allocSegment(seg) and flushDirtySegements() and close() |
| // directly access segments[] |
| |
| if ( seg < 0 ) { |
| getLog().error("Segment negative: "+seg); |
| throw new FileException("Negative segment: "+seg); |
| } |
| |
| while ( seg >= segments.length ) { |
| // More space needed. |
| MappedByteBuffer[] segments2 = new MappedByteBuffer[GrowthFactor*segments.length]; |
| System.arraycopy(segments, 0, segments2, 0, segments.length); |
| boolean[] segmentDirty2 = new boolean[GrowthFactor*segmentDirty.length]; |
| System.arraycopy(segmentDirty, 0, segmentDirty2, 0, segmentDirty.length); |
| |
| segmentDirty = segmentDirty2; |
| segments = segments2; |
| } |
| |
| long offset = fileLocationForSegment(seg); |
| |
| if ( offset < 0 ) { |
| getLog().error("Segment offset gone negative: "+seg); |
| throw new FileException("Negative segment offset: "+seg); |
| } |
| |
| MappedByteBuffer segBuffer = segments[seg]; |
| if ( segBuffer == null ) { |
| try { |
| segBuffer = file.map(MapMode.READ_WRITE, offset, SegmentSize); |
| if ( getLog().isDebugEnabled() ) |
| getLog().debug(format("Segment: %d", seg)); |
| segments[seg] = segBuffer; |
| } |
| catch (IOException ex) { |
| if ( ex.getCause() instanceof java.lang.OutOfMemoryError ) |
| throw new FileException("BlockMgrMapped.segmentAllocate: Segment = " + seg + " : Offset = " + offset); |
| throw new FileException("BlockMgrMapped.segmentAllocate: Segment = " + seg, ex); |
| } |
| } |
| return segBuffer; |
| } |
| |
| private synchronized void flushDirtySegments() { |
| |
| // A linked list (with uniqueness) of dirty segments may be better. |
| for ( int i = 0; i < segments.length ; i++ ) { |
| if ( segments[i] != null && segmentDirty[i] ) { |
| // Can we "flush" them all at once? |
| segments[i].force(); |
| segmentDirty[i] = false; |
| segmentDirtyCount--; |
| } |
| } |
| // This on its own does not force dirty segments to disk. |
| super.force(); |
| } |
| |
| @Override |
| protected void _resetAllocBoundary(long boundary) { |
| |
| } |
| |
| @Override |
| protected void _close() { |
| force(); |
| // There is no unmap operation for MappedByteBuffers. |
| // Sun Bug id bug_id=4724038 |
| // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038 |
| Arrays.fill(segments, null); |
| Arrays.fill(segmentDirty, false); |
| segmentDirtyCount = 0; |
| } |
| |
| @Override |
| protected void force() { |
| flushDirtySegments(); |
| } |
| |
| @Override |
| protected Logger getLog() { |
| return log; |
| } |
| |
| @Override |
| public String toString() { |
| return super.getLabel(); |
| } |
| } |