blob: b93ac61f33be954abd2426f771258e5aaad50722 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsublite;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
/**
 * I/O transforms for reading from and writing to Google Pub/Sub Lite.
 *
 * <p>This class is a non-instantiable holder of static factory methods; each factory returns a
 * {@link PTransform} to be applied to a pipeline or to a {@link PCollection}.
 *
 * <p>For the differences between this and Google Pub/Sub, please refer to the <a
 * href="https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite">product documentation</a>.
 */
@Experimental
public final class PubsubLiteIO {

  /** Not instantiable: only the static factories below are meant to be used. */
  private PubsubLiteIO() {}

  /**
   * Creates a transform that reads messages from Pub/Sub Lite.
   *
   * <p>The resulting collection may contain duplicates if the publisher retried, which the
   * PubsubLiteIO write method will do. Apply {@link #deduplicate(UuidDeduplicationOptions)} to
   * remove these duplicates.
   *
   * <pre>{@code
   * Pipeline p = ...;
   *
   * SubscriptionPath subscriptionPath =
   *     SubscriptionPath.newBuilder()
   *         .setLocation(zone)
   *         .setProjectNumber(projectNum)
   *         .setName(subscriptionName)
   *         .build();
   *
   * PCollection<SequencedMessage> messages =
   *     p.apply(
   *         "read",
   *         PubsubLiteIO.read(
   *             SubscriberOptions.newBuilder().setSubscriptionPath(subscriptionPath).build()));
   * }</pre>
   *
   * @param options the subscriber configuration handed to the underlying subscribe transform
   * @return a root transform producing the {@link SequencedMessage}s that were read
   */
  public static PTransform<PBegin, PCollection<SequencedMessage>> read(SubscriberOptions options) {
    return new SubscribeTransform(options);
  }

  /**
   * Creates a transform that removes duplicates from the output of a read.
   *
   * <p>Assumes by default that the uuids were added by a call to {@link #addUuids()} when the
   * messages were published.
   *
   * <pre>{@code
   * PCollection<SequencedMessage> messages = ... (see read) ...;
   * messages =
   *     messages.apply(
   *         PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
   * }</pre>
   *
   * @param options the deduplication configuration handed to the underlying transform
   * @return a transform from {@link SequencedMessage}s to deduplicated {@link SequencedMessage}s
   */
  public static PTransform<PCollection<SequencedMessage>, PCollection<SequencedMessage>>
      deduplicate(UuidDeduplicationOptions options) {
    return new UuidDeduplicationTransform(options);
  }

  /**
   * Creates a transform that adds uuids to to-be-published messages, so that uniqueness is
   * maintained.
   *
   * <pre>{@code
   * PCollection<PubSubMessage> messages = ...;
   * messages = messages.apply(PubsubLiteIO.addUuids());
   * }</pre>
   *
   * @return a transform from {@link PubSubMessage}s to {@link PubSubMessage}s carrying uuids
   */
  public static PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>> addUuids() {
    return new AddUuidsTransform();
  }

  /**
   * Creates a transform that writes messages to Pub/Sub Lite.
   *
   * <pre>{@code
   * TopicPath topicPath =
   *     TopicPath.newBuilder()
   *         .setProjectNumber(projectNum)
   *         .setLocation(zone)
   *         .setName(topicName)
   *         .build();
   *
   * PCollection<PubSubMessage> messages = ...;
   * messages.apply(
   *     PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
   * }</pre>
   *
   * @param options the publisher configuration used to construct the sink
   * @return a terminal transform consuming the {@link PubSubMessage}s to publish
   */
  public static PTransform<PCollection<PubSubMessage>, PDone> write(PublisherOptions options) {
    return new PTransform<PCollection<PubSubMessage>, PDone>() {
      @Override
      public PDone expand(PCollection<PubSubMessage> messages) {
        // A new sink is built on every expansion and run as a ParDo step named "Write".
        messages.apply("Write", ParDo.of(new PubsubLiteSink(options)));
        return PDone.in(messages.getPipeline());
      }
    };
  }
}