blob: 384827f019ad9998ff9cebd469102c84faf25400 [file] [log] [blame]
import time
from threading import Event
import stomp
from ctakes_pbj.type_system.type_system_loader import *
from ctakes_pbj.pbj_tools.pbj_defaults import STOP_MESSAGE
exit_event = Event()
def start_receiver(pipeline, queue_name, host_name, port_name,
password, username, r_id='1'):
StompReceiver(pipeline, queue_name, host_name, port_name, password, username, r_id)
while not exit_event.is_set():
exit_event.wait()
class StompReceiver(stomp.ConnectionListener):
def __init__(self, pipeline, queue_name, host_name, port_name, password, username, r_id):
self.source_queue = queue_name
self.source_host = host_name
self.source_port = port_name
self.pipeline = pipeline
self.password = password
self.username = username
self.r_id = r_id
self.typesystem = None
print(time.ctime((time.time())), "Starting Stomp Receiver on", self.source_host, self.source_queue, "...")
# Use a heartbeat of 10 minutes (in milliseconds)
self.conn = stomp.Connection12([(self.source_host, self.source_port)],
keepalive=True, heartbeats=(600000, 600000))
self.conn.set_listener('Stomp_Receiver', self)
self.stop = False
self.__connect_and_subscribe()
def __connect_and_subscribe(self):
self.conn.connect(self.username, self.password, wait=True)
self.conn.subscribe(destination=self.source_queue, id=self.r_id, ack='auto')
# self.conn.subscribe(destination=self.source_queue, id=self.id, ack='client')
def set_typesystem(self, typesystem):
self.typesystem = typesystem
def get_typesystem(self):
if self.typesystem is None:
# Load the typesystem
type_system_accessor = TypeSystemLoader()
type_system_accessor.load_type_system()
self.set_typesystem(type_system_accessor.get_type_system())
return self.typesystem
def set_host(self, host_name):
self.source_host = host_name
def set_stop(self, stop):
self.stop = stop
def stop_receiver(self):
self.conn.unsubscribe(destination=self.source_queue, id=self.r_id)
self.conn.disconnect()
print(time.ctime((time.time())), "Disconnected Stomp Receiver on", self.source_host, self.source_queue)
self.pipeline.collection_process_complete()
exit_event.set()
def on_message(self, frame):
if frame.body == STOP_MESSAGE:
print(time.ctime((time.time())), "Received Stop code.")
self.stop = True
# time.sleep(3)
self.stop_receiver()
else:
if XMI_INDICATOR in frame.body:
cas = cassis.load_cas_from_xmi(frame.body, self.get_typesystem())
self.pipeline.process(cas)
else:
print(time.ctime((time.time())), "Malformed Message:\n", frame.body)
def on_disconnected(self):
if self.stop is False:
self.__connect_and_subscribe()
def on_error(self, frame):
print(time.ctime((time.time())), "Receiver Error:", frame.body)