Add update support to OSGi result
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index 619e22f..50502ba 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -44,6 +44,7 @@
import org.apache.aries.component.dsl.function.Function3;
import org.apache.aries.component.dsl.function.Function5;
import org.apache.aries.component.dsl.function.Function7;
+import org.apache.aries.component.dsl.update.UpdateQuery;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceFactory;
@@ -303,11 +304,12 @@
() -> effect.getOnIncoming().accept(null),
NOOP,
NOOP,
- () -> effect.getOnLeaving().accept(null));
+ () -> effect.getOnLeaving().accept(null),
+ UpdateQuery.onUpdate());
}
static OSGi<Void> effects(Runnable onAdding, Runnable onRemoving) {
- return new EffectsOSGi(onAdding, NOOP, NOOP, onRemoving);
+ return new EffectsOSGi(onAdding, NOOP, NOOP, onRemoving, UpdateQuery.onUpdate());
}
static OSGi<Void> effects(
@@ -315,7 +317,16 @@
Runnable onRemovingBefore, Runnable onRemovingAfter) {
return new EffectsOSGi(
- onAddingBefore, onAddingAfter, onRemovingBefore, onRemovingAfter);
+ onAddingBefore, onAddingAfter, onRemovingBefore, onRemovingAfter,
+ UpdateQuery.onUpdate());
+ }
+
+ static OSGi<Void> effects(
+ Runnable onAddingBefore, Runnable onAddingAfter,
+ Runnable onRemovingBefore, Runnable onRemovingAfter, UpdateQuery<Void> updateQuery) {
+
+ return new EffectsOSGi(
+ onAddingBefore, onAddingAfter, onRemovingBefore, onRemovingAfter, updateQuery);
}
static <T> OSGi<T> fromOsgiRunnable(OSGiRunnable<T> runnable) {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
index 0434e2b..ad56515 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
@@ -17,6 +17,8 @@
package org.apache.aries.component.dsl;
+import org.apache.aries.component.dsl.update.UpdateSelector;
+
/**
* @author Carlos Sierra Andrés
*/
@@ -29,4 +31,6 @@
close();
}
+ public default void update(UpdateSelector updateSelector) {};
+
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java
new file mode 100644
index 0000000..24e5124
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.internal;
+
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateSelector;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class AggregateOSGiResult implements OSGiResult {
+
+ private OSGiResult[] results;
+
+ public AggregateOSGiResult(OSGiResult ... results) {
+ this.results = results;
+ }
+
+ @Override
+ public void close() {
+ if (_closed.compareAndSet(false, true)) {
+ for (OSGiResult result : results) {
+ try {
+ result.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void update(UpdateSelector updateSelector) {
+ if (!_closed.get()) {
+ for (OSGiResult result : results) {
+ try {
+ result.update(updateSelector);
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ private final AtomicBoolean _closed = new AtomicBoolean();
+
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
index 3c3e502..aa652c6 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
@@ -45,7 +45,8 @@
}
return new OSGiResultImpl(
- () -> cleanUp(results)
+ () -> cleanUp(results),
+ us -> results.forEach(result -> result.update(us))
);
});
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
index 1b68e2a..cb554a2 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
@@ -18,6 +18,8 @@
package org.apache.aries.component.dsl.internal;
import org.apache.aries.component.dsl.*;
+import org.apache.aries.component.dsl.update.UpdateQuery;
+import org.apache.aries.component.dsl.update.UpdateSelector;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
@@ -104,7 +106,7 @@
return new BaseOSGiImpl<>((executionContext, op) -> {
ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
- IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
+ IdentityHashMap<T, IdentityHashMap<Function<T, S>, OSGiResult>>
terminators = new IdentityHashMap<>();
OSGiResult funRun = fun.run(
@@ -114,23 +116,35 @@
ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
for (T t : identities) {
- IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+ IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
terminators.computeIfAbsent(
t, __ -> new IdentityHashMap<>());
terminatorMap.put(f, op.apply(f.apply(t)));
}
- return () -> {
- synchronized (identities) {
- node.remove();
+ return new OSGiResultImpl(
+ () -> {
+ synchronized (identities) {
+ node.remove();
- identities.forEach(t -> {
- Runnable terminator = terminators.get(t).remove(f);
+ identities.forEach(t -> {
+ Runnable terminator = terminators.get(t).remove(f);
- terminator.run();
- });
+ terminator.run();
+ });
+ }
+ },
+ us -> {
+ synchronized (identities) {
+
+ identities.forEach(t -> {
+ OSGiResult terminator = terminators.get(t).get(f);
+
+ terminator.update(us);
+ });
+ }
}
- };
+ );
}
}
));
@@ -142,32 +156,39 @@
ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
for (Function<T, S> f : functions) {
- IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+ IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
terminators.computeIfAbsent(
t, __ -> new IdentityHashMap<>());
terminatorMap.put(f, op.apply(f.apply(t)));
}
- return () -> {
- synchronized (identities) {
- node.remove();
+ return new OSGiResultImpl(
+ () -> {
+ synchronized (identities) {
+ node.remove();
- functions.forEach(f -> {
- Runnable terminator = terminators.get(t).remove(f);
+ functions.forEach(f -> {
+ Runnable terminator = terminators.get(t).remove(f);
- terminator.run();
- });
+ terminator.run();
+ });
+ }
+ },
+ us -> {
+ synchronized (identities) {
+ functions.forEach(f -> {
+ OSGiResult terminator = terminators.get(t).get(f);
+
+ terminator.update(us);
+ });
+ }
}
- };
+ );
}
})
);
- return () -> {
- myRun.close();
-
- funRun.close();
- };
+ return new AggregateOSGiResult(myRun, funRun);
});
}
@@ -192,11 +213,7 @@
}
}
)));
- return () -> {
- thenPad.close();
- elsePad.close();
- result.close();
- };
+ return new AggregateOSGiResult(thenPad, elsePad, result);
});
}
@@ -211,6 +228,15 @@
Consumer<? super T> onRemovedBefore,
Consumer<? super T> onRemovedAfter) {
+ return effects(onAddedBefore, onAddedAfter, onRemovedBefore, onRemovedAfter, UpdateQuery.onUpdate());
+ }
+
+ public OSGi<T> effects(
+ Consumer<? super T> onAddedBefore, Consumer<? super T> onAddedAfter,
+ Consumer<? super T> onRemovedBefore,
+ Consumer<? super T> onRemovedAfter,
+ UpdateQuery<T> updateQuery) {
+
//TODO: logging
//TODO: logging
//TODO: logging
@@ -222,9 +248,9 @@
onAddedBefore.accept(t);
try {
- Runnable terminator = op.publish(t);
+ OSGiResult terminator = op.publish(t);
- OSGiResult result = () -> {
+ OSGiResult result = new OSGiResultImpl(() -> {
try {
onRemovedBefore.accept(t);
}
@@ -245,7 +271,19 @@
catch (Exception e) {
//TODO: logging
}
- };
+ },
+ us -> {
+ UpdateQuery.From<T>[] froms = updateQuery.froms;
+
+ for (UpdateQuery.From<T> from : froms) {
+ if (from.selector == us || from.selector == UpdateSelector.ALL) {
+ from.consumer.accept(t);
+ }
+ }
+
+ terminator.update(us);
+ }
+ );
try {
onAddedAfter.accept(t);
@@ -353,11 +391,18 @@
)
));
- return () -> {
- pads.values().forEach(Pad::close);
+ return new OSGiResultImpl(
+ () -> {
+ pads.values().forEach(Pad::close);
- result.close();
- };
+ result.close();
+ },
+ us -> {
+ pads.values().forEach(pad -> pad.update(us));
+
+ result.close();
+ }
+ );
});
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
index 48de45f..2b5ca5c 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
@@ -17,6 +17,8 @@
package org.apache.aries.component.dsl.internal;
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateSelector;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleEvent;
import org.osgi.util.tracker.BundleTracker;
@@ -29,13 +31,15 @@
public BundleOSGi(int stateMask) {
super((executionContext, op) -> {
- BundleTracker<Runnable> bundleTracker =
+ UpdateSelector updateSelector = new UpdateSelector() {};
+
+ BundleTracker<OSGiResult> bundleTracker =
new BundleTracker<>(
executionContext.getBundleContext(), stateMask,
- new BundleTrackerCustomizer<Runnable>() {
+ new BundleTrackerCustomizer<OSGiResult>() {
@Override
- public Runnable addingBundle(
+ public OSGiResult addingBundle(
Bundle bundle, BundleEvent bundleEvent) {
return op.apply(bundle);
@@ -44,22 +48,26 @@
@Override
public void modifiedBundle(
Bundle bundle, BundleEvent bundleEvent,
- Runnable runnable) {
+ OSGiResult osgiResult) {
+ osgiResult.update(updateSelector);
}
@Override
public void removedBundle(
Bundle bundle, BundleEvent bundleEvent,
- Runnable runnable) {
+ OSGiResult osgiResult) {
- runnable.run();
+ osgiResult.run();
}
});
bundleTracker.open();
- return new OSGiResultImpl(bundleTracker::close);
+ return new OSGiResultImpl(
+ bundleTracker::close,
+ us -> bundleTracker.getTracked().values().forEach(result -> result.update(us))
+ );
});
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
index 2c44cbf..3aff2be 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
@@ -70,7 +70,7 @@
() -> result.set(op.publish(t)));
}
- return () -> UpdateSupport.deferTermination(() -> {
+ return new OSGiResultImpl(() -> UpdateSupport.deferTermination(() -> {
synchronized (initialized) {
result.get().close();
@@ -93,6 +93,11 @@
}
}
}
+ }),
+ us -> {
+ synchronized (initialized) {
+ result.get().update(us);
+ }
});
};
}
@@ -123,6 +128,13 @@
results[i].close();
}
}
+ },
+ us -> {
+ synchronized (initialized) {
+ for (int i = 0; i <= index.get(); i++) {
+ results[i].update(us);
+ }
+ }
}
);
});
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
index 817c617..26cefbe 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
@@ -17,6 +17,7 @@
package org.apache.aries.component.dsl.internal;
+import org.apache.aries.component.dsl.OSGiResult;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
@@ -42,8 +43,9 @@
AtomicReference<Configuration> atomicReference =
new AtomicReference<>(null);
- AtomicReference<Runnable>
- terminatorAtomicReference = new AtomicReference<>(() -> {});
+ AtomicReference<OSGiResult>
+ terminatorAtomicReference = new AtomicReference<>(
+ new OSGiResultImpl(NOOP, __ -> {}));
AtomicBoolean closed = new AtomicBoolean();
@@ -138,7 +140,9 @@
serviceRegistration.unregister();
signalLeave(terminatorAtomicReference);
- });
+ },
+ us -> terminatorAtomicReference.get().update(us))
+ ;
});
}
@@ -180,9 +184,9 @@
}
private static void signalLeave(
- AtomicReference<Runnable> terminatorAtomicReference) {
+ AtomicReference<OSGiResult> terminatorAtomicReference) {
- Runnable old = terminatorAtomicReference.getAndSet(null);
+ OSGiResult old = terminatorAtomicReference.getAndSet(null);
if (old != null) {
old.run();
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
index 2991daa..fe16e10 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
@@ -17,6 +17,7 @@
package org.apache.aries.component.dsl.internal;
+import org.apache.aries.component.dsl.OSGiResult;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
@@ -42,7 +43,7 @@
ConcurrentHashMap<String, Configuration> configurations =
new ConcurrentHashMap<>();
- ConcurrentHashMap<String, Runnable> terminators =
+ ConcurrentHashMap<String, OSGiResult> terminators =
new ConcurrentHashMap<>();
AtomicBoolean closed = new AtomicBoolean();
@@ -148,6 +149,13 @@
runnable.run();
}
}
+ },
+ us -> {
+ for (OSGiResult osgiResult : terminators.values()) {
+ if (osgiResult != null) {
+ osgiResult.run();
+ }
+ }
});
});
}
@@ -218,12 +226,12 @@
}
private static void signalLeave(
- String factoryPid, ConcurrentHashMap<String, Runnable> terminators) {
+ String factoryPid, ConcurrentHashMap<String, OSGiResult> terminators) {
- Runnable runnable = terminators.remove(factoryPid);
+ OSGiResult osgiResult = terminators.remove(factoryPid);
- if (runnable != null) {
- runnable.run();
+ if (osgiResult != null) {
+ osgiResult.run();
}
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
index ee5f800..5f21e76 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
@@ -44,7 +44,7 @@
OSGiResult result = operation.run(
executionContext,
publisher.pipe(t -> {
- List<Runnable> terminators = new ArrayList<>(funs.length);
+ List<OSGiResult> terminators = new ArrayList<>(funs.length);
int i = 0;
@@ -59,25 +59,18 @@
throw e;
}
- return () -> cleanUp(terminators);
+ return new OSGiResultImpl(
+ () -> cleanUp(terminators),
+ us -> terminators.forEach(os -> os.update(us))
+ );
}));
- return () -> {
- result.close();
-
- for (Pad<T, S> pad : pads) {
- try {
- pad.close();
- }
- catch (Exception e) {
- }
- }
- };
+ return new AggregateOSGiResult(result, new AggregateOSGiResult(pads));
});
}
- private static void cleanUp(List<Runnable> terminators) {
- ListIterator<Runnable> iterator =
+ private static void cleanUp(List<OSGiResult> terminators) {
+ ListIterator<OSGiResult> iterator =
terminators.listIterator(terminators.size());
while (iterator.hasPrevious()) {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
index 8f9f910..932f29c 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
@@ -18,6 +18,8 @@
package org.apache.aries.component.dsl.internal;
import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateQuery;
+import org.apache.aries.component.dsl.update.UpdateSelector;
/**
* @author Carlos Sierra Andrés
@@ -26,36 +28,49 @@
public EffectsOSGi(
Runnable onAddingBefore, Runnable onAddingAfter,
- Runnable onRemovingBefore, Runnable onRemovingAfter) {
+ Runnable onRemovingBefore, Runnable onRemovingAfter, UpdateQuery<Void> updateQuery) {
super((executionContext, op) -> {
onAddingBefore.run();
try {
- Runnable terminator = op.publish(null);
+ OSGiResult terminator = op.publish(null);
- OSGiResult result = () -> {
- try {
- onRemovingBefore.run();
- }
- catch (Exception e) {
- //TODO: logging
- }
+ OSGiResult result = new OSGiResultImpl(
+ () -> {
+ try {
+ onRemovingBefore.run();
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
- try {
- terminator.run();
- }
- catch (Exception e) {
- //TODO: logging
- }
+ try {
+ terminator.run();
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
- try {
- onRemovingAfter.run();
+ try {
+ onRemovingAfter.run();
+ }
+ catch (Exception e) {
+ //TODO: logging
+ }
+ },
+ us -> {
+ UpdateQuery.From<Void>[] froms = updateQuery.froms;
+
+ for (UpdateQuery.From<Void> from : froms) {
+ if (from.selector == us || from.selector == UpdateSelector.ALL) {
+ from.consumer.accept(null);
+ }
+ }
+
+ terminator.update(us);
}
- catch (Exception e) {
- //TODO: logging
- }
- };
+ );
try {
onAddingAfter.run();
@@ -66,7 +81,7 @@
throw e;
}
- return new OSGiResultImpl(result);
+ return result;
}
catch (Exception e) {
try {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
index 99cd1dc..e9dc9fb 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
@@ -56,64 +56,68 @@
Tuple<T> old = sent.get();
if (old != null) {
- old._runnable.run();
+ old.osgiResult.run();
}
- tuple._runnable = publisher.apply(t);
+ tuple.osgiResult = publisher.apply(t);
if (old != null) {
- old._runnable = notHighestPad.publish(old._t);
+ old.osgiResult = notHighestPad.publish(old.t);
}
sent.set(tuple);
} else {
- tuple._runnable = notHighestPad.publish(t);
+ tuple.osgiResult = notHighestPad.publish(t);
}
}
- return () -> {
- synchronized (set) {
- Tuple<T> old = set.peek();
+ return new OSGiResultImpl(
+ () -> {
+ synchronized (set) {
+ Tuple<T> old = set.peek();
- set.remove(tuple);
+ set.remove(tuple);
- Tuple<T> current = set.peek();
+ Tuple<T> current = set.peek();
- tuple._runnable.run();
+ tuple.osgiResult.run();
- if (current != old && current != null) {
- current._runnable.run();
- current._runnable = publisher.apply(
- current._t);
- sent.set(current);
+ if (current != old && current != null) {
+ current.osgiResult.run();
+ current.osgiResult = publisher.apply(
+ current.t);
+ sent.set(current);
+ }
+ if (current == null) {
+ sent.set(null);
+ }
}
- if (current == null) {
- sent.set(null);
+ },
+ us -> {
+ synchronized (set) {
+ Tuple<T> current = set.peek();
+
+ current.osgiResult.update(us);
}
}
- };
+ );
}));
- return new OSGiResultImpl(
- () -> {
- result.close();
-
- notHighestPad.close();
- });
+ return new AggregateOSGiResult(result, notHighestPad);
});
}
private static class Tuple<T> {
Tuple(T t) {
- _t = t;
+ this.t = t;
}
public T getT() {
- return _t;
+ return t;
}
- T _t;
- Runnable _runnable;
+ T t;
+ OSGiResult osgiResult;
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
index 047e9bf..8b4dae7 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
@@ -52,7 +52,9 @@
}
return new OSGiResultImpl(
- () -> cleanUp(references));
+ () -> cleanUp(references),
+ us -> {}
+ );
});
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
index 54aa504..5c2dfd3 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
@@ -25,6 +25,6 @@
public class NothingOSGiImpl<S> extends OSGiImpl<S> {
public NothingOSGiImpl() {
- super((executionContext, __) -> new OSGiResultImpl(OSGi.NOOP));
+ super((executionContext, __) -> new OSGiResultImpl(OSGi.NOOP, ___ -> {}));
}
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
index 8851c66..bce94c3 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
@@ -18,16 +18,19 @@
package org.apache.aries.component.dsl.internal;
import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateSelector;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
/**
* @author Carlos Sierra Andrés
*/
public class OSGiResultImpl implements OSGiResult {
- public OSGiResultImpl(OSGiResult close) {
+ public OSGiResultImpl(Runnable close, Consumer<UpdateSelector> onUpdate) {
this.close = close;
+ this.onUpdate = onUpdate;
}
@Override
@@ -37,7 +40,18 @@
}
}
+ @Override
+ public void update(UpdateSelector updateSelector) {
+ if (_closed.get()) {
+ return;
+ }
+
+ onUpdate.accept(updateSelector);
+ }
+
private final Runnable close;
+ private Consumer<UpdateSelector> onUpdate;
+ private Runnable update;
private AtomicBoolean _closed = new AtomicBoolean();
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java
index 50bf7f4..cd316da 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java
@@ -44,7 +44,7 @@
private final Publisher<? super T> _op;
private AtomicLong _counter = new AtomicLong();
private Supplier<T> _injectOnLeave;
- private Runnable _terminator;
+ private OSGiResult _terminator;
@Override
public synchronized OSGiResult publish(T t) {
@@ -58,16 +58,24 @@
else {
_counter.incrementAndGet();
- return () -> {
- synchronized (this) {
- _terminator.run();
+ return new OSGiResultImpl(
+ () -> {
+ synchronized (this) {
+ _terminator.run();
- if (_counter.decrementAndGet() > 0) {
- _terminator = _op.publish(_injectOnLeave.get());
+ if (_counter.decrementAndGet() > 0) {
+ _terminator = _op.publish(_injectOnLeave.get());
+ }
}
- }
- };
+ },
+ us -> _terminator.update(us)
+ );
}
}
+ @Override
+ public <E extends Exception> OSGiResult error(T t, Exception e) throws E {
+ return _op.error(t, e);
+ }
+
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
index d9615a7..fd6f305 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
@@ -21,8 +21,8 @@
import org.apache.aries.component.dsl.OSGiResult;
import org.apache.aries.component.dsl.OSGiRunnable.ExecutionContext;
import org.apache.aries.component.dsl.Publisher;
+import org.apache.aries.component.dsl.update.UpdateSelector;
-import java.io.Closeable;
import java.util.function.Function;
import static org.apache.aries.component.dsl.OSGi.NOOP;
@@ -30,7 +30,7 @@
/**
* @author Carlos Sierra Andrés
*/
-public class Pad<T, S> implements Publisher<T>, Closeable {
+public class Pad<T, S> implements Publisher<T>, OSGiResult {
public Pad(
ExecutionContext bundleContext,
@@ -56,6 +56,11 @@
}
@Override
+ public void update(UpdateSelector updateSelector) {
+ _result.update(updateSelector);
+ }
+
+ @Override
public OSGiResult publish(T t) {
return _publisher.publish(t);
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java
index 1ee95ab..b825dc3 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java
@@ -36,7 +36,7 @@
private static class ProbeOperationImpl<T> implements OSGiRunnable<T> {
- private OSGiResult _onClose = NOOP;
+ private volatile OSGiResult _onClose = NOOP;
@Override
public OSGiResultImpl run(
@@ -44,7 +44,9 @@
_op = op;
return new OSGiResultImpl(
- () -> {_onClose.close(); _onClose = NOOP;});
+ () -> {_onClose.close(); _onClose = NOOP;},
+ us -> _onClose.update(us)
+ );
}
Publisher<? super T> _op;
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
index 37ffe30..a5ac0d2 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
@@ -17,6 +17,7 @@
package org.apache.aries.component.dsl.internal;
+import org.apache.aries.component.dsl.OSGiResult;
import org.apache.aries.component.dsl.Refresher;
import org.apache.aries.component.dsl.CachingServiceReference;
import org.apache.aries.component.dsl.Publisher;
@@ -41,7 +42,7 @@
Refresher<? super CachingServiceReference<T>> refresher) {
super((executionContext, op) -> {
- ServiceTracker<T, ?>
+ ServiceTracker<T, Tracked<T>>
serviceTracker = new ServiceTracker<>(
executionContext.getBundleContext(),
buildFilter(executionContext, filterString, clazz),
@@ -49,7 +50,11 @@
serviceTracker.open();
- return new OSGiResultImpl(serviceTracker::close);
+ return new OSGiResultImpl(
+ serviceTracker::close,
+ us -> serviceTracker.getTracked().forEach(
+ (__, tracked) -> tracked.runnable.update(us))
+ );
});
}
@@ -105,14 +110,14 @@
public Tracked(
CachingServiceReference<T> cachingServiceReference,
- Runnable runnable) {
+ OSGiResult osgiResult) {
this.cachingServiceReference = cachingServiceReference;
- this.runnable = runnable;
+ this.runnable = osgiResult;
}
volatile CachingServiceReference<T> cachingServiceReference;
- volatile Runnable runnable;
+ volatile OSGiResult runnable;
}
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java
index df5cb23..d4479e8 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java
@@ -17,6 +17,7 @@
package org.apache.aries.component.dsl.internal;
+import org.apache.aries.component.dsl.OSGiResult;
import org.apache.aries.component.dsl.Publisher;
import org.osgi.framework.ServiceFactory;
import org.osgi.framework.ServiceRegistration;
@@ -86,7 +87,7 @@
ServiceRegistration<?> serviceRegistration,
Publisher<? super ServiceRegistration<T>> op) {
- Runnable terminator = ((Publisher)op).publish(serviceRegistration);
+ OSGiResult terminator = ((Publisher)op).publish(serviceRegistration);
return new OSGiResultImpl(
() -> {
@@ -98,7 +99,9 @@
finally {
terminator.run();
}
- });
+ },
+ terminator::update
+ );
}
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateQuery.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateQuery.java
new file mode 100644
index 0000000..d6e3870
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateQuery.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.update;
+
+import java.util.function.Consumer;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public final class UpdateQuery<T> {
+ public final From<T>[] froms;
+
+ @SafeVarargs
+ public UpdateQuery(From<T>... froms) {
+ this.froms = froms;
+ }
+
+ @SafeVarargs
+ public static <T> UpdateQuery<T> onUpdate(From<T> ... froms) {
+ return new UpdateQuery<>(froms);
+ }
+
+ public static class From<T> {
+ public final UpdateSelector selector;
+ public final Consumer<T> consumer;
+
+ public From(UpdateSelector selector, Consumer<T> consumer) {
+ this.selector = selector;
+ this.consumer = consumer;
+ }
+
+ public static <T> From<T> from(UpdateSelector selector, Consumer<T> consumer) {
+ return new From<>(selector, consumer);
+ }
+ }
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateSelector.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateSelector.java
new file mode 100644
index 0000000..2478759
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateSelector.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.update;
+
+public interface UpdateSelector {
+
+ public static final UpdateSelector ALL = new UpdateSelector() {};
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/update/package-info.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/package-info.java
new file mode 100644
index 0000000..4a45062
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("1.0.0")
+package org.apache.aries.component.dsl.update;
diff --git a/itests-run/itest.bndrun b/itests-run/itest.bndrun
index 930f56e..a200c08 100644
--- a/itests-run/itest.bndrun
+++ b/itests-run/itest.bndrun
@@ -32,6 +32,7 @@
org.apache.servicemix.bundles.junit;version='[4.12.0,4.12.1)',\
org.apache.felix.configadmin;version='[1.9.14,1.9.15)',\
org.osgi.service.cm;version='[1.6.0,1.6.1)',\
- org.apache.aries.component-dsl.itests;version='[2.0.0,2.0.1)'
+ org.apache.aries.component-dsl.itests;version='[2.0.0,2.0.1)',\
+ org.apache.aries.component-dsl.component-dsl;version='[2.0.0,2.0.1)'
-include: -personal.bnd