Merge pull request #10308 from ibzib/zip-artifacts

[BEAM-8835] Stage artifacts to BEAM-PIPELINE dir in zip
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index 100eca5..1ba9602 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -146,12 +146,12 @@
   Writing to zip files requires Python 3.6+.
   """
 
-  def __init__(self, path, chunk_size=None):
+  def __init__(self, path, internal_root, chunk_size=None):
     if sys.version_info < (3, 6):
       raise RuntimeError(
           'Writing to zip files requires Python 3.6+, '
           'but current version is %s' % sys.version)
-    super(ZipFileArtifactService, self).__init__('', chunk_size)
+    super(ZipFileArtifactService, self).__init__(internal_root, chunk_size)
     self._zipfile = zipfile.ZipFile(path, 'a')
     self._lock = threading.Lock()
 
@@ -172,6 +172,10 @@
     pass
 
   def _open(self, path, mode):
+    if path.startswith('/'):
+      raise ValueError(
+          'ZIP file entry %s invalid: '
+          'path must not contain a leading slash.' % path)
     return self._zipfile.open(path, mode, force_zip64=True)
 
   def PutArtifact(self, request_iterator, context=None):
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service_test.py b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
index f5da724..6efb60d 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
@@ -219,7 +219,7 @@
 class ZipFileArtifactServiceTest(AbstractArtifactServiceTest):
   def create_service(self, staging_dir):
     return artifact_service.ZipFileArtifactService(
-        os.path.join(staging_dir, 'test.zip'), chunk_size=10)
+        os.path.join(staging_dir, 'test.zip'), 'root', chunk_size=10)
 
 
 class BeamFilesystemArtifactServiceTest(AbstractArtifactServiceTest):
diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
index c6348cf..b318971 100644
--- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
+++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
@@ -105,6 +105,7 @@
       [PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json'])
   ARTIFACT_MANIFEST_PATH = '/'.join(
       [PIPELINE_FOLDER, PIPELINE_NAME, 'artifact-manifest.json'])
+  ARTIFACT_FOLDER = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, 'artifacts'])
 
   def __init__(
       self, master_url, executable_jar, job_id, job_name, pipeline, options,
@@ -134,7 +135,7 @@
 
   def _start_artifact_service(self, jar, requested_port):
     self._artifact_staging_service = artifact_service.ZipFileArtifactService(
-        jar)
+        jar, self.ARTIFACT_FOLDER)
     self._artifact_staging_server = grpc.server(futures.ThreadPoolExecutor())
     port = self._artifact_staging_server.add_insecure_port(
         '[::]:%s' % requested_port)