Merge pull request #23967: Merge pull request #23954: Emit job ids via a tagged side output in the TriggerLoadJobs process to keep beam.Flatten() happy for Spark and Flink runners
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 8b899a3..b16b0db 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -437,18 +437,18 @@
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
     schema_update_job_reference = self.bq_wrapper.perform_load_job(
-      destination=table_reference,
-      source_stream=io.BytesIO(),  # file with zero rows
-      job_id=job_name,
-      schema=temp_table_schema,
-      write_disposition='WRITE_APPEND',
-      create_disposition='CREATE_NEVER',
-      additional_load_parameters=additional_parameters,
-      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-      # JSON format is hardcoded because zero rows load(unlike AVRO) and
-      # a nested schema(unlike CSV, which a default one) is permitted.
-      source_format="NEWLINE_DELIMITED_JSON",
-      load_job_project_id=self._load_job_project_id)
+        destination=table_reference,
+        source_stream=io.BytesIO(),  # file with zero rows
+        job_id=job_name,
+        schema=temp_table_schema,
+        write_disposition='WRITE_APPEND',
+        create_disposition='CREATE_NEVER',
+        additional_load_parameters=additional_parameters,
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+        # JSON format is hardcoded because it permits a zero-row load (unlike
+        # AVRO) and a nested schema (unlike CSV, which is the default format).
+        source_format="NEWLINE_DELIMITED_JSON",
+        load_job_project_id=self._load_job_project_id)
     self.pending_jobs.append(
         GlobalWindows.windowed_value(
             (destination, schema_update_job_reference)))
@@ -597,6 +597,7 @@
   """
 
   TEMP_TABLES = 'TemporaryTables'
+  ONGOING_JOBS = 'OngoingJobs'
 
   def __init__(
       self,
@@ -718,6 +719,8 @@
         source_format=self.source_format,
         job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
         load_job_project_id=self.load_job_project_id)
+    yield pvalue.TaggedOutput(
+        TriggerLoadJobs.ONGOING_JOBS, (destination, job_reference))
     self.pending_jobs.append(
         GlobalWindows.windowed_value((destination, job_reference)))
 
@@ -1054,13 +1057,17 @@
                 load_job_project_id=self.load_job_project_id),
             load_job_name_pcv,
             *self.schema_side_inputs).with_outputs(
-                TriggerLoadJobs.TEMP_TABLES, main='main'))
+                TriggerLoadJobs.TEMP_TABLES,
+                TriggerLoadJobs.ONGOING_JOBS,
+                main='main'))
 
-    temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    finished_temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
+    temp_tables_load_job_ids_pc = trigger_loads_outputs[
+        TriggerLoadJobs.ONGOING_JOBS]
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
     schema_mod_job_ids_pc = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             UpdateDestinationSchema(
                 project=self.project,
@@ -1072,7 +1079,7 @@
             schema_mod_job_name_pcv))
 
     copy_job_outputs = (
-        temp_tables_load_job_ids_pc
+        finished_temp_tables_load_job_ids_pc
         | beam.ParDo(
             TriggerCopyJobs(
                 project=self.project,
@@ -1113,7 +1120,9 @@
                 step_name=step_name,
                 load_job_project_id=self.load_job_project_id),
             load_job_name_pcv,
-            *self.schema_side_inputs))
+            *self.schema_side_inputs).with_outputs(
+                TriggerLoadJobs.ONGOING_JOBS, main='main')
+    )[TriggerLoadJobs.ONGOING_JOBS]
 
     destination_load_job_ids_pc = (
         (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)