[BEAM-12191] Fix a bug where upload_graph doesn't reduce template file size (#14582)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index cedc56e..31d1296 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -650,13 +650,6 @@
template_location = (
job.options.view_as(GoogleCloudOptions).template_location)
- job_location = template_location or dataflow_job_file
- if job_location:
- gcs_or_local_path = os.path.dirname(job_location)
- file_name = os.path.basename(job_location)
- self.stage_file(
- gcs_or_local_path, file_name, io.BytesIO(job.json().encode('utf-8')))
-
if job.options.view_as(DebugOptions).lookup_experiment('upload_graph'):
self.stage_file(
job.options.view_as(GoogleCloudOptions).staging_location,
@@ -667,6 +660,15 @@
job.options.view_as(GoogleCloudOptions).staging_location,
"dataflow_graph.json")
+ # template file generation should be placed immediately before the
+ # conditional API call.
+ job_location = template_location or dataflow_job_file
+ if job_location:
+ gcs_or_local_path = os.path.dirname(job_location)
+ file_name = os.path.basename(job_location)
+ self.stage_file(
+ gcs_or_local_path, file_name, io.BytesIO(job.json().encode('utf-8')))
+
if not template_location:
return self.submit_job_description(job)