blob: af57f762bfcd0aa9a7dd2e4ec92ffab684d5283e [file] [log] [blame]
#
# Background worker that loads statistics from the Api-Gateway endpoint and sends them to StatsD.
# Designed to be restarted every minute by a cronjob
#
import json, sys, socket, urllib2
import argparse
from threading import Timer
class StatsCollectorWorker:
UDP_MAX_BUFFER_SIZE = 800
def __init__(self, statsRestEndpoint, statsd_host, statsd_port ):
print(("Starting collecting form [%s], sending to StatsD on [%s:%s]") % (gateway_uri, statsd_host, statsd_port) )
self.statsRestEndpoint = statsRestEndpoint
self.statsd_host = statsd_host
self.statsd_port = statsd_port
self.udp_buffer = ""
def getStatsFromUrl(self, url):
try:
stats_txt = urllib2.urlopen(url).read()
return json.loads(stats_txt)
except Exception as e:
print "Could not read stats", e
return None
def flushBuffer(self):
# print "Sending:", self.udp_buffer
self.udp_sock.send(self.udp_buffer)
self.udp_buffer = ''
def sentStatsFromCollection(self, metricCollection, metricType):
for i, item in enumerate(metricCollection):
# print i, item, metricCollection[item]
statsd_metric = ('%s:%s|%s' % (item, metricCollection[item],metricType)).encode("utf-8")
if self.udp_buffer.__len__() + statsd_metric.__len__() > StatsCollectorWorker.UDP_MAX_BUFFER_SIZE:
self.flushBuffer()
self.udp_buffer = ''.join([self.udp_buffer, "\n", statsd_metric])
def sendStats(self, obj):
# TODO: split statsd_host by space in order to support sending data to multiple statsd backends
try:
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.connect((self.statsd_host, self.statsd_port))
except Exception as e:
print "Could not connect to StatsD", e
return None
try:
self.sentStatsFromCollection(obj["timers"], "ms")
self.sentStatsFromCollection(obj["counters"], "c")
if self.udp_buffer.__len__() > 0:
self.flushBuffer()
except Exception as e:
print "Could not send statistics", e
self.udp_sock.close()
def getAndSendStats(self,nextInterval, maxRuns):
# Read the JSON from the gateway's endpoint
# TODO: add an exception in case JSON conversion fails
obj = self.getStatsFromUrl(self.statsRestEndpoint)
if obj is not None:
self.sendStats(obj)
# print ("Stats processed. %s runs left" % (maxRuns))
if maxRuns <= 0:
return None
t = Timer(nextInterval, self.getAndSendStats, [nextInterval, maxRuns - 1])
t.start()
statsd_host = "api-gateway-graphite"
statsd_port = int("8125")
gateway_uri = "http://127.0.0.1/api-gateway-stats"
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--statsd-host', dest='statsd_host', required=True)
parser.add_argument('-p', '--statsd-port', dest='statsd_port')
parser.add_argument('-g', '--gateway-uri', dest='gateway_uri')
args = parser.parse_args()
statsd_host = args.statsd_host or statsd_host
statsd_port = int(args.statsd_port or statsd_port)
gateway_uri = args.gateway_uri or gateway_uri
collector = StatsCollectorWorker(statsRestEndpoint=gateway_uri, statsd_host=statsd_host, statsd_port=statsd_port)
# set the collector to run each 3 seconds , max 18 times
collector.getAndSendStats(nextInterval=3,maxRuns=18)
# Sample StatsD messages
# pub.publisher_name.consumer.consumer_name.app.application_name.service.service_name.prod.region.useast.request.device_links.POST.200.count
# pub.publisher_name.consumer.consumer_name.app.application_name.service.service_name.prod.region.useast.request.device_links.POST.200.responseTime
# pub.publisher_name.consumer.consumer_name.app.application_name.service.service_name.prod.region.useast.request.device_links.POST.200.upstreamResponseTime
# pub.publisher_name.consumer.consumer_name.app.application_name.service.service_name.prod.region.useast.request.validate_request.GET.200.responseTime