blob: 31114376547373403de086b893a88bfac999056c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.nativo.jlibaio;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is used as an aggregator for the {@link LibaioFile}.
* <br>
* It holds native data, and it will share a libaio queue that can be used by multiple files.
* <br>
* You need to use the poll methods to read the result of write and read submissions.
* <br>
* You also need to use the special buffer created by {@link LibaioFile} as you need special alignments
* when dealing with O_DIRECT files.
* <br>
* A single controller can serve multiple files. There's no need to create one controller per file.
* <br>
* <a href="https://ext4.wiki.kernel.org/index.php/Clarifying_Direct_IO's_Semantics">Interesting reading for this.</a>
*/
public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
/* Notice: After making changes to the native interface, you have to run mvn install at least once to regenerate the include file.
This is because the maven compiler plugin is the one generating the JNI header for this class
(org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h, named after the package of this class),
so that file needs to be updated before CMake comes along to compile the module. */
/** Logger used for the library loading diagnostics and for the warnings raised while closing a context. */
private static final Logger logger = LoggerFactory.getLogger(LibaioContext.class);
/** Sum of the queue sizes of the contexts created and not yet closed; see {@code getTotalMaxIO()} and {@code resetMaxAIO()}. */
private static final AtomicLong totalMaxIO = new AtomicLong(0);
/**
* The Native layer will look at this version.
* <br>
* On the Java side, {@code loadLibrary(String)} rejects a library whose {@code getNativeVersion()} differs from this value.
*/
private static final int EXPECTED_NATIVE_VERSION = 11;
/** Set to {@code true} by the static initializer once one of the native libraries was loaded and accepted. */
private static boolean loaded = false;
/** Set to {@code true} by the JVM shutdown hook registered in the static initializer. */
private static final AtomicBoolean shuttingDown = new AtomicBoolean(false);
/** Number of contexts constructed and not yet closed; {@code checkShutdown()} only proceeds when this is zero. */
private static final AtomicInteger contexts = new AtomicInteger(0);
/**
* Tells whether the native library is available.
*
* @return {@code true} when the static initializer loaded a native library reporting the expected version.
*/
public static boolean isLoaded() {
return loaded;
}
/**
 * Tries to load the native library with the given name and validates the version it reports.
 *
 * @param name the library name handed to {@link System#loadLibrary(String)}.
 * @return {@code true} if the library was loaded and {@code getNativeVersion()} matches
 *         {@code EXPECTED_NATIVE_VERSION}; {@code false} if loading failed or the version differs.
 */
private static boolean loadLibrary(final String name) {
   try {
      logger.debug("Loading {}", name);
      System.loadLibrary(name);
      if (getNativeVersion() != EXPECTED_NATIVE_VERSION) {
         NativeLogger.incompatibleNativeLibrary();
         return false;
      }
      return true;
   } catch (Throwable e) {
      // Throwable on purpose: a missing library is reported as UnsatisfiedLinkError, which is an Error.
      // Parameterized like the other log statements of this class, so the message is only built when
      // debug is enabled; SLF4J treats the trailing Throwable as the exception to be logged.
      logger.debug("{} -> not possible to load native library", name, e);
      return false;
   }
}
static {
   // The candidates are tried in the order listed; the first one that loads and is accepted wins.
   for (String candidate : new String[]{"artemis-native-64", "artemis-native-32"}) {
      if (!loadLibrary(candidate)) {
         logger.debug("Library {} not found!", candidate);
         continue;
      }
      loaded = true;
      // Only the presence of the property matters, its value is never read.
      final boolean forceSyscall = System.getProperty("org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL") != null;
      if (forceSyscall) {
         LibaioContext.setForceSyscall(true);
      }
      // On JVM shutdown: flag it and let checkShutdown() decide if the native shutdown can run already.
      final Thread hook = new Thread() {
         @Override
         public void run() {
            shuttingDown.set(true);
            checkShutdown();
         }
      };
      Runtime.getRuntime().addShutdownHook(hook);
      break;
   }
   if (!loaded) {
      logger.debug("Couldn't locate LibAIO Wrapper");
   }
}
/**
 * Calls the native {@code shutdownHook()} when no context is open and the JVM shutdown hook has already fired.
 * It is invoked from the shutdown hook itself and after each context is closed.
 */
private static void checkShutdown() {
   if (contexts.get() != 0) {
      // there are still contexts open, the last close() will come back here
      return;
   }
   if (!shuttingDown.get()) {
      return;
   }
   shutdownHook();
}
/** Native call issued by {@code checkShutdown()} once the JVM is shutting down and no context remains open. */
private static native void shutdownHook();
/**
* Native setter for the force-syscall flag. The static initializer calls it with {@code true} when the system
* property {@code org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL} is defined.
* NOTE(review): what the flag changes is decided by the native layer - confirm there.
*
* @param value the new value of the flag.
*/
public static native void setForceSyscall(boolean value);
/** The system may choose to set this if a failing condition happened inside the code. */
public static native boolean isForceSyscall();
/**
* This is used to validate leaks on tests.
*
* @return the sum of the queue sizes of the contexts created and not yet closed, counted since the last
* {@link #resetMaxAIO()}; to be used on test checks.
*/
public static long getTotalMaxIO() {
return totalMaxIO.get();
}
/**
 * It will reset the buffer to 0 through the native memset call, using {@link ByteBuffer#limit()} as the size.
 *
 * @param buffer a native buffer.
 */
public void memsetBuffer(ByteBuffer buffer) {
   final int length = buffer.limit();
   memsetBuffer(buffer, length);
}
/**
* This is used on tests validating for leaks.
* <br>
* It sets the counter returned by {@link #getTotalMaxIO()} back to 0, regardless of the contexts still open.
*/
public static void resetMaxAIO() {
totalMaxIO.set(0);
}
/**
* the native ioContext including the structure created.
* <br>
* It is the value returned by {@code newContext(int)} and it is passed back on every native call of this instance.
*/
private final ByteBuffer ioContext;
/** Set by {@link #close()}; once {@code true} the submit methods throw and {@link #poll()} returns right away. */
private final AtomicBoolean closed = new AtomicBoolean(false);
/** One permit per slot of the queue; {@code null} when the context was created with useSemaphore = false. */
final Semaphore ioSpace;
/** The queue size given on the constructor. */
final int queueSize;
/** Given on the constructor and forwarded to the native {@code blockedPoll} by {@link #poll()}. */
final boolean useFdatasync;
/**
* The queue size here will use resources defined on the kernel parameter
* <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
*
* @param queueSize the size to be initialize on libaio
* io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
* @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
* @param useFdatasync should use fdatasync before calling callbacks.
*/
public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) {
try {
contexts.incrementAndGet();
this.ioContext = newContext(queueSize);
this.useFdatasync = useFdatasync;
} catch (Exception e) {
throw e;
}
this.queueSize = queueSize;
totalMaxIO.addAndGet(queueSize);
if (useSemaphore) {
this.ioSpace = new Semaphore(queueSize);
} else {
this.ioSpace = null;
}
}
/**
 * Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}
 * <br>
 * When the context uses a semaphore, this call blocks until there is space on the queue. The permit
 * taken here is given back when the completion is polled, or right away if the submission fails.
 *
 * @param fd          the file descriptor
 * @param position    the write position
 * @param size        number of bytes to use
 * @param bufferWrite the native buffer
 * @param callback    a callback
 * @throws IOException in case of error, if the context is closed or if the thread is interrupted
 *                     while waiting for space on the queue
 */
public void submitWrite(int fd,
                        long position,
                        int size,
                        ByteBuffer bufferWrite,
                        Callback callback) throws IOException {
   if (closed.get()) {
      throw new IOException("Libaio Context is closed!");
   }
   if (ioSpace != null) {
      try {
         ioSpace.acquire();
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException(e.getMessage(), e);
      }
   }
   boolean submitted = false;
   try {
      submitWrite(fd, this.ioContext, position, size, bufferWrite, callback);
      submitted = true;
   } finally {
      if (!submitted && ioSpace != null) {
         // The submission did not go through, so no completion is expected to release this permit
         // through poll(..) or done(..): give it back, otherwise the slot is lost for good.
         // NOTE(review): assumes the native layer does not call done() for a submission that threw.
         ioSpace.release();
      }
   }
}
/**
 * Documented at {@link LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)}
 * <br>
 * When the context uses a semaphore, this call blocks until there is space on the queue. The permit
 * taken here is given back when the completion is polled, or right away if the submission fails.
 *
 * @param fd          the file descriptor
 * @param position    the read position
 * @param size        number of bytes to use
 * @param bufferWrite the native buffer handed to the native read submission
 * @param callback    a callback
 * @throws IOException in case of error, if the context is closed or if the thread is interrupted
 *                     while waiting for space on the queue
 */
public void submitRead(int fd,
                       long position,
                       int size,
                       ByteBuffer bufferWrite,
                       Callback callback) throws IOException {
   if (closed.get()) {
      throw new IOException("Libaio Context is closed!");
   }
   if (ioSpace != null) {
      try {
         ioSpace.acquire();
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException(e.getMessage(), e);
      }
   }
   boolean submitted = false;
   try {
      submitRead(fd, this.ioContext, position, size, bufferWrite, callback);
      submitted = true;
   } finally {
      if (!submitted && ioSpace != null) {
         // The submission did not go through, so no completion is expected to release this permit
         // through poll(..) or done(..): give it back, otherwise the slot is lost for good.
         // NOTE(review): assumes the native layer does not call done() for a submission that threw.
         ioSpace.release();
      }
   }
}
/**
 * This is used to close the libaio queues and cleanup the native data used.
 * <br>
 * It is unsafe to close the controller while you have pending writes or files open as
 * this could cause core dumps or VM crashes.
 * <br>
 * When a semaphore is in use, this method waits up to 10 seconds for all the permits to be back
 * (meaning every submission was completed). If that does not happen a warning is logged and the
 * context is deleted anyway. Calling close more than once is harmless, only the first call acts.
 */
@Override
public void close() {
   if (closed.getAndSet(true)) {
      // already closed by a previous call
      return;
   }
   if (ioSpace != null) {
      try {
         if (!ioSpace.tryAcquire(queueSize, 10, TimeUnit.SECONDS)) {
            // Best effort, as before: the close still goes ahead, but the timeout is no longer silent.
            logger.warn("Timed out waiting for pending IO before closing the Libaio Context, {} out of {} permits available",
                        ioSpace.availablePermits(), queueSize);
         }
      } catch (InterruptedException e) {
         // keep the interruption visible to the caller instead of swallowing it
         Thread.currentThread().interrupt();
         logger.warn(e.getMessage(), e);
      } catch (RuntimeException e) {
         logger.warn(e.getMessage(), e);
      }
   }
   totalMaxIO.addAndGet(-queueSize);
   if (ioContext != null) {
      deleteContext(ioContext);
   }
   contexts.decrementAndGet();
   // this may have been the last context open during a JVM shutdown
   checkShutdown();
}
/**
 * Opens the given file, delegating to {@link #openFile(String, boolean)} with {@link File#getPath()}.
 * If you set the direct flag = false then you won't need to use the special buffer.
 * Notice: This will create an empty file if the file doesn't already exist.
 *
 * @param file   the file to be open.
 * @param direct will set ODIRECT.
 * @return It will return a LibaioFile instance.
 * @throws IOException in case of error.
 */
public LibaioFile<Callback> openFile(File file, boolean direct) throws IOException {
   final String path = file.getPath();
   return openFile(path, direct);
}
/**
 * Opens the file with the given name and binds the resulting descriptor to this context.
 * If you set the direct flag = false then you won't need to use the special buffer.
 * Notice: This will create an empty file if the file doesn't already exist.
 *
 * @param file   the file to be open.
 * @param direct should use O_DIRECT when opening the file.
 * @return a new open file.
 * @throws IOException          in case of error.
 * @throws NullPointerException if the file name is null or this context has no native ioContext.
 */
public LibaioFile<Callback> openFile(String file, boolean direct) throws IOException {
   checkNotNull(file, "path");
   checkNotNull(ioContext, "IOContext");
   // note: the native layer will throw an IOException in case of errors
   final int fd = open(file, direct);
   return new LibaioFile<>(fd, this);
}
/**
 * It will open a file disassociated with any sort of factory: the returned file is created with no context.
 * This is useful when you won't use reading / writing through libaio like locking files.
 *
 * @param file   a file name
 * @param direct will use O_DIRECT
 * @return a new file
 * @throws IOException          in case of error.
 * @throws NullPointerException if the file name is null.
 */
public static LibaioFile openControlFile(String file, boolean direct) throws IOException {
   checkNotNull(file, "path");
   // note: the native layer will throw an IOException in case of errors
   final int fd = open(file, direct);
   return new LibaioFile<>(fd, null);
}
/**
 * Validates a mandatory argument.
 *
 * @param arg  the value to be validated.
 * @param text the message of the {@link NullPointerException} thrown when the value is null.
 * @return the very same value received, when it is not null.
 */
private static <T> T checkNotNull(T arg, String text) {
   if (arg != null) {
      return arg;
   }
   throw new NullPointerException(text);
}
/**
 * It will poll the libaio queue for results. It should block until min is reached
 * Results are placed on the callback.
 * <br>
 * This shouldn't be called concurrently. You should provide your own synchronization if you need more than one
 * Thread polling for any reason.
 * <br>
 * Notice that the native layer will invoke {@link SubmitInfo#onError(int, String)} in case of failures,
 * but it won't call done method for you.
 * <br>
 * When the context uses a semaphore, one permit is given back for each callback returned.
 *
 * @param callbacks area to receive the callbacks passed on submission.The size of this callback has to
 *                  be greater than the parameter max.
 * @param min       the minimum number of elements to receive. It will block until this is achieved.
 * @param max       The maximum number of elements to receive.
 * @return Number of callbacks returned.
 * @see LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)
 * @see LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)
 */
public int poll(Callback[] callbacks, int min, int max) {
   final int completed = poll(ioContext, callbacks, min, max);
   if (completed > 0 && ioSpace != null) {
      // every completion returned frees a slot on the queue
      ioSpace.release(completed);
   }
   return completed;
}
/**
 * It will start polling and will keep doing until the context is closed.
 * This will call callbacks on {@link SubmitInfo#onError(int, String)} and
 * {@link SubmitInfo#done()}.
 * In case of error, both {@link SubmitInfo#onError(int, String)} and
 * {@link SubmitInfo#done()} are called.
 * <br>
 * Nothing is done if the context was already closed when this is called.
 */
public void poll() {
   if (closed.get()) {
      return;
   }
   blockedPoll(ioContext, useFdatasync);
}
/**
 * Called from the native layer
 * <br>
 * It notifies the callback and gives back the permit taken at submission, when a semaphore is in use.
 *
 * @param info the callback given at submission.
 */
private void done(SubmitInfo info) {
   try {
      info.done();
   } finally {
      // The submission is over whatever the callback does: release the permit even if done() throws,
      // otherwise the slot would be lost and submitters could block forever.
      if (ioSpace != null) {
         ioSpace.release();
      }
   }
}
/**
* This is the queue for libaio, initialized with queueSize.
*
* @param queueSize the queue size given on the constructor.
* @return the native structure, kept on the ioContext field and passed back on the other native calls.
*/
private native ByteBuffer newContext(int queueSize);
/**
* Internal method to be used when closing the controller.
*
* @param buffer the structure returned by {@link #newContext(int)}.
*/
private native void deleteContext(ByteBuffer buffer);
/**
* it will return a file descriptor.
*
* @param path the file name.
* @param direct translates as O_DIRECT On open
* @return a fd from open C call.
*/
public static native int open(String path, boolean direct);
/**
* Native close of a file descriptor.
* NOTE(review): presumably the counterpart of {@link #open(String, boolean)} - confirm on the native layer.
*
* @param fd the file descriptor.
*/
public static native void close(int fd);
/**
* Buffers for O_DIRECT need to use posix_memalign.
* <br>
* Documented at {@link LibaioFile#newBuffer(int)}.
*
* @param size needs to be % alignment
* @param alignment the alignment used at the dispositive
* @return a new native buffer used with posix_memalign
*/
public static native ByteBuffer newAlignedBuffer(int size, int alignment);
/**
* This will call posix free to release the inner buffer allocated at {@link #newAlignedBuffer(int, int)}.
*
* @param buffer a native buffer allocated with {@link #newAlignedBuffer(int, int)}.
*/
public static native void freeBuffer(ByteBuffer buffer);
/**
* Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}.
* <br>
* Native counterpart of the public submitWrite, which passes the ioContext of this instance as libaioContext.
*/
native void submitWrite(int fd,
ByteBuffer libaioContext,
long position,
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException;
/**
* Documented at {@link LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)}.
* <br>
* Native counterpart of the public submitRead, which passes the ioContext of this instance as libaioContext.
*/
native void submitRead(int fd,
ByteBuffer libaioContext,
long position,
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException;
/**
* Note: this shouldn't be done concurrently.
* This method will block until the min condition is satisfied on the poll.
* <br>
* The callbacks will include the original callback sent at submit (read or write).
*
* @return the number of callbacks returned; the public poll releases that many permits when a semaphore is in use.
*/
native int poll(ByteBuffer libaioContext, Callback[] callbacks, int min, int max);
/**
* This method will block as long as the context is open.
* <br>
* It is called by {@link #poll()} with the ioContext and the useFdatasync flag of this instance.
*/
native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync);
/** Version reported by the loaded native library; loadLibrary compares it against EXPECTED_NATIVE_VERSION. */
static native int getNativeVersion();
/**
* NOTE(review): presumably tries to lock the file behind the descriptor and returns whether it succeeded -
* confirm on the native layer.
*/
public static native boolean lock(int fd);
/** Native memset; {@link #memsetBuffer(ByteBuffer)} calls it using the limit of the buffer as the size. */
public static native void memsetBuffer(ByteBuffer buffer, int size);
/** NOTE(review): presumably the size of the file behind the descriptor - confirm on the native layer. */
static native long getSize(int fd);
/** NOTE(review): presumably the block size of the device holding the descriptor - confirm on the native layer. */
static native int getBlockSizeFD(int fd);
/**
 * Same as {@link #getBlockSize(String)}, using the absolute path of the given file.
 *
 * @param path the file (or directory) to be queried.
 * @return whatever {@link #getBlockSize(String)} returns for {@link File#getAbsolutePath()}.
 */
public static int getBlockSize(File path) {
   final String absolutePath = path.getAbsolutePath();
   return getBlockSize(absolutePath);
}
/** NOTE(review): presumably the block size of the file system holding the path - confirm on the native layer. */
public static native int getBlockSize(String path);
/** NOTE(review): presumably pre-allocates size bytes on the descriptor - confirm on the native layer. */
static native void fallocate(int fd, long size);
/** NOTE(review): presumably fills the file up to size using buffers of the given alignment - confirm on the native layer. */
static native void fill(int fd, int alignment, long size);
/** NOTE(review): presumably a direct write that does not go through the libaio queue - confirm on the native layer. */
static native void writeInternal(int fd, long position, long size, ByteBuffer bufferWrite) throws IOException;
}