blob: 53e13870d78c943a9e329b77845bbae9cc9e9e81 [file] [log] [blame]
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Utilities for collecting system data.
Author: Jeff Kinnison (jkinniso@nd.edu)
"""
# TODO: Refactor to iterate over producers, not collectors. Collectors should
# execute concurrently.
# TODO: Add method to deactivate reporter
from threading import Thread, Event
from .datacollector import DataCollector
class CollectorExistsException(Exception):
    """Raised when a collector is registered under a name already in use."""
class CollectorDoesNotExistException(Exception):
    """Raised when an operation names a collector that is not registered."""
class DataReporter(object):
    """Manage a named set of DataCollectors and the streams they feed.

    This class derives from ``object`` (not ``threading.Thread``). Each
    DataCollector is driven individually through its own ``activate()``,
    ``start()``, ``deactivate()`` and ``join()`` methods.

    Instance variables:
    collectors -- a dict mapping collector name to its DataCollector

    Public methods:
    add_collector -- create and register a new DataCollector
    start_collecting -- begin data collection for all collectors
    start_collector -- begin data collection for a specific collector
    stop_collecting -- stop all data collection
    stop_collector -- stop a running DataCollector
    start_streaming -- add a routing key to a collector's stream
    stop_streaming -- placeholder; not yet implemented (does nothing)
    """

    def __init__(self, collectors=None):
        """Create a reporter, optionally pre-populated with collectors.

        Keyword arguments:
        collectors -- optional mapping of name -> collector specification.
                      Each specification must expose the attributes
                      ``callback``, ``url``, ``exchange``, ``limit``,
                      ``postprocessor``, ``callback_args`` and
                      ``postprocessor_args``. ``interval`` and
                      ``exchange_type`` are forwarded only when present;
                      otherwise add_collector's defaults apply.
                      (default None, meaning no collectors)
        """
        super(DataReporter, self).__init__()
        self.collectors = {}
        if collectors is None:
            collectors = {}
        # Iterate (name, spec) pairs. Iterating the mapping itself yields
        # only keys, which cannot be unpacked into two names.
        for name, spec in collectors.items():
            # Optional settings are forwarded only when the spec provides
            # them, so add_collector stays the single source of defaults.
            optional = {}
            for attr in ("interval", "exchange_type"):
                if hasattr(spec, attr):
                    optional[attr] = getattr(spec, attr)
            # Keyword arguments are used because add_collector's positional
            # order (callback, rabbitmq_url, exchange, limit, ...) does not
            # match the order these attributes were historically passed in.
            self.add_collector(
                name,
                spec.callback,
                spec.url,
                spec.exchange,
                limit=spec.limit,
                postprocessor=spec.postprocessor,
                callback_args=spec.callback_args,
                postprocessor_args=spec.postprocessor_args,
                **optional
            )

    def add_collector(self, name, callback, rabbitmq_url, exchange, limit=250,
                      interval=10, postprocessor=None, exchange_type="direct",
                      callback_args=None, postprocessor_args=None):
        """Create a DataCollector and register it under ``name``.

        Arguments:
        name -- name of the new DataCollector
        callback -- the data collection callback to run
        rabbitmq_url -- URL handed to the DataCollector (presumably the
                        RabbitMQ broker address; confirm in DataCollector)
        exchange -- exchange name handed to the DataCollector

        Keyword arguments:
        limit -- the number of data points to store (default 250)
        interval -- forwarded to DataCollector as ``interval`` (default 10)
        postprocessor -- a postprocessing function to run on each data point
                         (default None)
        exchange_type -- forwarded to DataCollector (default "direct")
        callback_args -- a list of arguments to pass to the callback
                         (default None, meaning a fresh empty list)
        postprocessor_args -- a list of arguments to pass to the postprocessor
                              (default None, meaning a fresh empty list)

        Raises:
        CollectorExistsException if a collector named name already exists
        """
        if name in self.collectors:
            raise CollectorExistsException(name)
        # Build fresh lists per call. A mutable default would be one shared
        # object aliased by every collector that relied on it.
        if callback_args is None:
            callback_args = []
        if postprocessor_args is None:
            postprocessor_args = []
        self.collectors[name] = DataCollector(
            name,
            callback,
            rabbitmq_url,
            exchange,
            limit=limit,
            interval=interval,
            postprocessor=postprocessor,
            exchange_type=exchange_type,
            callback_args=callback_args,
            postprocessor_args=postprocessor_args
        )

    def start_collecting(self):
        """Start data collection for every registered collector."""
        for name in self.collectors:
            self.start_collector(name)

    def start_collector(self, name):
        """Activate the specified collector and call its start() method.

        Arguments:
        name -- the name of the collector to start

        Raises:
        CollectorDoesNotExistException if no collector named name exists

        A RuntimeError raised by activate()/start() is caught and printed to
        stdout instead of being propagated. This presumably happens when the
        collector was already started.
        """
        if name not in self.collectors:
            raise CollectorDoesNotExistException(name)
        collector = self.collectors[name]
        try:
            collector.activate()
            collector.start()
        except RuntimeError as e:
            print("Error starting collector ", name)
            print(e)

    def stop_collecting(self):
        """Stop every registered collector."""
        for name in self.collectors:
            self.stop_collector(name)

    def stop_collector(self, name):
        """Deactivate the specified collector and wait for it via join().

        Arguments:
        name -- the name of the collector to stop

        Raises:
        CollectorDoesNotExistException if no collector named name exists

        A RuntimeError from deactivate()/join() is caught and printed
        rather than propagated.
        """
        if name not in self.collectors:
            raise CollectorDoesNotExistException(name)
        collector = self.collectors[name]
        try:
            collector.deactivate()
            collector.join()
        except RuntimeError as e:  # e.g. join() would deadlock / not started
            print(e)

    def start_streaming(self, collector_name, routing_key):
        """Begin streaming data from a collector to a particular recipient.

        Arguments:
        collector_name -- the collector whose data should be streamed
        routing_key -- the routing key to reach the intended recipient

        Raises:
        CollectorDoesNotExistException if no collector named collector_name
        exists
        """
        if collector_name not in self.collectors:
            raise CollectorDoesNotExistException(collector_name)
        self.collectors[collector_name].add_routing_key(routing_key)

    def stop_streaming(self, collector_name, routing_key):
        """Stop streaming data from a collector to a particular recipient.

        NOTE: not yet implemented. This method currently does nothing and
        raises nothing, whatever its arguments.

        Arguments:
        collector_name -- the collector whose stream should stop
        routing_key -- the routing key of the recipient to stop sending to
        """
        # TODO: mirror start_streaming. Validate collector_name and remove
        # routing_key from the collector. The DataCollector method for
        # removing a routing key is not visible here; confirm its name first.