remove ingestion order references
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 92fc56d..d033c69 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -19,15 +19,15 @@
date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None
date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
- new_order = Collection(dataset_id=properties['id'],
- variable=properties['variable'],
- path=properties['path'],
- historical_priority=properties['priority'],
- forward_processing_priority=properties.get('forward_processing_priority',
- properties['priority']),
- date_to=date_to,
- date_from=date_from)
- return new_order
+ collection = Collection(dataset_id=properties['id'],
+ variable=properties['variable'],
+ path=properties['path'],
+ historical_priority=properties['priority'],
+ forward_processing_priority=properties.get('forward_processing_priority',
+ properties['priority']),
+ date_to=date_to,
+ date_from=date_from)
+ return collection
def directory(self):
if os.path.isdir(self.path):
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 3cec804..fa4d7ce 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -15,8 +15,8 @@
parser.add_argument("--refresh",
help="refresh interval in seconds to check for new or updated granules",
default=300)
- parser.add_argument("--local-ingestion-orders",
- help="path to local ingestion orders file",
+ parser.add_argument("--collections",
+ help="path to collections configuration file",
required=True)
parser.add_argument('--rabbitmq_host',
default='localhost',
@@ -53,13 +53,13 @@
password=options.rabbitmq_password,
queue=options.rabbitmq_queue)
publisher.connect()
- order_executor = CollectionProcessor(message_publisher=publisher,
- history_manager_builder=history_manager_builder)
- collections_watcher = CollectionWatcher(collections_path=options.local_ingestion_orders,
- collection_updated_callback=order_executor.process_collection,
- granule_updated_callback=order_executor.process_granule)
+ collection_processor = CollectionProcessor(message_publisher=publisher,
+ history_manager_builder=history_manager_builder)
+ collection_watcher = CollectionWatcher(collections_path=options.collections,
+ collection_updated_callback=collection_processor.process_collection,
+ granule_updated_callback=collection_processor.process_granule)
- collections_watcher.start_watching()
+ collection_watcher.start_watching()
while True:
time.sleep(1)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1c8afca..912ddad 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -42,27 +42,27 @@
"""
return list(self._collections.values())
- def _load_orders(self):
+ def _load_collections(self):
try:
with open(self._collections_path, 'r') as f:
- orders_yml = yaml.load(f, Loader=yaml.FullLoader)
- new_ingestion_orders = {}
- for _, ingestion_order in orders_yml.items():
- new_order = Collection.from_dict(ingestion_order)
- directory = new_order.directory()
+ collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
+ new_collections = {}
+ for _, collection_dict in collections_yaml.items():
+ collection = Collection.from_dict(collection_dict)
+ directory = collection.directory()
if directory == os.path.dirname(self._collections_path):
- logger.error(f"Ingestion order {new_order.dataset_id} uses granule directory {new_order.path} "
- f"which is the same directory as the collections file, {self._collections_path}. The "
- f"collections file cannot share a directory with the granules. Ignoring ingestion "
- f"order {new_order.dataset_id} for now.")
- if directory in new_ingestion_orders:
- logger.error(f"Ingestion order {new_order.dataset_id} uses granule directory {directory} "
- f"which conflicts with ingestion order {new_ingestion_orders[directory].dataset_id}."
- f" Ignoring {new_order.dataset_id}.")
+ logger.error(f"Collection {collection.dataset_id} uses granule directory {collection.path} "
+ f"which is the same directory as the collection configuration file, "
+ f"{self._collections_path}. The granules need to be in their own directory. "
+ f"Ignoring collection {collection.dataset_id} for now.")
+ if directory in new_collections:
+ logger.error(f"Ingestion order {collection.dataset_id} uses granule directory {directory} "
+ f"which conflicts with ingestion order {new_collections[directory].dataset_id}."
+ f" Ignoring {collection.dataset_id}.")
else:
- new_ingestion_orders[directory] = new_order
+ new_collections[directory] = collection
- self._collections = new_ingestion_orders
+ self._collections = new_collections
except FileNotFoundError:
logger.error(f"Collection configuration file not found at {self._collections}.")
except yaml.scanner.ScannerError:
@@ -70,16 +70,16 @@
f"after the next configuration change.")
def _refresh(self):
- for updated_order in self._get_updated_orders():
- self._collection_updated(updated_order)
+ for collection in self._get_updated_collections():
+ self._collection_updated(collection)
self._unschedule_watches()
self._schedule_watches()
- def _get_updated_orders(self) -> List[Collection]:
- old_orders = self.collections()
- self._load_orders()
- return list(set(self.collections()) - set(old_orders))
+ def _get_updated_collections(self) -> List[Collection]:
+ old_collections = self.collections()
+ self._load_collections()
+ return list(set(self.collections()) - set(old_collections))
def _unschedule_watches(self):
for directory, watch in self._watches.items():
@@ -87,9 +87,9 @@
self._watches.clear()
def _schedule_watches(self):
- for ingestion_order in self.collections():
- granule_event_handler = _GranuleEventHandler(self._granule_updated, ingestion_order)
- directory = ingestion_order.directory()
+ for collection in self.collections():
+ granule_event_handler = _GranuleEventHandler(self._granule_updated, collection)
+ directory = collection.directory()
if directory not in self._watches:
self._watches[directory] = self._observer.schedule(granule_event_handler, directory)
@@ -114,11 +114,11 @@
EventHandler that watches for new or modified granule files.
"""
- def __init__(self, granule_updated: Callable[[str, Collection], any], ingestion_order: Collection):
+ def __init__(self, granule_updated: Callable[[str, Collection], any], collection: Collection):
self._granule_updated = granule_updated
- self._ingestion_order = ingestion_order
+ self._collection = collection
def on_created(self, event):
super().on_created(event)
- if self._ingestion_order.owns_file(event.src_path):
- self._granule_updated(event.src_path, self._ingestion_order)
+ if self._collection.owns_file(event.src_path):
+ self._granule_updated(event.src_path, self._collection)