| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| """Implementation of an Artifact{Staging,Retrieval}Service. |
| |
| The staging service here can be backed by any beam filesystem. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import hashlib |
| import sys |
| import threading |
| import zipfile |
| |
| from google.protobuf import json_format |
| |
| from apache_beam.io import filesystems |
| from apache_beam.portability.api import beam_artifact_api_pb2 |
| from apache_beam.portability.api import beam_artifact_api_pb2_grpc |
| |
| |
class AbstractArtifactService(
    beam_artifact_api_pb2_grpc.ArtifactStagingServiceServicer,
    beam_artifact_api_pb2_grpc.ArtifactRetrievalServiceServicer):
  """Base implementation of the artifact staging and retrieval services.

  Subclasses provide the primitive path and file operations (_join, _dirname,
  _open, _rename, _delete); this class implements the gRPC servicer methods
  in terms of those primitives.
  """

  _DEFAULT_CHUNK_SIZE = 2 << 20  # 2mb

  def __init__(self, root, chunk_size=None):
    # root: base path under which all manifests and artifacts are stored.
    # chunk_size: byte size of the chunks yielded by GetArtifact.
    self._root = root
    self._chunk_size = chunk_size or self._DEFAULT_CHUNK_SIZE

  def _sha256(self, string):
    # Derives a stable, filesystem-safe name from an arbitrary string.
    return hashlib.sha256(string.encode('utf-8')).hexdigest()

  def _join(self, *args):
    """Joins path components; must be provided by subclasses."""
    raise NotImplementedError(type(self))

  def _dirname(self, path):
    """Returns the parent of path; must be provided by subclasses."""
    raise NotImplementedError(type(self))

  def _temp_path(self, path):
    # Scratch location written to before the atomic rename into place.
    return path + '.tmp'

  def _open(self, path, mode):
    """Opens path for reading or writing; must be provided by subclasses."""
    raise NotImplementedError(type(self))

  def _rename(self, src, dest):
    """Moves src to dest; must be provided by subclasses."""
    raise NotImplementedError(type(self))

  def _delete(self, path):
    """Removes path; must be provided by subclasses."""
    raise NotImplementedError(type(self))

  def _artifact_path(self, retrieval_token, name):
    # Artifacts live next to their manifest, keyed by the hash of their name.
    return self._join(self._dirname(retrieval_token), self._sha256(name))

  def _manifest_path(self, retrieval_token):
    # The retrieval token is itself the path of the committed manifest.
    return retrieval_token

  def _get_manifest_proxy(self, retrieval_token):
    """Reads and parses the previously-committed ProxyManifest."""
    with self._open(self._manifest_path(retrieval_token), 'r') as fin:
      return json_format.Parse(
          fin.read().decode('utf-8'), beam_artifact_api_pb2.ProxyManifest())

  def retrieval_token(self, staging_session_token):
    """Maps a staging session token to the path of its manifest."""
    return self._join(
        self._root, self._sha256(staging_session_token), 'MANIFEST')

  def PutArtifact(self, request_iterator, context=None):
    """Stages one artifact from a stream of PutArtifactRequests.

    The first request carries only metadata (including the staging session
    token); every subsequent request carries a chunk of the artifact's data.
    Data is written to a temporary path and renamed into place only after
    the optional sha256 in the metadata has been verified.

    Raises:
      ValueError: if the received data does not match the declared sha256.
    """
    fout = None
    try:
      for request in request_iterator:
        if fout is None:
          # First message: metadata only, no data yet.
          metadata = request.metadata.metadata
          retrieval_token = self.retrieval_token(
              request.metadata.staging_session_token)
          artifact_path = self._artifact_path(retrieval_token, metadata.name)
          temp_path = self._temp_path(artifact_path)
          fout = self._open(temp_path, 'w')
          hasher = hashlib.sha256()
        else:
          hasher.update(request.data.data)
          fout.write(request.data.data)
    finally:
      # Always release the write channel, even if the request stream errors
      # mid-way (important for backends that serialize writers, such as
      # zip files).
      if fout is not None:
        fout.close()
    data_hash = hasher.hexdigest()
    if metadata.sha256 and metadata.sha256 != data_hash:
      self._delete(temp_path)
      raise ValueError('Bad metadata hash: %s vs %s' % (
          metadata.sha256, data_hash))
    self._rename(temp_path, artifact_path)
    return beam_artifact_api_pb2.PutArtifactResponse()

  def CommitManifest(self, request, context=None):
    """Writes the manifest, recording where each staged artifact lives.

    Returns the retrieval token later used by GetManifest/GetArtifact.
    """
    retrieval_token = self.retrieval_token(request.staging_session_token)
    proxy_manifest = beam_artifact_api_pb2.ProxyManifest(
        manifest=request.manifest,
        location=[
            beam_artifact_api_pb2.ProxyManifest.Location(
                name=metadata.name,
                uri=self._artifact_path(retrieval_token, metadata.name))
            for metadata in request.manifest.artifact])
    with self._open(self._manifest_path(retrieval_token), 'w') as fout:
      fout.write(json_format.MessageToJson(proxy_manifest).encode('utf-8'))
    return beam_artifact_api_pb2.CommitManifestResponse(
        retrieval_token=retrieval_token)

  def GetManifest(self, request, context=None):
    """Returns the manifest committed under the given retrieval token."""
    return beam_artifact_api_pb2.GetManifestResponse(
        manifest=self._get_manifest_proxy(request.retrieval_token).manifest)

  def GetArtifact(self, request, context=None):
    """Streams the named artifact's bytes in chunks of self._chunk_size.

    Raises:
      ValueError: if the manifest contains no artifact with the given name.
    """
    for artifact in self._get_manifest_proxy(request.retrieval_token).location:
      if artifact.name == request.name:
        with self._open(artifact.uri, 'r') as fin:
          # This value is not emitted, but lets us yield a single empty
          # chunk on an empty file.
          chunk = True
          while chunk:
            chunk = fin.read(self._chunk_size)
            yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
        break
    else:
      raise ValueError('Unknown artifact: %s' % request.name)
| |
| |
class ZipFileArtifactService(AbstractArtifactService):
  """Artifact service whose backing store is a single zip archive.

  This is particularly handy when the staged artifacts should travel inside
  an UberJar submitted to an upstream runner's cluster.

  Writing to zip files requires Python 3.6+.
  """

  def __init__(self, path, chunk_size=None):
    if sys.version_info < (3, 6):
      raise RuntimeError(
          'Writing to zip files requires Python 3.6+, '
          'but current version is %s' % sys.version)
    super(ZipFileArtifactService, self).__init__('', chunk_size)
    self._lock = threading.Lock()
    self._zipfile = zipfile.ZipFile(path, 'a')

  def _join(self, *args):
    # Zip entries always use forward slashes, regardless of platform.
    return '/'.join(args)

  def _dirname(self, path):
    pieces = path.rsplit('/', 1)
    return pieces[0]

  def _temp_path(self, path):
    # ZipFile offers no move operation, so write directly to the final name.
    return path

  def _rename(self, src, dest):
    # With _temp_path being the identity, these must already coincide.
    assert src == dest

  def _delete(self, path):
    # ZipFile offers no delete operation: https://bugs.python.org/issue6818
    pass

  def _open(self, path, mode):
    return self._zipfile.open(path, mode, force_zip64=True)

  def PutArtifact(self, request_iterator, context=None):
    # ZipFile only supports one writable channel at a time.
    with self._lock:
      parent = super(ZipFileArtifactService, self)
      return parent.PutArtifact(request_iterator, context)

  def CommitManifest(self, request, context=None):
    # ZipFile only supports one writable channel at a time.
    with self._lock:
      parent = super(ZipFileArtifactService, self)
      return parent.CommitManifest(request, context)

  def GetManifest(self, request, context=None):
    # ZipFile appears to not be threadsafe on some platforms.
    with self._lock:
      parent = super(ZipFileArtifactService, self)
      return parent.GetManifest(request, context)

  def GetArtifact(self, request, context=None):
    # ZipFile appears to not be threadsafe on some platforms.
    with self._lock:
      parent = super(ZipFileArtifactService, self)
      for data_chunk in parent.GetArtifact(request, context):
        yield data_chunk

  def close(self):
    """Flushes and closes the underlying zip archive."""
    self._zipfile.close()
| |
| |
class BeamFilesystemArtifactService(AbstractArtifactService):
  """Artifact service backed by any Beam filesystem (local, GCS, HDFS, ...).

  All primitive operations delegate to apache_beam.io.filesystems, so any
  filesystem registered there can serve as the artifact store.
  """

  def _join(self, *args):
    return filesystems.FileSystems.join(*args)

  def _dirname(self, path):
    return filesystems.FileSystems.split(path)[0]

  def _rename(self, src, dest):
    filesystems.FileSystems.rename([src], [dest])

  def _delete(self, path):
    filesystems.FileSystems.delete([path])

  def _open(self, path, mode='r'):
    # Ensure the parent directory exists before creating a file inside it.
    # (Renamed from `dir` to avoid shadowing the builtin.)
    dirname = self._dirname(path)
    if not filesystems.FileSystems.exists(dirname):
      try:
        filesystems.FileSystems.mkdirs(dirname)
      except Exception:
        # Best effort only: another worker may have created the directory
        # concurrently, or the filesystem may not support mkdirs.  Any real
        # failure will surface from create/open below.
        pass

    if 'w' in mode:
      return filesystems.FileSystems.create(path)
    else:
      return filesystems.FileSystems.open(path)