blob: 52716bb50698b430d4aef462c5bdca149b0451dd [file] [log] [blame]
"""
Module containing worker thread classes and shims
"""
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from json import loads
from os import getenv, getpgid, killpg, path, setsid
from signal import SIGKILL, SIGTERM
from subprocess import Popen, PIPE, CalledProcessError
from sys import stdout
from threading import Thread
from time import sleep
THREAD_TIMEOUT = 800.0 # seconds to complete before join is forced
class ShimWorkerThread(Thread):
"""Parent class for shim worker threads and return a string once the thread has ended"""
def __init__(self, thread_name):
super(ShimWorkerThread, self).__init__(name=thread_name)
self.arg_list = []
self.return_obj = None
self.proc = None
def get_return_object(self):
"""Get the return object from the completed thread"""
return self.return_obj
def join_or_kill(self, timeout):
"""
Wait for thread to join after timeout (seconds). If still alive, it is then terminated, then if still alive,
killed
"""
self.join(timeout)
if self.is_alive():
if self.proc is not None:
if self._terminate_pg_loop():
if self._kill_pg_loop():
print '\n ERROR: Thread %s (pid=%d) alive after kill' % (self.name, self.proc.pid)
else:
print 'Killed'
stdout.flush()
else:
print 'Terminated'
stdout.flush()
else:
print 'ERROR: shims.join_or_kill(): Process joined and is alive, yet proc is None.'
def _terminate_pg_loop(self, num_attempts=2, wait_time=2):
cnt = 0
while cnt < num_attempts and self.is_alive():
cnt += 1
print '\n Thread %s (pid=%d) alive after timeout, terminating (try #%d)...' % (self.name, self.proc.pid,
cnt),
stdout.flush()
killpg(getpgid(self.proc.pid), SIGTERM)
sleep(wait_time)
return self.is_alive()
def _kill_pg_loop(self, num_attempts=2, wait_time=5):
cnt = 0
while cnt < num_attempts and self.is_alive():
cnt += 1
print '\n Thread %s (pid=%d) alive after terminate, killing (try #%d)...' % (self.name, self.proc.pid,
cnt),
stdout.flush()
killpg(getpgid(self.proc.pid), SIGKILL)
sleep(wait_time)
return self.is_alive()
class Sender(ShimWorkerThread):
"""Sender class for multi-threaded send"""
def __init__(self, use_shell_flag, send_shim_args, broker_addr, queue_name, test_key, json_test_str):
super(Sender, self).__init__('sender_thread_%s' % queue_name)
if send_shim_args is None:
print 'ERROR: Sender: send_shim_args == None'
self.use_shell_flag = use_shell_flag
self.arg_list.extend(send_shim_args)
self.arg_list.extend([broker_addr, queue_name, test_key, json_test_str])
def run(self):
"""Thread starts here"""
try:
#print str('\n>>SNDR>>' + str(self.arg_list)) # DEBUG - useful to see command-line sent to shim
self.proc = Popen(self.arg_list, stdout=PIPE, stderr=PIPE, shell=self.use_shell_flag, preexec_fn=setsid)
(stdoutdata, stderrdata) = self.proc.communicate()
if len(stderrdata) > 0:
#print '<<SNDR ERROR<<', stderrdata # DEBUG - useful to see shim's failure message
self.return_obj = (stdoutdata, stderrdata)
else:
#print '<<SNDR<<', stdoutdata # DEBUG - useful to see text received from shim
str_tvl = stdoutdata.split('\n')[0:-1] # remove trailing \n
if len(str_tvl) == 2:
try:
self.return_obj = (str_tvl[0], loads(str_tvl[1]))
except ValueError:
self.return_obj = stdoutdata
else: # Make a single line of all the bits and return that
self.return_obj = stdoutdata
except OSError as exc:
self.return_obj = str(exc) + ': shim=' + self.arg_list[0]
except CalledProcessError as exc:
self.return_obj = str(exc) + '\n\nOutput:\n' + exc.output
class Receiver(ShimWorkerThread):
"""Receiver class for multi-threaded receive"""
def __init__(self, receive_shim_args, broker_addr, queue_name, test_key, json_test_str):
super(Receiver, self).__init__('receiver_thread_%s' % queue_name)
if receive_shim_args is None:
print 'ERROR: Receiver: receive_shim_args == None'
self.arg_list.extend(receive_shim_args)
self.arg_list.extend([broker_addr, queue_name, test_key, json_test_str])
def run(self):
"""Thread starts here"""
try:
#print str('\n>>RCVR>>' + str(self.arg_list)) # DEBUG - useful to see command-line sent to shim
self.proc = Popen(self.arg_list, stdout=PIPE, stderr=PIPE, preexec_fn=setsid)
(stdoutdata, stderrdata) = self.proc.communicate()
if len(stderrdata) > 0:
#print '<<RCVR ERROR<<', stderrdata # DEBUG - useful to see shim's failure message
self.return_obj = (stdoutdata, stderrdata)
else:
#print '<<RCVR<<', stdoutdata # DEBUG - useful to see text received from shim
str_tvl = stdoutdata.split('\n')[0:-1] # remove trailing \n
if len(str_tvl) == 2:
try:
self.return_obj = (str_tvl[0], loads(str_tvl[1]))
except ValueError:
self.return_obj = stdoutdata
else: # Make a single line of all the bits and return that
self.return_obj = stdoutdata
except OSError as exc:
self.return_obj = str(exc) + ': shim=' + self.arg_list[0]
except CalledProcessError as exc:
self.return_obj = str(exc) + '\n\n' + exc.output
class Shim(object):
"""Abstract shim class, parent of all shims."""
NAME = None
JMS_CLIENT = False # Enables certain JMS-specific message checks
def __init__(self, sender_shim, receiver_shim):
self.sender_shim = sender_shim
self.receiver_shim = receiver_shim
self.send_params = None
self.receive_params = None
self.use_shell_flag = False
def create_sender(self, broker_addr, queue_name, test_key, json_test_str):
"""Create a new sender instance"""
sender = Sender(self.use_shell_flag, self.send_params, broker_addr, queue_name, test_key, json_test_str)
sender.daemon = True
return sender
def create_receiver(self, broker_addr, queue_name, test_key, json_test_str):
"""Create a new receiver instance"""
receiver = Receiver(self.receive_params, broker_addr, queue_name, test_key, json_test_str)
receiver.daemon = True
return receiver
class ProtonPythonShim(Shim):
"""Shim for qpid-proton Python client"""
NAME = 'ProtonPython'
def __init__(self, sender_shim, receiver_shim):
super(ProtonPythonShim, self).__init__(sender_shim, receiver_shim)
self.send_params = [self.sender_shim]
self.receive_params = [self.receiver_shim]
class ProtonCppShim(Shim):
"""Shim for qpid-proton C++ client"""
NAME = 'ProtonCpp'
def __init__(self, sender_shim, receiver_shim):
super(ProtonCppShim, self).__init__(sender_shim, receiver_shim)
self.send_params = [self.sender_shim]
self.receive_params = [self.receiver_shim]
class RheaJsShim(Shim):
"""Shim for Rhea Javascript client"""
NAME = 'RheaJs'
def __init__(self, sender_shim, receiver_shim):
super(RheaJsShim, self).__init__(sender_shim, receiver_shim)
self.send_params = [self.sender_shim]
self.receive_params = [self.receiver_shim]
class QpidJmsShim(Shim):
"""Shim for qpid-jms JMS client"""
NAME = 'QpidJms'
JMS_CLIENT = True
JAVA_HOME = getenv('JAVA_HOME', '/usr/bin') # Default only works in Linux
JAVA_EXEC = path.join(JAVA_HOME, 'java')
def __init__(self, dependency_class_path, sender_shim, receiver_shim):
super(QpidJmsShim, self).__init__(sender_shim, receiver_shim)
self.dependency_class_path = dependency_class_path
self.send_params = [self.JAVA_EXEC, '-cp', self.get_java_class_path(), self.sender_shim]
self.receive_params = [self.JAVA_EXEC, '-cp', self.get_java_class_path(), self.receiver_shim]
def get_java_class_path(self):
"""Method to construct and return the Java class path necessary to run the shim"""
return self.dependency_class_path;
class AmqpNetLiteShim(Shim):
"""Shim for AMQP.Net Lite client"""
NAME = 'AmqpNetLite'
def __init__(self, sender_shim, receiver_shim):
super(AmqpNetLiteShim, self).__init__(sender_shim, receiver_shim)
self.send_params = ['mono' ,self.sender_shim]
self.receive_params = ['mono', self.receiver_shim]