// blob: 2414feeb34f9ff81e10d7019651dc2657e7702af [file] [log] [blame]
/*
* Copyright (C) 2014 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.io;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* [Whitelisting Required] Read and Write transforms for Pub/Sub streams. These transforms create
* and consume unbounded {@link com.google.cloud.dataflow.sdk.values.PCollection}s.
*
* <p> <b>Important:</b> PubsubIO is experimental. It is not supported by the
* {@link com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner} and is only supported in the
* {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner} for users whitelisted in a
* streaming early access program and who enable
* {@link com.google.cloud.dataflow.sdk.options.StreamingOptions#setStreaming(boolean)}.
*
* <p> You should expect this class to change significantly in future versions of the SDK
* or be removed entirely.
*/
public class PubsubIO {
/**
* Pattern for the project ID component of a Pubsub topic or subscription path.
*
* <p>Project IDs must contain 6-63 lowercase letters, digits, or dashes.
* IDs must start with a letter and may not end with a dash.
* This regex isn't exact - it also admits ':' and '.' in the middle of the ID, so it allows
* patterns that would be rejected by the service, but it is sufficient for basic validation
* of the project component of topic and subscription names.
*/
private static final Pattern PROJECT_ID_REGEXP =
Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
/**
* Shape of a full subscription path: group 1 is the project component (no slashes) and
* group 2 is everything after it, which is validated separately as a Pubsub object name.
*/
private static final Pattern SUBSCRIPTION_REGEXP =
Pattern.compile("/subscriptions/([^/]+)/(.+)");
/**
* Shape of a full topic path: group 1 is the project component (no slashes) and
* group 2 is everything after it, which is validated separately as a Pubsub object name.
*/
private static final Pattern TOPIC_REGEXP =
Pattern.compile("/topics/([^/]+)/(.+)");
/**
* Pattern for a topic or subscription name: a lowercase letter, then one or more lowercase
* letters, digits, dashes, periods or underscores, ending in a lowercase letter or digit
* (so at least 3 characters in total).
*/
private static final Pattern PUBSUB_NAME_REGEXP =
Pattern.compile("[a-z][-._a-z0-9]+[a-z0-9]");
/** Longest topic or subscription name accepted by the validator. */
private static final int PUBSUB_NAME_MAX_LENGTH = 255;
/** Subscriptions whose path starts with this prefix skip validation entirely. */
private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
/** A topic path exactly equal to this value skips validation entirely. */
private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
/**
 * Utility class to validate topic and subscription names.
 *
 * <p>Both public entry points check the overall path shape first, then the project component,
 * then the object name, and throw {@link IllegalArgumentException} describing the first rule
 * that is violated.
 */
public static class Validator {
  /**
   * Validates that {@code topic} has the form {@code /topics/<project>/<topic>}.
   *
   * <p>The special test topic {@code /topics/dev/null} is accepted without further checks.
   *
   * @param topic the full topic path to validate
   * @throws IllegalArgumentException if the path, its project component, or its topic name
   *     is malformed
   * @throws NullPointerException if {@code topic} is null
   */
  public static void validateTopicName(String topic) {
    if (topic.equals(TOPIC_DEV_NULL_TEST_NAME)) {
      return;
    }
    Matcher match = TOPIC_REGEXP.matcher(topic);
    if (!match.matches()) {
      throw new IllegalArgumentException(
          "Pubsub topic is not in /topics/project_id/topic_name format: "
          + topic);
    }
    validateProjectName(match.group(1), "topic");
    validatePubsubName(match.group(2));
  }

  /**
   * Validates that {@code subscription} has the form
   * {@code /subscriptions/<project>/<subscription>}.
   *
   * <p>Any value starting with the {@code _random/} test prefix is accepted without further
   * checks.
   *
   * @param subscription the full subscription path to validate
   * @throws IllegalArgumentException if the path, its project component, or its subscription
   *     name is malformed
   * @throws NullPointerException if {@code subscription} is null
   */
  public static void validateSubscriptionName(String subscription) {
    if (subscription.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX)) {
      return;
    }
    Matcher match = SUBSCRIPTION_REGEXP.matcher(subscription);
    if (!match.matches()) {
      throw new IllegalArgumentException(
          "Pubsub subscription is not in /subscriptions/project_id/subscription_name format: "
          + subscription);
    }
    validateProjectName(match.group(1), "subscription");
    validatePubsubName(match.group(2));
  }

  /**
   * Checks the project component of a topic or subscription path.
   *
   * @param project the project component extracted from the path
   * @param kind the kind of path being validated ("topic" or "subscription"); used only so
   *     that the error message names the resource the caller actually supplied
   * @throws IllegalArgumentException if {@code project} is not a plausible project ID
   */
  private static void validateProjectName(String project, String kind) {
    Matcher match = PROJECT_ID_REGEXP.matcher(project);
    if (!match.matches()) {
      throw new IllegalArgumentException(
          "Illegal project name specified in Pubsub " + kind + ": " + project);
    }
  }

  /**
   * Checks the object-name component (topic or subscription name) of a path.
   *
   * <p>The checks run in this order: maximum length, forbidden {@code goog} prefix, then the
   * allowed-character pattern.
   *
   * @param name the object name extracted from the path
   * @throws IllegalArgumentException if {@code name} breaks any of the naming rules
   */
  private static void validatePubsubName(String name) {
    if (name.length() > PUBSUB_NAME_MAX_LENGTH) {
      // Build the message from the constant so the reported limit cannot drift from the check.
      throw new IllegalArgumentException(
          "Pubsub object name is longer than " + PUBSUB_NAME_MAX_LENGTH
          + " characters: " + name);
    }
    if (name.startsWith("goog")) {
      throw new IllegalArgumentException(
          "Pubsub object name cannot start with goog: " + name);
    }
    Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
    if (!match.matches()) {
      throw new IllegalArgumentException(
          "Illegal Pubsub object name specified: " + name
          + " Please see Javadoc for naming rules.");
    }
  }
}
/**
 * Entry points for building a PTransform that continuously reads from a Pubsub stream and
 * returns a {@code PCollection<String>} containing the items from the stream.
 *
 * <p>Each static method creates a default {@link Bound} and applies a single setting to it;
 * further settings can be chained on the returned {@link Bound}.
 */
// TODO: Support non-String encodings.
public static class Read {
  /**
   * Creates and returns a PubsubIO.Read PTransform with the given step name.
   */
  public static Bound named(String name) {
    Bound unconfigured = new Bound();
    return unconfigured.named(name);
  }

  /**
   * Creates and returns a PubsubIO.Read PTransform for reading from
   * a Pubsub topic with the specified publisher topic. Cloud Pubsub topic names
   * should be of the form {@code /topics/<project>/<topic>}, where {@code <project>}
   * is the name of the publishing project. The {@code <topic>} component must comply
   * with the requirements below.
   *
   * <ul>
   * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
   * ('.').</li>
   * <li>Must be between 3 and 255 characters.</li>
   * <li>Must begin with a letter.</li>
   * <li>Must end with a letter or a number.</li>
   * <li>Cannot begin with 'goog' prefix.</li>
   * </ul>
   *
   * <p>Dataflow will start reading data published on this topic from the time the pipeline is
   * started. Any data published on the topic before the pipeline is started will not be read
   * by Dataflow.
   */
  public static Bound topic(String topic) {
    Bound unconfigured = new Bound();
    return unconfigured.topic(topic);
  }

  /**
   * Creates and returns a PubsubIO.Read PTransform for reading from
   * a specific Pubsub subscription. Mutually exclusive with
   * PubsubIO.Read.topic().
   * Cloud Pubsub subscription names should be of the form
   * {@code /subscriptions/<project>/<subscription>},
   * where {@code <project>} is the name of the project the subscription belongs to.
   * The {@code <subscription>} component must comply with the requirements below.
   *
   * <ul>
   * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
   * ('.').</li>
   * <li>Must be between 3 and 255 characters.</li>
   * <li>Must begin with a letter.</li>
   * <li>Must end with a letter or a number.</li>
   * <li>Cannot begin with 'goog' prefix.</li>
   * </ul>
   */
  public static Bound subscription(String subscription) {
    Bound unconfigured = new Bound();
    return unconfigured.subscription(subscription);
  }

  /**
   * Creates and returns a PubsubIO.Read PTransform where record timestamps are expected
   * to be provided using the Pubsub labeling API. The {@code timestampLabel} parameter
   * specifies the label name. The label value sent to Pubsub is a numerical value representing
   * the number of milliseconds since the Unix epoch. For example, if using the joda time classes,
   * org.joda.time.Instant.getMillis() returns the correct value for this label.
   *
   * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
   * the first time it sees each record. All windowing will be done relative to these timestamps.
   * Windows are closed based on an estimate of when this source has finished producing data for
   * a timestamp range, which means that late data can arrive after a window has been closed.
   * {@link #dropLateData(boolean)} allows you to control what to do with late data.
   */
  public static Bound timestampLabel(String timestampLabel) {
    Bound unconfigured = new Bound();
    return unconfigured.timestampLabel(timestampLabel);
  }

  /**
   * Creates and returns a PubsubIO.Read PTransform with the given late-data policy:
   * if true, then late-arriving data from this source will be dropped.
   */
  public static Bound dropLateData(boolean dropLateData) {
    Bound unconfigured = new Bound();
    return unconfigured.dropLateData(dropLateData);
  }

  /**
   * Creates and returns a PubsubIO.Read PTransform where unique record identifiers are
   * expected to be provided using the Pubsub labeling API. The {@code idLabel} parameter
   * specifies the label name. The label value sent to Pubsub can be any string value that
   * uniquely identifies this record.
   *
   * <p>If idLabel is not provided, Dataflow cannot guarantee that no duplicate data will be
   * delivered on the Pubsub stream. In this case, deduplication of the stream will be
   * strictly best effort.
   */
  public static Bound idLabel(String idLabel) {
    Bound unconfigured = new Bound();
    return unconfigured.idLabel(idLabel);
  }

  /**
   * A PTransform that reads from a Pubsub source and returns
   * an unbounded PCollection containing the items from the stream.
   *
   * <p>Every configuration method returns a fresh instance and leaves the receiver untouched.
   */
  @SuppressWarnings("serial")
  public static class Bound
      extends PTransform<PInput, PCollection<String>> {
    /** The Pubsub topic to read from. */
    String topic;

    /** The Pubsub subscription to read from. */
    String subscription;

    /** The Pubsub label to read timestamps from. */
    String timestampLabel;

    /** Whether late-arriving data is dropped; the no-argument constructor sets this to true. */
    Boolean dropLateData;

    /**
     * This is set for backwards compatibility with old services. If dropLateData is not
     * explicitly called, then we won't forward that parameter to the service.
     */
    Boolean dropLateDataExplicit;

    /** The Pubsub label to read ids from. */
    String idLabel;

    /** Creates an unconfigured transform that drops late data by default. */
    Bound() {
      this.dropLateDataExplicit = false;
      this.dropLateData = true;
    }

    /**
     * Creates a fully specified transform. A non-null subscription is validated first, then a
     * non-null topic; either validation may throw {@link IllegalArgumentException}.
     */
    Bound(String name, String subscription, String topic, String timestampLabel,
        boolean dropLateData, boolean dropLateDataExplicit, String idLabel) {
      super(name);
      if (subscription != null) {
        Validator.validateSubscriptionName(subscription);
      }
      if (topic != null) {
        Validator.validateTopicName(topic);
      }
      this.topic = topic;
      this.subscription = subscription;
      this.timestampLabel = timestampLabel;
      this.idLabel = idLabel;
      this.dropLateData = dropLateData;
      this.dropLateDataExplicit = dropLateDataExplicit;
    }

    /**
     * Returns a new PubsubIO.Read PTransform that's like this one but with the given
     * step name. Does not modify the object.
     */
    public Bound named(String name) {
      return new Bound(
          name,
          this.subscription,
          this.topic,
          this.timestampLabel,
          this.dropLateData,
          this.dropLateDataExplicit,
          this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Read PTransform that's like this one but reading from the
     * given subscription. Does not modify the object.
     */
    public Bound subscription(String subscription) {
      return new Bound(
          this.name,
          subscription,
          this.topic,
          this.timestampLabel,
          this.dropLateData,
          this.dropLateDataExplicit,
          this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Read PTransform that's like this one but reading from the
     * given topic. Does not modify the object.
     */
    public Bound topic(String topic) {
      return new Bound(
          this.name,
          this.subscription,
          topic,
          this.timestampLabel,
          this.dropLateData,
          this.dropLateDataExplicit,
          this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Read PTransform that's like this one but reading timestamps
     * from the given Pubsub label. Does not modify the object.
     */
    public Bound timestampLabel(String timestampLabel) {
      return new Bound(
          this.name,
          this.subscription,
          this.topic,
          timestampLabel,
          this.dropLateData,
          this.dropLateDataExplicit,
          this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Read PTransform that's like this one but with the specified
     * setting for dropLateData. Does not modify the object.
     */
    public Bound dropLateData(boolean dropLateData) {
      // Calling this method is what marks the setting as explicitly chosen.
      return new Bound(
          this.name,
          this.subscription,
          this.topic,
          this.timestampLabel,
          dropLateData,
          true,
          this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Read PTransform that's like this one but reading unique ids
     * from the given Pubsub label. Does not modify the object.
     */
    public Bound idLabel(String idLabel) {
      return new Bound(
          this.name,
          this.subscription,
          this.topic,
          this.timestampLabel,
          this.dropLateData,
          this.dropLateDataExplicit,
          idLabel);
    }

    /**
     * Checks that exactly one of topic and subscription is set, then creates the output
     * collection in the global window.
     *
     * @throws IllegalStateException if neither or both of topic and subscription are set
     */
    @Override
    public PCollection<String> apply(PInput input) {
      boolean hasTopic = topic != null;
      boolean hasSubscription = subscription != null;
      if (!hasTopic && !hasSubscription) {
        throw new IllegalStateException(
            "need to set either the topic or the subscription for "
            + "a PubsubIO.Read transform");
      }
      if (hasTopic && hasSubscription) {
        throw new IllegalStateException(
            "Can't set both the topic and the subscription for a "
            + "PubsubIO.Read transform");
      }
      return PCollection.<String>createPrimitiveOutputInternal(new GlobalWindows());
    }

    @Override
    protected Coder<String> getDefaultOutputCoder() {
      return StringUtf8Coder.of();
    }

    @Override
    protected String getKindString() {
      return "PubsubIO.Read";
    }

    /** Returns the topic to read from, or null if none was set. */
    public String getTopic() {
      return this.topic;
    }

    /** Returns the subscription to read from, or null if none was set. */
    public String getSubscription() {
      return this.subscription;
    }

    /** Returns the label that timestamps are read from, or null if none was set. */
    public String getTimestampLabel() {
      return this.timestampLabel;
    }

    /** Returns whether late-arriving data is dropped. */
    public boolean getDropLateData() {
      return this.dropLateData;
    }

    /** Returns whether the dropLateData setting was chosen explicitly. */
    public boolean getDropLateDataExplicit() {
      return this.dropLateDataExplicit;
    }

    /** Returns the label that record ids are read from, or null if none was set. */
    public String getIdLabel() {
      return this.idLabel;
    }

    static {
      // TODO: Figure out how to make this work under
      // DirectPipelineRunner.
    }
  }
}
/////////////////////////////////////////////////////////////////////////////
/**
 * Entry points for building a PTransform that continuously writes a
 * {@code PCollection<String>} to a Pubsub stream.
 *
 * <p>Each static method creates a default {@link Bound} and applies a single setting to it;
 * further settings can be chained on the returned {@link Bound}.
 */
// TODO: Support non-String encodings.
public static class Write {
  /**
   * Creates and returns a PubsubIO.Write PTransform with the given step name.
   */
  public static Bound named(String name) {
    Bound unconfigured = new Bound();
    return unconfigured.named(name);
  }

  /**
   * Creates and returns a PubsubIO.Write PTransform publishing to the given topic.
   * Cloud Pubsub topic names should be {@code /topics/<project>/<topic>},
   * where {@code <project>} is the name of the publishing project.
   */
  public static Bound topic(String topic) {
    Bound unconfigured = new Bound();
    return unconfigured.topic(topic);
  }

  /**
   * If specified, Dataflow will add a Pubsub label to each output record specifying the logical
   * timestamp of the record. {@code timestampLabel} determines the label name. The label value
   * is a numerical value representing the number of milliseconds since the Unix epoch. For
   * example, if using the joda time classes, the org.joda.time.Instant(long) constructor can be
   * used to parse this value. If the output from this sink is being read by another Dataflow
   * source, then PubsubIO.Read.timestampLabel can be used to ensure that the other source reads
   * these timestamps from the appropriate label.
   */
  public static Bound timestampLabel(String timestampLabel) {
    Bound unconfigured = new Bound();
    return unconfigured.timestampLabel(timestampLabel);
  }

  /**
   * If specified, Dataflow will add a Pubsub label to each output record containing a unique
   * identifier for that record. {@code idLabel} determines the label name. The label value
   * is an opaque string value. This is useful if the output from this sink is being read
   * by another Dataflow source, in which case PubsubIO.Read.idLabel can be used to ensure that
   * the other source reads these ids from the appropriate label.
   */
  public static Bound idLabel(String idLabel) {
    Bound unconfigured = new Bound();
    return unconfigured.idLabel(idLabel);
  }

  /**
   * A PTransform that writes an unbounded {@code PCollection<String>}
   * to a Pubsub stream.
   *
   * <p>Every configuration method returns a fresh instance and leaves the receiver untouched.
   */
  @SuppressWarnings("serial")
  public static class Bound
      extends PTransform<PCollection<String>, PDone> {
    /** The Pubsub topic to publish to. */
    String topic;

    /** The Pubsub label to publish timestamps to. */
    String timestampLabel;

    /** The Pubsub label to publish record ids to. */
    String idLabel;

    /** Creates an unconfigured transform with no topic or labels. */
    Bound() {}

    /**
     * Creates a fully specified transform. A non-null topic is validated and may cause an
     * {@link IllegalArgumentException}.
     */
    Bound(String name, String topic, String timestampLabel, String idLabel) {
      super(name);
      if (topic != null) {
        Validator.validateTopicName(topic);
      }
      // A null topic simply leaves the field at its default null value.
      this.topic = topic;
      this.timestampLabel = timestampLabel;
      this.idLabel = idLabel;
    }

    /**
     * Returns a new PubsubIO.Write PTransform that's like this one but with the given step
     * name. Does not modify the object.
     */
    public Bound named(String name) {
      return new Bound(name, this.topic, this.timestampLabel, this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Write PTransform that's like this one but writing to the given
     * topic. Does not modify the object.
     */
    public Bound topic(String topic) {
      return new Bound(this.name, topic, this.timestampLabel, this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Write PTransform that's like this one but publishing timestamps
     * to the given Pubsub label. Does not modify the object.
     */
    public Bound timestampLabel(String timestampLabel) {
      return new Bound(this.name, this.topic, timestampLabel, this.idLabel);
    }

    /**
     * Returns a new PubsubIO.Write PTransform that's like this one but publishing record ids
     * to the given Pubsub label. Does not modify the object.
     */
    public Bound idLabel(String idLabel) {
      return new Bound(this.name, this.topic, this.timestampLabel, idLabel);
    }

    /**
     * Checks that a topic has been set and returns the terminal {@code PDone} marker.
     *
     * @throws IllegalStateException if no topic has been set
     */
    @Override
    public PDone apply(PCollection<String> input) {
      if (topic != null) {
        return new PDone();
      }
      throw new IllegalStateException(
          "need to set the topic of a PubsubIO.Write transform");
    }

    @Override
    protected Coder<Void> getDefaultOutputCoder() {
      return VoidCoder.of();
    }

    @Override
    protected String getKindString() {
      return "PubsubIO.Write";
    }

    /** Returns the topic to publish to, or null if none was set. */
    public String getTopic() {
      return this.topic;
    }

    /** Returns the label that timestamps are published to, or null if none was set. */
    public String getTimestampLabel() {
      return this.timestampLabel;
    }

    /** Returns the label that record ids are published to, or null if none was set. */
    public String getIdLabel() {
      return this.idLabel;
    }

    static {
      // TODO: Figure out how to make this work under
      // DirectPipelineRunner.
    }
  }
}
}