#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for :class:`HadoopFileSystem`."""

from __future__ import absolute_import

import io
import logging
import posixpath
import sys
import unittest
from builtins import object

from future.utils import itervalues

from apache_beam.io import hadoopfilesystem as hdfs
from apache_beam.io.filesystem import BeamIOError
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.options.pipeline_options import PipelineOptions


class FakeFile(io.BytesIO):
  """File object for FakeHdfs"""
  __hash__ = None

  def __init__(self, path, mode='', type='FILE'):
    io.BytesIO.__init__(self)

    self.stat = {
        'path': path,
        'mode': mode,
        'type': type,
    }
    self.saved_data = None

  def __eq__(self, other):
    return self.stat == other.stat and self.getvalue() == self.getvalue()

  def __ne__(self, other):
    # TODO(BEAM-5949): Needed for Python 2 compatibility.
    return not self == other

  def close(self):
    self.saved_data = self.getvalue()
    io.BytesIO.close(self)

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    self.close()

  @property
  def size(self):
    if self.closed:
      if self.saved_data is None:
        return 0
      return len(self.saved_data)
    return len(self.getvalue())

  def get_file_status(self):
    """Returns a partial WebHDFS FileStatus object."""
    return {
        hdfs._FILE_STATUS_PATH_SUFFIX: posixpath.basename(self.stat['path']),
        hdfs._FILE_STATUS_LENGTH: self.size,
        hdfs._FILE_STATUS_TYPE: self.stat['type'],
    }

  def get_file_checksum(self):
    """Returns a WebHDFS FileChecksum object."""
    return {
        hdfs._FILE_CHECKSUM_ALGORITHM: 'fake_algo',
        hdfs._FILE_CHECKSUM_BYTES: 'checksum_byte_sequence',
        hdfs._FILE_CHECKSUM_LENGTH: 5,
    }


class FakeHdfsError(Exception):
  """Generic error for FakeHdfs methods."""


class FakeHdfs(object):
  """Fake implementation of ``hdfs.Client``."""

  def __init__(self):
    self.files = {}

  def write(self, path):
    if self.status(path, strict=False) is not None:
      raise FakeHdfsError('Path already exists: %s' % path)

    new_file = FakeFile(path, 'wb')
    self.files[path] = new_file
    return new_file

  def read(self, path, offset=0, length=None):
    old_file = self.files.get(path, None)
    if old_file is None:
      raise FakeHdfsError('Path not found: %s' % path)
    if old_file.stat['type'] == 'DIRECTORY':
      raise FakeHdfsError('Cannot open a directory: %s' % path)
    if not old_file.closed:
      raise FakeHdfsError('File already opened: %s' % path)

    # old_file is closed and can't be operated upon. Return a copy instead.
    new_file = FakeFile(path, 'rb')
    if old_file.saved_data:
      new_file.write(old_file.saved_data)
      new_file.seek(0)
    return new_file

  def list(self, path, status=False):
    if not status:
      raise ValueError('status must be True')
    fs = self.status(path, strict=False)
    if (fs is not None and
        fs[hdfs._FILE_STATUS_TYPE] == hdfs._FILE_STATUS_TYPE_FILE):
      raise ValueError(
          'list must be called on a directory, got file: %s' % path)

    result = []
    for file in itervalues(self.files):
      if file.stat['path'].startswith(path):
        fs = file.get_file_status()
        result.append((fs[hdfs._FILE_STATUS_PATH_SUFFIX], fs))
    return result

  def makedirs(self, path):
    self.files[path] = FakeFile(path, type='DIRECTORY')

  def status(self, path, strict=True):
    f = self.files.get(path)
    if f is None:
      if strict:
        raise FakeHdfsError('Path not found: %s' % path)
      else:
        return f
    return f.get_file_status()

  def delete(self, path, recursive=True):
    if not recursive:
      raise FakeHdfsError('Non-recursive mode not implemented')

    _ = self.status(path)

    for filepath in list(self.files):
      if filepath.startswith(path):
        del self.files[filepath]

  def walk(self, path):
    paths = [path]
    while paths:
      path = paths.pop()
      files = []
      dirs = []
      for full_path in self.files:
        if not full_path.startswith(path):
          continue
        short_path = posixpath.relpath(full_path, path)
        if '/' not in short_path:
          if self.status(full_path)[hdfs._FILE_STATUS_TYPE] == 'DIRECTORY':
            if short_path != '.':
              dirs.append(short_path)
          else:
            files.append(short_path)

      yield path, dirs, files
      paths = [posixpath.join(path, dir) for dir in dirs]

  def rename(self, path1, path2):
    if self.status(path1, strict=False) is None:
      raise FakeHdfsError('Path1 not found: %s' % path1)

    for fullpath in self.files.keys():  # pylint: disable=consider-iterating-dictionary
      if fullpath == path1 or fullpath.startswith(path1 + '/'):
        f = self.files.pop(fullpath)
        newpath = path2 + fullpath[len(path1):]
        f.stat['path'] = newpath
        self.files[newpath] = f

  def checksum(self, path):
    f = self.files.get(path, None)
    if f is None:
      raise FakeHdfsError('Path not found: %s' % path)
    return f.get_file_checksum()


class HadoopFileSystemTest(unittest.TestCase):

  @classmethod
  def setUpClass(cls):
    # Method has been renamed in Python 3
    if sys.version_info[0] < 3:
      cls.assertCountEqual = cls.assertItemsEqual

  def setUp(self):
    self._fake_hdfs = FakeHdfs()
    hdfs.hdfs.InsecureClient = (
        lambda *args, **kwargs: self._fake_hdfs)
    pipeline_options = PipelineOptions()
    hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
    hdfs_options.hdfs_host = ''
    hdfs_options.hdfs_port = 0
    hdfs_options.hdfs_user = ''

    self.fs = hdfs.HadoopFileSystem(pipeline_options)
    self.tmpdir = 'hdfs://test_dir'

    for filename in ['old_file1', 'old_file2']:
      url = self.fs.join(self.tmpdir, filename)
      self.fs.create(url).close()

  def test_scheme(self):
    self.assertEqual(self.fs.scheme(), 'hdfs')
    self.assertEqual(hdfs.HadoopFileSystem.scheme(), 'hdfs')

  def test_url_join(self):
    self.assertEqual('hdfs://tmp/path/to/file',
                     self.fs.join('hdfs://tmp/path', 'to', 'file'))
    self.assertEqual('hdfs://tmp/path/to/file',
                     self.fs.join('hdfs://tmp/path', 'to/file'))
    self.assertEqual('hdfs://tmp/path/',
                     self.fs.join('hdfs://tmp/path/', ''))
    self.assertEqual('hdfs://bar',
                     self.fs.join('hdfs://foo', '/bar'))
    with self.assertRaises(ValueError):
      self.fs.join('/no/scheme', 'file')

  def test_url_split(self):
    self.assertEqual(('hdfs://tmp/path/to', 'file'),
                     self.fs.split('hdfs://tmp/path/to/file'))
    self.assertEqual(('hdfs://', 'tmp'), self.fs.split('hdfs://tmp'))
    self.assertEqual(('hdfs://tmp', ''), self.fs.split('hdfs://tmp/'))
    with self.assertRaisesRegexp(ValueError, r'parse'):
      self.fs.split('tmp')

  def test_mkdirs(self):
    url = self.fs.join(self.tmpdir, 't1/t2')
    self.fs.mkdirs(url)
    self.assertTrue(self.fs.exists(url))

  def test_mkdirs_failed(self):
    url = self.fs.join(self.tmpdir, 't1/t2')
    self.fs.mkdirs(url)

    with self.assertRaises(IOError):
      self.fs.mkdirs(url)

  def test_match_file(self):
    expected_files = [self.fs.join(self.tmpdir, filename)
                      for filename in ['old_file1', 'old_file2']]
    match_patterns = expected_files
    result = self.fs.match(match_patterns)
    returned_files = [f.path
                      for match_result in result
                      for f in match_result.metadata_list]
    self.assertCountEqual(expected_files, returned_files)

  def test_match_file_with_limits(self):
    expected_files = [self.fs.join(self.tmpdir, filename)
                      for filename in ['old_file1', 'old_file2']]
    result = self.fs.match([self.tmpdir + '/'], [1])[0]
    files = [f.path for f in result.metadata_list]
    self.assertEquals(len(files), 1)
    self.assertIn(files[0], expected_files)

  def test_match_file_with_zero_limit(self):
    result = self.fs.match([self.tmpdir + '/'], [0])[0]
    self.assertEquals(len(result.metadata_list), 0)

  def test_match_file_empty(self):
    url = self.fs.join(self.tmpdir, 'nonexistent_file')
    result = self.fs.match([url])[0]
    files = [f.path for f in result.metadata_list]
    self.assertEqual(files, [])

  def test_match_file_error(self):
    url = self.fs.join(self.tmpdir, 'old_file1')
    bad_url = 'bad_url'
    with self.assertRaisesRegexp(BeamIOError,
                                 r'^Match operation failed .* %s' % bad_url):
      result = self.fs.match([bad_url, url])[0]
      files = [f.path for f in result.metadata_list]
      self.assertEqual(files, [self.fs._parse_url(url)])

  def test_match_directory(self):
    expected_files = [self.fs.join(self.tmpdir, filename)
                      for filename in ['old_file1', 'old_file2']]

    # Listing without a trailing '/' should return the directory itself and not
    # its contents. The fake HDFS client here has a "sparse" directory
    # structure, so listing without a '/' will return no results.
    result = self.fs.match([self.tmpdir + '/'])[0]
    files = [f.path for f in result.metadata_list]
    self.assertCountEqual(files, expected_files)

  def test_match_directory_trailing_slash(self):
    expected_files = [self.fs.join(self.tmpdir, filename)
                      for filename in ['old_file1', 'old_file2']]

    result = self.fs.match([self.tmpdir + '/'])[0]
    files = [f.path for f in result.metadata_list]
    self.assertCountEqual(files, expected_files)

  def test_create_success(self):
    url = self.fs.join(self.tmpdir, 'new_file')
    handle = self.fs.create(url)
    self.assertIsNotNone(handle)
    url = self.fs._parse_url(url)
    expected_file = FakeFile(url, 'wb')
    self.assertEqual(self._fake_hdfs.files[url], expected_file)

  def test_create_write_read_compressed(self):
    url = self.fs.join(self.tmpdir, 'new_file.gz')

    handle = self.fs.create(url)
    self.assertIsNotNone(handle)
    path = self.fs._parse_url(url)
    expected_file = FakeFile(path, 'wb')
    self.assertEqual(self._fake_hdfs.files[path], expected_file)
    data = b'abc' * 10
    handle.write(data)
    # Compressed data != original data
    self.assertNotEquals(data, self._fake_hdfs.files[path].getvalue())
    handle.close()

    handle = self.fs.open(url)
    read_data = handle.read(len(data))
    self.assertEqual(data, read_data)
    handle.close()

  def test_open(self):
    url = self.fs.join(self.tmpdir, 'old_file1')
    handle = self.fs.open(url)
    expected_data = b''
    data = handle.read()
    self.assertEqual(data, expected_data)

  def test_open_bad_path(self):
    with self.assertRaises(FakeHdfsError):
      self.fs.open(self.fs.join(self.tmpdir, 'nonexistent/path'))

  def _cmpfiles(self, url1, url2):
    with self.fs.open(url1) as f1:
      with self.fs.open(url2) as f2:
        data1 = f1.read()
        data2 = f2.read()
        return data1 == data2

  def test_copy_file(self):
    url1 = self.fs.join(self.tmpdir, 'new_file1')
    url2 = self.fs.join(self.tmpdir, 'new_file2')
    url3 = self.fs.join(self.tmpdir, 'new_file3')
    with self.fs.create(url1) as f1:
      f1.write(b'Hello')
    self.fs.copy([url1, url1], [url2, url3])
    self.assertTrue(self._cmpfiles(url1, url2))
    self.assertTrue(self._cmpfiles(url1, url3))

  def test_copy_file_overwrite_error(self):
    url1 = self.fs.join(self.tmpdir, 'new_file1')
    url2 = self.fs.join(self.tmpdir, 'new_file2')
    with self.fs.create(url1) as f1:
      f1.write(b'Hello')
    with self.fs.create(url2) as f2:
      f2.write(b'nope')
    with self.assertRaisesRegexp(
        BeamIOError, r'already exists.*%s' % posixpath.basename(url2)):
      self.fs.copy([url1], [url2])

  def test_copy_file_error(self):
    url1 = self.fs.join(self.tmpdir, 'new_file1')
    url2 = self.fs.join(self.tmpdir, 'new_file2')
    url3 = self.fs.join(self.tmpdir, 'new_file3')
    url4 = self.fs.join(self.tmpdir, 'new_file4')
    with self.fs.create(url3) as f:
      f.write(b'Hello')
    with self.assertRaisesRegexp(
        BeamIOError, r'^Copy operation failed .*%s.*%s.* not found' % (
            url1, url2)):
      self.fs.copy([url1, url3], [url2, url4])
    self.assertTrue(self._cmpfiles(url3, url4))

  def test_copy_directory(self):
    url_t1 = self.fs.join(self.tmpdir, 't1')
    url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
    url_t2 = self.fs.join(self.tmpdir, 't2')
    url_t2_inner = self.fs.join(self.tmpdir, 't2/inner')
    self.fs.mkdirs(url_t1)
    self.fs.mkdirs(url_t1_inner)
    self.fs.mkdirs(url_t2)

    url1 = self.fs.join(url_t1_inner, 'f1')
    url2 = self.fs.join(url_t2_inner, 'f1')
    with self.fs.create(url1) as f:
      f.write(b'Hello')

    self.fs.copy([url_t1], [url_t2])
    self.assertTrue(self._cmpfiles(url1, url2))

  def test_copy_directory_overwrite_error(self):
    url_t1 = self.fs.join(self.tmpdir, 't1')
    url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
    url_t2 = self.fs.join(self.tmpdir, 't2')
    url_t2_inner = self.fs.join(self.tmpdir, 't2/inner')
    self.fs.mkdirs(url_t1)
    self.fs.mkdirs(url_t1_inner)
    self.fs.mkdirs(url_t2)
    self.fs.mkdirs(url_t2_inner)

    url1 = self.fs.join(url_t1, 'f1')
    url1_inner = self.fs.join(url_t1_inner, 'f2')
    url2 = self.fs.join(url_t2, 'f1')
    unused_url2_inner = self.fs.join(url_t2_inner, 'f2')
    url3_inner = self.fs.join(url_t2_inner, 'f3')
    for url in [url1, url1_inner, url3_inner]:
      with self.fs.create(url) as f:
        f.write(b'Hello')
    with self.fs.create(url2) as f:
      f.write(b'nope')

    with self.assertRaisesRegexp(BeamIOError, r'already exists'):
      self.fs.copy([url_t1], [url_t2])

  def test_rename_file(self):
    url1 = self.fs.join(self.tmpdir, 'f1')
    url2 = self.fs.join(self.tmpdir, 'f2')
    with self.fs.create(url1) as f:
      f.write(b'Hello')

    self.fs.rename([url1], [url2])
    self.assertFalse(self.fs.exists(url1))
    self.assertTrue(self.fs.exists(url2))

  def test_rename_file_error(self):
    url1 = self.fs.join(self.tmpdir, 'f1')
    url2 = self.fs.join(self.tmpdir, 'f2')
    url3 = self.fs.join(self.tmpdir, 'f3')
    url4 = self.fs.join(self.tmpdir, 'f4')
    with self.fs.create(url3) as f:
      f.write(b'Hello')

    with self.assertRaisesRegexp(
        BeamIOError, r'^Rename operation failed .*%s.*%s' % (url1, url2)):
      self.fs.rename([url1, url3], [url2, url4])
    self.assertFalse(self.fs.exists(url3))
    self.assertTrue(self.fs.exists(url4))

  def test_rename_directory(self):
    url_t1 = self.fs.join(self.tmpdir, 't1')
    url_t2 = self.fs.join(self.tmpdir, 't2')
    self.fs.mkdirs(url_t1)
    url1 = self.fs.join(url_t1, 'f1')
    url2 = self.fs.join(url_t2, 'f1')
    with self.fs.create(url1) as f:
      f.write(b'Hello')

    self.fs.rename([url_t1], [url_t2])
    self.assertFalse(self.fs.exists(url_t1))
    self.assertTrue(self.fs.exists(url_t2))
    self.assertFalse(self.fs.exists(url1))
    self.assertTrue(self.fs.exists(url2))

  def test_exists(self):
    url1 = self.fs.join(self.tmpdir, 'old_file1')
    url2 = self.fs.join(self.tmpdir, 'nonexistent')
    self.assertTrue(self.fs.exists(url1))
    self.assertFalse(self.fs.exists(url2))

  def test_size(self):
    url = self.fs.join(self.tmpdir, 'f1')
    with self.fs.create(url) as f:
      f.write(b'Hello')
    self.assertEqual(5, self.fs.size(url))

  def test_checksum(self):
    url = self.fs.join(self.tmpdir, 'f1')
    with self.fs.create(url) as f:
      f.write(b'Hello')
    self.assertEqual('fake_algo-5-checksum_byte_sequence',
                     self.fs.checksum(url))

  def test_delete_file(self):
    url = self.fs.join(self.tmpdir, 'old_file1')

    self.assertTrue(self.fs.exists(url))
    self.fs.delete([url])
    self.assertFalse(self.fs.exists(url))

  def test_delete_dir(self):
    url_t1 = self.fs.join(self.tmpdir, 'new_dir1')
    url_t2 = self.fs.join(url_t1, 'new_dir2')
    url1 = self.fs.join(url_t2, 'new_file1')
    url2 = self.fs.join(url_t2, 'new_file2')
    self.fs.mkdirs(url_t1)
    self.fs.mkdirs(url_t2)
    self.fs.create(url1).close()
    self.fs.create(url2).close()

    self.assertTrue(self.fs.exists(url1))
    self.assertTrue(self.fs.exists(url2))
    self.fs.delete([url_t1])
    self.assertFalse(self.fs.exists(url_t1))
    self.assertFalse(self.fs.exists(url_t2))
    self.assertFalse(self.fs.exists(url2))
    self.assertFalse(self.fs.exists(url1))

  def test_delete_error(self):
    url1 = self.fs.join(self.tmpdir, 'nonexistent')
    url2 = self.fs.join(self.tmpdir, 'old_file1')

    self.assertTrue(self.fs.exists(url2))
    path1 = self.fs._parse_url(url1)
    with self.assertRaisesRegexp(BeamIOError,
                                 r'^Delete operation failed .* %s' % path1):
      self.fs.delete([url1, url2])
    self.assertFalse(self.fs.exists(url2))


class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
  """Tests pipeline_options, in the form of a
  RuntimeValueProvider.runtime_options object."""

  def test_dict_options(self):
    self._fake_hdfs = FakeHdfs()
    hdfs.hdfs.InsecureClient = (
        lambda *args, **kwargs: self._fake_hdfs)
    pipeline_options = {
        'hdfs_host': '',
        'hdfs_port': 0,
        'hdfs_user': '',
    }

    self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)

  def test_dict_options_missing(self):
    self._fake_hdfs = FakeHdfs()
    hdfs.hdfs.InsecureClient = (
        lambda *args, **kwargs: self._fake_hdfs)

    with self.assertRaisesRegexp(ValueError, r'hdfs_host'):
      self.fs = hdfs.HadoopFileSystem(
          pipeline_options={
              'hdfs_port': 0,
              'hdfs_user': '',
          }
      )

    with self.assertRaisesRegexp(ValueError, r'hdfs_port'):
      self.fs = hdfs.HadoopFileSystem(
          pipeline_options={
              'hdfs_host': '',
              'hdfs_user': '',
          }
      )

    with self.assertRaisesRegexp(ValueError, r'hdfs_user'):
      self.fs = hdfs.HadoopFileSystem(
          pipeline_options={
              'hdfs_host': '',
              'hdfs_port': 0,
          }
      )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
