blob: 1f298837ea6a28ee5ee27954d44d5949ec1be0c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.firestore;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
import com.google.cloud.firestore.v1.stub.FirestoreStub;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.PartitionQueryRequest;
import com.google.firestore.v1.PartitionQueryResponse;
import com.google.firestore.v1.Value;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.PartitionQueryFn;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.PartitionQueryPair;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
@SuppressWarnings(
"initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
public final class FirestoreV1FnPartitionQueryTest
extends BaseFirestoreV1ReadFnTest<PartitionQueryRequest, PartitionQueryPair> {
@Mock private UnaryCallable<PartitionQueryRequest, PartitionQueryPagedResponse> callable;
@Mock private PartitionQueryPagedResponse pagedResponse1;
@Mock private PartitionQueryPage page1;
@Mock private PartitionQueryPagedResponse pagedResponse2;
@Mock private PartitionQueryPage page2;
@Test
public void endToEnd() throws Exception {
// First page of the response
PartitionQueryRequest request1 =
PartitionQueryRequest.newBuilder()
.setParent(String.format("projects/%s/databases/(default)/document", projectId))
.build();
PartitionQueryResponse response1 =
PartitionQueryResponse.newBuilder()
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-100")))
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-200")))
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-300")))
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-400")))
.build();
when(callable.call(request1)).thenReturn(pagedResponse1);
when(page1.getResponse()).thenReturn(response1);
when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1));
when(stub.partitionQueryPagedCallable()).thenReturn(callable);
when(ff.getFirestoreStub(any())).thenReturn(stub);
RpcQosOptions options = RpcQosOptions.defaultOptions();
when(ff.getRpcQos(any()))
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
ArgumentCaptor<PartitionQueryPair> responses =
ArgumentCaptor.forClass(PartitionQueryPair.class);
doNothing().when(processContext).output(responses.capture());
when(processContext.element()).thenReturn(request1);
PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options);
runFunction(fn);
List<PartitionQueryPair> expected = newArrayList(new PartitionQueryPair(request1, response1));
List<PartitionQueryPair> allValues = responses.getAllValues();
assertEquals(expected, allValues);
}
@Test
public void endToEnd_emptyCursors() throws Exception {
// First page of the response
PartitionQueryRequest request1 =
PartitionQueryRequest.newBuilder()
.setParent(String.format("projects/%s/databases/(default)/document", projectId))
.build();
PartitionQueryResponse response1 = PartitionQueryResponse.newBuilder().build();
when(callable.call(request1)).thenReturn(pagedResponse1);
when(page1.getResponse()).thenReturn(response1);
when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1));
when(stub.partitionQueryPagedCallable()).thenReturn(callable);
when(ff.getFirestoreStub(any())).thenReturn(stub);
RpcQosOptions options = RpcQosOptions.defaultOptions();
when(ff.getRpcQos(any()))
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
ArgumentCaptor<PartitionQueryPair> responses =
ArgumentCaptor.forClass(PartitionQueryPair.class);
doNothing().when(processContext).output(responses.capture());
when(processContext.element()).thenReturn(request1);
PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options);
runFunction(fn);
List<PartitionQueryPair> expected = newArrayList(new PartitionQueryPair(request1, response1));
List<PartitionQueryPair> allValues = responses.getAllValues();
assertEquals(expected, allValues);
}
@Override
public void resumeFromLastReadValue() throws Exception {
when(ff.getFirestoreStub(any())).thenReturn(stub);
when(ff.getRpcQos(any())).thenReturn(rpcQos);
when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
when(attempt.awaitSafeToProceed(any())).thenReturn(true);
// First page of the response
PartitionQueryRequest request1 =
PartitionQueryRequest.newBuilder()
.setParent(String.format("projects/%s/databases/(default)/document", projectId))
.build();
PartitionQueryResponse response1 =
PartitionQueryResponse.newBuilder()
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-100")))
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-200")))
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-300")))
.setNextPageToken("page2")
.build();
when(page1.getResponse()).thenReturn(response1);
when(callable.call(request1)).thenReturn(pagedResponse1);
doNothing().when(attempt).checkCanRetry(any(), eq(RETRYABLE_ERROR));
when(pagedResponse1.iteratePages())
.thenAnswer(
invocation ->
new Iterable<PartitionQueryPage>() {
@Override
public Iterator<PartitionQueryPage> iterator() {
return new AbstractIterator<PartitionQueryPage>() {
private boolean first = true;
@Override
protected PartitionQueryPage computeNext() {
if (first) {
first = false;
return page1;
} else {
throw RETRYABLE_ERROR;
}
}
};
}
});
// Second page of the response
PartitionQueryRequest request2 =
PartitionQueryRequest.newBuilder()
.setParent(String.format("projects/%s/databases/(default)/document", projectId))
.setPageToken("page2")
.build();
PartitionQueryResponse response2 =
PartitionQueryResponse.newBuilder()
.addPartitions(
Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-400")))
.build();
PartitionQueryResponse expectedResponse =
response1
.toBuilder()
.clearNextPageToken()
.addAllPartitions(response2.getPartitionsList())
.build();
when(page2.getResponse()).thenReturn(response2);
when(page2.hasNextPage()).thenReturn(false);
when(callable.call(request2)).thenReturn(pagedResponse2);
when(pagedResponse2.iteratePages()).thenReturn(ImmutableList.of(page2));
when(stub.partitionQueryPagedCallable()).thenReturn(callable);
when(ff.getFirestoreStub(any())).thenReturn(stub);
ArgumentCaptor<PartitionQueryPair> responses =
ArgumentCaptor.forClass(PartitionQueryPair.class);
doNothing().when(processContext).output(responses.capture());
when(processContext.element()).thenReturn(request1);
PartitionQueryFn fn = new PartitionQueryFn(clock, ff, rpcQosOptions);
runFunction(fn);
List<PartitionQueryPair> expected =
newArrayList(new PartitionQueryPair(request1, expectedResponse));
List<PartitionQueryPair> allValues = responses.getAllValues();
assertEquals(expected, allValues);
}
@Override
protected V1RpcFnTestCtx newCtx() {
return new V1RpcFnTestCtx() {
@Override
public PartitionQueryRequest getRequest() {
return PartitionQueryRequest.newBuilder()
.setParent(String.format("projects/%s/databases/(default)/document", projectId))
.build();
}
@Override
public void mockRpcToCallable(FirestoreStub stub) {
when(stub.partitionQueryPagedCallable()).thenReturn(callable);
}
@Override
public void whenCallableCall(PartitionQueryRequest in, Throwable... throwables) {
when(callable.call(in)).thenThrow(throwables);
}
@Override
public void verifyNoInteractionsWithCallable() {
verifyNoMoreInteractions(callable);
}
};
}
@Override
protected PartitionQueryFn getFn(
JodaClock clock,
FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
RpcQosOptions rpcQosOptions) {
return new PartitionQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
}
}