blob: 501f7c09c429f428204c6479acfa71c3f8f47d11 [file] [log] [blame]
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import pika
from .pikaasyncconsumer import PikaAsyncConsumer
from .datacollector import DataCollector
from .datareporter import DataReporter
from .eventhandler import EventHandler
from .eventmonitor import EventMonitor
class ReporterExistsException(Exception):
"""Thrown when attempting to add a DataReporter with a conflicting name"""
pass
class SimStream(object):
"""
Manager for routing messages to their correct reporter.
"""
DEFAULT_CONFIG_PATH="simstream.cnf"
class MessageParser(object):
"""
Internal message parsing facilities.
"""
def __init__(self):
self.parsed = None
def __call__(self, message):
pass
def __init__(self, reporters={}, config={}):
self.reporters = reporters
self.consumer = None
self.config = config
def add_data_reporter(self, reporter):
"""
Add a new DataReporter object.
Arguments:
reporter -- the DataReporter to add
"""
if reporter.name in self.reporters:
raise ReporterExistsException
self.reporters[reporter.name] = reporter
def parse_config(self):
"""
Read the config file and set up the specified, data collection and
event handling resources.
"""
# TODO: Read in config
# TODO: Set up configuration dict
pass
def route_message(self, message):
"""
Send a message to the correct reporter.
"""
# TODO: Create new MessageParser
# TODO: Run message through MessageParser
# TODO: Route message to the correct DataReporter/EventMonitor
parser = MessageParser()
parser(message)
if parser.reporter_name in self.reporters:
self.reporters[parser.reporter_name].start_streaming(
parser.collector_name,
parser.routing_key
)
def start_collecting(self):
"""
Begin collecting data and monitoring for events.
"""
for reporter in self.reporters:
self.reporters[reporter].start_collecting()
def setup(self):
"""
Set up the SimStream instance: create DataCollectors, create
EventMonitors, configure AMQP consumer.
"""
self.parse_config()
#self.setup_consumer()
self.setup_data_collection()
self.setup_event_monitoring()
def setup_data_collection(self):
"""
Set up all DataReporters and DataCollectors.
"""
# TODO: Create and configure all DataReporters
# TODO: Create and configure all DataCollectors
# TODO: Assign each DataCollector to the correct DataReporter
if "reporters" in self.config:
for reporter in self.config.reporters:
pass
for collector in self.config.collectors:
pass
def setup_event_monitoring(self):
#TODO: Create and configure all EventMonitors
#TODO: Create and configure all EventHandlers
#TODO: Assign each EventHandler to the correct EventMonitor
#TODO: Assign each EventMonitor to the correct DataCollector
pass
def setup_consumer(self):
"""
Set up and configure the consumer.
"""
if len(self.config) > 0 and self.consumer is None:
if "message_handler" in self.config:
message_handler = self.config["message_handler"]
else:
message_handler = self.route_message
self.consumer = PikaAsyncConsumer(self.config["url"],
self.config["exchange"],
self.config["queue"],
message_handler,
exchange_type=self.config["exchange_type"],
routing_key=self.config["routing_key"]
)
def start(self):
"""
Configure and start SimStream.
"""
if self.consumer is None:
self.setup()
self.start_collecting()
#self.consumer.start()
def stop(self):
"""
Stop all data collection, event monitoring, and message consumption.
"""
self.consumer.stop()
self.stop_collecting()
if __name__ == "__main__":
def print_message(message):
with open("test.out", "w") as f:
print(message)
print(SimStream.DEFAULT_CONFIG_PATH)
config = {
"url": "amqp://guest:guest@localhost:5672",
"exchange": "simstream",
"queue": "simstream_test",
"message_handler": print_message,
"routing_key": "test_consumer",
"exchange_type": "topic"
}
streamer = SimStream(config=config)
try:
streamer.start()
except KeyboardInterrupt:
streamer.stop()