blob: 23b95311e893870cfab42d154ff38cac62ed3e5d [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import os
import subprocess
import Constants
import time
import traceback
from resource_management import *
logger = logging.getLogger()
class YarnDockerManager(Script):
stored_status_command = ''
stored_command = ''
def __init__(self, tmpdir, workroot, customServiceOrchestrator):
self.tmpdir = tmpdir
self.workroot = workroot
self.customServiceOrchestrator = customServiceOrchestrator
def execute_command(self, command, store_command=False):
returncode = ''
out = ''
err = ''
if store_command:
logger.info("Storing applied config: " + str(command['configurations']))
self.stored_command = command
status_command_str = self.extract_config_from_command(command, 'docker.status_command')
if status_command_str:
self.stored_status_command = status_command_str
logger.info("status command" + self.stored_status_command)
if command['roleCommand'] == 'INSTALL':
with Environment(self.workroot) as env:
self.install_container(command, env)
returncode = 0; out = ''; err = ''
if command['roleCommand'] == 'START':
returncode, out, err = self.start_container(command)
# need check
return {Constants.EXIT_CODE:returncode, 'stdout':out, 'stderr':err}
def extract_config_files_from_command(self, command):
if 'containers' in command and len(command['containers']) > 0:
if 'configFiles' in command['containers'][0]:
return command['containers'][0]['configFiles']
return []
def extract_config_file_properties_from_command(self, command, file):
if 'configurations' in command:
if 'dictionaryName' in file and file['dictionaryName'] in command['configurations']:
properties = {}
for k,v in command['configurations'][file['dictionaryName']].iteritems():
properties[k] = format(v, **os.environ)
return properties
return {}
def extract_config_from_command(self, command, field):
value = ''
if 'configurations' in command:
if 'docker' in command['configurations']:
if field in command['configurations']['docker']:
logger.info(field + ': ' + str( command['configurations']['docker'][field]))
value = command['configurations']['docker'][field]
return value
# will evolve into a class hierarch, linux and windows
def execute_command_on_linux(self, docker_command, blocking):
logger.info("command str: " + docker_command)
logger.info("command env: " + str(os.environ))
proc = subprocess.Popen(docker_command, stdout = subprocess.PIPE, shell=True)
returncode = 0
out = ''
err = ''
if blocking is True:
out, err = proc.communicate()
returncode = proc.returncode
else:
time.sleep(5)
if proc.returncode is not None:
# this means the command has already returned
returncode = proc.returncode
out, err = proc.communicate()
logger.info("returncode: " + str(returncode) + " out: " + str(out) + " err: " + str(err))
return returncode, out, err
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
def get_config_file_global(self, command, file, property, useEnv=True):
keyName = file['dictionaryName'] + '.' + property
if keyName in command['configurations']['global']:
if useEnv:
return format(command['configurations']['global'][keyName], **os.environ)
else:
return command['configurations']['global'][keyName]
return None
def install_container(self, command, env):
try:
configFiles = self.extract_config_files_from_command(command)
for configFile in configFiles:
properties = self.extract_config_file_properties_from_command(command, configFile)
fileName = configFile['fileName']
dir = self.get_config_file_global(command, configFile, 'destDir')
if dir is None:
dir = self.workroot
logger.info("creating config file " + str(configFile) + " in directory "+str(dir))
Directory(dir, recursive = True)
if configFile['type'] == 'properties':
PropertiesFile(fileName,
dir=dir,
properties=properties)
elif configFile['type'] == 'env':
content = self.get_config_file_global(command, configFile, 'content', useEnv=False)
if content is not None:
File(os.path.join(dir, fileName),
content=InlineTemplate(content, **properties))
elif configFile['type'] == 'template':
templateFile = self.get_config_file_global(command, configFile, 'templateFile')
if templateFile is not None:
with open(templateFile,"r") as fp:
fileContent = fp.read()
File(os.path.join(dir, fileName),
content=InlineTemplate(fileContent, **properties))
elif configFile['type'] == 'xml':
XmlConfig(fileName,
conf_dir=dir,
configurations=properties)
except:
traceback.print_exc()
def start_container(self, command):
#extracting param needed by docker run from the command passed from AM
startCommand = self.extract_config_from_command(command, 'docker.startCommand')
returncode,out,err = self.execute_command_on_linux(startCommand, False)
return returncode,out,err
def query_status(self, command):
if command['roleCommand'] == "GET_CONFIG":
return self.getConfig(command)
else:
returncode = ''
out = ''
err = ''
status_command_str = self.extract_config_from_command(command, 'docker.status_command')
if status_command_str:
self.stored_status_command = status_command_str
logger.info("in query_status, got stored status command" + self.stored_status_command)
if self.stored_status_command:
logger.info("stored status command to run: " + self.stored_status_command)
returncode, out, err = self.execute_command_on_linux(self.stored_status_command, True)
logger.info("status of the app in docker container: " + str(returncode) + ";" + str(out) + ";" + str(err))
return {Constants.EXIT_CODE:returncode, 'stdout':out, 'stderr':err}
def getConfig(self, command):
logger.info("get config command: " + str(command))
config = {}
if 'configurations' in self.stored_command:
if 'commandParams' in command and 'config_type' in command['commandParams']:
config_type = command['commandParams']['config_type']
logger.info("Requesting applied config for type {0}".format(config_type))
if config_type in self.stored_command['configurations']:
logger.info("get config result: " + self.stored_command['configurations'][config_type])
config = {
'configurations': {config_type: self.stored_command['configurations'][config_type]}
}
else:
config = {
'configurations': {}
}
pass
else:
logger.info("Requesting all applied config." + str(self.stored_command['configurations']))
config = {
'configurations': self.stored_command['configurations']
}
pass
else:
config = {
'configurations': {}
}
#query the ip and hostname of the docker container where the agent is running
ip_query_command = "ip addr show dev eth0 | grep \'inet \' | awk \'{print $2}\' | cut -d / -f 1"
proc = subprocess.Popen(ip_query_command, stdout = subprocess.PIPE, shell=True)
ip, err = proc.communicate()
if err is not None:
logger.error("error when retrieving ip: " + err)
hostname_query_command = "hostname"
proc = subprocess.Popen(hostname_query_command, stdout = subprocess.PIPE, shell=True)
hostname, err = proc.communicate()
if err is not None:
logger.error("error when retrieving hostname: " + err)
config['ip'] = ip.rstrip()
config['hostname'] = hostname.rstrip()
logger.info('response from getconfig: ' + str(config))
return config