Move method implementations to implementation class
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index 5eccb5d..e29f2a8 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -485,108 +485,13 @@
 		));
 	}
 
-	default  <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-		return fromOsgiRunnable((executionContext, op) -> {
-			ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
-			ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
-			IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
-				terminators = new IdentityHashMap<>();
+	<S> OSGi<S> applyTo(OSGi<Function<T, S>> fun);
 
-			OSGiResult funRun = fun.run(
-				executionContext,
-				f -> {
-					synchronized(identities) {
-						ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
-
-						for (T t : identities) {
-							IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
-								terminators.computeIfAbsent(
-									t, __ -> new IdentityHashMap<>());
-							terminatorMap.put(f, op.apply(f.apply(t)));
-						}
-
-						return () -> {
-							synchronized (identities) {
-								node.remove();
-
-								identities.forEach(t -> {
-									Runnable terminator = terminators.get(t).remove(f);
-
-									terminator.run();
-								});
-							}
-						};
-					}
-				}
-			);
-
-			OSGiResult myRun = run(
-				executionContext,
-				t -> {
-					synchronized (identities) {
-						ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
-
-						for (Function<T, S> f : functions) {
-							IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
-								terminators.computeIfAbsent(
-									t, __ -> new IdentityHashMap<>());
-							terminatorMap.put(f, op.apply(f.apply(t)));
-						}
-
-						return () -> {
-							synchronized (identities) {
-								node.remove();
-
-								functions.forEach(f -> {
-									Runnable terminator = terminators.get(t).remove(f);
-
-									terminator.run();
-								});
-							}
-						};
-					}
-				}
-			);
-
-			return () -> {
-				myRun.close();
-
-				funRun.close();
-			};
-		});
-	}
-
-	default <S> OSGi<S> choose(
+	<S> OSGi<S> choose(
 		Function<T, OSGi<Boolean>> chooser, Function<OSGi<T>, OSGi<S>> then,
-		Function<OSGi<T>, OSGi<S>> otherwise) {
+		Function<OSGi<T>, OSGi<S>> otherwise);
 
-		return fromOsgiRunnable((executionContext, publisher) -> {
-			Pad<T, S> thenPad = new Pad<>(executionContext, then, publisher);
-			Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, publisher);
-
-			OSGiResult result = run(
-				executionContext,
-				t -> chooser.apply(t).run(
-                    executionContext,
-                    b -> {
-                        if (b) {
-                            return thenPad.publish(t);
-                        } else {
-                            return elsePad.publish(t);
-                        }
-                    }
-                ));
-			return () -> {
-				thenPad.close();
-				elsePad.close();
-				result.close();
-			};
-		});
-	}
-
-	default <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs) {
-		return new DistributeOSGiImpl<>(this, funs);
-	}
+	<S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
 
 	default OSGi<T> effects(
 		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
@@ -594,93 +499,18 @@
 		return effects(onAdded, __ -> {}, __ -> {}, onRemoved);
 	}
 
-	default OSGi<T> effects(
+	OSGi<T> effects(
 		Consumer<? super T> onAddedBefore, Consumer<? super T> onAddedAfter,
 		Consumer<? super T> onRemovedBefore,
-		Consumer<? super T> onRemovedAfter) {
-
-		return fromOsgiRunnable((executionContext, op) ->
-			run(
-				executionContext,
-				t -> {
-					onAddedBefore.accept(t);
-
-					try {
-						Runnable terminator = op.publish(t);
-
-						OSGiResult result = () -> {
-							try {
-								onRemovedBefore.accept(t);
-							}
-							catch (Exception e) {
-								//TODO: logging
-							}
-
-							try {
-								terminator.run();
-							}
-							catch (Exception e) {
-								//TODO: logging
-							}
-
-							try {
-								onRemovedAfter.accept(t);
-							}
-							catch (Exception e) {
-								//TODO: logging
-							}
-						};
-
-						try {
-							onAddedAfter.accept(t);
-						}
-						catch (Exception e) {
-							result.run();
-
-							throw e;
-						}
-
-						return result;
-					}
-					catch (Exception e) {
-						try {
-							onRemovedAfter.accept(t);
-						}
-						catch (Exception e1) {
-							//TODO: logging
-						}
-
-						throw e;
-					}
-				}
-			)
-		);
-	}
+		Consumer<? super T> onRemovedAfter);
 
 	default OSGi<T> effects(Effect<? super T> effect) {
 		return effects(effect.getOnIncoming(), effect.getOnLeaving());
 	}
 
-	default OSGi<T> filter(Predicate<T> predicate) {
-		return fromOsgiRunnable((executionContext, op) ->
-			run(
-				executionContext,
-				t -> {
-					if (predicate.test(t)) {
-						return op.apply(t);
-					}
-					else {
-						return NOOP;
-					}
-				}
-			));
-	}
+	OSGi<T> filter(Predicate<T> predicate);
 
-	default <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
-		return fromOsgiRunnable((executionContext, op) ->
-			run(executionContext, t -> fun.apply(t).run(executionContext, op))
-		);
-	}
+	<S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
 
 	default OSGi<Void> foreach(Consumer<? super T> onAdded) {
 		return foreach(onAdded, __ -> {});
@@ -692,76 +522,19 @@
 		return ignore(effects(onAdded, onRemoved));
 	}
 
-	default <S> OSGi<S> map(Function<? super T, ? extends S> function) {
-		return fromOsgiRunnable((executionContext, op) ->
-			run(executionContext, t -> op.apply(function.apply(t)))
-		);
-	}
+	<S> OSGi<S> map(Function<? super T, ? extends S> function);
 
-	default OSGi<T> recover(BiFunction<T, Exception, T> onError) {
-		return fromOsgiRunnable((executionContext, op) ->
-			run(
-				executionContext,
-				t -> {
-					try {
-						return op.apply(t);
-					}
-					catch (Exception e) {
-						return op.apply(onError.apply(t, e));
-					}
-				}
-			));
-	}
+	OSGi<T> recover(BiFunction<T, Exception, T> onError);
 
-	default OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
-		return fromOsgiRunnable((executionContext, op) ->
-			run(
-				executionContext,
-				t -> {
-					try {
-						return op.apply(t);
-					}
-					catch (Exception e) {
-						return onError.apply(t, e).run(executionContext, op);
-					}
-				}
-			));
-	}
+	OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
 
-	default <K, S> OSGi<S> splitBy(
-		Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {
-
-		return fromOsgiRunnable((executionContext, op) -> {
-			HashMap<K, Pad<T, S>> pads = new HashMap<>();
-
-			OSGiResult result = run(
-				executionContext,
-				t -> mapper.apply(t).run(
-					executionContext,
-					k -> pads.computeIfAbsent(
-						k,
-						__ -> new Pad<>(
-							executionContext,
-							___ -> fun.apply(k, ___), op)
-					).publish(t)
-				)
-			);
-
-			return () -> {
-				pads.values().forEach(Pad::close);
-
-				result.close();
-			};
-		});
-	}
+	<K, S> OSGi<S> splitBy(
+		Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun);
 
 	default public <S> OSGi<S> then(OSGi<S> next) {
 		return flatMap(__ -> next);
 	}
 
-	default <S> OSGi<S> transform(Transformer<T, S> fun) {
-		return fromOsgiRunnable(
-			(executionContext, op) -> run(executionContext, fun.transform(op)));
-	}
+	<S> OSGi<S> transform(Transformer<T, S> fun);
 
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
index d8d1960..490d59a 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
@@ -30,7 +30,7 @@
 
     @SafeVarargs
     public DistributeOSGiImpl(
-        OSGiRunnable<T> operation, Function<OSGi<T>, OSGi<S>>... funs) {
+        OSGiImpl<T> operation, Function<OSGi<T>, OSGi<S>>... funs) {
 
         super((executionContext, publisher) -> {
             Pad<T, S>[] pads = new Pad[funs.length];
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
index 08ea1b3..7ec50c1 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
@@ -17,14 +17,17 @@
 
 package org.apache.aries.component.dsl.internal;
 
-import org.apache.aries.component.dsl.OSGi;
-import org.apache.aries.component.dsl.OSGiRunnable;
-import org.apache.aries.component.dsl.Publisher;
-import org.apache.aries.component.dsl.OSGiResult;
-import org.osgi.framework.BundleContext;
+import org.apache.aries.component.dsl.*;
 import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
 /**
  * @author Carlos Sierra Andrés
  */
@@ -101,6 +104,271 @@
 
 	OSGiRunnable<T> _operation;
 
+	@Override
+	public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+		return new OSGiImpl<>((executionContext, op) -> {
+			ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
+			ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
+			IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
+				terminators = new IdentityHashMap<>();
+
+			OSGiResult funRun = fun.run(
+				executionContext,
+				f -> {
+					synchronized(identities) {
+						ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
+
+						for (T t : identities) {
+							IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+								terminators.computeIfAbsent(
+									t, __ -> new IdentityHashMap<>());
+							terminatorMap.put(f, op.apply(f.apply(t)));
+						}
+
+						return () -> {
+							synchronized (identities) {
+								node.remove();
+
+								identities.forEach(t -> {
+									Runnable terminator = terminators.get(t).remove(f);
+
+									terminator.run();
+								});
+							}
+						};
+					}
+				}
+			);
+
+			OSGiResult myRun = run(
+				executionContext,
+				t -> {
+					synchronized (identities) {
+						ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
+
+						for (Function<T, S> f : functions) {
+							IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+								terminators.computeIfAbsent(
+									t, __ -> new IdentityHashMap<>());
+							terminatorMap.put(f, op.apply(f.apply(t)));
+						}
+
+						return () -> {
+							synchronized (identities) {
+								node.remove();
+
+								functions.forEach(f -> {
+									Runnable terminator = terminators.get(t).remove(f);
+
+									terminator.run();
+								});
+							}
+						};
+					}
+				}
+			);
+
+			return () -> {
+				myRun.close();
+
+				funRun.close();
+			};
+		});
+	}
+
+	@Override
+	public <S> OSGi<S> choose(
+		Function<T, OSGi<Boolean>> chooser, Function<OSGi<T>, OSGi<S>> then,
+		Function<OSGi<T>, OSGi<S>> otherwise) {
+
+		return new OSGiImpl<>((executionContext, publisher) -> {
+			Pad<T, S> thenPad = new Pad<>(executionContext, then, publisher);
+			Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, publisher);
+
+			OSGiResult result = run(
+				executionContext,
+				t -> chooser.apply(t).run(
+                    executionContext,
+                    b -> {
+                        if (b) {
+                            return thenPad.publish(t);
+                        } else {
+                            return elsePad.publish(t);
+                        }
+                    }
+                ));
+			return () -> {
+				thenPad.close();
+				elsePad.close();
+				result.close();
+			};
+		});
+	}
+
+	@Override
+	public <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>>... funs) {
+		return new DistributeOSGiImpl<>(this, funs);
+	}
+
+	@Override
+	public OSGi<T> effects(
+		Consumer<? super T> onAddedBefore, Consumer<? super T> onAddedAfter,
+		Consumer<? super T> onRemovedBefore,
+		Consumer<? super T> onRemovedAfter) {
+
+		return new OSGiImpl<>((executionContext, op) ->
+			run(
+				executionContext,
+				t -> {
+					onAddedBefore.accept(t);
+
+					try {
+						Runnable terminator = op.publish(t);
+
+						OSGiResult result = () -> {
+							try {
+								onRemovedBefore.accept(t);
+							}
+							catch (Exception e) {
+								//TODO: logging
+							}
+
+							try {
+								terminator.run();
+							}
+							catch (Exception e) {
+								//TODO: logging
+							}
+
+							try {
+								onRemovedAfter.accept(t);
+							}
+							catch (Exception e) {
+								//TODO: logging
+							}
+						};
+
+						try {
+							onAddedAfter.accept(t);
+						}
+						catch (Exception e) {
+							result.run();
+
+							throw e;
+						}
+
+						return result;
+					}
+					catch (Exception e) {
+						try {
+							onRemovedAfter.accept(t);
+						}
+						catch (Exception e1) {
+							//TODO: logging
+						}
+
+						throw e;
+					}
+				}
+			)
+		);
+	}
+
+	@Override
+	public OSGi<T> filter(Predicate<T> predicate) {
+		return new OSGiImpl<>((executionContext, op) ->
+			run(
+				executionContext,
+				t -> {
+					if (predicate.test(t)) {
+						return op.apply(t);
+					}
+					else {
+						return NOOP;
+					}
+				}
+			));
+	}
+
+	@Override
+	public <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
+		return new OSGiImpl<>((executionContext, op) ->
+			run(executionContext, t -> fun.apply(t).run(executionContext, op))
+		);
+	}
+
+	@Override
+	public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
+		return new OSGiImpl<>((executionContext, op) ->
+			run(executionContext, t -> op.apply(function.apply(t)))
+		);
+	}
+
+	@Override
+	public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
+		return new OSGiImpl<>((executionContext, op) ->
+			run(
+				executionContext,
+				t -> {
+					try {
+						return op.apply(t);
+					}
+					catch (Exception e) {
+						return op.apply(onError.apply(t, e));
+					}
+				}
+			));
+	}
+
+	@Override
+	public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
+		return new OSGiImpl<>((executionContext, op) ->
+			run(
+				executionContext,
+				t -> {
+					try {
+						return op.apply(t);
+					}
+					catch (Exception e) {
+						return onError.apply(t, e).run(executionContext, op);
+					}
+				}
+			));
+	}
+
+	@Override
+	public <K, S> OSGi<S> splitBy(
+		Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {
+
+		return new OSGiImpl<>((executionContext, op) -> {
+			HashMap<K, Pad<T, S>> pads = new HashMap<>();
+
+			OSGiResult result = run(
+				executionContext,
+				t -> mapper.apply(t).run(
+					executionContext,
+					k -> pads.computeIfAbsent(
+						k,
+						__ -> new Pad<>(
+							executionContext,
+							___ -> fun.apply(k, ___), op)
+					).publish(t)
+				)
+			);
+
+			return () -> {
+				pads.values().forEach(Pad::close);
+
+				result.close();
+			};
+		});
+	}
+
+	@Override
+	public <S> OSGi<S> transform(Transformer<T, S> fun) {
+		return new OSGiImpl<>(
+			(executionContext, op) -> run(executionContext, fun.transform(op)));
+	}
 }