Added a Collection Reader component instead of a generic annotator
Added parameter declaration and initialization by the reader and annotators
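
For reference, a minimal sketch of how the new hooks fit together, assuming only
the interfaces added in this change (CollectionReader, declare_params/init_params,
PBJPipeline.reader()/run()); the WordCounter annotator and its --count_prefix
option are hypothetical and not part of this change:

    # Usage sketch (hypothetical WordCounter annotator; PBJReceiver, PBJSender
    # and the pipeline hooks are the ones introduced in this change).
    from ctakes_pbj.component import cas_annotator
    from ctakes_pbj.component.pbj_receiver import PBJReceiver
    from ctakes_pbj.component.pbj_sender import PBJSender
    from ctakes_pbj.pipeline.pbj_pipeline import PBJPipeline


    class WordCounter(cas_annotator.CasAnnotator):

        # Declare command line parameters once when the pipeline is built.
        def declare_params(self, arg_parser):
            arg_parser.add_arg('--count_prefix', default='Words:')

        # Receive the parsed arguments before initialize() is called.
        def init_params(self, args):
            self.prefix = args.count_prefix

        # Called for every cas that the reader hands to the pipeline.
        def process(self, cas):
            print(self.prefix, len(cas.sofa_string.split()))


    def main():
        pipeline = PBJPipeline()
        pipeline.reader(PBJReceiver())  # declares receive_queue, -rh, -rpt, -ru, -rp
        pipeline.add(WordCounter())
        pipeline.add(PBJSender())       # declares send_queue, -sh, -spt, -su, -sp
        pipeline.run()                  # parses args, runs init_params/initialize, starts the reader


    if __name__ == '__main__':
        main()

This mirrors the updated sentence_printer_pipeline.py and word_finder_pipeline.py
examples below.
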
diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/doctimerel_delegator.py b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/doctimerel_delegator.py
new file mode 100644
index 0000000..4581562
--- /dev/null
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/doctimerel_delegator.py
@@ -0,0 +1,46 @@
+from ctakes_pbj.component import cas_annotator
+from ctakes_pbj.pbj_tools.helper_functions import *
+from ctakes_pbj.pbj_tools.event_creator import EventCreator
+from ctakes_pbj.type_system import ctakes_types
+from cnlpt.api.cnlp_rest import EntityDocument
+import cnlpt.api.dtr_rest as dtr_rest
+import asyncio
+import time
+
+sem = asyncio.Semaphore(1)
+
+
+class DocTimeRelDelegator(cas_annotator.CasAnnotator):
+
+    def __init__(self, cas):
+        self.event_creator = EventCreator(cas)
+        self.event_mention_type = cas.typesystem.get_type(ctakes_types.EventMention)
+
+    # Initializes cNLPT, which loads its DocTimeRel model.
+    def initialize(self):
+        print(time.ctime((time.time())), "Initializing cnlp-transformers doctimerel ...")
+        asyncio.run(self.init_caller())
+        print(time.ctime((time.time())), "Done.")
+
+    # Processes the document to get DocTimeRel on Events from cNLPT.
+    def process(self, cas):
+        print(time.ctime((time.time())), "Processing cnlp-transformers doctimerel ...")
+        event_mentions = cas.select(ctakes_types.EventMention)
+        offsets = get_offsets(event_mentions)
+        asyncio.run(self.dtr_caller(cas, event_mentions, offsets))
+        print(time.ctime((time.time())), "cnlp-transformers doctimerel Done.")
+
+    async def init_caller(self):
+        await dtr_rest.startup_event()
+
+    async def dtr_caller(self, cas, event_mentions, offsets):
+        text = cas.sofa_string
+        e_doc = EntityDocument(doc_text=text, annotations=offsets)
+
+        #async with sem:
+        dtr_output = await dtr_rest.process(e_doc)
+        i = 0
+        for e in event_mentions:
+            event = self.event_creator.create_event(cas, dtr_output.statuses[i])
+            e.event = event
+            i += 1
diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/temporal_delegator.py b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/temporal_delegator.py
new file mode 100644
index 0000000..4183225
--- /dev/null
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/ctakes_cnlpt_py/src/ctakes_cnlpt/ae/temporal_delegator.py
@@ -0,0 +1,113 @@
+from ctakes_pbj.component import cas_annotator
+from ctakes_pbj.pbj_tools import create_type
+from ctakes_pbj.pbj_tools.token_tools import *
+from ctakes_pbj.pbj_tools.create_relation import create_relation
+from ctakes_pbj.pbj_tools.helper_functions import *
+from ctakes_pbj.pbj_tools.event_creator import create_event
+from ctakes_pbj.type_system import ctakes_types
+import cnlpt.api.temporal_rest as temporal_rest
+import asyncio
+import time
+
+sem = asyncio.Semaphore(1)
+
+
+class TemporalDelegator(cas_annotator.CasAnnotator):
+
+    def __init__(self, cas):
+        self.event_mention_type = cas.typesystem.get_type(ctakes_types.EventMention)
+        self.timex_type = cas.typesystem.get_type(ctakes_types.TimeMention)
+        self.tlink_type = cas.typesystem.get_type(ctakes_types.TemporalTextRelation)
+        self.argument_type = cas.typesystem.get_type(ctakes_types.RelationArgument)
+
+    # Initializes cNLPT, which loads its Temporal model.
+    def initialize(self):
+        print(time.ctime((time.time())), "Initializing cnlp-transformers temporal ...")
+        asyncio.run(self.init_caller())
+        print(time.ctime((time.time())), "Done.")
+
+    # Process Sentences, adding Times, Events and TLinks found by cNLPT.
+    def process(self, cas):
+        print(time.ctime((time.time())), "Processing cnlp-transformers temporal ...")
+        sentences = cas.select(ctakes_types.Sentence)
+        event_mentions = cas.select(ctakes_types.EventMention)
+        sentence_events = get_covered_list(sentences, event_mentions)
+
+        # e_m_begins = []
+        # for e in e_mentions:
+        #     e_m_begins.append(e.begin)
+
+        tokens = cas.select(ctakes_types.BaseToken)
+        sentence_tokens = get_covered_list(sentences, tokens)
+        # token_begins = []
+        # for t in tokens:
+        #     token_begins.append(t.begin)
+
+        i = 0
+        while i < len(sentences):
+            if len(sentence_events[i]) > 0:
+                print(time.ctime((time.time())), "Processing cnlp-transformers temporal on sentence",
+                      str(i), "of", str(len(sentences)), "...")
+                event_offsets = get_windowed_offsets(sentence_events[i], sentences[i].begin)
+                token_offsets = get_windowed_offsets(sentence_tokens[i], sentences[i].begin)
+                asyncio.run(self.temporal_caller(cas, sentences[i], sentence_events[i], event_offsets, token_offsets))
+            i += 1
+        print(time.ctime((time.time())), "cnlp-transformers temporal Done.")
+
+    async def init_caller(self):
+        await temporal_rest.startup_event()
+
+    async def temporal_caller(self, cas, sentence, event_mentions, event_offsets, token_offsets):
+
+        sentence_doc = temporal_rest.SentenceDocument(sentence.get_covered_text())
+        temporal_result = await temporal_rest.process_sentence(sentence_doc)
+
+        events_times = {}
+        i = 0
+        for t in temporal_result.timexes:
+            for tt in t:
+                first_token_offset = token_offsets[tt.begin]
+                last_token_offset = token_offsets[tt.end]
+                timex = create_type.add_type(cas, self.timex_type,
+                                             sentence.begin + first_token_offset[0],
+                                             sentence.begin + last_token_offset[1])
+                events_times['TIMEX-' + str(i)] = timex
+                i += 1
+
+        i = 0
+        for e in temporal_result.events:
+            for ee in e:
+                first_token_offset = token_offsets[ee.begin]
+                last_token_offset = token_offsets[ee.end]
+                event_mention = self.get_or_create_event_mention(cas, event_mentions,
+                                                                 sentence.begin + first_token_offset[0],
+                                                                 sentence.begin + last_token_offset[1])
+                event = create_event(cas, ee.dtr)
+                event_mention.event = event
+                events_times['EVENT-' + str(i)] = event_mention
+                i += 1
+
+        for r in temporal_result.relations:
+            for rr in r:
+                arg1 = self.argument_type()
+                arg1.argument = events_times[rr.arg1]
+                print("Arg1 =", events_times[rr.arg1])
+                arg2 = self.argument_type()
+                arg2.argument = events_times[rr.arg2]
+                print("Arg2 =", events_times[rr.arg2])
+                tlink = create_relation(self.tlink_type, rr.category, arg1, arg2)
+                cas.add(tlink)
+
+    def get_index_by_offsets(self, tokens, begin, end):
+        i = 0
+        for token in tokens:
+            if token.begin == begin and token.end == end:
+                return i
+            i += 1
+        return -1
+
+    def get_or_create_event_mention(self, cas, event_mentions, begin, end):
+        i = self.get_index_by_offsets(event_mentions, begin, end)
+        if i == -1:
+            return create_type.add_type(cas, self.event_mention_type, begin, end)
+        return event_mentions[i]
diff --git a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java
index 3805e13..42d7266 100644
--- a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java
+++ b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjJmsSender.java
@@ -39,7 +39,7 @@
  */
 @PipeBitInfo(
       name = "PbjJmsSender",
-      description = "Sends jcas to Artemis Queue using Stomp",
+      description = "Sends jcas to Artemis Queue using JMS",
       role = PipeBitInfo.Role.SPECIAL
 )
 public class PbjJmsSender extends PbjSender {
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
index 5541e59..505a399 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/cas_annotator.py
@@ -2,6 +2,15 @@
 
 
 class CasAnnotator(ABC):
+
+    # Called once when the pipeline is built.
+    def declare_params(self, arg_parser):
+        pass
+
+    # Called once at the beginning of a pipeline, before initialize.
+    def init_params(self, arg_parser):
+        pass
+
     # Called once at the beginning of a pipeline.
     def initialize(self):
         pass
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
new file mode 100644
index 0000000..3ae3de7
--- /dev/null
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/collection_reader.py
@@ -0,0 +1,26 @@
+from abc import ABC, abstractmethod
+
+
+class CollectionReader(ABC):
+
+    # Set the pipeline.  The collection reader controls the pipeline flow.
+    @abstractmethod
+    def set_pipeline(self, pipeline):
+        pass
+
+    # Called once when the pipeline is built.
+    def declare_params(self, arg_parser):
+        pass
+
+    # Called once at the beginning of a pipeline, before initialize.
+    def init_params(self, arg_parser):
+        pass
+
+    # Called once at the beginning of a pipeline.
+    def initialize(self):
+        pass
+
+    # Called to start reading cas objects and pass them to the pipeline.
+    @abstractmethod
+    def start(self):
+        pass
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
index 6e3936f..79df482 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_receiver.py
@@ -1,86 +1,43 @@
-import time
-from threading import Event
-import stomp
-from ctakes_pbj.type_system.type_system_loader import *
-from ctakes_pbj.pipeline.pbj_pipeline import STOP_MESSAGE
-from ctakes_pbj.pbj_tools import arg_parser
-
-args = arg_parser.get_args()
-
-exit_event = Event()
+from ctakes_pbj.component.collection_reader import CollectionReader
+from ctakes_pbj.pbj_tools import pbj_defaults
+from ctakes_pbj.pbj_tools.stomp_receiver import start_receiver
 
 
-def start_receiver(pipeline, queue_name=args.receive_queue, host_name=args.host_name, port_name=args.port_name,
-                   password=args.password, username=args.username):
-    PBJReceiver(pipeline, queue_name, host_name, port_name, password, username)
-    while not exit_event.is_set():
-        exit_event.wait()
+class PBJReceiver(CollectionReader):
 
+    def __init__(self):
+        self.port_name = None
+        self.host_name = None
+        self.queue_name = None
+        self.password = None
+        self.username = None
+        self.pipeline = None
+        self.receiving = False
+        self.stomp_receiver = None
 
-class PBJReceiver(stomp.ConnectionListener):
-
-    def __init__(self, pipeline, queue_name, host_name, port_name, password, username):
-        self.source_queue = queue_name
-        self.source_host = host_name
-        self.source_port = port_name
+    # Set the pipeline.  The collection reader controls the pipeline flow.
+    def set_pipeline(self, pipeline):
         self.pipeline = pipeline
-        self.password = password
-        self.username = username
-        self.id = '1'
-        self.typesystem = None
-        print(time.ctime((time.time())), "Starting PBJ Receiver on", self.source_host, self.source_queue, "...")
-        # Use a heartbeat of 10 minutes  (in milliseconds)
-        self.conn = stomp.Connection12([(self.source_host, self.source_port)],
-                                       keepalive=True, heartbeats=(600000, 600000))
-        self.conn.set_listener('PBJ_Receiver', self)
-        self.stop = False
-        self.__connect_and_subscribe()
 
-    def __connect_and_subscribe(self):
-        self.conn.connect(self.username, self.password, wait=True)
-        self.conn.subscribe(destination=self.source_queue, id=self.id, ack='auto')
-        # self.conn.subscribe(destination=self.source_queue, id=self.id, ack='client')
+    # Called once when the pipeline is built.
+    def declare_params(self, arg_parser):
+        arg_parser.add_arg('receive_queue')
+        arg_parser.add_arg('-rh', '--receive_host', default=pbj_defaults.DEFAULT_HOST)
+        arg_parser.add_arg('-rpt', '--receive_port', default=pbj_defaults.DEFAULT_PORT)
+        arg_parser.add_arg('-ru', '--receive_user', default=pbj_defaults.DEFAULT_USER)
+        arg_parser.add_arg('-rp', '--receive_pass', default=pbj_defaults.DEFAULT_PASS)
 
-    def set_typesystem(self, typesystem):
-        self.typesystem = typesystem
+    # Called once at the beginning of a pipeline, before initialize.
+    def init_params(self, args):
+        self.queue_name = args.receive_queue
+        self.host_name = args.receive_host
+        self.port_name = args.receive_port
+        self.username = args.receive_user
+        self.password = args.receive_pass
 
-    def get_typesystem(self):
-        if self.typesystem is None:
-            # Load the typesystem
-            type_system_accessor = TypeSystemLoader()
-            type_system_accessor.load_type_system()
-            self.set_typesystem(type_system_accessor.get_type_system())
-        return self.typesystem
-
-    def set_host(self, host_name):
-        self.source_host = host_name
-
-    def set_stop(self, stop):
-        self.stop = stop
-
-    def stop_receiver(self):
-        self.conn.unsubscribe(destination=self.source_queue, id=self.id)
-        self.conn.disconnect()
-        print(time.ctime((time.time())), "Disconnected PBJ Receiver on", self.source_host, self.source_queue)
-        self.pipeline.collection_process_complete()
-        exit_event.set()
-
-    def on_message(self, frame):
-        if frame.body == STOP_MESSAGE:
-            print(time.ctime((time.time())), "Received Stop code.")
-            self.stop = True
-            # time.sleep(3)
-            self.stop_receiver()
-        else:
-            if XMI_INDICATOR in frame.body:
-                cas = cassis.load_cas_from_xmi(frame.body, self.get_typesystem())
-                self.pipeline.process(cas)
-            else:
-                print(time.ctime((time.time())), "Malformed Message:\n", frame.body)
-
-    def on_disconnected(self):
-        if self.stop is False:
-            self.__connect_and_subscribe()
-
-    def on_error(self, frame):
-        print(time.ctime((time.time())), "Receiver Error:", frame.body)
+    # Called to start reading cas objects and pass them to the pipeline.
+    def start(self):
+        if not self.receiving:
+            self.receiving = True
+            start_receiver(self.pipeline, self.queue_name, self.host_name, self.port_name,
+                           self.password, self.username)
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
index 1cdf0e9..ef3c4c0 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/component/pbj_sender.py
@@ -1,34 +1,51 @@
 import stomp
 import time
 from ctakes_pbj.component import cas_annotator
-from ctakes_pbj.pipeline.pbj_pipeline import STOP_MESSAGE
-from ctakes_pbj.pbj_tools import arg_parser
-
-args = arg_parser.get_args()
+from ctakes_pbj.pbj_tools.pbj_defaults import *
 
 
 class PBJSender(cas_annotator.CasAnnotator):
 
-    def __init__(self, queue_name=args.send_queue, host_name=args.host_name, port_name=args.port_name,
-                 password=args.password, username=args.username):
+    def __init__(self):
+        self.target_queue = None
+        self.target_host = None
+        self.target_port = None
+        self.password = None
+        self.username = None
+        self.conn = None
 
-        self.target_queue = queue_name
-        self.target_host = host_name
-        self.target_port = port_name
-        self.password = password
-        self.username = username
+    # Called once when the pipeline is built.
+    def declare_params(self, arg_parser):
+        arg_parser.add_arg('send_queue')
+        arg_parser.add_arg('-sh', '--send_host', default=DEFAULT_HOST)
+        arg_parser.add_arg('-spt', '--send_port', default=DEFAULT_PORT)
+        arg_parser.add_arg('-su', '--send_user', default=DEFAULT_USER)
+        arg_parser.add_arg('-sp', '--send_pass', default=DEFAULT_PASS)
+
+    # Called once at the beginning of a pipeline, before initialize.
+    def init_params(self, args):
+        self.target_queue = args.send_queue
+        self.target_host = args.send_host
+        self.target_port = args.send_port
+        self.username = args.send_user
+        self.password = args.send_pass
+
+    # Called once at the beginning of a pipeline.
+    def initialize(self):
         print(time.ctime((time.time())), "Starting PBJ Sender on", self.target_host, self.target_queue, "...")
         # Use a heartbeat of 10 minutes  (in milliseconds)
         self.conn = stomp.Connection12([(self.target_host, self.target_port)],
                                        keepalive=True, heartbeats=(600000, 600000))
         self.conn.connect(self.username, self.password, wait=True)
 
+    # Called for every cas passed through the pipeline.
     def process(self, cas):
         print(time.ctime((time.time())), "Sending processed information to",
               self.target_host, self.target_queue, "...")
         xmi = cas.to_xmi()
         self.conn.send(self.target_queue, xmi)
 
+    # Called once at the end of the pipeline.
     def collection_process_complete(self):
         self.send_stop()
 
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py
index 134c268..e7d6452 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/sentence_printer_pipeline.py
@@ -4,8 +4,7 @@
 # These are the lines that ignore the typesystem errors
 import warnings
 
-
-from ctakes_pbj.component.pbj_receiver import start_receiver
+from ctakes_pbj.component.pbj_receiver import PBJReceiver
 from ctakes_pbj.examples.sentence_printer import SentencePrinter
 from ctakes_pbj.component.pbj_sender import PBJSender
 from ctakes_pbj.pipeline.pbj_pipeline import PBJPipeline
@@ -17,10 +16,11 @@
 def main():
 
     pipeline = PBJPipeline()
+    pipeline.reader(PBJReceiver())
     pipeline.add(SentencePrinter())
     pipeline.add(PBJSender())
     pipeline.initialize()
-    start_receiver(pipeline)
+    pipeline.run()
 
 
 if __name__ == "__main__":
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py
index 666d65b..6864101 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/examples/word_finder_pipeline.py
@@ -4,9 +4,9 @@
 # These are the lines that ignore the typesystem errors
 import warnings
 
-from ctakes_pbj.component.pbj_receiver import start_receiver
 from ctakes_pbj.examples.word_finder import WordFinder
 from ctakes_pbj.component.pbj_sender import PBJSender
+from ctakes_pbj.component.pbj_receiver import PBJReceiver
 from ctakes_pbj.pipeline.pbj_pipeline import PBJPipeline
 
 warnings.filterwarnings("ignore")
@@ -15,10 +15,11 @@
 def main():
 
     pipeline = PBJPipeline()
+    pipeline.reader(PBJReceiver())
     pipeline.add(WordFinder())
     pipeline.add(PBJSender())
     pipeline.initialize()
-    start_receiver(pipeline)
+    pipeline.run()
 
 
 main()
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
index ea84ef7..f10a37f 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/arg_parser.py
@@ -1,28 +1,24 @@
 import argparse
-DEFAULT_HOST = 'localhost'
-DEFAULT_PORT = 61616
-DEFAULT_USER = 'guest'
-DEFAULT_PASS = 'guest'
-DEFAULT_OUT_DIR = 'pbj_output/'
 
 
-def get_args():
-    parser = argparse.ArgumentParser(
-        prog='pbj_sender.py',
-        description='Sends...',
-        epilog='Text at the bottom of help'
-    )
-    parser.add_argument('receive_queue')
-    parser.add_argument('send_queue')
-    parser.add_argument('-hn', '--host_name', default=DEFAULT_HOST)
-    parser.add_argument('-pn', '--port_name', default=DEFAULT_PORT)
-    parser.add_argument('-u', '--username', default=DEFAULT_USER)
-    parser.add_argument('-p', '--password', default=DEFAULT_PASS)
-    parser.add_argument('-o', '--output_dir', default=DEFAULT_OUT_DIR)
+class ArgParser:
 
-    parser.parse_args()
-    args = parser.parse_args()
+    def __init__(self):
+        self.arg_parser = None
 
-    return args
+    def get_arg_parser(self):
+        if self.arg_parser is None:
+            print('Creating arg_parser')
+            self.arg_parser = argparse.ArgumentParser(
+                prog='ctakes-pbj',
+                description='Does wonderful stuff...',
+                epilog='Text at the bottom of help'
+            )
+        return self.arg_parser
 
+    def add_arg(self, *args, **kwargs):
+        self.get_arg_parser().add_argument(*args, **kwargs)
 
+    def get_args(self):
+        print('Parsing Arguments')
+        return self.get_arg_parser().parse_args()
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
new file mode 100644
index 0000000..6e8acb8
--- /dev/null
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/pbj_defaults.py
@@ -0,0 +1,6 @@
+DEFAULT_HOST = 'localhost'
+DEFAULT_PORT = 61616
+DEFAULT_USER = 'guest'
+DEFAULT_PASS = 'guest'
+DEFAULT_OUT_DIR = 'pbj_output/'
+STOP_MESSAGE = "Apache cTAKES PBJ Stop Message."
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
new file mode 100644
index 0000000..384827f
--- /dev/null
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pbj_tools/stomp_receiver.py
@@ -0,0 +1,84 @@
+import time
+from threading import Event
+import stomp
+from ctakes_pbj.type_system.type_system_loader import *
+from ctakes_pbj.pbj_tools.pbj_defaults import STOP_MESSAGE
+
+
+exit_event = Event()
+
+
+def start_receiver(pipeline, queue_name, host_name, port_name,
+                   password, username, r_id='1'):
+    StompReceiver(pipeline, queue_name, host_name, port_name, password, username, r_id)
+    while not exit_event.is_set():
+        exit_event.wait()
+
+
+class StompReceiver(stomp.ConnectionListener):
+
+    def __init__(self, pipeline, queue_name, host_name, port_name, password, username, r_id):
+        self.source_queue = queue_name
+        self.source_host = host_name
+        self.source_port = port_name
+        self.pipeline = pipeline
+        self.password = password
+        self.username = username
+        self.r_id = r_id
+        self.typesystem = None
+        print(time.ctime((time.time())), "Starting Stomp Receiver on", self.source_host, self.source_queue, "...")
+        # Use a heartbeat of 10 minutes  (in milliseconds)
+        self.conn = stomp.Connection12([(self.source_host, self.source_port)],
+                                       keepalive=True, heartbeats=(600000, 600000))
+        self.conn.set_listener('Stomp_Receiver', self)
+        self.stop = False
+        self.__connect_and_subscribe()
+
+    def __connect_and_subscribe(self):
+        self.conn.connect(self.username, self.password, wait=True)
+        self.conn.subscribe(destination=self.source_queue, id=self.r_id, ack='auto')
+        # self.conn.subscribe(destination=self.source_queue, id=self.id, ack='client')
+
+    def set_typesystem(self, typesystem):
+        self.typesystem = typesystem
+
+    def get_typesystem(self):
+        if self.typesystem is None:
+            # Load the typesystem
+            type_system_accessor = TypeSystemLoader()
+            type_system_accessor.load_type_system()
+            self.set_typesystem(type_system_accessor.get_type_system())
+        return self.typesystem
+
+    def set_host(self, host_name):
+        self.source_host = host_name
+
+    def set_stop(self, stop):
+        self.stop = stop
+
+    def stop_receiver(self):
+        self.conn.unsubscribe(destination=self.source_queue, id=self.r_id)
+        self.conn.disconnect()
+        print(time.ctime((time.time())), "Disconnected Stomp Receiver on", self.source_host, self.source_queue)
+        self.pipeline.collection_process_complete()
+        exit_event.set()
+
+    def on_message(self, frame):
+        if frame.body == STOP_MESSAGE:
+            print(time.ctime((time.time())), "Received Stop code.")
+            self.stop = True
+            # time.sleep(3)
+            self.stop_receiver()
+        else:
+            if XMI_INDICATOR in frame.body:
+                cas = cassis.load_cas_from_xmi(frame.body, self.get_typesystem())
+                self.pipeline.process(cas)
+            else:
+                print(time.ctime((time.time())), "Malformed Message:\n", frame.body)
+
+    def on_disconnected(self):
+        if self.stop is False:
+            self.__connect_and_subscribe()
+
+    def on_error(self, frame):
+        print(time.ctime((time.time())), "Receiver Error:", frame.body)
diff --git a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
index c61034f..0783453 100644
--- a/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
+++ b/ctakes-pbj/src/user/resources/org/apache/ctakes/pbj/ctakes_pbj_py/src/ctakes_pbj/pipeline/pbj_pipeline.py
@@ -1,4 +1,5 @@
-STOP_MESSAGE = "Apache cTAKES PBJ Stop Message."
+from ctakes_pbj.pbj_tools import pbj_defaults
+from ctakes_pbj.pbj_tools.arg_parser import ArgParser
 
 
 class PBJPipeline:
@@ -6,23 +7,53 @@
     def __init__(self):
         self.annotators = []
         self.initialized = False
+        self.c_reader = None
+        self.arg_parser = ArgParser()
 
-    def add(self, cas_processor):
-        self.annotators.append(cas_processor)
+    # Set the Collection Reader for the Corpus.
+    # This is absolutely necessary: if you don't tell the pipeline how to get the notes, it has nothing to process.
+    def reader(self, collection_reader):
+        collection_reader.declare_params(self.arg_parser)
+        collection_reader.set_pipeline(self)
+        self.c_reader = collection_reader
 
+    # Add an annotator to the pipeline.
+    def add(self, cas_annotator):
+        cas_annotator.declare_params(self.arg_parser)
+        self.annotators.append(cas_annotator)
+
+    # Parse command line parameters, pass them to the reader and annotators, then call each annotator's initialize.
     def initialize(self):
-        for processor in self.annotators:
-            self.initialized = True
-            processor.initialize()
+        if self.c_reader is None:
+            print('No Reader Specified, quitting')
+            exit(1)
+        self.arg_parser.add_arg('-o', '--output_dir', default=pbj_defaults.DEFAULT_OUT_DIR)
+        # Get/Init all of the declared parameter arguments.
+        # Do the actual argument parsing.
+        # If get_args has already been called then added parameters will crash the tool.
+        args = self.arg_parser.get_args()
+        # Set the necessary parameters in the collection reader.
+        self.c_reader.init_params(args)
+        # For each annotator set the necessary parameters.
+        for annotator in self.annotators:
+            annotator.init_params(args)
+        # For each annotator initialize resources, etc.
+        for annotator in self.annotators:
+            annotator.initialize()
+        self.initialized = True
 
-    def process(self, cas):
-        if self.initialized:
-            for processor in self.annotators:
-                processor.process(cas)
-        else:
+    # Starts / Runs the pipeline.  This calls start on the collection reader.
+    def run(self):
+        if not self.initialized:
             self.initialize()
-            self.process(cas)
+        self.c_reader.start()
 
+    # For a new cas, call each annotator to process that cas.
+    def process(self, cas):
+        for annotator in self.annotators:
+            annotator.process(cas)
+
+    # At the end of the corpus, call each annotator for cleanup, etc.
     def collection_process_complete(self):
-        for processor in self.annotators:
-            processor.collection_process_complete()
+        for annotator in self.annotators:
+            annotator.collection_process_complete()