/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange;

import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class is responsible for processing individual ChangeStreamRecord. */
@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
public class ChangeStreamAction {
  private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamAction.class);

  private final ChangeStreamMetrics metrics;

  /**
   * Constructs ChangeStreamAction to process individual ChangeStreamRecord.
   *
   * @param metrics record beam metrics.
   */
  public ChangeStreamAction(ChangeStreamMetrics metrics) {
    this.metrics = metrics;
  }

  /**
   * This class processes ReadChangeStreamResponse from bigtable server. There are 3 possible
   * response types, Heartbeat, ChangeStreamMutation, CloseStream.
   *
   * <ul>
   *   <li>Heartbeat happens periodically based on the initial configuration set at the start of the
   *       beam pipeline. Heartbeat can advance the watermark forward and includes a continuation
   *       token that provides a point to resume from after a checkpoint.
   *   <li>ChangeStreamMutation includes the actual mutation that took place in the Bigtable.
   *       ChangeStreamMutation also includes watermark and continuation token. All
   *       ChangeStreamMutation are emitted to the outputReceiver with the timestamp of 0 (instead
   *       of the commit timestamp). Setting the timestamp to 0 discourages the use of windowing on
   *       this connector. All ChangeStreamMutations will be late data when windowing. This design
   *       decision prefers availability over consistency in the event that partitions are streamed
   *       slowly (due to an outages or other unavailabilities) the commit timestamp which drives
   *       the watermark may lag behind preventing windows from closing.
   *   <li>CloseStream indicates that the stream has come to an end. The CloseStream is not
   *       processed but stored in the RestrictionTracker to be processed later. This ensures that
   *       we successfully commit all pending ChangeStreamMutations.
   * </ul>
   *
   * CloseStream is the only response that type will initiate a resume. Other response type will
   * simply process the response and return empty. Returning empty signals to caller that we have
   * processed the response, and it does not require any additional action.
   *
   * <p>There are 2 cases that cause this function to return a non-empty ProcessContinuation.
   *
   * <ol>
   *   <li>We fail to claim a RestrictionTracker. This can happen for a runner-initiated checkpoint.
   *       When the runner initiates a checkpoint, we will stop and checkpoint pending
   *       ChangeStreamMutations and resume from the previous RestrictionTracker.
   *   <li>The response is a CloseStream. RestrictionTracker claims the CloseStream. We don't do any
   *       additional processing of the response. We return resume to signal to the caller that to
   *       checkpoint all pending ChangeStreamMutations. We expect the caller to check the
   *       RestrictionTracker includes a CloseStream and process it to close the stream.
   * </ol>
   *
   * @param partitionRecord the stream partition that generated the response
   * @param record the change stream record to be processed
   * @param tracker restrictionTracker that we use to claim next block and also to store CloseStream
   * @param receiver to output DataChange
   * @param watermarkEstimator manually progress watermark when processing responses with watermark
   * @return Optional.of(ProcessContinuation) if the run should be stopped or resumed, otherwise
   *     Optional.empty() to do nothing.
   */
  public Optional<DoFn.ProcessContinuation> run(
      PartitionRecord partitionRecord,
      ChangeStreamRecord record,
      RestrictionTracker<StreamProgress, StreamProgress> tracker,
      DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
      ManualWatermarkEstimator<Instant> watermarkEstimator,
      boolean shouldDebug) {
    if (record instanceof Heartbeat) {
      Heartbeat heartbeat = (Heartbeat) record;
      StreamProgress streamProgress =
          new StreamProgress(
              heartbeat.getChangeStreamContinuationToken(), heartbeat.getLowWatermark());
      final Instant watermark = TimestampConverter.toInstant(heartbeat.getLowWatermark());
      watermarkEstimator.setWatermark(watermark);

      if (shouldDebug) {
        LOG.info(
            "RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
            formatByteStringRange(partitionRecord.getPartition()),
            formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
            heartbeat.getChangeStreamContinuationToken().getToken(),
            heartbeat.getLowWatermark());
      }
      // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
      // a checkpoint. See {@link
      // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
      // for more information regarding runner initiated checkpoints.
      if (!tracker.tryClaim(streamProgress)) {
        if (shouldDebug) {
          LOG.info(
              "RCSP {}: Failed to claim heart beat tracker",
              formatByteStringRange(partitionRecord.getPartition()));
        }
        return Optional.of(DoFn.ProcessContinuation.stop());
      }
      metrics.incHeartbeatCount();
    } else if (record instanceof CloseStream) {
      CloseStream closeStream = (CloseStream) record;
      StreamProgress streamProgress = new StreamProgress(closeStream);

      if (shouldDebug) {
        LOG.info(
            "RCSP {}: CloseStream: {}",
            formatByteStringRange(partitionRecord.getPartition()),
            closeStream.getChangeStreamContinuationTokens().stream()
                .map(
                    c ->
                        "{partition: "
                            + formatByteStringRange(c.getPartition())
                            + " token: "
                            + c.getToken()
                            + "}")
                .collect(Collectors.joining(", ", "[", "]")));
      }
      // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
      // a checkpoint. See {@link
      // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
      // for more information regarding runner initiated checkpoints.
      if (!tracker.tryClaim(streamProgress)) {
        if (shouldDebug) {
          LOG.info(
              "RCSP {}: Failed to claim close stream tracker",
              formatByteStringRange(partitionRecord.getPartition()));
        }
        return Optional.of(DoFn.ProcessContinuation.stop());
      }
      metrics.incClosestreamCount();
      return Optional.of(DoFn.ProcessContinuation.resume());
    } else if (record instanceof ChangeStreamMutation) {
      ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
      final Instant watermark =
          TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
      watermarkEstimator.setWatermark(watermark);
      // Build a new StreamProgress with the continuation token to be claimed.
      ChangeStreamContinuationToken changeStreamContinuationToken =
          new ChangeStreamContinuationToken(
              Range.ByteStringRange.create(
                  partitionRecord.getPartition().getStart(),
                  partitionRecord.getPartition().getEnd()),
              changeStreamMutation.getToken());
      StreamProgress streamProgress =
          new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
      // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
      // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
      // runner initiated checkpoints.
      if (!tracker.tryClaim(streamProgress)) {
        if (shouldDebug) {
          LOG.info(
              "RCSP {}: Failed to claim data change tracker",
              formatByteStringRange(partitionRecord.getPartition()));
        }
        return Optional.of(DoFn.ProcessContinuation.stop());
      }
      if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
        metrics.incChangeStreamMutationGcCounter();
      } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
        metrics.incChangeStreamMutationUserCounter();
      }
      Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
      metrics.updateProcessingDelayFromCommitTimestamp(
          Instant.now().getMillis() - delay.getMillis());

      KV<ByteString, ChangeStreamMutation> outputRecord =
          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
      // limits the ability to window on commit time of any data changes. It is still possible to
      // window on processing time.
      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
    } else {
      LOG.warn(
          "RCSP {}: Invalid response type", formatByteStringRange(partitionRecord.getPartition()));
    }
    return Optional.empty();
  }
}
