blob: 2470d6480ec189ccd7685b60389e01954fb48a2e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudify.decorators import operation
from cloudify import ctx
from cloudify.exceptions import NonRecoverableError
from datetime import datetime
from aria.cli.core import aria
@operation
def proxy_connect(**kwargs):
service_names = get_service_names()
duration = ctx.node.properties['wait_config']['wait_time']
installed = False
service_exists = ctx.node.properties['service_name'] in service_names
if service_exists:
installed = is_installed(
service_names[ctx.node.properties['service_name']])
wait_for_service = ctx.node.properties['wait_config']['wait_for_service']
wait_expr = ctx.node.properties['wait_config'].get('wait_expression',
None)
if not service_exists or not installed:
if wait_for_service:
if duration > ctx.operation.retry_number:
return ctx.operation.retry(
message = 'Waiting for service', retry_after=1)
else:
if not service_exists:
raise NonRecoverableError(
"service {} not found".format(
ctx.node.properties['service_name']))
else:
raise NonRecoverableError(
"service {} not installed".format(
ctx.node.properties['service_name']))
else:
if not service_exists:
raise NonRecoverableError(
"service {} not found".format(
ctx.node.properties['service_name']))
else:
raise NonRecoverableError(
"service {} not installed".format(
ctx.node.properties['service_name']))
# Service exists, so see if outputs exist yet
elif( ctx.node.properties['outputs']):
outputs = service_names[ctx.node.properties['service_name']].outputs
if not output_equivalence( ctx.node.properties['outputs'], outputs):
return fail_or_wait(wait_for_service,
duration,
'Waiting for service outputs',
"service {} outputs {} not found".format(
ctx.node.properties['service_name'],
ctx.node.properties['outputs']))
else:
# Success
# place outputs in attributes
# final wicket: expression
if wait_expr:
if not eval_waitexpr(wait_expr, outputs):
return(fail_or_wait(wait_for_service, duration,
"waiting for expr",
"Expr {} evaluates false".format(
wait_expr)))
service_outputs = []
if('service_outputs' in ctx.instance.runtime_properties and
ctx.instance.runtime_properties['service_outputs']):
service_outputs = list(
ctx.instance.runtime_properties['service_outputs'])
for k,v in outputs.iteritems():
service_outputs.append( dict(name = k,value = v.value))
ctx.instance.runtime_properties['service_outputs'] = service_outputs
ctx.instance.runtime_properties['last_update'] = str(datetime.utcnow())
else:
ctx.logger.info("service exists, but no outputs specified = success")
# returns service names
@aria.pass_model_storage
def get_service_names(model_storage):
"""
Lists all services
"""
services_list = model_storage.service.list()
outdict = {}
for service in services_list:
outdict[str(service.name)] = service
return outdict
# Tests whether the list of configured outputs (a simple string list) is equivalent
# to the list returned from Aria (possible duplicate keys)
def output_equivalence( config_list, service_list ):
sset = set()
for k,v in service_list.iteritems():
sset.add( k )
if not len(sset) == len(config_list):
return False
for entry in sset:
if not entry in config_list:
return False
return True
# Looks at the execution history to determine of service is installed
@aria.pass_model_storage
def is_installed( service, model_storage ):
executions = model_storage.execution.list(
filters=dict(service=service)).items
for e in reversed(executions):
if e.workflow_name == "uninstall":
return False
if e.workflow_name == "install" and e.status == "succeeded":
return True
return False
# Evaluates wait_expr in the context of supplied outputs
def eval_waitexpr( expr, outputs):
locals = {}
for k,v in outputs.iteritems():
locals[k] = v.value
return eval(expr,locals)
def fail_or_wait( wait_flag, duration, wait_msg, fail_msg ):
if wait_flag:
if duration > ctx.operation.retry_number:
return ctx.operation.retry(
message = wait_msg, retry_after=1)
else:
raise NonRecoverableError( fail_msg)
else:
raise NonRecoverableError( fail_msg)