| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.pubsub; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; |
| |
| import com.google.api.client.util.DateTime; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ThreadLocalRandom; |
| import javax.annotation.Nullable; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Objects; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Splitter; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; |
| |
| /** An (abstract) helper class for talking to Pubsub via an underlying transport. */ |
| public abstract class PubsubClient implements Closeable { |
| /** |
| * Factory for creating clients. The interface extends {@link Serializable}, so implementations |
| * are expected to be serializable. |
| */ |
| public interface PubsubClientFactory extends Serializable { |
| /** |
| * Construct a new Pubsub client. It should be closed via {@link PubsubClient#close} in order to |
| * ensure tidy cleanup of underlying netty resources (or use the try-with-resources construct). |
| * Uses {@code options} to derive pubsub endpoints and application credentials. If |
| * non-{@literal null}, use {@code timestampAttribute} and {@code idAttribute} to store custom |
| * timestamps/ids within message metadata. |
| * |
| * @param timestampAttribute name of the attribute holding custom timestamps, or {@literal null} |
| * @param idAttribute name of the attribute holding custom record ids, or {@literal null} |
| * @param options options from which endpoints and credentials are derived |
| * @return a new client; the caller is responsible for closing it |
| * @throws IOException if the client cannot be created (exact conditions are |
| *     implementation-specific) |
| */ |
| PubsubClient newClient( |
| @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) |
| throws IOException; |
| |
| /** |
| * Return the display name for this factory. Eg "Json", "gRPC". |
| * |
| * @return the display name of this factory |
| */ |
| String getKind(); |
| } |
| |
| /** |
| * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}. Return {@literal |
| * null} if no timestamp could be found. Throw {@link IllegalArgumentException} if timestamp |
| * cannot be recognized. |
| */ |
| @Nullable |
| private static Long asMsSinceEpoch(@Nullable String timestamp) { |
| if (Strings.isNullOrEmpty(timestamp)) { |
| return null; |
| } |
| try { |
| // Try parsing as milliseconds since epoch. Note there is no way to parse a |
| // string in RFC 3339 format here. |
| // Expected IllegalArgumentException if parsing fails; we use that to fall back |
| // to RFC 3339. |
| return Long.parseLong(timestamp); |
| } catch (IllegalArgumentException e1) { |
| // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an |
| // IllegalArgumentException if parsing fails, and the caller should handle. |
| return DateTime.parseRfc3339(timestamp).getValue(); |
| } |
| } |
| |
| /** |
| * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code |
| * attributes} and {@code pubsubTimestamp}. |
| * |
| * <p>If {@code timestampAttribute} is non-{@literal null} then the message attributes must |
| * contain that attribute, and the value of that attribute will be taken as the timestamp. |
| * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code |
| * pubsubTimestamp}. |
| * |
| * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch |
| * or RFC3339 time. |
| */ |
| protected static long extractTimestamp( |
| @Nullable String timestampAttribute, |
| @Nullable String pubsubTimestamp, |
| @Nullable Map<String, String> attributes) { |
| Long timestampMsSinceEpoch; |
| if (Strings.isNullOrEmpty(timestampAttribute)) { |
| timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp); |
| checkArgument( |
| timestampMsSinceEpoch != null, |
| "Cannot interpret PubSub publish timestamp: %s", |
| pubsubTimestamp); |
| } else { |
| String value = attributes == null ? null : attributes.get(timestampAttribute); |
| checkArgument( |
| value != null, |
| "PubSub message is missing a value for timestamp attribute %s", |
| timestampAttribute); |
| timestampMsSinceEpoch = asMsSinceEpoch(value); |
| checkArgument( |
| timestampMsSinceEpoch != null, |
| "Cannot interpret value of attribute %s as timestamp: %s", |
| timestampAttribute, |
| value); |
| } |
| return timestampMsSinceEpoch; |
| } |
| |
| /** Path representing a cloud project id. */ |
| public static class ProjectPath implements Serializable { |
| private final String projectId; |
| |
| /** |
| * Creates a {@link ProjectPath} from a {@link String} representation, which must be of the form |
| * {@code "projects/" + projectId}. |
| */ |
| ProjectPath(String path) { |
| List<String> splits = Splitter.on('/').splitToList(path); |
| checkArgument( |
| splits.size() == 2 && "projects".equals(splits.get(0)), |
| "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>", |
| path); |
| this.projectId = splits.get(1); |
| } |
| |
| public String getPath() { |
| return String.format("projects/%s", projectId); |
| } |
| |
| public String getId() { |
| return projectId; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| |
| ProjectPath that = (ProjectPath) o; |
| |
| return projectId.equals(that.projectId); |
| } |
| |
| @Override |
| public int hashCode() { |
| return projectId.hashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return getPath(); |
| } |
| } |
| |
| public static ProjectPath projectPathFromPath(String path) { |
| return new ProjectPath(path); |
| } |
| |
| public static ProjectPath projectPathFromId(String projectId) { |
| return new ProjectPath(String.format("projects/%s", projectId)); |
| } |
| |
| /** Path representing a Pubsub subscription. */ |
| public static class SubscriptionPath implements Serializable { |
| private final String projectId; |
| private final String subscriptionName; |
| |
| SubscriptionPath(String path) { |
| List<String> splits = Splitter.on('/').splitToList(path); |
| checkState( |
| splits.size() == 4 |
| && "projects".equals(splits.get(0)) |
| && "subscriptions".equals(splits.get(2)), |
| "Malformed subscription path %s: " |
| + "must be of the form \"projects/\" + <project id> + \"subscriptions\"", |
| path); |
| this.projectId = splits.get(1); |
| this.subscriptionName = splits.get(3); |
| } |
| |
| public String getPath() { |
| return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName); |
| } |
| |
| public String getName() { |
| return subscriptionName; |
| } |
| |
| public String getV1Beta1Path() { |
| return String.format("/subscriptions/%s/%s", projectId, subscriptionName); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| SubscriptionPath that = (SubscriptionPath) o; |
| return this.subscriptionName.equals(that.subscriptionName) |
| && this.projectId.equals(that.projectId); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(projectId, subscriptionName); |
| } |
| |
| @Override |
| public String toString() { |
| return getPath(); |
| } |
| } |
| |
| public static SubscriptionPath subscriptionPathFromPath(String path) { |
| return new SubscriptionPath(path); |
| } |
| |
| public static SubscriptionPath subscriptionPathFromName( |
| String projectId, String subscriptionName) { |
| return new SubscriptionPath( |
| String.format("projects/%s/subscriptions/%s", projectId, subscriptionName)); |
| } |
| |
| /** Path representing a Pubsub topic. */ |
| public static class TopicPath implements Serializable { |
| private final String path; |
| |
| TopicPath(String path) { |
| this.path = path; |
| } |
| |
| public String getPath() { |
| return path; |
| } |
| |
| public String getName() { |
| List<String> splits = Splitter.on('/').splitToList(path); |
| |
| checkState(splits.size() == 4, "Malformed topic path %s", path); |
| return splits.get(3); |
| } |
| |
| public String getV1Beta1Path() { |
| List<String> splits = Splitter.on('/').splitToList(path); |
| checkState(splits.size() == 4, "Malformed topic path %s", path); |
| return String.format("/topics/%s/%s", splits.get(1), splits.get(3)); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| TopicPath topicPath = (TopicPath) o; |
| return path.equals(topicPath.path); |
| } |
| |
| @Override |
| public int hashCode() { |
| return path.hashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return path; |
| } |
| } |
| |
| public static TopicPath topicPathFromPath(String path) { |
| return new TopicPath(path); |
| } |
| |
| public static TopicPath topicPathFromName(String projectId, String topicName) { |
| return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); |
| } |
| |
| /** |
| * A message to be sent to Pubsub. |
| * |
| * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java |
| * serialization is never used for non-test clients. |
| */ |
| public static class OutgoingMessage implements Serializable { |
| /** Underlying (encoded) element. */ |
| public final byte[] elementBytes; |
| |
| public final Map<String, String> attributes; |
| |
| /** Timestamp for element (ms since epoch). */ |
| public final long timestampMsSinceEpoch; |
| |
| /** |
| * If using an id attribute, the record id to associate with this record's metadata so the |
| * receiver can reject duplicates. Otherwise {@literal null}. |
| */ |
| @Nullable public final String recordId; |
| |
| public OutgoingMessage( |
| byte[] elementBytes, |
| Map<String, String> attributes, |
| long timestampMsSinceEpoch, |
| @Nullable String recordId) { |
| this.elementBytes = elementBytes; |
| this.attributes = attributes; |
| this.timestampMsSinceEpoch = timestampMsSinceEpoch; |
| this.recordId = recordId; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format( |
| "OutgoingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| |
| OutgoingMessage that = (OutgoingMessage) o; |
| |
| return timestampMsSinceEpoch == that.timestampMsSinceEpoch |
| && Arrays.equals(elementBytes, that.elementBytes) |
| && Objects.equal(attributes, that.attributes) |
| && Objects.equal(recordId, that.recordId); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode( |
| Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, recordId); |
| } |
| } |
| |
| /** |
| * A message received from Pubsub. |
| * |
| * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java |
| * serialization is never used for non-test clients. |
| */ |
| static class IncomingMessage implements Serializable { |
| /** Underlying (encoded) element. */ |
| public final byte[] elementBytes; |
| |
| public Map<String, String> attributes; |
| |
| /** |
| * Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom |
| * timestamp associated with the message. |
| */ |
| public final long timestampMsSinceEpoch; |
| |
| /** Timestamp (in system time) at which we requested the message (ms since epoch). */ |
| public final long requestTimeMsSinceEpoch; |
| |
| /** Id to pass back to Pubsub to acknowledge receipt of this message. */ |
| public final String ackId; |
| |
| /** Id to pass to the runner to distinguish this message from all others. */ |
| public final String recordId; |
| |
| public IncomingMessage( |
| byte[] elementBytes, |
| Map<String, String> attributes, |
| long timestampMsSinceEpoch, |
| long requestTimeMsSinceEpoch, |
| String ackId, |
| String recordId) { |
| this.elementBytes = elementBytes; |
| this.attributes = attributes; |
| this.timestampMsSinceEpoch = timestampMsSinceEpoch; |
| this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; |
| this.ackId = ackId; |
| this.recordId = recordId; |
| } |
| |
| public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { |
| return new IncomingMessage( |
| elementBytes, |
| attributes, |
| timestampMsSinceEpoch, |
| requestTimeMsSinceEpoch, |
| ackId, |
| recordId); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format( |
| "IncomingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| |
| IncomingMessage that = (IncomingMessage) o; |
| |
| return timestampMsSinceEpoch == that.timestampMsSinceEpoch |
| && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch |
| && ackId.equals(that.ackId) |
| && recordId.equals(that.recordId) |
| && Arrays.equals(elementBytes, that.elementBytes) |
| && Objects.equal(attributes, that.attributes); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode( |
| Arrays.hashCode(elementBytes), |
| attributes, |
| timestampMsSinceEpoch, |
| requestTimeMsSinceEpoch, |
| ackId, |
| recordId); |
| } |
| } |
| |
| /** |
| * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages published. |
| * |
| * @param topic the topic to publish to |
| * @param outgoingMessages the messages to publish |
| * @return the number of messages published |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) |
| throws IOException; |
| |
| /** |
| * Request the next batch of up to {@code batchSize} messages from {@code subscription}. Return |
| * the received messages, or empty collection if none were available. Does not wait for messages |
| * to arrive if {@code returnImmediately} is {@literal true}. Returned messages will record their |
| * request time as {@code requestTimeMsSinceEpoch}. |
| * |
| * @param requestTimeMsSinceEpoch request time (ms since epoch) to record on each returned message |
| * @param subscription the subscription to pull from |
| * @param batchSize upper bound on the number of messages returned |
| * @param returnImmediately if {@literal true}, do not wait for messages to arrive |
| * @return the received messages; empty if none were available |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract List<IncomingMessage> pull( |
| long requestTimeMsSinceEpoch, |
| SubscriptionPath subscription, |
| int batchSize, |
| boolean returnImmediately) |
| throws IOException; |
| |
| /** |
| * Acknowledge messages from {@code subscription} with {@code ackIds}. |
| * |
| * @param subscription the subscription the messages were received from |
| * @param ackIds the ack ids of the messages to acknowledge |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds) |
| throws IOException; |
| |
| /** |
| * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to be {@code |
| * deadlineSeconds} from now. |
| * |
| * @param subscription the subscription the messages were received from |
| * @param ackIds the ack ids of the messages whose deadline is changed |
| * @param deadlineSeconds the new deadline, in seconds from now |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract void modifyAckDeadline( |
| SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException; |
| |
| /** |
| * Create {@code topic}. |
| * |
| * @param topic the topic to create |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract void createTopic(TopicPath topic) throws IOException; |
| |
| /** |
| * Delete {@code topic}. |
| * |
| * @param topic the topic to delete |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract void deleteTopic(TopicPath topic) throws IOException; |
| |
| /** |
| * Return a list of topics for {@code project}. |
| * |
| * @param project the project whose topics are listed |
| * @return the topics of {@code project} |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException; |
| |
| /** |
| * Create {@code subscription} to {@code topic}. |
| * |
| * @param topic the topic to subscribe to |
| * @param subscription the subscription to create |
| * @param ackDeadlineSeconds the ack deadline, in seconds, for the new subscription |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract void createSubscription( |
| TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; |
| |
| /** |
| * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It is the |
| * responsibility of the caller to later delete the subscription. |
| */ |
| public SubscriptionPath createRandomSubscription( |
| ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException { |
| // Create a randomized subscription derived from the topic name. |
| String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong(); |
| SubscriptionPath subscription = |
| PubsubClient.subscriptionPathFromName(project.getId(), subscriptionName); |
| createSubscription(topic, subscription, ackDeadlineSeconds); |
| return subscription; |
| } |
| |
| /** |
| * Delete {@code subscription}. |
| * |
| * @param subscription the subscription to delete |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException; |
| |
| /** |
| * Return a list of subscriptions for {@code topic} in {@code project}. |
| * |
| * @param project the project to look in |
| * @param topic the topic whose subscriptions are listed |
| * @return the subscriptions for {@code topic} in {@code project} |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) |
| throws IOException; |
| |
| /** |
| * Return the ack deadline, in seconds, for {@code subscription}. |
| * |
| * @param subscription the subscription to inspect |
| * @return the ack deadline of {@code subscription}, in seconds |
| * @throws IOException if the request fails (exact conditions are implementation-specific) |
| */ |
| public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException; |
| |
| /** |
| * Return {@literal true} if {@link #pull} will always return empty list. Actual clients will |
| * return {@literal false}. Test clients may return {@literal true} to signal that all expected |
| * messages have been pulled and the test may complete. |
| * |
| * @return {@literal true} if no further messages will ever be returned by {@link #pull} |
| */ |
| public abstract boolean isEOF(); |
| } |