blob: d8dd8a499cc347223cdfeee2bbcdca2d5446c736 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import atexit
import logging
import os
import signal
import socket
import sys
import time
from subprocess import Popen
from subprocess import check_output
from threading import Lock
class DockerizedJobServer(object):
    """Spins up the JobServer in a docker container for local execution.

    The container is launched with ``docker run`` as a child process. Its
    shutdown is wired to interpreter exit and to SIGINT so that the container
    does not outlive the Python process that started it.
    """

    def __init__(self, job_host="localhost",
                 job_port=None,
                 artifact_port=None,
                 expansion_port=None,
                 harness_port_range=(8100, 8200),
                 max_connection_retries=5):
        """Stores the configuration; no container is started here.

        Args:
          job_host: host name passed to the job server as ``--job-host`` and
            used in the address returned by ``start``.
          job_port: port for ``--job-port``; None or 0 picks a free port.
          artifact_port: port for ``--artifact-port``; None or 0 picks a free
            port.
          expansion_port: port for ``--expansion-port``; None or 0 picks a
            free port.
          harness_port_range: inclusive (first, last) port range published
            from the container when running on macOS.
          max_connection_retries: how many times ``stop`` sends SIGINT (one
            second apart) before killing the docker process.
        """
        self.job_host = job_host
        self.job_port = job_port
        self.expansion_port = expansion_port
        self.artifact_port = artifact_port
        self.harness_port_range = harness_port_range
        self.max_connection_retries = max_connection_retries
        self.docker_process = None
        # Serializes concurrent calls to stop().
        self.process_lock = Lock()

    def start(self):
        """Launches the job server container.

        Returns:
          The job server address as a ``"host:port"`` string. The address is
          returned even if launching the container failed; the failure is
          logged and ``stop`` is invoked in that case.

        Raises:
          KeyError: if the ``USER`` environment variable is not set.
          subprocess.CalledProcessError: if ``which docker`` fails.
        """
        # TODO This is hardcoded to Flink at the moment but should be changed
        job_server_image_name = os.environ['USER'] + \
            "-docker-apache.bintray.io/beam/flink-job-server:latest"
        # universal_newlines=True makes check_output return text on both
        # Python 2 and Python 3; without it Python 3 yields bytes, which
        # cannot be joined with the str "/bin/docker" below.
        docker_path = check_output(
            ['which', 'docker'], universal_newlines=True).strip()
        cmd = ["docker", "run",
               # We mount the docker binary and socket to be able to spin up
               # "sibling" containers for the SDK harness.
               "-v", ':'.join([docker_path, "/bin/docker"]),
               "-v", "/var/run/docker.sock:/var/run/docker.sock"]

        self.job_port, self.artifact_port, self.expansion_port = \
            DockerizedJobServer._pick_port(self.job_port,
                                           self.artifact_port,
                                           self.expansion_port)

        args = ['--job-host', self.job_host,
                '--job-port', str(self.job_port),
                '--artifact-port', str(self.artifact_port),
                '--expansion-port', str(self.expansion_port)]

        if sys.platform == "darwin":
            # Docker-for-Mac doesn't support host networking, so we need to
            # explicitly publish ports from the Docker container to be able
            # to connect to it. Also, all other containers need to be aware
            # that they run Docker-on-Mac to connect against the internal
            # Docker-for-Mac address.
            cmd += ["-e", "DOCKER_MAC_CONTAINER=1"]
            cmd += ["-p", "{}:{}".format(self.job_port, self.job_port)]
            cmd += ["-p", "{}:{}".format(self.artifact_port,
                                         self.artifact_port)]
            cmd += ["-p", "{}:{}".format(self.expansion_port,
                                         self.expansion_port)]
            cmd += ["-p", "{0}-{1}:{0}-{1}".format(
                self.harness_port_range[0], self.harness_port_range[1])]
        else:
            # This shouldn't be set for MacOS because it destroys port
            # forwardings, even though host networking is not supported on
            # MacOS.
            cmd.append("--network=host")

        cmd.append(job_server_image_name)
        cmd += args

        logging.debug("Starting container with %s", cmd)
        try:
            self.docker_process = Popen(cmd)
            atexit.register(self.stop)
            # stop() accepts (and ignores) the (signum, frame) arguments that
            # the interpreter passes to signal handlers.
            signal.signal(signal.SIGINT, self.stop)
        except:  # pylint:disable=bare-except
            logging.exception("Error bringing up container")
            self.stop()

        return "{}:{}".format(self.job_host, self.job_port)

    def stop(self, *unused_args):
        """Stops the docker process, if one was started.

        Sends SIGINT up to ``max_connection_retries`` times, waiting one
        second after each attempt, and kills the process if it is still
        running afterwards.

        Args:
          *unused_args: ignored. Present so that this method can be used
            directly as a signal handler, which is called with
            ``(signum, frame)``, as well as an ``atexit`` callback.
        """
        with self.process_lock:
            if not self.docker_process:
                return
            num_retries = 0
            while self.docker_process.poll() is None and \
                    num_retries < self.max_connection_retries:
                logging.debug("Sending SIGINT to job_server container")
                self.docker_process.send_signal(signal.SIGINT)
                num_retries += 1
                time.sleep(1)
            # poll() must be *called*: the bound method itself is never None,
            # which would make this fallback unreachable.
            if self.docker_process.poll() is None:
                self.docker_process.kill()

    @staticmethod
    def _pick_port(*ports):
        """Replaces unset ports with free ones.

        Args:
          *ports: port numbers; None or 0 means "pick one for me".

        Returns:
          A list of ports, same length as the input, in which every None or 0
          entry has been replaced with a free port chosen by the OS. Other
          entries are returned unchanged.
        """
        sockets = []

        def find_free_port(port):
            if port:
                return port
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sockets.append(s)
            # Binding to port 0 lets the OS choose a free port.
            s.bind(('localhost', 0))
            _, free_port = s.getsockname()
            return free_port

        try:
            return [find_free_port(port) for port in ports]
        finally:
            # Close sockets only now to avoid the same port being chosen
            # twice, and do so even if reserving one of the ports failed.
            for s in sockets:
                s.close()