blob: d141ccc1fac14041bc916b87518fa7c51dae64db [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''config.py: module for defining config'''
import heronpy.api.api_constants as api_constants
from heronpy.streamlet.resources import Resources
class Config(object):
"""Config is the way users configure the execution of the topology.
Things like tuple delivery semantics, resources used, as well as
user defined key/value pairs are passed on to the runner via
this class.
"""
ATMOST_ONCE = 1
ATLEAST_ONCE = 2
EFFECTIVELY_ONCE = 3
def __init__(self, config=None):
if config is not None and not isinstance(config, dict):
raise RuntimeError("Config has to be of type dict")
self._api_config = config
if self._api_config is None:
self._api_config = {}
def set_delivery_semantics(self, semantics):
if semantics == Config.ATMOST_ONCE:
self._api_config[api_constants.TOPOLOGY_RELIABILITY_MODE] =\
api_constants.TopologyReliabilityMode.ATMOST_ONCE
elif semantics == Config.ATLEAST_ONCE:
self._api_config[api_constants.TOPOLOGY_RELIABILITY_MODE] =\
api_constants.TopologyReliabilityMode.ATLEAST_ONCE
elif semantics == Config.EFFECTIVELY_ONCE:
self._api_config[api_constants.TOPOLOGY_RELIABILITY_MODE] =\
api_constants.TopologyReliabilityMode.EFFECTIVELY_ONCE
else:
raise RuntimeError("Unknown Topology delivery semantics %s" % str(semantics))
def set_num_containers(self, ncontainers):
self._api_config[api_constants.TOPOLOGY_STMGRS] = int(ncontainers)
def set_container_resources(self, resources):
if not isinstance(resources, Resources):
raise RuntimeError("container resources have to be of type Resources")
self._api_config[api_constants.TOPOLOGY_CONTAINER_CPU_REQUESTED] = resources.get_cpu()
self._api_config[api_constants.TOPOLOGY_CONTAINER_RAM_REQUESTED] = resources.get_ram()
def set_user_config(self, key, value):
self._api_config[key] = value