#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Integration tests for gcsfilesystem module.
Runs tests against Google Cloud Storage service.
Instantiates a TestPipeline to get options such as GCP project name, but
doesn't actually start a Beam pipeline or test any specific runner.
Run the following in 'sdks/python' directory to run these tests manually:
scripts/run_integration_test.sh \
--test_opts apache_beam/io/gcp/gcsfilesystem_integration_test.py
"""

# pytype: skip-file

import logging
import unittest
import uuid

import pytest

from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems
from apache_beam.testing.test_pipeline import TestPipeline

try:
  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
  fs_not_available = False
except ImportError:
  fs_not_available = True  # type: ignore

@unittest.skipIf(fs_not_available, 'GCP dependencies are not installed')
class GcsFileSystemIntegrationTest(unittest.TestCase):
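  # kinglear.txt is a small, publicly readable sample object, so the tests
  # can copy real data without provisioning any fixtures.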
  INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt'

  def setUp(self):
    self.test_pipeline = TestPipeline(is_integration_test=True)
    self.runner_name = type(self.test_pipeline.runner).__name__
    if self.runner_name != 'TestDataflowRunner':
      # This test doesn't run a pipeline, so it doesn't make sense to try it
      # on different runners. Running with TestDataflowRunner makes sense
      # since it uses GoogleCloudOptions such as 'project'.
      raise unittest.SkipTest('This test only runs with TestDataflowRunner.')
    self.project = self.test_pipeline.get_option('project')
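    # Give each test run its own temp directory; the uuid4 suffix keeps
    # concurrent runs from colliding.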
    self.gcs_tempdir = (
        self.test_pipeline.get_option('temp_location') + '/gcs_it-' +
        str(uuid.uuid4()))
    self.fs = GCSFileSystem(self.test_pipeline.get_pipeline_options())

  def tearDown(self):
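    # Deleting the directory path (note the trailing slash) recursively
    # removes everything the test wrote under gcs_tempdir.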
    FileSystems.delete([self.gcs_tempdir + '/'])

  def _verify_copy(self, src, dest):
    self.assertTrue(FileSystems.exists(src), 'src does not exist: %s' % src)
    self.assertTrue(FileSystems.exists(dest), 'dest does not exist: %s' % dest)
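    # Comparing service-side checksums confirms the contents match without
    # downloading either object.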
    src_checksum = self.fs.checksum(src)
    dest_checksum = self.fs.checksum(dest)
    self.assertEqual(src_checksum, dest_checksum)

  def _verify_rename(self, src, dest):
    self.assertFalse(FileSystems.exists(src), 'file %s not renamed' % src)
    self.assertTrue(FileSystems.exists(dest), 'file not renamed to %s' % dest)

  def _test_copy(self, name, src=None):
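    # Copy a single file into the test temp dir and check that source and
    # destination contents match.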
    src = src or self.INPUT_FILE
    dest = self.gcs_tempdir + '/%s' % name
    self.fs.copy([src], [dest])
    self._verify_copy(src, dest)

  @pytest.mark.it_postcommit
  def test_copy(self):
    self._test_copy('test_copy')

  @pytest.mark.it_postcommit
  def test_rename(self):
    num_copies = 10
    srcs = [self.INPUT_FILE] * num_copies
    dests = [
        self.gcs_tempdir + '/%s_%d' % ('test_rename', i)
        for i in range(num_copies)
    ]
    self.fs.copy(srcs, dests)
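    # rename() takes parallel lists of sources and destinations and renames
    # all ten objects in a single call.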
    new_names = [
        self.gcs_tempdir + '/%s_%d' % ('renamed', i) for i in range(num_copies)
    ]
    self.fs.rename(dests, new_names)
    for old, new in zip(dests, new_names):
      self._verify_rename(old, new)

  @pytest.mark.it_postcommit
  def test_rename_error(self):
    num_copies = 10
    srcs = [self.INPUT_FILE] * num_copies
    dests = [
        self.gcs_tempdir + '/%s_%d' % ('test_rename_error', i)
        for i in range(num_copies)
    ]
    self.fs.copy(srcs, dests)
    new_names = [
        self.gcs_tempdir + '/%s_%d' % ('renamed', i) for i in range(num_copies)
    ]
    # Swap one rename source for a path that doesn't exist, so the batch
    # rename raises BeamIOError for that pair.
    bad_name = self.gcs_tempdir + '/errorbadwrong'
    dests[num_copies // 2] = bad_name
    with self.assertRaises(BeamIOError):
      self.fs.rename(dests, new_names)
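    # The failure is reported per path inside the BeamIOError; renames for
    # the valid pairs still complete, which the loop below verifies.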
    for old, new in zip(dests, new_names):
      if old != bad_name:
        self._verify_rename(old, new)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()