Added Collection Reader instead of generic annotator
Added parameter declaration by reader and annotators
diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/doctimerel_delegator.py b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/doctimerel_delegator.py
new file mode 100644
index 0000000..4581562
--- /dev/null
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/doctimerel_delegator.py
@@ -0,0 +1,46 @@
+from ctakes_pbj.component import cas_annotator
+from ctakes_pbj.pbj_tools.helper_functions import *
+from ctakes_pbj.pbj_tools.event_creator import EventCreator
+from ctakes_pbj.type_system import ctakes_types
+from cnlpt.api.cnlp_rest import EntityDocument
+import cnlpt.api.dtr_rest as dtr_rest
+import asyncio
+import time
+
+sem = asyncio.Semaphore(1)
+
+
+class DocTimeRelDelegator(cas_annotator.CasAnnotator):
+
+ def __init__(self, cas):
+ self.event_creator = EventCreator(cas)
+ self.event_mention_type = cas.typesystem.get_type(ctakes_types.EventMention)
+
+ # Initializes cNLPT, which loads its DocTimeRel model.
+ def initialize(self):
+ print(time.ctime((time.time())), "Initializing cnlp-transformers doctimerel ...")
+ asyncio.run(self.init_caller())
+ print(time.ctime((time.time())), "Done.")
+
+ # Processes the document to get DocTimeRel on Events from cNLPT.
+ def process(self, cas):
+ print(time.ctime((time.time())), "Processing cnlp-transformers doctimerel ...")
+ event_mentions = cas.select(ctakes_types.EventMention)
+ offsets = get_offsets(event_mentions)
+ asyncio.run(self.dtr_caller(cas, event_mentions, offsets))
+ print(time.ctime((time.time())), "cnlp-transformers doctimerel Done.")
+
+ async def init_caller(self):
+ await dtr_rest.startup_event()
+
+ async def dtr_caller(self, cas, event_mentions, offsets):
+ text = cas.sofa_string
+ e_doc = EntityDocument(doc_text=text, annotations=offsets)
+
+ #async with sem:
+ dtr_output = await dtr_rest.process(e_doc)
+ i = 0
+ for e in event_mentions:
+ event = self.event_creator.create_event(cas, dtr_output.statuses[i])
+ e.event = event
+ i += 1
diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/temporal_delegator.py b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/temporal_delegator.py
new file mode 100644
index 0000000..4183225
--- /dev/null
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/temporal_delegator.py
@@ -0,0 +1,113 @@
+from ctakes_pbj.component import cas_annotator
+from ctakes_pbj.pbj_tools import create_type
+from ctakes_pbj.pbj_tools.token_tools import *
+from ctakes_pbj.pbj_tools.create_relation import create_relation
+from ctakes_pbj.pbj_tools.helper_functions import *
+from ctakes_pbj.pbj_tools.event_creator import create_event
+from ctakes_pbj.type_system import ctakes_types
+import cnlpt.api.temporal_rest as temporal_rest
+import asyncio
+import time
+
+sem = asyncio.Semaphore(1)
+
+
+class TemporalDelegator(cas_annotator.CasAnnotator):
+
+ def __init__(self, cas):
+ self.event_mention_type = cas.typesystem.get_type(ctakes_types.EventMention)
+ self.timex_type = cas.typesystem.get_type(ctakes_types.TimeMention)
+ self.tlink_type = cas.typesystem.get_type(ctakes_types.TemporalTextRelation)
+ self.argument_type = cas.typesystem.get_type(ctakes_types.RelationArgument)
+
+ # Initializes cNLPT, which loads its Temporal model.
+ def initialize(self):
+ print(time.ctime((time.time())), "Initializing cnlp-transformers temporal ...")
+ asyncio.run(self.init_caller())
+ print(time.ctime((time.time())), "Done.")
+
+ # Process Sentences, adding Times, Events and TLinks found by cNLPT.
+ def process(self, cas):
+ print(time.ctime((time.time())), "Processing cnlp-transformers temporal ...")
+ sentences = cas.select(ctakes_types.Sentence)
+ event_mentions = cas.select(ctakes_types.EventMention)
+ sentence_events = get_covered_list(sentences, event_mentions)
+
+ # e_m_begins = []
+ # for e in e_mentions:
+ # e_m_begins.append(e.begin)
+
+ tokens = cas.select(ctakes_types.BaseToken)
+ sentence_tokens = get_covered_list(sentences, tokens)
+ # token_begins = []
+ # for t in tokens:
+ # token_begins.append(t.begin)
+
+ i = 0
+ while i < len(sentences):
+ if len(sentence_events[i]) > 0:
+ print(time.ctime((time.time())), "Processing cnlp-transformers temporal on sentence",
+ str(i), "of", str(len(sentences)), "...")
+ event_offsets = get_windowed_offsets(sentence_events[i], sentences[i].begin)
+ token_offsets = get_windowed_offsets(sentence_tokens[i], sentences[i].begin)
+ asyncio.run(self.temporal_caller(cas, sentences[i], sentence_events[i], event_offsets, token_offsets))
+ i += 1
+ print(time.ctime((time.time())), "cnlp-transformers temporal Done.")
+
+ async def init_caller(self):
+ await temporal_rest.startup_event()
+
+ async def temporal_caller(self, cas, sentence, event_mentions, event_offsets, token_offsets):
+
+ sentence_doc = temporal_rest.SentenceDocument(sentence.get_covered_text())
+ temporal_result = await temporal_rest.process_sentence(sentence_doc)
+
+ events_times = {}
+ i = 0
+ for t in temporal_result.timexes:
+ for tt in t:
+ first_token_offset = token_offsets[tt.begin]
+ last_token_offset = token_offsets[tt.end]
+ timex = create_type.add_type(cas, self.timex_type,
+ sentence.begin + first_token_offset[0],
+ sentence.begin + last_token_offset[1])
+ events_times['TIMEX-' + str(i)] = timex
+ i += 1
+
+ i = 0
+ for e in temporal_result.events:
+ for ee in e:
+ first_token_offset = token_offsets[ee.begin]
+ last_token_offset = token_offsets[ee.end]
+ event_mention = get_or_create_event_mention(cas, event_mentions,
+ sentence.begin + first_token_offset[0],
+ sentence.begin + last_token_offset[1])
+ event = create_event(cas, ee.dtr)
+ event_mention.event = event
+ events_times['EVENT-' + str(i)] = event_mention
+ i += 1
+
+ for r in temporal_result.relations:
+ for rr in r:
+ arg1 = self.argument_type()
+ arg1.argument = events_times[rr.arg1]
+ print("Arg1 =", events_times[rr.arg1])
+ arg2 = self.argument_type()
+ arg2.argument = events_times[rr.arg2]
+ print("Arg2 =", events_times[rr.arg2])
+ tlink = create_relation(self.tlink_type, rr.category, arg1, arg2)
+ cas.add(tlink)
+
+ def get_index_by_offsets(tokens, begin, end):
+ i = 0
+ for token in tokens:
+ if token.begin == begin and token.end == end:
+ return i
+ i += 1
+ return -1
+
+ def get_or_create_event_mention(cas, event_mentions, begin, end):
+ i = get_index_by_offsets(event_mentions, begin, end)
+ if i == -1:
+ return create_type.add_type(cas, self.event_mention_type, begin, end)
+ return event_mentions[i]
diff --git a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java
index 3805e13..42d7266 100644
--- a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java
+++ b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java
@@ -39,7 +39,7 @@
*/
@PipeBitInfo(
name = "PbjJmsSender",
- description = "Sends jcas to Artemis Queue using Stomp",
+ description = "Sends jcas to Artemis Queue using JMS",
role = PipeBitInfo.Role.SPECIAL
)
public class PbjJmsSender extends PbjSender {
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
index 5541e59..505a399 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
@@ -2,6 +2,15 @@
class CasAnnotator(ABC):
+
+ # Called once at the build of a pipeline.
+ def declare_params(self, arg_parser):
+ pass
+
+ # Called once at the beginning of a pipeline, before initialize.
+ def init_params(self, arg_parser):
+ pass
+
# Called once at the beginning of a pipeline.
def initialize(self):
pass
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
new file mode 100644
index 0000000..3ae3de7
--- /dev/null
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
@@ -0,0 +1,26 @@
+from abc import ABC, abstractmethod
+
+
+class CollectionReader(ABC):
+
+ # Set the pipeline. The collection reader controls the pipeline flow.
+ @abstractmethod
+ def set_pipeline(self, pipeline):
+ pass
+
+ # Called once at the build of a pipeline.
+ def declare_params(self, arg_parser):
+ pass
+
+ # Called once at the beginning of a pipeline, before initialize.
+ def init_params(self, arg_parser):
+ pass
+
+ # Called once at the beginning of a pipeline.
+ def initialize(self):
+ pass
+
+    # Called to start reading CAS objects and pass them to the pipeline.
+ @abstractmethod
+ def start(self):
+ pass
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
index 6e3936f..79df482 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
@@ -1,86 +1,43 @@
-import time
-from threading import Event
-import stomp
-from ctakes_pbj.type_system.type_system_loader import *
-from ctakes_pbj.pipeline.pbj_pipeline import STOP_MESSAGE
-from ctakes_pbj.pbj_tools import arg_parser
-
-args = arg_parser.get_args()
-
-exit_event = Event()
+from ctakes_pbj.component.collection_reader import CollectionReader
+from ctakes_pbj.pbj_tools import pbj_defaults
+from ctakes_pbj.pbj_tools.stomp_receiver import start_receiver
-def start_receiver(pipeline, queue_name=args.receive_queue, host_name=args.host_name, port_name=args.port_name,
- password=args.password, username=args.username):
- PBJReceiver(pipeline, queue_name, host_name, port_name, password, username)
- while not exit_event.is_set():
- exit_event.wait()
+class PBJReceiver(CollectionReader):
+ def __init__(self):
+ self.port_name = None
+ self.host_name = None
+ self.queue_name = None
+ self.password = None
+ self.username = None
+ self.pipeline = None
+ self.receiving = False
+ self.stomp_receiver = None
-class PBJReceiver(stomp.ConnectionListener):
-
- def __init__(self, pipeline, queue_name, host_name, port_name, password, username):
- self.source_queue = queue_name
- self.source_host = host_name
- self.source_port = port_name
+ # Set the pipeline. The collection reader controls the pipeline flow.
+ def set_pipeline(self, pipeline):
self.pipeline = pipeline
- self.password = password
- self.username = username
- self.id = '1'
- self.typesystem = None
- print(time.ctime((time.time())), "Starting PBJ Receiver on", self.source_host, self.source_queue, "...")
- # Use a heartbeat of 10 minutes (in milliseconds)
- self.conn = stomp.Connection12([(self.source_host, self.source_port)],
- keepalive=True, heartbeats=(600000, 600000))
- self.conn.set_listener('PBJ_Receiver', self)
- self.stop = False
- self.__connect_and_subscribe()
- def __connect_and_subscribe(self):
- self.conn.connect(self.username, self.password, wait=True)
- self.conn.subscribe(destination=self.source_queue, id=self.id, ack='auto')
- # self.conn.subscribe(destination=self.source_queue, id=self.id, ack='client')
+ # Called once at the build of a pipeline.
+ def declare_params(self, arg_parser):
+ arg_parser.add_arg('receive_queue')
+ arg_parser.add_arg('-rh', '--receive_host', default=pbj_defaults.DEFAULT_HOST)
+ arg_parser.add_arg('-rpt', '--receive_port', default=pbj_defaults.DEFAULT_PORT)
+ arg_parser.add_arg('-ru', '--receive_user', default=pbj_defaults.DEFAULT_USER)
+ arg_parser.add_arg('-rp', '--receive_pass', default=pbj_defaults.DEFAULT_PASS)
- def set_typesystem(self, typesystem):
- self.typesystem = typesystem
+ # Called once at the beginning of a pipeline, before initialize.
+ def init_params(self, args):
+ self.queue_name = args.receive_queue
+ self.host_name = args.receive_host
+ self.port_name = args.receive_port
+ self.username = args.receive_user
+ self.password = args.receive_pass
- def get_typesystem(self):
- if self.typesystem is None:
- # Load the typesystem
- type_system_accessor = TypeSystemLoader()
- type_system_accessor.load_type_system()
- self.set_typesystem(type_system_accessor.get_type_system())
- return self.typesystem
-
- def set_host(self, host_name):
- self.source_host = host_name
-
- def set_stop(self, stop):
- self.stop = stop
-
- def stop_receiver(self):
- self.conn.unsubscribe(destination=self.source_queue, id=self.id)
- self.conn.disconnect()
- print(time.ctime((time.time())), "Disconnected PBJ Receiver on", self.source_host, self.source_queue)
- self.pipeline.collection_process_complete()
- exit_event.set()
-
- def on_message(self, frame):
- if frame.body == STOP_MESSAGE:
- print(time.ctime((time.time())), "Received Stop code.")
- self.stop = True
- # time.sleep(3)
- self.stop_receiver()
- else:
- if XMI_INDICATOR in frame.body:
- cas = cassis.load_cas_from_xmi(frame.body, self.get_typesystem())
- self.pipeline.process(cas)
- else:
- print(time.ctime((time.time())), "Malformed Message:\n", frame.body)
-
- def on_disconnected(self):
- if self.stop is False:
- self.__connect_and_subscribe()
-
- def on_error(self, frame):
- print(time.ctime((time.time())), "Receiver Error:", frame.body)
+    # Called to start reading CAS objects and pass them to the pipeline.
+ def start(self):
+ if not self.receiving:
+ self.receiving = True
+ start_receiver(self.pipeline, self.queue_name, self.host_name, self.port_name,
+ self.password, self.username)
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
index 1cdf0e9..ef3c4c0 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
@@ -1,34 +1,51 @@
import stomp
import time
from ctakes_pbj.component import cas_annotator
-from ctakes_pbj.pipeline.pbj_pipeline import STOP_MESSAGE
-from ctakes_pbj.pbj_tools import arg_parser
-
-args = arg_parser.get_args()
+from ctakes_pbj.pbj_tools.pbj_defaults import *
class PBJSender(cas_annotator.CasAnnotator):
- def __init__(self, queue_name=args.send_queue, host_name=args.host_name, port_name=args.port_name,
- password=args.password, username=args.username):
+ def __init__(self):
+ self.target_queue = None
+ self.target_host = None
+ self.target_port = None
+ self.password = None
+ self.username = None
+ self.conn = None
- self.target_queue = queue_name
- self.target_host = host_name
- self.target_port = port_name
- self.password = password
- self.username = username
+ # Called once at the build of a pipeline.
+ def declare_params(self, arg_parser):
+ arg_parser.add_arg('send_queue')
+ arg_parser.add_arg('-sh', '--send_host', default=DEFAULT_HOST)
+ arg_parser.add_arg('-spt', '--send_port', default=DEFAULT_PORT)
+ arg_parser.add_arg('-su', '--send_user', default=DEFAULT_USER)
+ arg_parser.add_arg('-sp', '--send_pass', default=DEFAULT_PASS)
+
+ # Called once at the beginning of a pipeline, before initialize.
+ def init_params(self, args):
+ self.target_queue = args.send_queue
+ self.target_host = args.send_host
+ self.target_port = args.send_port
+ self.username = args.send_user
+ self.password = args.send_pass
+
+ # Called once at the beginning of a pipeline.
+ def initialize(self):
print(time.ctime((time.time())), "Starting PBJ Sender on", self.target_host, self.target_queue, "...")
# Use a heartbeat of 10 minutes (in milliseconds)
self.conn = stomp.Connection12([(self.target_host, self.target_port)],
keepalive=True, heartbeats=(600000, 600000))
self.conn.connect(self.username, self.password, wait=True)
+ # Called for every cas passed through the pipeline.
def process(self, cas):
print(time.ctime((time.time())), "Sending processed information to",
self.target_host, self.target_queue, "...")
xmi = cas.to_xmi()
self.conn.send(self.target_queue, xmi)
+ # Called once at the end of the pipeline.
def collection_process_complete(self):
self.send_stop()
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py
index 134c268..e7d6452 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py
@@ -4,8 +4,7 @@
# These are the lines that ignore the typesystem errors
import warnings
-
-from ctakes_pbj.component.pbj_receiver import start_receiver
+from ctakes_pbj.component.pbj_receiver import PBJReceiver
from ctakes_pbj.examples.sentence_printer import SentencePrinter
from ctakes_pbj.component.pbj_sender import PBJSender
from ctakes_pbj.pipeline.pbj_pipeline import PBJPipeline
@@ -17,10 +16,11 @@
def main():
pipeline = PBJPipeline()
+ pipeline.reader(PBJReceiver())
pipeline.add(SentencePrinter())
pipeline.add(PBJSender())
pipeline.initialize()
- start_receiver(pipeline)
+ pipeline.run()
if __name__ == "__main__":
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py
index 666d65b..6864101 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py
@@ -4,9 +4,9 @@
# These are the lines that ignore the typesystem errors
import warnings
-from ctakes_pbj.component.pbj_receiver import start_receiver
from ctakes_pbj.examples.word_finder import WordFinder
from ctakes_pbj.component.pbj_sender import PBJSender
+from ctakes_pbj.component.pbj_receiver import PBJReceiver
from ctakes_pbj.pipeline.pbj_pipeline import PBJPipeline
warnings.filterwarnings("ignore")
@@ -15,10 +15,11 @@
def main():
pipeline = PBJPipeline()
+ pipeline.reader(PBJReceiver())
pipeline.add(WordFinder())
pipeline.add(PBJSender())
pipeline.initialize()
- start_receiver(pipeline)
+ pipeline.run()
main()
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
index ea84ef7..f10a37f 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
@@ -1,28 +1,24 @@
import argparse
-DEFAULT_HOST = 'localhost'
-DEFAULT_PORT = 61616
-DEFAULT_USER = 'guest'
-DEFAULT_PASS = 'guest'
-DEFAULT_OUT_DIR = 'pbj_output/'
-def get_args():
- parser = argparse.ArgumentParser(
- prog='pbj_sender.py',
- description='Sends...',
- epilog='Text at the bottom of help'
- )
- parser.add_argument('receive_queue')
- parser.add_argument('send_queue')
- parser.add_argument('-hn', '--host_name', default=DEFAULT_HOST)
- parser.add_argument('-pn', '--port_name', default=DEFAULT_PORT)
- parser.add_argument('-u', '--username', default=DEFAULT_USER)
- parser.add_argument('-p', '--password', default=DEFAULT_PASS)
- parser.add_argument('-o', '--output_dir', default=DEFAULT_OUT_DIR)
+class ArgParser:
- parser.parse_args()
- args = parser.parse_args()
+ def __init__(self):
+ self.arg_parser = None
- return args
+ def get_arg_parser(self):
+ if self.arg_parser is None:
+ print('Creating arg_parser')
+ self.arg_parser = argparse.ArgumentParser(
+ prog='ctakes-pbj',
+ description='Does wonderful stuff...',
+ epilog='Text at the bottom of help'
+ )
+ return self.arg_parser
+ def add_arg(self, *args, **kwargs):
+ self.get_arg_parser().add_argument(*args, **kwargs)
+ def get_args(self):
+ print('Parsing Arguments')
+ return self.get_arg_parser().parse_args()
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
new file mode 100644
index 0000000..6e8acb8
--- /dev/null
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
@@ -0,0 +1,6 @@
+DEFAULT_HOST = 'localhost'
+DEFAULT_PORT = 61616
+DEFAULT_USER = 'guest'
+DEFAULT_PASS = 'guest'
+DEFAULT_OUT_DIR = 'pbj_output/'
+STOP_MESSAGE = "Apache cTAKES PBJ Stop Message."
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
new file mode 100644
index 0000000..384827f
--- /dev/null
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
@@ -0,0 +1,84 @@
+import time
+from threading import Event
+import stomp
+from ctakes_pbj.type_system.type_system_loader import *
+from ctakes_pbj.pbj_tools.pbj_defaults import STOP_MESSAGE
+
+
+exit_event = Event()
+
+
+def start_receiver(pipeline, queue_name, host_name, port_name,
+ password, username, r_id='1'):
+ StompReceiver(pipeline, queue_name, host_name, port_name, password, username, r_id)
+ while not exit_event.is_set():
+ exit_event.wait()
+
+
+class StompReceiver(stomp.ConnectionListener):
+
+ def __init__(self, pipeline, queue_name, host_name, port_name, password, username, r_id):
+ self.source_queue = queue_name
+ self.source_host = host_name
+ self.source_port = port_name
+ self.pipeline = pipeline
+ self.password = password
+ self.username = username
+ self.r_id = r_id
+ self.typesystem = None
+ print(time.ctime((time.time())), "Starting Stomp Receiver on", self.source_host, self.source_queue, "...")
+ # Use a heartbeat of 10 minutes (in milliseconds)
+ self.conn = stomp.Connection12([(self.source_host, self.source_port)],
+ keepalive=True, heartbeats=(600000, 600000))
+ self.conn.set_listener('Stomp_Receiver', self)
+ self.stop = False
+ self.__connect_and_subscribe()
+
+ def __connect_and_subscribe(self):
+ self.conn.connect(self.username, self.password, wait=True)
+ self.conn.subscribe(destination=self.source_queue, id=self.r_id, ack='auto')
+ # self.conn.subscribe(destination=self.source_queue, id=self.id, ack='client')
+
+ def set_typesystem(self, typesystem):
+ self.typesystem = typesystem
+
+ def get_typesystem(self):
+ if self.typesystem is None:
+ # Load the typesystem
+ type_system_accessor = TypeSystemLoader()
+ type_system_accessor.load_type_system()
+ self.set_typesystem(type_system_accessor.get_type_system())
+ return self.typesystem
+
+ def set_host(self, host_name):
+ self.source_host = host_name
+
+ def set_stop(self, stop):
+ self.stop = stop
+
+ def stop_receiver(self):
+ self.conn.unsubscribe(destination=self.source_queue, id=self.r_id)
+ self.conn.disconnect()
+ print(time.ctime((time.time())), "Disconnected Stomp Receiver on", self.source_host, self.source_queue)
+ self.pipeline.collection_process_complete()
+ exit_event.set()
+
+ def on_message(self, frame):
+ if frame.body == STOP_MESSAGE:
+ print(time.ctime((time.time())), "Received Stop code.")
+ self.stop = True
+ # time.sleep(3)
+ self.stop_receiver()
+ else:
+ if XMI_INDICATOR in frame.body:
+ cas = cassis.load_cas_from_xmi(frame.body, self.get_typesystem())
+ self.pipeline.process(cas)
+ else:
+ print(time.ctime((time.time())), "Malformed Message:\n", frame.body)
+
+ def on_disconnected(self):
+ if self.stop is False:
+ self.__connect_and_subscribe()
+
+ def on_error(self, frame):
+ print(time.ctime((time.time())), "Receiver Error:", frame.body)
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
index c61034f..0783453 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
@@ -1,4 +1,5 @@
-STOP_MESSAGE = "Apache cTAKES PBJ Stop Message."
+from ctakes_pbj.pbj_tools import pbj_defaults
+from ctakes_pbj.pbj_tools.arg_parser import ArgParser
class PBJPipeline:
@@ -6,23 +7,53 @@
def __init__(self):
self.annotators = []
self.initialized = False
+ self.c_reader = None
+ self.arg_parser = ArgParser()
- def add(self, cas_processor):
- self.annotators.append(cas_processor)
+ # Set the Collection Reader for the Corpus.
+    # This is absolutely necessary: if you don't tell the pipeline how to get the notes, it cannot run.
+ def reader(self, collection_reader):
+ collection_reader.declare_params(self.arg_parser)
+ collection_reader.set_pipeline(self)
+ self.c_reader = collection_reader
+ # Add an annotator to the pipeline.
+ def add(self, cas_annotator):
+ cas_annotator.declare_params(self.arg_parser)
+ self.annotators.append(cas_annotator)
+
+ # Fill command line parameters, then call each annotator to initialize.
def initialize(self):
- for processor in self.annotators:
- self.initialized = True
- processor.initialize()
+ if self.c_reader is None:
+ print('No Reader Specified, quitting')
+ exit(1)
+ self.arg_parser.add_arg('-o', '--output_dir', default=pbj_defaults.DEFAULT_OUT_DIR)
+ # Get/Init all of the declared parameter arguments.
+ # Do the actual argument parsing.
+ # If get_args has already been called then added parameters will crash the tool.
+ args = self.arg_parser.get_args()
+ # Set the necessary parameters in the collection reader.
+ self.c_reader.init_params(args)
+ # For each annotator set the necessary parameters.
+ for annotator in self.annotators:
+ annotator.init_params(args)
+ # For each annotator initialize resources, etc.
+ for annotator in self.annotators:
+ annotator.initialize()
+ self.initialized = True
- def process(self, cas):
- if self.initialized:
- for processor in self.annotators:
- processor.process(cas)
- else:
+ # Starts / Runs the pipeline. This calls start on the collection reader.
+ def run(self):
+ if not self.initialized:
self.initialize()
- self.process(cas)
+ self.c_reader.start()
+ # For a new cas, call each annotator to process that cas.
+ def process(self, cas):
+ for annotator in self.annotators:
+ annotator.process(cas)
+
+ # At the end of the corpus, call each annotator for cleanup, etc.
def collection_process_complete(self):
- for processor in self.annotators:
- processor.collection_process_complete()
+ for annotator in self.annotators:
+ annotator.collection_process_complete()