| #!/usr/bin/env python |
| ################################################################################# |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| """ |
| This script is a python implementation of the "boot.go" script in "beam-sdks-python-container" |
| project of Apache Beam, see in: |
| |
| https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go |
| |
| It is implemented in golang and will introduce unnecessary dependencies if used in pure python |
| project. So we add a python implementation which will be used when the python worker runs in |
| process mode. It downloads and installs users' python artifacts, then launches the python SDK |
| harness of Apache Beam. |
| """ |
| import argparse |
| import os |
| from subprocess import call |
| |
| import grpc |
| import logging |
| import sys |
| |
| from apache_beam.portability.api.beam_fn_api_pb2_grpc import BeamFnExternalWorkerPoolStub |
| from apache_beam.portability.api.beam_fn_api_pb2 import StartWorkerRequest |
| from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub |
| from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest |
| from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor |
| |
| from google.protobuf import json_format, text_format |
| |
| |
| def check_not_empty(check_str, error_message): |
| if check_str == "": |
| logging.fatal(error_message) |
| exit(1) |
| |
| |
| python_exec = sys.executable |
| |
| |
| if __name__ == "__main__": |
| # print INFO and higher level messages |
| logging.basicConfig(stream=sys.stdout, level=logging.INFO) |
| |
| parser = argparse.ArgumentParser() |
| |
| parser.add_argument("--id", default="", help="Local identifier (required).") |
| parser.add_argument("--provision_endpoint", default="", |
| help="Provision endpoint (required).") |
| parser.add_argument("--semi_persist_dir", default="/tmp", |
| help="Local semi-persistent directory (optional).") |
| |
| args = parser.parse_known_args()[0] |
| |
| worker_id = args.id |
| provision_endpoint = args.provision_endpoint |
| semi_persist_dir = args.semi_persist_dir |
| |
| check_not_empty(worker_id, "No id provided.") |
| check_not_empty(provision_endpoint, "No provision endpoint provided.") |
| |
| logging.info("Initializing Python harness: %s" % " ".join(sys.argv)) |
| |
| if 'PYFLINK_LOOPBACK_SERVER_ADDRESS' in os.environ: |
| logging.info("Starting up Python harness in loopback mode.") |
| |
| params = dict(os.environ) |
| params.update({'SEMI_PERSISTENT_DIRECTORY': semi_persist_dir}) |
| with grpc.insecure_channel(os.environ['PYFLINK_LOOPBACK_SERVER_ADDRESS']) as channel: |
| client = BeamFnExternalWorkerPoolStub(channel=channel) |
| request = StartWorkerRequest( |
| worker_id=worker_id, |
| provision_endpoint=ApiServiceDescriptor(url=provision_endpoint), |
| params=params) |
| client.StartWorker(request) |
| else: |
| logging.info("Starting up Python harness in a standalone process.") |
| metadata = [("worker_id", worker_id)] |
| |
| # read job information from provision stub |
| with grpc.insecure_channel(provision_endpoint) as channel: |
| client = ProvisionServiceStub(channel=channel) |
| info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info |
| options = json_format.MessageToJson(info.pipeline_options) |
| logging_endpoint = info.logging_endpoint.url |
| control_endpoint = info.control_endpoint.url |
| |
| os.environ["WORKER_ID"] = worker_id |
| os.environ["PIPELINE_OPTIONS"] = options |
| os.environ["SEMI_PERSISTENT_DIRECTORY"] = semi_persist_dir |
| os.environ["LOGGING_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString( |
| ApiServiceDescriptor(url=logging_endpoint)) |
| os.environ["CONTROL_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString( |
| ApiServiceDescriptor(url=control_endpoint)) |
| |
| env = dict(os.environ) |
| |
| if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1": |
| logging.info("Shut down Python harness due to FLINK_BOOT_TESTING is set.") |
| exit(0) |
| |
| call([python_exec, "-m", "pyflink.fn_execution.beam.beam_sdk_worker_main"], |
| stdout=sys.stdout, stderr=sys.stderr, env=env) |