#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import print_function
import inspect
import logging
import platform
import signal
import socket
import subprocess
import sys
import threading
import time
import traceback
import unittest
import grpc
import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
from apache_beam.runners.portability import fn_api_runner_test
from apache_beam.runners.portability import portable_runner
from apache_beam.runners.portability.local_job_service import LocalJobServicer
from apache_beam.runners.portability.portable_runner import PortableRunner
from apache_beam.runners.worker import worker_pool_main
from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import environments
from apache_beam.transforms import userstate
class PortableRunnerTest(fn_api_runner_test.FnApiRunnerTest):

  TIMEOUT_SECS = 60

  # Controls job service interaction, not sdk harness interaction.
  _use_subprocesses = False
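
  # The watchdog installed in setUp is a timeout of last resort: if a test
  # hangs for more than TIMEOUT_SECS, the SIGALRM handler dumps the stack of
  # every live thread and then fails the test. SIGALRM is not available on
  # Windows, so the watchdog is skipped there.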
  def setUp(self):
    if platform.system() != 'Windows':
      def handler(signum, frame):
        msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
        print('=' * 20, msg, '=' * 20)
        traceback.print_stack(frame)
        threads_by_id = {th.ident: th for th in threading.enumerate()}
        for thread_id, stack in sys._current_frames().items():
          th = threads_by_id.get(thread_id)
          print()
          print('# Thread:', th or thread_id)
          traceback.print_stack(stack)
        raise BaseException(msg)
      signal.signal(signal.SIGALRM, handler)
      signal.alarm(self.TIMEOUT_SECS)
  def tearDown(self):
    if platform.system() != 'Windows':
      signal.alarm(0)
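
  # Port picking works by binding to port 0 (letting the OS choose a free
  # port) and then closing the socket, so there is an inherent race: another
  # process could grab the port before the subprocess binds it. Hence the
  # "not perfect" caveat in the docstring below.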
  @classmethod
  def _pick_unused_port(cls):
    return cls._pick_unused_ports(num_ports=1)[0]

  @staticmethod
  def _pick_unused_ports(num_ports):
    """Not perfect, but we have to provide a port to the subprocess."""
    # TODO(robertwb): Consider letting the subprocess communicate a choice of
    # port back.
    sockets = []
    ports = []
    for _ in range(0, num_ports):
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      sockets.append(s)
      s.bind(('localhost', 0))
      _, port = s.getsockname()
      ports.append(port)
    try:
      return ports
    finally:
      for s in sockets:
        s.close()
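
  # Readiness is probed by issuing GetState for a made-up job id: while the
  # server is still coming up the RPC fails with UNAVAILABLE, and any other
  # response (including a different error code) proves the job service is
  # accepting connections.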
  @classmethod
  def _start_local_runner_subprocess_job_service(cls):
    cls._maybe_kill_subprocess()
    # TODO(robertwb): Consider letting the subprocess pick one and
    # communicate it back...
    # pylint: disable=unbalanced-tuple-unpacking
    job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
    logging.info('Starting server on port %d.', job_port)
    cls._subprocess = subprocess.Popen(
        cls._subprocess_command(job_port, expansion_port))
    address = 'localhost:%d' % job_port
    job_service = beam_job_api_pb2_grpc.JobServiceStub(
        GRPCChannelFactory.insecure_channel(address))
    logging.info('Waiting for server to be ready...')
    start = time.time()
    timeout = 30
    while True:
      time.sleep(0.1)
      if cls._subprocess.poll() is not None:
        raise RuntimeError(
            'Subprocess terminated unexpectedly with exit code %d.' %
            cls._subprocess.returncode)
      elif time.time() - start > timeout:
        raise RuntimeError(
            'Pipeline timed out waiting for job service subprocess.')
      else:
        try:
          job_service.GetState(
              beam_job_api_pb2.GetJobStateRequest(job_id='[fake]'))
          break
        except grpc.RpcError as exn:
          if exn.code() != grpc.StatusCode.UNAVAILABLE:
            # We were able to contact the service for our fake state request.
            break
    logging.info('Server ready.')
    return address
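
  # Note the check against cls.__dict__ rather than hasattr: each subclass
  # caches its own endpoint instead of inheriting one already created by a
  # base class, since subclasses configure different job services.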
  @classmethod
  def _get_job_endpoint(cls):
    if '_job_endpoint' not in cls.__dict__:
      cls._job_endpoint = cls._create_job_endpoint()
    return cls._job_endpoint

  @classmethod
  def _create_job_endpoint(cls):
    if cls._use_subprocesses:
      return cls._start_local_runner_subprocess_job_service()
    else:
      cls._servicer = LocalJobServicer()
      return 'localhost:%d' % cls._servicer.start_grpc_server()

  @classmethod
  def get_runner(cls):
    return portable_runner.PortableRunner()
  @classmethod
  def tearDownClass(cls):
    cls._maybe_kill_subprocess()

  @classmethod
  def _maybe_kill_subprocess(cls):
    if hasattr(cls, '_subprocess') and cls._subprocess.poll() is None:
      cls._subprocess.kill()
      time.sleep(0.1)
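
  # The job name is derived from the innermost 'test*' method found on the
  # call stack, plus a timestamp, so each run gets a distinct, recognizable
  # job name for debugging.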
  def create_options(self):
    def get_pipeline_name():
      for _, _, _, method_name, _, _ in inspect.stack():
        if method_name.find('test') != -1:
          return method_name
      return 'unknown_test'

    # Set the job name for better debugging.
    options = PipelineOptions.from_dictionary({
        'job_name': get_pipeline_name() + '_' + str(time.time())
    })
    options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
    # Override the default environment type for testing.
    options.view_as(PortableOptions).environment_type = (
        python_urns.EMBEDDED_PYTHON)
    # Enable caching (disabled by default)
    options.view_as(DebugOptions).add_experiment('state_cache_size=100')
    return options

  def create_pipeline(self):
    return beam.Pipeline(self.get_runner(), self.create_options())
  def test_metrics(self):
    self.skipTest('Metrics not supported.')
  def test_pardo_state_with_custom_key_coder(self):
    """Tests that state requests work correctly when the key coder is an
    SDK-specific coder, i.e. a non-standard coder. This is additionally
    enforced by Java's ProcessBundleDescriptorsTest and by Flink's
    ExecutableStageDoFnOperator, which detects invalid encoding by checking
    for the correct key group of the encoded key."""
    index_state_spec = userstate.CombiningValueStateSpec('index', sum)

    # Test params
    # Ensure a decent amount of elements to serve all partitions.
    n = 200
    duplicates = 1

    split = n // (duplicates + 1)
    inputs = [(i % split, str(i % split)) for i in range(0, n)]

    # Use a DoFn which has to use FastPrimitivesCoder because the type cannot
    # be inferred.
    class Input(beam.DoFn):
      def process(self, impulse):
        for i in inputs:
          yield i

    class AddIndex(beam.DoFn):
      def process(self, kv,
                  index=beam.DoFn.StateParam(index_state_spec)):
        k, v = kv
        index.add(1)
        yield k, v, index.read()

    expected = [(i % split, str(i % split), i // split + 1)
                for i in range(0, n)]

    with self.create_pipeline() as p:
      assert_that(p
                  | beam.Impulse()
                  | beam.ParDo(Input())
                  | beam.ParDo(AddIndex()),
                  equal_to(expected))

  # Inherits all other tests from fn_api_runner_test.FnApiRunnerTest
@unittest.skip("BEAM-7248")
class PortableRunnerOptimized(PortableRunnerTest):
def create_options(self):
options = super(PortableRunnerOptimized, self).create_options()
options.view_as(DebugOptions).add_experiment('pre_optimize=all')
options.view_as(DebugOptions).add_experiment('state_cache_size=100')
return options
class PortableRunnerTestWithExternalEnv(PortableRunnerTest):

  @classmethod
  def setUpClass(cls):
    cls._worker_address, cls._worker_server = (
        worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
            state_cache_size=100))

  @classmethod
  def tearDownClass(cls):
    cls._worker_server.stop(1)

  def create_options(self):
    options = super(PortableRunnerTestWithExternalEnv, self).create_options()
    options.view_as(PortableOptions).environment_type = 'EXTERNAL'
    options.view_as(PortableOptions).environment_config = self._worker_address
    return options
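
# Unlike the base class, this variant exercises the full subprocess path:
# the job service runs in a separate process (via local_job_service_main)
# and the SDK harness itself is launched as a subprocess command.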
class PortableRunnerTestWithSubprocesses(PortableRunnerTest):
  _use_subprocesses = True

  def create_options(self):
    options = super(PortableRunnerTestWithSubprocesses, self).create_options()
    options.view_as(PortableOptions).environment_type = (
        python_urns.SUBPROCESS_SDK)
    options.view_as(PortableOptions).environment_config = (
        b'%s -m apache_beam.runners.worker.sdk_worker_main' %
        sys.executable.encode('ascii')).decode('utf-8')
    # Enable caching (disabled by default)
    options.view_as(DebugOptions).add_experiment('state_cache_size=100')
    return options

  @classmethod
  def _subprocess_command(cls, job_port, _):
    return [
        sys.executable,
        '-m', 'apache_beam.runners.portability.local_job_service_main',
        '-p', str(job_port),
    ]
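
# Same subprocess setup as above, but with two local workers to cover the
# multi-worker code path.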
class PortableRunnerTestWithSubprocessesAndMultiWorkers(
    PortableRunnerTestWithSubprocesses):
  _use_subprocesses = True

  def create_options(self):
    options = super(PortableRunnerTestWithSubprocessesAndMultiWorkers, self) \
        .create_options()
    options.view_as(DirectOptions).direct_num_workers = 2
    return options
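
# The tests below exercise PortableRunner._create_environment directly,
# checking that environment_type/environment_config pipeline options are
# translated into the corresponding environment classes, and that malformed
# configs are rejected with ValueError.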
class PortableRunnerInternalTest(unittest.TestCase):

  def test__create_default_environment(self):
    docker_image = PortableRunner.default_docker_image()
    self.assertEqual(
        PortableRunner._create_environment(
            PipelineOptions.from_dictionary({})),
        environments.DockerEnvironment(container_image=docker_image))

  def test__create_docker_environment(self):
    docker_image = 'py-docker'
    self.assertEqual(
        PortableRunner._create_environment(PipelineOptions.from_dictionary({
            'environment_type': 'DOCKER',
            'environment_config': docker_image,
        })), environments.DockerEnvironment(container_image=docker_image))

  def test__create_process_environment(self):
    self.assertEqual(
        PortableRunner._create_environment(PipelineOptions.from_dictionary({
            'environment_type': 'PROCESS',
            'environment_config': '{"os": "linux", "arch": "amd64", '
                                  '"command": "run.sh", '
                                  '"env":{"k1": "v1"} }',
        })), environments.ProcessEnvironment('run.sh', os='linux',
                                             arch='amd64', env={'k1': 'v1'}))
    self.assertEqual(
        PortableRunner._create_environment(PipelineOptions.from_dictionary({
            'environment_type': 'PROCESS',
            'environment_config': '{"command": "run.sh"}',
        })), environments.ProcessEnvironment('run.sh'))

  def test__create_external_environment(self):
    self.assertEqual(
        PortableRunner._create_environment(PipelineOptions.from_dictionary({
            'environment_type': 'EXTERNAL',
            'environment_config': 'localhost:50000',
        })), environments.ExternalEnvironment('localhost:50000'))
    raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} '
    for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()):
      self.assertEqual(
          PortableRunner._create_environment(PipelineOptions.from_dictionary({
              'environment_type': 'EXTERNAL',
              'environment_config': env_config,
          })), environments.ExternalEnvironment('localhost:50000',
                                                params={"k1": "v1"}))
    with self.assertRaises(ValueError):
      PortableRunner._create_environment(PipelineOptions.from_dictionary({
          'environment_type': 'EXTERNAL',
          'environment_config': '{invalid}',
      }))
    with self.assertRaises(ValueError) as ctx:
      PortableRunner._create_environment(PipelineOptions.from_dictionary({
          'environment_type': 'EXTERNAL',
          'environment_config': '{"params":{"k1":"v1"}}',
      }))
    self.assertIn(
        'External environment endpoint must be set.', ctx.exception.args)
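
# Evaluated at import time so the result can feed the skipIf decorator below;
# any failure to shell out to docker is treated as "no image available".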
def hasDockerImage():
  image = PortableRunner.default_docker_image()
  try:
    check_image = subprocess.check_output(
        "docker images -q %s" % image, shell=True)
    # check_output returns bytes, so compare against a bytes literal.
    return check_image.strip() != b''
  except Exception:
    return False
@unittest.skipIf(not hasDockerImage(), "docker not installed or "
                 "no docker image")
class PortableRunnerTestWithLocalDocker(PortableRunnerTest):

  def create_options(self):
    options = super(PortableRunnerTestWithLocalDocker, self).create_options()
    options.view_as(PortableOptions).job_endpoint = 'embed'
    return options
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()