Merge pull request #10308 from ibzib/zip-artifacts
[BEAM-8835] Stage artifacts under the BEAM-PIPELINE directory inside the zip file
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index 100eca5..1ba9602 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -146,12 +146,12 @@
Writing to zip files requires Python 3.6+.
"""
- def __init__(self, path, chunk_size=None):
+ def __init__(self, path, internal_root, chunk_size=None):
if sys.version_info < (3, 6):
raise RuntimeError(
'Writing to zip files requires Python 3.6+, '
'but current version is %s' % sys.version)
- super(ZipFileArtifactService, self).__init__('', chunk_size)
+ super(ZipFileArtifactService, self).__init__(internal_root, chunk_size)
self._zipfile = zipfile.ZipFile(path, 'a')
self._lock = threading.Lock()
@@ -172,6 +172,10 @@
pass
def _open(self, path, mode):
+ if path.startswith('/'):
+ raise ValueError(
+ 'ZIP file entry %s invalid: '
+ 'path must not contain a leading slash.' % path)
return self._zipfile.open(path, mode, force_zip64=True)
def PutArtifact(self, request_iterator, context=None):
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service_test.py b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
index f5da724..6efb60d 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
@@ -219,7 +219,7 @@
class ZipFileArtifactServiceTest(AbstractArtifactServiceTest):
def create_service(self, staging_dir):
return artifact_service.ZipFileArtifactService(
- os.path.join(staging_dir, 'test.zip'), chunk_size=10)
+ os.path.join(staging_dir, 'test.zip'), 'root', chunk_size=10)
class BeamFilesystemArtifactServiceTest(AbstractArtifactServiceTest):
diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
index c6348cf..b318971 100644
--- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
+++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
@@ -105,6 +105,7 @@
[PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json'])
ARTIFACT_MANIFEST_PATH = '/'.join(
[PIPELINE_FOLDER, PIPELINE_NAME, 'artifact-manifest.json'])
+ ARTIFACT_FOLDER = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, 'artifacts'])
def __init__(
self, master_url, executable_jar, job_id, job_name, pipeline, options,
@@ -134,7 +135,7 @@
def _start_artifact_service(self, jar, requested_port):
self._artifact_staging_service = artifact_service.ZipFileArtifactService(
- jar)
+ jar, self.ARTIFACT_FOLDER)
self._artifact_staging_server = grpc.server(futures.ThreadPoolExecutor())
port = self._artifact_staging_server.add_insecure_port(
'[::]:%s' % requested_port)