blob: 6a17bb1cbf55c6ea6c16a3bb188f3525f18851a2 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudify.decorators import operation
from cloudify import ctx
from cloudify.exceptions import NonRecoverableError
from datetime import datetime
from aria.cli.core import aria
WAIT_CONFIG_KEY = 'wait_config'
WAIT_TIME_KEY = 'wait_time'
WAIT_FOR_SERVICE_KEY = 'wait_for_service'
WAIT_EXPR_KEY = 'wait_expression'
SERVICE_NAME_KEY = 'service_name'
SERVICE_OUTPUTS_KEY = 'service_outputs'
OUTPUT_KEY = 'outputs'
LAST_UPDATE_KEY = 'last_update'
WORKFLOW_INSTALL = 'install'
WORKFLOW_UNINSTALL = 'uninstall'
WF_SUCCESS_STATUS = 'succeeded'
RETRY_DELAY_SECS = 1
# Only operation exposed. Waits for proxied service to be ready
# if necessary, then copies selected outputs to node attributes.
#
@operation
def proxy_connect(**kwargs):
#
# Collect node configuration
#
service_names = get_service_names()
duration = ctx.node.properties[WAIT_CONFIG_KEY][WAIT_TIME_KEY]
installed = False
service_exists = ctx.node.properties[SERVICE_NAME_KEY] in service_names
if service_exists:
installed = is_installed(
service_names[ctx.node.properties[SERVICE_NAME_KEY]])
wait_for_service = ctx.node.properties[WAIT_CONFIG_KEY][WAIT_FOR_SERVICE_KEY]
wait_expr = ctx.node.properties[WAIT_CONFIG_KEY].get(WAIT_EXPR_KEY,
None)
# If the service doesn't exist, or exists but hasn't been installed,
# and wait_for_service = true, retry for configured duration.
#
if not service_exists or not installed:
if wait_for_service:
if duration > ctx.operation.retry_number:
return ctx.operation.retry(
message = 'Waiting for service',
retry_after=RETRY_DELAY_SECS)
else:
if not service_exists:
raise NonRecoverableError(
"service {} not found".format(
ctx.node.properties[SERVICE_NAME_KEY]))
else:
raise NonRecoverableError(
"service {} not installed".format(
ctx.node.properties[SERVICE_NAME_KEY]))
else:
if not service_exists:
raise NonRecoverableError(
"service {} not found".format(
ctx.node.properties[SERVICE_NAME_KEY]))
else:
raise NonRecoverableError(
"service {} not installed".format(
ctx.node.properties[SERVICE_NAME_KEY]))
# Service ready. If outputs are configured in proxy, grab them
elif( ctx.node.properties[OUTPUT_KEY]):
outputs = service_names[ctx.node.properties[SERVICE_NAME_KEY]].outputs
# If the outputs are not ready yet, retry
if not output_equivalence( ctx.node.properties[OUTPUT_KEY], outputs):
return fail_or_wait(wait_for_service,
duration,
'Waiting for service outputs',
"service {} outputs {} not found".format(
ctx.node.properties[SERVICE_NAME_KEY],
ctx.node.properties[OUTPUT_KEY]))
# Outputs are ready. Copy them from target outputs into the
# this node instance attributes
else:
# Success
# place outputs in attributes
# final wicket: expression
if wait_expr:
if not eval_waitexpr(wait_expr, outputs):
return(fail_or_wait(wait_for_service, duration,
"waiting for expr",
"Expr {} evaluates false".format(
wait_expr)))
service_outputs = []
if(SERVICE_OUTPUTS_KEY in ctx.instance.runtime_properties and
ctx.instance.runtime_properties[SERVICE_OUTPUTS_KEY]):
service_outputs = list(
ctx.instance.runtime_properties[SERVICE_OUTPUTS_KEY])
for key,val in outputs.iteritems():
service_outputs.append( dict(name = key,value = val.value))
ctx.instance.runtime_properties[SERVICE_OUTPUTS_KEY] = service_outputs
ctx.instance.runtime_properties[LAST_UPDATE_KEY] = str(datetime.utcnow())
# Service exists, but outputs not configured, so we're done
else:
ctx.logger.info("service exists, but no outputs specified = success")
# returns service names
@aria.pass_model_storage
def get_service_names(model_storage):
"""
Lists all services
"""
services_list = model_storage.service.list()
outdict = {}
for service in services_list:
outdict[str(service.name)] = service
return outdict
# Tests whether the list of configured outputs (a simple string list)
# is equivalent # to the list returned from Aria (possible duplicate keys)
def output_equivalence(config_list, service_list):
sset = set()
for key, val in service_list.iteritems():
sset.add(key)
if not len(sset) == len(config_list):
return False
for entry in sset:
if entry not in config_list:
return False
return True
# Looks at the execution history to determine of service is installed
@aria.pass_model_storage
def is_installed(service, model_storage):
executions = model_storage.execution.list(
filters=dict(service=service)).items
for execution in reversed(executions):
if execution.workflow_name == WORKFLOW_UNINSTALL:
return False
if (execution.workflow_name == WORKFLOW_INSTALL and
execution.status == WF_SUCCESS_STATUS):
return True
return False
# Evaluates wait_expr in the context of supplied outputs
def eval_waitexpr(expr, outputs):
locals = {}
for key, val in outputs.iteritems():
locals[key] = val.value
return eval(expr, locals)
# Convenience function that either fails immediately (if wait_flag
# is false), or tests and retries or fails based on the wait condition
def fail_or_wait( wait_flag, duration, wait_msg, fail_msg ):
if wait_flag:
if duration > ctx.operation.retry_number:
return ctx.operation.retry(
message = wait_msg, retry_after=RETRY_DELAY_SECS)
else:
raise NonRecoverableError( fail_msg)
else:
raise NonRecoverableError( fail_msg)