ARTEMIS-2800 Adding a property to disable reaping and force syscalls, and adding checks for invalid data and switching it on automatically
diff --git a/pom.xml b/pom.xml
index d236da3..6cfdac0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,8 @@
<version.org.jacoco.plugin>0.7.9</version.org.jacoco.plugin>
<version.maven.jar.plugin>2.4</version.maven.jar.plugin>
+ <test.stress.time>5000</test.stress.time>
+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -52,7 +54,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<activemq-surefire-argline>
- -Djava.library.path="${activemq.basedir}/target/lib/linux-${os.arch}"
+ -Dtest.stress.time=${test.stress.time} -Djava.library.path="${activemq.basedir}/target/lib/linux-${os.arch}"
</activemq-surefire-argline>
<activemq.basedir>${project.basedir}</activemq.basedir>
<skipLicenseCheck>true</skipLicenseCheck>
diff --git a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
index 8130139..e057311 100644
--- a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
+++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
@@ -70,6 +70,31 @@
#define AIO_RING_MAGIC 0xa10a10a1
#define AIO_RING_INCOMPAT_FEATURES 0
+// set this to 0 if you want to stop using ring reaping
+#define RING_REAPER 1
+
+jboolean forceSysCall = JNI_FALSE;
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: setForceSyscall
+ * Signature: (Z)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_setForceSyscall
+ (JNIEnv * env, jclass clazz, jboolean _forceSysCall) {
+ forceSysCall = _forceSysCall;
+}
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: isForceSyscall
+ * Signature: ()Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_isForceSyscall
+ (JNIEnv * env, jclass clazz) {
+ return forceSysCall | !RING_REAPER;
+}
+
/** There is no defined aio_ring anywhere in an include,
This is an implementation detail, that is a binary contract.
@@ -106,7 +131,7 @@
struct io_event *events, struct timespec *timeout) {
struct aio_ring *ring = to_aio_ring(aio_ctx);
//checks if it could be completed in user space, saving a sys call
- if (has_usable_ring(ring)) {
+ if (RING_REAPER && !forceSysCall && has_usable_ring(ring)) {
const unsigned ring_nr = ring->nr;
// We're assuming to be the exclusive writer to head, so we just need a compiler barrier
unsigned head = ring->head;
@@ -178,8 +203,6 @@
jclass libaioContextClass = NULL;
jclass runtimeExceptionClass = NULL;
jclass ioExceptionClass = NULL;
-jclass nioBufferClass = NULL;
-jfieldID nioBufferAddressFieldId = NULL;
// util methods
void throwRuntimeException(JNIEnv* env, char* message) {
@@ -333,13 +356,6 @@
return JNI_ERR;
}
- nioBufferClass = (*env)->FindClass(env, "java/nio/Buffer");
- if (nioBufferClass == NULL) {
- return JNI_ERR;
- }
- nioBufferClass = (jclass)(*env)->NewGlobalRef(env, (jobject)nioBufferClass);
- nioBufferAddressFieldId = (*env)->GetFieldID(env, nioBufferClass, "address", "J");
-
return JNI_VERSION_1_6;
}
}
@@ -385,10 +401,6 @@
if (libaioContextClass != NULL) {
(*env)->DeleteGlobalRef(env, (jobject)libaioContextClass);
}
-
- if (nioBufferClass != NULL) {
- (*env)->DeleteGlobalRef(env, (jobject)nioBufferClass);
- }
}
}
@@ -399,8 +411,7 @@
static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
- jlong address = (*env)->GetLongField(env, pointer, nioBufferAddressFieldId);
- struct io_control * ioControl = (struct io_control *) address;
+ struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer);
if (ioControl == NULL) {
throwRuntimeException(env, "Controller not initialized");
}
@@ -468,7 +479,7 @@
}
static inline void * getBuffer(JNIEnv* env, jobject pointer) {
- return (void *) (*env)->GetLongField(env, pointer, nioBufferAddressFieldId);;
+ return (*env)->GetDirectBufferAddress(env, pointer);
}
JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_lock
@@ -818,12 +829,19 @@
}
jobject obj = (jobject)iocbp->data;
- putIOCB(theControl, iocbp);
+ iocbp->data = NULL; // this is to detect invalid elements on the buffer.
if (obj != NULL) {
+ putIOCB(theControl, iocbp);
(*env)->CallVoidMethod(env, theControl->thisObject, libaioContextDone,obj);
// We delete the globalRef after the completion of the callback
(*env)->DeleteGlobalRef(env, obj);
+ } else {
+ if (!forceSysCall) {
+ fprintf (stdout, "Warning from ActiveMQ Artemis Native Layer: Your system is hitting duplicate / invalid records from libaio, which is a bug on the Linux Kernel you are using.\nYou should set property org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL=1\nor upgrade to a kernel version that contains a fix");
+ fflush(stdout);
+ }
+ forceSysCall = JNI_TRUE;
}
}
@@ -908,7 +926,7 @@
throwRuntimeException(env, "Null pointer");
return;
}
- void * buffer = getBuffer(env, jbuffer);
+ void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
free(buffer);
}
@@ -1023,7 +1041,7 @@
#ifdef DEBUG
fprintf (stdout, "Mem setting buffer with %d bytes\n", size);
#endif
- void * buffer = getBuffer(env, jbuffer);
+ void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
if (buffer == 0)
{
diff --git a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h
new file mode 100644
index 0000000..7adfd43
--- /dev/null
+++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h
@@ -0,0 +1,191 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_activemq_artemis_nativo_jlibaio_LibaioContext */
+
+#ifndef _Included_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+#define _Included_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+#ifdef __cplusplus
+extern "C" {
+#endif
+#undef org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION
+#define org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION 10L
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: shutdownHook
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_shutdownHook
+ (JNIEnv *, jclass);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: setForceSyscall
+ * Signature: (Z)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_setForceSyscall
+ (JNIEnv *, jclass, jboolean);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: isForceSyscall
+ * Signature: ()Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_isForceSyscall
+ (JNIEnv *, jclass);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: newContext
+ * Signature: (I)Ljava/nio/ByteBuffer;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newContext
+ (JNIEnv *, jobject, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: deleteContext
+ * Signature: (Ljava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_deleteContext
+ (JNIEnv *, jobject, jobject);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: open
+ * Signature: (Ljava/lang/String;Z)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_open
+ (JNIEnv *, jclass, jstring, jboolean);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: close
+ * Signature: (I)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_close
+ (JNIEnv *, jclass, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: newAlignedBuffer
+ * Signature: (II)Ljava/nio/ByteBuffer;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newAlignedBuffer
+ (JNIEnv *, jclass, jint, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: freeBuffer
+ * Signature: (Ljava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_freeBuffer
+ (JNIEnv *, jclass, jobject);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: submitWrite
+ * Signature: (ILjava/nio/ByteBuffer;JILjava/nio/ByteBuffer;Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitWrite
+ (JNIEnv *, jobject, jint, jobject, jlong, jint, jobject, jobject);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: submitRead
+ * Signature: (ILjava/nio/ByteBuffer;JILjava/nio/ByteBuffer;Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitRead
+ (JNIEnv *, jobject, jint, jobject, jlong, jint, jobject, jobject);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: poll
+ * Signature: (Ljava/nio/ByteBuffer;[Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;II)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_poll
+ (JNIEnv *, jobject, jobject, jobjectArray, jint, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: blockedPoll
+ * Signature: (Ljava/nio/ByteBuffer;Z)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_blockedPoll
+ (JNIEnv *, jobject, jobject, jboolean);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: getNativeVersion
+ * Signature: ()I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getNativeVersion
+ (JNIEnv *, jclass);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: lock
+ * Signature: (I)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_lock
+ (JNIEnv *, jclass, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: memsetBuffer
+ * Signature: (Ljava/nio/ByteBuffer;I)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_memsetBuffer
+ (JNIEnv *, jclass, jobject, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: getSize
+ * Signature: (I)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getSize
+ (JNIEnv *, jclass, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: getBlockSizeFD
+ * Signature: (I)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getBlockSizeFD
+ (JNIEnv *, jclass, jint);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: getBlockSize
+ * Signature: (Ljava/lang/String;)I
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getBlockSize
+ (JNIEnv *, jclass, jstring);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: fallocate
+ * Signature: (IJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fallocate
+ (JNIEnv *, jclass, jint, jlong);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: fill
+ * Signature: (IIJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fill
+ (JNIEnv *, jclass, jint, jint, jlong);
+
+/*
+ * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext
+ * Method: writeInternal
+ * Signature: (IJJLjava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_writeInternal
+ (JNIEnv *, jclass, jint, jlong, jlong, jobject);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
index 945b2de..794e112 100644
--- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
+++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
@@ -47,7 +47,7 @@
/**
* The Native layer will look at this version.
*/
- private static final int EXPECTED_NATIVE_VERSION = 9;
+ private static final int EXPECTED_NATIVE_VERSION = 10;
private static boolean loaded = false;
@@ -81,6 +81,9 @@
for (String library : libraries) {
if (loadLibrary(library)) {
loaded = true;
+ if (System.getProperty("org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL") != null) {
+ LibaioContext.setForceSyscall(true);
+ }
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@@ -107,6 +110,11 @@
private static native void shutdownHook();
+ public static native void setForceSyscall(boolean value);
+
+ /** The system may choose to set this if a failing condition happened inside the code. */
+ public static native boolean isForceSyscall();
+
/**
* This is used to validate leaks on tests.
*
diff --git a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java
new file mode 100644
index 0000000..e4e55b8
--- /dev/null
+++ b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.nativo.jlibaio.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
+import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
+import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test is using a different package from {@link LibaioFile}
+ * as I need to validate public methods on the API
+ */
+public class LibaioStressTest {
+
+ private static final int STRESS_TIME = Integer.parseInt(System.getProperty("test.stress.time", "5000"));
+
+ /**
+ * This is just an arbitrary number for a number of elements you need to pass to the libaio init method
+ * Some of the tests are using half of this number, so if anyone decide to change this please use an even number.
+ */
+ private static final int LIBAIO_QUEUE_SIZE = 4096;
+
+ private int errors = 0;
+
+ private boolean running = true;
+
+ @Rule
+ public TemporaryFolder temporaryFolder;
+
+ public LibaioContext<MyClass> control;
+
+ @Before
+ public void setUpFactory() {
+ control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, false);
+ }
+
+ @After
+ public void deleteFactory() {
+ control.close();
+ validateLibaio();
+ }
+
+ public void validateLibaio() {
+ Assert.assertEquals(0, LibaioContext.getTotalMaxIO());
+ }
+
+ public LibaioStressTest() {
+ /*
+ * I didn't use /tmp for three reasons
+ * - Most systems now will use tmpfs which is not compatible with O_DIRECT
+ * - This would fill up /tmp in case of failures.
+ * - target is cleaned up every time you do a mvn clean, so it's safer
+ */
+ File parent = new File("./target");
+ parent.mkdirs();
+ temporaryFolder = new TemporaryFolder(parent);
+ }
+
+ @Test
+ public void testOpen() throws Exception {
+ LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
+ fileDescriptor.close();
+ }
+
+
+ CallbackCache<MyClass> callbackCache = new CallbackCache<>(LIBAIO_QUEUE_SIZE);
+
+ class MyClass implements SubmitInfo {
+
+ ReusableLatch reusableLatch;
+
+ @Override
+ public void onError(int errno, String message) {
+
+ }
+
+ @Override
+ public void done() {
+ try {
+ reusableLatch.countDown();
+ reusableLatch = null;
+ callbackCache.put(this);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ @Test
+ public void testForceSyscall() {
+ Assert.assertFalse(LibaioContext.isForceSyscall());
+ LibaioContext.setForceSyscall(true);
+ Assert.assertTrue(LibaioContext.isForceSyscall());
+ LibaioContext.setForceSyscall(false);
+ }
+
+ @Test
+ public void testStressWrites() throws Exception {
+ Assume.assumeFalse(LibaioContext.isForceSyscall());
+
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ control.poll();
+ }
+ };
+
+ t.start();
+
+
+ Thread t2 = new Thread(() -> {
+ while (running) {
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
+ // this is just to make things more interesting from the POV of testing
+ System.gc();
+ }
+ });
+
+ t2.start();
+
+ Thread test1 = startThread("test1.bin");
+ Thread test2 = startThread("test2.bin");
+ Thread.sleep(STRESS_TIME); // Configured timeout on the test
+ running = false;
+ test2.join();
+ test1.join();
+ t2.join();
+
+ Assert.assertFalse(LibaioContext.isForceSyscall());
+ return;
+ }
+
+ private Thread startThread(String name) {
+ Thread t_test = new Thread(() -> {
+ try {
+ doFile(name);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ t_test.start();
+
+ return t_test;
+ }
+
+ private void doFile(String fileName) throws IOException, InterruptedException {
+ ReusableLatch latchWrites = new ReusableLatch(0);
+
+ File file = temporaryFolder.newFile(fileName);
+ LibaioFile fileDescriptor = control.openFile(file, true);
+
+ // ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
+ ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
+
+ int maxSize = 4096 * LIBAIO_QUEUE_SIZE;
+ fileDescriptor.fill(4096, maxSize);
+ for (int i = 0; i < 4096; i++) {
+ buffer.put((byte) 'a');
+ }
+
+ buffer.rewind();
+
+ int pos = 0;
+
+ long count = 0;
+
+ long nextBreak = System.currentTimeMillis() + 3000;
+
+ while (running) {
+ count++;
+
+ if (System.currentTimeMillis() > nextBreak) {
+ if (!latchWrites.await(10, TimeUnit.SECONDS)) {
+ System.err.println("Latch did not complete for some reason");
+ errors++;
+ return;
+ }
+ fileDescriptor.close();
+
+ fileDescriptor = control.openFile(file, true);
+ pos = 0;
+ // we close / open a file every 5 seconds
+ nextBreak = System.currentTimeMillis() + 5000;
+ }
+
+ if (count % 1_000 == 0) {
+ System.out.println("Writen " + count + " buffers at " + fileName);
+ }
+ MyClass myClass = callbackCache.get();
+
+ if (myClass == null) {
+ myClass = new MyClass();
+ }
+
+ myClass.reusableLatch = latchWrites;
+ myClass.reusableLatch.countUp();
+
+
+ if (count % 100 == 0) {
+ Thread.sleep(100);
+ }
+ fileDescriptor.write(pos, 4096, buffer, myClass);
+ pos += 4096;
+
+ if (pos >= maxSize) {
+ pos = 0;
+ }
+
+ }
+
+ fileDescriptor.close();
+ }
+
+}
diff --git a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/ReusableLatch.java b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/ReusableLatch.java
new file mode 100644
index 0000000..86a8848
--- /dev/null
+++ b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/ReusableLatch.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.nativo.jlibaio.test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
+ * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
+ *
+ * <p>This class works just like CountDownLatch, with the difference you can also increase the counter</p>
+ *
+ * <p>It could be used for sync points when one process is feeding the latch while another will wait when everything is done. (e.g. waiting IO completions to finish)</p>
+ *
+ * <p>On ActiveMQ Artemis we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
+ *
+ * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
+ *
+ * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
+ */
+public class ReusableLatch {
+
+ /**
+ * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
+ *
+ * @see AbstractQueuedSynchronizer
+ */
+ @SuppressWarnings("serial")
+ private static class CountSync extends AbstractQueuedSynchronizer {
+
+ private CountSync(int count) {
+ setState(count);
+ }
+
+ public int getCount() {
+ return getState();
+ }
+
+ public void setCount(final int count) {
+ setState(count);
+ }
+
+ @Override
+ public int tryAcquireShared(final int numberOfAqcquires) {
+ return getState() == 0 ? 1 : -1;
+ }
+
+ public void add() {
+ for (;;) {
+ int actualState = getState();
+ int newState = actualState + 1;
+ if (compareAndSetState(actualState, newState)) {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public boolean tryReleaseShared(final int numberOfReleases) {
+ for (;;) {
+ int actualState = getState();
+ if (actualState == 0) {
+ return true;
+ }
+
+ int newState = actualState - numberOfReleases;
+
+ if (newState < 0) {
+ newState = 0;
+ }
+
+ if (compareAndSetState(actualState, newState)) {
+ return newState == 0;
+ }
+ }
+ }
+ }
+
+ private final CountSync control;
+
+ public ReusableLatch() {
+ this(0);
+ }
+
+ public ReusableLatch(final int count) {
+ control = new CountSync(count);
+ }
+
+ public int getCount() {
+ return control.getCount();
+ }
+
+ public void setCount(final int count) {
+ control.setCount(count);
+ }
+
+ public void countUp() {
+ control.add();
+ }
+
+ public void countDown() {
+ control.releaseShared(1);
+ }
+
+ public void countDown(final int count) {
+ control.releaseShared(count);
+ }
+
+ public void await() throws InterruptedException {
+ control.acquireSharedInterruptibly(1);
+ }
+
+ public boolean await(final long milliseconds) throws InterruptedException {
+ return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
+ }
+
+ public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException {
+ return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
+ }
+}