blob: 0a4e3e7458f5933eb5d275866baebd5b9bbb1d78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsublite;
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Spy;
@RunWith(JUnit4.class)
@SuppressWarnings("initialization.fields.uninitialized")
public class PerSubscriptionPartitionSdfTest {
private static final Duration MAX_SLEEP_TIME =
Duration.standardMinutes(10).plus(Duration.millis(10));
private static final OffsetByteRange RESTRICTION =
OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0);
private static final SubscriptionPartition PARTITION =
SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class));
@Mock SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
@Mock ManagedBacklogReaderFactory backlogReaderFactory;
@Mock TopicBacklogReader backlogReader;
@Mock
SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory;
@Mock SubscriptionPartitionProcessorFactory processorFactory;
@Mock SerializableFunction<SubscriptionPartition, Committer> committerFactory;
@Mock InitialOffsetReader initialOffsetReader;
@Spy TrackerWithProgress tracker;
@Mock OutputReceiver<SequencedMessage> output;
@Mock SubscriptionPartitionProcessor processor;
abstract static class FakeCommitter extends FakeApiService implements Committer {}
@Spy FakeCommitter committer;
PerSubscriptionPartitionSdf sdf;
@Before
public void setUp() {
initMocks(this);
when(offsetReaderFactory.apply(any())).thenReturn(initialOffsetReader);
when(processorFactory.newProcessor(any(), any(), any())).thenReturn(processor);
when(trackerFactory.apply(any(), any())).thenReturn(tracker);
when(committerFactory.apply(any())).thenReturn(committer);
when(tracker.currentRestriction()).thenReturn(RESTRICTION);
when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader);
sdf =
new PerSubscriptionPartitionSdf(
MAX_SLEEP_TIME,
backlogReaderFactory,
offsetReaderFactory,
trackerFactory,
processorFactory,
committerFactory);
}
@Test
public void getInitialRestrictionReadSuccess() {
when(initialOffsetReader.read()).thenReturn(example(Offset.class));
OffsetByteRange range = sdf.getInitialRestriction(PARTITION);
assertEquals(example(Offset.class).value(), range.getRange().getFrom());
assertEquals(Long.MAX_VALUE, range.getRange().getTo());
assertEquals(0, range.getByteCount());
verify(offsetReaderFactory).apply(PARTITION);
}
@Test
public void getInitialRestrictionReadFailure() {
when(initialOffsetReader.read()).thenThrow(new CheckedApiException(Code.INTERNAL).underlying);
assertThrows(ApiException.class, () -> sdf.getInitialRestriction(PARTITION));
}
@Test
public void newTrackerCallsFactory() {
assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION));
verify(trackerFactory).apply(backlogReader, RESTRICTION);
}
@Test
public void tearDownClosesBacklogReaderFactory() {
sdf.teardown();
verify(backlogReaderFactory).close();
}
@Test
@SuppressWarnings("argument.type.incompatible")
public void process() throws Exception {
when(processor.waitForCompletion(MAX_SLEEP_TIME)).thenReturn(ProcessContinuation.resume());
when(processorFactory.newProcessor(any(), any(), any()))
.thenAnswer(
args -> {
@Nonnull
RestrictionTracker<OffsetRange, OffsetByteProgress> wrapped = args.getArgument(1);
when(tracker.tryClaim(any())).thenReturn(true).thenReturn(false);
assertTrue(wrapped.tryClaim(OffsetByteProgress.of(example(Offset.class), 123)));
assertFalse(wrapped.tryClaim(OffsetByteProgress.of(Offset.of(333333), 123)));
return processor;
});
doReturn(Optional.of(example(Offset.class))).when(processor).lastClaimed();
when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
assertEquals(ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output));
verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output));
InOrder order = inOrder(processor);
order.verify(processor).start();
order.verify(processor).waitForCompletion(MAX_SLEEP_TIME);
order.verify(processor).lastClaimed();
order.verify(processor).close();
InOrder order2 = inOrder(committerFactory, committer);
order2.verify(committer).startAsync();
order2.verify(committer).awaitRunning();
order2.verify(committer).commitOffset(Offset.of(example(Offset.class).value() + 1));
order2.verify(committer).stopAsync();
order2.verify(committer).awaitTerminated();
}
private static final class NoopManagedBacklogReaderFactory
implements ManagedBacklogReaderFactory {
@Override
public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
return null;
}
@Override
public void close() {}
}
@Test
@SuppressWarnings("return.type.incompatible")
public void dofnIsSerializable() throws Exception {
ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream());
output.writeObject(
new PerSubscriptionPartitionSdf(
MAX_SLEEP_TIME,
new NoopManagedBacklogReaderFactory(),
x -> null,
(x, y) -> null,
(x, y, z) -> null,
(x) -> null));
}
@Test
public void getProgressUnboundedRangeDelegates() {
Progress progress = Progress.from(0, 0.2);
when(tracker.getProgress()).thenReturn(progress);
assertTrue(
DoubleMath.fuzzyEquals(
progress.getWorkRemaining(), sdf.getSize(PARTITION, RESTRICTION), .0001));
verify(tracker).getProgress();
}
@Test
public void getProgressBoundedReturnsBytes() {
assertTrue(
DoubleMath.fuzzyEquals(
123.0,
sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87, 8000), 123)),
.0001));
verifyNoInteractions(tracker);
}
}