Apply elsewhere
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
index 7bf89f7..99cd1dc 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
@@ -34,7 +34,7 @@
OSGi<T> previous, Comparator<? super T> comparator,
Function<OSGi<T>, OSGi<T>> notHighest) {
- super((executionContext, highestPipe) -> {
+ super((executionContext, publisher) -> {
Comparator<Tuple<T>> comparing = Comparator.comparing(
Tuple::getT, comparator);
PriorityQueue<Tuple<T>> set = new PriorityQueue<>(
@@ -42,11 +42,11 @@
AtomicReference<Tuple<T>> sent = new AtomicReference<>();
Pad<T, T> notHighestPad = new Pad<>(
- executionContext, notHighest, highestPipe);
+ executionContext, notHighest, publisher);
OSGiResult result = previous.run(
executionContext,
- t -> {
+ publisher.pipe(t -> {
Tuple<T> tuple = new Tuple<>(t);
synchronized (set) {
@@ -59,7 +59,7 @@
old._runnable.run();
}
- tuple._runnable = highestPipe.apply(t);
+ tuple._runnable = publisher.apply(t);
if (old != null) {
old._runnable = notHighestPad.publish(old._t);
@@ -83,7 +83,7 @@
if (current != old && current != null) {
current._runnable.run();
- current._runnable = highestPipe.apply(
+ current._runnable = publisher.apply(
current._t);
sent.set(current);
}
@@ -92,7 +92,7 @@
}
}
};
- });
+ }));
return new OSGiResultImpl(
() -> {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/IgnoreImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/IgnoreImpl.java
index 751a22a..04e3979 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/IgnoreImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/IgnoreImpl.java
@@ -26,7 +26,7 @@
public IgnoreImpl(OSGi<?> program) {
- super((executionContext, op) -> program.run(executionContext, t -> NOOP));
+ super((executionContext, op) -> program.run(executionContext, op.pipe(t -> NOOP)));
}
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
index 63be2f7..d9615a7 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
@@ -43,10 +43,11 @@
_result = next.run(bundleContext, continuation);
- _publisher =
+ _publisher = continuation.pipe(
probe.getPublisher() != null ?
- probe.getPublisher() :
- __ -> NOOP;
+ probe.getPublisher()::publish :
+ __ -> NOOP
+ );
}
@Override
@@ -59,6 +60,11 @@
return _publisher.publish(t);
}
+ @Override
+ public <E extends Exception> OSGiResult error(T t, Exception e) throws E {
+ return _publisher.error(t, e);
+ }
+
private final OSGiResult _result;
private final Publisher<? super T> _publisher;
}