#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import copy
import itertools
import json
import logging
import shutil
import tempfile
import uuid
import zipfile
from concurrent import futures
from typing import TYPE_CHECKING
from typing import Dict
from typing import Iterator
from typing import Optional
from typing import Tuple
from typing import Union
import grpc
from google.protobuf import json_format
from google.protobuf import timestamp_pb2
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.portability import artifact_service
from apache_beam.utils.timestamp import Timestamp
if TYPE_CHECKING:
# pylint: disable=ungrouped-imports
from typing import BinaryIO
from google.protobuf import struct_pb2
from apache_beam.portability.api import beam_runner_api_pb2
_LOGGER = logging.getLogger(__name__)
StateEvent = Tuple[int, Union[timestamp_pb2.Timestamp, Timestamp]]
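# A StateEvent pairs a JobState enum value with the moment it was entered.
# Both the Beam and protobuf timestamp flavors are accepted; make_state_event
# below normalizes either into the proto representation.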
def make_state_event(state, timestamp):
if isinstance(timestamp, Timestamp):
proto_timestamp = timestamp.to_proto()
elif isinstance(timestamp, timestamp_pb2.Timestamp):
proto_timestamp = timestamp
else:
raise ValueError(
"Expected apache_beam.utils.timestamp.Timestamp, "
"or google.protobuf.timestamp_pb2.Timestamp. "
"Got %s" % type(timestamp))
return beam_job_api_pb2.JobStateEvent(state=state, timestamp=proto_timestamp)
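# Illustrative usage (a sketch; relies only on this module's imports):
#
#   event = make_state_event(
#       beam_job_api_pb2.JobState.RUNNING, Timestamp.now())
#   assert event.state == beam_job_api_pb2.JobState.RUNNING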
class AbstractJobServiceServicer(beam_job_api_pb2_grpc.JobServiceServicer):
"""Manages one or more pipelines, possibly concurrently.
Experimental: No backward compatibility guaranteed.
Servicer for the Beam Job API.
"""
def __init__(self):
self._jobs = {} # type: Dict[str, AbstractBeamJob]
def create_beam_job(self,
preparation_id, # type: str
job_name, # type: str
pipeline, # type: beam_runner_api_pb2.Pipeline
options # type: struct_pb2.Struct
):
# type: (...) -> AbstractBeamJob
"""Returns an instance of AbstractBeamJob specific to this servicer."""
raise NotImplementedError(type(self))
def Prepare(self,
request, # type: beam_job_api_pb2.PrepareJobRequest
context=None,
timeout=None
):
# type: (...) -> beam_job_api_pb2.PrepareJobResponse
_LOGGER.debug('Got Prepare request.')
preparation_id = '%s-%s' % (request.job_name, uuid.uuid4())
self._jobs[preparation_id] = self.create_beam_job(
preparation_id,
request.job_name,
request.pipeline,
request.pipeline_options)
self._jobs[preparation_id].prepare()
_LOGGER.debug("Prepared job '%s' as '%s'", request.job_name, preparation_id)
return beam_job_api_pb2.PrepareJobResponse(
preparation_id=preparation_id,
artifact_staging_endpoint=self._jobs[preparation_id].
artifact_staging_endpoint(),
staging_session_token=preparation_id)
def Run(self,
request, # type: beam_job_api_pb2.RunJobRequest
context=None,
timeout=None
):
# type: (...) -> beam_job_api_pb2.RunJobResponse
# For now, just use the preparation id as the job id.
job_id = request.preparation_id
_LOGGER.info("Running job '%s'", job_id)
self._jobs[job_id].run()
return beam_job_api_pb2.RunJobResponse(job_id=job_id)
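# Illustrative client handshake against this servicer (a sketch; `channel`,
# `pipeline_proto`, and `options_struct` are assumed to be supplied by the
# caller):
#
#   stub = beam_job_api_pb2_grpc.JobServiceStub(channel)
#   prep = stub.Prepare(
#       beam_job_api_pb2.PrepareJobRequest(
#           job_name='example',
#           pipeline=pipeline_proto,
#           pipeline_options=options_struct))
#   # ...stage artifacts against prep.artifact_staging_endpoint...
#   run = stub.Run(
#       beam_job_api_pb2.RunJobRequest(preparation_id=prep.preparation_id))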
def GetJobs(self,
request, # type: beam_job_api_pb2.GetJobsRequest
context=None,
timeout=None
):
# type: (...) -> beam_job_api_pb2.GetJobsResponse
return beam_job_api_pb2.GetJobsResponse(
job_info=[job.to_runner_api() for job in self._jobs.values()])
def GetState(
self,
request, # type: beam_job_api_pb2.GetJobStateRequest
context=None):
# type: (...) -> beam_job_api_pb2.JobStateEvent
return make_state_event(*self._jobs[request.job_id].get_state())
def GetPipeline(self,
request, # type: beam_job_api_pb2.GetJobPipelineRequest
context=None,
timeout=None
):
# type: (...) -> beam_job_api_pb2.GetJobPipelineResponse
return beam_job_api_pb2.GetJobPipelineResponse(
pipeline=self._jobs[request.job_id].get_pipeline())
def Cancel(self,
request, # type: beam_job_api_pb2.CancelJobRequest
context=None,
timeout=None
):
# type: (...) -> beam_job_api_pb2.CancelJobResponse
self._jobs[request.job_id].cancel()
return beam_job_api_pb2.CancelJobResponse(
state=self._jobs[request.job_id].get_state()[0])
def GetStateStream(self, request, context=None, timeout=None):
# type: (...) -> Iterator[beam_job_api_pb2.JobStateEvent]
"""Yields state transitions since the stream started.
"""
if request.job_id not in self._jobs:
raise LookupError("Job {} does not exist".format(request.job_id))
job = self._jobs[request.job_id]
for state, timestamp in job.get_state_stream():
yield make_state_event(state, timestamp)
def GetMessageStream(self, request, context=None, timeout=None):
# type: (...) -> Iterator[beam_job_api_pb2.JobMessagesResponse]
"""Yields messages since the stream started.
"""
if request.job_id not in self._jobs:
raise LookupError("Job {} does not exist".format(request.job_id))
job = self._jobs[request.job_id]
for msg in job.get_message_stream():
if isinstance(msg, tuple):
resp = beam_job_api_pb2.JobMessagesResponse(
state_response=make_state_event(*msg))
else:
resp = beam_job_api_pb2.JobMessagesResponse(message_response=msg)
yield resp
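# Illustrative consumption (a sketch): each response carries either a state
# change or a log message, distinguishable via HasField.
#
#   request = beam_job_api_pb2.JobMessagesRequest(job_id=job_id)
#   for resp in servicer.GetMessageStream(request):
#     if resp.HasField('state_response'):
#       handle_state(resp.state_response.state)  # hypothetical handler
#     else:
#       handle_message(resp.message_response)  # hypothetical handler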
def DescribePipelineOptions(self, request, context=None, timeout=None):
# type: (...) -> beam_job_api_pb2.DescribePipelineOptionsResponse
return beam_job_api_pb2.DescribePipelineOptionsResponse()
class AbstractBeamJob(object):
"""Abstract baseclass for managing a single Beam job."""
def __init__(self,
job_id, # type: str
job_name, # type: str
pipeline, # type: beam_runner_api_pb2.Pipeline
options # type: struct_pb2.Struct
):
self._job_id = job_id
self._job_name = job_name
self._pipeline_proto = pipeline
self._pipeline_options = options
self._state_history = [(beam_job_api_pb2.JobState.STOPPED, Timestamp.now())]
def prepare(self):
# type: () -> None
"""Called immediately after this class is instantiated"""
raise NotImplementedError(self)
def run(self):
# type: () -> None
raise NotImplementedError(self)
def cancel(self):
# type: () -> Optional[beam_job_api_pb2.JobState.Enum]
raise NotImplementedError(self)
def artifact_staging_endpoint(self):
# type: () -> Optional[endpoints_pb2.ApiServiceDescriptor]
raise NotImplementedError(self)
def get_state_stream(self):
# type: () -> Iterator[StateEvent]
raise NotImplementedError(self)
def get_message_stream(self):
# type: () -> Iterator[Union[StateEvent, Optional[beam_job_api_pb2.JobMessage]]]
raise NotImplementedError(self)
@property
def state(self):
"""Get the latest state enum."""
return self.get_state()[0]
def get_state(self):
"""Get a tuple of the latest state and its timestamp."""
# this is safe: initial state is set in __init__
return self._state_history[-1]
def set_state(self, new_state):
"""Set the latest state as an int enum and update the state history.
:param new_state: int
latest state enum
:return: Timestamp or None
the new timestamp if the state changed, else None
"""
if new_state != self._state_history[-1][0]:
timestamp = Timestamp.now()
self._state_history.append((new_state, timestamp))
return timestamp
else:
return None
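# Illustrative behavior (a sketch): transitions to the current state are
# ignored, so the history records actual changes only.
#
#   job.set_state(beam_job_api_pb2.JobState.RUNNING)  # -> a new Timestamp
#   job.set_state(beam_job_api_pb2.JobState.RUNNING)  # -> None, no change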
def with_state_history(self, state_stream):
"""Utility to prepend recorded state history to an active state stream"""
return itertools.chain(self._state_history[:], state_stream)
def get_pipeline(self):
# type: () -> beam_runner_api_pb2.Pipeline
return self._pipeline_proto
@staticmethod
def is_terminal_state(state):
from apache_beam.runners.portability import portable_runner
return state in portable_runner.TERMINAL_STATES
def to_runner_api(self):
# type: () -> beam_job_api_pb2.JobInfo
return beam_job_api_pb2.JobInfo(
job_id=self._job_id,
job_name=self._job_name,
pipeline_options=self._pipeline_options,
state=self.state)
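# A minimal concrete subclass might look roughly like this (a sketch only;
# the stream and endpoint methods are elided, and real implementations are
# entirely runner-specific):
#
#   class InMemoryBeamJob(AbstractBeamJob):
#     def prepare(self):
#       pass  # nothing to stage
#     def run(self):
#       self.set_state(beam_job_api_pb2.JobState.RUNNING)
#     def cancel(self):
#       self.set_state(beam_job_api_pb2.JobState.CANCELLED)
#       return beam_job_api_pb2.JobState.CANCELLED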
class JarArtifactManager(object):
def __init__(self, jar_path, root):
self._root = root
self._zipfile_handle = zipfile.ZipFile(jar_path, 'a')
def close(self):
self._zipfile_handle.close()
def file_writer(self, path):
# type: (str) -> Tuple[BinaryIO, str]
"""Given a relative path, returns an open handle that can be written to
and an reference that can later be used to read this file."""
full_path = '%s/%s' % (self._root, path)
return self._zipfile_handle.open(
full_path, 'w', force_zip64=True), 'classpath://%s' % full_path
def zipfile_handle(self):
return self._zipfile_handle
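# Illustrative usage (a sketch; the jar path and artifact name are made up):
#
#   manager = JarArtifactManager('/tmp/pipeline.jar', 'BEAM-PIPELINE')
#   handle, ref = manager.file_writer('artifacts/dep.whl')
#   with handle:
#     handle.write(b'...')
#   # ref is now 'classpath://BEAM-PIPELINE/artifacts/dep.whl'
#   manager.close()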
class UberJarBeamJob(AbstractBeamJob):
"""Abstract baseclass for creating a Beam job. The resulting job will be
packaged and run in an executable uber jar."""
# These must agree with those defined in PortablePipelineJarUtils.java.
PIPELINE_FOLDER = 'BEAM-PIPELINE'
PIPELINE_MANIFEST = PIPELINE_FOLDER + '/pipeline-manifest.json'
# We only stage a single pipeline in the jar.
PIPELINE_NAME = 'pipeline'
PIPELINE_PATH = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, "pipeline.json"])
PIPELINE_OPTIONS_PATH = '/'.join(
[PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json'])
ARTIFACT_FOLDER = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, 'artifacts'])
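# Given the constants above, the staged jar entries are laid out as:
#
#   BEAM-PIPELINE/pipeline-manifest.json
#   BEAM-PIPELINE/pipeline/pipeline.json
#   BEAM-PIPELINE/pipeline/pipeline-options.json
#   BEAM-PIPELINE/pipeline/artifacts/...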
def __init__(
self,
executable_jar,
job_id,
job_name,
pipeline,
options,
artifact_port=0):
super().__init__(job_id, job_name, pipeline, options)
self._executable_jar = executable_jar
self._jar_uploaded = False
self._artifact_port = artifact_port
def prepare(self):
# Copy the executable jar, injecting the pipeline and options as resources.
with tempfile.NamedTemporaryFile(suffix='.jar') as tout:
self._jar = tout.name
shutil.copy(self._executable_jar, self._jar)
self._start_artifact_service(self._jar, self._artifact_port)
def _start_artifact_service(self, jar, requested_port):
self._artifact_manager = JarArtifactManager(self._jar, self.ARTIFACT_FOLDER)
self._artifact_staging_service = artifact_service.ArtifactStagingService(
self._artifact_manager.file_writer)
self._artifact_staging_service.register_job(
self._job_id,
{
env_id: env.dependencies
for (env_id,
env) in self._pipeline_proto.components.environments.items()
})
options = [("grpc.http2.max_pings_without_data", 0),
("grpc.http2.max_ping_strikes", 0)]
self._artifact_staging_server = grpc.server(
futures.ThreadPoolExecutor(), options=options)
port = self._artifact_staging_server.add_insecure_port(
'[::]:%s' % requested_port)
beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
self._artifact_staging_service, self._artifact_staging_server)
self._artifact_staging_endpoint = endpoints_pb2.ApiServiceDescriptor(
url='localhost:%d' % port)
self._artifact_staging_server.start()
_LOGGER.info('Artifact server started on port %s', port)
return port
def _stop_artifact_service(self):
self._artifact_staging_server.stop(1)
# Update dependencies to point to staged files.
pipeline = copy.copy(self._pipeline_proto)
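# Note: the copy above is read only by the has-dependencies check below;
# the resolved dependencies are written into self._pipeline_proto in place,
# and that proto is what gets serialized into the jar.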
if any(env.dependencies
for env in pipeline.components.environments.values()):
for env_id, deps in self._artifact_staging_service.resolved_deps(
self._job_id).items():
# Slice assignment not supported for repeated fields.
env = self._pipeline_proto.components.environments[env_id]
del env.dependencies[:]
env.dependencies.extend(deps)
# Copy the pipeline definition and metadata into the jar.
z = self._artifact_manager.zipfile_handle()
with z.open(self.PIPELINE_PATH, 'w') as fout:
fout.write(
json_format.MessageToJson(self._pipeline_proto).encode('utf-8'))
with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout:
fout.write(
json_format.MessageToJson(self._pipeline_options).encode('utf-8'))
with z.open(self.PIPELINE_MANIFEST, 'w') as fout:
fout.write(
json.dumps({
'defaultJobName': self.PIPELINE_NAME
}).encode('utf-8'))
# Closes the jar file.
self._artifact_manager.close()
def artifact_staging_endpoint(self):
return self._artifact_staging_endpoint
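# Rough lifecycle of an UberJarBeamJob subclass (a sketch; submitting the
# finished jar to a cluster is runner-specific and not shown here):
#
#   job.prepare()                 # copy the jar, start the artifact service
#   # ...client stages artifacts via job.artifact_staging_endpoint()...
#   job._stop_artifact_service()  # inject pipeline, options, manifest; close
#   # ...subclass submits self._jar to the target cluster...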