ARTEMIS-2800 Adding a property to disable reaping and force syscalls, and adding checks for invalid data and switching it on automatically
diff --git a/pom.xml b/pom.xml
index d236da3..6cfdac0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,8 @@
         <version.org.jacoco.plugin>0.7.9</version.org.jacoco.plugin>
         <version.maven.jar.plugin>2.4</version.maven.jar.plugin>
 
+        <test.stress.time>5000</test.stress.time>
+
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
@@ -52,7 +54,7 @@
         <maven.compiler.target>1.8</maven.compiler.target>
 
         <activemq-surefire-argline>
-            -Djava.library.path="${activemq.basedir}/target/lib/linux-${os.arch}"
+            -Dtest.stress.time=${test.stress.time} -Djava.library.path="${activemq.basedir}/target/lib/linux-${os.arch}"
         </activemq-surefire-argline>
         <activemq.basedir>${project.basedir}</activemq.basedir>
         <skipLicenseCheck>true</skipLicenseCheck>
diff --git a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
index 8130139..e057311 100644
--- a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
+++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
@@ -70,6 +70,31 @@
 #define AIO_RING_MAGIC	0xa10a10a1
 #define AIO_RING_INCOMPAT_FEATURES	0
 
+// set this to 0 if you want to stop using ring reaping
+#define RING_REAPER 1
+
+jboolean forceSysCall = JNI_FALSE;
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    setForceSyscall
+ * Signature: (Z)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_setForceSyscall
+  (JNIEnv * env, jclass clazz, jboolean _forceSysCall) {
+  forceSysCall = _forceSysCall;
+}
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    isForceSyscall
+ * Signature: ()Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_isForceSyscall
+  (JNIEnv * env, jclass clazz) {
+  return forceSysCall | !RING_REAPER;
+}
+
 
 /** There is no defined aio_ring anywhere in an include,
     This is an implementation detail, that is a binary contract.
@@ -106,7 +131,7 @@
                                                        struct io_event *events, struct timespec *timeout) {
     struct aio_ring *ring = to_aio_ring(aio_ctx);
     //checks if it could be completed in user space, saving a sys call
-    if (has_usable_ring(ring)) {
+    if (RING_REAPER && !forceSysCall && has_usable_ring(ring)) {
         const unsigned ring_nr = ring->nr;
         // We're assuming to be the exclusive writer to head, so we just need a compiler barrier
         unsigned head = ring->head;
@@ -178,8 +203,6 @@
 jclass libaioContextClass = NULL;
 jclass runtimeExceptionClass = NULL;
 jclass ioExceptionClass = NULL;
-jclass nioBufferClass = NULL;
-jfieldID nioBufferAddressFieldId = NULL;
 
 // util methods
 void throwRuntimeException(JNIEnv* env, char* message) {
@@ -333,13 +356,6 @@
            return JNI_ERR;
         }
 
-        nioBufferClass = (*env)->FindClass(env, "java/nio/Buffer");
-        if (nioBufferClass == NULL) {
-           return JNI_ERR;
-        }
-        nioBufferClass = (jclass)(*env)->NewGlobalRef(env, (jobject)nioBufferClass);
-        nioBufferAddressFieldId = (*env)->GetFieldID(env, nioBufferClass, "address", "J");
-
         return JNI_VERSION_1_6;
     }
 }
@@ -385,10 +401,6 @@
         if (libaioContextClass != NULL) {
             (*env)->DeleteGlobalRef(env, (jobject)libaioContextClass);
         }
-
-        if (nioBufferClass != NULL) {
-            (*env)->DeleteGlobalRef(env, (jobject)nioBufferClass);
-        }
     }
 }
 
@@ -399,8 +411,7 @@
 
 
 static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
-    jlong address = (*env)->GetLongField(env, pointer, nioBufferAddressFieldId);
-    struct io_control * ioControl = (struct io_control *) address;
+    struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer);
     if (ioControl == NULL) {
        throwRuntimeException(env, "Controller not initialized");
     }
@@ -468,7 +479,7 @@
 }
 
 static inline void * getBuffer(JNIEnv* env, jobject pointer) {
-    return (void *) (*env)->GetLongField(env, pointer, nioBufferAddressFieldId);;
+    return (*env)->GetDirectBufferAddress(env, pointer);
 }
 
 JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_lock
@@ -818,12 +829,19 @@
             }
 
             jobject obj = (jobject)iocbp->data;
-            putIOCB(theControl, iocbp);
+            iocbp->data = NULL; // this is to detect invalid elements on the buffer.
 
             if (obj != NULL) {
+                putIOCB(theControl, iocbp);
                 (*env)->CallVoidMethod(env, theControl->thisObject, libaioContextDone,obj);
                 // We delete the globalRef after the completion of the callback
                 (*env)->DeleteGlobalRef(env, obj);
+            } else {
+                if (!forceSysCall) {
+                    fprintf (stdout, "Warning from ActiveMQ Artemis Native Layer: Your system is hitting duplicate / invalid records from libaio, which is a bug on the Linux Kernel you are using.\nYou should set property org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL=1\nor upgrade to a kernel version that contains a fix");
+                    fflush(stdout);
+                }
+                forceSysCall = JNI_TRUE;
             }
 
         }
@@ -908,7 +926,7 @@
        throwRuntimeException(env, "Null pointer");
        return;
     }
-  	void *  buffer = getBuffer(env, jbuffer);
+  	void *  buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
   	free(buffer);
 }
 
@@ -1023,7 +1041,7 @@
     #ifdef DEBUG
         fprintf (stdout, "Mem setting buffer with %d bytes\n", size);
     #endif
-    void * buffer = getBuffer(env, jbuffer);
+    void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
 
     if (buffer == 0)
     {
diff --git a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h
new file mode 100644
index 0000000..7adfd43
--- /dev/null
+++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h
@@ -0,0 +1,191 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_activemq_artemis_nativo_jlibaio_LibaioContext */
+
+#ifndef _Included_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+#define _Included_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+#ifdef __cplusplus
+extern "C" {
+#endif
+#undef org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION
+#define org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION 10L
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    shutdownHook
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_shutdownHook
+  (JNIEnv *, jclass);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    setForceSyscall
+ * Signature: (Z)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_setForceSyscall
+  (JNIEnv *, jclass, jboolean);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    isForceSyscall
+ * Signature: ()Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_isForceSyscall
+  (JNIEnv *, jclass);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    newContext
+ * Signature: (I)Ljava/nio/ByteBuffer;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newContext
+  (JNIEnv *, jobject, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    deleteContext
+ * Signature: (Ljava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_deleteContext
+  (JNIEnv *, jobject, jobject);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    open
+ * Signature: (Ljava/lang/String;Z)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_open
+  (JNIEnv *, jclass, jstring, jboolean);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    close
+ * Signature: (I)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_close
+  (JNIEnv *, jclass, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    newAlignedBuffer
+ * Signature: (II)Ljava/nio/ByteBuffer;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newAlignedBuffer
+  (JNIEnv *, jclass, jint, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    freeBuffer
+ * Signature: (Ljava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_freeBuffer
+  (JNIEnv *, jclass, jobject);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    submitWrite
+ * Signature: (ILjava/nio/ByteBuffer;JILjava/nio/ByteBuffer;Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitWrite
+  (JNIEnv *, jobject, jint, jobject, jlong, jint, jobject, jobject);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    submitRead
+ * Signature: (ILjava/nio/ByteBuffer;JILjava/nio/ByteBuffer;Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitRead
+  (JNIEnv *, jobject, jint, jobject, jlong, jint, jobject, jobject);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    poll
+ * Signature: (Ljava/nio/ByteBuffer;[Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;II)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_poll
+  (JNIEnv *, jobject, jobject, jobjectArray, jint, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    blockedPoll
+ * Signature: (Ljava/nio/ByteBuffer;Z)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_blockedPoll
+  (JNIEnv *, jobject, jobject, jboolean);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    getNativeVersion
+ * Signature: ()I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getNativeVersion
+  (JNIEnv *, jclass);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    lock
+ * Signature: (I)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_lock
+  (JNIEnv *, jclass, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    memsetBuffer
+ * Signature: (Ljava/nio/ByteBuffer;I)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_memsetBuffer
+  (JNIEnv *, jclass, jobject, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    getSize
+ * Signature: (I)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getSize
+  (JNIEnv *, jclass, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    getBlockSizeFD
+ * Signature: (I)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getBlockSizeFD
+  (JNIEnv *, jclass, jint);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    getBlockSize
+ * Signature: (Ljava/lang/String;)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getBlockSize
+  (JNIEnv *, jclass, jstring);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    fallocate
+ * Signature: (IJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fallocate
+  (JNIEnv *, jclass, jint, jlong);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    fill
+ * Signature: (IIJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fill
+  (JNIEnv *, jclass, jint, jint, jlong);
+
+/*
+ * Class:     org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method:    writeInternal
+ * Signature: (IJJLjava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_writeInternal
+  (JNIEnv *, jclass, jint, jlong, jlong, jobject);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
index 945b2de..794e112 100644
--- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
+++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
@@ -47,7 +47,7 @@
    /**
     * The Native layer will look at this version.
     */
-   private static final int EXPECTED_NATIVE_VERSION = 9;
+   private static final int EXPECTED_NATIVE_VERSION = 10;
 
    private static boolean loaded = false;
 
@@ -81,6 +81,9 @@
       for (String library : libraries) {
          if (loadLibrary(library)) {
             loaded = true;
+            if (System.getProperty("org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL") != null) {
+               LibaioContext.setForceSyscall(true);
+            }
             Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
@@ -107,6 +110,11 @@
 
    private static native void shutdownHook();
 
+   public static native void setForceSyscall(boolean value);
+
+   /** The system may choose to set this if a failing condition happened inside the code. */
+   public static native boolean isForceSyscall();
+
    /**
     * This is used to validate leaks on tests.
     *
diff --git a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java
new file mode 100644
index 0000000..e4e55b8
--- /dev/null
+++ b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.nativo.jlibaio.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
+import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
+import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test is using a different package from {@link LibaioFile}
+ * as I need to validate public methods on the API
+ */
+public class LibaioStressTest {
+
+    private static final int STRESS_TIME = Integer.parseInt(System.getProperty("test.stress.time", "5000"));
+
+    /**
+     * This is just an arbitrary number for a number of elements you need to pass to the libaio init method
+     * Some of the tests are using half of this number, so if anyone decide to change this please use an even number.
+     */
+    private static final int LIBAIO_QUEUE_SIZE = 4096;
+
+    private int errors = 0;
+
+    private boolean running = true;
+
+    @Rule
+    public TemporaryFolder temporaryFolder;
+
+    public LibaioContext<MyClass> control;
+
+    @Before
+    public void setUpFactory() {
+        control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, false);
+    }
+
+    @After
+    public void deleteFactory() {
+        control.close();
+        validateLibaio();
+    }
+
+    public void validateLibaio() {
+        Assert.assertEquals(0, LibaioContext.getTotalMaxIO());
+    }
+
+    public LibaioStressTest() {
+        /*
+         *  I didn't use /tmp for three reasons
+         *  - Most systems now will use tmpfs which is not compatible with O_DIRECT
+         *  - This would fill up /tmp in case of failures.
+         *  - target is cleaned up every time you do a mvn clean, so it's safer
+         */
+        File parent = new File("./target");
+        parent.mkdirs();
+        temporaryFolder = new TemporaryFolder(parent);
+    }
+
+    @Test
+    public void testOpen() throws Exception {
+        LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
+        fileDescriptor.close();
+    }
+
+
+    CallbackCache<MyClass> callbackCache = new CallbackCache<>(LIBAIO_QUEUE_SIZE);
+
+    class MyClass implements SubmitInfo {
+
+        ReusableLatch reusableLatch;
+
+        @Override
+        public void onError(int errno, String message) {
+
+        }
+
+        @Override
+        public void done() {
+            try {
+                reusableLatch.countDown();
+                reusableLatch = null;
+                callbackCache.put(this);
+            } catch (Throwable e) {
+                e.printStackTrace();
+                System.exit(-1);
+            }
+        }
+    }
+
+    @Test
+    public void testForceSyscall() {
+        Assert.assertFalse(LibaioContext.isForceSyscall());
+        LibaioContext.setForceSyscall(true);
+        Assert.assertTrue(LibaioContext.isForceSyscall());
+        LibaioContext.setForceSyscall(false);
+    }
+
+    @Test
+    public void testStressWrites() throws Exception {
+        Assume.assumeFalse(LibaioContext.isForceSyscall());
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                control.poll();
+            }
+        };
+
+        t.start();
+
+
+        Thread t2 = new Thread(() -> {
+            while (running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (Exception e) {
+                }
+                // this is just to make things more interesting from the POV of testing
+                System.gc();
+            }
+        });
+
+        t2.start();
+
+        Thread test1 = startThread("test1.bin");
+        Thread test2 = startThread("test2.bin");
+        Thread.sleep(STRESS_TIME); // Configured timeout on the test
+        running = false;
+        test2.join();
+        test1.join();
+        t2.join();
+
+        Assert.assertFalse(LibaioContext.isForceSyscall());
+        return;
+    }
+
+    private Thread startThread(String name) {
+        Thread t_test = new Thread(() -> {
+            try {
+                doFile(name);
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        });
+        t_test.start();
+
+        return t_test;
+    }
+
+    private void doFile(String fileName) throws IOException, InterruptedException {
+        ReusableLatch latchWrites = new ReusableLatch(0);
+
+        File file = temporaryFolder.newFile(fileName);
+        LibaioFile fileDescriptor = control.openFile(file, true);
+
+        // ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
+        ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
+
+        int maxSize = 4096 * LIBAIO_QUEUE_SIZE;
+        fileDescriptor.fill(4096, maxSize);
+        for (int i = 0; i < 4096; i++) {
+            buffer.put((byte) 'a');
+        }
+
+        buffer.rewind();
+
+        int pos = 0;
+
+        long count = 0;
+
+        long nextBreak = System.currentTimeMillis() + 3000;
+
+        while (running) {
+            count++;
+
+            if (System.currentTimeMillis() > nextBreak) {
+                if (!latchWrites.await(10, TimeUnit.SECONDS)) {
+                    System.err.println("Latch did not complete for some reason");
+                    errors++;
+                    return;
+                }
+                fileDescriptor.close();
+
+                fileDescriptor = control.openFile(file, true);
+                pos = 0;
+                // we close / open a file every 5 seconds
+                nextBreak = System.currentTimeMillis() + 5000;
+            }
+
+            if (count % 1_000 == 0) {
+                System.out.println("Writen "  + count + " buffers at " + fileName);
+            }
+            MyClass myClass = callbackCache.get();
+
+            if (myClass == null) {
+                myClass = new MyClass();
+            }
+
+            myClass.reusableLatch = latchWrites;
+            myClass.reusableLatch.countUp();
+
+
+            if (count % 100 == 0) {
+                Thread.sleep(100);
+            }
+            fileDescriptor.write(pos, 4096, buffer, myClass);
+            pos += 4096;
+
+            if (pos >= maxSize) {
+                pos = 0;
+            }
+
+        }
+
+        fileDescriptor.close();
+    }
+
+}
diff --git a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/ReusableLatch.java b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/ReusableLatch.java
new file mode 100644
index 0000000..86a8848
--- /dev/null
+++ b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/ReusableLatch.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.nativo.jlibaio.test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
+ * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
+ *
+ * <p>This class works just like CountDownLatch, with the difference you can also increase the counter</p>
+ *
+ * <p>It could be used for sync points when one process is feeding the latch while another will wait when everything is done. (e.g. waiting IO completions to finish)</p>
+ *
+ * <p>On ActiveMQ Artemis we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
+ *
+ * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
+ *
+ * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
+ */
+public class ReusableLatch {
+
+   /**
+    * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
+    *
+    * @see AbstractQueuedSynchronizer
+    */
+   @SuppressWarnings("serial")
+   private static class CountSync extends AbstractQueuedSynchronizer {
+
+      private CountSync(int count) {
+         setState(count);
+      }
+
+      public int getCount() {
+         return getState();
+      }
+
+      public void setCount(final int count) {
+         setState(count);
+      }
+
+      @Override
+      public int tryAcquireShared(final int numberOfAqcquires) {
+         return getState() == 0 ? 1 : -1;
+      }
+
+      public void add() {
+         for (;;) {
+            int actualState = getState();
+            int newState = actualState + 1;
+            if (compareAndSetState(actualState, newState)) {
+               return;
+            }
+         }
+      }
+
+      @Override
+      public boolean tryReleaseShared(final int numberOfReleases) {
+         for (;;) {
+            int actualState = getState();
+            if (actualState == 0) {
+               return true;
+            }
+
+            int newState = actualState - numberOfReleases;
+
+            if (newState < 0) {
+               newState = 0;
+            }
+
+            if (compareAndSetState(actualState, newState)) {
+               return newState == 0;
+            }
+         }
+      }
+   }
+
+   private final CountSync control;
+
+   public ReusableLatch() {
+      this(0);
+   }
+
+   public ReusableLatch(final int count) {
+      control = new CountSync(count);
+   }
+
+   public int getCount() {
+      return control.getCount();
+   }
+
+   public void setCount(final int count) {
+      control.setCount(count);
+   }
+
+   public void countUp() {
+      control.add();
+   }
+
+   public void countDown() {
+      control.releaseShared(1);
+   }
+
+   public void countDown(final int count) {
+      control.releaseShared(count);
+   }
+
+   public void await() throws InterruptedException {
+      control.acquireSharedInterruptibly(1);
+   }
+
+   public boolean await(final long milliseconds) throws InterruptedException {
+      return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
+   }
+
+   public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException {
+      return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
+   }
+}