#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file

import hashlib
import os
import random
import tempfile
import unittest

import apache_beam as beam
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.pipeline_utils import merge_common_environments
from apache_beam.runners.pipeline_utils import merge_superset_dep_environments
from apache_beam.runners.portability.expansion_service_test import FibTransform


class PipelineUtilitiesTest(unittest.TestCase):
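  """Tests for the environment-merging utilities in pipeline_utils."""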
def test_equal_environments_merged(self):
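    """Byte-for-byte identical environments should be deduplicated, with all
    references remapped to the surviving copy."""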
pipeline_proto = merge_common_environments(
beam_runner_api_pb2.Pipeline(
components=beam_runner_api_pb2.Components(
environments={
'a1': beam_runner_api_pb2.Environment(urn='A'),
'a2': beam_runner_api_pb2.Environment(urn='A'),
'b1': beam_runner_api_pb2.Environment(
urn='B', payload=b'x'),
'b2': beam_runner_api_pb2.Environment(
urn='B', payload=b'x'),
'b3': beam_runner_api_pb2.Environment(
urn='B', payload=b'y'),
},
transforms={
't1': beam_runner_api_pb2.PTransform(
unique_name='t1', environment_id='a1'),
't2': beam_runner_api_pb2.PTransform(
unique_name='t2', environment_id='a2'),
},
windowing_strategies={
'w1': beam_runner_api_pb2.WindowingStrategy(
environment_id='b1'),
'w2': beam_runner_api_pb2.WindowingStrategy(
environment_id='b2'),
})))
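    # 'a1' and 'a2' are identical, as are 'b1' and 'b2', while 'b3' differs
    # in payload, so exactly three distinct environments should survive.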
self.assertEqual(len(pipeline_proto.components.environments), 3)
self.assertTrue(('a1' in pipeline_proto.components.environments)
^ ('a2' in pipeline_proto.components.environments))
self.assertTrue(('b1' in pipeline_proto.components.environments)
^ ('b2' in pipeline_proto.components.environments))
self.assertEqual(
len(
set(
t.environment_id
for t in pipeline_proto.components.transforms.values())),
1)
self.assertEqual(
len(
set(
w.environment_id for w in
pipeline_proto.components.windowing_strategies.values())),
1)

  def _make_dep(self, path):
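    """Returns a FILE artifact for path, hashing the file's contents if it
    exists and the path itself otherwise."""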
hasher = hashlib.sha256()
if os.path.exists(path):
with open(path, 'rb') as fin:
hasher.update(fin.read())
else:
# A fake file, identified only by its path.
hasher.update(path.encode('utf-8'))
return beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.FILE.urn,
type_payload=beam_runner_api_pb2.ArtifactFilePayload(
path=path, sha256=hasher.hexdigest()).SerializeToString(),
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
staged_name=os.path.basename(path)).SerializeToString())

  def _docker_env(self, id, deps=()):
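    """Returns a Docker environment with the given (fake) image id and one
    file dependency per entry of deps."""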
return beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=id.encode('utf8'),
dependencies=[self._make_dep(path) for path in deps],
)

  def test_subset_deps_environments_merged(self):
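    """Environments whose dependencies are a subset of another environment's,
    and which share the same base image, should be merged into the latter."""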
environments = {
'A': self._docker_env('A'),
'Ax': self._docker_env('A', ['x']),
'Ay': self._docker_env('A', ['y']),
'Axy': self._docker_env('A', ['x', 'y']),
'Bx': self._docker_env('B', ['x']),
'Bxy': self._docker_env('B', ['x', 'y']),
'Byz': self._docker_env('B', ['y', 'z']),
}
transforms = {
env_id: beam_runner_api_pb2.PTransform(
unique_name=env_id, environment_id=env_id)
for env_id in environments.keys()
}
pipeline_proto = merge_superset_dep_environments(
beam_runner_api_pb2.Pipeline(
components=beam_runner_api_pb2.Components(
environments=environments, transforms=transforms)))
# These can all be merged into the same environment.
self.assertEqual(
pipeline_proto.components.transforms['A'].environment_id, 'Axy')
self.assertEqual(
pipeline_proto.components.transforms['Ax'].environment_id, 'Axy')
self.assertEqual(
pipeline_proto.components.transforms['Ay'].environment_id, 'Axy')
self.assertEqual(
pipeline_proto.components.transforms['Axy'].environment_id, 'Axy')
    # Despite sharing dependencies with the A environments, these have a
    # different base image and so must be merged into their own environment.
self.assertEqual(
pipeline_proto.components.transforms['Bx'].environment_id, 'Bxy')
self.assertEqual(
pipeline_proto.components.transforms['Bxy'].environment_id, 'Bxy')
    # Byz's dependencies are not a subset of any other environment's, so it
    # must be left alone.
self.assertEqual(
pipeline_proto.components.transforms['Byz'].environment_id, 'Byz')

  def test_subset_deps_environments_merged_with_requirements_txt(self):
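    """Like test_subset_deps_environments_merged, but with real files on disk,
    including pip-style metadata files whose contents differ between
    environments."""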
with tempfile.TemporaryDirectory() as tmpdir:

      def make_file(basename, content):
subdir = tempfile.mkdtemp(dir=tmpdir)
path = os.path.join(subdir, basename)
with open(path, 'w') as fout:
fout.write(content)
return path

      def make_py_deps(*pkgs):
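        # A requirements.txt naming the packages, a per-call (random)
        # submission_environment_dependencies.txt, and one artifact per
        # package; the random file checks that merging tolerates differing
        # contents of this metadata file.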
return [
make_file('requirements.txt', '\n'.join(pkgs)),
make_file(
'submission_environment_dependencies.txt', str(
random.random())),
] + [make_file(pkg, pkg) for pkg in pkgs]

      environments = {
'A': self._docker_env('A'),
'Ax': self._docker_env('A', make_py_deps('x')),
'Ay': self._docker_env('A', make_py_deps('y')),
'Axy': self._docker_env('A', make_py_deps('x', 'y')),
}
transforms = {
env_id: beam_runner_api_pb2.PTransform(
unique_name=env_id, environment_id=env_id)
for env_id in environments.keys()
}
pipeline_proto = merge_superset_dep_environments(
beam_runner_api_pb2.Pipeline(
components=beam_runner_api_pb2.Components(
environments=environments, transforms=transforms)))
# These can all be merged into the same environment.
self.assertEqual(
pipeline_proto.components.transforms['A'].environment_id, 'Axy')
self.assertEqual(
pipeline_proto.components.transforms['Ax'].environment_id, 'Axy')
self.assertEqual(
pipeline_proto.components.transforms['Ay'].environment_id, 'Axy')
self.assertEqual(
pipeline_proto.components.transforms['Axy'].environment_id, 'Axy')

  def test_external_merged(self):
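    """Equal environments created by recursive external expansion should be
    consolidated into a single environment."""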
p = beam.Pipeline()
# This transform recursively creates several external environments.
_ = p | FibTransform(4)
pipeline_proto = p.to_runner_api()
# All our external environments are equal and consolidated.
    # We also have a placeholder "default" environment that has not been
    # resolved to anything concrete yet.
envs = pipeline_proto.components.environments
self.assertEqual(
len(envs), 2, f'should be 2 environments, instead got: {envs}')


if __name__ == '__main__':
unittest.main()