pipe makes sense for Publisher not for BaseOsgiImpl
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java
index c72f585..61565af 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/Publisher.java
@@ -34,4 +34,20 @@
         throw (E)e;
     }
 
+    default <S> Publisher<S> pipe(Function<? super S, OSGiResult> next) {
+
+        return new Publisher<S>() {
+            @Override
+            public OSGiResult publish(S t) {
+                return next.apply(t);
+            }
+
+            @Override
+            public <E extends Exception> OSGiResult error(S s, Exception e) throws E {
+                return Publisher.this.error((T)s, e);
+            }
+        };
+
+    }
+
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
index a9fdeea..1b68e2a 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
@@ -101,7 +101,7 @@
 
 	@Override
 	public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-		return pipe((executionContext, op) -> {
+		return new BaseOSGiImpl<>((executionContext, op) -> {
 			ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
 			ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
 			IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
@@ -109,7 +109,7 @@
 
 			OSGiResult funRun = fun.run(
 				executionContext,
-				wrap(op, f -> {
+				op.pipe(f -> {
 					synchronized(identities) {
 						ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
 
@@ -137,7 +137,7 @@
 
 			OSGiResult myRun = run(
 				executionContext,
-				wrap(op, t -> {
+				op.pipe(t -> {
 					synchronized (identities) {
 						ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
 
@@ -176,13 +176,13 @@
 		Function<T, OSGi<Boolean>> chooser, Function<OSGi<T>, OSGi<S>> then,
 		Function<OSGi<T>, OSGi<S>> otherwise) {
 
-		return pipe((executionContext, publisher) -> {
-			Pad<T, S> thenPad = new Pad<>(executionContext, then, publisher);
-			Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, publisher);
+		return new BaseOSGiImpl<>((executionContext, op) -> {
+			Pad<T, S> thenPad = new Pad<>(executionContext, then, op);
+			Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, op);
 
 			OSGiResult result = run(
 				executionContext,
-				wrap(publisher, t -> chooser.apply(t).run(
+				op.pipe(t -> chooser.apply(t).run(
                     executionContext,
                     b -> {
                         if (b) {
@@ -211,10 +211,14 @@
 		Consumer<? super T> onRemovedBefore,
 		Consumer<? super T> onRemovedAfter) {
 
-		return pipe((executionContext, op) ->
+		//TODO: logging
+		//TODO: logging
+		//TODO: logging
+		//TODO: logging
+		return new BaseOSGiImpl<>((executionContext, op) ->
 			run(
 				executionContext,
-				wrap(op, t -> {
+				op.pipe(t -> {
 					onAddedBefore.accept(t);
 
 					try {
@@ -271,10 +275,10 @@
 
 	@Override
 	public OSGi<T> filter(Predicate<T> predicate) {
-		return pipe((executionContext, op) ->
+		return new BaseOSGiImpl<>((executionContext, op) ->
 			run(
 				executionContext,
-				wrap(op, t -> {
+				op.pipe(t -> {
 					if (predicate.test(t)) {
 						return op.apply(t);
 					}
@@ -287,21 +291,19 @@
 
 	@Override
 	public <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
-		return pipe((executionContext, op) ->
-			run(executionContext, wrap(op, t -> fun.apply(t).run(executionContext, op)))
-		);
+		return new BaseOSGiImpl<>((executionContext, op) ->
+			run(executionContext, op.pipe(t -> fun.apply(t).run(executionContext, op))));
 	}
 
 	@Override
 	public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
-		return pipe((executionContext, op) ->
-			run(executionContext, wrap(op, t -> op.apply(function.apply(t))))
-		);
+		return new BaseOSGiImpl<>((executionContext, op) ->
+			run(executionContext, op.pipe(t -> op.apply(function.apply(t)))));
 	}
 
 	@Override
 	public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
-		return pipe((executionContext, op) ->
+		return new BaseOSGiImpl<>((executionContext, op) ->
 			run(
 				executionContext,
 				t -> {
@@ -317,7 +319,7 @@
 
 	@Override
 	public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
-		return pipe((executionContext, op) ->
+		return new BaseOSGiImpl<>((executionContext, op) ->
 			run(
 				executionContext,
 				t -> {
@@ -335,12 +337,12 @@
 	public <K, S> OSGi<S> splitBy(
 		Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {
 
-		return pipe((executionContext, op) -> {
+		return new BaseOSGiImpl<>((executionContext, op) -> {
 			HashMap<K, Pad<T, S>> pads = new HashMap<>();
 
 			OSGiResult result = run(
 				executionContext,
-				wrap(op, t -> mapper.apply(t).run(
+				op.pipe(t -> mapper.apply(t).run(
 					executionContext,
 					k -> pads.computeIfAbsent(
 						k,
@@ -361,29 +363,8 @@
 
 	@Override
 	public <S> OSGi<S> transform(Transformer<T, S> fun) {
-		return pipe(
-			(executionContext, op) -> run(
-				executionContext, wrap(op, fun.transform(op))));
-	}
-
-	protected <S> BaseOSGiImpl<S> pipe(OSGiRunnable<S> runnable) {
-		return new BaseOSGiImpl<>(runnable);
-	}
-
-	protected static <T, S> Publisher<S> wrap(
-		Publisher<T> prev, Publisher<S> next) {
-
-		return new Publisher<S>() {
-			@Override
-			public OSGiResult publish(S t) {
-				return next.publish(t);
-			}
-
-			@Override
-			public <E extends Exception> OSGiResult error(S s, Exception e) throws E {
-				return prev.error((T)s, e);
-			}
-		};
+		return new BaseOSGiImpl<>((executionContext, op) -> run(
+				executionContext, op.pipe(fun.transform(op)::apply)));
 	}
 
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
index f24c9c5..f0bdb71 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
@@ -16,6 +16,7 @@
 
 import org.apache.aries.component.dsl.OSGi;
 import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.Publisher;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,7 +40,7 @@
 
             OSGiResult result = operation.run(
                 executionContext,
-                wrap(publisher, t -> {
+                publisher.pipe(t -> {
                     List<Runnable> terminators = new ArrayList<>(funs.length);
 
                     int i = 0;
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
index dcb5c03..6529225 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
@@ -50,7 +50,7 @@
 			ExecutionContext ec, Publisher<? super T> op) {
 
 			return operation.run(ec,
-				wrap(op, t -> {
+				op.pipe(t -> {
 					try {
 						return op.publish(t);
 					} catch (PublisherRethrowException pre) {