blob: da9aaaa03ac312b48ddf95c3df7055d4c2c67f4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsublite;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.joda.time.Duration;
/**
* OffsetByteRangeTracker is an unbounded restriction tracker for Pub/Sub lite partitions that
* tracks offsets for checkpointing and bytes for progress.
*
* <p>Any valid instance of an OffsetByteRangeTracker tracks one of exactly two types of ranges: -
* Unbounded ranges whose last offset is Long.MAX_VALUE - Completed ranges that are either empty
* (From == To) or fully claimed (lastClaimed == To - 1)
*
* <p>Also prevents splitting until minTrackingTime has passed or minBytesReceived have been
* received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
* would return ProcessContinuation.resume().
*/
class OffsetByteRangeTracker extends TrackerWithProgress {
private final TopicBacklogReader unownedBacklogReader;
private final Duration minTrackingTime;
private final long minBytesReceived;
private final Stopwatch stopwatch;
private OffsetByteRange range;
private @Nullable Long lastClaimed;
public OffsetByteRangeTracker(
OffsetByteRange range,
TopicBacklogReader unownedBacklogReader,
Stopwatch stopwatch,
Duration minTrackingTime,
long minBytesReceived) {
checkArgument(
range.getRange().getTo() == Long.MAX_VALUE,
"May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
checkArgument(
range.getByteCount() == 0L,
"May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
this.unownedBacklogReader = unownedBacklogReader;
this.minTrackingTime = minTrackingTime;
this.minBytesReceived = minBytesReceived;
this.stopwatch = stopwatch.reset().start();
this.range = range;
}
@Override
public IsBounded isBounded() {
return IsBounded.UNBOUNDED;
}
@Override
public boolean tryClaim(OffsetByteProgress position) {
long toClaim = position.lastOffset().value();
checkArgument(
lastClaimed == null || toClaim > lastClaimed,
"Trying to claim offset %s while last attempted was %s",
position.lastOffset().value(),
lastClaimed);
checkArgument(
toClaim >= range.getRange().getFrom(),
"Trying to claim offset %s before start of the range %s",
toClaim,
range);
// split() has already been called, truncating this range. No more offsets may be claimed.
if (range.getRange().getTo() != Long.MAX_VALUE) {
boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
checkState(
isRangeEmpty || isValidClosedRange,
"Violated class precondition: offset range improperly split. Please report a beam bug.");
return false;
}
lastClaimed = toClaim;
range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
return true;
}
@Override
public OffsetByteRange currentRestriction() {
return range;
}
private long nextOffset() {
checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
}
/**
* Whether the tracker has received enough data/been running for enough time that it can
* checkpoint and be confident it can get sufficient throughput.
*/
private boolean receivedEnough() {
Duration duration = Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS));
if (duration.isLongerThan(minTrackingTime)) {
return true;
}
if (currentRestriction().getByteCount() >= minBytesReceived) {
return true;
}
return false;
}
@Override
public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
// Cannot split a bounded range. This should already be completely claimed.
if (range.getRange().getTo() != Long.MAX_VALUE) {
return null;
}
if (!receivedEnough()) {
return null;
}
range =
OffsetByteRange.of(
new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
range.getByteCount());
return SplitResult.of(
this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
}
@Override
@SuppressWarnings("unboxing.of.nullable")
public void checkDone() throws IllegalStateException {
if (range.getRange().getFrom() == range.getRange().getTo()) {
return;
}
checkState(
lastClaimed != null,
"Last attempted offset should not be null. No work was claimed in non-empty range %s.",
range);
long lastClaimedNotNull = checkNotNull(lastClaimed);
checkState(
lastClaimedNotNull >= range.getRange().getTo() - 1,
"Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
lastClaimedNotNull,
range,
lastClaimedNotNull + 1,
range.getRange().getTo());
}
@Override
public Progress getProgress() {
ComputeMessageStatsResponse stats =
this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset()));
return Progress.from(range.getByteCount(), stats.getMessageBytes());
}
}