Unify implementations
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
index 1bf1358..74b0219 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
@@ -239,84 +239,78 @@
Consumer<? super T> onRemovedAfter,
Consumer<? super T> onUpdate) {
- return new BaseOSGiImpl<>((executionContext, op) ->
- run(
- executionContext,
- op.pipe(t -> {
- onAddedBefore.accept(t);
+ return transform(op -> t -> {
+ onAddedBefore.accept(t);
+ try {
+ OSGiResult terminator = op.publish(t);
+
+ OSGiResult result = new OSGiResultImpl(() -> {
try {
- OSGiResult terminator = op.publish(t);
-
- OSGiResult result = new OSGiResultImpl(() -> {
- try {
- onRemovedBefore.accept(t);
- }
- catch (Exception e) {
- //TODO: logging
- }
-
- try {
- terminator.run();
- }
- catch (Exception e) {
- //TODO: logging
- }
-
- try {
- onRemovedAfter.accept(t);
- }
- catch (Exception e) {
- //TODO: logging
- }
- },
- () -> {
- onUpdate.accept(t);
-
- return terminator.update();
- }
- );
-
- try {
- onAddedAfter.accept(t);
- }
- catch (Exception e) {
- result.run();
-
- throw e;
- }
-
- return result;
+ onRemovedBefore.accept(t);
}
catch (Exception e) {
- try {
- onRemovedAfter.accept(t);
- }
- catch (Exception e1) {
- //TODO: logging
- }
-
- throw e;
+ //TODO: logging
}
+
+ try {
+ terminator.run();
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
+
+ try {
+ onRemovedAfter.accept(t);
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
+ },
+ () -> {
+ onUpdate.accept(t);
+
+ return terminator.update();
+ }
+ );
+
+ try {
+ onAddedAfter.accept(t);
}
- )
- ));
+ catch (Exception e) {
+ result.run();
+
+ throw e;
+ }
+
+ return result;
+ }
+ catch (Exception e) {
+ try {
+ onRemovedAfter.accept(t);
+ }
+ catch (Exception e1) {
+ //TODO: logging
+ }
+
+ throw e;
+ }
+ }
+ );
}
@Override
public OSGi<T> filter(Predicate<T> predicate) {
- return new BaseOSGiImpl<>((executionContext, op) ->
- run(
- executionContext,
- op.pipe(t -> {
- if (predicate.test(t)) {
- return op.apply(t);
- }
- else {
- return NOOP;
- }
+ return transform(
+ op -> t -> {
+ if (predicate.test(t)) {
+ return op.apply(t);
}
- )));
+ else {
+ return NOOP;
+ }
+ }
+ );
}
@Override
@@ -327,8 +321,7 @@
@Override
public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
- return new BaseOSGiImpl<>((executionContext, op) ->
- run(executionContext, op.pipe(t -> op.apply(function.apply(t)))));
+ return transform(op -> t -> op.apply(function.apply(t)));
}
@Override