Add update support to OSGi result
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index 619e22f..50502ba 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -44,6 +44,7 @@
 import org.apache.aries.component.dsl.function.Function3;
 import org.apache.aries.component.dsl.function.Function5;
 import org.apache.aries.component.dsl.function.Function7;
+import org.apache.aries.component.dsl.update.UpdateQuery;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceFactory;
@@ -303,11 +304,12 @@
 			() -> effect.getOnIncoming().accept(null),
 			NOOP,
 			NOOP,
-			() -> effect.getOnLeaving().accept(null));
+			() -> effect.getOnLeaving().accept(null),
+			UpdateQuery.onUpdate());
 	}
 
 	static OSGi<Void> effects(Runnable onAdding, Runnable onRemoving) {
-		return new EffectsOSGi(onAdding, NOOP, NOOP, onRemoving);
+		return new EffectsOSGi(onAdding, NOOP, NOOP, onRemoving, UpdateQuery.onUpdate());
 	}
 
 	static OSGi<Void> effects(
@@ -315,7 +317,16 @@
 		Runnable onRemovingBefore, Runnable onRemovingAfter) {
 
 		return new EffectsOSGi(
-			onAddingBefore, onAddingAfter, onRemovingBefore, onRemovingAfter);
+			onAddingBefore, onAddingAfter, onRemovingBefore, onRemovingAfter,
+			UpdateQuery.onUpdate());
+	}
+
+	static OSGi<Void> effects(
+		Runnable onAddingBefore, Runnable onAddingAfter,
+		Runnable onRemovingBefore, Runnable onRemovingAfter, UpdateQuery<Void> updateQuery) {
+
+		return new EffectsOSGi(
+			onAddingBefore, onAddingAfter, onRemovingBefore, onRemovingAfter, updateQuery);
 	}
 
 	static <T> OSGi<T> fromOsgiRunnable(OSGiRunnable<T> runnable) {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
index 0434e2b..ad56515 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
@@ -17,6 +17,8 @@
 
 package org.apache.aries.component.dsl;
 
+import org.apache.aries.component.dsl.update.UpdateSelector;
+
 /**
  * @author Carlos Sierra Andrés
  */
@@ -29,4 +31,6 @@
 		close();
 	}
 
+	public default void update(UpdateSelector updateSelector) {};
+
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java
new file mode 100644
index 0000000..24e5124
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.internal;
+
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateSelector;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class AggregateOSGiResult implements OSGiResult {
+
+    private OSGiResult[] results;
+
+    public AggregateOSGiResult(OSGiResult ... results) {
+        this.results = results;
+    }
+
+    @Override
+    public void close() {
+        if (_closed.compareAndSet(false, true)) {
+            for (OSGiResult result : results) {
+                try {
+                    result.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    @Override
+    public void update(UpdateSelector updateSelector) {
+        if (!_closed.get()) {
+            for (OSGiResult result : results) {
+                try {
+                    result.update(updateSelector);
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    private final AtomicBoolean _closed = new AtomicBoolean();
+
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
index 3c3e502..aa652c6 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
@@ -45,7 +45,8 @@
             }
 
             return new OSGiResultImpl(
-                () -> cleanUp(results)
+                () -> cleanUp(results),
+                us -> results.forEach(result -> result.update(us))
             );
         });
     }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
index 1b68e2a..cb554a2 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
@@ -18,6 +18,8 @@
 package org.apache.aries.component.dsl.internal;
 
 import org.apache.aries.component.dsl.*;
+import org.apache.aries.component.dsl.update.UpdateQuery;
+import org.apache.aries.component.dsl.update.UpdateSelector;
 import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
@@ -104,7 +106,7 @@
 		return new BaseOSGiImpl<>((executionContext, op) -> {
 			ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
 			ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
-			IdentityHashMap<T, IdentityHashMap<Function<T, S>, Runnable>>
+			IdentityHashMap<T, IdentityHashMap<Function<T, S>, OSGiResult>>
 				terminators = new IdentityHashMap<>();
 
 			OSGiResult funRun = fun.run(
@@ -114,23 +116,35 @@
 						ConcurrentDoublyLinkedList.Node node = functions.addLast(f);
 
 						for (T t : identities) {
-							IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+							IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
 								terminators.computeIfAbsent(
 									t, __ -> new IdentityHashMap<>());
 							terminatorMap.put(f, op.apply(f.apply(t)));
 						}
 
-						return () -> {
-							synchronized (identities) {
-								node.remove();
+						return new OSGiResultImpl(
+							() -> {
+								synchronized (identities) {
+									node.remove();
 
-								identities.forEach(t -> {
-									Runnable terminator = terminators.get(t).remove(f);
+									identities.forEach(t -> {
+										Runnable terminator = terminators.get(t).remove(f);
 
-									terminator.run();
-								});
+										terminator.run();
+									});
+								}
+							},
+							us -> {
+								synchronized (identities) {
+
+									identities.forEach(t -> {
+										OSGiResult terminator = terminators.get(t).get(f);
+
+										terminator.update(us);
+									});
+								}
 							}
-						};
+						);
 					}
 				}
 			));
@@ -142,32 +156,39 @@
 						ConcurrentDoublyLinkedList.Node node = identities.addLast(t);
 
 						for (Function<T, S> f : functions) {
-							IdentityHashMap<Function<T, S>, Runnable> terminatorMap =
+							IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
 								terminators.computeIfAbsent(
 									t, __ -> new IdentityHashMap<>());
 							terminatorMap.put(f, op.apply(f.apply(t)));
 						}
 
-						return () -> {
-							synchronized (identities) {
-								node.remove();
+						return new OSGiResultImpl(
+							() -> {
+								synchronized (identities) {
+									node.remove();
 
-								functions.forEach(f -> {
-									Runnable terminator = terminators.get(t).remove(f);
+									functions.forEach(f -> {
+										Runnable terminator = terminators.get(t).remove(f);
 
-									terminator.run();
-								});
+										terminator.run();
+									});
+								}
+							},
+							us -> {
+								synchronized (identities) {
+									functions.forEach(f -> {
+										OSGiResult terminator = terminators.get(t).get(f);
+
+										terminator.update(us);
+									});
+								}
 							}
-						};
+						);
 					}
 				})
 			);
 
-			return () -> {
-				myRun.close();
-
-				funRun.close();
-			};
+			return new AggregateOSGiResult(myRun, funRun);
 		});
 	}
 
@@ -192,11 +213,7 @@
                         }
                     }
                 )));
-			return () -> {
-				thenPad.close();
-				elsePad.close();
-				result.close();
-			};
+			return new AggregateOSGiResult(thenPad, elsePad, result);
 		});
 	}
 
@@ -211,6 +228,15 @@
 		Consumer<? super T> onRemovedBefore,
 		Consumer<? super T> onRemovedAfter) {
 
+		return effects(onAddedBefore, onAddedAfter, onRemovedBefore, onRemovedAfter, UpdateQuery.onUpdate());
+	}
+
+	public OSGi<T> effects(
+		Consumer<? super T> onAddedBefore, Consumer<? super T> onAddedAfter,
+		Consumer<? super T> onRemovedBefore,
+		Consumer<? super T> onRemovedAfter,
+		UpdateQuery<T> updateQuery) {
+
 		//TODO: logging
 		//TODO: logging
 		//TODO: logging
@@ -222,9 +248,9 @@
 					onAddedBefore.accept(t);
 
 					try {
-						Runnable terminator = op.publish(t);
+						OSGiResult terminator = op.publish(t);
 
-						OSGiResult result = () -> {
+						OSGiResult result = new OSGiResultImpl(() -> {
 							try {
 								onRemovedBefore.accept(t);
 							}
@@ -245,7 +271,19 @@
 							catch (Exception e) {
 								//TODO: logging
 							}
-						};
+						},
+							us -> {
+								UpdateQuery.From<T>[] froms = updateQuery.froms;
+
+								for (UpdateQuery.From<T> from : froms) {
+									if (from.selector == us || from.selector == UpdateSelector.ALL) {
+										from.consumer.accept(t);
+									}
+								}
+
+								terminator.update(us);
+							}
+						);
 
 						try {
 							onAddedAfter.accept(t);
@@ -353,11 +391,18 @@
 				)
 			));
 
-			return () -> {
-				pads.values().forEach(Pad::close);
+			return new OSGiResultImpl(
+				() -> {
+					pads.values().forEach(Pad::close);
 
-				result.close();
-			};
+					result.close();
+				},
+				us -> {
+					pads.values().forEach(pad -> pad.update(us));
+
+					result.close();
+				}
+			);
 		});
 	}
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
index 48de45f..2b5ca5c 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
@@ -17,6 +17,8 @@
 
 package org.apache.aries.component.dsl.internal;
 
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateSelector;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleEvent;
 import org.osgi.util.tracker.BundleTracker;
@@ -29,13 +31,15 @@
 
 	public BundleOSGi(int stateMask) {
 		super((executionContext, op) -> {
-			BundleTracker<Runnable> bundleTracker =
+			UpdateSelector updateSelector = new UpdateSelector() {};
+
+			BundleTracker<OSGiResult> bundleTracker =
 				new BundleTracker<>(
 					executionContext.getBundleContext(), stateMask,
-					new BundleTrackerCustomizer<Runnable>() {
+					new BundleTrackerCustomizer<OSGiResult>() {
 
 						@Override
-						public Runnable addingBundle(
+						public OSGiResult addingBundle(
 							Bundle bundle, BundleEvent bundleEvent) {
 
 							return op.apply(bundle);
@@ -44,22 +48,26 @@
 						@Override
 						public void modifiedBundle(
 							Bundle bundle, BundleEvent bundleEvent,
-							Runnable runnable) {
+							OSGiResult osgiResult) {
 
+							osgiResult.update(updateSelector);
 						}
 
 						@Override
 						public void removedBundle(
 							Bundle bundle, BundleEvent bundleEvent,
-							Runnable runnable) {
+							OSGiResult osgiResult) {
 
-							runnable.run();
+							osgiResult.run();
 						}
 					});
 
 			bundleTracker.open();
 
-			return new OSGiResultImpl(bundleTracker::close);
+			return new OSGiResultImpl(
+				bundleTracker::close,
+				us -> bundleTracker.getTracked().values().forEach(result -> result.update(us))
+			);
 		});
 
 	}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
index 2c44cbf..3aff2be 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
@@ -70,7 +70,7 @@
                             () -> result.set(op.publish(t)));
                     }
 
-                    return () -> UpdateSupport.deferTermination(() -> {
+                    return new OSGiResultImpl(() -> UpdateSupport.deferTermination(() -> {
                         synchronized (initialized) {
                             result.get().close();
 
@@ -93,6 +93,11 @@
                                 }
                             }
                         }
+                    }),
+                    us -> {
+                        synchronized (initialized) {
+                            result.get().update(us);
+                        }
                     });
                 };
             }
@@ -123,6 +128,13 @@
                             results[i].close();
                         }
                     }
+                },
+                us -> {
+                    synchronized (initialized) {
+                        for (int i = 0; i <= index.get(); i++) {
+                            results[i].update(us);
+                        }
+                    }
                 }
             );
         });
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
index 817c617..26cefbe 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.aries.component.dsl.internal;
 
+import org.apache.aries.component.dsl.OSGiResult;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
@@ -42,8 +43,9 @@
 			AtomicReference<Configuration> atomicReference =
 				new AtomicReference<>(null);
 
-			AtomicReference<Runnable>
-				terminatorAtomicReference = new AtomicReference<>(() -> {});
+			AtomicReference<OSGiResult>
+				terminatorAtomicReference = new AtomicReference<>(
+					new OSGiResultImpl(NOOP, __ -> {}));
 
 			AtomicBoolean closed = new AtomicBoolean();
 
@@ -138,7 +140,9 @@
 					serviceRegistration.unregister();
 
 					signalLeave(terminatorAtomicReference);
-				});
+				},
+				us -> terminatorAtomicReference.get().update(us))
+			;
 		});
 	}
 
@@ -180,9 +184,9 @@
 	}
 
 	private static void signalLeave(
-		AtomicReference<Runnable> terminatorAtomicReference) {
+		AtomicReference<OSGiResult> terminatorAtomicReference) {
 
-		Runnable old = terminatorAtomicReference.getAndSet(null);
+		OSGiResult old = terminatorAtomicReference.getAndSet(null);
 
 		if (old != null) {
             old.run();
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
index 2991daa..fe16e10 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.aries.component.dsl.internal;
 
+import org.apache.aries.component.dsl.OSGiResult;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
@@ -42,7 +43,7 @@
 			ConcurrentHashMap<String, Configuration> configurations =
 				new ConcurrentHashMap<>();
 
-			ConcurrentHashMap<String, Runnable> terminators =
+			ConcurrentHashMap<String, OSGiResult> terminators =
 				new ConcurrentHashMap<>();
 
 			AtomicBoolean closed = new AtomicBoolean();
@@ -148,6 +149,13 @@
 							runnable.run();
 						}
 					}
+				},
+				us -> {
+					for (OSGiResult osgiResult : terminators.values()) {
+						if (osgiResult != null) {
+							osgiResult.run();
+						}
+					}
 				});
 		});
 	}
@@ -218,12 +226,12 @@
 	}
 
 	private static void signalLeave(
-		String factoryPid, ConcurrentHashMap<String, Runnable> terminators) {
+		String factoryPid, ConcurrentHashMap<String, OSGiResult> terminators) {
 
-		Runnable runnable = terminators.remove(factoryPid);
+		OSGiResult osgiResult = terminators.remove(factoryPid);
 
-		if (runnable != null) {
-			runnable.run();
+		if (osgiResult != null) {
+			osgiResult.run();
 		}
 	}
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
index ee5f800..5f21e76 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
@@ -44,7 +44,7 @@
             OSGiResult result = operation.run(
                 executionContext,
                 publisher.pipe(t -> {
-                    List<Runnable> terminators = new ArrayList<>(funs.length);
+                    List<OSGiResult> terminators = new ArrayList<>(funs.length);
 
                     int i = 0;
 
@@ -59,25 +59,18 @@
                         throw e;
                     }
 
-                    return () -> cleanUp(terminators);
+                    return new OSGiResultImpl(
+                        () -> cleanUp(terminators),
+                        us -> terminators.forEach(os -> os.update(us))
+                    );
                 }));
 
-            return () -> {
-                result.close();
-
-                for (Pad<T, S> pad : pads) {
-                    try {
-                        pad.close();
-                    }
-                    catch (Exception e) {
-                    }
-                }
-            };
+            return new AggregateOSGiResult(result, new AggregateOSGiResult(pads));
         });
     }
 
-    private static void cleanUp(List<Runnable> terminators) {
-        ListIterator<Runnable> iterator =
+    private static void cleanUp(List<OSGiResult> terminators) {
+        ListIterator<OSGiResult> iterator =
             terminators.listIterator(terminators.size());
 
         while (iterator.hasPrevious()) {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
index 8f9f910..932f29c 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
@@ -18,6 +18,8 @@
 package org.apache.aries.component.dsl.internal;
 
 import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateQuery;
+import org.apache.aries.component.dsl.update.UpdateSelector;
 
 /**
  * @author Carlos Sierra Andrés
@@ -26,36 +28,49 @@
 
     public EffectsOSGi(
         Runnable onAddingBefore, Runnable onAddingAfter,
-        Runnable onRemovingBefore, Runnable onRemovingAfter) {
+        Runnable onRemovingBefore, Runnable onRemovingAfter, UpdateQuery<Void> updateQuery) {
 
         super((executionContext, op) -> {
             onAddingBefore.run();
 
             try {
-                Runnable terminator = op.publish(null);
+                OSGiResult terminator = op.publish(null);
 
-                OSGiResult result = () -> {
-                    try {
-                        onRemovingBefore.run();
-                    }
-                    catch (Exception e) {
-                        //TODO: logging
-                    }
+                OSGiResult result = new OSGiResultImpl(
+                    () -> {
+                        try {
+                            onRemovingBefore.run();
+                        }
+                        catch (Exception e) {
+                            //TODO: logging
+                        }
 
-                    try {
-                        terminator.run();
-                    }
-                    catch (Exception e) {
-                        //TODO: logging
-                    }
+                        try {
+                            terminator.run();
+                        }
+                        catch (Exception e) {
+                            //TODO: logging
+                        }
 
-                    try {
-                        onRemovingAfter.run();
+                        try {
+                            onRemovingAfter.run();
+                        }
+                        catch (Exception e) {
+                            //TODO: logging
+                        }
+                    },
+                    us -> {
+                        UpdateQuery.From<Void>[] froms = updateQuery.froms;
+
+                        for (UpdateQuery.From<Void> from : froms) {
+                            if (from.selector == us || from.selector == UpdateSelector.ALL) {
+                                from.consumer.accept(null);
+                            }
+                        }
+
+                        terminator.update(us);
                     }
-                    catch (Exception e) {
-                        //TODO: logging
-                    }
-                };
+                );
 
                 try {
                     onAddingAfter.run();
@@ -66,7 +81,7 @@
                     throw e;
                 }
 
-                return new OSGiResultImpl(result);
+                return result;
             }
             catch (Exception e) {
                 try {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
index 99cd1dc..e9dc9fb 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
@@ -56,64 +56,68 @@
                             Tuple<T> old = sent.get();
 
                             if (old != null) {
-                                old._runnable.run();
+                                old.osgiResult.run();
                             }
 
-                            tuple._runnable = publisher.apply(t);
+                            tuple.osgiResult = publisher.apply(t);
 
                             if (old != null) {
-                                old._runnable = notHighestPad.publish(old._t);
+                                old.osgiResult = notHighestPad.publish(old.t);
                             }
 
                             sent.set(tuple);
                         } else {
-                            tuple._runnable = notHighestPad.publish(t);
+                            tuple.osgiResult = notHighestPad.publish(t);
                         }
                     }
 
-                    return () -> {
-                        synchronized (set) {
-                            Tuple<T> old = set.peek();
+                    return new OSGiResultImpl(
+                        () -> {
+                            synchronized (set) {
+                                Tuple<T> old = set.peek();
 
-                            set.remove(tuple);
+                                set.remove(tuple);
 
-                            Tuple<T> current = set.peek();
+                                Tuple<T> current = set.peek();
 
-                            tuple._runnable.run();
+                                tuple.osgiResult.run();
 
-                            if (current != old && current != null) {
-                                current._runnable.run();
-                                current._runnable = publisher.apply(
-                                    current._t);
-                                sent.set(current);
+                                if (current != old && current != null) {
+                                    current.osgiResult.run();
+                                    current.osgiResult = publisher.apply(
+                                        current.t);
+                                    sent.set(current);
+                                }
+                                if (current == null) {
+                                    sent.set(null);
+                                }
                             }
-                            if (current == null) {
-                                sent.set(null);
+                        },
+                        us -> {
+                            synchronized (set) {
+                                Tuple<T> current = set.peek();
+
+                                current.osgiResult.update(us);
                             }
                         }
-                    };
+                    );
                 }));
 
-            return new OSGiResultImpl(
-                () -> {
-                    result.close();
-
-                    notHighestPad.close();
-                });
+            return new AggregateOSGiResult(result, notHighestPad);
         });
     }
 
     private static class Tuple<T> {
 
         Tuple(T t) {
-            _t = t;
+            this.t = t;
         }
 
         public T getT() {
-            return _t;
+            return t;
         }
-        T _t;
-        Runnable _runnable;
+        T t;
+        OSGiResult osgiResult;
 
     }
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
index 047e9bf..8b4dae7 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
@@ -52,7 +52,9 @@
 			}
 
 			return new OSGiResultImpl(
-				() -> cleanUp(references));
+				() -> cleanUp(references),
+				us -> {}
+			);
 		});
 	}
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
index 54aa504..5c2dfd3 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
@@ -25,6 +25,6 @@
 public class NothingOSGiImpl<S> extends OSGiImpl<S> {
 
 	public NothingOSGiImpl() {
-		super((executionContext, __) -> new OSGiResultImpl(OSGi.NOOP));
+		super((executionContext, __) -> new OSGiResultImpl(OSGi.NOOP,  ___ -> {}));
 	}
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
index 8851c66..bce94c3 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
@@ -18,16 +18,19 @@
 package org.apache.aries.component.dsl.internal;
 
 import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.update.UpdateSelector;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class OSGiResultImpl implements OSGiResult {
 
-	public OSGiResultImpl(OSGiResult close) {
+	public OSGiResultImpl(Runnable close, Consumer<UpdateSelector> onUpdate) {
 		this.close = close;
+		this.onUpdate = onUpdate;
 	}
 
 	@Override
@@ -37,7 +40,18 @@
 		}
 	}
 
+	@Override
+	public void update(UpdateSelector updateSelector) {
+		if (_closed.get()) {
+			return;
+		}
+
+		onUpdate.accept(updateSelector);
+	}
+
 	private final Runnable close;
+	private Consumer<UpdateSelector> onUpdate;
+	private Runnable update;
 	private AtomicBoolean _closed = new AtomicBoolean();
 
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java
index 50bf7f4..cd316da 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnlyLastPublisher.java
@@ -44,7 +44,7 @@
     private final Publisher<? super T> _op;
     private AtomicLong _counter = new AtomicLong();
     private Supplier<T> _injectOnLeave;
-    private Runnable _terminator;
+    private OSGiResult _terminator;
 
     @Override
     public synchronized OSGiResult publish(T t) {
@@ -58,16 +58,24 @@
         else {
             _counter.incrementAndGet();
 
-            return () -> {
-                synchronized (this) {
-                    _terminator.run();
+            return new OSGiResultImpl(
+                () -> {
+                    synchronized (this) {
+                        _terminator.run();
 
-                    if (_counter.decrementAndGet() > 0) {
-                        _terminator = _op.publish(_injectOnLeave.get());
+                        if (_counter.decrementAndGet() > 0) {
+                            _terminator = _op.publish(_injectOnLeave.get());
+                        }
                     }
-                }
-            };
+                },
+                us -> _terminator.update(us)
+            );
         }
     }
 
+    @Override
+    public <E extends Exception> OSGiResult error(T t, Exception e) throws E {
+        return _op.error(t, e);
+    }
+
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
index d9615a7..fd6f305 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
@@ -21,8 +21,8 @@
 import org.apache.aries.component.dsl.OSGiResult;
 import org.apache.aries.component.dsl.OSGiRunnable.ExecutionContext;
 import org.apache.aries.component.dsl.Publisher;
+import org.apache.aries.component.dsl.update.UpdateSelector;
 
-import java.io.Closeable;
 import java.util.function.Function;
 
 import static org.apache.aries.component.dsl.OSGi.NOOP;
@@ -30,7 +30,7 @@
 /**
  * @author Carlos Sierra Andrés
  */
-public class Pad<T, S> implements Publisher<T>, Closeable {
+public class Pad<T, S> implements Publisher<T>, OSGiResult {
 
     public Pad(
         ExecutionContext bundleContext,
@@ -56,6 +56,11 @@
     }
 
     @Override
+    public void update(UpdateSelector updateSelector) {
+        _result.update(updateSelector);
+    }
+
+    @Override
     public OSGiResult publish(T t) {
         return _publisher.publish(t);
     }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java
index 1ee95ab..b825dc3 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ProbeImpl.java
@@ -36,7 +36,7 @@
 
     private static class ProbeOperationImpl<T> implements OSGiRunnable<T> {
 
-        private OSGiResult _onClose = NOOP;
+        private volatile OSGiResult _onClose = NOOP;
 
         @Override
         public OSGiResultImpl run(
@@ -44,7 +44,9 @@
             _op = op;
 
             return new OSGiResultImpl(
-                () -> {_onClose.close(); _onClose = NOOP;});
+                () -> {_onClose.close(); _onClose = NOOP;},
+                us -> _onClose.update(us)
+            );
         }
 
         Publisher<? super T> _op;
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
index 37ffe30..a5ac0d2 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
@@ -17,6 +17,7 @@
 
 package org.apache.aries.component.dsl.internal;
 
+import org.apache.aries.component.dsl.OSGiResult;
 import org.apache.aries.component.dsl.Refresher;
 import org.apache.aries.component.dsl.CachingServiceReference;
 import org.apache.aries.component.dsl.Publisher;
@@ -41,7 +42,7 @@
 		Refresher<? super CachingServiceReference<T>> refresher) {
 
 		super((executionContext, op) -> {
-			ServiceTracker<T, ?>
+			ServiceTracker<T, Tracked<T>>
 				serviceTracker = new ServiceTracker<>(
 					executionContext.getBundleContext(),
 					buildFilter(executionContext, filterString, clazz),
@@ -49,7 +50,11 @@
 
 			serviceTracker.open();
 
-			return new OSGiResultImpl(serviceTracker::close);
+			return new OSGiResultImpl(
+				serviceTracker::close,
+				us -> serviceTracker.getTracked().forEach(
+					(__, tracked) -> tracked.runnable.update(us))
+			);
 		});
 	}
 
@@ -105,14 +110,14 @@
 
 		public Tracked(
 			CachingServiceReference<T> cachingServiceReference,
-			Runnable runnable) {
+			OSGiResult osgiResult) {
 
 			this.cachingServiceReference = cachingServiceReference;
-			this.runnable = runnable;
+			this.runnable = osgiResult;
 		}
 
 		volatile CachingServiceReference<T> cachingServiceReference;
-		volatile Runnable runnable;
+		volatile OSGiResult runnable;
 
 	}
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java
index df5cb23..d4479e8 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceRegistrationOSGiImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.aries.component.dsl.internal;
 
+import org.apache.aries.component.dsl.OSGiResult;
 import org.apache.aries.component.dsl.Publisher;
 import org.osgi.framework.ServiceFactory;
 import org.osgi.framework.ServiceRegistration;
@@ -86,7 +87,7 @@
 		ServiceRegistration<?> serviceRegistration,
 		Publisher<? super ServiceRegistration<T>> op) {
 
-		Runnable terminator = ((Publisher)op).publish(serviceRegistration);
+		OSGiResult terminator = ((Publisher)op).publish(serviceRegistration);
 
 		return new OSGiResultImpl(
             () -> {
@@ -98,7 +99,9 @@
                 finally {
                     terminator.run();
                 }
-            });
+            },
+			terminator::update
+		);
 	}
 
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateQuery.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateQuery.java
new file mode 100644
index 0000000..d6e3870
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateQuery.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.update;
+
+import java.util.function.Consumer;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public final class UpdateQuery<T> {
+    public final From<T>[] froms;
+
+    @SafeVarargs
+    public UpdateQuery(From<T>... froms) {
+        this.froms = froms;
+    }
+
+    @SafeVarargs
+    public static <T> UpdateQuery<T> onUpdate(From<T> ... froms) {
+        return new UpdateQuery<>(froms);
+    }
+
+    public static class From<T> {
+        public final UpdateSelector selector;
+        public final Consumer<T> consumer;
+
+        public From(UpdateSelector selector, Consumer<T> consumer) {
+            this.selector = selector;
+            this.consumer = consumer;
+        }
+
+        public static <T> From<T> from(UpdateSelector selector, Consumer<T> consumer) {
+            return new From<>(selector, consumer);
+        }
+    }
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateSelector.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateSelector.java
new file mode 100644
index 0000000..2478759
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/UpdateSelector.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.update;
+
+public interface UpdateSelector {
+
+    public static final UpdateSelector ALL = new UpdateSelector() {};
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/update/package-info.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/package-info.java
new file mode 100644
index 0000000..4a45062
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/update/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("1.0.0")
+package org.apache.aries.component.dsl.update;
diff --git a/itests-run/itest.bndrun b/itests-run/itest.bndrun
index 930f56e..a200c08 100644
--- a/itests-run/itest.bndrun
+++ b/itests-run/itest.bndrun
@@ -32,6 +32,7 @@
 	org.apache.servicemix.bundles.junit;version='[4.12.0,4.12.1)',\
 	org.apache.felix.configadmin;version='[1.9.14,1.9.15)',\
 	org.osgi.service.cm;version='[1.6.0,1.6.1)',\
-	org.apache.aries.component-dsl.itests;version='[2.0.0,2.0.1)'
+	org.apache.aries.component-dsl.itests;version='[2.0.0,2.0.1)',\
+	org.apache.aries.component-dsl.component-dsl;version='[2.0.0,2.0.1)'
 
 -include: -personal.bnd