#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A job server submitting portable pipelines as uber jars to Flink."""
# pytype: skip-file
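# Usage (a minimal sketch; assumes a Flink cluster whose REST endpoint is
# reachable at http://localhost:8081, and default pipeline options):
#
#   options = pipeline_options.PipelineOptions()
#   server = FlinkUberJarJobServer('http://localhost:8081', options)
#   server.start()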
from __future__ import absolute_import
from __future__ import print_function
import logging
import os
import tempfile
import time
import urllib.parse
import requests
from google.protobuf import json_format
from apache_beam.options import pipeline_options
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.runners.portability import abstract_job_service
from apache_beam.runners.portability import job_server
_LOGGER = logging.getLogger(__name__)
class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
"""A Job server which submits a self-contained Jar to a Flink cluster.
The jar contains the Beam pipeline definition, dependencies, and
the pipeline artifacts.
"""
def __init__(self, master_url, options):
super(FlinkUberJarJobServer, self).__init__()
self._master_url = master_url
self._executable_jar = (
options.view_as(
pipeline_options.FlinkRunnerOptions).flink_job_server_jar)
self._artifact_port = (
options.view_as(pipeline_options.JobServerOptions).artifact_port)
self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
def start(self):
return self
def stop(self):
pass
def executable_jar(self):
if self._executable_jar:
if not os.path.exists(self._executable_jar):
parsed = urllib.parse.urlparse(self._executable_jar)
if not parsed.scheme:
raise ValueError(
'Unable to parse jar URL "%s". If using a full URL, make sure '
'the scheme is specified. If using a local file path, make sure '
'the file exists; you may have to first build the job server '
'using `./gradlew runners:flink:%s:job-server:shadowJar`.' %
              (self._executable_jar, self.flink_version()))
url = self._executable_jar
else:
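      # No jar was supplied: resolve the Gradle target to a jar, either a
      # locally built shadow jar or the released artifact for this version.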
url = job_server.JavaJarJobServer.path_to_beam_jar(
'runners:flink:%s:job-server:shadowJar' % self.flink_version())
return job_server.JavaJarJobServer.local_jar(url)
def flink_version(self):
full_version = requests.get('%s/v1/config' %
self._master_url).json()['flink-version']
# Only return up to minor version.
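    # e.g. '1.10.1' becomes '1.10'.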
return '.'.join(full_version.split('.')[:2])
def create_beam_job(self, job_id, job_name, pipeline, options):
return FlinkBeamJob(
self._master_url,
self.executable_jar(),
job_id,
job_name,
pipeline,
options,
artifact_port=self._artifact_port)
def GetJobMetrics(self, request, context=None):
if request.job_id not in self._jobs:
raise LookupError("Job {} does not exist".format(request.job_id))
metrics_text = self._jobs[request.job_id].get_metrics()
response = beam_job_api_pb2.GetJobMetricsResponse()
json_format.Parse(metrics_text, response)
return response
class FlinkBeamJob(abstract_job_service.UberJarBeamJob):
"""Runs a single Beam job on Flink by staging all contents into a Jar
and uploading it via the Flink Rest API."""
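  # Lifecycle sketch: run() uploads the jar and starts the job, get_state()
  # and get_state_stream() poll the Flink REST API, and the uploaded jar is
  # deleted once the job reaches a terminal state.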
def __init__(
self,
master_url,
executable_jar,
job_id,
job_name,
pipeline,
options,
artifact_port=0):
super(FlinkBeamJob, self).__init__(
executable_jar,
job_id,
job_name,
pipeline,
options,
artifact_port=artifact_port)
self._master_url = master_url
def request(self, method, path, expected_status=200, **kwargs):
url = '%s/%s' % (self._master_url, path)
response = method(url, **kwargs)
if response.status_code != expected_status:
raise RuntimeError(
"Request to %s failed with status %d: %s" %
(url, response.status_code, response.text))
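    # Some endpoints return an empty body; in that case return None.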
if response.text:
return response.json()
def get(self, path, **kwargs):
return self.request(requests.get, path, **kwargs)
def post(self, path, **kwargs):
return self.request(requests.post, path, **kwargs)
def delete(self, path, **kwargs):
return self.request(requests.delete, path, **kwargs)
def run(self):
self._stop_artifact_service()
# Upload the jar and start the job.
with open(self._jar, 'rb') as jar_file:
self._flink_jar_id = self.post(
'v1/jars/upload',
files={'jarfile': ('beam.jar', jar_file)})['filename'].split('/')[-1]
self._jar_uploaded = True
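    # Run the uploaded jar, with Beam's Flink entry point as the main class.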
self._flink_job_id = self.post(
'v1/jars/%s/run' % self._flink_jar_id,
json={
'entryClass': 'org.apache.beam.runners.flink.FlinkPipelineRunner'
})['jobid']
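    # The local copy of the jar is no longer needed once Flink has its own.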
os.unlink(self._jar)
    _LOGGER.info('Started Flink job as %s', self._flink_job_id)
def cancel(self):
    self.post('v1/jobs/%s/stop' % self._flink_job_id, expected_status=202)
self.delete_jar()
def delete_jar(self):
if self._jar_uploaded:
self._jar_uploaded = False
try:
self.delete('v1/jars/%s' % self._flink_jar_id)
except Exception:
        _LOGGER.info(
            'Error deleting jar %s', self._flink_jar_id, exc_info=True)
def _get_state(self):
"""Query flink to get the current state.
:return: tuple of int and Timestamp or None
timestamp will be None if the state has not changed since the last query.
"""
# For just getting the status, execution-result seems cheaper.
flink_status = self.get('v1/jobs/%s/execution-result' %
self._flink_job_id)['status']['id']
if flink_status == 'COMPLETED':
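      # 'COMPLETED' only says the job finished; query the job itself for its
      # precise terminal state (e.g. FINISHED, FAILED, or CANCELED).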
flink_status = self.get('v1/jobs/%s' % self._flink_job_id)['state']
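    # Map Flink execution states onto Beam job states.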
beam_state = {
'CREATED': beam_job_api_pb2.JobState.STARTING,
'RUNNING': beam_job_api_pb2.JobState.RUNNING,
'FAILING': beam_job_api_pb2.JobState.RUNNING,
'FAILED': beam_job_api_pb2.JobState.FAILED,
'CANCELLING': beam_job_api_pb2.JobState.CANCELLING,
'CANCELED': beam_job_api_pb2.JobState.CANCELLED,
'FINISHED': beam_job_api_pb2.JobState.DONE,
'RESTARTING': beam_job_api_pb2.JobState.RUNNING,
'SUSPENDED': beam_job_api_pb2.JobState.RUNNING,
'RECONCILING': beam_job_api_pb2.JobState.RUNNING,
'IN_PROGRESS': beam_job_api_pb2.JobState.RUNNING,
'COMPLETED': beam_job_api_pb2.JobState.DONE,
}.get(flink_status, beam_job_api_pb2.JobState.UNSPECIFIED)
if self.is_terminal_state(beam_state):
self.delete_jar()
# update the state history if it has changed
return beam_state, self.set_state(beam_state)
def get_state(self):
state, timestamp = self._get_state()
if timestamp is None:
# state has not changed since it was last checked: use previous timestamp
return super(FlinkBeamJob, self).get_state()
else:
return state, timestamp
def get_state_stream(self):
def _state_iter():
sleep_secs = 1.0
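      # Poll with exponential backoff (20% growth per poll), capped at 60s.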
while True:
yield self.get_state()
sleep_secs = min(60, sleep_secs * 1.2)
time.sleep(sleep_secs)
for state, timestamp in self.with_state_history(_state_iter()):
yield state, timestamp
if self.is_terminal_state(state):
break
def get_message_stream(self):
for state, timestamp in self.get_state_stream():
if self.is_terminal_state(state):
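        # On reaching a terminal state, surface any Flink-reported exceptions
        # as error-level job messages before emitting the final state.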
response = self.get('v1/jobs/%s/exceptions' % self._flink_job_id)
for ix, exc in enumerate(response['all-exceptions']):
yield beam_job_api_pb2.JobMessage(
message_id='message%d' % ix,
time=str(exc['timestamp']),
importance=beam_job_api_pb2.JobMessage.MessageImportance.
JOB_MESSAGE_ERROR,
message_text=exc['exception'])
yield state, timestamp
break
else:
yield state, timestamp
def get_metrics(self):
accumulators = self.get('v1/jobs/%s/accumulators' %
self._flink_job_id)['user-task-accumulators']
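    # All Beam metrics are reported through a single user accumulator whose
    # value is the serialized metrics payload parsed by GetJobMetrics.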
for accumulator in accumulators:
if accumulator['name'] == '__metricscontainers':
return accumulator['value']
raise LookupError(
"Found no metrics container for job {}".format(self._flink_job_id))