| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef _GNU_SOURCE |
| // libaio, O_DIRECT and other things won't be available without this define |
| #define _GNU_SOURCE |
| #endif |
| |
| //#define DEBUG |
| |
| #include <jni.h> |
| #include <unistd.h> |
| #include <errno.h> |
| #include <libaio.h> |
| #include <sys/types.h> |
| #include <sys/file.h> |
| #include <sys/stat.h> |
| #include <fcntl.h> |
| #include <stdlib.h> |
| #include <pthread.h> |
| #include <limits.h> |
| #include "org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h" |
| #include "exception_helper.h" |
| |
| //x86 has a strong memory model and there is no need of HW fences if just Write-Back (WB) memory is used |
| #define mem_barrier() __asm__ __volatile__ ("":::"memory") |
| #define read_barrier() __asm__ __volatile__("":::"memory") |
| #define store_barrier() __asm__ __volatile__("":::"memory") |
| |
| struct io_control { |
| io_context_t ioContext; |
| struct io_event * events; |
| |
| jobject thisObject; |
| |
| // This is used to make sure we don't return IOCB while something else is using them |
| // this is to guarantee the submits could be done concurrently with polling |
| pthread_mutex_t iocbLock; |
| |
| pthread_mutex_t pollLock; |
| |
| // a reusable pool of iocb |
| struct iocb ** iocb; |
| int queueSize; |
| int iocbPut; |
| int iocbGet; |
| int used; |
| |
| }; |
| |
| //These should be used to check if the user-space io_getevents is supported: |
| //Linux ABI for the ring buffer: https://elixir.bootlin.com/linux/v4.20.13/source/fs/aio.c#L54 |
| //aio_read_events_ring: https://elixir.bootlin.com/linux/v4.20.13/source/fs/aio.c#L1148 |
| |
| // NOTE: if the kernel ever updates the structure, the RING-MAGIC will change and the code will switch back to normal IO calls |
| #define AIO_RING_MAGIC 0xa10a10a1 |
| #define AIO_RING_INCOMPAT_FEATURES 0 |
| |
| // set this to 0 if you want to stop using ring reaping |
| #define RING_REAPER 1 |
| |
| jboolean forceSysCall = JNI_FALSE; |
| |
| /* |
| * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext |
| * Method: setForceSyscall |
| * Signature: (Z)V |
| */ |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_setForceSyscall |
| (JNIEnv * env, jclass clazz, jboolean _forceSysCall) { |
| forceSysCall = _forceSysCall; |
| } |
| |
| /* |
| * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext |
| * Method: isForceSyscall |
| * Signature: ()Z |
| */ |
| JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_isForceSyscall |
| (JNIEnv * env, jclass clazz) { |
| return forceSysCall | !RING_REAPER; |
| } |
| |
| |
| /** There is no defined aio_ring anywhere in an include, |
| This is an implementation detail, that is a binary contract. |
| it is safe to use the feature though. */ |
| struct aio_ring { |
| unsigned id; /* kernel internal index number */ |
| unsigned nr; /* number of io_events */ |
| unsigned head; |
| unsigned tail; |
| |
| unsigned magic; |
| unsigned compat_features; |
| unsigned incompat_features; |
| unsigned header_length; /* size of aio_ring */ |
| |
| |
| struct io_event io_events[0]; |
| }; /* 128 bytes + ring size */ |
| |
| // Check if the implementation supports AIO_RING by checking this number directly. |
| static inline int has_usable_ring(struct aio_ring *ring) { |
| return ring->magic == AIO_RING_MAGIC && ring->incompat_features == AIO_RING_INCOMPAT_FEATURES; |
| } |
| |
| // Newer versions of the kernel (newer here being a relative word, a couple years already at the time |
| // I am writing this), will have io_context_t as an opaque type, and the real type being the aio_ring. |
| static inline struct aio_ring* to_aio_ring(io_context_t aio_ctx) { |
| return (struct aio_ring*) aio_ctx; |
| } |
| |
| //It implements a user space batch read io events implementation that attempts to read io avoiding any sys calls |
| // This implementation will look at the internal structure (aio_ring) and move along the memory result |
| static int ringio_get_events(io_context_t aio_ctx, long min_nr, long max, |
| struct io_event *events, struct timespec *timeout) { |
| struct aio_ring *ring = to_aio_ring(aio_ctx); |
| //checks if it could be completed in user space, saving a sys call |
| if (RING_REAPER && !forceSysCall && has_usable_ring(ring)) { |
| const unsigned ring_nr = ring->nr; |
| // We're assuming to be the exclusive writer to head, so we just need a compiler barrier |
| unsigned head = ring->head; |
| mem_barrier(); |
| const unsigned tail = ring->tail; |
| int available = tail - head; |
| if (available < 0) { |
| //a wrap has occurred |
| available += ring_nr; |
| } |
| #ifdef DEBUG |
| fprintf(stdout, "tail = %d head= %d nr = %d available = %d\n", tail, head, ring_nr, available); |
| #endif |
| if ((available >= min_nr) || (timeout && timeout->tv_sec == 0 && timeout->tv_nsec == 0)) { |
| if (!available) { |
| return 0; |
| } |
| |
| if (available >= max) { |
| short retryTail = 0; |
| // we first wait for 20 iterations, to see if the tail moved |
| for (retryTail = 0; retryTail < 20 && ring->tail == tail; retryTail++) { |
| mem_barrier(); |
| } |
| // This is to trap a possible bug from the kernel: |
| // https://bugzilla.redhat.com/show_bug.cgi?id=1845326 |
| // https://issues.apache.org/jira/browse/ARTEMIS-2800 |
| // |
| // On the race available would eventually be >= max, while ring->tail was invalid |
| // we could work around by waiting ring-tail to change: |
| // while (ring->tail == tail) mem_barrier(); |
| // |
| // however eventually we could have available==max in a legal situation what could lead to infinite loop here |
| if (retryTail == 20) { |
| // if the tail didn't move, we will then perform a regular syscall |
| return io_getevents(aio_ctx, min_nr, max, events, timeout); |
| } |
| |
| // also: I could have called io_getevents to the one at the end of this method |
| // but I really hate goto, so I would rather have a duplicate code here |
| // and I did not want to create another memory flag to stop the rest of the code |
| } |
| |
| //the kernel has written ring->tail from an interrupt: |
| //we need to load acquire the completed events here |
| read_barrier(); |
| const int available_nr = available < max? available : max; |
| //if isn't needed to wrap we can avoid % operations that are quite expansive |
| const int needMod = ((head + available_nr) >= ring_nr) ? 1 : 0; |
| for (int i = 0; i<available_nr; i++) { |
| events[i] = ring->io_events[head]; |
| if (needMod == 1) { |
| head = (head + 1) % ring_nr; |
| } else { |
| head = (head + 1); |
| } |
| } |
| //it allow the kernel to build its own view of the ring buffer size |
| //and push new events if there are any |
| store_barrier(); |
| ring->head = head; |
| #ifdef DEBUG |
| fprintf(stdout, "consumed non sys-call = %d\n", available_nr); |
| #endif |
| return available_nr; |
| } |
| } else { |
| #ifdef DEBUG |
| fprintf(stdout, "The kernel is not supoprting the ring buffer any longer\n"); |
| #endif |
| } |
| // if this next line ever needs to be changed, beware of a duplicate code on this method |
| // I explain why I duplicated the call instead of reuse it there ^^^^ |
| int sys_call_events = io_getevents(aio_ctx, min_nr, max, events, timeout); |
| #ifdef DEBUG |
| fprintf(stdout, "consumed sys-call = %d\n", sys_call_events); |
| #endif |
| return sys_call_events; |
| } |
| |
| // We need a fast and reliable way to stop the blocked poller |
| // for that we need a dumb file, |
| // We are using a temporary file for this. |
| int dumbWriteHandler = 0; |
| char dumbPath[PATH_MAX]; |
| |
| #define ONE_MEGA 1048576l |
| void * oneMegaBuffer = 0; |
| pthread_mutex_t oneMegaMutex; |
| |
| |
| jclass submitClass = NULL; |
| jmethodID errorMethod = NULL; |
| jmethodID doneMethod = NULL; |
| jmethodID libaioContextDone = NULL; |
| |
| jclass libaioContextClass = NULL; |
| jclass runtimeExceptionClass = NULL; |
| jclass ioExceptionClass = NULL; |
| |
| // util methods |
| void throwRuntimeException(JNIEnv* env, char* message) { |
| (*env)->ThrowNew(env, runtimeExceptionClass, message); |
| } |
| |
| void throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) { |
| char* allocatedMessage = exceptionMessage(message, errorNumber); |
| (*env)->ThrowNew(env, runtimeExceptionClass, allocatedMessage); |
| free(allocatedMessage); |
| } |
| |
| void throwIOException(JNIEnv* env, char* message) { |
| (*env)->ThrowNew(env, ioExceptionClass, message); |
| } |
| |
| void throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) { |
| char* allocatedMessage = exceptionMessage(message, errorNumber); |
| (*env)->ThrowNew(env, ioExceptionClass, allocatedMessage); |
| free(allocatedMessage); |
| } |
| |
| void throwOutOfMemoryError(JNIEnv* env) { |
| jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError"); |
| (*env)->ThrowNew(env, exceptionClass, ""); |
| } |
| |
| /** Notice: every usage of exceptionMessage needs to release the allocated memory for the sequence of char */ |
| char* exceptionMessage(char* msg, int error) { |
| if (error < 0) { |
| // some functions return negative values |
| // and it's hard to keep track of when to send -error and when not |
| // this will just take care when things are forgotten |
| // what would generate a proper error |
| error = error * -1; |
| } |
| //strerror is returning a constant, so no need to free anything coming from strerror |
| char *result = NULL; |
| |
| if (asprintf(&result, "%s%s", msg, strerror(error)) == -1) { |
| fprintf(stderr, "Could not allocate enough memory for the error message: %s/%s\n", msg, strerror(error)); |
| } |
| |
| return result; |
| } |
| |
| static inline short verifyBuffer(int alignment) { |
| pthread_mutex_lock(&oneMegaMutex); |
| |
| if (oneMegaBuffer == 0) { |
| #ifdef DEBUG |
| fprintf (stdout, "oneMegaBuffer %ld\n", (long) oneMegaBuffer); |
| #endif |
| if (posix_memalign(&oneMegaBuffer, alignment, ONE_MEGA) != 0) { |
| fprintf(stderr, "Could not allocate the 1 Mega Buffer for initializing files\n"); |
| pthread_mutex_unlock(&oneMegaMutex); |
| return -1; |
| } |
| memset(oneMegaBuffer, 0, ONE_MEGA); |
| } |
| |
| pthread_mutex_unlock(&oneMegaMutex); |
| |
| return 0; |
| |
| } |
| |
| |
| jint JNI_OnLoad(JavaVM* vm, void* reserved) { |
| JNIEnv* env; |
| if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { |
| return JNI_ERR; |
| } else { |
| |
| int res = pthread_mutex_init(&oneMegaMutex, 0); |
| if (res) { |
| fprintf(stderr, "could not initialize mutex on on_load, %d", res); |
| return JNI_ERR; |
| } |
| sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir); |
| dumbWriteHandler = mkstemp (dumbPath); |
| |
| #ifdef DEBUG |
| fprintf (stdout, "Creating temp file %s for dumb writes\n", dumbPath); |
| fflush(stdout); |
| #endif |
| |
| if (dumbWriteHandler < 0) { |
| fprintf (stderr, "couldn't create stop file handler %s\n", dumbPath); |
| return JNI_ERR; |
| } |
| |
| // |
| // Accordingly to previous experiences we must hold Global Refs on Classes |
| // And |
| // |
| // Accordingly to IBM recommendations here: |
| // We don't need to hold a global reference on methods: |
| // http://www.ibm.com/developerworks/java/library/j-jni/index.html#notc |
| // Which actually caused core dumps |
| |
| jclass localRuntimeExceptionClass = (*env)->FindClass(env, "java/lang/RuntimeException"); |
| if (localRuntimeExceptionClass == NULL) { |
| // pending exception... |
| return JNI_ERR; |
| } |
| runtimeExceptionClass = (jclass) (*env)->NewGlobalRef(env, localRuntimeExceptionClass); |
| if (runtimeExceptionClass == NULL) { |
| // out-of-memory! |
| throwOutOfMemoryError(env); |
| return JNI_ERR; |
| } |
| |
| jclass localIoExceptionClass = (*env)->FindClass(env, "java/io/IOException"); |
| if (localIoExceptionClass == NULL) { |
| // pending exception... |
| return JNI_ERR; |
| } |
| ioExceptionClass = (jclass) (*env)->NewGlobalRef(env, localIoExceptionClass); |
| if (ioExceptionClass == NULL) { |
| // out-of-memory! |
| throwOutOfMemoryError(env); |
| return JNI_ERR; |
| } |
| |
| submitClass = (*env)->FindClass(env, "org/apache/activemq/artemis/nativo/jlibaio/SubmitInfo"); |
| if (submitClass == NULL) { |
| return JNI_ERR; |
| } |
| |
| submitClass = (jclass)(*env)->NewGlobalRef(env, (jobject)submitClass); |
| |
| errorMethod = (*env)->GetMethodID(env, submitClass, "onError", "(ILjava/lang/String;)V"); |
| if (errorMethod == NULL) { |
| return JNI_ERR; |
| } |
| |
| doneMethod = (*env)->GetMethodID(env, submitClass, "done", "()V"); |
| if (doneMethod == NULL) { |
| return JNI_ERR; |
| } |
| |
| libaioContextClass = (*env)->FindClass(env, "org/apache/activemq/artemis/nativo/jlibaio/LibaioContext"); |
| if (libaioContextClass == NULL) { |
| return JNI_ERR; |
| } |
| libaioContextClass = (jclass)(*env)->NewGlobalRef(env, (jobject)libaioContextClass); |
| |
| libaioContextDone = (*env)->GetMethodID(env, libaioContextClass, "done", "(Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V"); |
| if (libaioContextDone == NULL) { |
| return JNI_ERR; |
| } |
| |
| return JNI_VERSION_1_6; |
| } |
| } |
| |
| static inline void closeDumbHandlers() { |
| if (dumbWriteHandler != 0) { |
| #ifdef DEBUG |
| fprintf (stdout, "Closing and removing dump handler %s\n", dumbPath); |
| #endif |
| dumbWriteHandler = 0; |
| close(dumbWriteHandler); |
| unlink(dumbPath); |
| } |
| } |
| |
| void JNI_OnUnload(JavaVM* vm, void* reserved) { |
| JNIEnv* env; |
| if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { |
| // Something is wrong but nothing we can do about this :( |
| return; |
| } else { |
| closeDumbHandlers(); |
| |
| if (oneMegaBuffer != 0) { |
| free(oneMegaBuffer); |
| oneMegaBuffer = 0; |
| } |
| |
| pthread_mutex_destroy(&oneMegaMutex); |
| |
| // delete global references so the GC can collect them |
| if (runtimeExceptionClass != NULL) { |
| (*env)->DeleteGlobalRef(env, runtimeExceptionClass); |
| } |
| if (ioExceptionClass != NULL) { |
| (*env)->DeleteGlobalRef(env, ioExceptionClass); |
| } |
| |
| if (submitClass != NULL) { |
| (*env)->DeleteGlobalRef(env, (jobject)submitClass); |
| } |
| |
| if (libaioContextClass != NULL) { |
| (*env)->DeleteGlobalRef(env, (jobject)libaioContextClass); |
| } |
| } |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_shutdownHook |
| (JNIEnv * env, jclass clazz) { |
| closeDumbHandlers(); |
| } |
| |
| |
| static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) { |
| struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer); |
| if (ioControl == NULL) { |
| throwRuntimeException(env, "Controller not initialized"); |
| } |
| return ioControl; |
| } |
| |
| /** |
| * remove an iocb from the pool of IOCBs. Returns null if full |
| */ |
| static inline struct iocb * getIOCB(struct io_control * control) { |
| struct iocb * iocb = 0; |
| |
| pthread_mutex_lock(&(control->iocbLock)); |
| |
| #ifdef DEBUG |
| fprintf (stdout, "getIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); |
| #endif |
| |
| if (control->used < control->queueSize) { |
| control->used++; |
| iocb = control->iocb[control->iocbGet++]; |
| |
| if (control->iocbGet >= control->queueSize) { |
| control->iocbGet = 0; |
| } |
| } |
| |
| pthread_mutex_unlock(&(control->iocbLock)); |
| return iocb; |
| } |
| |
| /** |
| * Put an iocb back on the pool of IOCBs |
| */ |
| static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) { |
| pthread_mutex_lock(&(control->iocbLock)); |
| |
| #ifdef DEBUG |
| fprintf (stdout, "putIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); |
| #endif |
| |
| control->used--; |
| control->iocb[control->iocbPut++] = iocbBack; |
| if (control->iocbPut >= control->queueSize) { |
| control->iocbPut = 0; |
| } |
| pthread_mutex_unlock(&(control->iocbLock)); |
| } |
| |
| static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) { |
| int result = io_submit(theControl->ioContext, 1, &iocb); |
| |
| if (result < 0) { |
| // Putting the Global Ref and IOCB back in case of a failure |
| if (iocb->data != NULL && iocb->data != (void *) -1) { |
| (*env)->DeleteGlobalRef(env, (jobject)iocb->data); |
| } |
| putIOCB(theControl, iocb); |
| |
| throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result); |
| return 0; |
| } |
| |
| return 1; |
| } |
| |
| static inline void * getBuffer(JNIEnv* env, jobject pointer) { |
| return (*env)->GetDirectBufferAddress(env, pointer); |
| } |
| |
| JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_lock |
| (JNIEnv * env, jclass clazz, jint handle) { |
| return flock(handle, LOCK_EX | LOCK_NB) == 0; |
| } |
| |
| |
| /** |
| * Destroys the individual members of the IOCB pool |
| * @param theControl the IO Control structure containing an IOCB pool |
| * @param upperBound the number of elements contained within the pool |
| */ |
| static inline void iocb_destroy_members(struct io_control * theControl, int upperBound) { |
| for (int i = 0; i < upperBound; i++) { |
| free(theControl->iocb[i]); |
| } |
| } |
| |
| |
| /** |
| * Destroys an IOCB pool and its members up to a certain limit. Should be used when the IOCB |
| * pool fails to initialize completely |
| * @param theControl the IO Control structure containing an IOCB pool |
| * @param upperBound the number of elements contained within the pool |
| */ |
| static inline void iocb_destroy_bounded(struct io_control * theControl, int upperBound) { |
| iocb_destroy_members(theControl, upperBound); |
| free(theControl->iocb); |
| } |
| |
| |
| /** |
| * Destroys an IOCB pool and all its members |
| * @param theControl |
| */ |
| static inline void iocb_destroy(struct io_control * theControl) { |
| iocb_destroy_bounded(theControl, theControl->queueSize); |
| } |
| |
| /** |
| * Everything that is allocated here will be freed at deleteContext when the class is unloaded. |
| */ |
| JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newContext(JNIEnv* env, jobject thisObject, jint queueSize) { |
| int i = 0; |
| |
| #ifdef DEBUG |
| fprintf (stdout, "Initializing context\n"); |
| #endif |
| |
| struct io_control * theControl = (struct io_control *) malloc(sizeof(struct io_control)); |
| if (theControl == NULL) { |
| throwOutOfMemoryError(env); |
| return NULL; |
| } |
| |
| int res = io_queue_init(queueSize, &theControl->ioContext); |
| if (res) { |
| // Error, so need to release whatever was done before |
| io_queue_release(theControl->ioContext); |
| free(theControl); |
| |
| throwRuntimeExceptionErrorNo(env, "Cannot initialize queue:", res); |
| return NULL; |
| } |
| |
| theControl->iocb = (struct iocb **)malloc((sizeof(struct iocb *) * (size_t)queueSize)); |
| if (theControl->iocb == NULL) { |
| io_queue_release(theControl->ioContext); |
| free(theControl); |
| |
| throwOutOfMemoryError(env); |
| return NULL; |
| } |
| |
| for (i = 0; i < queueSize; i++) { |
| theControl->iocb[i] = (struct iocb *)malloc(sizeof(struct iocb)); |
| if (theControl->iocb[i] == NULL) { |
| |
| // It may not have been fully initialized, therefore limit the cleanup up to 'i' members. |
| iocb_destroy_bounded(theControl, i); |
| |
| io_queue_release(theControl->ioContext); |
| free(theControl); |
| |
| throwOutOfMemoryError(env); |
| return NULL; |
| } |
| } |
| theControl->queueSize = queueSize; |
| |
| |
| res = pthread_mutex_init(&(theControl->iocbLock), 0); |
| if (res) { |
| iocb_destroy(theControl); |
| |
| io_queue_release(theControl->ioContext); |
| free(theControl); |
| |
| throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res); |
| return NULL; |
| } |
| |
| res = pthread_mutex_init(&(theControl->pollLock), 0); |
| if (res) { |
| iocb_destroy(theControl); |
| |
| io_queue_release(theControl->ioContext); |
| free(theControl); |
| |
| throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res); |
| return NULL; |
| } |
| |
| theControl->events = (struct io_event *)malloc(sizeof(struct io_event) * (size_t)queueSize); |
| if (theControl->events == NULL) { |
| iocb_destroy(theControl); |
| |
| io_queue_release(theControl->ioContext); |
| free(theControl); |
| |
| throwRuntimeExceptionErrorNo(env, "Can't initialize mutext (not enough memory for the events member): ", res); |
| return NULL; |
| } |
| |
| |
| theControl->iocbPut = 0; |
| theControl->iocbGet = 0; |
| theControl->used = 0; |
| theControl->thisObject = (*env)->NewGlobalRef(env, thisObject); |
| |
| return (*env)->NewDirectByteBuffer(env, theControl, sizeof(struct io_control)); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_deleteContext(JNIEnv* env, jclass clazz, jobject contextPointer) { |
| int i; |
| struct io_control * theControl = getIOControl(env, contextPointer); |
| if (theControl == NULL) { |
| return; |
| } |
| |
| struct iocb * iocb = getIOCB(theControl); |
| |
| if (iocb == NULL) { |
| throwIOException(env, "Not enough space in libaio queue"); |
| return; |
| } |
| |
| // Submitting a dumb write so the loop finishes |
| io_prep_pwrite(iocb, dumbWriteHandler, 0, 0, 0); |
| iocb->data = (void *) -1; |
| if (!submit(env, theControl, iocb)) { |
| return; |
| } |
| |
| // to make sure the poll has finished |
| pthread_mutex_lock(&(theControl->pollLock)); |
| pthread_mutex_unlock(&(theControl->pollLock)); |
| |
| // To return any pending IOCBs |
| int result = ringio_get_events(theControl->ioContext, 0, 1, theControl->events, 0); |
| for (i = 0; i < result; i++) { |
| struct io_event * event = &(theControl->events[i]); |
| struct iocb * iocbp = event->obj; |
| putIOCB(theControl, iocbp); |
| } |
| |
| io_queue_release(theControl->ioContext); |
| |
| pthread_mutex_destroy(&(theControl->pollLock)); |
| pthread_mutex_destroy(&(theControl->iocbLock)); |
| |
| iocb_destroy(theControl); |
| |
| (*env)->DeleteGlobalRef(env, theControl->thisObject); |
| |
| free(theControl->events); |
| free(theControl); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_close(JNIEnv* env, jclass clazz, jint fd) { |
| if (close(fd) < 0) { |
| throwIOExceptionErrorNo(env, "Error closing file:", errno); |
| } |
| } |
| |
| JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_open(JNIEnv* env, jclass clazz, |
| jstring path, jboolean direct) { |
| const char* f_path = (*env)->GetStringUTFChars(env, path, 0); |
| |
| int res; |
| if (direct) { |
| res = open(f_path, O_RDWR | O_CREAT | O_DIRECT, 0666); |
| } else { |
| res = open(f_path, O_RDWR | O_CREAT, 0666); |
| } |
| |
| (*env)->ReleaseStringUTFChars(env, path, f_path); |
| |
| if (res < 0) { |
| throwIOExceptionErrorNo(env, "Cannot open file:", errno); |
| } |
| |
| return res; |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitWrite |
| (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) { |
| struct io_control * theControl = getIOControl(env, contextPointer); |
| if (theControl == NULL) { |
| return; |
| } |
| |
| #ifdef DEBUG |
| fprintf (stdout, "submitWrite position %ld, size %d\n", position, size); |
| #endif |
| |
| struct iocb * iocb = getIOCB(theControl); |
| |
| if (iocb == NULL) { |
| throwIOException(env, "Not enough space in libaio queue"); |
| return; |
| } |
| |
| io_prep_pwrite(iocb, fileHandle, getBuffer(env, bufferWrite), (size_t)size, position); |
| |
| // The GlobalRef will be deleted when poll is called. this is done so |
| // the vm wouldn't crash if the Callback passed by the user is GCed between submission |
| // and callback. |
| // also as the real intention is to hold the reference until the life cycle is complete |
| iocb->data = (void *) (*env)->NewGlobalRef(env, callback); |
| |
| submit(env, theControl, iocb); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitRead |
| (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferRead, jobject callback) { |
| struct io_control * theControl = getIOControl(env, contextPointer); |
| if (theControl == NULL) { |
| return; |
| } |
| |
| struct iocb * iocb = getIOCB(theControl); |
| |
| if (iocb == NULL) { |
| throwIOException(env, "Not enough space in libaio queue"); |
| return; |
| } |
| |
| io_prep_pread(iocb, fileHandle, getBuffer(env, bufferRead), (size_t)size, position); |
| |
| // The GlobalRef will be deleted when poll is called. this is done so |
| // the vm wouldn't crash if the Callback passed by the user is GCed between submission |
| // and callback. |
| // also as the real intention is to hold the reference until the life cycle is complete |
| iocb->data = (void *) (*env)->NewGlobalRef(env, callback); |
| |
| submit(env, theControl, iocb); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_blockedPoll |
| (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) { |
| |
| #ifdef DEBUG |
| fprintf (stdout, "Running blockedPoll\n"); |
| fflush(stdout); |
| #endif |
| |
| int i; |
| struct io_control * theControl = getIOControl(env, contextPointer); |
| if (theControl == NULL) { |
| return; |
| } |
| int max = theControl->queueSize; |
| if (max > 1) { |
| // We can't maximize the queue reading |
| // as we need to check eventually for the tail moving |
| // this is to minimize the number of syscalls we perform on trapping a previous race. |
| max = max / 2; |
| } |
| pthread_mutex_lock(&(theControl->pollLock)); |
| |
| short running = 1; |
| |
| int lastFile = -1; |
| |
| while (running) { |
| |
| int result = ringio_get_events(theControl->ioContext, 1, max, theControl->events, 0); |
| |
| if (result == -EINTR) |
| { |
| // ARTEMIS-353: jmap will issue some weird interrupt signal what would break the execution here |
| // we need to ignore such calls here |
| continue; |
| } |
| |
| if (result < 0) |
| { |
| throwIOExceptionErrorNo(env, "Error while calling io_getevents IO: ", -result); |
| break; |
| } |
| #ifdef DEBUG |
| fprintf (stdout, "blockedPoll returned %d events\n", result); |
| fflush(stdout); |
| #endif |
| |
| lastFile = -1; |
| |
| for (i = 0; i < result; i++) |
| { |
| #ifdef DEBUG |
| fprintf (stdout, "blockedPoll treating event %d\n", i); |
| fflush(stdout); |
| #endif |
| struct io_event * event = &(theControl->events[i]); |
| struct iocb * iocbp = event->obj; |
| |
| if (iocbp->aio_fildes == dumbWriteHandler) { |
| #ifdef DEBUG |
| fprintf (stdout, "Dumb write arrived, giving up the loop\n"); |
| fflush(stdout); |
| #endif |
| putIOCB(theControl, iocbp); |
| running = 0; |
| break; |
| } |
| |
| if (useFdatasync && lastFile != iocbp->aio_fildes) { |
| lastFile = iocbp->aio_fildes; |
| fdatasync(lastFile); |
| } |
| |
| |
| int eventResult = (int)event->res; |
| |
| #ifdef DEBUG |
| fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result); |
| fflush (stdout); |
| #endif |
| |
| if (eventResult < 0) { |
| #ifdef DEBUG |
| fprintf (stdout, "Error: %s\n", strerror(-eventResult)); |
| fflush (stdout); |
| #endif |
| |
| jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); |
| |
| if (iocbp->data != NULL) { |
| (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); |
| } |
| } |
| |
| jobject obj = (jobject)iocbp->data; |
| iocbp->data = NULL; // this is to detect invalid elements on the buffer. |
| |
| if (obj != NULL) { |
| putIOCB(theControl, iocbp); |
| (*env)->CallVoidMethod(env, theControl->thisObject, libaioContextDone,obj); |
| // We delete the globalRef after the completion of the callback |
| (*env)->DeleteGlobalRef(env, obj); |
| } else { |
| if (!forceSysCall) { |
| fprintf (stdout, "Warning from ActiveMQ Artemis Native Layer: Your system is hitting duplicate / invalid records from libaio, which is a bug on the Linux Kernel you are using.\nYou should set property org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL=1\nor upgrade to a kernel version that contains a fix"); |
| fflush(stdout); |
| } |
| forceSysCall = JNI_TRUE; |
| } |
| |
| } |
| } |
| |
| pthread_mutex_unlock(&(theControl->pollLock)); |
| |
| } |
| |
| JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_poll |
| (JNIEnv * env, jobject obj, jobject contextPointer, jobjectArray callbacks, jint min, jint max) { |
| int i = 0; |
| struct io_control * theControl = getIOControl(env, contextPointer); |
| if (theControl == NULL) { |
| return 0; |
| } |
| |
| |
| int result = ringio_get_events(theControl->ioContext, min, max, theControl->events, 0); |
| int retVal = result; |
| |
| for (i = 0; i < result; i++) { |
| struct io_event * event = &(theControl->events[i]); |
| struct iocb * iocbp = event->obj; |
| int eventResult = (int)event->res; |
| |
| #ifdef DEBUG |
| fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result); |
| #endif |
| |
| if (eventResult < 0) { |
| #ifdef DEBUG |
| fprintf (stdout, "Error: %s\n", strerror(-eventResult)); |
| #endif |
| |
| if (iocbp->data != NULL && iocbp->data != (void *) -1) { |
| jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); |
| |
| (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); |
| } |
| } |
| |
| if (iocbp->data != NULL && iocbp->data != (void *) -1) { |
| (*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data); |
| // We delete the globalRef after the completion of the callback |
| (*env)->DeleteGlobalRef(env, (jobject)iocbp->data); |
| } |
| |
| putIOCB(theControl, iocbp); |
| } |
| |
| return retVal; |
| } |
| |
| JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newAlignedBuffer |
| (JNIEnv * env, jclass clazz, jint size, jint alignment) { |
| if (size % alignment != 0) { |
| throwRuntimeException(env, "Buffer size needs to be aligned to passed argument"); |
| return NULL; |
| } |
| |
| // This will allocate a buffer, aligned by alignment. |
| // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory |
| // NOTE: this buffer will contain non initialized data, you must fill it up properly |
| void * buffer; |
| int result = posix_memalign(&buffer, (size_t)alignment, (size_t)size); |
| |
| if (result) { |
| throwRuntimeExceptionErrorNo(env, "Can't allocate posix buffer:", result); |
| return NULL; |
| } |
| |
| memset(buffer, 0, (size_t)size); |
| |
| return (*env)->NewDirectByteBuffer(env, buffer, size); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_freeBuffer |
| (JNIEnv * env, jclass clazz, jobject jbuffer) { |
| if (jbuffer == NULL) |
| { |
| throwRuntimeException(env, "Null pointer"); |
| return; |
| } |
| void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer); |
| free(buffer); |
| } |
| |
| |
| /** It does nothing... just return true to make sure it has all the binary dependencies */ |
| JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getNativeVersion |
| (JNIEnv * env, jclass clazz) |
| |
| { |
| return org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION; |
| } |
| |
| JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getSize |
| (JNIEnv * env, jclass clazz, jint fd) |
| { |
| struct stat statBuffer; |
| |
| if (fstat(fd, &statBuffer) < 0) |
| { |
| throwIOExceptionErrorNo(env, "Cannot determine file size:", errno); |
| return -1l; |
| } |
| return statBuffer.st_size; |
| } |
| |
| JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getBlockSizeFD |
| (JNIEnv * env, jclass clazz, jint fd) |
| { |
| struct stat statBuffer; |
| |
| if (fstat(fd, &statBuffer) < 0) |
| { |
| throwIOExceptionErrorNo(env, "Cannot determine file size:", errno); |
| return -1l; |
| } |
| return statBuffer.st_blksize; |
| } |
| |
| JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getBlockSize |
| (JNIEnv * env, jclass clazz, jstring path) |
| { |
| const char* f_path = (*env)->GetStringUTFChars(env, path, 0); |
| struct stat statBuffer; |
| |
| if (stat(f_path, &statBuffer) < 0) |
| { |
| throwIOExceptionErrorNo(env, "Cannot determine file size:", errno); |
| return -1l; |
| } |
| |
| (*env)->ReleaseStringUTFChars(env, path, f_path); |
| |
| return statBuffer.st_blksize; |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fallocate |
| (JNIEnv * env, jclass clazz, jint fd, jlong size) |
| { |
| if (fallocate(fd, 0, 0, (off_t) size) < 0) |
| { |
| throwIOExceptionErrorNo(env, "Could not preallocate file", errno); |
| } |
| fsync(fd); |
| lseek (fd, 0, SEEK_SET); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fill |
| (JNIEnv * env, jclass clazz, jint fd, jint alignment, jlong size) |
| { |
| |
| int i; |
| int blocks = size / ONE_MEGA; |
| int rest = size % ONE_MEGA; |
| |
| #ifdef DEBUG |
| fprintf (stdout, "calling fill ... blocks = %d, rest=%d, alignment=%d\n", blocks, rest, alignment); |
| #endif |
| |
| |
| verifyBuffer(alignment); |
| |
| lseek (fd, 0, SEEK_SET); |
| for (i = 0; i < blocks; i++) |
| { |
| if (write(fd, oneMegaBuffer, ONE_MEGA) < 0) |
| { |
| #ifdef DEBUG |
| fprintf (stdout, "Errno is %d\n", errno); |
| #endif |
| throwIOException(env, "Cannot initialize file"); |
| return; |
| } |
| } |
| |
| if (rest != 0l) |
| { |
| if (write(fd, oneMegaBuffer, rest) < 0) |
| { |
| #ifdef DEBUG |
| fprintf (stdout, "Errno is %d\n", errno); |
| #endif |
| throwIOException(env, "Cannot initialize file with final rest"); |
| return; |
| } |
| } |
| lseek (fd, 0, SEEK_SET); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_memsetBuffer |
| (JNIEnv *env, jclass clazz, jobject jbuffer, jint size) |
| { |
| #ifdef DEBUG |
| fprintf (stdout, "Mem setting buffer with %d bytes\n", size); |
| #endif |
| void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer); |
| |
| if (buffer == 0) |
| { |
| throwRuntimeException(env, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); |
| return; |
| } |
| |
| memset(buffer, 0, (size_t)size); |
| } |