blob: 5f3784721e26f4e69f59b2b5a4341c7a65ef71b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** This class is responsible for processing an individual {@link ChangeStreamRecord}. */
@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
public class ChangeStreamAction {
  private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamAction.class);

  private final ChangeStreamMetrics metrics;

  /**
   * Constructs ChangeStreamAction to process individual ChangeStreamRecord.
   *
   * @param metrics used to record beam metrics.
   */
  public ChangeStreamAction(ChangeStreamMetrics metrics) {
    this.metrics = metrics;
  }

  /**
   * Processes a single ReadChangeStreamResponse from the Bigtable server. There are 3 possible
   * response types: Heartbeat, ChangeStreamMutation and CloseStream.
   *
   * <ul>
   *   <li>Heartbeat happens periodically based on the initial configuration set at the start of the
   *       beam pipeline. A Heartbeat can advance the watermark forward and includes a continuation
   *       token that provides a point to resume from after a checkpoint.
   *   <li>ChangeStreamMutation includes the actual mutation that took place in Bigtable. A
   *       ChangeStreamMutation also includes a watermark and a continuation token. All
   *       ChangeStreamMutations are emitted to the receiver with the timestamp of 0 (instead of
   *       the commit timestamp). Setting the timestamp to 0 discourages the use of windowing on
   *       this connector: all ChangeStreamMutations will be late data when windowing. This design
   *       decision prefers availability over consistency; if partitions are streamed slowly (due
   *       to an outage or other unavailability), the commit timestamp which drives the watermark
   *       may lag behind and prevent windows from closing.
   *   <li>CloseStream indicates that the stream has come to an end. The CloseStream is not
   *       processed here but stored in the RestrictionTracker to be processed later. This ensures
   *       that we successfully commit all pending ChangeStreamMutations.
   * </ul>
   *
   * <p>CloseStream is the only response type that initiates a resume. The other response types
   * simply process the response and return empty. Returning empty signals to the caller that the
   * response has been processed and no additional action is required. A record that is none of the
   * three types is logged as a warning and also results in an empty return.
   *
   * <p>There are 2 cases that cause this function to return a non-empty ProcessContinuation.
   *
   * <ol>
   *   <li>We fail to claim the StreamProgress on the RestrictionTracker. This can happen for a
   *       runner-initiated checkpoint. When the runner initiates a checkpoint, we stop and
   *       checkpoint pending ChangeStreamMutations and resume from the previous
   *       RestrictionTracker.
   *   <li>The response is a CloseStream. The RestrictionTracker claims the CloseStream. We don't do
   *       any additional processing of the response. We return resume to signal to the caller to
   *       checkpoint all pending ChangeStreamMutations. We expect the caller to check whether the
   *       RestrictionTracker includes a CloseStream and process it to close the stream.
   * </ol>
   *
   * @param partitionRecord the stream partition that generated the response
   * @param record the change stream record to be processed
   * @param tracker restrictionTracker that we use to claim next block and also to store CloseStream
   * @param receiver to output DataChange
   * @param watermarkEstimator manually progress watermark when processing responses with watermark
   * @param shouldDebug whether to emit the additional INFO-level log lines (record details for
   *     Heartbeat and CloseStream, and failed claims for every record type)
   * @return Optional.of(ProcessContinuation) if the run should be stopped or resumed, otherwise
   *     Optional.empty() to do nothing.
   */
  public Optional<DoFn.ProcessContinuation> run(
      PartitionRecord partitionRecord,
      ChangeStreamRecord record,
      RestrictionTracker<StreamProgress, StreamProgress> tracker,
      DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
      ManualWatermarkEstimator<Instant> watermarkEstimator,
      boolean shouldDebug) {
    if (record instanceof Heartbeat) {
      return processHeartbeat(
          partitionRecord, (Heartbeat) record, tracker, watermarkEstimator, shouldDebug);
    }
    if (record instanceof CloseStream) {
      return processCloseStream(partitionRecord, (CloseStream) record, tracker, shouldDebug);
    }
    if (record instanceof ChangeStreamMutation) {
      return processChangeStreamMutation(
          partitionRecord,
          (ChangeStreamMutation) record,
          tracker,
          receiver,
          watermarkEstimator,
          shouldDebug);
    }
    LOG.warn(
        "RCSP {}: Invalid response type", formatByteStringRange(partitionRecord.getPartition()));
    return Optional.empty();
  }

  /**
   * Handles a Heartbeat: advances the watermark to the heartbeat's low watermark, claims the
   * heartbeat's continuation token on the tracker and counts the heartbeat.
   *
   * @return a stop continuation if the claim fails, otherwise empty.
   */
  private Optional<DoFn.ProcessContinuation> processHeartbeat(
      PartitionRecord partitionRecord,
      Heartbeat heartbeat,
      RestrictionTracker<StreamProgress, StreamProgress> tracker,
      ManualWatermarkEstimator<Instant> watermarkEstimator,
      boolean shouldDebug) {
    StreamProgress streamProgress =
        new StreamProgress(
            heartbeat.getChangeStreamContinuationToken(), heartbeat.getLowWatermark());
    final Instant watermark = TimestampConverter.toInstant(heartbeat.getLowWatermark());
    // The watermark is advanced before the claim is attempted.
    watermarkEstimator.setWatermark(watermark);

    if (shouldDebug) {
      LOG.info(
          "RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
          formatByteStringRange(partitionRecord.getPartition()),
          formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
          heartbeat.getChangeStreamContinuationToken().getToken(),
          heartbeat.getLowWatermark());
    }

    // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
    // a checkpoint. See
    // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker
    // for more information regarding runner initiated checkpoints.
    if (!tracker.tryClaim(streamProgress)) {
      if (shouldDebug) {
        LOG.info(
            "RCSP {}: Failed to claim heart beat tracker",
            formatByteStringRange(partitionRecord.getPartition()));
      }
      return Optional.of(DoFn.ProcessContinuation.stop());
    }
    metrics.incHeartbeatCount();
    return Optional.empty();
  }

  /**
   * Handles a CloseStream: claims it on the tracker so that it is stored there, and counts it. The
   * CloseStream itself is not processed any further here.
   *
   * @return a stop continuation if the claim fails, otherwise a resume continuation.
   */
  private Optional<DoFn.ProcessContinuation> processCloseStream(
      PartitionRecord partitionRecord,
      CloseStream closeStream,
      RestrictionTracker<StreamProgress, StreamProgress> tracker,
      boolean shouldDebug) {
    StreamProgress streamProgress = new StreamProgress(closeStream);

    if (shouldDebug) {
      LOG.info(
          "RCSP {}: CloseStream: {}",
          formatByteStringRange(partitionRecord.getPartition()),
          closeStream.getChangeStreamContinuationTokens().stream()
              .map(
                  c ->
                      "{partition: "
                          + formatByteStringRange(c.getPartition())
                          + " token: "
                          + c.getToken()
                          + "}")
              .collect(Collectors.joining(", ", "[", "]")));
    }

    // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
    // a checkpoint. See
    // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker
    // for more information regarding runner initiated checkpoints.
    if (!tracker.tryClaim(streamProgress)) {
      if (shouldDebug) {
        LOG.info(
            "RCSP {}: Failed to claim close stream tracker",
            formatByteStringRange(partitionRecord.getPartition()));
      }
      return Optional.of(DoFn.ProcessContinuation.stop());
    }
    metrics.incClosestreamCount();
    return Optional.of(DoFn.ProcessContinuation.resume());
  }

  /**
   * Handles a ChangeStreamMutation: advances the watermark, claims a continuation token built from
   * the partition and the mutation's token, updates the mutation metrics and emits the mutation
   * keyed by its row key.
   *
   * @return a stop continuation if the claim fails (nothing is emitted in that case), otherwise
   *     empty.
   */
  private Optional<DoFn.ProcessContinuation> processChangeStreamMutation(
      PartitionRecord partitionRecord,
      ChangeStreamMutation changeStreamMutation,
      RestrictionTracker<StreamProgress, StreamProgress> tracker,
      DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
      ManualWatermarkEstimator<Instant> watermarkEstimator,
      boolean shouldDebug) {
    final Instant watermark = TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
    watermarkEstimator.setWatermark(watermark);

    // Build a new StreamProgress with the continuation token to be claimed. The token's range is
    // taken from the partition being streamed, its value from the mutation.
    ChangeStreamContinuationToken changeStreamContinuationToken =
        new ChangeStreamContinuationToken(
            Range.ByteStringRange.create(
                partitionRecord.getPartition().getStart(),
                partitionRecord.getPartition().getEnd()),
            changeStreamMutation.getToken());
    StreamProgress streamProgress =
        new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());

    // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
    // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
    // runner initiated checkpoints.
    if (!tracker.tryClaim(streamProgress)) {
      if (shouldDebug) {
        LOG.info(
            "RCSP {}: Failed to claim data change tracker",
            formatByteStringRange(partitionRecord.getPartition()));
      }
      return Optional.of(DoFn.ProcessContinuation.stop());
    }

    // Only GARBAGE_COLLECTION and USER mutations are counted; any other type is not counted.
    if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
      metrics.incChangeStreamMutationGcCounter();
    } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
      metrics.incChangeStreamMutationUserCounter();
    }

    // Processing delay is the wall-clock time elapsed since the mutation's commit timestamp.
    Instant commitTimestamp = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
    metrics.updateProcessingDelayFromCommitTimestamp(
        Instant.now().getMillis() - commitTimestamp.getMillis());

    KV<ByteString, ChangeStreamMutation> outputRecord =
        KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
    // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
    // limits the ability to window on commit time of any data changes. It is still possible to
    // window on processing time.
    receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
    return Optional.empty();
  }
}