[ARIES-1926] UpdateSupport should not reorder effects
So publication of new event should happen after the termination of the
deferred terminators when running inside an upgrade operation.
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index 9c5526b..7448ad4 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -289,16 +289,20 @@
return t -> {
if (count.getAndIncrement() == 0) {
- terminator.set(op.apply(t));
+ UpdateSupport.deferPublication(
+ () -> terminator.set(op.apply(t)));
}
- return () -> UpdateSupport.defer(() -> {
+ return () -> {
if (count.decrementAndGet() == 0) {
- Runnable runnable = terminator.getAndSet(NOOP);
+ UpdateSupport.deferTermination(() -> {
+ Runnable runnable = terminator.getAndSet(NOOP);
- runnable.run();
+ runnable.run();
+ });
}
- });
+
+ };
};
});
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
index ab47efd..218f908 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
@@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* @author Carlos Sierra Andrés
@@ -48,7 +49,8 @@
final int pos = i;
publishers[i] = t -> {
- OSGiResult result;
+ AtomicReference<OSGiResult> result =
+ new AtomicReference<>();
synchronized (initialized) {
atomicInteger.incrementAndGet();
@@ -64,35 +66,34 @@
}
}
- result = op.publish(t);
+ UpdateSupport.deferPublication(
+ () -> result.set(op.publish(t)));
}
- return () -> {
+ return () -> UpdateSupport.deferTermination(() -> {
synchronized (initialized) {
- result.close();
+ result.get().close();
- UpdateSupport.defer(() -> {
- int current = atomicInteger.decrementAndGet();
+ int current = atomicInteger.decrementAndGet();
- if (!initialized.get()) {
- return;
- }
+ if (!initialized.get()) {
+ return;
+ }
- if (pos <= index.get() && current == 0) {
- for (int j = pos + 1; j < results.length; j++) {
- results[j] = programs[j].run(
- bundleContext, publishers[j]);
+ if (pos <= index.get() && current == 0) {
+ for (int j = pos + 1; j < results.length; j++) {
+ results[j] = programs[j].run(
+ bundleContext, publishers[j]);
- index.set(j);
+ index.set(j);
- if (atomicIntegers[j].get() > 0) {
- break;
- }
+ if (atomicIntegers[j].get() > 0) {
+ break;
}
}
- });
+ }
}
- };
+ });
};
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
index 7a71455..5008bf9 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
@@ -26,11 +26,22 @@
public class UpdateSupport {
private static final ThreadLocal<Deque<Deque<Runnable>>>
+ deferredPublishersStack = ThreadLocal.withInitial(LinkedList::new);
+ private static final ThreadLocal<Deque<Deque<Runnable>>>
deferredTerminatorsStack = ThreadLocal.withInitial(LinkedList::new);
private static final ThreadLocal<Boolean> isUpdate =
ThreadLocal.withInitial(() -> Boolean.FALSE);
- public static void defer(Runnable runnable) {
+ public static void deferPublication(Runnable runnable) {
+ if (isUpdate.get()) {
+ deferredPublishersStack.get().peekLast().addLast(runnable);
+ }
+ else {
+ runnable.run();
+ }
+ }
+
+ public static void deferTermination(Runnable runnable) {
if (isUpdate.get()) {
deferredTerminatorsStack.get().peekLast().addLast(runnable);
}
@@ -42,9 +53,13 @@
public static void runUpdate(Runnable runnable) {
isUpdate.set(true);
- Deque<Deque<Runnable>> deferred = deferredTerminatorsStack.get();
+ Deque<Deque<Runnable>> deferredPublishers =
+ deferredPublishersStack.get();
+ Deque<Deque<Runnable>> deferredTerminators =
+ deferredTerminatorsStack.get();
- deferred.addLast(new LinkedList<>());
+ deferredPublishers.addLast(new LinkedList<>());
+ deferredTerminators.addLast(new LinkedList<>());
try {
runnable.run();
@@ -58,6 +73,15 @@
for (Runnable terminator : terminators) {
terminator.run();
}
+
+ Deque<Runnable> publishers =
+ deferredPublishersStack.get().removeLast();
+
+ for (Runnable publisher : publishers) {
+ publisher.run();
+ }
+
+ isUpdate.set(!deferredTerminators.isEmpty());
}
}
}
diff --git a/component-dsl/src/test/java/org/apache/aries/component/dsl/internal/UpdateSupportTest.java b/component-dsl/src/test/java/org/apache/aries/component/dsl/internal/UpdateSupportTest.java
index 1d3f531..c635e06 100644
--- a/component-dsl/src/test/java/org/apache/aries/component/dsl/internal/UpdateSupportTest.java
+++ b/component-dsl/src/test/java/org/apache/aries/component/dsl/internal/UpdateSupportTest.java
@@ -37,12 +37,27 @@
UpdateSupport.runUpdate(() -> {
list.add(1);
- UpdateSupport.defer(() -> list.add(3));
+ UpdateSupport.deferPublication(() -> list.add(4));
+ UpdateSupport.deferTermination(() -> list.add(3));
list.add(2);
});
- assertEquals(Arrays.asList(1, 2, 3), list);
+ assertEquals(Arrays.asList(1, 2, 3, 4), list);
+ }
+
+ @Test
+ public void testDeferOutsideUpdate() {
+ List<Integer> list = new ArrayList<>();
+
+ list.add(1);
+
+ UpdateSupport.deferPublication(() -> list.add(2));
+ UpdateSupport.deferTermination(() -> list.add(3));
+
+ list.add(4);
+
+ assertEquals(Arrays.asList(1, 2, 3, 4), list);
}
@Test
@@ -52,48 +67,58 @@
UpdateSupport.runUpdate(() -> {
list.add(1);
- UpdateSupport.defer(() -> list.add(6));
+ UpdateSupport.deferTermination(() -> list.add(9));
UpdateSupport.runUpdate(() -> {
list.add(2);
- UpdateSupport.defer(() -> {
+ UpdateSupport.deferTermination(() -> {
list.add(3);
- UpdateSupport.defer(() ->
- UpdateSupport.defer(() -> list.add(4)));
+ UpdateSupport.deferTermination(() -> {
+ UpdateSupport.deferPublication(() -> list.add(4));
+ UpdateSupport.deferTermination(() -> list.add(5));
- list.add(5);
+ UpdateSupport.runUpdate(() -> {
+ UpdateSupport.deferPublication(() -> list.add(7));
+ UpdateSupport.deferTermination(() -> list.add(6));
+ });
+ });
+
+ list.add(8);
});
});
+
+ UpdateSupport.deferPublication(() -> list.add(10));
});
- assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), list);
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), list);
}
@Test
- public void testDeferNestedStackWithUpdate() {
+ public void testDeferTerminationNestedStackWithUpdate() {
List<Integer> list = new ArrayList<>();
UpdateSupport.runUpdate(() -> {
list.add(1);
- UpdateSupport.defer(() -> list.add(9));
+ UpdateSupport.deferTermination(() -> list.add(9));
UpdateSupport.runUpdate(() -> {
list.add(2);
- UpdateSupport.defer(() -> {
+ UpdateSupport.deferTermination(() -> {
UpdateSupport.runUpdate(
() -> {
list.add(4);
- UpdateSupport.defer(
+ UpdateSupport.deferTermination(
() -> UpdateSupport.runUpdate(
() -> {
list.add(6);
- UpdateSupport.defer(() -> list.add(8));
+ UpdateSupport.deferTermination(
+ () -> list.add(8));
list.add(7);
}));
@@ -110,13 +135,13 @@
}
@Test
- public void testDeferStack() {
+ public void testDeferTerminationStack() {
List<Integer> list = new ArrayList<>();
UpdateSupport.runUpdate(() -> {
list.add(1);
- UpdateSupport.defer(() -> list.add(6));
+ UpdateSupport.deferTermination(() -> list.add(6));
UpdateSupport.runUpdate(() -> {
list.add(2);
@@ -124,10 +149,10 @@
UpdateSupport.runUpdate(() -> {
list.add(3);
- UpdateSupport.defer(() -> list.add(4));
+ UpdateSupport.deferTermination(() -> list.add(4));
});
- UpdateSupport.defer(() -> list.add(5));
+ UpdateSupport.deferTermination(() -> list.add(5));
});
});