blob: 9dc446fa10a2b6029a9fcafa4328acf3fb5288d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.api.client.util.Clock;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Users should use {@link PubsubIO#read} instead.
*
* <p>A PTransform which streams messages from Pubsub.
*
* <ul>
* <li>The underlying implementation in an {@link UnboundedSource} which receives messages in
* batches and hands them out one at a time.
* <li>The watermark (either in Pubsub processing time or custom timestamp time) is estimated by
* keeping track of the minimum of the last minutes worth of messages. This assumes Pubsub
* delivers the oldest (in Pubsub processing time) available message at least once a minute,
* and that custom timestamps are 'mostly' monotonic with Pubsub processing time.
* Unfortunately both of those assumptions are fragile. Thus the estimated watermark may get
* ahead of the 'true' watermark and cause some messages to be late.
* <li>Checkpoints are used both to ACK received messages back to Pubsub (so that they may be
* retired on the Pubsub end), and to NACK already consumed messages should a checkpoint need
* to be restored (so that Pubsub will resend those messages promptly).
* <li>The backlog is determined by each reader using the messages which have been pulled from
* Pubsub but not yet consumed downstream. The backlog does not take account of any messages
* queued by Pubsub for the subscription. Unfortunately there is currently no API to determine
* the size of the Pubsub queue's backlog.
* <li>The subscription must already exist.
* <li>The subscription timeout is read whenever a reader is started. However it is not checked
* thereafter despite the timeout being user-changeable on-the-fly.
* <li>We log vital stats every 30 seconds.
* <li>Though some background threads may be used by the underlying transport all Pubsub calls are
* blocking. We rely on the underlying runner to allow multiple {@link
* UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency.
* </ul>
*/
public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubMessage>> {
private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
/** Default ACK timeout for created subscriptions. */
private static final int DEAULT_ACK_TIMEOUT_SEC = 60;
/** Coder for checkpoints. */
private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = PubsubCheckpointCoder.of();
/** Maximum number of messages per pull. */
private static final int PULL_BATCH_SIZE = 1000;
/** Maximum number of ACK ids per ACK or ACK extension call. */
private static final int ACK_BATCH_SIZE = 2000;
/** Maximum number of messages in flight. */
private static final int MAX_IN_FLIGHT = 20000;
/** Timeout for round trip from receiving a message to finally ACKing it back to Pubsub. */
private static final Duration PROCESSING_TIMEOUT = Duration.standardMinutes(2);
/** Percentage of ack timeout by which to extend acks when they are near timeout. */
private static final int ACK_EXTENSION_PCT = 50;
/**
* Percentage of ack timeout we should use as a safety margin. We'll try to extend acks by this
* margin before the ack actually expires.
*/
private static final int ACK_SAFETY_PCT = 20;
/**
* For stats only: How close we can get to an ack deadline before we risk it being already
* considered passed by Pubsub.
*/
private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2);
/** Period of samples to determine watermark and other stats. */
private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
/** Period of updates to determine watermark and other stats. */
private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
/** Period for logging stats. */
private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
/** Minimum number of unread messages required before considering updating watermark. */
private static final int MIN_WATERMARK_MESSAGES = 10;
/**
* Minimum number of SAMPLE_UPDATE periods over which unread messages shoud be spread before
* considering updating watermark.
*/
private static final int MIN_WATERMARK_SPREAD = 2;
/** Additional sharding so that we can hide read message latency. */
private static final int SCALE_OUT = 4;
// TODO: Would prefer to use MinLongFn but it is a BinaryCombineFn<Long> rather
// than a BinaryCombineLongFn. [BEAM-285]
private static final Combine.BinaryCombineLongFn MIN =
new Combine.BinaryCombineLongFn() {
@Override
public long apply(long left, long right) {
return Math.min(left, right);
}
@Override
public long identity() {
return Long.MAX_VALUE;
}
};
private static final Combine.BinaryCombineLongFn MAX =
new Combine.BinaryCombineLongFn() {
@Override
public long apply(long left, long right) {
return Math.max(left, right);
}
@Override
public long identity() {
return Long.MIN_VALUE;
}
};
private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
// ================================================================================
// Checkpoint
// ================================================================================
/**
* Which messages have been durably committed and thus can now be ACKed. Which messages have been
* read but not yet committed, in which case they should be NACKed if we need to restore.
*/
@VisibleForTesting
static class PubsubCheckpoint implements UnboundedSource.CheckpointMark {
/**
* The {@link SubscriptionPath} to the subscription the reader is reading from. May be {@code
* null} if the {@link PubsubUnboundedSource} contains the subscription.
*/
@VisibleForTesting @Nullable String subscriptionPath;
/**
* If the checkpoint is for persisting: the reader who's snapshotted state we are persisting. If
* the checkpoint is for restoring: {@literal null}. Not persisted in durable checkpoint.
* CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called the
* 'true' active reader may have changed.
*/
@Nullable private PubsubReader reader;
/**
* If the checkpoint is for persisting: The ACK ids of messages which have been passed
* downstream since the last checkpoint. If the checkpoint is for restoring: {@literal null}.
* Not persisted in durable checkpoint.
*/
@Nullable private List<String> safeToAckIds;
/**
* If the checkpoint is for persisting: The ACK ids of messages which have been received from
* Pubsub but not yet passed downstream at the time of the snapshot. If the checkpoint is for
* restoring: Same, but recovered from durable storage.
*/
@VisibleForTesting final List<String> notYetReadIds;
public PubsubCheckpoint(
@Nullable String subscriptionPath,
@Nullable PubsubReader reader,
@Nullable List<String> safeToAckIds,
List<String> notYetReadIds) {
this.subscriptionPath = subscriptionPath;
this.reader = reader;
this.safeToAckIds = safeToAckIds;
this.notYetReadIds = notYetReadIds;
}
@Nullable
private SubscriptionPath getSubscription() {
return subscriptionPath == null
? null
: PubsubClient.subscriptionPathFromPath(subscriptionPath);
}
/**
* BLOCKING All messages which have been passed downstream have now been durably committed. We
* can ACK them upstream. CAUTION: This may never be called.
*/
@Override
public void finalizeCheckpoint() throws IOException {
checkState(reader != null && safeToAckIds != null, "Cannot finalize a restored checkpoint");
// Even if the 'true' active reader has changed since the checkpoint was taken we are
// fine:
// - The underlying Pubsub topic will not have changed, so the following ACKs will still
// go to the right place.
// - We'll delete the ACK ids from the readers in-flight state, but that only effects
// flow control and stats, neither of which are relevant anymore.
try {
int n = safeToAckIds.size();
List<String> batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE));
for (String ackId : safeToAckIds) {
batchSafeToAckIds.add(ackId);
if (batchSafeToAckIds.size() >= ACK_BATCH_SIZE) {
reader.ackBatch(batchSafeToAckIds);
n -= batchSafeToAckIds.size();
// CAUTION: Don't reuse the same list since ackBatch holds on to it.
batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE));
}
}
if (!batchSafeToAckIds.isEmpty()) {
reader.ackBatch(batchSafeToAckIds);
}
} finally {
int remainingInFlight = reader.numInFlightCheckpoints.decrementAndGet();
checkState(remainingInFlight >= 0, "Miscounted in-flight checkpoints");
reader.maybeCloseClient();
reader = null;
safeToAckIds = null;
}
}
/** Return current time according to {@code reader}. */
private static long now(PubsubReader reader) {
if (reader.outer.outer.clock == null) {
return System.currentTimeMillis();
} else {
return reader.outer.outer.clock.currentTimeMillis();
}
}
/**
* BLOCKING NACK all messages which have been read from Pubsub but not passed downstream. This
* way Pubsub will send them again promptly.
*/
public void nackAll(PubsubReader reader) throws IOException {
checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
List<String> batchYetToAckIds =
new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE));
for (String ackId : notYetReadIds) {
batchYetToAckIds.add(ackId);
if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) {
long nowMsSinceEpoch = now(reader);
reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
batchYetToAckIds.clear();
}
}
if (!batchYetToAckIds.isEmpty()) {
long nowMsSinceEpoch = now(reader);
reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
}
}
}
/** The coder for our checkpoints. */
private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint> {
private static final Coder<String> SUBSCRIPTION_PATH_CODER =
NullableCoder.of(StringUtf8Coder.of());
private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
public static <T> PubsubCheckpointCoder<T> of() {
return new PubsubCheckpointCoder<>();
}
private PubsubCheckpointCoder() {}
@Override
public void encode(PubsubCheckpoint value, OutputStream outStream) throws IOException {
SUBSCRIPTION_PATH_CODER.encode(value.subscriptionPath, outStream);
LIST_CODER.encode(value.notYetReadIds, outStream);
}
@Override
public PubsubCheckpoint decode(InputStream inStream) throws IOException {
String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
List<String> notYetReadIds = LIST_CODER.decode(inStream);
return new PubsubCheckpoint(path, null, null, notYetReadIds);
}
}
// ================================================================================
// Reader
// ================================================================================
/**
* A reader which keeps track of which messages have been received from Pubsub but not yet
* consumed downstream and/or ACKed back to Pubsub.
*/
@VisibleForTesting
static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubMessage> {
/** For access to topic and checkpointCoder. */
private final PubsubSource outer;
@VisibleForTesting final SubscriptionPath subscription;
/** Client on which to talk to Pubsub. Contains a null value if the client has been closed. */
private AtomicReference<PubsubClient> pubsubClient;
/**
* The closed state of this {@link PubsubReader}. If true, the reader has not yet been closed,
* and it will have a non-null value within {@link #pubsubClient}.
*/
private AtomicBoolean active = new AtomicBoolean(true);
/**
* Ack timeout, in ms, as set on subscription when we first start reading. Not updated
* thereafter. -1 if not yet determined.
*/
private int ackTimeoutMs;
/** ACK ids of messages we have delivered downstream but not yet ACKed. */
private Set<String> safeToAckIds;
/**
* Messages we have received from Pubsub and not yet delivered downstream. We preserve their
* order.
*/
private final Queue<PubsubClient.IncomingMessage> notYetRead;
private static class InFlightState {
/** When request which yielded message was issues. */
long requestTimeMsSinceEpoch;
/**
* When Pubsub will consider this message's ACK to timeout and thus it needs to be extended.
*/
long ackDeadlineMsSinceEpoch;
public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) {
this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch;
}
}
/**
* Map from ACK ids of messages we have received from Pubsub but not yet ACKed to their in
* flight state. Ordered from earliest to latest ACK deadline.
*/
private final LinkedHashMap<String, InFlightState> inFlight;
/**
* Batches of successfully ACKed ids which need to be pruned from the above. CAUTION: Accessed
* by both reader and checkpointing threads.
*/
private final Queue<List<String>> ackedIds;
/** Byte size of undecoded elements in {@link #notYetRead}. */
private long notYetReadBytes;
/**
* Bucketed map from received time (as system time, ms since epoch) to message timestamps
* (mssince epoch) of all received but not-yet read messages. Used to estimate watermark.
*/
private BucketingFunction minUnreadTimestampMsSinceEpoch;
/**
* Minimum of timestamps (ms since epoch) of all recently read messages. Used to estimate
* watermark.
*/
private MovingFunction minReadTimestampMsSinceEpoch;
/**
* System time (ms since epoch) we last received a message from Pubsub, or -1 if not yet
* received any messages.
*/
private long lastReceivedMsSinceEpoch;
/** The last reported watermark (ms since epoch), or beginning of time if none yet reported. */
private long lastWatermarkMsSinceEpoch;
/** The current message, or {@literal null} if none. */
@Nullable private PubsubClient.IncomingMessage current;
/** Stats only: System time (ms since epoch) we last logs stats, or -1 if never. */
private long lastLogTimestampMsSinceEpoch;
/** Stats only: Total number of messages received. */
private long numReceived;
/** Stats only: Number of messages which have recently been received. */
private MovingFunction numReceivedRecently;
/** Stats only: Number of messages which have recently had their deadline extended. */
private MovingFunction numExtendedDeadlines;
/**
* Stats only: Number of messages which have recenttly had their deadline extended even though
* it may be too late to do so.
*/
private MovingFunction numLateDeadlines;
/** Stats only: Number of messages which have recently been ACKed. */
private MovingFunction numAcked;
/**
* Stats only: Number of messages which have recently expired (ACKs were extended for too long).
*/
private MovingFunction numExpired;
/** Stats only: Number of messages which have recently been NACKed. */
private MovingFunction numNacked;
/** Stats only: Number of message bytes which have recently been read by downstream consumer. */
private MovingFunction numReadBytes;
/**
* Stats only: Minimum of timestamp (ms since epoch) of all recently received messages. Used to
* estimate timestamp skew. Does not contribute to watermark estimator.
*/
private MovingFunction minReceivedTimestampMsSinceEpoch;
/**
* Stats only: Maximum of timestamp (ms since epoch) of all recently received messages. Used to
* estimate timestamp skew.
*/
private MovingFunction maxReceivedTimestampMsSinceEpoch;
/** Stats only: Minimum of recent estimated watermarks (ms since epoch). */
private MovingFunction minWatermarkMsSinceEpoch;
/** Stats ony: Maximum of recent estimated watermarks (ms since epoch). */
private MovingFunction maxWatermarkMsSinceEpoch;
/**
* Stats only: Number of messages with timestamps strictly behind the estimated watermark at the
* time they are received. These may be considered 'late' by downstream computations.
*/
private MovingFunction numLateMessages;
/**
* Stats only: Current number of checkpoints in flight. CAUTION: Accessed by both checkpointing
* and reader threads.
*/
private AtomicInteger numInFlightCheckpoints;
/** Stats only: Maximum number of checkpoints in flight at any time. */
private int maxInFlightCheckpoints;
private static MovingFunction newFun(Combine.BinaryCombineLongFn function) {
return new MovingFunction(
SAMPLE_PERIOD.getMillis(),
SAMPLE_UPDATE.getMillis(),
MIN_WATERMARK_SPREAD,
MIN_WATERMARK_MESSAGES,
function);
}
/** Construct a reader. */
public PubsubReader(PubsubOptions options, PubsubSource outer, SubscriptionPath subscription)
throws IOException, GeneralSecurityException {
this.outer = outer;
this.subscription = subscription;
pubsubClient =
new AtomicReference<>(
outer.outer.pubsubFactory.newClient(
outer.outer.timestampAttribute, outer.outer.idAttribute, options));
ackTimeoutMs = -1;
safeToAckIds = new HashSet<>();
notYetRead = new ArrayDeque<>();
inFlight = new LinkedHashMap<>();
ackedIds = new ConcurrentLinkedQueue<>();
notYetReadBytes = 0;
minUnreadTimestampMsSinceEpoch =
new BucketingFunction(
SAMPLE_UPDATE.getMillis(), MIN_WATERMARK_SPREAD, MIN_WATERMARK_MESSAGES, MIN);
minReadTimestampMsSinceEpoch = newFun(MIN);
lastReceivedMsSinceEpoch = -1;
lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
current = null;
lastLogTimestampMsSinceEpoch = -1;
numReceived = 0L;
numReceivedRecently = newFun(SUM);
numExtendedDeadlines = newFun(SUM);
numLateDeadlines = newFun(SUM);
numAcked = newFun(SUM);
numExpired = newFun(SUM);
numNacked = newFun(SUM);
numReadBytes = newFun(SUM);
minReceivedTimestampMsSinceEpoch = newFun(MIN);
maxReceivedTimestampMsSinceEpoch = newFun(MAX);
minWatermarkMsSinceEpoch = newFun(MIN);
maxWatermarkMsSinceEpoch = newFun(MAX);
numLateMessages = newFun(SUM);
numInFlightCheckpoints = new AtomicInteger();
maxInFlightCheckpoints = 0;
}
@VisibleForTesting
PubsubClient getPubsubClient() {
return pubsubClient.get();
}
/**
* Acks the provided {@code ackIds} back to Pubsub, blocking until all of the messages are
* ACKed.
*
* <p>CAUTION: May be invoked from a separate thread.
*
* <p>CAUTION: Retains {@code ackIds}.
*/
void ackBatch(List<String> ackIds) throws IOException {
pubsubClient.get().acknowledge(subscription, ackIds);
ackedIds.add(ackIds);
}
/**
* BLOCKING NACK (ie request deadline extension of 0) receipt of messages from Pubsub with the
* given {@code ockIds}. Does not retain {@code ackIds}.
*/
public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
pubsubClient.get().modifyAckDeadline(subscription, ackIds, 0);
numNacked.add(nowMsSinceEpoch, ackIds.size());
}
/**
* BLOCKING Extend the processing deadline for messages from Pubsub with the given {@code
* ackIds}. Does not retain {@code ackIds}.
*/
private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000);
pubsubClient.get().modifyAckDeadline(subscription, ackIds, extensionSec);
numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
}
/** Return the current time, in ms since epoch. */
private long now() {
if (outer.outer.clock == null) {
return System.currentTimeMillis();
} else {
return outer.outer.clock.currentTimeMillis();
}
}
/**
* Messages which have been ACKed (via the checkpoint finalize) are no longer in flight. This is
* only used for flow control and stats.
*/
private void retire() throws IOException {
long nowMsSinceEpoch = now();
while (true) {
List<String> ackIds = ackedIds.poll();
if (ackIds == null) {
return;
}
numAcked.add(nowMsSinceEpoch, ackIds.size());
for (String ackId : ackIds) {
inFlight.remove(ackId);
safeToAckIds.remove(ackId);
}
}
}
/**
* BLOCKING Extend deadline for all messages which need it. CAUTION: If extensions can't keep up
* with wallclock then we'll never return.
*/
private void extend() throws IOException {
while (true) {
long nowMsSinceEpoch = now();
List<String> assumeExpired = new ArrayList<>();
List<String> toBeExtended = new ArrayList<>();
List<String> toBeExpired = new ArrayList<>();
// Messages will be in increasing deadline order.
for (Map.Entry<String, InFlightState> entry : inFlight.entrySet()) {
if (entry.getValue().ackDeadlineMsSinceEpoch - (ackTimeoutMs * ACK_SAFETY_PCT) / 100
> nowMsSinceEpoch) {
// All remaining messages don't need their ACKs to be extended.
break;
}
if (entry.getValue().ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis()
< nowMsSinceEpoch) {
// Pubsub may have already considered this message to have expired.
// If so it will (eventually) be made available on a future pull request.
// If this message ends up being committed then it will be considered a duplicate
// when re-pulled.
assumeExpired.add(entry.getKey());
continue;
}
if (entry.getValue().requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis()
< nowMsSinceEpoch) {
// This message has been in-flight for too long.
// Give up on it, otherwise we risk extending its ACK indefinitely.
toBeExpired.add(entry.getKey());
continue;
}
// Extend the ACK for this message.
toBeExtended.add(entry.getKey());
if (toBeExtended.size() >= ACK_BATCH_SIZE) {
// Enough for one batch.
break;
}
}
if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) {
// Nothing to be done.
return;
}
if (!assumeExpired.isEmpty()) {
// If we didn't make the ACK deadline assume expired and no longer in flight.
numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size());
for (String ackId : assumeExpired) {
inFlight.remove(ackId);
}
}
if (!toBeExpired.isEmpty()) {
// Expired messages are no longer considered in flight.
numExpired.add(nowMsSinceEpoch, toBeExpired.size());
for (String ackId : toBeExpired) {
inFlight.remove(ackId);
}
}
if (!toBeExtended.isEmpty()) {
// Pubsub extends acks from it's notion of current time.
// We'll try to track that on our side, but note the deadlines won't necessarily agree.
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (ackTimeoutMs * ACK_EXTENSION_PCT) / 100;
for (String ackId : toBeExtended) {
// Maintain increasing ack deadline order.
InFlightState state = inFlight.remove(ackId);
inFlight.put(
ackId, new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch));
}
// BLOCKs until extended.
extendBatch(nowMsSinceEpoch, toBeExtended);
}
}
}
/** BLOCKING Fetch another batch of messages from Pubsub. */
private void pull() throws IOException {
if (inFlight.size() >= MAX_IN_FLIGHT) {
// Wait for checkpoint to be finalized before pulling anymore.
// There may be lag while checkpoints are persisted and the finalizeCheckpoint method
// is invoked. By limiting the in-flight messages we can ensure we don't end up consuming
// messages faster than we can checkpoint them.
return;
}
long requestTimeMsSinceEpoch = now();
long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + ackTimeoutMs;
// Pull the next batch.
// BLOCKs until received.
Collection<PubsubClient.IncomingMessage> receivedMessages =
pubsubClient.get().pull(requestTimeMsSinceEpoch, subscription, PULL_BATCH_SIZE, true);
if (receivedMessages.isEmpty()) {
// Nothing available yet. Try again later.
return;
}
lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch;
// Capture the received messages.
for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) {
notYetRead.add(incomingMessage);
notYetReadBytes += incomingMessage.elementBytes.length;
inFlight.put(
incomingMessage.ackId,
new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch));
numReceived++;
numReceivedRecently.add(requestTimeMsSinceEpoch, 1L);
minReceivedTimestampMsSinceEpoch.add(
requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
maxReceivedTimestampMsSinceEpoch.add(
requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
minUnreadTimestampMsSinceEpoch.add(
requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
}
}
/** Log stats if time to do so. */
private void stats() {
long nowMsSinceEpoch = now();
if (lastLogTimestampMsSinceEpoch < 0) {
lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
return;
}
long deltaMs = nowMsSinceEpoch - lastLogTimestampMsSinceEpoch;
if (deltaMs < LOG_PERIOD.getMillis()) {
return;
}
String messageSkew = "unknown";
long minTimestamp = minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
long maxTimestamp = maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
if (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE) {
messageSkew = (maxTimestamp - minTimestamp) + "ms";
}
String watermarkSkew = "unknown";
long minWatermark = minWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
long maxWatermark = maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) {
watermarkSkew = (maxWatermark - minWatermark) + "ms";
}
String oldestInFlight = "no";
String oldestAckId = Iterables.getFirst(inFlight.keySet(), null);
if (oldestAckId != null) {
oldestInFlight =
(nowMsSinceEpoch - inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms";
}
LOG.debug(
"Pubsub {} has "
+ "{} received messages, "
+ "{} current unread messages, "
+ "{} current unread bytes, "
+ "{} current in-flight msgs, "
+ "{} oldest in-flight, "
+ "{} current in-flight checkpoints, "
+ "{} max in-flight checkpoints, "
+ "{}B/s recent read, "
+ "{} recent received, "
+ "{} recent extended, "
+ "{} recent late extended, "
+ "{} recent ACKed, "
+ "{} recent NACKed, "
+ "{} recent expired, "
+ "{} recent message timestamp skew, "
+ "{} recent watermark skew, "
+ "{} recent late messages, "
+ "{} last reported watermark",
subscription,
numReceived,
notYetRead.size(),
notYetReadBytes,
inFlight.size(),
oldestInFlight,
numInFlightCheckpoints.get(),
maxInFlightCheckpoints,
numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L),
numReceivedRecently.get(nowMsSinceEpoch),
numExtendedDeadlines.get(nowMsSinceEpoch),
numLateDeadlines.get(nowMsSinceEpoch),
numAcked.get(nowMsSinceEpoch),
numNacked.get(nowMsSinceEpoch),
numExpired.get(nowMsSinceEpoch),
messageSkew,
watermarkSkew,
numLateMessages.get(nowMsSinceEpoch),
new Instant(lastWatermarkMsSinceEpoch));
lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
}
@Override
public boolean start() throws IOException {
// Determine the ack timeout.
ackTimeoutMs = pubsubClient.get().ackDeadlineSeconds(subscription) * 1000;
return advance();
}
/**
* BLOCKING Return {@literal true} if a Pubsub messaage is available, {@literal false} if none
* is available at this time or we are over-subscribed. May BLOCK while extending ACKs or
* fetching available messages. Will not block waiting for messages.
*/
@Override
public boolean advance() throws IOException {
// Emit stats.
stats();
if (current != null) {
// Current is consumed. It can no longer contribute to holding back the watermark.
minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
current = null;
}
// Retire state associated with ACKed messages.
retire();
// Extend all pressing deadlines.
// Will BLOCK until done.
// If the system is pulling messages only to let them sit in a downsteam queue then
// this will have the effect of slowing down the pull rate.
// However, if the system is genuinely taking longer to process each message then
// the work to extend ACKs would be better done in the background.
extend();
if (notYetRead.isEmpty()) {
// Pull another batch.
// Will BLOCK until fetch returns, but will not block until a message is available.
pull();
}
// Take one message from queue.
current = notYetRead.poll();
if (current == null) {
// Try again later.
return false;
}
notYetReadBytes -= current.elementBytes.length;
checkState(notYetReadBytes >= 0);
long nowMsSinceEpoch = now();
numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
numLateMessages.add(nowMsSinceEpoch, 1L);
}
// Current message can be considered 'read' and will be persisted by the next
// checkpoint. So it is now safe to ACK back to Pubsub.
safeToAckIds.add(current.ackId);
return true;
}
@Override
public PubsubMessage getCurrent() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
return new PubsubMessage(current.elementBytes, current.attributes);
}
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
return new Instant(current.timestampMsSinceEpoch);
}
@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
return current.recordId.getBytes(StandardCharsets.UTF_8);
}
/**
* {@inheritDoc}.
*
* <p>Marks this {@link PubsubReader} as no longer active. The {@link PubsubClient} continue to
* exist and be active beyond the life of this call if there are any in-flight checkpoints. When
* no in-flight checkpoints remain, the reader will be closed.
*/
@Override
public void close() throws IOException {
active.set(false);
maybeCloseClient();
}
/**
* Close this reader's underlying {@link PubsubClient} if the reader has been closed and there
* are no outstanding checkpoints.
*/
private void maybeCloseClient() throws IOException {
if (!active.get() && numInFlightCheckpoints.get() == 0) {
// The reader has been closed and it has no more outstanding checkpoints. The client
// must be closed so it doesn't leak
PubsubClient client = pubsubClient.getAndSet(null);
if (client != null) {
client.close();
}
}
}
@Override
public PubsubSource getCurrentSource() {
return outer;
}
@Override
public Instant getWatermark() {
if (pubsubClient.get().isEOF() && notYetRead.isEmpty()) {
// For testing only: Advance the watermark to the end of time to signal
// the test is complete.
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
// NOTE: We'll allow the watermark to go backwards. The underlying runner is responsible
// for aggregating all reported watermarks and ensuring the aggregate is latched.
// If we attempt to latch locally then it is possible a temporary starvation of one reader
// could cause its estimated watermark to fast forward to current system time. Then when
// the reader resumes its watermark would be unable to resume tracking.
// By letting the underlying runner latch we avoid any problems due to localized starvation.
long nowMsSinceEpoch = now();
long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
long unreadMin = minUnreadTimestampMsSinceEpoch.get();
if (readMin == Long.MAX_VALUE
&& unreadMin == Long.MAX_VALUE
&& lastReceivedMsSinceEpoch >= 0
&& nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) {
// We don't currently have any unread messages pending, we have not had any messages
// read for a while, and we have not received any new messages from Pubsub for a while.
// Advance watermark to current time.
// TODO: Estimate a timestamp lag.
lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
} else if (minReadTimestampMsSinceEpoch.isSignificant()
|| minUnreadTimestampMsSinceEpoch.isSignificant()) {
// Take minimum of the timestamps in all unread messages and recently read messages.
lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
}
// else: We're not confident enough to estimate a new watermark. Stick with the old one.
minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
return new Instant(lastWatermarkMsSinceEpoch);
}
@Override
public PubsubCheckpoint getCheckpointMark() {
int cur = numInFlightCheckpoints.incrementAndGet();
maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur);
// It's possible for a checkpoint to be taken but never finalized.
// So we simply copy whatever safeToAckIds we currently have.
// It's possible a later checkpoint will be taken before an earlier one is finalized,
// in which case we'll double ACK messages to Pubsub. However Pubsub is fine with that.
List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds);
List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size());
for (PubsubClient.IncomingMessage incomingMessage : notYetRead) {
snapshotNotYetReadIds.add(incomingMessage.ackId);
}
if (outer.subscriptionPath == null) {
// need to include the subscription in case we resume, as it's not stored in the source.
return new PubsubCheckpoint(
subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds);
}
return new PubsubCheckpoint(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
}
@Override
public long getSplitBacklogBytes() {
return notYetReadBytes;
}
}
// ================================================================================
// Source
// ================================================================================
@VisibleForTesting
static class PubsubSource extends UnboundedSource<PubsubMessage, PubsubCheckpoint> {
public final PubsubUnboundedSource outer;
// The subscription to read from.
@VisibleForTesting final ValueProvider<SubscriptionPath> subscriptionPath;
public PubsubSource(PubsubUnboundedSource outer) {
this(outer, outer.getSubscriptionProvider());
}
private PubsubSource(
PubsubUnboundedSource outer, ValueProvider<SubscriptionPath> subscriptionPath) {
this.outer = outer;
this.subscriptionPath = subscriptionPath;
}
@Override
public List<PubsubSource> split(int desiredNumSplits, PipelineOptions options)
throws Exception {
List<PubsubSource> result = new ArrayList<>(desiredNumSplits);
PubsubSource splitSource = this;
if (subscriptionPath == null) {
splitSource =
new PubsubSource(
outer, StaticValueProvider.of(outer.createRandomSubscription(options)));
}
for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
// Since the source is immutable and Pubsub automatically shards we simply
// replicate ourselves the requested number of times
result.add(splitSource);
}
return result;
}
@Override
public PubsubReader createReader(
PipelineOptions options, @Nullable PubsubCheckpoint checkpoint) {
PubsubReader reader;
SubscriptionPath subscription;
if (subscriptionPath == null || subscriptionPath.get() == null) {
if (checkpoint == null) {
// This reader has never been started and there was no call to #split;
// create a single random subscription, which will be kept in the checkpoint.
subscription = outer.createRandomSubscription(options);
} else {
subscription = checkpoint.getSubscription();
}
} else {
subscription = subscriptionPath.get();
}
try {
reader = new PubsubReader(options.as(PubsubOptions.class), this, subscription);
} catch (GeneralSecurityException | IOException e) {
throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e);
}
if (checkpoint != null) {
// NACK all messages we may have lost.
try {
// Will BLOCK until NACKed.
checkpoint.nackAll(reader);
} catch (IOException e) {
LOG.error(
"Pubsub {} cannot have {} lost messages NACKed, ignoring: {}",
subscriptionPath,
checkpoint.notYetReadIds.size(),
e);
}
}
return reader;
}
@Nullable
@Override
public Coder<PubsubCheckpoint> getCheckpointMarkCoder() {
return CHECKPOINT_CODER;
}
@Override
public Coder<PubsubMessage> getOutputCoder() {
return outer.getNeedsAttributes()
? PubsubMessageWithAttributesCoder.of()
: PubsubMessagePayloadOnlyCoder.of();
}
@Override
public void validate() {
// Nothing to validate.
}
@Override
public boolean requiresDeduping() {
// We cannot prevent re-offering already read messages after a restore from checkpoint.
return true;
}
}
// ================================================================================
// StatsFn
// ================================================================================
private static class StatsFn extends DoFn<PubsubMessage, PubsubMessage> {
private final Counter elementCounter = SourceMetrics.elementsRead();
private final PubsubClientFactory pubsubFactory;
@Nullable private final ValueProvider<SubscriptionPath> subscription;
@Nullable private final ValueProvider<TopicPath> topic;
@Nullable private final String timestampAttribute;
@Nullable private final String idAttribute;
public StatsFn(
PubsubClientFactory pubsubFactory,
@Nullable ValueProvider<SubscriptionPath> subscription,
@Nullable ValueProvider<TopicPath> topic,
@Nullable String timestampAttribute,
@Nullable String idAttribute) {
checkArgument(pubsubFactory != null, "pubsubFactory should not be null");
this.pubsubFactory = pubsubFactory;
this.subscription = subscription;
this.topic = topic;
this.timestampAttribute = timestampAttribute;
this.idAttribute = idAttribute;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
elementCounter.inc();
c.output(c.element());
}
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotNull(DisplayData.item("subscription", subscription))
.addIfNotNull(DisplayData.item("topic", topic))
.add(DisplayData.item("transport", pubsubFactory.getKind()))
.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute))
.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
}
}
// ================================================================================
// PubsubUnboundedSource
// ================================================================================
/** For testing only: Clock to use for all timekeeping. If {@literal null} use system clock. */
@Nullable private Clock clock;
/** Factory for creating underlying Pubsub transport. */
private final PubsubClientFactory pubsubFactory;
/** Project under which to create a subscription if only the {@link #topic} was given. */
@Nullable private final ValueProvider<ProjectPath> project;
/**
* Topic to read from. If {@literal null}, then {@link #subscription} must be given. Otherwise
* {@link #subscription} must be null.
*/
@Nullable private final ValueProvider<TopicPath> topic;
/**
* Subscription to read from. If {@literal null} then {@link #topic} must be given. Otherwise
* {@link #topic} must be null.
*
* <p>If no subscription is given a random one will be created when the transorm is applied. This
* field will be update with that subscription's path. The created subscription is never deleted.
*/
@Nullable private ValueProvider<SubscriptionPath> subscription;
/**
* Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
* Pubsub message publish timestamp instead.
*/
@Nullable private final String timestampAttribute;
/**
* Pubsub metadata field holding id for each element, or {@literal null} if need to generate a
* unique id ourselves.
*/
@Nullable private final String idAttribute;
/** Whether this source should load the attributes of the PubsubMessage, or only the payload. */
private final boolean needsAttributes;
@VisibleForTesting
PubsubUnboundedSource(
Clock clock,
PubsubClientFactory pubsubFactory,
@Nullable ValueProvider<ProjectPath> project,
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
@Nullable String timestampAttribute,
@Nullable String idAttribute,
boolean needsAttributes) {
checkArgument(
(topic == null) != (subscription == null),
"Exactly one of topic and subscription must be given");
this.clock = clock;
this.pubsubFactory = checkNotNull(pubsubFactory);
this.project = project;
this.topic = topic;
this.subscription = subscription;
this.timestampAttribute = timestampAttribute;
this.idAttribute = idAttribute;
this.needsAttributes = needsAttributes;
}
/** Construct an unbounded source to consume from the Pubsub {@code subscription}. */
public PubsubUnboundedSource(
PubsubClientFactory pubsubFactory,
@Nullable ValueProvider<ProjectPath> project,
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
@Nullable String timestampAttribute,
@Nullable String idAttribute,
boolean needsAttributes) {
this(
null,
pubsubFactory,
project,
topic,
subscription,
timestampAttribute,
idAttribute,
needsAttributes);
}
/** Get the project path. */
@Nullable
public ProjectPath getProject() {
return project == null ? null : project.get();
}
/** Get the topic being read from. */
@Nullable
public TopicPath getTopic() {
return topic == null ? null : topic.get();
}
/** Get the {@link ValueProvider} for the topic being read from. */
@Nullable
public ValueProvider<TopicPath> getTopicProvider() {
return topic;
}
/** Get the subscription being read from. */
@Nullable
public SubscriptionPath getSubscription() {
return subscription == null ? null : subscription.get();
}
/** Get the {@link ValueProvider} for the subscription being read from. */
@Nullable
public ValueProvider<SubscriptionPath> getSubscriptionProvider() {
return subscription;
}
/** Get the timestamp attribute. */
@Nullable
public String getTimestampAttribute() {
return timestampAttribute;
}
/** Get the id attribute. */
@Nullable
public String getIdAttribute() {
return idAttribute;
}
public boolean getNeedsAttributes() {
return needsAttributes;
}
@Override
public PCollection<PubsubMessage> expand(PBegin input) {
return input
.getPipeline()
.begin()
.apply(Read.from(new PubsubSource(this)))
.apply(
"PubsubUnboundedSource.Stats",
ParDo.of(
new StatsFn(pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
}
private SubscriptionPath createRandomSubscription(PipelineOptions options) {
TopicPath topicPath = topic.get();
ProjectPath projectPath;
if (project != null) {
projectPath = project.get();
} else {
String projectId = options.as(GcpOptions.class).getProject();
checkState(
projectId != null,
"Cannot create subscription to topic %s because pipeline option 'project' not specified",
topicPath);
projectPath = PubsubClient.projectPathFromId(options.as(GcpOptions.class).getProject());
}
try {
try (PubsubClient pubsubClient =
pubsubFactory.newClient(
timestampAttribute, idAttribute, options.as(PubsubOptions.class))) {
SubscriptionPath subscriptionPath =
pubsubClient.createRandomSubscription(projectPath, topicPath, DEAULT_ACK_TIMEOUT_SEC);
LOG.warn(
"Created subscription {} to topic {}."
+ " Note this subscription WILL NOT be deleted when the pipeline terminates",
subscriptionPath,
topic);
return subscriptionPath;
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to create subscription to topic %s on project %s: %s",
topicPath, projectPath, e.getMessage()),
e);
}
}
}