blob: 5eeb096bc3f29cd0395930165990b559401f8212 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* RestrictionTracker used by {@link
* org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn} to keep
* track of the progress of the stream and to split the restriction for runner initiated
* checkpoints.
*
* <p>StreamProgress usually is a continuation token which represents a position in time of the
* stream of a specific partition. The token is used to resume streaming a partition.
*
* <p>On ChangeStreamMutation or Heartbeat response, the tracker will try to claim the continuation
* token from the response. The tracker stores that continuation token (wrapped in a StreamProgress)
* so that if the DoFn checkpoints or restarts, the token can be used to resume the stream.
*
* <p>The tracker will fail to claim a token if the runner has initiated a checkpoint (by calling
* <code>trySplit(0)</code>). This signals to the DoFn to stop.
*
* <p>When the runner initiates a checkpoint, the tracker returns null for the primary split and the
* residual split includes the entire token. The next time the DoFn tries to claim a new
* StreamProgress, the claim will fail and the DoFn will stop. The residual will be scheduled on a
* new DoFn to resume the work from the previous StreamProgress.
*/
public class ReadChangeStreamPartitionProgressTracker
    extends RestrictionTracker<StreamProgress, StreamProgress> {
  /**
   * The most recently claimed position, or the initial restriction if nothing has been claimed yet.
   * Replaced by reference on every successful {@link #tryClaim(StreamProgress)}.
   */
  StreamProgress streamProgress;

  /**
   * Becomes {@code true} once {@link #trySplit(double)} has handed out a checkpoint. From then on
   * every claim is rejected and every further split request returns {@code null}.
   */
  boolean shouldStop = false;

  /**
   * Constructs a restriction tracker with the streamProgress.
   *
   * @param streamProgress represents a position in time of the stream.
   */
  public ReadChangeStreamPartitionProgressTracker(StreamProgress streamProgress) {
    this.streamProgress = streamProgress;
  }

  /**
   * This restriction tracker is for unbounded streams.
   *
   * @return {@link RestrictionTracker.IsBounded#UNBOUNDED}
   */
  @Override
  public RestrictionTracker.IsBounded isBounded() {
    return IsBounded.UNBOUNDED;
  }

  /**
   * Signals to the runner that this restriction has completed. The restriction counts as done
   * after a runner initiated checkpoint, or when the current streamProgress carries a closeStream
   * response rather than a token. In any other state there is more work to do and an exception is
   * thrown.
   *
   * @throws IllegalStateException when the restriction is not done and there is more work to be
   *     done.
   */
  @Override
  public void checkDone() throws IllegalStateException {
    boolean done = shouldStop || streamProgress.getCloseStream() != null;
    Preconditions.checkState(done, "There's more work to be done");
  }

  /**
   * Claims a new StreamProgress to be processed. StreamProgress can either be a ContinuationToken
   * or a CloseStream.
   *
   * <p>The claim fails if the runner has previously initiated a checkpoint. The restriction tracker
   * respects the runner initiated checkpoint and fails to claim this streamProgress. The new split
   * will start from the previously successfully claimed streamProgress.
   *
   * @param streamProgress position in time of the stream that is being claimed.
   * @return true if claim was successful, otherwise false.
   */
  @Override
  public boolean tryClaim(StreamProgress streamProgress) {
    if (shouldStop) {
      return false;
    }
    // Plain reference assignment, no copy is made: the tracker now holds the caller's instance, so
    // currentRestriction() and the residual returned by trySplit() reflect this claim.
    this.streamProgress = streamProgress;
    return true;
  }

  /**
   * Returns the streamProgress that was successfully claimed.
   *
   * @return the streamProgress that was successfully claimed.
   */
  @Override
  public StreamProgress currentRestriction() {
    return streamProgress;
  }

  /**
   * Splits the work that's left. Since the position in the stream isn't a contiguous value, we
   * cannot estimate how much work is left or break the work down into smaller chunks. Therefore,
   * there's no way to split the work. To conform to the API, we return null for the primary split
   * and then continue the work on the residual split.
   *
   * <p>Also note that we only accept checkpoints (fractionOfRemainder = 0). For any other value we
   * reject the request (by returning {@code null}) since StreamProgress cannot be broken down into
   * fractions.
   *
   * @param fractionOfRemainder the fraction of work remaining, where 0 is a request to checkpoint
   *     current work.
   * @return split result when fractionOfRemainder = 0 and no checkpoint has been taken yet,
   *     otherwise null.
   */
  @Override
  public @Nullable SplitResult<StreamProgress> trySplit(double fractionOfRemainder) {
    // Only a checkpoint request (fractionOfRemainder = 0) can be honoured; a real fractional split
    // is impossible for a stream position.
    if (fractionOfRemainder != 0) {
      return null;
    }
    // shouldStop is only true if a checkpoint was already handed to the runner. After that the
    // primary restriction is null, so there is nothing left to split.
    if (shouldStop) {
      return null;
    }
    // Terminate the current worker: the primary is empty and the whole remaining stream, starting
    // from the last claimed position, moves to the residual to be resumed in a new worker.
    shouldStop = true;
    return SplitResult.of(null, streamProgress);
  }

  /** Returns a debug representation containing the current streamProgress and the stop flag. */
  @Override
  public String toString() {
    return "ReadChangeStreamPartitionProgressTracker{"
        + "streamProgress="
        + streamProgress
        + ", shouldStop="
        + shouldStop
        + '}';
  }
}