[BEAM-8376] Google Cloud Firestore Connector - Add handling for partitions queries with zero cursors
Firestore PartitionQueryResponses can have zero cursors if there are
not enough documents to cause a partition cursor to be recorded. Update
`PartitionQueryResponseToRunQueryRequest` to emit the provided
StructuredQuery without modification if there aren't any cursors.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
index dd5202e..74d6636 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
@@ -1218,6 +1218,10 @@
List<Cursor> cursors = new ArrayList<>(partitionQueryResponse.getPartitionsList());
cursors.sort(CURSOR_REFERENCE_VALUE_COMPARATOR);
final int size = cursors.size();
+ if (size == 0) {
+ emit(c, dbRoot, structuredQuery.toBuilder());
+ return;
+ }
final int lastIdx = size - 1;
for (int i = 0; i < size; i++) {
Cursor curr = cursors.get(i);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
index 0c9bbf1..1f29883 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
@@ -99,6 +99,41 @@
assertEquals(expected, allValues);
}
+ @Test
+ public void endToEnd_emptyCursors() throws Exception {
+ // First page of the response
+ PartitionQueryRequest request1 =
+ PartitionQueryRequest.newBuilder()
+ .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+ .build();
+ PartitionQueryResponse response1 = PartitionQueryResponse.newBuilder().build();
+ when(callable.call(request1)).thenReturn(pagedResponse1);
+ when(page1.getResponse()).thenReturn(response1);
+ when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1));
+
+ when(stub.partitionQueryPagedCallable()).thenReturn(callable);
+
+ when(ff.getFirestoreStub(any())).thenReturn(stub);
+ RpcQosOptions options = RpcQosOptions.defaultOptions();
+ when(ff.getRpcQos(any()))
+ .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
+
+ ArgumentCaptor<PartitionQueryPair> responses =
+ ArgumentCaptor.forClass(PartitionQueryPair.class);
+
+ doNothing().when(processContext).output(responses.capture());
+
+ when(processContext.element()).thenReturn(request1);
+
+ PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options);
+
+ runFunction(fn);
+
+ List<PartitionQueryPair> expected = newArrayList(new PartitionQueryPair(request1, response1));
+ List<PartitionQueryPair> allValues = responses.getAllValues();
+ assertEquals(expected, allValues);
+ }
+
@Override
public void resumeFromLastReadValue() throws Exception {
when(ff.getFirestoreStub(any())).thenReturn(stub);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
index 25ed63c..ed789da 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
@@ -121,6 +121,39 @@
assertEquals(expectedQueries, actualQueries);
}
+ @Test
+ public void ensureCursorPairingWorks_emptyCursorsInResponse() {
+ StructuredQuery query =
+ StructuredQuery.newBuilder()
+ .addFrom(
+ CollectionSelector.newBuilder()
+ .setAllDescendants(true)
+ .setCollectionId("c1")
+ .build())
+ .build();
+
+ List<StructuredQuery> expectedQueries = newArrayList(query);
+
+ PartitionQueryPair partitionQueryPair =
+ new PartitionQueryPair(
+ PartitionQueryRequest.newBuilder().setStructuredQuery(query).build(),
+ PartitionQueryResponse.newBuilder().build());
+
+ ArgumentCaptor<RunQueryRequest> captor = ArgumentCaptor.forClass(RunQueryRequest.class);
+ when(processContext.element()).thenReturn(partitionQueryPair);
+ doNothing().when(processContext).output(captor.capture());
+
+ PartitionQueryResponseToRunQueryRequest fn = new PartitionQueryResponseToRunQueryRequest();
+ fn.processElement(processContext);
+
+ List<StructuredQuery> actualQueries =
+ captor.getAllValues().stream()
+ .map(RunQueryRequest::getStructuredQuery)
+ .collect(Collectors.toList());
+
+ assertEquals(expectedQueries, actualQueries);
+ }
+
private static Cursor referenceValueCursor(String referenceValue) {
return Cursor.newBuilder()
.addValues(Value.newBuilder().setReferenceValue(referenceValue).build())