add support for `skip_initial_run` on workflow policies and sensors
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
index fd9b157..7779fc6 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
@@ -1068,23 +1068,35 @@
         EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 7);
     }
 
-    @Test
-    public void testAddPolicyStep() throws Exception {
-        Entity app = createAndStartApplication(
-                "services:",
-                "- type: " + BasicEntity.class.getName());
-        Entity entity = Iterables.getOnlyElement(app.getChildren());
+    private void addSetXPolicy(Entity entity, String value, boolean ignoreTrigger, boolean skipInitial) {
         WorkflowExecutionContext x = WorkflowBasicTest.runWorkflow(entity, Strings.lines(
                 "steps:",
                 "  - type: add-policy",
                 "    blueprint:",
                 "      type: workflow-policy",
                 "      brooklyn.config:",
-                "        triggers: [ other_sensor ]",
-                "        steps: [ set-sensor integer x = 1 ]"
+                "        triggers: [ other_sensor" + (ignoreTrigger ? "_ignored" : "") + " ]",
+                "        " + (skipInitial ? "skip_initial_run: true" : ""),
+                "        steps: [ set-sensor integer x = "+value+" ]"
         ), "add-policy");
         x.getTask(false).get().getUnchecked();
+    }
+
+    @Test
+    public void testAddPolicyStep() throws Exception {
+        Entity app = createAndStartApplication(
+                "services:",
+                "- type: " + BasicEntity.class.getName());
+        Entity entity = Iterables.getOnlyElement(app.getChildren());
+        addSetXPolicy(entity, "1", false, false);
         EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 1);
+        addSetXPolicy(entity, "2", true, false);  // runs initially
+        EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 2);
+        addSetXPolicy(entity, "3", false, true);  // does not run initially
+        // Time.sleep(Duration.millis(250));  // uncomment this to really test it
+        EntityAsserts.assertAttribute(entity, Sensors.newIntegerSensor("x"), v -> !new Integer(3).equals(v));
+        entity.sensors().set(Sensors.newStringSensor("other_sensor"), "go"); // now it will run
+        EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 3);
     }
 
     @Test
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
index 983ee71..3358162 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
@@ -35,6 +35,7 @@
  */
 public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> {
 
+    private Boolean skipInitialRun = null;  // null default is false
     private long period = -1;
     private Object otherTriggers;
     private String description;
@@ -52,6 +53,9 @@
         this.description = other.description;
     }
 
+    public Boolean getSkipInitialRun() { return skipInitialRun; }
+    public F skipInitialRun(Boolean val) { this.skipInitialRun = val; return self(); }
+
     public long getPeriod() {
         return period;
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 440228b..57f7098 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -26,7 +26,12 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.*;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
@@ -51,8 +56,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.MoreObjects;
-
 
 /** 
  * For executing periodic polls.
@@ -91,6 +94,7 @@
     public void schedulePoll(AbstractEntityAdjunct feed, Set<? extends PollConfig> pollConfigs, Callable pollCallable, PollHandler pollHandler) {
         boolean subscribed = false;
         long minPeriodMillis = Long.MAX_VALUE;
+        boolean overallSkipInitialRun = false;
         Set<Supplier<DslPredicates.DslPredicate>> conditions = MutableSet.of();
 
         for (PollConfig pc: pollConfigs) {
@@ -108,9 +112,10 @@
             }
 
             for (Pair<Entity, Sensor> pair : triggersResolved) {
-                subscribe(pollCallable, pollHandler, pair.getLeft(), pair.getRight(), pc.getCondition());
+                subscribe(pollCallable, pollHandler, pair.getLeft(), pair.getRight(), Boolean.TRUE.equals(pc.getSkipInitialRun()), pc.getCondition());
                 subscribed = true;
             }
+            overallSkipInitialRun |= Boolean.TRUE.equals(pc.getSkipInitialRun());
         }
 
         if (minPeriodMillis >0 && (minPeriodMillis < Duration.PRACTICALLY_FOREVER.toMilliseconds() || !subscribed)) {
@@ -124,13 +129,14 @@
                     return aggregate;
                 };
             }
-            scheduleAtFixedRate(pollCallable, pollHandler, Duration.millis(minPeriodMillis), condition);
+            scheduleAtFixedRate(pollCallable, pollHandler, Duration.millis(minPeriodMillis), overallSkipInitialRun, condition);
         }
     }
 
     private static class PollJob<V> {
         final PollHandler<? super V> handler;
         final Duration pollPeriod;
+        boolean skipInitialRun = false;
         final Callable<?> job;
         final Runnable wrappedJob;
         final Entity pollTriggerEntity;
@@ -140,14 +146,15 @@
         private boolean loggedPreviousException = false;
 
         PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) {
-            this(job, handler, period, null, null, null);
+            this(job, handler, period, null, null, false, null);
         }
 
-        PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> pollCondition) {
+        PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor, boolean skipInitialRun, Supplier<DslPredicates.DslPredicate> pollCondition) {
             this.handler = handler;
             this.pollPeriod = period;
             this.pollTriggerEntity = sensorSource;
             this.pollTriggerSensor = sensor;
+            this.skipInitialRun = skipInitialRun;
             this.pollCondition = pollCondition;
             this.job = job;
             wrappedJob = new Runnable() {
@@ -200,21 +207,27 @@
     }
 
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long periodMillis) {
-        scheduleAtFixedRate(job, handler, Duration.millis(periodMillis), null);
+        scheduleAtFixedRate(job, handler, Duration.millis(periodMillis), false, null);
     }
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) {
-        scheduleAtFixedRate(job, handler, period, null);
+        scheduleAtFixedRate(job, handler, period, false, null);
     }
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period, Supplier<DslPredicates.DslPredicate> pollCondition) {
+        scheduleAtFixedRate(job, handler, period, false, pollCondition);
+    }
+    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period, boolean skipInitialRun, Supplier<DslPredicates.DslPredicate> pollCondition) {
         if (started) {
             throw new IllegalStateException("Cannot schedule additional tasks after poller has started");
         }
-        PollJob<V> foo = new PollJob<V>(job, handler, period, null, null, pollCondition);
+        PollJob<V> foo = new PollJob<V>(job, handler, period, null, null, skipInitialRun, pollCondition);
         pollJobs.add(foo);
     }
 
     public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> condition) {
-        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor, condition));
+        subscribe(job, handler, sensorSource, sensor, false, condition);
+    }
+    public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor, boolean skipInitialRun, Supplier<DslPredicates.DslPredicate> condition) {
+        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor, skipInitialRun, condition));
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -254,7 +267,7 @@
             return task;
         };
         Multimap<Callable,PollJob> nonScheduledJobs = Multimaps.newSetMultimap(MutableMap.of(), MutableSet::of);
-        pollJobs.forEach(pollJob -> nonScheduledJobs.put(pollJob.job, pollJob));
+        pollJobs.stream().filter(pj -> !pj.skipInitialRun).forEach(pollJob -> nonScheduledJobs.put(pollJob.job, pollJob));
 
         // 'runInitially' could be an option on the job; currently we always do
         // if it's a scheduled task, that happens automatically; if it's a triggered task
@@ -272,6 +285,7 @@
                 added = true;
                 tb.displayName("Periodic: " + scheduleName);
                 tb.period(pollJob.pollPeriod);
+                if (pollJob.skipInitialRun) tb.delay(pollJob.pollPeriod);
 
                 if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) {
                     minPeriod = pollJob.pollPeriod;
@@ -309,7 +323,7 @@
         }
 
         // no period for these, but we do need to run them initially, but combine if the Callable is the same (e.g. multiple triggers)
-        // not the PollJob is one per trigger, and the wrappedJob is specific to the poll job, but doesn't depend on the trigger, so we can just take the first
+        // note the PollJob is one per trigger, and the wrappedJob is specific to the poll job, but doesn't depend on the trigger, so we can just take the first
         nonScheduledJobs.asMap().forEach( (jobC,jobP) -> {
             Runnable job = jobP.iterator().next().wrappedJob;
             String jobSummaries = jobP.stream().map(j -> j.handler.getDescription()).filter(Strings::isNonBlank).collect(Collectors.joining(", "));
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
index a5c2c5a..afa88dc 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
@@ -58,6 +58,7 @@
     public static final ConfigKey<Duration> SENSOR_PERIOD = ConfigKeys.newConfigKey(Duration.class, "period", "Period, including units e.g. 1m or 5s or 200ms", null);
     public static final ConfigKey<Object> SENSOR_TRIGGERS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
             "Sensors which should trigger this feed, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor");
+    public static final ConfigKey<Boolean> SKIP_INITIAL_RUN = ConfigKeys.newConfigKey(Boolean.class, "skip_initial_run", "If set, skips running when added; runs only after the period or on a subsequent trigger");
     public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", "Optional condition required for this sensor feed to run");
 
     public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("onlyIfServiceUp", "Whether to run only if service is up.", null);
@@ -203,6 +204,7 @@
                 .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
                 .logWarningGraceTime(logWarningGraceTime)
                 .period(getPeriod(entity, initParams()))
+                .skipInitialRun(initParam(SKIP_INITIAL_RUN))
                 .otherTriggers(getTriggersMaybe(entity, configBag).orNull())
                 .condition(new ConditionSupplierFromConfigBag(configBag, entity));
 
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
index 0204351..2baefe0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
@@ -28,6 +28,7 @@
 import org.apache.brooklyn.core.feed.PollHandler;
 import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.predicates.DslPredicates;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -50,9 +51,9 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(WorkflowPolicy.class);
 
-    public static final ConfigKey<Duration> POLICY_PERIOD = ConfigKeys.newConfigKey(Duration.class, "period", "Period, including units e.g. 1m or 5s or 200ms", null);
-    public static final ConfigKey<Object> POLICY_TRIGGERS_SENSORS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
-            "Sensors which should trigger this policy, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor");
+    public static final ConfigKey<Duration> POLICY_PERIOD = AbstractAddTriggerableSensor.SENSOR_PERIOD;
+    public static final ConfigKey<Object> POLICY_TRIGGERS_SENSORS =  AbstractAddTriggerableSensor.SENSOR_TRIGGERS;
+    public static final ConfigKey<Boolean> SKIP_INITIAL_RUN = AbstractAddTriggerableSensor.SKIP_INITIAL_RUN;
 
     public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", "Optional condition required for this sensor feed to run");
 
@@ -137,6 +138,7 @@
 
         PollConfig pc = new PollConfig( (AttributeSensor) null )
                 .period(getConfig(POLICY_PERIOD))
+                .skipInitialRun(getConfig(SKIP_INITIAL_RUN))
                 .otherTriggers(getConfig(POLICY_TRIGGERS_SENSORS))
                 .condition(new ConditionSupplierFromAdjunct());
 
diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
index f17e527..2846625 100644
--- a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
@@ -79,8 +79,7 @@
                 }
             },
             new DelegatingPollHandler<Void>(Collections.<AttributePollHandler<? super Void>>emptyList()),
-            Duration.millis(20),
-            null);
+            Duration.millis(20));
         poller.start();
     }