blob: b97d890a99666765757879862f3005b68bd1a1ce [file] [log] [blame]
import glob
import os
class MetadataExtractor(object):
    """Extract metadata from data filenames in bulk.

    Example usage:

    >>> extractor = MetadataExtractor('/path/to/data')

    Suppose the data in this directory had the following files:
    pr_*.nc, uas_*.nc, vas_*.nc

    All of the metadata lies in the data attribute:

    >>> extractor.data
    [{'filename': '/path/to/data/pr_*.nc', 'variable': 'pr'},
     {'filename': '/path/to/data/uas_*.nc', 'variable': 'uas'},
     {'filename': '/path/to/data/vas_*.nc', 'variable': 'vas'}]

    Results can be narrowed down by specifying values for a field:

    >>> extractor.query(variable='pr')
    [{'filename': '/path/to/data/pr_*.nc', 'variable': 'pr'}]

    Finally, metadata from two sets of extractors can be grouped together
    based on a common field name as follows:

    >>> extractor.group(extractor2, 'variable')

    This class should only be used as a starting point. We recommend using
    the included obs4MIPSMetadataExtractor and CORDEXMetadataExtractor
    subclasses or creating your own subclass for your use case.
    """

    def __init__(self, *paths):
        """Create an extractor and extract metadata right away.

        :param paths: Directories to search for ``*.nc`` files.
        """
        self.paths = paths

    @property
    def data(self):
        """
        The extracted metadata, one dict per dataset pattern. Each dict has
        a 'filename' key plus one key per field that could be read from the
        filename.
        """
        return self._data

    @property
    def paths(self):
        """
        Search paths containing the dataset files.
        """
        return self._paths

    @paths.setter
    def paths(self, paths):
        """
        Extracts the metadata from scratch when paths are reset.
        """
        # A bare string would otherwise be iterated character by character,
        # so treat it as a single search path.
        if isinstance(paths, str):
            paths = (paths,)
        self._paths = paths
        self._extract()

    @property
    def fields(self):
        """
        The names of the fields in the filename, assuming the fully filtered
        filename conforms to the following convention:
        filename = <field[0]>_<field[1]>_..._<field[n]>.nc. Using fewer fields
        than the filename defines is allowed.
        """
        fields = ['variable']
        return fields

    @property
    def files(self):
        """
        Sorted list of unique glob-style patterns, one per dataset, built
        from every ``*.nc`` file found directly inside the search paths.
        """
        found = []
        for path in self.paths:
            found.extend(glob.glob(os.path.join(path, '*.nc')))
        # Sorting makes the order of the extracted data deterministic.
        return sorted(set(self.get_pattern(fname) for fname in found))

    @property
    def variables(self):
        """
        Get the set of variables included across all the datasets.
        """
        return self.get_field('variable')

    @property
    def field_filters(self):
        """
        Override this to filter out specific characters contained in a field.
        The expected form is a dict mapping a field name to a list of
        substrings that are removed from the field value when matching.
        """
        return dict()

    def query(self, **kwargs):
        """
        Narrow down the extracted metadata by field values.

        :param kwargs: ``field=value`` pairs. A value may be a single value
            or a list/tuple/set of accepted values. Values are compared
            against the filtered field value (see field_filters).
        :returns: A new list with the metadata dicts matching every pair.
        :raises ValueError: If a keyword is not one of the known fields.
        """
        valid = self.fields
        if not set(kwargs).issubset(valid):
            raise ValueError("Invalid fields: {}. Must be subset of: {}"
                             .format(list(kwargs), valid))
        # Copy so that callers cannot mutate the internal list by accident.
        data = list(self.data)
        for field, value in kwargs.items():
            if isinstance(value, (list, tuple, set, frozenset)):
                accepted = value
            else:
                accepted = [value]
            data = [meta for meta in data
                    if self._match_filter(meta, field) in accepted]
        return data

    def group(self, extractor, field):
        """
        Compare the data of this extractor with another extractor instance
        and group each of their metadata together by given field.

        :param extractor: The other MetadataExtractor instance.
        :param field: Name of the field to group by.
        :returns: A list of ``(meta, matches)`` tuples, where ``meta`` comes
            from this extractor and ``matches`` is the list of metadata from
            the other extractor sharing the same (filtered) field value.
        :raises ValueError: If field is unknown to either extractor.
        """
        # Only consider values of field which are contained in both
        # extractors. The filtered values are used on both sides because
        # query() also compares against filtered values.
        common = self._filtered_field(field).intersection(
            extractor._filtered_field(field))
        results = self.query(**{field: list(common)})
        # Next group the datasets in each extractor together by common
        # field values.
        groups = []
        for meta in results:
            val = self._match_filter(meta, field)
            match = extractor.query(**{field: val})
            groups.append((meta, match))
        return groups

    def get_field(self, field):
        """
        Returns the set of (unfiltered) values of the selected field.
        Records whose filename did not provide this field are skipped.

        :raises ValueError: If field is not one of the known fields.
        """
        self._validate_field(field)
        return set(meta[field] for meta in self.data if field in meta)

    def filter_filename(self, fname):
        """
        Applies a filter to each individual filename before its fields are
        extracted, which is useful if some files within a data set are known
        to not follow conventions, to "fix" them so that they do. By default
        only the directory part is stripped.
        """
        return os.path.basename(fname)

    def get_pattern(self, fname):
        """
        Used to group multiple file datasets together via glob patterns.
        The most common convention is to split files by time periods, which
        are generally the last field in a filename.

        :param fname: Path of a single data file.
        :returns: The same path with everything after the known fields
            replaced by ``*.nc``.
        """
        # Only split the basename: directory names may contain underscores.
        dirname, basename = os.path.split(fname)
        parts = basename.split('_')
        pattern = '_'.join(parts[:len(self.fields)] + ['*.nc'])
        return os.path.join(dirname, pattern)

    def _validate_field(self, field):
        """
        Raise ValueError unless field is one of the known fields.
        """
        if field not in self.fields:
            raise ValueError("Invalid field: {}. Must be one of: {}"
                             .format(field, self.fields))

    def _filtered_field(self, field):
        """
        Returns the set of filtered values of the selected field, skipping
        records that do not have the field.
        """
        self._validate_field(field)
        values = set(self._match_filter(meta, field) for meta in self.data)
        values.discard(None)
        return values

    def _match_filter(self, meta, field):
        """
        Filter (ignore) certain character patterns when matching a field.
        Returns None when the record does not have the field.
        """
        val = meta.get(field)
        if val is None:
            return None
        for pattern in self.field_filters.get(field, ()):
            val = val.replace(pattern, '')
        return val

    def _extract(self):
        """
        Do the actual metadata extraction from the patterns given by the
        files property. Each name is first passed through filter_filename()
        to remove unwanted characters from the extraction.
        """
        self._data = []
        fields = self.fields
        for fname in self.files:
            meta = dict(filename=fname)
            # The last '_'-separated part is the wildcard, not a field.
            parts = self.filter_filename(fname).split('_')[:-1]
            meta.update(zip(fields, parts))
            self._data.append(meta)
class obs4MIPSMetadataExtractor(MetadataExtractor):
    """Metadata extractor configured for obs4MIPs-style filenames."""

    @property
    def instruments(self):
        """
        Get the set of instruments across all the datasets.
        """
        return self.get_field('instrument')

    @property
    def fields(self):
        """
        obs4MIPs fields
        """
        fields = ['variable', 'instrument', 'processing_level', 'version']
        return fields

    @property
    def field_filters(self):
        """
        Field filters for CALIPSO: these substrings are removed from the
        variable name when matching.
        """
        return dict(variable=['calipso', 'Lidarsr532'])

    def filter_filename(self, fname):
        """
        CALIPSO files have odd naming conventions, so we will use
        a modified version to conform to standard obs4MIPs conventions.
        The directory part is stripped and the '_obs4MIPs_' token removed.
        """
        fname = os.path.basename(fname)
        fname = fname.replace('_obs4MIPs_', '_')
        return fname

    def get_pattern(self, fname):
        """
        Overridden to deal with CALIPSO filenames.

        A basename with exactly five '_'-separated parts only has its last
        part replaced by ``*.nc``; any other basename has its last two parts
        replaced.
        """
        # Count parts on the basename only, so underscores in directory
        # names do not affect which offset is chosen.
        dirname, basename = os.path.split(fname)
        base = basename.split('_')
        offset = -2 if len(base) != 5 else -1
        pattern = '_'.join(base[:offset] + ['*.nc'])
        return os.path.join(dirname, pattern)
class CORDEXMetadataExtractor(MetadataExtractor):
    """Metadata extractor configured with the CORDEX filename fields."""

    @property
    def models(self):
        """
        Distinct values of the 'model' field across all the datasets.
        """
        return self.get_field('model')

    @property
    def fields(self):
        """
        CORDEX fields, in the order they are read from the filename.
        """
        return [
            'variable',
            'domain',
            'driving_model',
            'experiment',
            'ensemble',
            'model',
            'version',
            'time_step',
        ]