# blob: e042fe13b730e0b23e424b052865b17c486def9d [file] [log] [blame]
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
#
#------------------------------------------------------------------------------
import os, time, shutil, xmlrpclib, socket, pprint
from signal import *
from hodlib.Common.logger import hodLog, hodDummyLogger
from hodlib.Common.socketServers import hodXMLRPCServer
from hodlib.Common.util import local_fqdn
from hodlib.Common.xmlrpc import hodXRClient
class hodBaseService:
    """Base class for HOD services.

    Provides logging setup, signal handling, configuration access, an
    optional XML-RPC server and service registration.  Subclass it to create
    a concrete HOD service.

    Every attribute whose name starts with ``_xr_method_`` is registered on
    the XML-RPC server under the name that follows that prefix (for example
    ``_xr_method_stop`` is exposed as ``stop``).

    Configuration keys read by this class (all looked up with ``has_key``):
    ``debug``, ``stream``, ``log-dir``, ``userid``, ``syslog-address``,
    ``xrs-address``, ``xrs-port-range`` and ``register``.
    """

    def __init__(self, name, config, xrtype='threaded'):
        """Initialise the service.

        name   -- service name string; signal handlers are installed unless
                  it equals 'serviceRegistry'.
        config -- config object of type hodlib.Common.setup.options or
                  hodlib.Common.setup.config.
        xrtype -- XML-RPC server flavour, 'threaded' (default) or 'twisted'.
        """
        self.name = name
        self.hostname = local_fqdn()
        self._cfg = config
        self._xrc = None
        self.logs = {}
        self._baseLogger = None
        self._serviceID = os.getenv('PBS_JOBID')

        self.__logDir = None
        self.__svcrgy = None
        self.__stop = False
        self.__xrtype = xrtype

        self._init_logging()

        if name != 'serviceRegistry':
            self._init_signals()
        self._init_xrc_server()

    def __set_logging_level(self, level):
        """Log the change and apply `level` to every known logger name."""
        self.logs['main'].info("Setting log level to %s." % level)
        # NOTE(review): `self.loggers` is never assigned in this class (only
        # `self.logs` is) -- confirm that subclasses provide it, or whether
        # `self.logs` / `self._baseLogger` was intended here.
        for loggerName in self.loggers.keys():
            self.logs['main'].set_logger_level(loggerName, level)

    def __get_logging_level(self):
        """Return the current level of the 'main' logger, or 0 if neither a
        stream nor a log directory is configured."""
        # NOTE(review): same `self.loggers` concern as in
        # __set_logging_level -- verify before relying on SIGUSR2 toggling.
        if self._cfg.has_key('stream'):
            return self.loggers['main'].get_level('stream', 'main')
        elif self._cfg.has_key('log-dir'):
            return self.loggers['main'].get_level('file', 'main')
        else:
            return 0

    def _xr_method_stop(self, *args):
        """XML-RPC method, calls stop() on ourselves."""
        return self.stop()

    def _xr_method_status(self, *args):
        """XML-RPC method, calls status() on ourselves."""
        return self.status()

    def _init_logging(self):
        """Create the 'main' logger.

        A real hodLog-backed logger is created only when the config has a
        'debug' value greater than zero; otherwise a hodDummyLogger is used.
        Stream, file and syslog outputs are attached according to the
        'stream', 'log-dir' and 'syslog-address' config keys.
        """
        if not (self._cfg.has_key('debug') and self._cfg['debug'] > 0):
            self.logs['main'] = hodDummyLogger()
            return

        level = self._cfg['debug']
        self._baseLogger = hodLog(self.name)
        self.logs['main'] = self._baseLogger.add_logger('main')

        if self._cfg.has_key('stream') and self._cfg['stream']:
            self._baseLogger.add_stream(level=level,
                                        addToLoggerNames=('main',))

        if self._cfg.has_key('log-dir'):
            # Logs go to <log-dir>/<userid>.<serviceID>, or <log-dir>/<userid>
            # when no service id (PBS_JOBID) is available.
            if self._serviceID:
                dirName = "%s.%s" % (self._cfg['userid'], self._serviceID)
            else:
                dirName = self._cfg['userid']
            self.__logDir = os.path.join(self._cfg['log-dir'], dirName)
            if not os.path.exists(self.__logDir):
                os.mkdir(self.__logDir)
            self._baseLogger.add_file(logDirectory=self.__logDir,
                                      level=level,
                                      addToLoggerNames=('main',))

        if self._cfg.has_key('syslog-address'):
            self._baseLogger.add_syslog(self._cfg['syslog-address'],
                                        level=level,
                                        addToLoggerNames=('main',))

    def _init_signals(self):
        """Install signal handlers.

        SIGTERM, SIGQUIT and SIGINT call stop(); SIGUSR2 steps the logging
        level up by one, wrapping from 4 back to 1.
        """
        def toggleLevel():
            currentLevel = self.__get_logging_level()
            if currentLevel == 4:
                self.__set_logging_level(1)
            else:
                self.__set_logging_level(currentLevel + 1)

        # Signal handlers receive (signal number, current stack frame).
        def sigStop(sigNum, frame):
            self._sig_wrapper(sigNum, self.stop)

        def sigDebug(sigNum, frame):
            self._sig_wrapper(sigNum, toggleLevel)

        signal(SIGTERM, sigStop)
        signal(SIGQUIT, sigStop)
        signal(SIGINT, sigStop)
        signal(SIGUSR2, sigDebug)

    def _sig_wrapper(self, sigNum, handler, *args):
        """Log the received signal, then invoke `handler`.

        Any extra positional arguments are forwarded to `handler` as
        individual arguments.
        """
        self.logs['main'].info("Caught signal %s." % sigNum)
        handler(*args)

    def _init_xrc_server(self):
        """Create the XML-RPC server if the config asks for one.

        'xrs-address' (a (host, port) pair) takes precedence over
        'xrs-port-range' (bind on all interfaces, '' host).  When neither key
        is present no server is created and self._xrc stays None.

        Raises ValueError if a server is requested but the xrtype given to
        the constructor is neither 'threaded' nor 'twisted'.
        """
        host = None
        ports = None
        if self._cfg.has_key('xrs-address'):
            (host, port) = (self._cfg['xrs-address'][0],
                            self._cfg['xrs-address'][1])
            ports = (port,)
        elif self._cfg.has_key('xrs-port-range'):
            host = ''
            ports = self._cfg['xrs-port-range']

        if host is None:
            return

        if self.__xrtype == 'threaded':
            self._xrc = hodXMLRPCServer(host, ports)
        elif self.__xrtype == 'twisted':
            try:
                # Same module the threaded server is imported from at the
                # top of this file.
                from hodlib.Common.socketServers import twistedXMLRPCServer
                self._xrc = twistedXMLRPCServer(host, ports,
                                                self.logs['main'])
            except ImportError:
                self.logs['main'].error("Twisted XML-RPC server not available, "
                                        + "falling back on threaded server.")
                self._xrc = hodXMLRPCServer(host, ports)
        else:
            # Fail with a clear message instead of an AttributeError on None
            # when registering functions below.
            raise ValueError("Unknown XML-RPC server type: %s"
                             % (self.__xrtype,))

        prefix = '_xr_method_'
        for attr in dir(self):
            if attr.startswith(prefix):
                self._xrc.register_function(getattr(self, attr),
                                            attr[len(prefix):])

        self._xrc.register_introspection_functions()

    def _register_service(self, port=None, installSignalHandlers=1):
        """Register this service with the service registry, if one is set.

        The registration carries an 'xrs' URL when an XML-RPC server exists
        and an 'http' URL when the instance has a truthy `_http` attribute
        (this class never sets `_http` itself).  Does nothing when no
        registry address is set.

        port                  -- unused; kept for interface compatibility.
        installSignalHandlers -- passed through to hodXRClient.
        """
        if not self.__svcrgy:
            return

        self.logs['main'].info(
            "Registering service with service registery %s... " % self.__svcrgy)
        svcrgy = hodXRClient(self.__svcrgy, None, None, 0, 0,
                             installSignalHandlers)

        # `_http` is not initialised by this class, so read it defensively.
        http = getattr(self, '_http', None)

        addresses = {}
        if self._xrc:
            addresses['xrs'] = "http://%s:%s" % (
                self._xrc.server_address[0],
                self._xrc.server_address[1])
        if http:
            addresses['http'] = "http://%s:%s" % (
                http.server_address[0],
                http.server_address[1])

        svcrgy.registerService(self._cfg['userid'], self._serviceID,
                               self.hostname, self.name, 'hod', addresses)

    def start(self):
        """Start the XML-RPC server (if any) and, when the 'register' config
        value is set and true, register the service."""
        self.logs['main'].info("Starting HOD service: %s ..." % self.name)

        if self._xrc:
            self._xrc.serve_forever()
        if self._cfg.has_key('register') and self._cfg['register']:
            self._register_service()

    def stop(self):
        """Stop the XML-RPC server (if any) and set the stop flag.

        Always returns True.
        """
        self.logs['main'].info("Stopping service...")

        if self._xrc:
            self._xrc.stop()
        self.__stop = True

        return True

    def status(self):
        """Returns true, should be overriden."""
        return True

    def wait(self):
        """Block, polling every 0.1 seconds, until stop() has been called."""
        while not self.__stop:
            time.sleep(.1)