add option --forward-processing
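
When --forward-processing is set, only granules whose file update time is
later than the update time of the most recently ingested granule are
considered; the --from and --to options are then ignored. The latest ingested
update time is persisted per dataset in a <dataset_id>.ts file next to the
ingestion history file, so forward processing resumes across runs.

Example (test data path as used in the repository):

    % run_single_collection -d avhrr-oi-analysed-sst -p sdap_ingest_manager/collections_ingester/test/data/avhrr_oi -v analysed_sst --forward-processing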
diff --git a/bin/run_collections b/bin/run_collections
index 9406031..72c4ca2 100644
--- a/bin/run_collections
+++ b/bin/run_collections
@@ -19,6 +19,12 @@
                                            "and other configuration files (list of ingestion streams)",
                     default=os.path.join(sys.prefix, ".sdap_ingest_manager"))
 
+parser.add_argument("-fp", "--forward-processing",
+                    help="when configured, will only consider granules updated after the"
+                         " update time of the latest ingested granule (the older are ignored)",
+                    action='store_true',
+                    default=False)
+
 options = parser.parse_args()
 
 config = collections_ingester.read_local_configuration(config_path=options.config)
@@ -26,6 +32,8 @@
 
 def collection_row_callback(row):
 
+    row['forward_processing'] = options.forward_processing
+
     collections_ingester.collection_row_callback(row,
                                                 full_path(config.get("OPTIONS", "collection_config_template")),
                                                 full_path(config.get("LOCAL_PATHS", "granule_file_list_path")),
diff --git a/bin/run_single_collection b/bin/run_single_collection
index b44a1a4..c6a03b8 100755
--- a/bin/run_single_collection
+++ b/bin/run_single_collection
@@ -13,13 +13,13 @@
 
 
 def main():
-
     parser = argparse.ArgumentParser(
-        description='Ingest one collection of data granules',
+        description='Ingest one collection of data granules.'
+                    ' Examples:'
+                    ' % run_single_collection -d avhrr-oi-analysed-sst -p sdap_ingest_manager/collections_ingester/test/data/avhrr_oi -v analysed_sst --from 2020-03-22 --to 2020-04-23'
+                    ' % run_single_collection -d avhrr-oi-analysed-sst -p sdap_ingest_manager/collections_ingester/test/data/avhrr_oi -v analysed_sst --forward-processing',
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 
-    # example
-    # % run_single_collection -d avhrr-oi-analysed-sst -p sdap_ingest_manager/collections_ingester/test/data/avhrr_oi -v analysed_sst  --from 2020-03-22 --to 2020-04-23
     parser.add_argument('-d', '--id',
                         help='The dataset id in nexus, if it is not created yet a new dataset will be created'
                              'this must contain only [a-z], [0-9] and -',
@@ -40,21 +40,30 @@
                         required=True,
                         metavar='analyzed_sst')
 
     parser.add_argument('-f', '--from',
                         help='The earliest modification time of a granule file'
                              ' which will be ingested. Uses file system time.'
                              ' format is ISO8601, date only, for example "2020-02-24"',
                         required=False,
                         metavar="2020-02-24")
 
     parser.add_argument('-t', '--to',
                         help='The latest modification time of a granule file'
                              ' which will be ingested. Uses file system time.'
                              ' format is ISO8601, date only, for example "2020-02-24"',
                         required=False,
                         metavar="2020-03-24")
-    parser.add_argument("-c", "--config", help="configuration directory which contains the sdap_ingest_manager.ini file"
-                                               "and other configuration files (list of ingestion streams)",
+
+    parser.add_argument("-fp", "--forward-processing",
+                        help="when set, only consider granules updated after the update time"
+                             " of the most recently ingested granule (older granules are"
+                             " ignored); the --from and --to options are then ignored",
+                        action='store_true',
+                        default=False)
+
+    parser.add_argument("-c", "--config",
+                        help="configuration directory which contains the sdap_ingest_manager.ini file"
+                             " and other configuration files (list of ingestion streams)",
                         default=os.path.join(sys.prefix, ".sdap_ingest_manager"))
 
     the_args = parser.parse_args()
@@ -63,7 +72,7 @@
     config = collections_ingester.read_local_configuration(config_path=the_args.config)
 
     collection['path'] = os.path.join(os.getcwd(),
-                                 collection['path'])
+                                      collection['path'])
 
     collections_ingester.collection_row_callback(collection,
                                                  full_path(config.get("OPTIONS", "collection_config_template")),
diff --git a/sdap_ingest_manager/__init__.py b/sdap_ingest_manager/__init__.py
index 8ec55b4..db04118 100644
--- a/sdap_ingest_manager/__init__.py
+++ b/sdap_ingest_manager/__init__.py
@@ -1 +1 @@
-__version__ = "0.3.0rc1"
\ No newline at end of file
+__version__ = "0.3.0rc2"
\ No newline at end of file
diff --git a/sdap_ingest_manager/collections_ingester/collection_ingestion.py b/sdap_ingest_manager/collections_ingester/collection_ingestion.py
index 166f326..24f18ab 100644
--- a/sdap_ingest_manager/collections_ingester/collection_ingestion.py
+++ b/sdap_ingest_manager/collections_ingester/collection_ingestion.py
@@ -69,7 +69,9 @@
 
 
 def create_granule_list(file_path_pattern, dataset_ingestion_history_manager,
-                        granule_list_file_path, deconstruct_nfs=False, date_from=None, date_to=None):
+                        granule_list_file_path, deconstruct_nfs=False,
+                        date_from=None, date_to=None,
+                        forward_processing=False):
     """ Creates a granule list file from a file path pattern
         matching the granules.
         If a granules has already been ingested with same md5sum signature, it is not included in this list.
@@ -84,8 +86,16 @@
     logger.info("Granule list file created in directory %s", dir_path)
     Path(dir_path).mkdir(parents=True, exist_ok=True)
 
-    timestamp_from = date_from.timestamp() if date_from else None
-    timestamp_to = date_to.timestamp() if date_to else None
+    if forward_processing:
+        timestamp_from = None
+        if dataset_ingestion_history_manager:
+            timestamp_from = dataset_ingestion_history_manager.get_latest_ingested_file_update()
+        if timestamp_from is None:
+            logger.info("No ingestion history available, forward processing ignored")
+        timestamp_to = None
+    else:
+        timestamp_from = date_from.timestamp() if date_from else None
+        timestamp_to = date_to.timestamp() if date_to else None
 
     if deconstruct_nfs:
         mount_points = nfs_mount_parse.get_nfs_mount_points()
@@ -140,6 +150,8 @@
     netcdf_variable = collection['variable']
     netcdf_file_pattern = collection['path']
 
+    forward_processing = collection.get('forward_processing', False)
+
     granule_list_file_path = os.path.join(granule_file_list_root_path,
                                           f'{dataset_id}-granules.lst')
     dataset_ingestion_history_manager = sdap_ingest_manager.history_manager\
@@ -157,7 +169,8 @@
                         dataset_ingestion_history_manager,
                         granule_list_file_path,
                         deconstruct_nfs=deconstruct_nfs,
-                        **time_range)
+                        **time_range,
+                        forward_processing=forward_processing)
 
     dataset_configuration_file_path = os.path.join(dataset_configuration_root_path,
                                                    f'{dataset_id}-config.yml')
diff --git a/sdap_ingest_manager/collections_ingester/test/unit/collection_ingestion_test.py b/sdap_ingest_manager/collections_ingester/test/unit/collection_ingestion_test.py
index f518e01..1089c69 100644
--- a/sdap_ingest_manager/collections_ingester/test/unit/collection_ingestion_test.py
+++ b/sdap_ingest_manager/collections_ingester/test/unit/collection_ingestion_test.py
@@ -97,6 +97,20 @@
         os.remove(self.target_granule_list_file)
 
 
+    def test_create_granule_list_forward_processing(self):
+        logger.info("test create_granule_list with forward processing")
+        collection_ingestion.create_granule_list(self.granule_file_pattern,
+                                                 None,
+                                                 self.target_granule_list_file,
+                                                 forward_processing=True)
+        line_number = 0
+        with open(self.target_granule_list_file, 'r') as f:
+            for _ in f:
+                line_number += 1
+
+        self.assertGreater(line_number, 0)
+
+        os.remove(self.target_granule_list_file)
 
 
     def test_create_granule_list_no_history(self):
diff --git a/sdap_ingest_manager/history_manager/datasetingestionhistoryfile.py b/sdap_ingest_manager/history_manager/datasetingestionhistoryfile.py
index 21add6c..3e15e8a 100644
--- a/sdap_ingest_manager/history_manager/datasetingestionhistoryfile.py
+++ b/sdap_ingest_manager/history_manager/datasetingestionhistoryfile.py
@@ -14,6 +14,8 @@
     _history_file_path = None
     _history_file = None
     _history_dict = {}
+    _latest_ingested_file_update_file_path = None
+    _latest_ingested_file_update = None
 
     def __init__(self, history_path, dataset_id, signature_fun):
         """
@@ -30,6 +32,14 @@
         Path(history_path).mkdir(parents=True, exist_ok=True)
         self._history_file = open(f"{self._history_file_path}", 'a')
 
+        self._latest_ingested_file_update_file_path = os.path.join(history_path, f'{dataset_id}.ts')
+        if os.path.exists(self._latest_ingested_file_update_file_path):
+            logger.info(f"read latest ingested file update date from {self._latest_ingested_file_update_file_path}")
+            with open(self._latest_ingested_file_update_file_path, 'r') as f_ts:
+                self._latest_ingested_file_update = float(f_ts.readline())
+        else:
+            self._latest_ingested_file_update = None
+
     def _load_history_dict(self):
         logger.info(f"loading history file {self._history_file_path}")
         try:
@@ -46,8 +56,14 @@
     def __del__(self):
         self._history_file.close()
         self._purge()
+        self._save_latest_timestamp()
         del self._history_dict
 
+    def _save_latest_timestamp(self):
+        if self._latest_ingested_file_update:
+            with open(self._latest_ingested_file_update_file_path, 'w') as f_ts:
+                f_ts.write(f'{self._latest_ingested_file_update}\n')
+
     def _purge(self):
         logger.info("purge the history file from duplicates")
         unique_file_names = set()
@@ -85,6 +101,13 @@
         md5sum = self._signature_fun(file_path)
         self._push_record(file_name, md5sum)
 
+        file_update_time = os.path.getmtime(file_path)
+        if self._latest_ingested_file_update:
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update,
+                                                    file_update_time)
+        else:
+            self._latest_ingested_file_update = file_update_time
+
     def _get_md5sum(self, file_name):
         if file_name in self._history_dict.keys():
             return self._history_dict[file_name]
@@ -97,3 +120,6 @@
         md5sum = self._signature_fun(file_path)
         logger.debug(f"compare {md5sum} with {self._get_md5sum(file_name)}")
         return md5sum == self._get_md5sum(file_name)
+
+    def get_latest_ingested_file_update(self):
+        return self._latest_ingested_file_update
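
Usage sketch (illustrative, not part of the patch): the module paths are taken
from this diff, but the DatasetIngestionHistoryFile class name, the md5sum
helper, and the /tmp paths below are assumptions.

    import hashlib

    from sdap_ingest_manager.collections_ingester import collection_ingestion
    # class name assumed from datasetingestionhistoryfile.py
    from sdap_ingest_manager.history_manager.datasetingestionhistoryfile import DatasetIngestionHistoryFile

    def md5sum(file_path):
        # illustrative signature function passed to the history manager
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

    history = DatasetIngestionHistoryFile("/tmp/history", "avhrr-oi-analysed-sst", md5sum)

    # With forward_processing=True, create_granule_list seeds timestamp_from from
    # history.get_latest_ingested_file_update() and ignores date_from/date_to.
    collection_ingestion.create_granule_list(
        "/tmp/granules/*.nc",
        history,
        "/tmp/lists/avhrr-oi-analysed-sst-granules.lst",
        forward_processing=True)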