# blob: f5da9b60dbd6e2530606406018de0cc490bfc56f [file]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Integration tests for gcsio module.
Runs tests against Google Cloud Storage service.
Instantiates a TestPipeline to get options such as GCP project name, but
doesn't actually start a Beam pipeline or test any specific runner.
Run the following in 'sdks/python' directory to run these tests manually:
scripts/run_integration_test.sh \
--test_opts apache_beam/io/gcp/gcsio_integration_test.py
"""
# pytype: skip-file
import logging
import time
import unittest
import uuid
import zlib
import mock
import pytest
from parameterized import parameterized
from parameterized import parameterized_class
from apache_beam import Create
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.combiners import Count
try:
from apache_beam.io.gcp import gcsio
except ImportError:
gcsio = None # type: ignore
try:
from google.api_core.exceptions import NotFound
except ImportError:
NotFound = None
# Number of seconds to wait for bucket deletion or creation to propagate.
# Used by test_create_default_bucket after each bucket create/delete call.
WAIT_BUCKET_PROPAGATION_SECONDS = 60
@unittest.skipIf(gcsio is None, 'GCP dependencies are not installed')
@parameterized_class(
    ('no_gcsio_throttling_counter', 'enable_gcsio_blob_generation'),
    [(False, False), (False, True), (True, False), (True, True)])
class GcsIOIntegrationTest(unittest.TestCase):
  """Integration tests for gcsio.GcsIO against the live GCS service.

  Parameterized over the 'no_gcsio_throttling_counter' and
  'enable_gcsio_blob_generation' pipeline options, so every test method runs
  under all four combinations of those two flags.
  """

  # Publicly readable sample object used as the copy source in these tests.
  INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt'

  def setUp(self):
    # The TestPipeline is only used to read GCP options (project,
    # temp_location); no Beam pipeline is actually executed by this class.
    self.test_pipeline = TestPipeline(is_integration_test=True)
    self.runner_name = type(self.test_pipeline.runner).__name__
    if self.runner_name != 'TestDataflowRunner':
      # This test doesn't run a pipeline, so it doesn't make sense to try it on
      # different runners. Running with TestDataflowRunner makes sense since
      # it uses GoogleCloudOptions such as 'project'.
      raise unittest.SkipTest('This test only runs with TestDataflowRunner.')
    self.project = self.test_pipeline.get_option('project')
    # Unique per-test scratch directory under temp_location; removed in
    # tearDown.
    self.gcs_tempdir = (
        self.test_pipeline.get_option('temp_location') + '/gcs_it-' +
        str(uuid.uuid4()))

  def tearDown(self):
    # Delete everything written under the scratch directory.
    FileSystems.delete([self.gcs_tempdir + '/'])

  def _verify_copy(self, src, dest, dest_kms_key_name=None):
    """Asserts that ``dest`` is a faithful copy of ``src``.

    Checks that both objects exist, that their checksums match, and that the
    destination's KMS key matches ``dest_kms_key_name``. Relies on
    ``self.gcsio`` having been set by the calling test method.
    """
    self.assertTrue(
        FileSystems.exists(src), 'source file does not exist: %s' % src)
    self.assertTrue(
        FileSystems.exists(dest),
        'copied file not present in destination: %s' % dest)
    src_checksum = self.gcsio.checksum(src)
    dest_checksum = self.gcsio.checksum(dest)
    self.assertEqual(src_checksum, dest_checksum)
    actual_dest_kms_key = self.gcsio.kms_key(dest)
    if actual_dest_kms_key is None:
      # No key on the destination: only valid when no key was requested.
      self.assertEqual(actual_dest_kms_key, dest_kms_key_name)
    else:
      # Prefix comparison — presumably the service reports the key with a
      # version suffix appended; TODO confirm against the GCS API.
      self.assertTrue(
          actual_dest_kms_key.startswith(dest_kms_key_name),
          "got: %s, wanted startswith: %s" %
          (actual_dest_kms_key, dest_kms_key_name))

  @pytest.mark.it_postcommit
  def test_copy(self):
    """Copies one object and verifies it; a missing source raises NotFound."""
    self.gcsio = gcsio.GcsIO(
        pipeline_options={
            "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter,
            "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation
        })
    src = self.INPUT_FILE
    dest = self.gcs_tempdir + '/test_copy'
    self.gcsio.copy(src, dest)
    self._verify_copy(src, dest)
    # A freshly generated path that was never written: copying from it must
    # raise NotFound.
    unknown_src = self.test_pipeline.get_option('temp_location') + \
        '/gcs_it-' + str(uuid.uuid4())
    with self.assertRaises(NotFound):
      self.gcsio.copy(unknown_src, dest)

  @pytest.mark.it_postcommit
  def test_copy_and_delete(self):
    """Copies an object, deletes it, and checks that delete is idempotent."""
    self.gcsio = gcsio.GcsIO(
        pipeline_options={
            "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter,
            "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation
        })
    src = self.INPUT_FILE
    dest = self.gcs_tempdir + '/test_copy'
    self.gcsio.copy(src, dest)
    self._verify_copy(src, dest)
    self.gcsio.delete(dest)
    # no exception if we delete an nonexistent file.
    self.gcsio.delete(dest)

  @pytest.mark.it_postcommit
  def test_batch_copy_and_delete(self):
    """Batch-copies ten objects, verifies them, then batch-deletes twice."""
    self.gcsio = gcsio.GcsIO(
        pipeline_options={
            "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter,
            "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation
        })
    num_copies = 10
    srcs = [self.INPUT_FILE] * num_copies
    dests = [
        self.gcs_tempdir + '/test_copy_batch_%d' % i for i in range(num_copies)
    ]
    src_dest_pairs = list(zip(srcs, dests))
    copy_results = self.gcsio.copy_batch(src_dest_pairs)
    self.assertEqual(len(copy_results), len(src_dest_pairs))
    # Each copy result is indexed as (source, destination, error-status);
    # the third field is falsy on success.
    for pair, result in list(zip(src_dest_pairs, copy_results)):
      self._verify_copy(pair[0], pair[1])
      self.assertEqual(
          pair[0],
          result[0],
          'copy source %s does not match %s' % (pair[0], str(result)))
      self.assertEqual(
          pair[1],
          result[1],
          'copy destination %s does not match %s' % (pair[1], result[1]))
      self.assertFalse(
          result[2],
          'response code %s indicates that copy operation did not succeed' %
          result[2])
    # Each delete result is indexed as (path, error-status); the second field
    # is falsy on success.
    delete_results = self.gcsio.delete_batch(dests)
    self.assertEqual(len(delete_results), len(dests))
    for dest, result in list(zip(dests, delete_results)):
      self.assertFalse(
          FileSystems.exists(dest), 'deleted file still exists: %s' % dest)
      self.assertEqual(
          dest,
          result[0],
          'delete path %s does not match %s' % (dest, result[0]))
      self.assertFalse(
          result[1],
          'response code %s indicates that delete operation did not succeed' %
          result[1])
    # Deleting already-deleted paths must not report errors.
    redelete_results = self.gcsio.delete_batch(dests)
    for dest, result in list(zip(dests, redelete_results)):
      self.assertFalse(
          result[1], 're-delete should not throw error: %s' % result[1])

  @pytest.mark.it_postcommit
  @mock.patch('apache_beam.io.gcp.gcsio.default_gcs_bucket_name')
  @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
  def test_create_default_bucket(self, mock_default_gcs_bucket_name):
    """Creates the default GCS bucket under a mocked unique name, checks its
    soft-delete policy, then deletes it and confirms the deletion."""
    self.gcsio = gcsio.GcsIO()
    google_cloud_options = self.test_pipeline.options.view_as(
        GoogleCloudOptions)
    # overwrite kms option here, because get_or_create_default_gcs_bucket()
    # requires this option unset.
    google_cloud_options.dataflow_kms_key = None

    import random
    from hashlib import blake2b
    # Add a random number to avoid collision if multiple test instances
    # are run at the same time. To avoid too many dangling buckets if bucket
    # removal fails, we limit the max number of possible bucket names in this
    # test to 1000.
    overridden_bucket_name = 'gcsio-it-%d-%s-%s-%d' % (
        random.randint(0, 999),
        google_cloud_options.region,
        blake2b(google_cloud_options.project.encode('utf8'),
                digest_size=4).hexdigest(),
        int(time.time()))

    mock_default_gcs_bucket_name.return_value = overridden_bucket_name

    # remove the existing bucket with the same name as the default bucket
    existing_bucket = self.gcsio.get_bucket(overridden_bucket_name)
    if existing_bucket:
      try:
        existing_bucket.delete()
        time.sleep(WAIT_BUCKET_PROPAGATION_SECONDS)
      except NotFound:
        # Bucket existence check from get_bucket may be inaccurate due to gcs
        # cache or delay
        pass

    bucket = gcsio.get_or_create_default_gcs_bucket(google_cloud_options)
    time.sleep(WAIT_BUCKET_PROPAGATION_SECONDS)
    self.assertIsNotNone(bucket)
    self.assertEqual(bucket.name, overridden_bucket_name)
    # verify soft delete policy is disabled by default in the default bucket
    # after creation
    self.assertEqual(bucket.soft_delete_policy.retention_duration_seconds, 0)

    # Clean up: re-fetch the bucket with exponential backoff, since the newly
    # created bucket may not be visible to get_bucket immediately.
    max_retries = 5
    retry_delay = 1
    existing_bucket = None
    for attempt in range(max_retries):
      try:
        existing_bucket = self.gcsio.get_bucket(overridden_bucket_name)
        break
      except NotFound:
        if attempt < max_retries - 1:
          time.sleep(retry_delay)
          retry_delay *= 2
        else:
          # Retries exhausted — proceed without a bucket handle.
          existing_bucket = None
    if existing_bucket:
      try:
        existing_bucket.delete()
      except NotFound:
        pass
    time.sleep(WAIT_BUCKET_PROPAGATION_SECONDS)
    bucket_after_delete = self.gcsio.get_bucket(overridden_bucket_name)
    self.assertIsNone(bucket_after_delete)
class GcsIOReadGzipTest(unittest.TestCase):
  """Exercises ReadAllFromText on GCS sample files whose Content-Type and
  Content-Encoding metadata vary, under each compression_type setting.

  Each parameterized case names a file, a compression_type, and the exception
  class the pipeline is expected to raise (or None when the read should
  succeed and yield exactly 1000 lines).
  """

  gcs_path_prefix = "gs://apache-beam-samples/textio/"
  gzip_test_files = [
      "textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz",
      "textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-none-content-encoding-none.1k.txt.gz",
      "textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-text-content-encoding-none.1k.txt.gz",
      "textio-test-data.default.1k.txt",
      "textio-test-data.default.1k.txt.gz",
      "textio-test-data.gzip-local.1k.txt.gz",
  ]

  @parameterized.expand([
      (gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError),
      (gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError),
      (gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError),
      (gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[1], CompressionTypes.GZIP, None),
      (gzip_test_files[1], CompressionTypes.AUTO, None),
      (gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[2], CompressionTypes.GZIP, None),
      (gzip_test_files[2], CompressionTypes.AUTO, None),
      (gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[3], CompressionTypes.GZIP, None),
      (gzip_test_files[3], CompressionTypes.AUTO, None),
      (gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[4], CompressionTypes.GZIP, None),
      (gzip_test_files[4], CompressionTypes.AUTO, None),
      (gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[5], CompressionTypes.GZIP, None),
      (gzip_test_files[5], CompressionTypes.AUTO, None),
      (gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None),
      (gzip_test_files[6], CompressionTypes.GZIP, zlib.error),
      (gzip_test_files[6], CompressionTypes.AUTO, None),
      (gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[7], CompressionTypes.GZIP, None),
      (gzip_test_files[7], CompressionTypes.AUTO, None),
      (gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[8], CompressionTypes.GZIP, None),
      (gzip_test_files[8], CompressionTypes.AUTO, None),
  ])
  @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
  def test_read_gzip_file(self, file_name, compression_type, exception):
    """Reads one sample file on the Direct runner and counts its lines."""
    pipeline = TestPipeline(runner="Direct", is_integration_test=True)
    gcs_path = f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"
    line_count = (
        pipeline
        | Create([gcs_path])
        | "Read File from GCS" >>
        ReadAllFromText(compression_type=compression_type)
        | Count.Globally())
    # A successful read yields exactly the 1000 lines each sample contains.
    assert_that(line_count, equal_to([1000]))
    if exception is not None:
      # The failure surfaces only when the pipeline actually executes.
      with self.assertRaises(exception):
        pipeline.run().wait_until_finish()
    else:
      pipeline.run().wait_until_finish()
if __name__ == '__main__':
  # Emit INFO-level logs so integration-test progress is visible when the
  # module is run directly.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()