[BEAM-8660] Override returned artifact staging endpoint
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 5ade96f..462c5ab 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -865,8 +865,13 @@
def _add_argparse_args(cls, parser):
parser.add_argument(
'--job_endpoint', default=None,
- help=('Job service endpoint to use. Should be in the form of address '
- 'and port, e.g. localhost:3000'))
+ help=('Job service endpoint to use. Should be in the form of host '
+ 'and port, e.g. localhost:8099.'))
+ parser.add_argument(
+ '--artifact_endpoint', default=None,
+ help=('Artifact staging endpoint to use. Should be in the form of host '
+ 'and port, e.g. localhost:8098. If none is specified, the '
+ 'artifact endpoint sent from the job server is used.'))
parser.add_argument(
'--job-server-timeout', default=60, type=int,
help=('Job service request timeout in seconds. The timeout '
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 6de9e5c..6e10085 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -280,9 +280,12 @@
prepare_response = job_service.Prepare(
prepare_request,
timeout=portable_options.job_server_timeout)
- if prepare_response.artifact_staging_endpoint.url:
+ artifact_endpoint = (portable_options.artifact_endpoint
+ if portable_options.artifact_endpoint
+ else prepare_response.artifact_staging_endpoint.url)
+ if artifact_endpoint:
stager = portable_stager.PortableStager(
- grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
+ grpc.insecure_channel(artifact_endpoint),
prepare_response.staging_session_token)
retrieval_token, _ = stager.stage_job_resources(
options,