| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| import pyarrow |
| import six |
| from six.moves.urllib.parse import urlparse |
| |
| from petastorm.hdfs.namenode import HdfsNamenodeResolver, HdfsConnector |
| |
| |
| class CarbonFilesystemResolver(object): |
| """Resolves a dataset URL, makes a connection via pyarrow, and provides a filesystem object.""" |
| |
| def __init__(self, dataset_url, key=None, secret=None, endpoint=None, proxy=None, proxy_port=None, |
| hadoop_configuration=None, connector=HdfsConnector, hdfs_driver='libhdfs3'): |
| """ |
| Given a dataset URL and an optional hadoop configuration, parse and interpret the URL to |
| instantiate a pyarrow filesystem. |
| |
        Interpretation of the URL ``scheme://hostname:port/path`` occurs in the following order:

        1. If there is no ``scheme``, the URL is no longer supported; raise an exception.
        2. If ``scheme`` is ``file``, use the local filesystem path.
        3. If ``scheme`` is ``hdfs``:
           a. Try the ``hostname`` as a name service and attempt to connect to a name node.
              1. If that doesn't work, try connecting directly to namenode ``hostname:port``.
           b. If there is no host, connect to the default name node.
        4. If ``scheme`` is ``s3a``, use s3fs. The user must install s3fs manually before
           using s3a URLs.
        5. Fail otherwise.
| |
| :param dataset_url: The hdfs URL or absolute path to the dataset |
        :param key: the OBS access key
        :param secret: the OBS secret key
        :param endpoint: the OBS endpoint URL
        :param proxy: the proxy host to use when connecting to OBS, if any
        :param proxy_port: the port of that proxy host
| :param hadoop_configuration: an optional hadoop configuration |
| :param connector: the HDFS connector object to use (ONLY override for testing purposes) |
| :param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are |
| libhdfs (java through JNI) or libhdfs3 (C++) |
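
        Example (a minimal, illustrative sketch; the namenode host, port, and dataset path
        are hypothetical)::

            resolver = CarbonFilesystemResolver('hdfs://namenode:8020/tmp/carbon_dataset',
                                                hdfs_driver='libhdfs')
            fs = resolver.filesystem()
            path = resolver.get_dataset_path()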
| """ |
| # Cache both the original URL and the resolved, urlparsed dataset_url |
| self._dataset_url = dataset_url |
| self._parsed_dataset_url = None |
| # Cache the instantiated filesystem object |
| self._filesystem = None |
| |
| if isinstance(self._dataset_url, six.string_types): |
| self._parsed_dataset_url = urlparse(self._dataset_url) |
| else: |
| self._parsed_dataset_url = self._dataset_url |
| |
| if not self._parsed_dataset_url.scheme: |
| # Case 1 |
            raise ValueError('ERROR! A scheme-less dataset url ({}) is no longer supported. '
                             'Please prepend "file://" for local filesystem.'.format(self._dataset_url))
| |
| elif self._parsed_dataset_url.scheme == 'file': |
| # Case 2: definitely local |
| self._filesystem = pyarrow.localfs |
| |
| elif self._parsed_dataset_url.scheme == 'hdfs': |
| |
| if hdfs_driver == 'libhdfs3': |
                # libhdfs3 does not do any namenode resolution itself, so we do it manually. This is
                # not necessary when using libhdfs.
| |
| # Obtain singleton and force hadoop config evaluation |
| namenode_resolver = HdfsNamenodeResolver(hadoop_configuration) |
| |
| # Since we can't tell for sure, first treat the URL as though it references a name service |
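                # (e.g. a hypothetical 'hdfs://nameservice1/path' names an HA name service,
                # while 'hdfs://some-namenode:8020/path' addresses a single namenode directly)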
| if self._parsed_dataset_url.netloc: |
| # Case 3a: Use the portion of netloc before any port, which doesn't get lowercased |
| nameservice = self._parsed_dataset_url.netloc.split(':')[0] |
| namenodes = namenode_resolver.resolve_hdfs_name_service(nameservice) |
| if namenodes: |
| self._filesystem = connector.connect_to_either_namenode(namenodes) |
| if self._filesystem is None: |
| # Case 3a1: That didn't work; try the URL as a namenode host |
| self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url) |
| else: |
| # Case 3b: No netloc, so let's try to connect to default namenode |
| # HdfsNamenodeResolver will raise exception if it fails to connect. |
| nameservice, namenodes = namenode_resolver.resolve_default_hdfs_service() |
| filesystem = connector.connect_to_either_namenode(namenodes) |
| if filesystem is not None: |
| # Properly replace the parsed dataset URL once default namenode is confirmed |
| self._parsed_dataset_url = urlparse( |
| 'hdfs://{}{}'.format(nameservice, self._parsed_dataset_url.path)) |
| self._filesystem = filesystem |
| else: |
| self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, hdfs_driver) |
| |
| elif self._parsed_dataset_url.scheme == "s3a": |
            # Case 4
| # S3 support requires s3fs to be installed |
| try: |
| import s3fs |
| except ImportError: |
| raise ValueError('Must have s3fs installed in order to use datasets on s3. ' |
| 'Please install s3fs and try again.') |
| |
| if not self._parsed_dataset_url.netloc: |
                raise ValueError('URLs must be of the form s3a://bucket/path')
| |
| if key is None or secret is None or endpoint is None: |
                raise ValueError('key, secret, and endpoint must all be provided for s3a URLs')
| |
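            # Build proxy URLs only when both a host and port are given; e.g. a hypothetical
            # proxy='proxy.example.com' with proxy_port=8080 yields 'http://proxy.example.com:8080'.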
            if proxy is not None and proxy_port is not None:
                http_proxy = 'http://{}:{}'.format(proxy, proxy_port)
                https_proxy = 'https://{}:{}'.format(proxy, proxy_port)
                config_kwargs = {'proxies': {'http': http_proxy, 'https': https_proxy}}
            else:
                config_kwargs = None
| |
| fs = s3fs.S3FileSystem(key=key, |
| secret=secret, |
| client_kwargs={'endpoint_url': endpoint}, |
| config_kwargs=config_kwargs) |
| |
| self._filesystem = pyarrow.filesystem.S3FSWrapper(fs) |
| |
| else: |
            # Case 5
            raise ValueError('Unsupported scheme in dataset url {}. '
                             'Currently, only "file", "hdfs", and "s3a" are supported.'
                             .format(self._parsed_dataset_url.scheme))
| |
| def parsed_dataset_url(self): |
| """ |
| :return: The urlparse'd dataset_url |
| """ |
| return self._parsed_dataset_url |
| |
| def get_dataset_path(self): |
| """ |
        The dataset path differs from the one in `_parsed_dataset_url` for some filesystems.
        For example, s3fs expects the bucket name to be included in the path and doesn't support
        paths that start with a `/`.
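        For instance, a hypothetical `s3a://my-bucket/carbon_dataset` URL yields the dataset
        path `my-bucket/carbon_dataset`.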
| """ |
| if isinstance(self._filesystem, pyarrow.filesystem.S3FSWrapper): |
| # s3fs expects paths of the form `bucket/path` |
| return self._parsed_dataset_url.netloc + self._parsed_dataset_url.path |
| |
| return self._parsed_dataset_url.path |
| |
| def filesystem(self): |
| """ |
| :return: The pyarrow filesystem object |
| """ |
| return self._filesystem |
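

# A minimal usage sketch (illustrative only; '/tmp/carbon_dataset' is a hypothetical
# local path, not a dataset shipped with this module):
if __name__ == '__main__':
    resolver = CarbonFilesystemResolver('file:///tmp/carbon_dataset')
    print(resolver.filesystem())        # the pyarrow local filesystem object
    print(resolver.get_dataset_path())  # '/tmp/carbon_dataset'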