# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import mock
from pyarrow.filesystem import LocalFileSystem, S3FSWrapper
from pyarrow.lib import ArrowIOError
from six.moves.urllib.parse import urlparse

from petastorm.hdfs.tests.test_hdfs_namenode import HC, MockHadoopConfiguration, \
    MockHdfs, MockHdfsConnector

from pycarbon.core.carbon_fs_utils import CarbonFilesystemResolver
from pycarbon.tests import access_key, secret_key, endpoint

ABS_PATH = '/abs/path'


class FilesystemResolverTest(unittest.TestCase):
    """
    Checks the full filesystem resolution functionality, exercising each URL interpretation case.
    """

    @classmethod
    def setUpClass(cls):
        cls.mock = MockHdfsConnector()

    def setUp(self):
        """Initializes a mock hadoop config and populates it with basic properties."""
        # Reset counters in the mock connector
        self.mock.reset()
        self._hadoop_configuration = MockHadoopConfiguration()
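        # The entries below mimic an HDFS high-availability setup: a default
        # nameservice plus two namenodes, with nn2 deliberately listed before nn1
        # and an RPC address registered for each.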
        self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE)
        self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn2,nn1')
        self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn1'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN1)
        self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn2'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN2)

    def test_error_url_cases(self):
        """Various error cases that result in an exception being raised."""
        # Case 1: schemeless path asserts
        with self.assertRaises(ValueError):
            CarbonFilesystemResolver(ABS_PATH, {})

        # Case 4b: HDFS default path with NO defaultFS configured
        with self.assertRaises(RuntimeError):
            CarbonFilesystemResolver('hdfs:///some/path', {})

        # Case 4b: using `default` as the host, while apparently a pyarrow convention, is NOT valid
        with self.assertRaises(ArrowIOError):
            CarbonFilesystemResolver('hdfs://default', {})

        # Case 5: other schemes result in ValueError; passing pre-parsed URLs also covers the else branch
        with self.assertRaises(ValueError):
            CarbonFilesystemResolver(urlparse('http://foo/bar'), {})
        with self.assertRaises(ValueError):
            CarbonFilesystemResolver(urlparse('ftp://foo/bar'), {})
        with self.assertRaises(ValueError):
            CarbonFilesystemResolver(urlparse('ssh://foo/bar'), {})

        # s3 paths must have the bucket as the netloc
        with self.assertRaises(ValueError):
            CarbonFilesystemResolver(urlparse('s3:///foo/bar'), {})

    def test_file_url(self):
        """Case 2: file path, agnostic to the content of the hadoop configuration."""
        suj = CarbonFilesystemResolver('file://{}'.format(ABS_PATH),
                                       hadoop_configuration=self._hadoop_configuration,
                                       connector=self.mock)
        self.assertTrue(isinstance(suj.filesystem(), LocalFileSystem))
        self.assertEqual('', suj.parsed_dataset_url().netloc)
        self.assertEqual(ABS_PATH, suj.get_dataset_path())

    def test_hdfs_url_with_nameservice(self):
        """Case 3a: HDFS nameservice."""
        suj = CarbonFilesystemResolver(dataset_url=HC.WARP_TURTLE_PATH,
                                       hadoop_configuration=self._hadoop_configuration,
                                       connector=self.mock)
        self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
        self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
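        # nn2 is listed first in dfs.ha.namenodes (see setUp), so it should be the
        # only namenode contacted, and exactly once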
        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

    def test_hdfs_url_no_nameservice(self):
        """Case 3b: HDFS with no nameservice should connect to the default namenode."""
        suj = CarbonFilesystemResolver(dataset_url='hdfs:///some/path',
                                       hadoop_configuration=self._hadoop_configuration,
                                       connector=self.mock)
        self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
        self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
        # ensure the path component of the URL is preserved
        self.assertEqual('/some/path', suj.get_dataset_path())
        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

    def test_hdfs_url_direct_namenode(self):
        """Case 4: direct namenode."""
        suj = CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN1),
                                       hadoop_configuration=self._hadoop_configuration,
                                       connector=self.mock)
        self.assertEqual(MockHdfs, type(suj.filesystem()))
        self.assertEqual(HC.WARP_TURTLE_NN1, suj.parsed_dataset_url().netloc)
        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

    def test_hdfs_url_direct_namenode_retries(self):
        """Case 4: direct namenode; the first two connection attempts fail, but the third succeeds."""
        self.mock.set_fail_n_next_connect(2)
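        # Each resolver construction below makes exactly one connection attempt
        # against nn2, so the per-namenode counters accumulate across attempts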
        with self.assertRaises(ArrowIOError):
            suj = CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
                                           hadoop_configuration=self._hadoop_configuration,
                                           connector=self.mock)
        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

        with self.assertRaises(ArrowIOError):
            suj = CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
                                           hadoop_configuration=self._hadoop_configuration,
                                           connector=self.mock)
        self.assertEqual(2, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

        # The third attempt should connect "successfully"
        suj = CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
                                       hadoop_configuration=self._hadoop_configuration,
                                       connector=self.mock)
        self.assertEqual(MockHdfs, type(suj.filesystem()))
        self.assertEqual(HC.WARP_TURTLE_NN2, suj.parsed_dataset_url().netloc)
        self.assertEqual(3, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

    def test_s3_without_s3fs(self):
        with mock.patch.dict('sys.modules', s3fs=None):
            # `import s3fs` will fail in this context
            with self.assertRaises(ValueError):
                CarbonFilesystemResolver(urlparse('s3a://foo/bar'), {})

    def test_s3_url(self):
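        # access_key, secret_key and endpoint come from the pycarbon.tests config;
        # note that for s3a URLs the bucket is the netloc and, per the assertions
        # below, get_dataset_path() keeps the bucket as part of the path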
        suj = CarbonFilesystemResolver('s3a://bucket{}'.format(ABS_PATH),
                                       key=access_key,
                                       secret=secret_key,
                                       endpoint=endpoint,
                                       hadoop_configuration=self._hadoop_configuration,
                                       connector=self.mock)
        self.assertTrue(isinstance(suj.filesystem(), S3FSWrapper))
        self.assertEqual('bucket', suj.parsed_dataset_url().netloc)
        self.assertEqual('bucket' + ABS_PATH, suj.get_dataset_path())


if __name__ == '__main__':
    unittest.main()