blob: f5868093faea45457a3186914dc5cb52d06f5cab [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -*- encoding: utf-8 -*-
"""util.py: Some misc utility functions
"""
import os
import inspect
import sys
import importlib
import configparser
from threading import Timer
from pulsar.functions import serde
import log
# Logging object from the local ``log`` module; the import helpers below call
# its ``debug`` and ``info`` methods.
Log = log.Log
# Path components that import_class() joins onto this file's directory to
# build the fallback location it searches when the caller's path fails.
PULSAR_API_ROOT = 'pulsar'
PULSAR_FUNCTIONS_API_ROOT = 'functions'
def import_class(from_path, full_class_name):
    """Resolve ``full_class_name``, trying ``from_path`` first and then a fallback.

    Both arguments are coerced to ``str``. The lookup is first attempted via
    ``import_class_from_path`` against ``from_path``. If that raises, a second
    attempt is made against the ``pulsar/functions`` directory located next to
    this file.

    :param from_path: directory to search first
    :param full_class_name: dotted name of the object to import
    :return: whatever ``import_class_from_path`` returns, or ``None`` when
             both attempts raise (the failure is logged at info level)
    """
    user_path = str(from_path)
    qualified_name = str(full_class_name)
    try:
        return import_class_from_path(user_path, qualified_name)
    except Exception:
        # The fallback deliberately runs inside this handler: any exception it
        # raises carries the first failure as its implicit context.
        this_file = inspect.getfile(inspect.currentframe())
        module_dir = os.path.dirname(os.path.abspath(this_file))
        fallback_dir = os.path.join(module_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT)
        try:
            return import_class_from_path(fallback_dir, qualified_name)
        except Exception as fallback_error:
            Log.info("Failed to import class %s from path %s" % (qualified_name, user_path))
            Log.info(fallback_error, exc_info=True)
            return None
def import_class_from_path(from_path, full_class_name):
    """Import the object named by ``full_class_name``, searching ``from_path``.

    ``from_path`` is pushed to the front of ``sys.path`` if it is not already
    listed there. The name is then split at its last dot into a module part
    and an attribute part.

    :param from_path: directory to make importable
    :param full_class_name: dotted name such as ``package.module.ClassName``;
                            a name without any dot is imported as a module
    :return: the attribute looked up on the imported module, or the module
             itself when ``full_class_name`` contains no module part
    :raises Exception: import and attribute errors are not caught here
    """
    Log.debug('Trying to import %s from path %s' % (full_class_name, from_path))
    module_name, _, attr_name = full_class_name.rpartition('.')

    if from_path not in sys.path:
        Log.debug("Add a new dependency to the path: %s" % from_path)
        sys.path.insert(0, from_path)

    # No module part: the whole name is treated as a module to import.
    if not module_name:
        return importlib.import_module(attr_name)

    # Serde classes are referenced in unqualified form (`serde.X`) rather than
    # by the full name `pulsar.functions.serde`, so map that prefix onto the
    # already-imported module instead of importing by name.
    if module_name == 'serde':
        owner = serde
    else:
        owner = importlib.import_module(module_name)
    return getattr(owner, attr_name)
def getFullyQualifiedFunctionName(tenant, namespace, name):
    """Return ``tenant/namespace/name`` with each part converted via ``str``."""
    parts = (tenant, namespace, name)
    return "/".join(map(str, parts))
def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id):
    """Return ``tenant/namespace/name:instance_id`` with each part converted via ``str``."""
    function_part = "/".join(map(str, (tenant, namespace, name)))
    return function_part + ":" + str(instance_id)
def get_properties(fullyQualifiedName, instanceId):
    """Build the properties dict for a function instance.

    :param fullyQualifiedName: stored under ``"id"`` after ``str`` conversion
    :param instanceId: stored under ``"instance_id"`` after ``str`` conversion
    :return: dict with the keys ``application``, ``id`` and ``instance_id``
    """
    properties = {"application": "pulsar-function"}
    properties["id"] = str(fullyQualifiedName)
    properties["instance_id"] = str(instanceId)
    return properties
def read_config(config_file):
    """
    Parse an INI-style configuration file.

    The content of the configuration file is styled as follows:
    [DEFAULT]
    parameter1 = value1
    parameter2 = value2
    parameter3 = value3
    ...

    :param config_file: path of the file to read; ``""`` or ``None`` means
                        that no configuration file was supplied
    :return: a ``configparser.ConfigParser`` loaded from ``config_file``, or
             ``None`` when no file was supplied. ``ConfigParser.read``
             silently skips files it cannot open, so a path that does not
             exist yields a parser with no values rather than an error.
    """
    # None is treated like the empty string: passing it on to
    # ConfigParser.read() would fail with a TypeError.
    if config_file is None or config_file == "":
        return None
    cfg = configparser.ConfigParser()
    cfg.read(config_file)
    return cfg
class FixedTimer():
    """Call a function repeatedly on daemon ``threading.Timer`` threads.

    Each ``Timer`` fires once, so after every callback a fresh one is created
    and started. The ``t``-second delay is counted from the moment the
    previous callback returned, not from a fixed wall-clock schedule.

    If ``hFunction`` raises, the exception propagates out of the timer thread
    and the timer is not re-armed.

    :param t: delay in seconds before each invocation
    :param hFunction: zero-argument callable to invoke
    :param name: thread name given to every timer thread
    """

    def __init__(self, t, hFunction, name="timer-thread"):
        self.t = t
        self.hFunction = hFunction
        self._name = name
        # Set by cancel(); checked by handle_function() so a cancel issued
        # while the callback is running is not undone by the re-arm.
        self._cancelled = False
        self.thread = self._new_timer()

    def _new_timer(self):
        """Create (but do not start) a named daemon Timer for the next tick."""
        timer = Timer(self.t, self.handle_function)
        timer.name = self._name
        timer.daemon = True
        return timer

    def handle_function(self):
        """Run the callback once and schedule the next run unless cancelled."""
        self.hFunction()
        if self._cancelled:
            return
        timer = self._new_timer()
        self.thread = timer
        timer.start()
        # cancel() may have run between the check above and the assignment of
        # self.thread, in which case it cancelled the previous timer only.
        # Re-check so that the timer just started is stopped as well.
        if self._cancelled:
            timer.cancel()

    def start(self):
        """Start the currently scheduled timer thread."""
        self.thread.start()

    def cancel(self):
        """Stop the pending timer and prevent any further re-arming."""
        self._cancelled = True
        self.thread.cancel()