SDAP-278: Add Cassandra authentication support, remove hardcoded http:// prefix in Solr connector (#106)
diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py
index f507587..fa85a7c 100644
--- a/analysis/webservice/algorithms/Capabilities.py
+++ b/analysis/webservice/algorithms/Capabilities.py
@@ -29,9 +29,6 @@
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
capabilities = []
diff --git a/analysis/webservice/algorithms/CorrelationMap.py b/analysis/webservice/algorithms/CorrelationMap.py
index 1726412..1d8a0ad 100644
--- a/analysis/webservice/algorithms/CorrelationMap.py
+++ b/analysis/webservice/algorithms/CorrelationMap.py
@@ -41,9 +41,6 @@
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
minLat = computeOptions.get_min_lat()
maxLat = computeOptions.get_max_lat()
diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py
index 1b4d642..0ffd83b 100644
--- a/analysis/webservice/algorithms/DailyDifferenceAverage.py
+++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py
@@ -80,9 +80,6 @@
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
def calc(self, request, **args):
min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon()
dataset1 = request.get_argument("ds1", None)
diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py
index 2da6891..fa69416 100644
--- a/analysis/webservice/algorithms/DataInBoundsSearch.py
+++ b/analysis/webservice/algorithms/DataInBoundsSearch.py
@@ -14,7 +14,6 @@
# limitations under the License.
-import logging
from datetime import datetime
from pytz import timezone
@@ -67,13 +66,8 @@
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()[0]
diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py
index 16736b2..e9275ed 100644
--- a/analysis/webservice/algorithms/DataSeriesList.py
+++ b/analysis/webservice/algorithms/DataSeriesList.py
@@ -20,6 +20,10 @@
from webservice.NexusHandler import nexus_handler
from webservice.webmodel import cached
+import logging
+
+
+logger = logging.getLogger(__name__)
@nexus_handler
class DataSeriesListCalcHandlerImpl(NexusCalcHandler):
@@ -28,9 +32,6 @@
description = "Lists datasets currently available for analysis"
params = {}
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
@cached(ttl=(60 * 60 * 1000)) # 1 hour cached
def calc(self, computeOptions, **args):
class SimpleResult(object):
diff --git a/analysis/webservice/algorithms/DelayTest.py b/analysis/webservice/algorithms/DelayTest.py
index e2c1b30..de56f56 100644
--- a/analysis/webservice/algorithms/DelayTest.py
+++ b/analysis/webservice/algorithms/DelayTest.py
@@ -28,9 +28,6 @@
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
time.sleep(10)
diff --git a/analysis/webservice/algorithms/ErrorTosserTest.py b/analysis/webservice/algorithms/ErrorTosserTest.py
index dc4d617..0100552 100644
--- a/analysis/webservice/algorithms/ErrorTosserTest.py
+++ b/analysis/webservice/algorithms/ErrorTosserTest.py
@@ -26,9 +26,6 @@
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
a = 100 / 0.0
# raise Exception("I'm Mad!")
diff --git a/analysis/webservice/algorithms/Heartbeat.py b/analysis/webservice/algorithms/Heartbeat.py
index ae7fcee..bc1f50f 100644
--- a/analysis/webservice/algorithms/Heartbeat.py
+++ b/analysis/webservice/algorithms/Heartbeat.py
@@ -28,9 +28,6 @@
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
def calc(self, computeOptions, **args):
solrOnline = self._get_tile_service().pingSolr()
diff --git a/analysis/webservice/algorithms/HofMoeller.py b/analysis/webservice/algorithms/HofMoeller.py
index 563ea3d..60252ab 100644
--- a/analysis/webservice/algorithms/HofMoeller.py
+++ b/analysis/webservice/algorithms/HofMoeller.py
@@ -39,6 +39,9 @@
if not matplotlib.get_backend():
matplotlib.use('Agg')
+logger = logging.getLogger(__name__)
+
+
class LongitudeHofMoellerCalculator(object):
def longitude_time_hofmoeller_stats(self, tile, index):
stat = {
@@ -93,9 +96,6 @@
class BaseHoffMoellerCalcHandlerImpl(NexusCalcHandler):
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
shape = (len(results), len(results[0][pivot]))
@@ -168,7 +168,7 @@
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.")
except KeyError:
pass
@@ -234,7 +234,7 @@
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.")
except KeyError:
pass
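Note: this file and several below replace the per-instance `self.log` (created in each handler's `__init__`, often the constructor's only job) with a single module-level logger. A minimal sketch of the pattern, with `ExampleCalcHandler` as a hypothetical stand-in:

```python
import logging

# One logger per module, shared by every handler instance in it; this
# removes the need for an __init__ whose only purpose was logger setup.
logger = logging.getLogger(__name__)


class ExampleCalcHandler(object):
    def calc(self, compute_options, **args):
        logger.info("calc invoked")
        return []
```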
diff --git a/analysis/webservice/algorithms/LongitudeLatitudeMap.py b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
index 3f0467a..031d893 100644
--- a/analysis/webservice/algorithms/LongitudeLatitudeMap.py
+++ b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
@@ -14,7 +14,6 @@
# limitations under the License.
-import logging
import math
from datetime import datetime
@@ -74,13 +73,8 @@
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()[0]
except:
diff --git a/analysis/webservice/algorithms/NexusCalcHandler.py b/analysis/webservice/algorithms/NexusCalcHandler.py
index b5f220f..74f5bf7 100644
--- a/analysis/webservice/algorithms/NexusCalcHandler.py
+++ b/analysis/webservice/algorithms/NexusCalcHandler.py
@@ -22,13 +22,9 @@
if "params" not in cls.__dict__:
raise Exception("Property 'params' has not been defined")
- def __init__(self, algorithm_config=None, skipCassandra=False, skipSolr=False):
- self.algorithm_config = algorithm_config
- self._skipCassandra = skipCassandra
- self._skipSolr = skipSolr
- self._tile_service = NexusTileService(skipDatastore=self._skipCassandra,
- skipMetadatastore=self._skipSolr,
- config=self.algorithm_config)
+ def __init__(self, tile_service_factory, **kwargs):
+ self._tile_service_factory = tile_service_factory
+ self._tile_service = tile_service_factory()
def _get_tile_service(self):
return self._tile_service
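This is the core of the refactor: handlers no longer build a `NexusTileService` from `skipCassandra`/`skipSolr` flags; they receive a zero-argument factory callable and invoke it once. A minimal sketch of constructing a handler this way, assuming a `functools.partial` over `NexusTileService` as `webapp.py` does further down (`algorithm_config` is a placeholder here):

```python
from functools import partial

from nexustiles.nexustiles import NexusTileService
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler

algorithm_config = None  # placeholder; webapp.py supplies the parsed ConfigParser

# Positional args mirror NexusTileService(skipDatastore, skipMetadatastore, config).
tile_service_factory = partial(NexusTileService, False, False, algorithm_config)

handler = NexusCalcHandler(tile_service_factory=tile_service_factory)
tile_service = handler._get_tile_service()
```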
diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py
index 231c687..1975d2d 100644
--- a/analysis/webservice/algorithms/StandardDeviationSearch.py
+++ b/analysis/webservice/algorithms/StandardDeviationSearch.py
@@ -73,13 +73,8 @@
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()[0]
except:
diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py
index a3758bc..321d94f 100644
--- a/analysis/webservice/algorithms/TileSearch.py
+++ b/analysis/webservice/algorithms/TileSearch.py
@@ -62,9 +62,6 @@
}
}
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
def calc(self, computeOptions, **args):
minLat = computeOptions.get_min_lat()
maxLat = computeOptions.get_max_lat()
diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py
index 3a609c5..93a9a00 100644
--- a/analysis/webservice/algorithms/TimeAvgMap.py
+++ b/analysis/webservice/algorithms/TimeAvgMap.py
@@ -37,9 +37,6 @@
params = DEFAULT_PARAMETERS_SPEC
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=False)
-
def _find_native_resolution(self):
# Get a quick set of tiles (1 degree at center of box) at 1 time stamp
midLat = (self._minLat + self._maxLat) / 2
diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py
index 85613d9..b1d0675 100644
--- a/analysis/webservice/algorithms/TimeSeries.py
+++ b/analysis/webservice/algorithms/TimeSeries.py
@@ -41,6 +41,7 @@
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+logger = logging.getLogger(__name__)
@nexus_handler
class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
@@ -84,13 +85,8 @@
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()
@@ -185,7 +181,7 @@
except Exception:
stats = {}
tb = traceback.format_exc()
- self.log.warn("Error when calculating comparison stats:\n%s" % tb)
+ logger.warn("Error when calculating comparison stats:\n%s" % tb)
else:
stats = {}
@@ -199,7 +195,7 @@
maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
endTime=end_seconds_from_epoch)
- self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
+ logger.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
return res
def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch,
@@ -214,7 +210,7 @@
ds,
start_seconds_from_epoch,
end_seconds_from_epoch)
- self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+ logger.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds))
if len(daysinrange) == 0:
raise NoDataException(reason="No data found for selected timeframe")
@@ -248,7 +244,7 @@
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating average by day.")
except KeyError:
pass
@@ -259,7 +255,7 @@
manager.shutdown()
results = sorted(results, key=lambda entry: entry["time"])
- self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+ logger.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
if apply_seasonal_cycle_filter:
the_time = datetime.now()
@@ -272,7 +268,7 @@
result['meanSeasonal'] = seasonal_mean
result['minSeasonal'] = seasonal_min
result['maxSeasonal'] = seasonal_max
- self.log.info(
+ logger.info(
"Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
the_time = datetime.now()
@@ -291,9 +287,9 @@
except Exception as e:
# If it doesn't work log the error but ignore it
tb = traceback.format_exc()
- self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
+ logger.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
- self.log.info(
+ logger.info(
"LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
return results, {}
diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py
index fbe4d43..49d75db 100644
--- a/analysis/webservice/algorithms/TimeSeriesSolr.py
+++ b/analysis/webservice/algorithms/TimeSeriesSolr.py
@@ -33,6 +33,7 @@
SENTINEL = 'STOP'
+logger = logging.getLogger(__name__)
@nexus_handler
class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
@@ -42,10 +43,6 @@
params = DEFAULT_PARAMETERS_SPEC
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
- self.log = logging.getLogger(__name__)
-
def calc(self, computeOptions, **args):
"""
@@ -133,7 +130,7 @@
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating average by day.")
except KeyError:
pass
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index 2d429ca..b9ee6f3 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -14,16 +14,18 @@
# limitations under the License.
-
import ConfigParser
import logging
import pkg_resources
-from cassandra.cluster import Cluster
-from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster
+from cassandra.policies import (DCAwareRoundRobinPolicy, TokenAwarePolicy,
+ WhiteListRoundRobinPolicy)
from webservice.NexusHandler import nexus_initializer
+
@nexus_initializer
class DomsInitializer:
def __init__(self):
@@ -35,9 +37,12 @@
domsconfig = ConfigParser.SafeConfigParser()
domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))
+ domsconfig = self.override_config(domsconfig, config)
cassHost = domsconfig.get("cassandra", "host")
cassPort = domsconfig.get("cassandra", "port")
+ cassUsername = domsconfig.get("cassandra", "username")
+ cassPassword = domsconfig.get("cassandra", "password")
cassKeyspace = domsconfig.get("cassandra", "keyspace")
cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
@@ -55,13 +60,29 @@
dc_policy = WhiteListRoundRobinPolicy([cassHost])
token_policy = TokenAwarePolicy(dc_policy)
- with Cluster([host for host in cassHost.split(',')], port=int(cassPort), load_balancing_policy=token_policy,
- protocol_version=cassVersion) as cluster:
+ if cassUsername and cassPassword:
+ auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
+ else:
+ auth_provider = None
+
+ with Cluster([host for host in cassHost.split(',')],
+ port=int(cassPort),
+ load_balancing_policy=token_policy,
+ protocol_version=cassVersion,
+ auth_provider=auth_provider) as cluster:
session = cluster.connect()
self.createKeyspace(session, cassKeyspace)
self.createTables(session)
+ def override_config(self, first, second):
+ for section in second.sections():
+ if first.has_section(section):  # only override preexisting sections; ignore the others
+ for option in second.options(section):
+ if second.get(section, option) is not None:
+ first.set(section, option, second.get(section, option))
+ return first
+
def createKeyspace(self, session, cassKeyspace):
log = logging.getLogger(__name__)
log.info("Verifying DOMS keyspace '%s'" % cassKeyspace)
diff --git a/analysis/webservice/algorithms/doms/domsconfig.ini.default b/analysis/webservice/algorithms/doms/domsconfig.ini.default
index d1814bf..6191f9e 100644
--- a/analysis/webservice/algorithms/doms/domsconfig.ini.default
+++ b/analysis/webservice/algorithms/doms/domsconfig.ini.default
@@ -1,10 +1,12 @@
[cassandra]
-host=sdap-cassandra
+host=localhost
port=9042
keyspace=doms
local_datacenter=datacenter1
protocol_version=3
dc_policy=DCAwareRoundRobinPolicy
+username=
+password=
[cassandraDD]
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py
index e870a2a..78f11f8 100644
--- a/analysis/webservice/algorithms_spark/ClimMapSpark.py
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -25,7 +25,7 @@
from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
-
+from functools import partial
@nexus_handler
class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
@@ -35,14 +35,14 @@
params = DEFAULT_PARAMETERS_SPEC
@staticmethod
- def _map(tile_in_spark):
+ def _map(tile_service_factory, tile_in_spark):
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
min_y, max_y, min_x, max_x) = tile_bounds
startTime = tile_in_spark[1]
endTime = tile_in_spark[2]
ds = tile_in_spark[3]
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
# print 'Started tile', tile_bounds
# sys.stdout.flush()
tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
@@ -196,7 +196,7 @@
spark_nparts = self._spark_nparts(nparts_requested)
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- sum_count_part = rdd.map(self._map)
+ sum_count_part = rdd.map(partial(self._map, self._tile_service_factory))
sum_count = \
sum_count_part.combineByKey(lambda val: val,
lambda x, val: (x[0] + val[0],
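Each Spark `_map` previously called `NexusTileService()` directly inside the task; now the driver binds the factory as the first argument with `functools.partial`, and each executor builds its own service. A minimal, Spark-free sketch of the binding, with `make_service` standing in for the real factory:

```python
from functools import partial


def make_service():
    # Stand-in for the tile service factory; the real one is a
    # functools.partial over NexusTileService.
    return {"name": "tile-service"}


def _map(tile_service_factory, tile_in_spark):
    tile_service = tile_service_factory()  # constructed per task, on the executor
    return (tile_in_spark, tile_service["name"])


# rdd.map(partial(_map, make_service)) works because partial fixes the
# factory and Spark supplies each RDD element as the remaining argument.
mapper = partial(_map, make_service)
print(mapper(42))  # (42, 'tile-service')
```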
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py
index 1af8cab..4d2c4fe 100644
--- a/analysis/webservice/algorithms_spark/CorrMapSpark.py
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -13,15 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import json
-import math
-import logging
from datetime import datetime
-import numpy as np
-from nexustiles.nexustiles import NexusTileService
+from functools import partial
-# from time import time
+import numpy as np
+
from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException
@@ -35,7 +32,7 @@
params = DEFAULT_PARAMETERS_SPEC
@staticmethod
- def _map(tile_in):
+ def _map(tile_service_factory, tile_in):
# Unpack input
tile_bounds, start_time, end_time, ds = tile_in
(min_lat, max_lat, min_lon, max_lon,
@@ -60,7 +57,7 @@
# print 'days_at_a_time = ', days_at_a_time
t_incr = 86400 * days_at_a_time
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
# Compute the intermediate summations needed for the Pearson
# Correlation Coefficient. We use a one-pass online algorithm
@@ -194,12 +191,12 @@
self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
daysinrange = self._get_tile_service().find_days_in_range_asc(self._minLat,
- self._maxLat,
- self._minLon,
- self._maxLon,
- self._ds[0],
- self._startTime,
- self._endTime)
+ self._maxLat,
+ self._minLon,
+ self._maxLon,
+ self._ds[0],
+ self._startTime,
+ self._endTime)
ndays = len(daysinrange)
if ndays == 0:
raise NoDataException(reason="No data found for selected timeframe")
@@ -224,7 +221,9 @@
max_time_parts = 72
num_time_parts = min(max_time_parts, ndays)
- spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
+ spark_part_time_ranges = np.tile(
+ np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]),
+ (len(nexus_tiles_spark), 1))
nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
@@ -233,7 +232,7 @@
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- sum_tiles_part = rdd.map(self._map)
+ sum_tiles_part = rdd.map(partial(self._map, self._tile_service_factory))
# print "sum_tiles_part = ",sum_tiles_part.collect()
sum_tiles = \
sum_tiles_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
index 51be431..f21585b 100644
--- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
+++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
@@ -72,7 +72,7 @@
singleton = True
def __init__(self, **kwargs):
- NexusCalcSparkHandler.__init__(self, skipCassandra=True, **kwargs)
+ NexusCalcSparkHandler.__init__(self, **kwargs)
self.log = logging.getLogger(__name__)
def parse_arguments(self, request):
@@ -134,15 +134,18 @@
# Get tile ids in box
tile_ids = [tile.tile_id for tile in
self._get_tile_service().find_tiles_in_polygon(bounding_polygon, dataset,
- start_seconds_from_epoch, end_seconds_from_epoch,
- fetch_data=False, fl='id',
- sort=['tile_min_time_dt asc', 'tile_min_lon asc',
- 'tile_min_lat asc'], rows=5000)]
+ start_seconds_from_epoch, end_seconds_from_epoch,
+ fetch_data=False, fl='id',
+ sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+ 'tile_min_lat asc'], rows=5000)]
# Call spark_matchup
- self.log.debug("Calling Spark Driver")
try:
- spark_result = spark_anomolies_driver(tile_ids, wkt.dumps(bounding_polygon), dataset, climatology,
+ spark_result = spark_anomalies_driver(self._tile_service_factory,
+ tile_ids,
+ wkt.dumps(bounding_polygon),
+ dataset,
+ climatology,
sc=self._sc)
except Exception as e:
self.log.exception(e)
@@ -264,7 +267,7 @@
return num_partitions
-def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None):
+def spark_anomalies_driver(tile_service_factory, tile_ids, bounding_wkt, dataset, climatology, sc=None):
from functools import partial
with DRIVER_LOCK:
@@ -297,7 +300,7 @@
return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2])
result = rdd \
- .mapPartitions(partial(calculate_diff, bounding_wkt=bounding_wkt_b, dataset=dataset_b,
+ .mapPartitions(partial(calculate_diff, tile_service_factory, bounding_wkt=bounding_wkt_b, dataset=dataset_b,
climatology=climatology_b)) \
.reduceByKey(add_tuple_elements) \
.mapValues(compute_avg_and_std) \
@@ -307,7 +310,7 @@
return result
-def calculate_diff(tile_ids, bounding_wkt, dataset, climatology):
+def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climatology):
from itertools import chain
# Construct a list of generators that yield (day, sum, count, variance)
@@ -316,7 +319,7 @@
tile_ids = list(tile_ids)
if len(tile_ids) == 0:
return []
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
for tile_id in tile_ids:
# Get the dataset tile
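Besides fixing the `spark_anomolies_driver` → `spark_anomalies_driver` typo, this file threads the factory through `mapPartitions`. Note the argument order: `partial` binds the factory positionally, and the partition iterator then lands in `tile_ids`. A small sketch of that ordering, with `dict` standing in for the factory:

```python
from functools import partial


def calculate_diff(tile_service_factory, tile_ids, bounding_wkt=None):
    tile_service = tile_service_factory()  # one service per partition
    return [(tile_id, bounding_wkt) for tile_id in tile_ids]


# mapPartitions(partial(calculate_diff, factory, bounding_wkt=...)) passes
# each partition's iterator as the remaining positional argument.
fn = partial(calculate_diff, dict, bounding_wkt="POLYGON EMPTY")
print(fn(iter([1, 2])))  # [(1, 'POLYGON EMPTY'), (2, 'POLYGON EMPTY')]
```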
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index c4bc019..7c3041a 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -14,7 +14,6 @@
# limitations under the License.
import itertools
-import logging
from cStringIO import StringIO
from datetime import datetime
from functools import partial
@@ -25,8 +24,8 @@
import shapely.geometry
from matplotlib import cm
from matplotlib.ticker import FuncFormatter
-from nexustiles.nexustiles import NexusTileService
from pytz import timezone
+
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
@@ -41,12 +40,12 @@
class HofMoellerCalculator(object):
@staticmethod
- def hofmoeller_stats(metrics_callback, tile_in_spark):
+ def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark):
(latlon, tile_id, index,
min_lat, max_lat, min_lon, max_lon) = tile_in_spark
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
try:
# Load the dataset tile
tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0]
@@ -263,7 +262,7 @@
'min': t[7]}
-def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback):
+def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback):
# Parallelize list of tile ids
rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
if latlon == 0:
@@ -279,7 +278,7 @@
# the value is a tuple of intermediate statistics for the specified
# coordinate within a single NEXUS tile.
metrics_callback(partitions=rdd.getNumPartitions())
- results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, metrics_callback))
+ results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, tile_service_factory, metrics_callback))
# Combine tuples across tiles with input key = (time, lat|lon)
# Output a key value pair with key = (time)
@@ -349,15 +348,19 @@
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
- ds, start_time, end_time,
- metrics_callback=metrics_record.record_metrics,
- fetch_data=False))]
+ ds, start_time, end_time,
+ metrics_callback=metrics_record.record_metrics,
+ fetch_data=False))]
print ("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
raise NoDataException(reason="No data found for selected timeframe")
- results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
+ results = spark_driver(self._sc,
+ self._latlon,
+ self._tile_service_factory,
+ nexus_tiles_spark,
+ metrics_record.record_metrics)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry['time'])
for i in range(len(results)):
@@ -400,15 +403,19 @@
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
- ds, start_time, end_time,
- metrics_callback=metrics_record.record_metrics,
- fetch_data=False))]
+ ds, start_time, end_time,
+ metrics_callback=metrics_record.record_metrics,
+ fetch_data=False))]
print ("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
raise NoDataException(reason="No data found for selected timeframe")
- results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
+ results = spark_driver(self._sc,
+ self._latlon,
+ self._tile_service_factory,
+ nexus_tiles_spark,
+ metrics_record.record_metrics)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry["time"])
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 9ae7557..fc90aa1 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -132,7 +132,7 @@
singleton = True
def __init__(self, algorithm_config=None, sc=None):
- NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, skipCassandra=True)
+ NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc)
self.log = logging.getLogger(__name__)
def parse_arguments(self, request):
diff --git a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
index 3bd9698..5b4bd83 100644
--- a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
+++ b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
@@ -14,13 +14,11 @@
# limitations under the License.
-import math
-import logging
from datetime import datetime
+from functools import partial
import numpy as np
import shapely.geometry
-from nexustiles.nexustiles import NexusTileService
from pytz import timezone
from webservice.NexusHandler import nexus_handler
@@ -207,7 +205,7 @@
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- max_min_part = rdd.map(self._map)
+ max_min_part = rdd.map(partial(self._map, self._tile_service_factory))
max_min_count = \
max_min_part.combineByKey(lambda val: val,
lambda x, val: (np.maximum(x[0], val[0]), # Max
@@ -283,7 +281,7 @@
# this operates on only one nexus tile bound over time. Can assume all nexus_tiles are the same shape
@staticmethod
- def _map(tile_in_spark):
+ def _map(tile_service_factory, tile_in_spark):
# tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
@@ -291,7 +289,7 @@
startTime = tile_in_spark[1]
endTime = tile_in_spark[2]
ds = tile_in_spark[3]
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
index 12b84c1..fe3541a 100644
--- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
@@ -6,6 +6,8 @@
from webservice.metrics import MetricsRecord, SparkAccumulatorMetricsField, NumberMetricsField
from webservice.webmodel import NexusProcessingException
+logger = logging.getLogger(__name__)
+
class NexusCalcSparkHandler(NexusCalcHandler):
class SparkJobContext(object):
@@ -32,14 +34,15 @@
self.log.debug("Returning %s" % self.job_name)
self.spark_job_stack.append(self.job_name)
- def __init__(self, algorithm_config=None, sc=None, **kwargs):
+ def __init__(self, tile_service_factory, sc=None, **kwargs):
import inspect
- NexusCalcHandler.__init__(self, algorithm_config=algorithm_config, **kwargs)
+ NexusCalcHandler.__init__(self, tile_service_factory=tile_service_factory, **kwargs)
self.spark_job_stack = []
self._sc = sc
- max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section(
- "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10
+ # max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section(
+ # "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10
+ max_concurrent_jobs = 10
self.spark_job_stack = list(["Job %s" % x for x in xrange(1, max_concurrent_jobs + 1)])
self.log = logging.getLogger(__name__)
@@ -349,4 +352,4 @@
accumulator=self._sc.accumulator(0)),
NumberMetricsField(key='reduce', description='Actual time to reduce results'),
NumberMetricsField(key="actual_time", description="Total (actual) time")
- ])
\ No newline at end of file
+ ])
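One behavioral note here: with the `algorithm_config` read commented out, the Spark job pool is now fixed at 10 slots rather than honoring `[spark] maxconcurrentjobs`. A simplified sketch of how the slot stack is built (the real code uses `xrange`):

```python
# The handler pre-fills a stack of named job slots and pops one per
# concurrent Spark job; the pool size is currently hardcoded.
max_concurrent_jobs = 10
spark_job_stack = ["Job %s" % x for x in range(1, max_concurrent_jobs + 1)]
print(spark_job_stack[:3])  # ['Job 1', 'Job 2', 'Job 3']
```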
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index c668130..6231873 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -13,14 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
from datetime import datetime
from functools import partial
import numpy as np
import shapely.geometry
-from nexustiles.nexustiles import NexusTileService
from pytz import timezone
+
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
@@ -198,7 +197,7 @@
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
metrics_record.record_metrics(partitions=rdd.getNumPartitions())
- sum_count_part = rdd.map(partial(self._map, metrics_record.record_metrics))
+ sum_count_part = rdd.map(partial(self._map, self._tile_service_factory, metrics_record.record_metrics))
reduce_duration = 0
reduce_start = datetime.now()
sum_count = sum_count_part.combineByKey(lambda val: val,
@@ -264,14 +263,14 @@
endTime=end_time)
@staticmethod
- def _map(metrics_callback, tile_in_spark):
+ def _map(tile_service_factory, metrics_callback, tile_in_spark):
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
min_y, max_y, min_x, max_x) = tile_bounds
startTime = tile_in_spark[1]
endTime = tile_in_spark[2]
ds = tile_in_spark[3]
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index bf5963e..43f7f6d 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -195,7 +195,10 @@
spark_nparts = self._spark_nparts(nparts_requested)
self.log.info('Using {} partitions'.format(spark_nparts))
results, meta = spark_driver(daysinrange, bounding_polygon,
- shortName, metrics_record.record_metrics, spark_nparts=spark_nparts,
+ shortName,
+ self._tile_service_factory,
+ metrics_record.record_metrics,
+ spark_nparts=spark_nparts,
sc=self._sc)
if apply_seasonal_cycle_filter:
@@ -487,7 +490,7 @@
return sio.getvalue()
-def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, fill=-9999.,
spark_nparts=1, sc=None):
nexus_tiles_spark = [(bounding_polygon.wkt, ds,
list(daysinrange_part), fill)
@@ -497,14 +500,14 @@
# Launch Spark computations
rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
metrics_callback(partitions=rdd.getNumPartitions())
- results = rdd.flatMap(partial(calc_average_on_day, metrics_callback)).collect()
+ results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback)).collect()
results = list(itertools.chain.from_iterable(results))
results = sorted(results, key=lambda entry: entry["time"])
return results, {}
-def calc_average_on_day(metrics_callback, tile_in_spark):
+def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark):
import shapely.wkt
from datetime import datetime
from pytz import timezone
@@ -513,7 +516,7 @@
(bounding_wkt, dataset, timestamps, fill) = tile_in_spark
if len(timestamps) == 0:
return []
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
ds1_nexus_tiles = \
tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt),
dataset,
diff --git a/analysis/webservice/algorithms_spark/VarianceSpark.py b/analysis/webservice/algorithms_spark/VarianceSpark.py
index 698385d..24ffbf0 100644
--- a/analysis/webservice/algorithms_spark/VarianceSpark.py
+++ b/analysis/webservice/algorithms_spark/VarianceSpark.py
@@ -14,13 +14,11 @@
# limitations under the License.
-import math
-import logging
from datetime import datetime
+from functools import partial
import numpy as np
import shapely.geometry
-from nexustiles.nexustiles import NexusTileService
from pytz import timezone
from webservice.NexusHandler import nexus_handler
@@ -207,7 +205,7 @@
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- sum_count_part = rdd.map(self._map)
+ sum_count_part = rdd.map(partial(self._map, self._tile_service_factory))
sum_count = \
sum_count_part.combineByKey(lambda val: val,
lambda x, val: (x[0] + val[0],
@@ -235,7 +233,7 @@
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- anomaly_squared_part = rdd.map(self._calc_variance)
+ anomaly_squared_part = rdd.map(partial(self._calc_variance, self._tile_service_factory))
anomaly_squared = \
anomaly_squared_part.combineByKey(lambda val: val,
lambda x, val: (x[0] + val[0],
@@ -303,7 +301,7 @@
endTime=end_time)
@staticmethod
- def _map(tile_in_spark):
+ def _map(tile_service_factory, tile_in_spark):
# tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
@@ -311,7 +309,7 @@
startTime = tile_in_spark[1]
endTime = tile_in_spark[2]
ds = tile_in_spark[3]
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
@@ -345,7 +343,7 @@
return tile_bounds, (sum_tile, cnt_tile)
@staticmethod
- def _calc_variance(tile_in_spark):
+ def _calc_variance(tile_service_factory, tile_in_spark):
# tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
@@ -354,7 +352,7 @@
endTime = tile_in_spark[2]
ds = tile_in_spark[3]
x_bar = tile_in_spark[4]
- tile_service = NexusTileService()
+ tile_service = tile_service_factory()
tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index adfedda..d1ada7f 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -13,19 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import ConfigParser
import importlib
import logging
import sys
+from functools import partial
+
import pkg_resources
import tornado.web
-import webservice.algorithms_spark.NexusCalcSparkHandler
from tornado.options import define, options, parse_command_line
+import webservice.algorithms_spark.NexusCalcSparkHandler
+from nexustiles.nexustiles import NexusTileService
from webservice import NexusHandler
from webservice.nexus_tornado.request.handlers import NexusRequestHandler
+
def inject_args_in_config(args, config):
"""
Takes command-line (argparse-style) arguments and pushes them into the config
@@ -37,9 +40,9 @@
n = t_opt.name
first_ = n.find('_')
if first_ > 0:
- s, o = n[:first_], n[first_+1:]
+ s, o = n[:first_], n[first_ + 1:]
v = t_opt.value()
- log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v , s, o))
+ log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v, s, o))
if not config.has_section(s):
config.add_section(s)
config.set(s, o, v)
@@ -67,6 +70,10 @@
define('solr_time_out', default=60,
help='time out for solr requests in seconds, default (60) is ok for most deployments'
' when solr performances are not good this might need to be increased')
+ define('solr_host', help='solr host and port')
+ define('cassandra_host', help='cassandra host')
+ define('cassandra_username', help='cassandra username')
+ define('cassandra_password', help='cassandra password')
parse_command_line()
algorithm_config = inject_args_in_config(options, algorithm_config)
@@ -96,29 +103,33 @@
log.info("Initializing request ThreadPool to %s" % max_request_threads)
request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
+ tile_service_factory = partial(NexusTileService, False, False, algorithm_config)
spark_context = None
for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
if spark_context is None:
from pyspark.sql import SparkSession
+
spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
spark_context = spark.sparkContext
- handlers.append(
- (clazzWrapper.path, NexusRequestHandler,
- dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context,
- thread_pool=request_thread_pool)))
+ handlers.append((clazzWrapper.path,
+ NexusRequestHandler,
+ dict(clazz=clazzWrapper,
+ tile_service_factory=tile_service_factory,
+ sc=spark_context,
+ thread_pool=request_thread_pool)))
else:
- handlers.append(
- (clazzWrapper.path, NexusRequestHandler,
- dict(clazz=clazzWrapper, thread_pool=request_thread_pool)))
-
+ handlers.append((clazzWrapper.path,
+ NexusRequestHandler,
+ dict(clazz=clazzWrapper,
+ tile_service_factory=tile_service_factory,
+ thread_pool=request_thread_pool)))
class VersionHandler(tornado.web.RequestHandler):
def get(self):
self.write(pkg_resources.get_distribution("nexusanalysis").version)
-
handlers.append((r"/version", VersionHandler))
if staticEnabled:
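`webapp.py` now builds one `tile_service_factory` and hands it to every handler, Spark or not, and the new `solr_host`/`cassandra_*` options flow into the config through `inject_args_in_config`, which splits each option name at its first underscore into section and option. A sketch of that naming convention (values are hypothetical):

```python
# "cassandra_username" -> section "cassandra", option "username";
# so --cassandra_username=... on the command line overrides [cassandra] username.
name, value = "cassandra_username", "nexus_user"
first_ = name.find('_')
section, option = name[:first_], name[first_ + 1:]
print("%s / %s = %s" % (section, option, value))  # cassandra / username = nexus_user
```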
diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/config/datastores.ini.default
index 0fe8d9d..ed40068 100644
--- a/data-access/nexustiles/config/datastores.ini.default
+++ b/data-access/nexustiles/config/datastores.ini.default
@@ -1,10 +1,12 @@
[cassandra]
-host=sdap-cassandra
+host=localhost
port=9042
keyspace=nexustiles
local_datacenter=datacenter1
protocol_version=3
dc_policy=DCAwareRoundRobinPolicy
+username=
+password=
[s3]
bucket=nexus-jpl
@@ -15,7 +17,7 @@
region=us-west-2
[solr]
-host=sdap-solr:8983
+host=http://localhost:8983
core=nexustiles
[datastore]
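With the hardcoded `http://` gone from `SolrProxy`, the `[solr] host` value must now carry its own scheme, which also makes HTTPS endpoints possible; leaving `username`/`password` blank keeps Cassandra auth disabled. A sketch of how a filled-in config drives the auth decision (hosts and credentials are hypothetical):

```python
import ConfigParser
from StringIO import StringIO

sample = StringIO("""[cassandra]
host=cassandra1,cassandra2
username=nexus_user
password=s3cret

[solr]
host=https://solr.example.org:8983
core=nexustiles
""")
config = ConfigParser.SafeConfigParser()
config.readfp(sample)

username = config.get("cassandra", "username")
password = config.get("cassandra", "password")
use_auth = bool(username and password)  # mirrors the auth_provider checks above
print(use_auth)  # True
```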
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index ed37c5c..a8a4e6e 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -13,19 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import uuid
from ConfigParser import NoOptionError
-from multiprocessing.synchronize import Lock
import nexusproto.DataTile_pb2 as nexusproto
import numpy as np
+from cassandra.auth import PlainTextAuthProvider
from cassandra.cqlengine import columns, connection, CQLEngineException
from cassandra.cqlengine.models import Model
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
+from multiprocessing.synchronize import Lock
from nexusproto.serialization import from_shaped_array
INIT_LOCK = Lock()
+logger = logging.getLogger(__name__)
class NexusTileData(Model):
__table_name__ = 'sea_surface_temp'
@@ -151,6 +154,8 @@
def __init__(self, config):
self.config = config
self.__cass_url = config.get("cassandra", "host")
+ self.__cass_username = config.get("cassandra", "username")
+ self.__cass_password = config.get("cassandra", "password")
self.__cass_keyspace = config.get("cassandra", "keyspace")
self.__cass_local_DC = config.get("cassandra", "local_datacenter")
self.__cass_protocol_version = config.getint("cassandra", "protocol_version")
@@ -168,16 +173,20 @@
self.__open()
def __open(self):
-
if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy':
dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC)
elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy':
dc_policy = WhiteListRoundRobinPolicy([self.__cass_url])
+ if self.__cass_username and self.__cass_password:
+ auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password)
+ else:
+ auth_provider = None
token_policy = TokenAwarePolicy(dc_policy)
connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace,
protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy,
- port=self.__cass_port)
+ port=self.__cass_port,
+ auth_provider=auth_provider)
def fetch_nexus_tiles(self, *tile_ids):
tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
diff --git a/data-access/nexustiles/dao/SolrProxy.py b/data-access/nexustiles/dao/SolrProxy.py
index bf41107..28127a7 100644
--- a/data-access/nexustiles/dao/SolrProxy.py
+++ b/data-access/nexustiles/dao/SolrProxy.py
@@ -44,7 +44,7 @@
with SOLR_CON_LOCK:
solrcon = getattr(thread_local, 'solrcon', None)
if solrcon is None:
- solr_url = 'http://%s/solr/%s' % (self.solrUrl, self.solrCore)
+ solr_url = '%s/solr/%s' % (self.solrUrl, self.solrCore)
self.logger.info("connect to solr, url {} with option(s) = {}".format(solr_url, solr_kargs))
solrcon = pysolr.Solr(solr_url, **solr_kargs)
thread_local.solrcon = solrcon
@@ -665,7 +665,7 @@
return (self.convert_iso_to_datetime(date) - EPOCH).total_seconds()
def ping(self):
- solrAdminPing = 'http://%s/solr/%s/admin/ping' % (self.solrUrl, self.solrCore)
+ solrAdminPing = '%s/solr/%s/admin/ping' % (self.solrUrl, self.solrCore)
try:
r = requests.get(solrAdminPing, params={'wt': 'json'})
results = json.loads(r.text)
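The two URL changes above are the "remove hardcoded http://" half of this PR: the scheme now comes entirely from the configured host. A one-line sketch with a hypothetical HTTPS host:

```python
solr_url = '%s/solr/%s' % ('https://solr.example.org:8983', 'nexustiles')
print(solr_url)  # https://solr.example.org:8983/solr/nexustiles
```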
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 24db1ae..3e7e2f8 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -102,10 +102,10 @@
def override_config(self, config):
for section in config.sections():
- if self._config.has_section(section): # only override preexisting section, ignores the other
+ if self._config.has_section(section):  # only override preexisting sections; ignore the others
for option in config.options(section):
- self._config.set(section, option, config.get(section, option))
-
+ if config.get(section, option) is not None:
+ self._config.set(section, option, config.get(section, option))
def get_dataseries_list(self, simple=False):
if simple:
diff --git a/data-access/tests/nexustiles_test.py b/data-access/tests/nexustiles_test.py
index 9f533a8..d79d441 100644
--- a/data-access/tests/nexustiles_test.py
+++ b/data-access/tests/nexustiles_test.py
@@ -32,7 +32,7 @@
port=9042
[solr]
-host=localhost:8983
+host=http://localhost:8983
core=nexustiles
[datastore]