#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import time
from resource_management import *
def tensorflow_service(action='start'): # 'start' or 'stop' or 'status'
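  """
  Manage one containerized TensorFlow role instance (ps / worker / chiefworker / tensorboard).

  action -- 'start' launches the role inside a docker container,
            'stop'  stops the container,
            'status' reports the container state and drives retries and cleanup.
  """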
  import params
  import functions
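  # Identify this container and the role (component) it runs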
  container_id = format("{container_id}")
  application_id = functions.get_application_id(container_id)
  componentName = format("{componentName}")
  if action == 'start':
    checkpoint_dir = format("{user_checkpoint_prefix}/{service_name}/checkpoints")
    mem = functions.get_allocated_resources()['mem.' + componentName]
    allocated_port = params.ports_dict['port.' + componentName]
    # The tensorboard role is launched directly, without waiting for other instances
    if componentName == "tensorboard":
      daemon_cmd = format("/usr/bin/docker run -d -u $(id -u yarn) --cgroup-parent={yarn_cg_root}/{container_id} -m {mem}m " \
                          "-v {hadoop_conf}:/usr/local/hadoop/etc/hadoop " \
                          "-v /etc/passwd:/etc/passwd -v /etc/group:/etc/group " \
                          "-p {allocated_port}:{allocated_port} --name={container_id} ytensorflow:0.2.1 " \
                          "/bin/bash -c 'tensorboard --logdir={checkpoint_dir} --port={allocated_port}'")
      Execute(daemon_cmd)
    else:
      # Wait for all ps/worker instances to export their ports
      num_ps, num_worker = functions.get_allocated_instances_num()
      num_allocated = num_ps + num_worker
      ps_list, worker_list = functions.get_launched_instances()
      num_launched = len(ps_list) + len(worker_list)
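      # Poll every 5 seconds until get_launched_instances() reports every allocated instance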
      while num_launched < num_allocated:
        print format("Waiting for all ports ({num_launched}/{num_allocated}) to be exported")
        time.sleep(5)
        ps_list, worker_list = functions.get_launched_instances()
        num_launched = len(ps_list) + len(worker_list)
      # Generate parameters
      ps_hosts = ",".join(ps_list)
      worker_hosts = ",".join(worker_list)
      task_index = (ps_list.index(format("{hostname}:{allocated_port}"))) if (componentName == 'ps') else (
        worker_list.index(format("{hostname}:{allocated_port}")))
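      # chiefworker shares the "worker" job name in the TensorFlow cluster spec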
      job_name = "worker" if (componentName == 'chiefworker') else componentName
      # Build clusterSpec and command
      daemon_cmd = format("/usr/bin/docker run -d -u $(id -u yarn) --cgroup-parent={yarn_cg_root}/{container_id} -m {mem}m " \
                          "-v {hadoop_conf}:/usr/local/hadoop/etc/hadoop " \
                          "-v /etc/passwd:/etc/passwd -v /etc/group:/etc/group " \
                          "-v {app_root}:{app_root} -v {app_log_dir}:{app_log_dir} " \
                          "-p {allocated_port}:{allocated_port} --name={container_id} {docker_image} " \
                          "/bin/bash -c 'export HADOOP_USER_NAME={user_name}; /usr/bin/python {app_root}/{user_scripts_entry} " \
                          "--ps_hosts={ps_hosts} --worker_hosts={worker_hosts} --job_name={job_name} --task_index={task_index} " \
                          "--ckp_dir={checkpoint_dir} --work_dir={app_root} >>{app_log_dir}/tensorflow.out 2>>{app_log_dir}/tensorflow.err'")
      Execute(daemon_cmd)
  elif action == 'stop':
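    # Stop the container; only_if skips the stop when the container is no longer listed by `docker ps`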
    cmd = format("/usr/bin/docker stop {container_id}")
    op_test = format("/usr/bin/docker ps | grep {container_id} >/dev/null 2>&1")
    Execute(cmd,
            tries=5,
            try_sleep=10,
            wait_for_finish=True,
            only_if=op_test
            )
  elif action == 'status':
    cmd_status = "/usr/bin/docker inspect -f '{{.State.Running}}' %s" % container_id
    running = os.popen(cmd_status).read().strip('\n')
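    # docker inspect prints 'true' while the container's main process is still alive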
    if running == 'true':
      print "Component instance is running..."
      # Role tensorboard will watch all workers' status
      if componentName == "tensorboard":
        running, finished = functions.get_workers()
        print "Running tensorflow workers(%s) : %s \nFinished tensorflow workers(%s) : %s" \
              % (len(running), ','.join(running), len(finished), ','.join(finished))
        # All workers have finished successfully, going to stop the cluster
        num_ps, num_worker = functions.get_allocated_instances_num()
        if len(finished) == num_worker:
          functions.stop_cluster()
    else:
      cmd_exit = "/usr/bin/docker inspect -f '{{.State.ExitCode}}' %s" % container_id
      exit_code = int(os.popen(cmd_exit).read().strip('\n'))
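      # A non-zero exit code means the role crashed: retry up to 5 times by
      # removing the dead container and re-running the start action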
      if exit_code != 0:
        retry = functions.set_retry_num(container_id)
        if retry <= 5:
          # Remove the failed docker container
          cmd_rm = format("/usr/bin/docker rm -f {container_id}")
          Execute(cmd_rm)
          # Restart the user tensorflow script
          tensorflow_service(action='start')
        else:
          raise ComponentIsNotRunning()
      else:
        print "Component instance has finished successfully"
        functions.set_container_status(container_id)