Allows users of PubsubIO to specify which pubsub labels are used to propagate record timestamps and record ids.
[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=86030744
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
index 4a37992..2414fee 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
@@ -153,6 +153,10 @@
* <li>Must end with a letter or a number.</li>
* <li>Cannot begin with 'goog' prefix.</li>
* </ul>
+ *
+ * Dataflow will start reading data published on this topic from the time the pipeline is
+ * started. Any data published on the topic before the pipeline is started will not be read
+ * by Dataflow.
*/
public static Bound topic(String topic) {
return new Bound().topic(topic);
@@ -181,6 +185,44 @@
}
/**
+ * Creates and returns a PubsubIO.Read PTransform where record timestamps are expected
+ * to be provided using the PubSub labeling API. The {@code <timestampLabel>} parameter
+ * specifies the label name. The label value sent to PubSub is a numerical value representing
+ * the number of milliseconds since the Unix epoch. For example, if using the joda time classes,
+ * org.joda.time.Instant.getMillis() returns the correct value for this label.
+ *
+ * <p> If {@code <timestampLabel>} is not provided, the system will generate record timestamps
+ * the first time it sees each record. All windowing will be done relative to these timestamps.
+ * Windows are closed based on an estimate of when this source has finished producing data for
+ * a timestamp range, which means that late data can arrive after a window has been closed. The
+ * {@link #dropLateData} setting allows you to control what to do with late data.
+ */
+ public static Bound timestampLabel(String timestampLabel) {
+ return new Bound().timestampLabel(timestampLabel);
+ }
+
+ /**
+ * If true, then late-arriving data from this source will be dropped.
+ */
+ public static Bound dropLateData(boolean dropLateData) {
+ return new Bound().dropLateData(dropLateData);
+ }
+
+ /**
+ * Creates and returns a PubsubIO.Read PTransform where unique record identifiers are
+ * expected to be provided using the PubSub labeling API. The {@code <idLabel>} parameter
+ * specifies the label name. The label value sent to PubSub can be any string value that
+ * uniquely identifies this record.
+ *
+ * <p> If idLabel is not provided, Dataflow cannot guarantee that no duplicate data will be
+ * delivered on the PubSub stream. In this case, deduplication of the stream will be
+ * strictly best effort.
+ */
+ public static Bound idLabel(String idLabel) {
+ return new Bound().idLabel(idLabel);
+ }
+
+ /**
* A PTransform that reads from a PubSub source and returns
* a unbounded PCollection containing the items from the stream.
*/
@@ -191,10 +233,22 @@
String topic;
/** The Pubsub subscription to read from. */
String subscription;
+ /** The Pubsub label to read timestamps from. */
+ String timestampLabel;
+ Boolean dropLateData;
+ /** This is set for backwards compatibility with old services. If dropLateData is not
+ * explicitly called, then we won't forward that parameter to the service. */
+ Boolean dropLateDataExplicit;
+ /** The Pubsub label to read ids from. */
+ String idLabel;
- Bound() {}
+ Bound() {
+ this.dropLateData = true;
+ this.dropLateDataExplicit = false;
+ }
- Bound(String name, String subscription, String topic) {
+ Bound(String name, String subscription, String topic, String timestampLabel,
+ boolean dropLateData, boolean dropLateDataExplicit, String idLabel) {
super(name);
if (subscription != null) {
Validator.validateSubscriptionName(subscription);
@@ -204,18 +258,63 @@
}
this.subscription = subscription;
this.topic = topic;
+ this.timestampLabel = timestampLabel;
+ this.dropLateData = dropLateData;
+ this.dropLateDataExplicit = dropLateDataExplicit;
+ this.idLabel = idLabel;
}
+ /**
+ * Returns a new PubsubIO.Read PTransform that's like this one but with the given
+ * step name. Does not modify the object.
+ */
public Bound named(String name) {
- return new Bound(name, subscription, topic);
+ return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+ dropLateDataExplicit, idLabel);
}
+ /**
+ * Returns a new PubsubIO.Read PTransform that's like this one but reading from the
+ * given subscription. Does not modify the object.
+ */
public Bound subscription(String subscription) {
- return new Bound(name, subscription, topic);
+ return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+ dropLateDataExplicit, idLabel);
}
+ /**
+ * Returns a new PubsubIO.Read PTransform that's like this one but reading from the
+ * given topic. Does not modify the object.
+ */
public Bound topic(String topic) {
- return new Bound(name, subscription, topic);
+ return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+ dropLateDataExplicit, idLabel);
+ }
+
+ /**
+ * Returns a new PubsubIO.Read PTransform that's like this one but reading timestamps
+ * from the given PubSub label. Does not modify the object.
+ */
+ public Bound timestampLabel(String timestampLabel) {
+ return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+ dropLateDataExplicit, idLabel);
+ }
+
+ /**
+ * Returns a new PubsubIO.Read PTransform that's like this one but with the specified
+ * setting for dropLateData. Does not modify the object.
+ */
+ public Bound dropLateData(boolean dropLateData) {
+ return new Bound(name, subscription, topic, timestampLabel, dropLateData, true, idLabel);
+ }
+
+ /**
+ * Returns a new PubsubIO.Read PTransform that's like this one but reading unique ids
+ * from the given PubSub label. Does not modify the object.
+ */
+ public Bound idLabel(String idLabel) {
+ return new Bound(name, subscription, topic, timestampLabel, dropLateData,
+ dropLateDataExplicit, idLabel);
}
@Override
@@ -250,6 +349,22 @@
return subscription;
}
+ public String getTimestampLabel() {
+ return timestampLabel;
+ }
+
+ public boolean getDropLateData() {
+ return dropLateData;
+ }
+
+ public boolean getDropLateDataExplicit() {
+ return dropLateDataExplicit;
+ }
+
+ public String getIdLabel() {
+ return idLabel;
+ }
+
static {
// TODO: Figure out how to make this work under
// DirectPipelineRunner.
@@ -279,6 +394,30 @@
}
/**
+ * If specified, Dataflow will add a Pubsub label to each output record specifying the logical
+ * timestamp of the record. {@code <timestampLabel>} determines the label name. The label value
+ * is a numerical value representing the number of milliseconds since the Unix epoch. For
+ * example, if using the joda time classes, the org.joda.time.Instant(long) constructor can be
+ * used to parse this value. If the output from this sink is being read by another Dataflow
+ * source, then PubsubIO.Read.timestampLabel can be used to ensure that the other source reads
+ * these timestamps from the appropriate label.
+ */
+ public static Bound timestampLabel(String timestampLabel) {
+ return new Bound().timestampLabel(timestampLabel);
+ }
+
+ /**
+ * If specified, Dataflow will add a Pubsub label to each output record containing a unique
+ * identifier for that record. {@code <idLabel>} determines the label name. The label value
+ * is an opaque string value. This is useful if the output from this sink is being read
+ * by another Dataflow source, in which case PubsubIO.Read.idLabel can be used to ensure that
+ * the other source reads these ids from the appropriate label.
+ */
+ public static Bound idLabel(String idLabel) {
+ return new Bound().idLabel(idLabel);
+ }
+
+ /**
* A PTransfrom that writes a unbounded {@code PCollection<String>}
* to a PubSub stream.
*/
@@ -287,23 +426,51 @@
extends PTransform<PCollection<String>, PDone> {
/** The Pubsub topic to publish to. */
String topic;
+ String timestampLabel;
+ String idLabel;
Bound() {}
- Bound(String name, String topic) {
+ Bound(String name, String topic, String timestampLabel, String idLabel) {
super(name);
if (topic != null) {
Validator.validateTopicName(topic);
this.topic = topic;
}
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
}
+ /**
+ * Returns a new PubsubIO.Write PTransform that's like this one but with the given step
+ * name. Does not modify the object.
+ */
public Bound named(String name) {
- return new Bound(name, topic);
+ return new Bound(name, topic, timestampLabel, idLabel);
}
+ /**
+ * Returns a new PubsubIO.Write PTransform that's like this one but writing to the given
+ * topic. Does not modify the object.
+ */
public Bound topic(String topic) {
- return new Bound(name, topic);
+ return new Bound(name, topic, timestampLabel, idLabel);
+ }
+
+ /**
+ * Returns a new PubsubIO.Write PTransform that's like this one but publishing timestamps
+ * to the given PubSub label. Does not modify the object.
+ */
+ public Bound timestampLabel(String timestampLabel) {
+ return new Bound(name, topic, timestampLabel, idLabel);
+ }
+
+ /**
+ * Returns a new PubsubIO.Write PTransform that's like this one but publishing record ids
+ * to the given PubSub label. Does not modify the object.
+ */
+ public Bound idLabel(String idLabel) {
+ return new Bound(name, topic, timestampLabel, idLabel);
}
@Override
@@ -327,6 +494,14 @@
return topic;
}
+ public String getTimestampLabel() {
+ return timestampLabel;
+ }
+
+ public String getIdLabel() {
+ return idLabel;
+ }
+
static {
// TODO: Figure out how to make this work under
// DirectPipelineRunner.
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
index 706397b..8b297d6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
@@ -57,6 +57,15 @@
if (transform.getSubscription() != null) {
context.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription());
}
+ if (transform.getTimestampLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
+ }
+ if (transform.getDropLateDataExplicit()) {
+ context.addInput(PropertyNames.PUBSUB_DROP_LATE_DATA, transform.getDropLateData());
+ }
+ if (transform.getIdLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
+ }
context.addValueOnlyOutput(PropertyNames.OUTPUT, transform.getOutput());
// TODO: Orderedness?
}
@@ -83,6 +92,12 @@
context.addStep(transform, "ParallelWrite");
context.addInput(PropertyNames.FORMAT, "pubsub");
context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic());
+ if (transform.getTimestampLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
+ }
+ if (transform.getIdLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
+ }
context.addEncodingInput(
WindowedValue.getValueOnlyCoder(transform.getInput().getCoder()));
context.addInput(PropertyNames.PARALLEL_INPUT, transform.getInput());
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
index 0afe5ae..a22f789 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
@@ -68,7 +68,10 @@
public static final String OUTPUT_NAME = "output_name";
public static final String PARALLEL_INPUT = "parallel_input";
public static final String PHASE = "phase";
+ public static final String PUBSUB_DROP_LATE_DATA = "pubsub_drop_late_data";
+ public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
+ public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String SCALAR_FIELD_NAME = "value";
public static final String SERIALIZED_FN = "serialized_fn";