pipe makes sense for Publisher not for BaseOsgiImpl
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java
index c72f585..61565af 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java
@@ -34,4 +34,20 @@
throw (E)e;
}
+ default <S> Publisher<S> pipe(Function<? super S, OSGiResult> next) {
+
+ return new Publisher<S>() {
+ @Override
+ public OSGiResult publish(S t) {
+ return next.apply(t);
+ }
+
+ @Override
+ public <E extends Exception> OSGiResult error(S s, Exception e) throws E {
+ return Publisher.this.error((T)s, e);
+ }
+ };
+
+ }
+
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
index a9fdeea..1b68e2a 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
@@ -101,7 +101,7 @@
@Override
public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
- return pipe((executionContext, op) -> {
+ return new BaseOSGiImpl<>((executionContext, op) -> {
ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
@@ -109,7 +109,7 @@
OSGiResult funRun = fun.run(
executionContext,
- wrap(op, f -> {
+ op.pipe(f -> {
synchronized(identities) {
ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
@@ -137,7 +137,7 @@
OSGiResult myRun = run(
executionContext,
- wrap(op, t -> {
+ op.pipe(t -> {
synchronized (identities) {
ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
@@ -176,13 +176,13 @@
Function<T, OSGi<Boolean>> chooser, Function<OSGi<T>, OSGi<S>> then,
Function<OSGi<T>, OSGi<S>> otherwise) {
- return pipe((executionContext, publisher) -> {
- Pad<T, S> thenPad = new Pad<>(executionContext, then, publisher);
- Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, publisher);
+ return new BaseOSGiImpl<>((executionContext, op) -> {
+ Pad<T, S> thenPad = new Pad<>(executionContext, then, op);
+ Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, op);
OSGiResult result = run(
executionContext,
- wrap(publisher, t -> chooser.apply(t).run(
+ op.pipe(t -> chooser.apply(t).run(
executionContext,
b -> {
if (b) {
@@ -211,10 +211,14 @@
Consumer<? super T> onRemovedBefore,
Consumer<? super T> onRemovedAfter) {
- return pipe((executionContext, op) ->
+ //TODO: logging
+ //TODO: logging
+ //TODO: logging
+ //TODO: logging
+ return new BaseOSGiImpl<>((executionContext, op) ->
run(
executionContext,
- wrap(op, t -> {
+ op.pipe(t -> {
onAddedBefore.accept(t);
try {
@@ -271,10 +275,10 @@
@Override
public OSGi<T> filter(Predicate<T> predicate) {
- return pipe((executionContext, op) ->
+ return new BaseOSGiImpl<>((executionContext, op) ->
run(
executionContext,
- wrap(op, t -> {
+ op.pipe(t -> {
if (predicate.test(t)) {
return op.apply(t);
}
@@ -287,21 +291,19 @@
@Override
public <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
- return pipe((executionContext, op) ->
- run(executionContext, wrap(op, t -> fun.apply(t).run(executionContext, op)))
- );
+ return new BaseOSGiImpl<>((executionContext, op) ->
+ run(executionContext, op.pipe(t -> fun.apply(t).run(executionContext, op))));
}
@Override
public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
- return pipe((executionContext, op) ->
- run(executionContext, wrap(op, t -> op.apply(function.apply(t))))
- );
+ return new BaseOSGiImpl<>((executionContext, op) ->
+ run(executionContext, op.pipe(t -> op.apply(function.apply(t)))));
}
@Override
public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
- return pipe((executionContext, op) ->
+ return new BaseOSGiImpl<>((executionContext, op) ->
run(
executionContext,
t -> {
@@ -317,7 +319,7 @@
@Override
public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
- return pipe((executionContext, op) ->
+ return new BaseOSGiImpl<>((executionContext, op) ->
run(
executionContext,
t -> {
@@ -335,12 +337,12 @@
public <K, S> OSGi<S> splitBy(
Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {
- return pipe((executionContext, op) -> {
+ return new BaseOSGiImpl<>((executionContext, op) -> {
HashMap<K, Pad<T, S>> pads = new HashMap<>();
OSGiResult result = run(
executionContext,
- wrap(op, t -> mapper.apply(t).run(
+ op.pipe(t -> mapper.apply(t).run(
executionContext,
k -> pads.computeIfAbsent(
k,
@@ -361,29 +363,8 @@
@Override
public <S> OSGi<S> transform(Transformer<T, S> fun) {
- return pipe(
- (executionContext, op) -> run(
- executionContext, wrap(op, fun.transform(op))));
- }
-
- protected <S> BaseOSGiImpl<S> pipe(OSGiRunnable<S> runnable) {
- return new BaseOSGiImpl<>(runnable);
- }
-
- protected static <T, S> Publisher<S> wrap(
- Publisher<T> prev, Publisher<S> next) {
-
- return new Publisher<S>() {
- @Override
- public OSGiResult publish(S t) {
- return next.publish(t);
- }
-
- @Override
- public <E extends Exception> OSGiResult error(S s, Exception e) throws E {
- return prev.error((T)s, e);
- }
- };
+ return new BaseOSGiImpl<>((executionContext, op) -> run(
+ executionContext, op.pipe(fun.transform(op)::apply)));
}
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
index f24c9c5..f0bdb71 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
@@ -16,6 +16,7 @@
import org.apache.aries.component.dsl.OSGi;
import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.Publisher;
import java.util.ArrayList;
import java.util.List;
@@ -39,7 +40,7 @@
OSGiResult result = operation.run(
executionContext,
- wrap(publisher, t -> {
+ publisher.pipe(t -> {
List<Runnable> terminators = new ArrayList<>(funs.length);
int i = 0;
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
index dcb5c03..6529225 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
@@ -50,7 +50,7 @@
ExecutionContext ec, Publisher<? super T> op) {
return operation.run(ec,
- wrap(op, t -> {
+ op.pipe(t -> {
try {
return op.publish(t);
} catch (PublisherRethrowException pre) {