[Component-DSL] Add refresher interface

this allows users of the API to specify a predicate to trigger removing
and adding of a CachingServiceReference when the underlying
ServiceReference has been modified.

git-svn-id: https://svn.apache.org/repos/asf/aries/trunk@1814214 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java b/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
index 8383f9a..6029e70 100644
--- a/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
@@ -234,6 +234,28 @@
 		return new ServiceReferenceOSGi<>(filterString, clazz);
 	}
 
+	static <T> OSGi<CachingServiceReference<T>> serviceReferences(
+		Class<T> clazz, String filterString,
+		Refresher<? super CachingServiceReference<T>> onModified) {
+
+		return new ServiceReferenceOSGi<>(filterString, clazz, onModified);
+	}
+
+	static <T> OSGi<CachingServiceReference<T>> serviceReferences(
+		Class<T> clazz, Refresher<? super CachingServiceReference<T>> onModified) {
+
+		return new ServiceReferenceOSGi<>(null, clazz, onModified);
+	}
+
+	static OSGi<CachingServiceReference<Object>> serviceReferences(
+		String filterString,
+		Refresher<? super CachingServiceReference<Object>> onModified) {
+
+		return new ServiceReferenceOSGi<>(filterString, null, onModified);
+	}
+
+
+
 	@SafeVarargs
 	static <T> OSGi<T> all(OSGi<T> ... programs) {
 		return new DistributeOSGi<>(programs);
diff --git a/component-dsl/src/main/java/org/apache/aries/osgi/functional/Refresher.java b/component-dsl/src/main/java/org/apache/aries/osgi/functional/Refresher.java
new file mode 100644
index 0000000..1409517
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/osgi/functional/Refresher.java
@@ -0,0 +1,10 @@
+package org.apache.aries.osgi.functional;
+
+import java.util.function.Predicate;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public interface Refresher<T> extends Predicate<T> {
+
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java b/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
index 0aac138..abe875f 100644
--- a/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
@@ -18,14 +18,11 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.CachingServiceReference;
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
-import org.osgi.framework.BundleContext;
+import org.apache.aries.osgi.functional.Refresher;
 import org.osgi.framework.ServiceReference;
 import org.osgi.util.tracker.ServiceTracker;
 import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 /**
@@ -34,37 +31,22 @@
 public class ServiceReferenceOSGi<T>
 	extends OSGiImpl<CachingServiceReference<T>> {
 
-	private String _filterString;
-	private Class<T> _clazz;
+	public ServiceReferenceOSGi(
+		String filterString, Class<T> clazz) {
 
-	public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
+		this(filterString, clazz, CachingServiceReference::isDirty);
+	}
+
+	public ServiceReferenceOSGi(
+		String filterString, Class<T> clazz,
+		Refresher<? super CachingServiceReference<T>> refresher) {
+
 		super((bundleContext, op) -> {
-			ServiceTracker<T, AtomicReference<Runnable>>
+			ServiceTracker<T, ?>
 				serviceTracker = new ServiceTracker<>(
 					bundleContext,
 					buildFilter(bundleContext, filterString, clazz),
-					new DefaultServiceTrackerCustomizer<>(op));
-
-			return new OSGiResultImpl(
-				serviceTracker::open, serviceTracker::close);
-		});
-
-		_filterString = filterString;
-		_clazz = clazz;
-	}
-
-	@Override
-	public <S> OSGiImpl<S> flatMap(
-		Function<? super CachingServiceReference<T>, OSGi<? extends S>> fun) {
-
-		return new OSGiImpl<>((bundleContext, op) -> {
-			ServiceTracker<T, ?> serviceTracker =
-				new ServiceTracker<>(
-					bundleContext,
-					buildFilter(
-						bundleContext, _filterString, _clazz),
-						new FlatMapServiceTrackerCustomizer<>(
-							fun, bundleContext, op));
+					new DefaultServiceTrackerCustomizer<>(op, refresher));
 
 			return new OSGiResultImpl(
 				serviceTracker::open, serviceTracker::close);
@@ -72,99 +54,62 @@
 	}
 
 	private static class DefaultServiceTrackerCustomizer<T>
-		implements ServiceTrackerCustomizer<T, AtomicReference<Runnable>> {
+		implements ServiceTrackerCustomizer<T, Tracked<T>> {
 
 		private final Function<CachingServiceReference<T>, Runnable>
 			_addedSource;
+		private Refresher<? super CachingServiceReference<T>> _refresher;
 
 		public DefaultServiceTrackerCustomizer(
-			Function<CachingServiceReference<T>, Runnable> addedSource) {
+			Function<CachingServiceReference<T>, Runnable> addedSource,
+			Refresher<? super CachingServiceReference<T>> refresher) {
 
 			_addedSource = addedSource;
+			_refresher = refresher;
 		}
 
 		@Override
-		public AtomicReference<Runnable> addingService(
-			ServiceReference<T> reference) {
+		public Tracked<T> addingService(ServiceReference<T> reference) {
+			CachingServiceReference<T> cachingServiceReference =
+				new CachingServiceReference<>(reference);
 
-			return new AtomicReference<>(
-				_addedSource.apply(new CachingServiceReference<>(reference)));
+			return new Tracked<>(
+				cachingServiceReference,
+				_addedSource.apply(cachingServiceReference));
 		}
 
 		@Override
 		public void modifiedService(
-			ServiceReference<T> reference,
-			AtomicReference<Runnable> tupleReference) {
+			ServiceReference<T> reference, Tracked<T> tracked) {
 
-			tupleReference.get().run();
-
-			tupleReference.set(
-				_addedSource.apply(new CachingServiceReference<>(reference)));
+			if (_refresher.test(tracked.cachingServiceReference)) {
+				tracked.runnable.run();
+				tracked.cachingServiceReference = new CachingServiceReference<>(
+					reference);
+				tracked.runnable =
+                    _addedSource.apply(tracked.cachingServiceReference);
+			}
 		}
 
 		@Override
 		public void removedService(
-			ServiceReference<T> reference,
-			AtomicReference<Runnable> runnable) {
+			ServiceReference<T> reference, Tracked<T> tracked) {
 
-			runnable.get().run();
+			tracked.runnable.run();
 		}
 	}
 
-	private static class FlatMapServiceTrackerCustomizer<T, S>
-		implements ServiceTrackerCustomizer<T, AtomicReference<OSGiResult>> {
-		private final Function<? super CachingServiceReference<T>, OSGi<? extends S>>
-			_fun;
-		private final BundleContext _bundleContext;
-		private final Function<S, Runnable> _op;
+	private static class Tracked<T> {
 
-		FlatMapServiceTrackerCustomizer(
-			Function<? super CachingServiceReference<T>, OSGi<? extends S>> fun,
-			BundleContext bundleContext, Function<S, Runnable> op) {
+		volatile CachingServiceReference<T> cachingServiceReference;
+		volatile Runnable runnable;
 
-			_fun = fun;
-			_bundleContext = bundleContext;
-			_op = op;
+		public Tracked(
+			CachingServiceReference<T> cachingServiceReference,
+			Runnable runnable) {
+
+			this.cachingServiceReference = cachingServiceReference;
+			this.runnable = runnable;
 		}
-
-		@Override
-        public AtomicReference<OSGiResult> addingService(
-        	ServiceReference<T> reference) {
-
-			OSGiResultImpl osgiResult = doFlatMap(
-				new CachingServiceReference<>(reference));
-
-			return new AtomicReference<>(osgiResult);
-        }
-
-		private OSGiResultImpl doFlatMap(CachingServiceReference<T> reference) {
-			OSGiImpl<S> program = (OSGiImpl<S>) _fun.apply(reference);
-
-			OSGiResultImpl result = program._operation.run(_bundleContext, _op);
-
-			result.start();
-
-			return result;
-		}
-
-		@Override
-        public void modifiedService(
-        	ServiceReference<T> reference,
-			AtomicReference<OSGiResult> tracked) {
-
-			tracked.get().close();
-
-			tracked.set(doFlatMap(new CachingServiceReference<>(reference)));
-        }
-
-		@Override
-        public void removedService(
-            ServiceReference<T> reference,
-			AtomicReference<OSGiResult> tracked) {
-
-            tracked.get().close();
-        }
-
 	}
-
 }
diff --git a/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java b/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
index cc7d2c4..b6ef328 100644
--- a/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
+++ b/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
@@ -20,6 +20,7 @@
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Refresher;
 import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.internal.ProbeImpl;
 import org.junit.Test;
@@ -666,6 +667,48 @@
         assertEquals(0, count.get());
     }
 
+    @Test
+    public void testServiceReferenceRefresher() {
+        ServiceRegistration<Service> serviceRegistration =
+            bundleContext.registerService(
+                Service.class, new Service(),
+                new Hashtable<String, Object>() {{
+                    put("good", 0);
+                    put("bad", 0);
+                }});
+
+        AtomicInteger atomicInteger = new AtomicInteger();
+
+        try {
+            /* reload only when property "good" has changed */
+            OSGi<?> program = serviceReferences(
+                Service.class, csr -> csr.isDirty("good")).map(
+                    csr -> csr.getProperty("good"));
+
+            program.run(bundleContext, (__) -> atomicInteger.incrementAndGet());
+
+            assertEquals(1, atomicInteger.get());
+
+            serviceRegistration.setProperties(
+                new Hashtable<String, Object>() {{
+                    put("good", 0);
+                    put("bad", 1);
+                }});
+
+            assertEquals(1, atomicInteger.get());
+
+            serviceRegistration.setProperties(
+                new Hashtable<String, Object>() {{
+                    put("good", 1);
+                    put("bad", 1);
+                }});
+
+            assertEquals(2, atomicInteger.get());
+        }
+        finally {
+            serviceRegistration.unregister();
+        }
+    }
 
     private class Service {}