[BEAM-8376] Google Cloud Firestore Connector - Add handling for partitions queries with zero cursors

Firestore PartitionQueryResponses can have zero cursors if there are
not enough documents to cause a partition cursor to be recorded. Update
`PartitionQueryResponseToRunQueryRequest` to emit the provided
StructuredQuery without modification if there aren't any cursors.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
index dd5202e..74d6636 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
@@ -1218,6 +1218,10 @@
         List<Cursor> cursors = new ArrayList<>(partitionQueryResponse.getPartitionsList());
         cursors.sort(CURSOR_REFERENCE_VALUE_COMPARATOR);
         final int size = cursors.size();
+        if (size == 0) {
+          emit(c, dbRoot, structuredQuery.toBuilder());
+          return;
+        }
         final int lastIdx = size - 1;
         for (int i = 0; i < size; i++) {
           Cursor curr = cursors.get(i);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
index 0c9bbf1..1f29883 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
@@ -99,6 +99,41 @@
     assertEquals(expected, allValues);
   }
 
+  @Test
+  public void endToEnd_emptyCursors() throws Exception {
+    // First page of the response
+    PartitionQueryRequest request1 =
+        PartitionQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+    PartitionQueryResponse response1 = PartitionQueryResponse.newBuilder().build();
+    when(callable.call(request1)).thenReturn(pagedResponse1);
+    when(page1.getResponse()).thenReturn(response1);
+    when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1));
+
+    when(stub.partitionQueryPagedCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    RpcQosOptions options = RpcQosOptions.defaultOptions();
+    when(ff.getRpcQos(any()))
+        .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
+
+    ArgumentCaptor<PartitionQueryPair> responses =
+        ArgumentCaptor.forClass(PartitionQueryPair.class);
+
+    doNothing().when(processContext).output(responses.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options);
+
+    runFunction(fn);
+
+    List<PartitionQueryPair> expected = newArrayList(new PartitionQueryPair(request1, response1));
+    List<PartitionQueryPair> allValues = responses.getAllValues();
+    assertEquals(expected, allValues);
+  }
+
   @Override
   public void resumeFromLastReadValue() throws Exception {
     when(ff.getFirestoreStub(any())).thenReturn(stub);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
index 25ed63c..ed789da 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
@@ -121,6 +121,39 @@
     assertEquals(expectedQueries, actualQueries);
   }
 
+  @Test
+  public void ensureCursorPairingWorks_emptyCursorsInResponse() {
+    StructuredQuery query =
+        StructuredQuery.newBuilder()
+            .addFrom(
+                CollectionSelector.newBuilder()
+                    .setAllDescendants(true)
+                    .setCollectionId("c1")
+                    .build())
+            .build();
+
+    List<StructuredQuery> expectedQueries = newArrayList(query);
+
+    PartitionQueryPair partitionQueryPair =
+        new PartitionQueryPair(
+            PartitionQueryRequest.newBuilder().setStructuredQuery(query).build(),
+            PartitionQueryResponse.newBuilder().build());
+
+    ArgumentCaptor<RunQueryRequest> captor = ArgumentCaptor.forClass(RunQueryRequest.class);
+    when(processContext.element()).thenReturn(partitionQueryPair);
+    doNothing().when(processContext).output(captor.capture());
+
+    PartitionQueryResponseToRunQueryRequest fn = new PartitionQueryResponseToRunQueryRequest();
+    fn.processElement(processContext);
+
+    List<StructuredQuery> actualQueries =
+        captor.getAllValues().stream()
+            .map(RunQueryRequest::getStructuredQuery)
+            .collect(Collectors.toList());
+
+    assertEquals(expectedQueries, actualQueries);
+  }
+
   private static Cursor referenceValueCursor(String referenceValue) {
     return Cursor.newBuilder()
         .addValues(Value.newBuilder().setReferenceValue(referenceValue).build())