# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from contextlib import contextmanager
from six.moves import cPickle as pickle
from six.moves.urllib.parse import urlparse
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
from petastorm.fs_utils import FilesystemResolver
from petastorm.unischema import Unischema
from petastorm.unischema import _numpy_and_codec_from_arrow_type
from petastorm.unischema import UnischemaField
from petastorm.etl.dataset_metadata import _init_spark, _cleanup_spark
from pycarbon.core.carbon import CarbonDataset
from pycarbon.core import carbon_utils
logger = logging.getLogger(__name__)
BLOCKLETS_PER_FILE_KEY = b'dataset-toolkit.num_blocklets_per_file.v1'
UNISCHEMA_KEY = b'dataset-toolkit.unischema.v1'
class PycarbonMetadataError(Exception):
"""
Raised when the pycarbon metadata does not exist, does not contain the necessary information,
or is corrupt/invalid.
"""
@contextmanager
def materialize_dataset_carbon(spark, dataset_url, schema, blocklet_size_mb=None, use_summary_metadata=False,
pyarrow_filesystem=None):
"""
A Context Manager which handles all the initialization and finalization necessary
to generate metadata for a pycarbon dataset. This should be used around your
spark logic to materialize a dataset (specifically the writing of carbon output).
Note: Any blocklet indexing should happen outside the materialize_dataset_carbon block
Example:
>>> spark = SparkSession.builder...
>>> ds_url = 'hdfs:///path/to/my/dataset'
>>> with materialize_dataset_carbon(spark, ds_url, MyUnischema, 64):
>>>   spark.sparkContext.parallelize(range(0, 10)).
>>>     ...
>>>     .write.save(path=ds_url, format='carbon')
A user may provide their own pyarrow filesystem object via the ``pyarrow_filesystem`` argument (otherwise,
pycarbon will create a default one based on the URL).
The following example shows how a custom pyarrow HDFS filesystem, instantiated using the ``libhdfs`` driver,
can be used during Pycarbon dataset generation:
>>> resolver=FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
>>> hdfs_driver='libhdfs')
>>> with materialize_dataset_carbon(..., pyarrow_filesystem=resolver.filesystem()):
>>> ...
:param spark: The spark session you are using
:param dataset_url: The dataset url to output your dataset to (e.g. ``hdfs:///path/to/dataset``)
:param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
:param blocklet_size_mb: The carbon blocklet size to use for your dataset
:param use_summary_metadata: Whether to use the carbon summary metadata for blocklet indexing or a custom
indexing method. The custom indexing method is more scalable for very large datasets.
:param pyarrow_filesystem: A pyarrow filesystem object to be used when saving Pycarbon specific metadata to the
Carbon store.
"""
# Before running the user's job, configure spark for writing carbon output
spark_config = {}
_init_spark(spark, spark_config, blocklet_size_mb, use_summary_metadata)
yield
# After job completes, add the unischema metadata and check for the metadata summary file
if pyarrow_filesystem is None:
resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration())
# filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
else:
# filesystem = pyarrow_filesystem
dataset_path = urlparse(dataset_url).path
carbon_dataset = CarbonDataset(dataset_path)
_generate_unischema_metadata_carbon(carbon_dataset, schema)
if not use_summary_metadata:
_generate_num_blocklets_per_file_carbon(carbon_dataset, spark.sparkContext)
_cleanup_spark(spark, spark_config, blocklet_size_mb)
def _generate_unischema_metadata_carbon(carbon_schema, schema):
"""
Generates the serialized unischema and adds it to the dataset carbon metadata to be used upon reading.
:param carbon_schema: (CarbonDataset) Dataset to attach the schema to
:param schema: (Unischema) Schema to attach to the dataset
:return: None
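Example (a minimal sketch; ``MyUnischema`` is a user-defined Unischema and the path below is a placeholder):
>>> _generate_unischema_metadata_carbon(CarbonDataset('hdfs:///path/to/dataset'), MyUnischema)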
"""
# TODO: Simply pickling unischema will break if the UnischemaField class is changed,
# or the codec classes are changed. We likely need something more robust.
serialized_schema = pickle.dumps(schema)
carbon_utils.add_to_dataset_metadata_carbon(carbon_schema, UNISCHEMA_KEY, serialized_schema)
def _generate_num_blocklets_per_file_carbon(carbon_dataset, spark_context):
"""
Generates the metadata file containing the number of blocklets in each file
for the given carbon dataset. It does this in spark by
opening all carbon files in the dataset on the executors and collecting the
number of blocklets in each file back on the driver.
:param carbon_dataset: CarbonDataset
:param spark_context: spark context to use for retrieving the number of blocklets
in each carbon file in parallel
:return: None, upon successful completion the metadata file will exist.
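Example (a minimal sketch; assumes an active SparkSession ``spark`` and a placeholder dataset path):
>>> carbon_dataset = CarbonDataset('hdfs:///path/to/dataset')
>>> _generate_num_blocklets_per_file_carbon(carbon_dataset, spark.sparkContext)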
"""
# if not isinstance(dataset.paths, str):
# raise ValueError('Expected dataset.paths to be a single path, not a list of paths')
pieces = carbon_dataset.pieces
# Only plain file paths are shipped to spark because the dataset object itself is not serializable
paths = [piece.path for piece in pieces]
# TODO: report the real number of blocklets for each carbon file instead of the placeholder value below
def get_carbon_blocklet_info(path):
return path, 1
number_of_blocklets = spark_context.parallelize(paths, len(paths)) \
.map(get_carbon_blocklet_info) \
.collect()
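# number_of_blocklets is a list of (file path, blocklet count) tuples collected back on the driver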
number_of_blocklets_str = json.dumps(dict(number_of_blocklets))
# Add the dict for the number of blocklets in each file to the carbon file metadata footer
carbon_utils.add_to_dataset_metadata_carbon(carbon_dataset, BLOCKLETS_PER_FILE_KEY, number_of_blocklets_str)
def get_schema_carbon(carbon_dataset):
"""Retrieves schema object stored as part of dataset methadata.
:param dataset: CarbonDataset
:return: A :class:`petastorm.unischema.Unischema` object
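Example (a minimal sketch; the HDFS path below is a placeholder):
>>> schema = get_schema_carbon(CarbonDataset('hdfs:///path/to/dataset'))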
"""
if not carbon_dataset.common_metadata:
raise PycarbonMetadataError(
'Could not find _common_metadata file. Use materialize_dataset_carbon(..) in'
' pycarbon.etl.carbon_dataset_metadata to generate this file in your ETL code.'
' You can generate it on an existing dataset using pycarbon-generate-metadata.py')
# TODO add pycarbon-generate-metadata.py
dataset_metadata_dict = carbon_dataset.common_metadata.metadata
# Read schema
if UNISCHEMA_KEY not in dataset_metadata_dict:
raise PycarbonMetadataError(
'Could not find the unischema in the dataset common metadata file.'
' Please provide or generate dataset with the unischema attached.'
' Common Metadata file might not be generated properly.'
' Make sure to use materialize_dataset_carbon(..) in pycarbon.etl.carbon_dataset_metadata to'
' properly generate this file in your ETL code.'
' You can generate it on an existing dataset using pycarbon-generate-metadata.py')
ser_schema = dataset_metadata_dict[UNISCHEMA_KEY]
# Since we have moved the unischema class around a few times, unpickling old schemas will not work. In this case
# we override the old import path to get backwards compatibility
schema = depickle_legacy_package_name_compatible(ser_schema)
return schema
def get_schema_from_dataset_url_carbon(dataset_url,
key=None,
secret=None,
endpoint=None,
proxy=None,
proxy_port=None,
filesystem=None):
"""Returns a :class:`petastorm.unischema.Unischema` object loaded from a dataset specified by a url.
:param dataset_url: A dataset URL
:param key: access key for an object store backed dataset (optional)
:param secret: secret key for an object store backed dataset (optional)
:param endpoint: endpoint URL of the object store (optional)
:param proxy: proxy host to use when connecting to the store (optional)
:param proxy_port: proxy port to use when connecting to the store (optional)
:param filesystem: an existing filesystem object to use instead of resolving one from the URL (optional)
:return: A :class:`petastorm.unischema.Unischema` object
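Example (a minimal sketch; the URLs and credentials below are placeholders):
>>> schema = get_schema_from_dataset_url_carbon('hdfs:///path/to/dataset')
>>> # For an object store backed dataset, credentials can be passed explicitly:
>>> schema = get_schema_from_dataset_url_carbon('s3a://bucket/path/to/dataset',
>>>                                             key='...', secret='...', endpoint='...')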
"""
# Get a unischema stored in the dataset metadata.
stored_schema = get_schema_carbon(CarbonDataset(dataset_url,
key=key,
secret=secret,
endpoint=endpoint,
proxy=proxy,
proxy_port=proxy_port,
filesystem=filesystem))
return stored_schema
def infer_or_load_unischema_carbon(carbon_dataset):
"""Try to recover Unischema object stored by ``materialize_dataset`` function. If it can be loaded, infer
Unischema from native Carbon schema"""
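Example (a minimal sketch; the path below is a placeholder):
>>> schema = infer_or_load_unischema_carbon(CarbonDataset('hdfs:///path/to/dataset'))
"""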
try:
return get_schema_carbon(carbon_dataset)
except PycarbonMetadataError:
logger.info('Failed loading Unischema from the dataset metadata. Assuming the dataset was not created with '
'Pycarbon. Will try to construct a Unischema from the native Carbon schema.')
unischema_fields = []
arrow_schema = carbon_dataset.schema
for column_name in arrow_schema.names:
arrow_field = arrow_schema.field_by_name(column_name)
field_type = arrow_field.type
codec, np_type = _numpy_and_codec_from_arrow_type(field_type)
unischema_fields.append(UnischemaField(column_name, np_type, (), codec, arrow_field.nullable))
return Unischema('inferred_schema', unischema_fields)