Make Python code work with Pulsar Function localrun (#1930)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 7454132..f86eed4 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -95,10 +95,10 @@
try:
source_topics_serde_classname_dict = json.loads(args.source_topics_serde_classname)
except ValueError:
- log.critical("Cannot decode source_topics_serde_classname. This argument must be specifed as a JSON")
+ Log.critical("Cannot decode source_topics_serde_classname. This argument must be specifed as a JSON")
sys.exit(1)
if not source_topics_serde_classname_dict:
- log.critical("source_topics_serde_classname cannot be empty")
+ Log.critical("source_topics_serde_classname cannot be empty")
for topics, serde_classname in source_topics_serde_classname_dict.items():
sourceSpec.topicsToSerDeClassName[topics] = serde_classname
function_details.source.MergeFrom(sourceSpec)
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 6aa4948..de39006 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -29,13 +29,14 @@
import log
Log = log.Log
-PULSARFUNCTIONAPIROOT = 'functions'
+PULSAR_API_ROOT = 'pulsar'
+PULSAR_FUNCTIONS_API_ROOT = 'functions'
def import_class(from_path, full_class_name):
kclass = import_class_from_path(from_path, full_class_name)
if kclass is None:
our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
- api_dir = os.path.join(our_dir, PULSARFUNCTIONAPIROOT)
+ api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT)
kclass = import_class_from_path(api_dir, full_class_name)
return kclass