Move method implementations to implementation class
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index 5eccb5d..e29f2a8 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -485,108 +485,13 @@
));
}
- default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
- return fromOsgiRunnable((executionContext, op) -> {
- ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
- ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
- IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
- terminators = new IdentityHashMap<>();
+ <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun);
- OSGiResult funRun = fun.run(
- executionContext,
- f -> {
- synchronized(identities) {
- ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
-
- for (T t : identities) {
- IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
- terminators.computeIfAbsent(
- t, __ -> new IdentityHashMap<>());
- terminatorMap.put(f, op.apply(f.apply(t)));
- }
-
- return () -> {
- synchronized (identities) {
- node.remove();
-
- identities.forEach(t -> {
- Runnable terminator = terminators.get(t).remove(f);
-
- terminator.run();
- });
- }
- };
- }
- }
- );
-
- OSGiResult myRun = run(
- executionContext,
- t -> {
- synchronized (identities) {
- ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
-
- for (Function<T, S> f : functions) {
- IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
- terminators.computeIfAbsent(
- t, __ -> new IdentityHashMap<>());
- terminatorMap.put(f, op.apply(f.apply(t)));
- }
-
- return () -> {
- synchronized (identities) {
- node.remove();
-
- functions.forEach(f -> {
- Runnable terminator = terminators.get(t).remove(f);
-
- terminator.run();
- });
- }
- };
- }
- }
- );
-
- return () -> {
- myRun.close();
-
- funRun.close();
- };
- });
- }
-
- default <S> OSGi<S> choose(
+ <S> OSGi<S> choose(
Function<T, OSGi<Boolean>> chooser, Function<OSGi<T>, OSGi<S>> then,
- Function<OSGi<T>, OSGi<S>> otherwise) {
+ Function<OSGi<T>, OSGi<S>> otherwise);
- return fromOsgiRunnable((executionContext, publisher) -> {
- Pad<T, S> thenPad = new Pad<>(executionContext, then, publisher);
- Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, publisher);
-
- OSGiResult result = run(
- executionContext,
- t -> chooser.apply(t).run(
- executionContext,
- b -> {
- if (b) {
- return thenPad.publish(t);
- } else {
- return elsePad.publish(t);
- }
- }
- ));
- return () -> {
- thenPad.close();
- elsePad.close();
- result.close();
- };
- });
- }
-
- default <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs) {
- return new DistributeOSGiImpl<>(this, funs);
- }
+ <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
default OSGi<T> effects(
Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
@@ -594,93 +499,18 @@
return effects(onAdded, __ -> {}, __ -> {}, onRemoved);
}
- default OSGi<T> effects(
+ OSGi<T> effects(
Consumer<? super T> onAddedBefore, Consumer<? super T> onAddedAfter,
Consumer<? super T> onRemovedBefore,
- Consumer<? super T> onRemovedAfter) {
-
- return fromOsgiRunnable((executionContext, op) ->
- run(
- executionContext,
- t -> {
- onAddedBefore.accept(t);
-
- try {
- Runnable terminator = op.publish(t);
-
- OSGiResult result = () -> {
- try {
- onRemovedBefore.accept(t);
- }
- catch (Exception e) {
- //TODO: logging
- }
-
- try {
- terminator.run();
- }
- catch (Exception e) {
- //TODO: logging
- }
-
- try {
- onRemovedAfter.accept(t);
- }
- catch (Exception e) {
- //TODO: logging
- }
- };
-
- try {
- onAddedAfter.accept(t);
- }
- catch (Exception e) {
- result.run();
-
- throw e;
- }
-
- return result;
- }
- catch (Exception e) {
- try {
- onRemovedAfter.accept(t);
- }
- catch (Exception e1) {
- //TODO: logging
- }
-
- throw e;
- }
- }
- )
- );
- }
+ Consumer<? super T> onRemovedAfter);
default OSGi<T> effects(Effect<? super T> effect) {
return effects(effect.getOnIncoming(), effect.getOnLeaving());
}
- default OSGi<T> filter(Predicate<T> predicate) {
- return fromOsgiRunnable((executionContext, op) ->
- run(
- executionContext,
- t -> {
- if (predicate.test(t)) {
- return op.apply(t);
- }
- else {
- return NOOP;
- }
- }
- ));
- }
+ OSGi<T> filter(Predicate<T> predicate);
- default <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
- return fromOsgiRunnable((executionContext, op) ->
- run(executionContext, t -> fun.apply(t).run(executionContext, op))
- );
- }
+ <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
default OSGi<Void> foreach(Consumer<? super T> onAdded) {
return foreach(onAdded, __ -> {});
@@ -692,76 +522,19 @@
return ignore(effects(onAdded, onRemoved));
}
- default <S> OSGi<S> map(Function<? super T, ? extends S> function) {
- return fromOsgiRunnable((executionContext, op) ->
- run(executionContext, t -> op.apply(function.apply(t)))
- );
- }
+ <S> OSGi<S> map(Function<? super T, ? extends S> function);
- default OSGi<T> recover(BiFunction<T, Exception, T> onError) {
- return fromOsgiRunnable((executionContext, op) ->
- run(
- executionContext,
- t -> {
- try {
- return op.apply(t);
- }
- catch (Exception e) {
- return op.apply(onError.apply(t, e));
- }
- }
- ));
- }
+ OSGi<T> recover(BiFunction<T, Exception, T> onError);
- default OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
- return fromOsgiRunnable((executionContext, op) ->
- run(
- executionContext,
- t -> {
- try {
- return op.apply(t);
- }
- catch (Exception e) {
- return onError.apply(t, e).run(executionContext, op);
- }
- }
- ));
- }
+ OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
- default <K, S> OSGi<S> splitBy(
- Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {
-
- return fromOsgiRunnable((executionContext, op) -> {
- HashMap<K, Pad<T, S>> pads = new HashMap<>();
-
- OSGiResult result = run(
- executionContext,
- t -> mapper.apply(t).run(
- executionContext,
- k -> pads.computeIfAbsent(
- k,
- __ -> new Pad<>(
- executionContext,
- ___ -> fun.apply(k, ___), op)
- ).publish(t)
- )
- );
-
- return () -> {
- pads.values().forEach(Pad::close);
-
- result.close();
- };
- });
- }
+ <K, S> OSGi<S> splitBy(
+ Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun);
default public <S> OSGi<S> then(OSGi<S> next) {
return flatMap(__ -> next);
}
- default <S> OSGi<S> transform(Transformer<T, S> fun) {
- return fromOsgiRunnable(
- (executionContext, op) -> run(executionContext, fun.transform(op)));
- }
+ <S> OSGi<S> transform(Transformer<T, S> fun);
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
index d8d1960..490d59a 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
@@ -30,7 +30,7 @@
@SafeVarargs
public DistributeOSGiImpl(
- OSGiRunnable<T> operation, Function<OSGi<T>, OSGi<S>>... funs) {
+ OSGiImpl<T> operation, Function<OSGi<T>, OSGi<S>>... funs) {
super((executionContext, publisher) -> {
Pad<T, S>[] pads = new Pad[funs.length];
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
index 08ea1b3..7ec50c1 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiImpl.java
@@ -17,14 +17,17 @@
package org.apache.aries.component.dsl.internal;
-import org.apache.aries.component.dsl.OSGi;
-import org.apache.aries.component.dsl.OSGiRunnable;
-import org.apache.aries.component.dsl.Publisher;
-import org.apache.aries.component.dsl.OSGiResult;
-import org.osgi.framework.BundleContext;
+import org.apache.aries.component.dsl.*;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
/**
* @author Carlos Sierra Andrés
*/
@@ -101,6 +104,271 @@
OSGiRunnable<T> _operation;
+ @Override
+ public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+ return new OSGiImpl<>((executionContext, op) -> {
+ ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
+ ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
+ IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
+ terminators = new IdentityHashMap<>();
+
+ OSGiResult funRun = fun.run(
+ executionContext,
+ f -> {
+ synchronized(identities) {
+ ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
+
+ for (T t : identities) {
+ IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+ terminators.computeIfAbsent(
+ t, __ -> new IdentityHashMap<>());
+ terminatorMap.put(f, op.apply(f.apply(t)));
+ }
+
+ return () -> {
+ synchronized (identities) {
+ node.remove();
+
+ identities.forEach(t -> {
+ Runnable terminator = terminators.get(t).remove(f);
+
+ terminator.run();
+ });
+ }
+ };
+ }
+ }
+ );
+
+ OSGiResult myRun = run(
+ executionContext,
+ t -> {
+ synchronized (identities) {
+ ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
+
+ for (Function<T, S> f : functions) {
+ IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+ terminators.computeIfAbsent(
+ t, __ -> new IdentityHashMap<>());
+ terminatorMap.put(f, op.apply(f.apply(t)));
+ }
+
+ return () -> {
+ synchronized (identities) {
+ node.remove();
+
+ functions.forEach(f -> {
+ Runnable terminator = terminators.get(t).remove(f);
+
+ terminator.run();
+ });
+ }
+ };
+ }
+ }
+ );
+
+ return () -> {
+ myRun.close();
+
+ funRun.close();
+ };
+ });
+ }
+
+ @Override
+ public <S> OSGi<S> choose(
+ Function<T, OSGi<Boolean>> chooser, Function<OSGi<T>, OSGi<S>> then,
+ Function<OSGi<T>, OSGi<S>> otherwise) {
+
+ return new OSGiImpl<>((executionContext, publisher) -> {
+ Pad<T, S> thenPad = new Pad<>(executionContext, then, publisher);
+ Pad<T, S> elsePad = new Pad<>(executionContext, otherwise, publisher);
+
+ OSGiResult result = run(
+ executionContext,
+ t -> chooser.apply(t).run(
+ executionContext,
+ b -> {
+ if (b) {
+ return thenPad.publish(t);
+ } else {
+ return elsePad.publish(t);
+ }
+ }
+ ));
+ return () -> {
+ thenPad.close();
+ elsePad.close();
+ result.close();
+ };
+ });
+ }
+
+ @Override
+ public <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>>... funs) {
+ return new DistributeOSGiImpl<>(this, funs);
+ }
+
+ @Override
+ public OSGi<T> effects(
+ Consumer<? super T> onAddedBefore, Consumer<? super T> onAddedAfter,
+ Consumer<? super T> onRemovedBefore,
+ Consumer<? super T> onRemovedAfter) {
+
+ return new OSGiImpl<>((executionContext, op) ->
+ run(
+ executionContext,
+ t -> {
+ onAddedBefore.accept(t);
+
+ try {
+ Runnable terminator = op.publish(t);
+
+ OSGiResult result = () -> {
+ try {
+ onRemovedBefore.accept(t);
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
+
+ try {
+ terminator.run();
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
+
+ try {
+ onRemovedAfter.accept(t);
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
+ };
+
+ try {
+ onAddedAfter.accept(t);
+ }
+ catch (Exception e) {
+ result.run();
+
+ throw e;
+ }
+
+ return result;
+ }
+ catch (Exception e) {
+ try {
+ onRemovedAfter.accept(t);
+ }
+ catch (Exception e1) {
+ //TODO: logging
+ }
+
+ throw e;
+ }
+ }
+ )
+ );
+ }
+
+ @Override
+ public OSGi<T> filter(Predicate<T> predicate) {
+ return new OSGiImpl<>((executionContext, op) ->
+ run(
+ executionContext,
+ t -> {
+ if (predicate.test(t)) {
+ return op.apply(t);
+ }
+ else {
+ return NOOP;
+ }
+ }
+ ));
+ }
+
+ @Override
+ public <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
+ return new OSGiImpl<>((executionContext, op) ->
+ run(executionContext, t -> fun.apply(t).run(executionContext, op))
+ );
+ }
+
+ @Override
+ public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
+ return new OSGiImpl<>((executionContext, op) ->
+ run(executionContext, t -> op.apply(function.apply(t)))
+ );
+ }
+
+ @Override
+ public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
+ return new OSGiImpl<>((executionContext, op) ->
+ run(
+ executionContext,
+ t -> {
+ try {
+ return op.apply(t);
+ }
+ catch (Exception e) {
+ return op.apply(onError.apply(t, e));
+ }
+ }
+ ));
+ }
+
+ @Override
+ public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
+ return new OSGiImpl<>((executionContext, op) ->
+ run(
+ executionContext,
+ t -> {
+ try {
+ return op.apply(t);
+ }
+ catch (Exception e) {
+ return onError.apply(t, e).run(executionContext, op);
+ }
+ }
+ ));
+ }
+
+ @Override
+ public <K, S> OSGi<S> splitBy(
+ Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {
+
+ return new OSGiImpl<>((executionContext, op) -> {
+ HashMap<K, Pad<T, S>> pads = new HashMap<>();
+
+ OSGiResult result = run(
+ executionContext,
+ t -> mapper.apply(t).run(
+ executionContext,
+ k -> pads.computeIfAbsent(
+ k,
+ __ -> new Pad<>(
+ executionContext,
+ ___ -> fun.apply(k, ___), op)
+ ).publish(t)
+ )
+ );
+
+ return () -> {
+ pads.values().forEach(Pad::close);
+
+ result.close();
+ };
+ });
+ }
+
+ @Override
+ public <S> OSGi<S> transform(Transformer<T, S> fun) {
+ return new OSGiImpl<>(
+ (executionContext, op) -> run(executionContext, fun.transform(op)));
+ }
}