Allows users of PubsubIO to specify which Pubsub labels are used to propagate record timestamps and record ids, and whether PubsubIO.Read drops late-arriving data.

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=86030744
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
index 4a37992..2414fee 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
@@ -153,6 +153,10 @@
      * <li>Must end with a letter or a number.</li>
      * <li>Cannot begin with 'goog' prefix.</li>
      * </ul>
+     *
+     * <p>Dataflow will start reading data published on this topic from the time the pipeline is
+     * started. Any data published on the topic before the pipeline is started will not be read
+     * by Dataflow.
      */
     public static Bound topic(String topic) {
       return new Bound().topic(topic);
@@ -181,6 +185,44 @@
     }
 
     /**
+     * Creates and returns a PubsubIO.Read PTransform where record timestamps are expected
+     * to be provided using the PubSub labeling API. The {@code timestampLabel} parameter
+     * specifies the label name. The label value sent to PubSub is a numerical value representing
+     * the number of milliseconds since the Unix epoch. For example, if using the Joda-Time classes,
+     * {@code org.joda.time.Instant.getMillis()} returns the correct value for this label.
+     *
+     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+     * the first time it sees each record. All windowing will be done relative to these timestamps.
+     * Windows are closed based on an estimate of when this source has finished producing data for
+     * a timestamp range, which means that late data can arrive after a window has been closed. The
+     * {@link #dropLateData} setting controls what happens to such late data.
+     */
+    public static Bound timestampLabel(String timestampLabel) {
+      return new Bound().timestampLabel(timestampLabel);
+    }
+
+    /**
+     * If true (the default), late-arriving data from this source will be dropped.
+     */
+    public static Bound dropLateData(boolean dropLateData) {
+      return new Bound().dropLateData(dropLateData);
+    }
+
+    /**
+     * Creates and returns a PubsubIO.Read PTransform where unique record identifiers are
+     * expected to be provided using the PubSub labeling API. The {@code idLabel} parameter
+     * specifies the label name. The label value sent to PubSub can be any string value that
+     * uniquely identifies this record.
+     *
+     * <p>If {@code idLabel} is not provided, Dataflow cannot guarantee that no duplicate data
+     * will be delivered on the PubSub stream. In this case, deduplication of the stream will be
+     * strictly best effort.
+     */
+    public static Bound idLabel(String idLabel) {
+      return new Bound().idLabel(idLabel);
+    }
+
+    /**
      * A PTransform that reads from a PubSub source and returns
      * a unbounded PCollection containing the items from the stream.
      */
@@ -191,10 +233,22 @@
       String topic;
       /** The Pubsub subscription to read from. */
       String subscription;
+      /** The Pubsub label to read timestamps from. */
+      String timestampLabel;
+      boolean dropLateData;  // Whether late-arriving data is dropped; defaults to true.
+      /** Whether dropLateData was set explicitly. For backwards compatibility with old services,
+       * dropLateData is only forwarded to the service when this is true. */
+      boolean dropLateDataExplicit;
+      /** The Pubsub label to read ids from. */
+      String idLabel;
 
-      Bound() {}
+      Bound() {
+        this.dropLateData = true;
+        this.dropLateDataExplicit = false;
+      }
 
-      Bound(String name, String subscription, String topic) {
+      Bound(String name, String subscription, String topic, String timestampLabel,
+          boolean dropLateData, boolean dropLateDataExplicit, String idLabel) {
         super(name);
         if (subscription != null) {
           Validator.validateSubscriptionName(subscription);
@@ -204,18 +258,63 @@
         }
         this.subscription = subscription;
         this.topic = topic;
+        this.timestampLabel = timestampLabel;
+        this.dropLateData = dropLateData;
+        this.dropLateDataExplicit = dropLateDataExplicit;
+        this.idLabel = idLabel;
       }
 
+      /**
+       * Returns a new PubsubIO.Read PTransform that's like this one but with the given
+       * step name. Does not modify the object.
+       */
       public Bound named(String name) {
-        return new Bound(name, subscription, topic);
+        return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+            dropLateDataExplicit, idLabel);
       }
 
+      /**
+       * Returns a new PubsubIO.Read PTransform that's like this one but reading from the
+       * given subscription. Does not modify the object.
+       */
       public Bound subscription(String subscription) {
-        return new Bound(name, subscription, topic);
+        return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+            dropLateDataExplicit, idLabel);
       }
 
+      /**
+       * Returns a new PubsubIO.Read PTransform that's like this one but reading from the
+       * given topic. Does not modify the object.
+       */
       public Bound topic(String topic) {
-        return new Bound(name, subscription, topic);
+        return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+            dropLateDataExplicit, idLabel);
+      }
+
+      /**
+       * Returns a new PubsubIO.Read PTransform that's like this one but reading timestamps
+       * from the given PubSub label. Does not modify the object.
+       */
+      public Bound timestampLabel(String timestampLabel) {
+        return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+            dropLateDataExplicit, idLabel);
+      }
+
+      /**
+       * Returns a new PubsubIO.Read PTransform that's like this one but with the given
+       * dropLateData setting, marked as explicitly set. Does not modify the object.
+       */
+      public Bound dropLateData(boolean dropLateData) {
+        return new Bound(name, subscription, topic, timestampLabel, dropLateData, true, idLabel);
+      }
+
+      /**
+       * Returns a new PubsubIO.Read PTransform that's like this one but reading unique ids
+       * from the given PubSub label. Does not modify the object.
+       */
+      public Bound idLabel(String idLabel) {
+        return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+            dropLateDataExplicit, idLabel);
       }
 
       @Override
@@ -250,6 +349,22 @@
         return subscription;
       }
 
+      public String getTimestampLabel() {
+        return timestampLabel;  // null unless a timestamp label was set.
+      }
+
+      public boolean getDropLateData() {
+        return dropLateData;  // true unless overridden via dropLateData(false).
+      }
+
+      public boolean getDropLateDataExplicit() {
+        return dropLateDataExplicit;  // true once dropLateData(boolean) has been called.
+      }
+
+      public String getIdLabel() {
+        return idLabel;  // null unless an id label was set.
+      }
+
       static {
         // TODO: Figure out how to make this work under
         // DirectPipelineRunner.
@@ -279,6 +394,30 @@
     }
 
     /**
+     * Creates and returns a PubsubIO.Write PTransform that adds a Pubsub label to each output
+     * record specifying its logical timestamp. {@code timestampLabel} determines the label name.
+     * The label value is a numerical value representing the number of milliseconds since the Unix
+     * epoch. For example, if using the Joda-Time classes, the {@code org.joda.time.Instant(long)}
+     * constructor can be used to parse this value. If the output from this sink is being read by
+     * another Dataflow source, then PubsubIO.Read.timestampLabel can be used to ensure that the
+     * other source reads these timestamps from the appropriate label.
+     */
+    public static Bound timestampLabel(String timestampLabel) {
+      return new Bound().timestampLabel(timestampLabel);
+    }
+
+    /**
+     * Creates and returns a PubsubIO.Write PTransform that adds a Pubsub label to each output
+     * record containing a unique identifier for that record. {@code idLabel} determines the
+     * label name. The label value is an opaque string value. This is useful if the output from
+     * this sink is being read by another Dataflow source, in which case PubsubIO.Read.idLabel
+     * can be used to ensure that the other source reads these ids from the appropriate label.
+     */
+    public static Bound idLabel(String idLabel) {
+      return new Bound().idLabel(idLabel);
+    }
+
+    /**
      * A PTransfrom that writes a unbounded {@code PCollection<String>}
      * to a PubSub stream.
      */
@@ -287,23 +426,51 @@
         extends PTransform<PCollection<String>, PDone> {
       /** The Pubsub topic to publish to. */
       String topic;
+      String timestampLabel;  // The Pubsub label to publish timestamps to, or null.
+      String idLabel;  // The Pubsub label to publish record ids to, or null.
 
       Bound() {}
 
-      Bound(String name, String topic) {
+      Bound(String name, String topic, String timestampLabel, String idLabel) {
         super(name);
         if (topic != null) {
           Validator.validateTopicName(topic);
           this.topic = topic;
         }
+        this.timestampLabel = timestampLabel;
+        this.idLabel = idLabel;
       }
 
+      /**
+       * Returns a new PubsubIO.Write PTransform that's like this one but with the given step
+       * name. Does not modify the object.
+       */
       public Bound named(String name) {
-        return new Bound(name, topic);
+        return new Bound(name, topic, timestampLabel, idLabel);
       }
 
+      /**
+       * Returns a new PubsubIO.Write PTransform that's like this one but writing to the given
+       * topic. Does not modify the object.
+       */
       public Bound topic(String topic) {
-        return new Bound(name, topic);
+        return new Bound(name, topic, timestampLabel, idLabel);
+      }
+
+      /**
+       * Returns a new PubsubIO.Write PTransform that's like this one but publishing timestamps
+       * to the given PubSub label. Does not modify the object.
+       */
+      public Bound timestampLabel(String timestampLabel) {
+        return new Bound(name, topic, timestampLabel, idLabel);
+      }
+
+      /**
+       * Returns a new PubsubIO.Write PTransform that's like this one but publishing record ids
+       * to the given PubSub label. Does not modify the object.
+       */
+      public Bound idLabel(String idLabel) {
+        return new Bound(name, topic, timestampLabel, idLabel);
       }
 
       @Override
@@ -327,6 +494,14 @@
         return topic;
       }
 
+      public String getTimestampLabel() {
+        return timestampLabel;  // null unless a timestamp label was set.
+      }
+
+      public String getIdLabel() {
+        return idLabel;  // null unless an id label was set.
+      }
+
       static {
         // TODO: Figure out how to make this work under
         // DirectPipelineRunner.
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
index 706397b..8b297d6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
@@ -57,6 +57,15 @@
       if (transform.getSubscription() != null) {
         context.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription());
       }
+      if (transform.getTimestampLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
+      }
+      if (transform.getDropLateDataExplicit()) {  // Omitted unless explicit, for old services.
+        context.addInput(PropertyNames.PUBSUB_DROP_LATE_DATA, transform.getDropLateData());
+      }
+      if (transform.getIdLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
+      }
       context.addValueOnlyOutput(PropertyNames.OUTPUT, transform.getOutput());
       // TODO: Orderedness?
     }
@@ -83,6 +92,12 @@
       context.addStep(transform, "ParallelWrite");
       context.addInput(PropertyNames.FORMAT, "pubsub");
       context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic());
+      if (transform.getTimestampLabel() != null) {  // Labels are only sent when set.
+        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
+      }
+      if (transform.getIdLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
+      }
       context.addEncodingInput(
           WindowedValue.getValueOnlyCoder(transform.getInput().getCoder()));
       context.addInput(PropertyNames.PARALLEL_INPUT, transform.getInput());
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
index 0afe5ae..a22f789 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
@@ -68,7 +68,10 @@
   public static final String OUTPUT_NAME = "output_name";
   public static final String PARALLEL_INPUT = "parallel_input";
   public static final String PHASE = "phase";
+  public static final String PUBSUB_DROP_LATE_DATA = "pubsub_drop_late_data";  // Boolean-valued.
+  public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
   public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
+  public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
   public static final String PUBSUB_TOPIC = "pubsub_topic";
   public static final String SCALAR_FIELD_NAME = "value";
   public static final String SERIALIZED_FN = "serialized_fn";