| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ |
| Local execution of operations. |
| """ |
| |
| import os |
| import subprocess |
| import threading |
| import StringIO |
| |
| from . import ctx_proxy |
| from . import exceptions |
| from . import common |
| from . import constants |
| from . import environment_globals |
| from . import python_script_scope |
| |
| |
| def run_script(ctx, script_path, process, **kwargs): |
| if not script_path: |
| ctx.task.abort('Missing script_path') |
| process = process or {} |
| script_path = common.download_script(ctx, script_path) |
| script_func = _get_run_script_func(script_path, process) |
| return script_func( |
| ctx=ctx, |
| script_path=script_path, |
| process=process, |
| operation_kwargs=kwargs) |
| |
| |
| def _get_run_script_func(script_path, process): |
| if _treat_script_as_python_script(script_path, process): |
| return _eval_script_func |
| else: |
| if _treat_script_as_powershell_script(script_path): |
| process.setdefault('command_prefix', constants.DEFAULT_POWERSHELL_EXECUTABLE) |
| return _execute_func |
| |
| |
| def _treat_script_as_python_script(script_path, process): |
| eval_python = process.get('eval_python') |
| script_extension = os.path.splitext(script_path)[1].lower() |
| return (eval_python is True or (script_extension == constants.PYTHON_SCRIPT_FILE_EXTENSION and |
| eval_python is not False)) |
| |
| |
| def _treat_script_as_powershell_script(script_path): |
| script_extension = os.path.splitext(script_path)[1].lower() |
| return script_extension == constants.POWERSHELL_SCRIPT_FILE_EXTENSION |
| |
| |
| def _eval_script_func(script_path, ctx, operation_kwargs, **_): |
| with python_script_scope(operation_ctx=ctx, operation_inputs=operation_kwargs): |
| execfile(script_path, environment_globals.create_initial_globals(script_path)) |
| |
| |
| def _execute_func(script_path, ctx, process, operation_kwargs): |
| os.chmod(script_path, 0755) |
| process = common.create_process_config( |
| script_path=script_path, |
| process=process, |
| operation_kwargs=operation_kwargs) |
| command = process['command'] |
| env = os.environ.copy() |
| env.update(process['env']) |
| ctx.logger.info('Executing: {0}'.format(command)) |
| with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy: |
| env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url |
| running_process = subprocess.Popen( |
| command, |
| shell=True, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| env=env, |
| cwd=process.get('cwd'), |
| bufsize=1, |
| close_fds=not common.is_windows()) |
| stdout_consumer = _OutputConsumer(running_process.stdout) |
| stderr_consumer = _OutputConsumer(running_process.stderr) |
| exit_code = running_process.wait() |
| stdout_consumer.join() |
| stderr_consumer.join() |
| ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, command)) |
| |
| def error_check_func(): |
| if exit_code: |
| raise exceptions.ProcessException( |
| command=command, |
| exit_code=exit_code, |
| stdout=stdout_consumer.read_output(), |
| stderr=stderr_consumer.read_output()) |
| return common.check_error(ctx, error_check_func=error_check_func) |
| |
| |
| class _OutputConsumer(object): |
| |
| def __init__(self, out): |
| self._out = out |
| self._buffer = StringIO.StringIO() |
| self._consumer = threading.Thread(target=self._consume_output) |
| self._consumer.daemon = True |
| self._consumer.start() |
| |
| def _consume_output(self): |
| for line in iter(self._out.readline, b''): |
| self._buffer.write(line) |
| self._out.close() |
| |
| def read_output(self): |
| return self._buffer.getvalue() |
| |
| def join(self): |
| self._consumer.join() |