ARTEMIS-832 Openwire was ignoring data syncs.
I'm also adding the possibility of sync on libaio, and not only relay on write-cache
diff --git a/bin/libartemis-native-64.so b/bin/libartemis-native-64.so
old mode 100644
new mode 100755
index 95a5451..8cbe851
--- a/bin/libartemis-native-64.so
+++ b/bin/libartemis-native-64.so
Binary files differ
diff --git a/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
index 74545fc..3f7c213 100644
--- a/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
+++ b/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
@@ -536,7 +536,7 @@
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
- (JNIEnv * env, jobject thisObject, jobject contextPointer) {
+ (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) {
#ifdef DEBUG
fprintf (stdout, "Running blockedPoll\n");
@@ -553,6 +553,8 @@
short running = 1;
+ int lastFile = -1;
+
while (running) {
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
@@ -574,6 +576,8 @@
fflush(stdout);
#endif
+ lastFile = -1;
+
for (i = 0; i < result; i++)
{
#ifdef DEBUG
@@ -593,6 +597,11 @@
break;
}
+ if (useFdatasync && lastFile != iocbp->aio_fildes) {
+ lastFile = iocbp->aio_fildes;
+ fdatasync(lastFile);
+ }
+
int eventResult = (int)event->res;
diff --git a/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
index 8049a97..cdaea55 100644
--- a/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
+++ b/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
@@ -49,7 +49,7 @@
* <br>
* Or else the native module won't be loaded because of version mismatches
*/
- private static final int EXPECTED_NATIVE_VERSION = 6;
+ private static final int EXPECTED_NATIVE_VERSION = 7;
private static boolean loaded = false;
@@ -146,6 +146,8 @@
final int queueSize;
+ final boolean useFdatasync;
+
/**
* The queue size here will use resources defined on the kernel parameter
* <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
@@ -153,11 +155,13 @@
* @param queueSize the size to be initialize on libaio
* io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
* @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
+ * @param useFdatasync should use fdatasync before calling callbacks.
*/
- public LibaioContext(int queueSize, boolean useSemaphore) {
+ public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) {
try {
contexts.incrementAndGet();
this.ioContext = newContext(queueSize);
+ this.useFdatasync = useFdatasync;
} catch (Exception e) {
throw e;
}
@@ -349,7 +353,7 @@
*/
public void poll() {
if (!closed.get()) {
- blockedPoll(ioContext);
+ blockedPoll(ioContext, useFdatasync);
}
}
@@ -436,7 +440,7 @@
/**
* This method will block as long as the context is open.
*/
- native void blockedPoll(ByteBuffer libaioContext);
+ native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync);
static native int getNativeVersion();
diff --git a/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
index 7f98f0d..1013966 100644
--- a/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
+++ b/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
@@ -54,7 +54,7 @@
parent.mkdirs();
boolean failed = false;
- try (LibaioContext control = new LibaioContext<>(1, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
+ try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
fileDescriptor.fallocate(4 * 1024);
} catch (Exception e) {
e.printStackTrace();
@@ -80,7 +80,7 @@
@Before
public void setUpFactory() {
- control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true);
+ control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true);
}
@After
@@ -532,10 +532,10 @@
boolean exceptionThrown = false;
control.close();
- control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false);
+ control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true);
try {
// There is no space for a queue this huge, the native layer should throw the exception
- LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false);
+ LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true);
} catch (RuntimeException e) {
exceptionThrown = true;
}
@@ -630,7 +630,7 @@
@Test
public void testBlockedCallback() throws Exception {
- final LibaioContext blockedContext = new LibaioContext(500, true);
+ final LibaioContext blockedContext = new LibaioContext(500, true, true);
Thread t = new Thread() {
@Override
public void run() {
diff --git a/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java b/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
index c04bff4..b515663 100644
--- a/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
+++ b/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
@@ -53,7 +53,7 @@
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
- final LibaioContext control = new LibaioContext<>(5, true);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() {
@Override
public void run() {
@@ -111,7 +111,7 @@
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
- final LibaioContext control = new LibaioContext<>(5, true);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() {
@Override
public void run() {
@@ -164,9 +164,9 @@
@Test
public void testCloseAndStart() throws Exception {
- final LibaioContext control2 = new LibaioContext<>(5, true);
+ final LibaioContext control2 = new LibaioContext<>(5, true, true);
- final LibaioContext control = new LibaioContext<>(5, true);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
control.close();
control.poll();