[Component-DSL] Add refresher interface
this allows users of the API to specify a predicate to trigger removing
and adding of a CachingServiceReference when the underlying
ServiceReference has been modified.
git-svn-id: https://svn.apache.org/repos/asf/aries/trunk@1814214 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java b/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
index 8383f9a..6029e70 100644
--- a/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
@@ -234,6 +234,28 @@
return new ServiceReferenceOSGi<>(filterString, clazz);
}
+ static <T> OSGi<CachingServiceReference<T>> serviceReferences(
+ Class<T> clazz, String filterString,
+ Refresher<? super CachingServiceReference<T>> onModified) {
+
+ return new ServiceReferenceOSGi<>(filterString, clazz, onModified);
+ }
+
+ static <T> OSGi<CachingServiceReference<T>> serviceReferences(
+ Class<T> clazz, Refresher<? super CachingServiceReference<T>> onModified) {
+
+ return new ServiceReferenceOSGi<>(null, clazz, onModified);
+ }
+
+ static OSGi<CachingServiceReference<Object>> serviceReferences(
+ String filterString,
+ Refresher<? super CachingServiceReference<Object>> onModified) {
+
+ return new ServiceReferenceOSGi<>(filterString, null, onModified);
+ }
+
+
+
@SafeVarargs
static <T> OSGi<T> all(OSGi<T> ... programs) {
return new DistributeOSGi<>(programs);
diff --git a/component-dsl/src/main/java/org/apache/aries/osgi/functional/Refresher.java b/component-dsl/src/main/java/org/apache/aries/osgi/functional/Refresher.java
new file mode 100644
index 0000000..1409517
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/osgi/functional/Refresher.java
@@ -0,0 +1,10 @@
+package org.apache.aries.osgi.functional;
+
+import java.util.function.Predicate;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public interface Refresher<T> extends Predicate<T> {
+
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java b/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
index 0aac138..abe875f 100644
--- a/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
@@ -18,14 +18,11 @@
package org.apache.aries.osgi.functional.internal;
import org.apache.aries.osgi.functional.CachingServiceReference;
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
-import org.osgi.framework.BundleContext;
+import org.apache.aries.osgi.functional.Refresher;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
@@ -34,37 +31,22 @@
public class ServiceReferenceOSGi<T>
extends OSGiImpl<CachingServiceReference<T>> {
- private String _filterString;
- private Class<T> _clazz;
+ public ServiceReferenceOSGi(
+ String filterString, Class<T> clazz) {
- public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
+ this(filterString, clazz, CachingServiceReference::isDirty);
+ }
+
+ public ServiceReferenceOSGi(
+ String filterString, Class<T> clazz,
+ Refresher<? super CachingServiceReference<T>> refresher) {
+
super((bundleContext, op) -> {
- ServiceTracker<T, AtomicReference<Runnable>>
+ ServiceTracker<T, ?>
serviceTracker = new ServiceTracker<>(
bundleContext,
buildFilter(bundleContext, filterString, clazz),
- new DefaultServiceTrackerCustomizer<>(op));
-
- return new OSGiResultImpl(
- serviceTracker::open, serviceTracker::close);
- });
-
- _filterString = filterString;
- _clazz = clazz;
- }
-
- @Override
- public <S> OSGiImpl<S> flatMap(
- Function<? super CachingServiceReference<T>, OSGi<? extends S>> fun) {
-
- return new OSGiImpl<>((bundleContext, op) -> {
- ServiceTracker<T, ?> serviceTracker =
- new ServiceTracker<>(
- bundleContext,
- buildFilter(
- bundleContext, _filterString, _clazz),
- new FlatMapServiceTrackerCustomizer<>(
- fun, bundleContext, op));
+ new DefaultServiceTrackerCustomizer<>(op, refresher));
return new OSGiResultImpl(
serviceTracker::open, serviceTracker::close);
@@ -72,99 +54,62 @@
}
private static class DefaultServiceTrackerCustomizer<T>
- implements ServiceTrackerCustomizer<T, AtomicReference<Runnable>> {
+ implements ServiceTrackerCustomizer<T, Tracked<T>> {
private final Function<CachingServiceReference<T>, Runnable>
_addedSource;
+ private Refresher<? super CachingServiceReference<T>> _refresher;
public DefaultServiceTrackerCustomizer(
- Function<CachingServiceReference<T>, Runnable> addedSource) {
+ Function<CachingServiceReference<T>, Runnable> addedSource,
+ Refresher<? super CachingServiceReference<T>> refresher) {
_addedSource = addedSource;
+ _refresher = refresher;
}
@Override
- public AtomicReference<Runnable> addingService(
- ServiceReference<T> reference) {
+ public Tracked<T> addingService(ServiceReference<T> reference) {
+ CachingServiceReference<T> cachingServiceReference =
+ new CachingServiceReference<>(reference);
- return new AtomicReference<>(
- _addedSource.apply(new CachingServiceReference<>(reference)));
+ return new Tracked<>(
+ cachingServiceReference,
+ _addedSource.apply(cachingServiceReference));
}
@Override
public void modifiedService(
- ServiceReference<T> reference,
- AtomicReference<Runnable> tupleReference) {
+ ServiceReference<T> reference, Tracked<T> tracked) {
- tupleReference.get().run();
-
- tupleReference.set(
- _addedSource.apply(new CachingServiceReference<>(reference)));
+ if (_refresher.test(tracked.cachingServiceReference)) {
+ tracked.runnable.run();
+ tracked.cachingServiceReference = new CachingServiceReference<>(
+ reference);
+ tracked.runnable =
+ _addedSource.apply(tracked.cachingServiceReference);
+ }
}
@Override
public void removedService(
- ServiceReference<T> reference,
- AtomicReference<Runnable> runnable) {
+ ServiceReference<T> reference, Tracked<T> tracked) {
- runnable.get().run();
+ tracked.runnable.run();
}
}
- private static class FlatMapServiceTrackerCustomizer<T, S>
- implements ServiceTrackerCustomizer<T, AtomicReference<OSGiResult>> {
- private final Function<? super CachingServiceReference<T>, OSGi<? extends S>>
- _fun;
- private final BundleContext _bundleContext;
- private final Function<S, Runnable> _op;
+ private static class Tracked<T> {
- FlatMapServiceTrackerCustomizer(
- Function<? super CachingServiceReference<T>, OSGi<? extends S>> fun,
- BundleContext bundleContext, Function<S, Runnable> op) {
+ volatile CachingServiceReference<T> cachingServiceReference;
+ volatile Runnable runnable;
- _fun = fun;
- _bundleContext = bundleContext;
- _op = op;
+ public Tracked(
+ CachingServiceReference<T> cachingServiceReference,
+ Runnable runnable) {
+
+ this.cachingServiceReference = cachingServiceReference;
+ this.runnable = runnable;
}
-
- @Override
- public AtomicReference<OSGiResult> addingService(
- ServiceReference<T> reference) {
-
- OSGiResultImpl osgiResult = doFlatMap(
- new CachingServiceReference<>(reference));
-
- return new AtomicReference<>(osgiResult);
- }
-
- private OSGiResultImpl doFlatMap(CachingServiceReference<T> reference) {
- OSGiImpl<S> program = (OSGiImpl<S>) _fun.apply(reference);
-
- OSGiResultImpl result = program._operation.run(_bundleContext, _op);
-
- result.start();
-
- return result;
- }
-
- @Override
- public void modifiedService(
- ServiceReference<T> reference,
- AtomicReference<OSGiResult> tracked) {
-
- tracked.get().close();
-
- tracked.set(doFlatMap(new CachingServiceReference<>(reference)));
- }
-
- @Override
- public void removedService(
- ServiceReference<T> reference,
- AtomicReference<OSGiResult> tracked) {
-
- tracked.get().close();
- }
-
}
-
}
diff --git a/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java b/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
index cc7d2c4..b6ef328 100644
--- a/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
+++ b/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
@@ -20,6 +20,7 @@
import org.apache.aries.osgi.functional.CachingServiceReference;
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Refresher;
import org.apache.aries.osgi.functional.SentEvent;
import org.apache.aries.osgi.functional.internal.ProbeImpl;
import org.junit.Test;
@@ -666,6 +667,48 @@
assertEquals(0, count.get());
}
+ @Test
+ public void testServiceReferenceRefresher() {
+ ServiceRegistration<Service> serviceRegistration =
+ bundleContext.registerService(
+ Service.class, new Service(),
+ new Hashtable<String, Object>() {{
+ put("good", 0);
+ put("bad", 0);
+ }});
+
+ AtomicInteger atomicInteger = new AtomicInteger();
+
+ try {
+ /* reload only when property "good" has changed */
+ OSGi<?> program = serviceReferences(
+ Service.class, csr -> csr.isDirty("good")).map(
+ csr -> csr.getProperty("good"));
+
+ program.run(bundleContext, (__) -> atomicInteger.incrementAndGet());
+
+ assertEquals(1, atomicInteger.get());
+
+ serviceRegistration.setProperties(
+ new Hashtable<String, Object>() {{
+ put("good", 0);
+ put("bad", 1);
+ }});
+
+ assertEquals(1, atomicInteger.get());
+
+ serviceRegistration.setProperties(
+ new Hashtable<String, Object>() {{
+ put("good", 1);
+ put("bad", 1);
+ }});
+
+ assertEquals(2, atomicInteger.get());
+ }
+ finally {
+ serviceRegistration.unregister();
+ }
+ }
private class Service {}