blob: 8cb6d4c4dfec080975bb36b106e1017b5b5f74d3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
""" module for the workflow's worker utilities.
All the workflow related code is gathered in a package that will be built as a
source distribution, staged in the staging area for the workflow being run and
then installed in the workers when they start running.
This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
from __future__ import absolute_import
from __future__ import print_function
import subprocess
from import build as _build
# TODO: (BEAM-8411): re-enable lint check.
import setuptools # pylint: disable-all
# This class handles the pip install mechanism.
class build(_build): # pylint: disable=invalid-name
"""A build command class that will be invoked during package install.
The package built using the current will be staged and later
installed in the worker using `pip install package'. This class will be
instantiated during install for this specific scenario and will trigger
running the custom commands specified.
sub_commands = _build.sub_commands + [('CustomCommands', None)]
# Some custom command to run during setup. The command is not essential for this
# workflow. It is used here as an example. Each command will spawn a child
# process. Typically, these commands will include steps to install non-Python
# packages. For instance, to install a C++-based library libjpeg62 the following
# two commands will have to be added:
# ['apt-get', 'update'],
# ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
# First, note that there is no need to use the sudo command because the setup
# script runs with appropriate access.
# Second, if apt-get tool is used then the first command needs to be 'apt-get
# update' so the tool refreshes itself and initializes links to download
# repositories. Without this initial step the other apt-get install commands
# will fail with package not found errors. Note also --assume-yes option which
# shortcuts the interactive confirmation.
# Note that in this example custom commands will run after installing required
# packages. If you have a PyPI package that depends on one of the custom
# commands, move installation of the dependent package to the list of custom
# commands, e.g.:
# ['pip', 'install', 'my_package'],
# TODO(BEAM-3237): Output from the custom commands are missing from the logs.
# The output of custom commands (including failures) will be logged in the
# worker-startup log.
['echo', 'Custom command worked!']]
class CustomCommands(setuptools.Command):
"""A setuptools Command class able to run arbitrary commands."""
def initialize_options(self):
def finalize_options(self):
def RunCustomCommand(self, command_list):
print('Running command: %s' % command_list)
p = subprocess.Popen(
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Can use communicate(input='y\n'.encode()) if the command run requires
# some confirmation.
stdout_data, _ = p.communicate()
print('Command output: %s' % stdout_data)
if p.returncode != 0:
raise RuntimeError(
'Command %s failed: exit code: %s' % (command_list, p.returncode))
def run(self):
for command in CUSTOM_COMMANDS:
# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed
# so this dependency will not trigger anything to be installed unless a version
# restriction is specified.
description='Julia set workflow package.',
# Command class instantiated and run during pip install scenarios.
'build': build,
'CustomCommands': CustomCommands,