blob: 281e2d9e26ed6e1e3b282aab37b5fc83d0850850 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import contextlib
import logging
import os
import tempfile
import unittest
import zipfile
import requests_mock
from apache_beam.options import pipeline_options
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import flink_runner
from apache_beam.runners.portability import flink_uber_jar_job_server
from apache_beam.runners.portability.local_job_service_test import TestJobServicePlan
@contextlib.contextmanager
def temp_name(*args, **kwargs):
  """Yield the path of a fresh temporary file, cleaning it up afterwards.

  Unlike ``tempfile.NamedTemporaryFile``, the caller receives only a *name*:
  the temporary file is closed (and therefore deleted) before the yield, so
  the caller is free to re-create a file at that path. Any file left at the
  path is removed on exit — including when the managed block raises, which
  the previous version failed to do.

  Args:
    *args: positional arguments forwarded to ``tempfile.NamedTemporaryFile``.
    **kwargs: keyword arguments forwarded to ``tempfile.NamedTemporaryFile``.

  Yields:
    str: path of a temporary file name that does not currently exist.
  """
  with tempfile.NamedTemporaryFile(*args, **kwargs) as t:
    name = t.name
  try:
    yield name
  finally:
    # The caller may or may not have created a file at this path.
    if os.path.exists(name):
      os.unlink(name)
class FlinkUberJarJobServerTest(unittest.TestCase):
  """Tests for FlinkUberJarJobServer, which submits Beam pipelines to a
  Flink cluster by uploading an uber jar through Flink's REST API.

  All HTTP traffic is intercepted with ``requests_mock``; no real Flink
  cluster is contacted.
  """
  @requests_mock.mock()
  def test_flink_version(self, http_mock):
    """The reported Flink version is truncated to ``major.minor``."""
    http_mock.get('http://flink/v1/config', json={'flink-version': '3.1.4.1'})
    job_server = flink_uber_jar_job_server.FlinkUberJarJobServer(
        'http://flink', pipeline_options.FlinkRunnerOptions())
    self.assertEqual(job_server.flink_version(), "3.1")

  @requests_mock.mock()
  def test_get_job_metrics(self, http_mock):
    """Beam metrics are decoded from Flink's accumulator REST response."""
    response = {
        "user-task-accumulators": [{
            "name": "__metricscontainers",
            "type": "MetricsAccumulator",
            "value": "{\"metrics\": {\"attempted\": [{\"urn\": "
            "\"metric_urn\", \"type\": \"beam:metrics:sum_int64:v1\", "
            "\"payload\": \"AA==\", \"labels\": "
            "{\"PTRANSFORM\": \"ptransform_id\"}}]}}"
        }]
    }
    http_mock.get(
        'http://flink/v1/jobs/flink_job_id/accumulators', json=response)
    options = pipeline_options.FlinkRunnerOptions()
    job_server = flink_uber_jar_job_server.FlinkUberJarJobServer(
        'http://flink', options)
    job = flink_uber_jar_job_server.FlinkBeamJob(
        'http://flink', None, 'job_id', 'job_name', None, options)
    # Wire the fake job into the server under the Flink job id that the
    # mocked accumulators endpoint above answers for.
    job._flink_job_id = 'flink_job_id'
    job_server._jobs['job_id'] = job
    request = beam_job_api_pb2.GetJobMetricsRequest(job_id='job_id')
    # "AA==" above is base64 for the single byte b'\000'.
    expected = beam_job_api_pb2.GetJobMetricsResponse(
        metrics=beam_job_api_pb2.MetricResults(
            attempted=[{
                "urn": "metric_urn",
                "type": "beam:metrics:sum_int64:v1",
                "payload": b'\000',
                "labels": {
                    "PTRANSFORM": "ptransform_id"
                }
            }]))
    actual = job_server.GetJobMetrics(request)
    self.assertEqual(actual, expected)

  @requests_mock.mock()
  def test_end_to_end(self, http_mock):
    """Drives prepare/stage/run against a fully mocked Flink REST API and
    checks the resulting state and message streams."""
    with temp_name(suffix='fake.jar') as fake_jar:
      # Create the jar file with some trivial contents.
      # (Named zip_file rather than zip to avoid shadowing the builtin.)
      with zipfile.ZipFile(fake_jar, 'w') as zip_file:
        with zip_file.open('FakeClass.class', 'w') as fout:
          fout.write(b'[original_contents]')

      options = pipeline_options.FlinkRunnerOptions()
      options.flink_job_server_jar = fake_jar
      job_server = flink_uber_jar_job_server.FlinkUberJarJobServer(
          'http://flink', options)
      plan = TestJobServicePlan(job_server)

      # Prepare the job.
      prepare_response = plan.prepare(beam_runner_api_pb2.Pipeline())
      plan.stage(
          beam_runner_api_pb2.Pipeline(),
          prepare_response.artifact_staging_endpoint.url,
          prepare_response.staging_session_token)

      # Now actually run the job.
      http_mock.post(
          'http://flink/v1/jars/upload',
          json={'filename': '/path/to/jar/nonce'})
      http_mock.post(
          'http://flink/v1/jars/nonce/run', json={'jobid': 'some_job_id'})
      _, message_stream, state_stream = plan.run(
          prepare_response.preparation_id)

      # Check the status until the job is "done" and get all error messages.
      # The list form makes requests_mock return the responses in sequence,
      # simulating a job that is polled twice before completing.
      http_mock.get(
          'http://flink/v1/jobs/some_job_id/execution-result',
          [{
              'json': {
                  'status': {
                      'id': 'IN_PROGRESS'
                  }
              }
          }, {
              'json': {
                  'status': {
                      'id': 'IN_PROGRESS'
                  }
              }
          }, {
              'json': {
                  'status': {
                      'id': 'COMPLETED'
                  }
              }
          }])
      http_mock.get(
          'http://flink/v1/jobs/some_job_id', json={'state': 'FINISHED'})
      http_mock.delete('http://flink/v1/jars/nonce')

      self.assertEqual([s.state for s in state_stream],
                       [
                           beam_job_api_pb2.JobState.STOPPED,
                           beam_job_api_pb2.JobState.RUNNING,
                           beam_job_api_pb2.JobState.RUNNING,
                           beam_job_api_pb2.JobState.DONE
                       ])

      http_mock.get(
          'http://flink/v1/jobs/some_job_id/exceptions',
          json={'all-exceptions': [{
              'exception': 'exc_text', 'timestamp': 0
          }]})

      def get_item(x):
        # The message stream interleaves state transitions and job messages;
        # normalize each element to whichever payload it carries.
        if x.HasField('message_response'):
          return x.message_response
        else:
          return x.state_response.state

      self.assertEqual([get_item(m) for m in message_stream],
                       [
                           beam_job_api_pb2.JobState.STOPPED,
                           beam_job_api_pb2.JobState.RUNNING,
                           beam_job_api_pb2.JobMessage(
                               message_id='message0',
                               time='0',
                               importance=beam_job_api_pb2.JobMessage.
                               MessageImportance.JOB_MESSAGE_ERROR,
                               message_text='exc_text'),
                           beam_job_api_pb2.JobState.DONE,
                       ])

  def test_retain_unknown_options(self):
    """Options unknown to Beam survive into the proto pipeline options
    produced by the job service handle."""
    original_options = pipeline_options.PipelineOptions(
        ['--unknown_option_foo=some_value'])
    flink_options = original_options.view_as(
        pipeline_options.FlinkRunnerOptions)
    flink_options.flink_submit_uber_jar = True
    flink_options.flink_master = 'http://host:port'
    runner = flink_runner.FlinkRunner()
    job_service_handle = runner.create_job_service(original_options)
    options_proto = job_service_handle.get_pipeline_options()
    self.assertEqual(
        options_proto['beam:option:unknown_option_foo:v1'], 'some_value')
if __name__ == '__main__':
  # Surface INFO-level log output when the test module is run directly.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()