add retry policy and exponential backoff for ingestion writes to Cassandra and Solr
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 6f9d525..6d7414b 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -23,5 +23,5 @@
RUN apk del .build-deps
USER 1001
-
+ENV OPENBLAS_NUM_THREADS=1
ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
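Note on `OPENBLAS_NUM_THREADS=1`: the ingester worker processes pull in OpenBLAS through numpy, and an unbounded BLAS thread pool per process oversubscribes the container's CPUs. The variable has to be present before numpy is first imported, which is why it is an image-level ENV rather than a runtime setting. A minimal sketch of the same guard in Python (illustration only, not part of the patch):

```python
# Sketch only: the variable must be in place before numpy (and with it
# OpenBLAS) is imported; OpenBLAS sizes its thread pool when it is loaded.
import os

os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")  # no-op when already set via ENV

import numpy as np  # noqa: E402  OpenBLAS reads the variable at load time

print("OPENBLAS_NUM_THREADS =", os.environ["OPENBLAS_NUM_THREADS"])
```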
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 59f02a0..6c32bf6 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -101,7 +101,7 @@
self._slicer = slicer
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
- self._max_concurrency = max_concurrency
+ self._max_concurrency: int = max_concurrency
# Create a SyncManager so that we can to communicate exceptions from the
# worker processes back to the main process.
@@ -188,8 +188,8 @@
self._data_store_factory,
self._metadata_store_factory,
shared_memory),
- maxtasksperchild=self._max_concurrency,
- childconcurrency=self._max_concurrency) as pool:
+ maxtasksperchild=int(self._max_concurrency),
+ childconcurrency=int(self._max_concurrency)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
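The `int(...)` casts guard against `max_concurrency` arriving as a string (for example from an environment variable or CLI argument), which aiomultiprocess is not guaranteed to coerce before using it as `maxtasksperchild`/`childconcurrency`. A sketch of coercing the value once, where the configuration is read; the `MAX_CONCURRENCY` variable name is hypothetical, not from the patch:

```python
# Illustrative sketch: coerce the concurrency setting at the point it is read
# so the Pipeline always receives a real int.
import os


def read_max_concurrency(default: int = 16) -> int:
    raw = os.environ.get("MAX_CONCURRENCY", default)
    try:
        return int(raw)
    except (TypeError, ValueError) as e:
        raise ValueError(f"MAX_CONCURRENCY must be an integer, got {raw!r}") from e
```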
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 514d12f..505b56a 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -17,6 +17,7 @@
import asyncio
import logging
import uuid
+import time
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session, NoHostAvailable
@@ -27,6 +28,7 @@
from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
from granule_ingester.writers.DataStore import DataStore
+from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy
logging.getLogger('cassandra').setLevel(logging.INFO)
logger = logging.getLogger(__name__)
@@ -67,10 +69,13 @@
#load_balancing_policy=DCAwareRoundRobinPolicy("dc1"),
protocol_version=4,
reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
- default_retry_policy=RetryPolicy(),
+ default_retry_policy=CassandraStoreConnectionRetryPolicy(),
auth_provider=auth_provider)
+
session = cluster.connect()
session.set_keyspace('nexustiles')
+ session.default_timeout = 60
+
return session
def connect(self):
@@ -80,7 +85,7 @@
if self._session:
self._session.shutdown()
- async def save_data(self, tile: NexusTile) -> None:
+ async def save_data(self, tile: NexusTile, max_num_try: int = 6, num_try: int = 0) -> None:
try:
tile_id = uuid.UUID(tile.summary.tile_id)
serialized_tile_data = TileData.SerializeToString(tile.tile)
@@ -89,12 +94,18 @@
await self._execute_query_async(self._session, prepared_query,
[tile_id, bytearray(serialized_tile_data)])
except Exception as e:
- logger.error("exception while uploading tile data on cassandra %s", e)
- raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
+ if num_try <= max_num_try:
+ await asyncio.sleep(2 ** num_try)  # back off without blocking the event loop
+ logger.warning("Exception while writing tile data to Cassandra: %s; retrying (retry %d of %d)", e, num_try + 1, max_num_try + 1)
+ await self.save_data(tile, max_num_try=max_num_try, num_try=num_try + 1)
+ else:
+ logger.error("Exception while writing tile data to Cassandra: %s; giving up after %d retries", e, num_try)
+ raise CassandraLostConnectionError("Lost connection to Cassandra, and cannot save tiles.")
+
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):
- cassandra_future = session.execute_async(query, parameters)
+ cassandra_future = session.execute_async(query, parameters, timeout=6000)
asyncio_future = asyncio.Future()
cassandra_future.add_callbacks(asyncio_future.set_result, asyncio_future.set_exception)
return await asyncio_future
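`save_data` runs on the worker's event loop, so the backoff uses `asyncio.sleep` rather than a blocking sleep, keeping concurrent tile writes moving. For reference, the same exponential schedule (1 s, 2 s, 4 s, ... = `2 ** num_try`) can also be written iteratively; the sketch below is not the patch's implementation, and `do_save` stands in for the prepared-statement execution:

```python
# Sketch only: iterative exponential-backoff retry for an async write.
import asyncio
import logging

logger = logging.getLogger(__name__)


async def save_with_backoff(do_save, max_num_try: int = 6):
    for num_try in range(max_num_try + 1):
        try:
            return await do_save()
        except Exception as e:
            if num_try == max_num_try:
                raise
            delay = 2 ** num_try
            logger.warning("Cassandra write failed (%s); retrying in %d s", e, delay)
            await asyncio.sleep(delay)  # yields the event loop while waiting
```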
diff --git a/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py b/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py
new file mode 100644
index 0000000..4318dc2
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py
@@ -0,0 +1,26 @@
+import logging
+from cassandra.policies import RetryPolicy
+from cassandra import WriteType as WT
+
+WriteType = WT
+
+logging.getLogger('cassandra').setLevel(logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+class CassandraStoreConnectionRetryPolicy(RetryPolicy):
+
+ def on_write_timeout(self, query, consistency, write_type,
+ required_responses, received_responses, retry_num):
+ """
+ Retry a failed write at most once, and only if the ``write_type`` was
+ :attr:`~.WriteType.BATCH_LOG` or :attr:`~.WriteType.SIMPLE` (the stock
+ :class:`~cassandra.policies.RetryPolicy` retries only ``BATCH_LOG`` writes).
+ """
+ logger.debug("Write timeout retry policy invoked: retry_num=%s, write_type=%s", retry_num, write_type)
+ if retry_num != 0:
+ return self.RETHROW, None
+ elif write_type == WriteType.BATCH_LOG or write_type == WriteType.SIMPLE:
+ return self.RETRY, consistency
+ else:
+ return self.RETHROW, None
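The policy's decision table can be exercised directly: a first write timeout on a `SIMPLE` or `BATCH_LOG` write is retried at the same consistency level, anything else is rethrown. The snippet below is illustrative only and assumes the package export added to `writers/__init__.py` later in this diff:

```python
# Illustrative check of the retry policy's decisions (not part of the patch).
from cassandra import ConsistencyLevel, WriteType
from cassandra.policies import RetryPolicy

from granule_ingester.writers import CassandraStoreConnectionRetryPolicy

policy = CassandraStoreConnectionRetryPolicy()

# First timeout on a SIMPLE write: retried at the same consistency level.
decision, _ = policy.on_write_timeout(None, ConsistencyLevel.LOCAL_QUORUM,
                                      WriteType.SIMPLE, 1, 0, retry_num=0)
assert decision == RetryPolicy.RETRY

# Second timeout: rethrown to the caller.
decision, _ = policy.on_write_timeout(None, ConsistencyLevel.LOCAL_QUORUM,
                                      WriteType.SIMPLE, 1, 0, retry_num=1)
assert decision == RetryPolicy.RETHROW
```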
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 42ca066..3da6370 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -17,6 +17,7 @@
import functools
import json
import logging
+import time
from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
@@ -57,10 +58,10 @@
"""
try:
- logger.info("getting solr configuration from zookeeper, node '%s'", parent_nodes[0])
+ logger.debug("getting solr configuration from zookeeper, node '%s'", parent_nodes[0])
return parent_nodes[0], zk.zk.get_children(parent_nodes[0])
except NoNodeError:
- logger.info("solr configuration not found in node '%s'", parent_nodes[0])
+ logger.debug("solr configuration not found in node '%s'", parent_nodes[0])
if len(parent_nodes)>1:
return self._get_collections(zk, parent_nodes[1:])
else:
@@ -111,11 +112,18 @@
await self._save_document(solr_doc)
@run_in_executor
- def _save_document(self, doc: dict):
+ def _save_document(self, doc: dict, max_num_try: int = 6, num_try: int = 0):
try:
self._solr.add([doc])
- except pysolr.SolrError:
- raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
+ except pysolr.SolrError as e:
+ if num_try <= max_num_try:
+ time.sleep(2 ** num_try)
+ logger.warning("Failed to save tile to Solr: %s; retrying (retry %d of %d)", e, num_try + 1, max_num_try + 1)
+ self._save_document(doc,
+ max_num_try=max_num_try,
+ num_try=num_try + 1)
+ else:
+ raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
def _build_solr_doc(self, tile: NexusTile) -> Dict:
summary: TileSummary = tile.summary
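Because `_save_document` is wrapped by `@run_in_executor`, the blocking `time.sleep` runs on an executor thread rather than the event loop. A loop-based variant (sketch only, not the patch's code) keeps all retries inside a single executor call instead of re-entering the decorated method:

```python
# Sketch only: loop-based equivalent of the Solr retry with the same
# 2**num_try backoff; the blocking sleep stalls only the executor thread.
import time

import pysolr

from granule_ingester.exceptions import SolrLostConnectionError


def add_with_backoff(solr: pysolr.Solr, doc: dict, max_num_try: int = 6):
    for num_try in range(max_num_try + 1):
        try:
            return solr.add([doc])
        except pysolr.SolrError:
            if num_try == max_num_try:
                raise SolrLostConnectionError(
                    "Lost connection to Solr, and cannot save tiles.")
            time.sleep(2 ** num_try)  # 1 s, 2 s, 4 s, ...
```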
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
index 9323d8c..c9f30e0 100644
--- a/granule_ingester/granule_ingester/writers/__init__.py
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -1,4 +1,5 @@
from granule_ingester.writers.DataStore import DataStore
from granule_ingester.writers.MetadataStore import MetadataStore
from granule_ingester.writers.SolrStore import SolrStore
+from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy
from granule_ingester.writers.CassandraStore import CassandraStore