blob: f86eed4396f90570969089dd4a106aa1823db91f [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -*- encoding: utf-8 -*-
"""python_instance_main.py: The main for the Python Instance
"""
import argparse
import logging
import os
import sys
import signal
import time
import json
import pulsar
import Function_pb2
import log
import server
import python_instance
import util
# Shutdown flag: main() keeps sleeping while this is True; the signal handler
# atexit_function() sets it to False to request termination.
to_run = True
# Module-wide alias for the Log object exposed by the local `log` module;
# used below for info/critical messages.
Log = log.Log
def atexit_function(signo, _frame):
    """Signal handler that requests shutdown of the main wait loop.

    Logs the number of the received signal and clears the module-level
    ``to_run`` flag that ``main`` polls.

    Args:
        signo: Number of the signal that was delivered.
        _frame: Current stack frame passed by the signal machinery (unused).
    """
    global to_run
    message = "Interrupted by %d, shutting down" % signo
    Log.info(message)
    to_run = False
def _parse_args():
    """Define and parse the command-line arguments of the Python instance."""
    parser = argparse.ArgumentParser(description='Pulsar Functions Python Instance')
    parser.add_argument('--function_classname', required=True, help='Function Class Name')
    parser.add_argument('--py', required=True, help='Full Path of Function Code File')
    parser.add_argument('--name', required=True, help='Function Name')
    parser.add_argument('--tenant', required=True, help='Tenant Name')
    parser.add_argument('--namespace', required=True, help='Namespace name')
    parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe')
    parser.add_argument('--output_topic', required=False, help='Output Topic')
    parser.add_argument('--output_serde_classname', required=False, help='Output Serde Classnames')
    parser.add_argument('--instance_id', required=True, help='Instance Id')
    parser.add_argument('--function_id', required=True, help='Function Id')
    parser.add_argument('--function_version', required=True, help='Function Version')
    parser.add_argument('--processing_guarantees', required=True, help='Processing Guarantees')
    parser.add_argument('--source_subscription_type', required=True, help='Subscription Type')
    parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url')
    parser.add_argument('--port', required=True, help='Instance Port', type=int)
    parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
    parser.add_argument('--user_config', required=False, help='User Config')
    parser.add_argument('--logging_directory', required=True, help='Logging Directory')
    parser.add_argument('--logging_file', required=True, help='Log file name')
    parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
    parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages')
    return parser.parse_args()


def _build_function_details(args):
    """Build the FunctionDetails message from the parsed arguments.

    Logs a critical message and exits the process with status 1 when
    ``--source_topics_serde_classname`` is not valid JSON, is not a JSON
    object, or is empty.
    """
    function_details = Function_pb2.FunctionDetails()
    function_details.tenant = args.tenant
    function_details.namespace = args.namespace
    function_details.name = args.name
    function_details.className = args.function_classname

    source_spec = Function_pb2.SourceSpec()
    source_spec.subscriptionType = Function_pb2.SubscriptionType.Value(args.source_subscription_type)
    try:
        topics_to_serde = json.loads(args.source_topics_serde_classname)
    except ValueError:
        Log.critical("Cannot decode source_topics_serde_classname. This argument must be specifed as a JSON")
        sys.exit(1)
    if not topics_to_serde:
        Log.critical("source_topics_serde_classname cannot be empty")
        # Without any input topic there is nothing to consume; fail fast
        # like the decode-error path above instead of starting anyway.
        sys.exit(1)
    if not isinstance(topics_to_serde, dict):
        # Valid JSON that is not an object (list, string, number) has no
        # topic -> SerDe mapping to iterate over.
        Log.critical("source_topics_serde_classname must be a JSON object mapping topics to SerDe class names")
        sys.exit(1)
    for topic, serde_classname in topics_to_serde.items():
        source_spec.topicsToSerDeClassName[topic] = serde_classname
    function_details.source.MergeFrom(source_spec)

    # Output topic and SerDe are optional; only set them when non-empty.
    sink_spec = Function_pb2.SinkSpec()
    if args.output_topic:
        sink_spec.topic = args.output_topic
    if args.output_serde_classname:
        sink_spec.serDeClassName = args.output_serde_classname
    function_details.sink.MergeFrom(sink_spec)

    function_details.processingGuarantees = Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees)
    # Only the exact string "true" enables auto-acking.
    function_details.autoAck = args.auto_ack == "true"
    if args.user_config:
        function_details.userConfig = args.user_config
    return function_details


def main():
    """Entry point of the Python instance process.

    Installs signal handlers, parses the command line, initialises the
    rotating log file, builds the function details, creates and runs the
    PythonInstance, starts the server on ``--port`` and then idles until a
    signal clears ``to_run``. Finally joins the instance and exits.
    """
    global to_run

    # Setup signal handlers
    signal.signal(signal.SIGTERM, atexit_function)
    signal.signal(signal.SIGHUP, atexit_function)
    signal.signal(signal.SIGINT, atexit_function)

    args = _parse_args()

    # Log file: <logging_directory>/<fully qualified name>/<logging_file>-<instance_id>.log
    log_file = os.path.join(args.logging_directory,
                            util.getFullyQualifiedFunctionName(args.tenant, args.namespace, args.name),
                            "%s-%s.log" % (args.logging_file, args.instance_id))
    log.init_rotating_logger(level=logging.INFO, logfile=log_file,
                             max_files=5, max_bytes=10 * 1024 * 1024)

    Log.info("Starting Python instance with %s" % str(args))

    function_details = _build_function_details(args)

    pulsar_client = pulsar.Client(args.pulsar_serviceurl)
    pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
                                                str(args.function_version), function_details,
                                                int(args.max_buffered_tuples), str(args.py),
                                                args.log_topic, pulsar_client)
    pyinstance.run()
    # NOTE(review): the return value is not used, but it is kept bound for the
    # lifetime of the wait loop -- presumably a server object that must stay
    # referenced to keep serving; confirm against server.serve.
    server_instance = server.serve(args.port, pyinstance)

    # Idle until a signal handler flips the shutdown flag.
    while to_run:
        time.sleep(1)

    pyinstance.join()
    # NOTE(review): exits with status 1 even after an orderly, signal-driven
    # shutdown; kept as-is because the supervising process may rely on it.
    sys.exit(1)
# Start the instance only when this file is executed as a script.
if __name__ == "__main__":
    main()