blob: 83680b785a2669ebfda1969feac6b5b2bdaabc00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TaskManagerExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import static org.apache.flink.util.Preconditions.checkArgument;
/** A factory for memory segments ({@link MemorySegment}). */
@Internal
public final class MemorySegmentFactory {
private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class);
private static final Runnable NO_OP = () -> {};
/**
* Creates a new memory segment that targets the given heap memory region.
*
* <p>This method should be used to turn short lived byte arrays into memory segments.
*
* @param buffer The heap memory region.
* @return A new memory segment that targets the given heap memory region.
*/
public static MemorySegment wrap(byte[] buffer) {
return new MemorySegment(buffer, null);
}
/**
* Copies the given heap memory region and creates a new memory segment wrapping it.
*
* @param bytes The heap memory region.
* @param start starting position, inclusive
* @param end end position, exclusive
* @return A new memory segment that targets a copy of the given heap memory region.
* @throws IllegalArgumentException if start > end or end > bytes.length
*/
public static MemorySegment wrapCopy(byte[] bytes, int start, int end)
throws IllegalArgumentException {
checkArgument(end >= start);
checkArgument(end <= bytes.length);
MemorySegment copy = allocateUnpooledSegment(end - start);
copy.put(0, bytes, start, copy.size());
return copy;
}
/**
* Wraps the four bytes representing the given number with a {@link MemorySegment}.
*
* @see ByteBuffer#putInt(int)
*/
public static MemorySegment wrapInt(int value) {
return wrap(ByteBuffer.allocate(Integer.BYTES).putInt(value).array());
}
/**
* Allocates some unpooled memory and creates a new memory segment that represents that memory.
*
* <p>This method is similar to {@link #allocateUnpooledSegment(int, Object)}, but the memory
* segment will have null as the owner.
*
* @param size The size of the memory segment to allocate.
* @return A new memory segment, backed by unpooled heap memory.
*/
public static MemorySegment allocateUnpooledSegment(int size) {
return allocateUnpooledSegment(size, null);
}
/**
* Allocates some unpooled memory and creates a new memory segment that represents that memory.
*
* <p>This method is similar to {@link #allocateUnpooledSegment(int)}, but additionally sets the
* owner of the memory segment.
*
* @param size The size of the memory segment to allocate.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment, backed by unpooled heap memory.
*/
public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
return new MemorySegment(new byte[size], owner);
}
/**
* Allocates some unpooled off-heap memory and creates a new memory segment that represents that
* memory.
*
* @param size The size of the off-heap memory segment to allocate.
* @return A new memory segment, backed by unpooled off-heap memory.
*/
public static MemorySegment allocateUnpooledOffHeapMemory(int size) {
return allocateUnpooledOffHeapMemory(size, null);
}
/**
* Allocates some unpooled off-heap memory and creates a new memory segment that represents that
* memory.
*
* @param size The size of the off-heap memory segment to allocate.
* @param owner The owner to associate with the off-heap memory segment.
* @return A new memory segment, backed by unpooled off-heap memory.
*/
public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
ByteBuffer memory = allocateDirectMemory(size);
return new MemorySegment(memory, owner);
}
@VisibleForTesting
public static MemorySegment allocateOffHeapUnsafeMemory(int size) {
return allocateOffHeapUnsafeMemory(size, null, NO_OP);
}
private static ByteBuffer allocateDirectMemory(int size) {
//noinspection ErrorNotRethrown
try {
return ByteBuffer.allocateDirect(size);
} catch (OutOfMemoryError outOfMemoryError) {
// TODO: this error handling can be removed in future,
// once we find a common way to handle OOM errors in netty threads.
// Here we enrich it to propagate better OOM message to the receiver
// if it happens in a netty thread.
TaskManagerExceptionUtils.tryEnrichTaskManagerError(outOfMemoryError);
if (ExceptionUtils.isDirectOutOfMemoryError(outOfMemoryError)) {
LOG.error("Cannot allocate direct memory segment", outOfMemoryError);
}
ExceptionUtils.rethrow(outOfMemoryError);
return null;
}
}
/**
* Allocates an off-heap unsafe memory and creates a new memory segment to represent that
* memory.
*
* <p>Creation of this segment schedules its memory freeing operation when its java wrapping
* object is about to be garbage collected, similar to {@link
* java.nio.DirectByteBuffer#DirectByteBuffer(int)}. The difference is that this memory
* allocation is out of option -XX:MaxDirectMemorySize limitation.
*
* @param size The size of the off-heap unsafe memory segment to allocate.
* @param owner The owner to associate with the off-heap unsafe memory segment.
* @param customCleanupAction A custom action to run upon calling GC cleaner.
* @return A new memory segment, backed by off-heap unsafe memory.
*/
public static MemorySegment allocateOffHeapUnsafeMemory(
int size, Object owner, Runnable customCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
Runnable cleaner = MemoryUtils.createMemoryCleaner(address, customCleanupAction);
return new MemorySegment(offHeapBuffer, owner, false, cleaner);
}
/**
* Creates a memory segment that wraps the off-heap memory backing the given ByteBuffer. Note
* that the ByteBuffer needs to be a <i>direct ByteBuffer</i>.
*
* <p>This method is intended to be used for components which pool memory and create memory
* segments around long-lived memory regions.
*
* @param memory The byte buffer with the off-heap memory to be represented by the memory
* segment.
* @return A new memory segment representing the given off-heap memory.
*/
public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
return new MemorySegment(memory, null);
}
}