blob: 17d748518ffec7d24310a00a1a25199769cf00ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* This class represents a piece of heap memory managed by Flink.
* The segment is backed by a byte array and features random put and get methods for the basic types,
* as well as compare and swap methods.
*
* <p>This class specializes byte access and byte copy calls for heap memory, while reusing the
* multi-byte type accesses and cross-segment operations from the MemorySegment.
*
* <p>Note that memory segments should usually not be allocated manually, but rather through the
* {@link MemorySegmentFactory}.
*/
@SuppressWarnings("unused")
@Internal
public final class HeapMemorySegment extends MemorySegment {
/**
* An extra reference to the heap memory, so we can let byte array checks fail by the built-in
* checks automatically without extra checks.
*/
private byte[] memory;
/**
* Creates a new memory segment that represents the data in the given byte array.
* The owner of this memory segment is null.
*
* @param memory The byte array that holds the data.
*/
HeapMemorySegment(byte[] memory) {
this(memory, null);
}
/**
* Creates a new memory segment that represents the data in the given byte array.
* The memory segment references the given owner.
*
* @param memory The byte array that holds the data.
* @param owner The owner referenced by the memory segment.
*/
HeapMemorySegment(byte[] memory, Object owner) {
super(Objects.requireNonNull(memory), owner);
this.memory = memory;
}
// -------------------------------------------------------------------------
// MemorySegment operations
// -------------------------------------------------------------------------
@Override
public void free() {
super.free();
this.memory = null;
}
@Override
public ByteBuffer wrap(int offset, int length) {
try {
return ByteBuffer.wrap(this.memory, offset, length);
}
catch (NullPointerException e) {
throw new IllegalStateException("segment has been freed");
}
}
/**
* Gets the byte array that backs this memory segment.
*
* @return The byte array that backs this memory segment, or null, if the segment has been freed.
*/
public byte[] getArray() {
return this.heapMemory;
}
// ------------------------------------------------------------------------
// Random Access get() and put() methods
// ------------------------------------------------------------------------
@Override
public final byte get(int index) {
return this.memory[index];
}
@Override
public final void put(int index, byte b) {
this.memory[index] = b;
}
@Override
public final void get(int index, byte[] dst) {
get(index, dst, 0, dst.length);
}
@Override
public final void put(int index, byte[] src) {
put(index, src, 0, src.length);
}
@Override
public final void get(int index, byte[] dst, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(this.memory, index, dst, offset, length);
}
@Override
public final void put(int index, byte[] src, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(src, offset, this.memory, index, length);
}
@Override
public final boolean getBoolean(int index) {
return this.memory[index] != 0;
}
@Override
public final void putBoolean(int index, boolean value) {
this.memory[index] = (byte) (value ? 1 : 0);
}
// -------------------------------------------------------------------------
// Bulk Read and Write Methods
// -------------------------------------------------------------------------
@Override
public final void get(DataOutput out, int offset, int length) throws IOException {
out.write(this.memory, offset, length);
}
@Override
public final void put(DataInput in, int offset, int length) throws IOException {
in.readFully(this.memory, offset, length);
}
@Override
public final void get(int offset, ByteBuffer target, int numBytes) {
// ByteBuffer performs the boundary checks
target.put(this.memory, offset, numBytes);
}
@Override
public final void put(int offset, ByteBuffer source, int numBytes) {
// ByteBuffer performs the boundary checks
source.get(this.memory, offset, numBytes);
}
@Override
public void pointTo(byte[] buffer, Object owner) {
super.pointTo(buffer, owner);
this.memory = buffer;
}
@Override
public void pointTo(byte[] buffer) {
pointTo(buffer, null);
}
@Override
public MemorySegment cloneReference() {
return new HeapMemorySegment(memory, owner);
}
// -------------------------------------------------------------------------
// Factoring
// -------------------------------------------------------------------------
/**
* A memory segment factory that produces heap memory segments. Note that this factory does not
* support to allocate off-heap memory.
*/
public static final class HeapMemorySegmentFactory {
/**
* Creates a new memory segment that targets the given heap memory region.
*
* @param memory The heap memory region.
* @return A new memory segment that targets the given heap memory region.
*/
public HeapMemorySegment wrap(byte[] memory) {
return new HeapMemorySegment(memory);
}
/**
* Allocates some unpooled memory and creates a new memory segment that represents
* that memory.
*
* @param size The size of the memory segment to allocate.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment, backed by unpooled heap memory.
*/
public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
return new HeapMemorySegment(new byte[size], owner);
}
/**
* Creates a memory segment that wraps the given byte array.
*
* <p>This method is intended to be used for components which pool memory and create
* memory segments around long-lived memory regions.
*
* @param memory The heap memory to be represented by the memory segment.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment representing the given heap memory.
*/
public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
return new HeapMemorySegment(memory, owner);
}
/**
* Prevent external instantiation.
*/
HeapMemorySegmentFactory() {}
}
public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
}