blob: 8f818fda0d480de881899085138ad6c331c27d16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
/**
* This class represents a piece of memory managed by Flink. The memory can be on-heap or off-heap,
* this is transparently handled by this class.
*
* <p>This class specializes byte access and byte copy calls for heap memory, while reusing the
* multi-byte type accesses and cross-segment operations from the MemorySegment.
*
* <p>This class subsumes the functionality of the {@link org.apache.flink.core.memory.HeapMemorySegment},
* but is a bit less efficient for operations on individual bytes.
*
* <p>Note that memory segments should usually not be allocated manually, but rather through the
* {@link MemorySegmentFactory}.
*/
@Internal
public final class HybridMemorySegment extends MemorySegment {
/**
* The direct byte buffer that allocated the off-heap memory. This memory segment holds a
* reference to that buffer, so as long as this memory segment lives, the memory will not be
* released.
*/
private ByteBuffer offHeapBuffer;
/**
* Creates a new memory segment that represents the memory backing the given direct byte buffer.
* Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
* otherwise this method with throw an IllegalArgumentException.
*
* <p>The owner referenced by this memory segment is null.
*
* @param buffer The byte buffer whose memory is represented by this memory segment.
* @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
*/
HybridMemorySegment(ByteBuffer buffer) {
this(buffer, null);
}
/**
* Creates a new memory segment that represents the memory backing the given direct byte buffer.
* Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
* otherwise this method with throw an IllegalArgumentException.
*
* <p>The memory segment references the given owner.
*
* @param buffer The byte buffer whose memory is represented by this memory segment.
* @param owner The owner references by this memory segment.
* @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
*/
HybridMemorySegment(ByteBuffer buffer, Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}
/**
* Creates a new memory segment that represents the memory of the byte array.
*
* <p>The owner referenced by this memory segment is null.
*
* @param buffer The byte array whose memory is represented by this memory segment.
*/
HybridMemorySegment(byte[] buffer) {
this(buffer, null);
}
/**
* Creates a new memory segment that represents the memory of the byte array.
*
* <p>The memory segment references the given owner.
*
* @param buffer The byte array whose memory is represented by this memory segment.
* @param owner The owner references by this memory segment.
*/
HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
}
// -------------------------------------------------------------------------
// MemorySegment operations
// -------------------------------------------------------------------------
/**
* Gets the buffer that owns the memory of this memory segment.
*
* @return The byte buffer that owns the memory of this memory segment.
*/
public ByteBuffer getOffHeapBuffer() {
if (offHeapBuffer != null) {
return offHeapBuffer;
} else {
throw new IllegalStateException("Memory segment does not represent off heap memory");
}
}
@Override
public ByteBuffer wrap(int offset, int length) {
if (address <= addressLimit) {
if (heapMemory != null) {
return ByteBuffer.wrap(heapMemory, offset, length);
}
else {
try {
ByteBuffer wrapper = offHeapBuffer.duplicate();
wrapper.limit(offset + length);
wrapper.position(offset);
return wrapper;
}
catch (IllegalArgumentException e) {
throw new IndexOutOfBoundsException();
}
}
}
else {
throw new IllegalStateException("segment has been freed");
}
}
// ------------------------------------------------------------------------
// Random Access get() and put() methods
// ------------------------------------------------------------------------
@Override
public final byte get(int index) {
final long pos = address + index;
if (index >= 0 && pos < addressLimit) {
return UNSAFE.getByte(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
@Override
public final void put(int index, byte b) {
final long pos = address + index;
if (index >= 0 && pos < addressLimit) {
UNSAFE.putByte(heapMemory, pos, b);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
@Override
public final void get(int index, byte[] dst) {
get(index, dst, 0, dst.length);
}
@Override
public final void put(int index, byte[] src) {
put(index, src, 0, src.length);
}
@Override
public final void get(int index, byte[] dst, int offset, int length) {
// check the byte array offset and length and the status
if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
throw new IndexOutOfBoundsException();
}
final long pos = address + index;
if (index >= 0 && pos <= addressLimit - length) {
final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, length);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
@Override
public final void put(int index, byte[] src, int offset, int length) {
// check the byte array offset and length
if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
throw new IndexOutOfBoundsException();
}
final long pos = address + index;
if (index >= 0 && pos <= addressLimit - length) {
final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
@Override
public final boolean getBoolean(int index) {
return get(index) != 0;
}
@Override
public final void putBoolean(int index, boolean value) {
put(index, (byte) (value ? 1 : 0));
}
// -------------------------------------------------------------------------
// Bulk Read and Write Methods
// -------------------------------------------------------------------------
@Override
public final void get(DataOutput out, int offset, int length) throws IOException {
if (address <= addressLimit) {
if (heapMemory != null) {
out.write(heapMemory, offset, length);
}
else {
while (length >= 8) {
out.writeLong(getLongBigEndian(offset));
offset += 8;
length -= 8;
}
while (length > 0) {
out.writeByte(get(offset));
offset++;
length--;
}
}
}
else {
throw new IllegalStateException("segment has been freed");
}
}
@Override
public final void put(DataInput in, int offset, int length) throws IOException {
if (address <= addressLimit) {
if (heapMemory != null) {
in.readFully(heapMemory, offset, length);
}
else {
while (length >= 8) {
putLongBigEndian(offset, in.readLong());
offset += 8;
length -= 8;
}
while (length > 0) {
put(offset, in.readByte());
offset++;
length--;
}
}
}
else {
throw new IllegalStateException("segment has been freed");
}
}
@Override
public final void get(int offset, ByteBuffer target, int numBytes) {
// check the byte array offset and length
if ((offset | numBytes | (offset + numBytes)) < 0) {
throw new IndexOutOfBoundsException();
}
final int targetOffset = target.position();
final int remaining = target.remaining();
if (remaining < numBytes) {
throw new BufferOverflowException();
}
if (target.isDirect()) {
if (target.isReadOnly()) {
throw new ReadOnlyBufferException();
}
// copy to the target memory directly
final long targetPointer = getAddress(target) + targetOffset;
final long sourcePointer = address + offset;
if (sourcePointer <= addressLimit - numBytes) {
UNSAFE.copyMemory(heapMemory, sourcePointer, null, targetPointer, numBytes);
target.position(targetOffset + numBytes);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
throw new IndexOutOfBoundsException();
}
}
else if (target.hasArray()) {
// move directly into the byte array
get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);
// this must be after the get() call to ensue that the byte buffer is not
// modified in case the call fails
target.position(targetOffset + numBytes);
}
else {
// neither heap buffer nor direct buffer
while (target.hasRemaining()) {
target.put(get(offset++));
}
}
}
@Override
public final void put(int offset, ByteBuffer source, int numBytes) {
// check the byte array offset and length
if ((offset | numBytes | (offset + numBytes)) < 0) {
throw new IndexOutOfBoundsException();
}
final int sourceOffset = source.position();
final int remaining = source.remaining();
if (remaining < numBytes) {
throw new BufferUnderflowException();
}
if (source.isDirect()) {
// copy to the target memory directly
final long sourcePointer = getAddress(source) + sourceOffset;
final long targetPointer = address + offset;
if (targetPointer <= addressLimit - numBytes) {
UNSAFE.copyMemory(null, sourcePointer, heapMemory, targetPointer, numBytes);
source.position(sourceOffset + numBytes);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
throw new IndexOutOfBoundsException();
}
}
else if (source.hasArray()) {
// move directly into the byte array
put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);
// this must be after the get() call to ensue that the byte buffer is not
// modified in case the call fails
source.position(sourceOffset + numBytes);
}
else {
// neither heap buffer nor direct buffer
while (source.hasRemaining()) {
put(offset++, source.get());
}
}
}
@Override
public void pointTo(byte[] buffer, Object owner) {
super.pointTo(buffer, owner);
this.offHeapBuffer = null;
}
@Override
public void pointTo(byte[] buffer) {
pointTo(buffer, null);
}
@Override
public MemorySegment cloneReference() {
if (offHeapBuffer != null) {
return new HybridMemorySegment(offHeapBuffer, owner);
}
return new HybridMemorySegment(heapMemory, owner);
}
// --------------------------------------------------------------------------------------------
// Utilities for native memory accesses and checks
// --------------------------------------------------------------------------------------------
/**
* The reflection fields with which we access the off-heap pointer from direct ByteBuffers.
*/
private static final Field ADDRESS_FIELD;
static {
try {
ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
ADDRESS_FIELD.setAccessible(true);
}
catch (Throwable t) {
throw new RuntimeException(
"Cannot initialize HybridMemorySegment: off-heap memory is incompatible with this JVM.", t);
}
}
private static long getAddress(ByteBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer is null");
}
try {
return (Long) ADDRESS_FIELD.get(buffer);
}
catch (Throwable t) {
throw new RuntimeException("Could not access direct byte buffer address.", t);
}
}
private static long checkBufferAndGetAddress(ByteBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer is null");
}
if (!buffer.isDirect()) {
throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer.");
}
return getAddress(buffer);
}
}