Merge pull request #10108 from ibzib/artifact-endpoint

[BEAM-8660] Override returned artifact staging endpoint
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 41cd306..b92ee44 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -872,8 +872,13 @@
   def _add_argparse_args(cls, parser):
     parser.add_argument(
         '--job_endpoint', default=None,
-        help=('Job service endpoint to use. Should be in the form of address '
-              'and port, e.g. localhost:3000'))
+        help=('Job service endpoint to use. Should be in the form of host '
+              'and port, e.g. localhost:8099.'))
+    parser.add_argument(
+        '--artifact_endpoint', default=None,
+        help=('Artifact staging endpoint to use. Should be in the form of host '
+              'and port, e.g. localhost:8098. If none is specified, the '
+              'artifact endpoint sent from the job server is used.'))
     parser.add_argument(
         '--job-server-timeout', default=60, type=int,
         help=('Job service request timeout in seconds. The timeout '
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 77fad5d..8b078a5 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -262,9 +262,12 @@
     prepare_response = job_service.Prepare(
         prepare_request,
         timeout=portable_options.job_server_timeout)
-    if prepare_response.artifact_staging_endpoint.url:
+    artifact_endpoint = (portable_options.artifact_endpoint
+                         if portable_options.artifact_endpoint
+                         else prepare_response.artifact_staging_endpoint.url)
+    if artifact_endpoint:
       stager = portable_stager.PortableStager(
-          grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
+          grpc.insecure_channel(artifact_endpoint),
           prepare_response.staging_session_token)
       retrieval_token, _ = stager.stage_job_resources(
           options,