blob: 46453a4d892dc780a76892d8b2c81184db1edffd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.rpc.Status;
import java.util.Collections;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class ChangeStreamActionTest {
private ChangeStreamMetrics metrics;
private ChangeStreamAction action;
private RestrictionTracker<StreamProgress, StreamProgress> tracker;
private PartitionRecord partitionRecord;
private DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver;
private ManualWatermarkEstimator<Instant> watermarkEstimator;
@Before
public void setUp() {
metrics = mock(ChangeStreamMetrics.class);
tracker = mock(ReadChangeStreamPartitionProgressTracker.class);
partitionRecord = mock(PartitionRecord.class);
receiver = mock(DoFn.OutputReceiver.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);
action = new ChangeStreamAction(metrics);
when(tracker.tryClaim(any())).thenReturn(true);
}
@Test
public void testHeartBeat() {
final Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(1000).build();
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(ByteStringRange.create("a", "b"), "1234");
Heartbeat mockHeartBeat = Mockito.mock(Heartbeat.class);
Mockito.when(mockHeartBeat.getLowWatermark()).thenReturn(lowWatermark);
Mockito.when(mockHeartBeat.getChangeStreamContinuationToken())
.thenReturn(changeStreamContinuationToken);
final Optional<DoFn.ProcessContinuation> result =
action.run(partitionRecord, mockHeartBeat, tracker, receiver, watermarkEstimator, false);
assertFalse(result.isPresent());
verify(metrics).incHeartbeatCount();
verify(watermarkEstimator).setWatermark(eq(TimestampConverter.toInstant(lowWatermark)));
StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
verify(tracker).tryClaim(eq(streamProgress));
}
@Test
public void testCloseStreamResume() {
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(ByteStringRange.create("a", "b"), "1234");
CloseStream mockCloseStream = Mockito.mock(CloseStream.class);
Status statusProto = Status.newBuilder().setCode(11).build();
Mockito.when(mockCloseStream.getStatus())
.thenReturn(com.google.cloud.bigtable.common.Status.fromProto(statusProto));
Mockito.when(mockCloseStream.getChangeStreamContinuationTokens())
.thenReturn(Collections.singletonList(changeStreamContinuationToken));
final Optional<DoFn.ProcessContinuation> result =
action.run(partitionRecord, mockCloseStream, tracker, receiver, watermarkEstimator, false);
assertTrue(result.isPresent());
assertEquals(DoFn.ProcessContinuation.resume(), result.get());
verify(metrics).incClosestreamCount();
StreamProgress streamProgress = new StreamProgress(mockCloseStream);
verify(tracker).tryClaim(eq(streamProgress));
}
@Test
public void testChangeStreamMutationUser() {
ByteStringRange partition = ByteStringRange.create("", "");
when(partitionRecord.getPartition()).thenReturn(partition);
final Timestamp commitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
final Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(500).build();
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(ByteStringRange.create("", ""), "1234");
ChangeStreamMutation changeStreamMutation = Mockito.mock(ChangeStreamMutation.class);
Mockito.when(changeStreamMutation.getCommitTimestamp()).thenReturn(commitTimestamp);
Mockito.when(changeStreamMutation.getToken()).thenReturn("1234");
Mockito.when(changeStreamMutation.getLowWatermark()).thenReturn(lowWatermark);
Mockito.when(changeStreamMutation.getType()).thenReturn(ChangeStreamMutation.MutationType.USER);
KV<ByteString, ChangeStreamMutation> record =
KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
final Optional<DoFn.ProcessContinuation> result =
action.run(
partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false);
assertFalse(result.isPresent());
verify(metrics).incChangeStreamMutationUserCounter();
verify(metrics, never()).incChangeStreamMutationGcCounter();
StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
verify(tracker).tryClaim(eq(streamProgress));
verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
verify(watermarkEstimator).setWatermark(eq(TimestampConverter.toInstant(lowWatermark)));
}
@Test
public void testChangeStreamMutationGc() {
ByteStringRange partition = ByteStringRange.create("", "");
when(partitionRecord.getPartition()).thenReturn(partition);
final Timestamp commitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
final Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(500).build();
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(ByteStringRange.create("", ""), "1234");
ChangeStreamMutation changeStreamMutation = Mockito.mock(ChangeStreamMutation.class);
Mockito.when(changeStreamMutation.getCommitTimestamp()).thenReturn(commitTimestamp);
Mockito.when(changeStreamMutation.getToken()).thenReturn("1234");
Mockito.when(changeStreamMutation.getLowWatermark()).thenReturn(lowWatermark);
Mockito.when(changeStreamMutation.getType())
.thenReturn(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
KV<ByteString, ChangeStreamMutation> record =
KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
final Optional<DoFn.ProcessContinuation> result =
action.run(
partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false);
assertFalse(result.isPresent());
verify(metrics).incChangeStreamMutationGcCounter();
verify(metrics, never()).incChangeStreamMutationUserCounter();
StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
verify(tracker).tryClaim(eq(streamProgress));
verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
verify(watermarkEstimator).setWatermark(eq(TimestampConverter.toInstant(lowWatermark)));
}
}