#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for Google Cloud Storage client."""
# pytype: skip-file
import logging
import os
import random
import unittest
from datetime import datetime
import mock
from apache_beam import version as beam_version
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName
from apache_beam.pipeline import PipelineOptions
from apache_beam.runners.worker import statesampler
from apache_beam.utils import counters
# pylint: disable=wrong-import-order, wrong-import-position
try:
  from apache_beam.io.gcp import gcsio
  from apache_beam.io.gcp.gcsio_retry import DEFAULT_RETRY_WITH_THROTTLING_COUNTER
  from google.cloud.exceptions import BadRequest, NotFound
except ImportError:
  NotFound = None
# pylint: enable=wrong-import-order, wrong-import-position
DEFAULT_GCP_PROJECT = 'apache-beam-testing'
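# The Fake* classes below emulate just enough of the google-cloud-storage
# client/bucket/blob surface, over in-memory dictionaries, for GcsIO to run
# against, so these tests exercise GcsIO logic without network access.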
class FakeGcsClient(object):
  """Fake storage client."""
  def __init__(self):
    self.buckets = {}

  def _add_bucket(self, bucket):
    self.buckets[bucket.name] = bucket
    return self.buckets[bucket.name]

  def bucket(self, name):
    return FakeBucket(self, name)

  def create_bucket(self, name):
    return self._add_bucket(self.bucket(name))

  def get_bucket(self, name):
    if name in self.buckets:
      return self.buckets[name]
    else:
      raise NotFound("Bucket not found")

  def lookup_bucket(self, name):
    if name in self.buckets:
      return self.buckets[name]
    else:
      return self.create_bucket(name)

  def batch(self, raise_exception=True):
    # Return a mock object configured to act as a context manager
    # and provide the necessary _responses attribute after __exit__.
    # test_delete performs 3 deletions.
    num_expected_responses = 3
    mock_batch = mock.Mock()
    # Configure the mock responses (assuming success for test_delete).
    # These need to be available *after* the 'with' block finishes,
    # so we store them here and assign them in __exit__.
    successful_responses = [
        mock.Mock(status_code=204) for _ in range(num_expected_responses)
    ]

    # Define the exit logic.
    def mock_exit_logic(exc_type, exc_val, exc_tb):
      # Assign responses to the mock instance itself
      # so they are available after the 'with' block.
      mock_batch._responses = successful_responses

    # Configure the mock to behave like a context manager.
    mock_batch.configure_mock(
        __enter__=mock.Mock(return_value=mock_batch),
        __exit__=mock.Mock(side_effect=mock_exit_logic))
    # The loop inside _batch_with_retry calls fn(request) for each item.
    # The real batch object might have methods like add() or similar,
    # but the core logic in gcsio.py calls the passed function `fn` directly
    # within the `with` block, so no specific action methods are needed
    # on mock_batch itself for this test case.
    return mock_batch

  def add_file(self, bucket, blob, contents):
    folder = self.lookup_bucket(bucket)
    holder = folder.lookup_blob(blob)
    holder.contents = contents

  def get_file(self, bucket, blob):
    folder = self.get_bucket(bucket.name)
    holder = folder.get_blob(blob.name)
    return holder

  def list_blobs(self, bucket_or_path, prefix=None, **unused_kwargs):
    bucket = self.get_bucket(bucket_or_path.name)
    if not prefix:
      return list(bucket.blobs.values())
    else:
      output = []
      for name in list(bucket.blobs):
        if name.startswith(prefix):
          output.append(bucket.blobs[name])
      return output
class FakeBucket(object):
  """Fake bucket for storing test blobs locally."""
  def __init__(self, client, name):
    self.client = client
    self.name = name
    self.blobs = {}
    self.default_kms_key_name = None

  def _get_canonical_bucket(self):
    return self.client.get_bucket(self.name)

  def _create_blob(self, name):
    return FakeBlob(name, self)

  def add_blob(self, blob):
    bucket = self._get_canonical_bucket()
    bucket.blobs[blob.name] = blob
    return bucket.blobs[blob.name]

  def blob(self, name):
    return self._create_blob(name)

  def copy_blob(self, blob, dest, new_name=None, **kwargs):
    if self.get_blob(blob.name) is None:
      raise NotFound("source blob not found")
    if not new_name:
      new_name = blob.name
    # Note: contents are not carried over; the tests only check existence.
    new_blob = FakeBlob(new_name, dest)
    dest.add_blob(new_blob)
    return new_blob

  def get_blob(self, blob_name, **unused_kwargs):
    bucket = self._get_canonical_bucket()
    if blob_name in bucket.blobs:
      return bucket.blobs[blob_name]
    else:
      return None

  def lookup_blob(self, name):
    bucket = self._get_canonical_bucket()
    if name in bucket.blobs:
      return bucket.blobs[name]
    else:
      new_blob = bucket._create_blob(name)
      return bucket.add_blob(new_blob)

  def set_default_kms_key_name(self, name):
    self.default_kms_key_name = name

  def delete_blob(self, name, **kwargs):
    bucket = self._get_canonical_bucket()
    if name in bucket.blobs:
      del bucket.blobs[name]

  def list_blobs(self, prefix=None, **kwargs):
    bucket = self._get_canonical_bucket()
    return self.client.list_blobs(bucket, prefix, **kwargs)
class FakeBlob(object):
  def __init__(
      self,
      name,
      bucket,
      size=0,
      contents=None,
      generation=1,
      crc32c=None,
      kms_key_name=None,
      updated=None,
      fail_when_getting_metadata=False,
      fail_when_reading=False):
    self.name = name
    self.bucket = bucket
    self.size = size
    self.contents = contents
    self._generation = generation
    self.crc32c = crc32c
    self.kms_key_name = kms_key_name
    self.updated = updated
    self._fail_when_getting_metadata = fail_when_getting_metadata
    self._fail_when_reading = fail_when_reading
    # The public generation is randomized, mimicking server-assigned
    # generation numbers; the constructor argument is kept in _generation.
    self.generation = random.randint(0, (1 << 63) - 1)
    self.content_encoding = None
    self.content_type = None

  def reload(self):
    pass

  def delete(self):
    self.bucket.delete_blob(self.name)

  def download_as_bytes(self, **kwargs):
    blob = self.bucket.get_blob(self.name)
    if blob is None:
      raise NotFound("blob not found")
    return blob.contents

  def __eq__(self, other):
    return self.bucket.get_blob(self.name) is other.bucket.get_blob(other.name)

  def exists(self, **kwargs):
    return self.bucket.get_blob(self.name) is not None
@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
class TestGCSPathParser(unittest.TestCase):

  BAD_GCS_PATHS = [
      'gs://',
      'gs://bucket',
      'gs:///name',
      'gs:///',
      'gs:/blah/bucket/name',
  ]

  def test_gcs_path(self):
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/name'), ('bucket', 'name'))
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/name/sub'), ('bucket', 'name/sub'))

  def test_bad_gcs_path(self):
    for path in self.BAD_GCS_PATHS:
      self.assertRaises(ValueError, gcsio.parse_gcs_path, path)
    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs://bucket/')

  def test_gcs_path_object_optional(self):
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/name', object_optional=True),
        ('bucket', 'name'))
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/', object_optional=True),
        ('bucket', ''))

  def test_bad_gcs_path_object_optional(self):
    for path in self.BAD_GCS_PATHS:
      self.assertRaises(ValueError, gcsio.parse_gcs_path, path, True)
class SampleOptions(object):
  def __init__(self, project, region, kms_key=None):
    self.project = project
    self.region = region
    self.dataflow_kms_key = kms_key
_DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"


def _make_credentials(project=None, universe_domain=_DEFAULT_UNIVERSE_DOMAIN):
  import google.auth.credentials
  if project is not None:
    return mock.Mock(
        spec=google.auth.credentials.Credentials,
        project_id=project,
        universe_domain=universe_domain,
    )
  return mock.Mock(
      spec=google.auth.credentials.Credentials, universe_domain=universe_domain)
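# Note: spec=google.auth.credentials.Credentials makes the mock pass
# isinstance() checks against the real credentials class while keeping the
# tests free of actual authentication.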
@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
class TestGCSIO(unittest.TestCase):
  def _insert_random_file(
      self,
      client,
      path,
      size=0,
      crc32c=None,
      kms_key_name=None,
      updated=None,
      fail_when_getting_metadata=False,
      fail_when_reading=False):
    """Inserts a fake blob of random bytes at the given gs:// path."""
    bucket_name, blob_name = gcsio.parse_gcs_path(path)
    bucket = client.lookup_bucket(bucket_name)
    blob = FakeBlob(
        blob_name,
        bucket,
        size=size,
        contents=os.urandom(size),
        crc32c=crc32c,
        kms_key_name=kms_key_name,
        updated=updated,
        fail_when_getting_metadata=fail_when_getting_metadata,
        fail_when_reading=fail_when_reading)
    bucket.add_blob(blob)
    return blob
  def setUp(self):
    self.client = FakeGcsClient()
    self.gcs = gcsio.GcsIO(self.client)
    self.client.create_bucket("gcsio-test")
  def test_read_bucket_metric(self):
    sampler = statesampler.StateSampler('', counters.CounterFactory())
    statesampler.set_current_tracker(sampler)
    state1 = sampler.scoped_state(
        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
    try:
      sampler.start()
      with state1:
        client = FakeGcsClient()
        gcs = gcsio.GcsIO(client, {"enable_bucket_read_metric_counter": True})
        client.create_bucket("gcsio-test")
        file_name = 'gs://gcsio-test/dummy_file'
        file_size = 1234
        self._insert_random_file(client, file_name, file_size)
        reader = gcs.open(file_name, 'r')
        reader.read()
        container = MetricsEnvironment.current_container()
        self.assertEqual(
            container.get_counter(
                MetricName(
                    "apache_beam.io.gcp.gcsio.BeamBlobReader",
                    "GCS_read_bytes_counter_gcsio-test")).get_cumulative(),
            file_size)
    finally:
      sampler.stop()
  def test_write_bucket_metric(self):
    sampler = statesampler.StateSampler('', counters.CounterFactory())
    statesampler.set_current_tracker(sampler)
    state1 = sampler.scoped_state(
        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
    try:
      sampler.start()
      with state1:
        client = FakeGcsClient()
        gcs = gcsio.GcsIO(client, {"enable_bucket_write_metric_counter": True})
        client.create_bucket("gcsio-test")
        file_name = 'gs://gcsio-test/dummy_file'
        gcs_file = gcs.open(file_name, 'w')
        gcs_file.write(str.encode("some text"))
        container = MetricsEnvironment.current_container()
        self.assertEqual(
            container.get_counter(
                MetricName(
                    "apache_beam.io.gcp.gcsio.BeamBlobWriter",
                    "GCS_write_bytes_counter_gcsio-test")).get_cumulative(),
            9)  # len("some text")
    finally:
      sampler.stop()
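  # The expected name below follows gcsio.default_gcs_bucket_name, which
  # (as of this writing) is 'dataflow-staging-<region>-' followed by an MD5
  # digest of the project id.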
  def test_default_bucket_name(self):
    self.assertEqual(
        gcsio.default_gcs_bucket_name(DEFAULT_GCP_PROJECT, "us-central1"),
        'dataflow-staging-us-central1-77b801c0838aee13391c0d1885860494')
  def test_default_bucket_name_failure(self):
    # No default bucket is created when a Dataflow KMS key is set.
    self.assertIsNone(
        gcsio.get_or_create_default_gcs_bucket(
            SampleOptions(
                DEFAULT_GCP_PROJECT, "us-central1", kms_key="kmskey!")))
  def test_exists(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    self._insert_random_file(self.client, file_name, file_size)
    self.assertFalse(self.gcs.exists(file_name + 'xyz'))
    self.assertTrue(self.gcs.exists(file_name))

  @mock.patch.object(FakeBlob, 'exists')
  def test_exists_failure(self, mock_exists):
    # Raise an error other than 404; a 404 is a valid outcome for an
    # exists() call.
    mock_exists.side_effect = BadRequest("Try again")
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    self._insert_random_file(self.client, file_name, file_size)
    with self.assertRaises(BadRequest):
      self.gcs.exists(file_name)
  def test_checksum(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    checksum = 'deadbeef'
    self._insert_random_file(self.client, file_name, file_size, crc32c=checksum)
    self.assertTrue(self.gcs.exists(file_name))
    self.assertEqual(checksum, self.gcs.checksum(file_name))

  def test_size(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    self._insert_random_file(self.client, file_name, file_size)
    self.assertTrue(self.gcs.exists(file_name))
    self.assertEqual(file_size, self.gcs.size(file_name))

  def test_kms_key(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    kms_key_name = "dummy"
    self._insert_random_file(
        self.client, file_name, file_size, kms_key_name=kms_key_name)
    self.assertTrue(self.gcs.exists(file_name))
    self.assertEqual(kms_key_name, self.gcs.kms_key(file_name))
  def test_last_updated(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    updated = datetime.fromtimestamp(123456.78)
    self._insert_random_file(self.client, file_name, file_size, updated=updated)
    self.assertTrue(self.gcs.exists(file_name))
    self.assertEqual(
        gcsio.GcsIO._updated_to_seconds(updated),
        self.gcs.last_updated(file_name))

  def test_file_status(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    updated = datetime.fromtimestamp(123456.78)
    checksum = 'deadbeef'
    self._insert_random_file(
        self.client, file_name, file_size, updated=updated, crc32c=checksum)
    file_checksum = self.gcs.checksum(file_name)
    file_status = self.gcs._status(file_name)
    self.assertEqual(file_status['size'], file_size)
    self.assertEqual(file_status['checksum'], file_checksum)
    self.assertEqual(
        file_status['updated'], gcsio.GcsIO._updated_to_seconds(updated))
  def test_file_mode_calls(self):
    file_name = 'gs://gcsio-test/dummy_mode_file'
    self._insert_random_file(self.client, file_name)
    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobWriter') as writer:
      self.gcs.open(file_name, 'wb')
      writer.assert_called()
    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobReader') as reader:
      self.gcs.open(file_name, 'rb')
      reader.assert_called()

  def test_bad_file_modes(self):
    file_name = 'gs://gcsio-test/dummy_mode_file'
    self._insert_random_file(self.client, file_name)
    with self.assertRaises(ValueError):
      self.gcs.open(file_name, 'w+')
    with self.assertRaises(ValueError):
      self.gcs.open(file_name, 'r+b')
  def test_delete(self):
    # Single file path.
    file_name = 'gs://gcsio-test/delete_me'
    file_size = 1024
    bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
    # Test deletion of a non-existent file.
    bucket = self.client.get_bucket(bucket_name)
    self.gcs.delete(file_name)
    # Insert a random file for testing.
    self._insert_random_file(self.client, file_name, file_size)
    self.assertTrue(blob_name in bucket.blobs)
    # Delete the file.
    self.gcs.delete(file_name)
    self.assertFalse(blob_name in bucket.blobs)
    # Now test deleting a directory (prefix) with multiple files.
    prefix = 'gs://gcsio-test/directory_to_delete/'
    file_names = [f"{prefix}file1", f"{prefix}file2", f"{prefix}file3"]
    blob_names = [
        gcsio.parse_gcs_path(file_name)[1] for file_name in file_names
    ]
    # Insert random files under the prefix.
    for file_name in file_names:
      self._insert_random_file(self.client, file_name, file_size)
    # Verify the files exist before deletion.
    for blob_name in blob_names:
      self.assertTrue(blob_name in bucket.blobs)
    # Delete the directory (all files under the prefix).
    self.gcs.delete(prefix, recursive=True)
    # Verify that the files are deleted.
    for blob_name in blob_names:
      self.assertFalse(blob_name in bucket.blobs)
  def test_copy(self):
    src_file_name = 'gs://gcsio-test/source'
    dest_file_name = 'gs://gcsio-test/dest'
    src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
    dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
    src_bucket = self.client.lookup_bucket(src_bucket_name)
    dest_bucket = self.client.lookup_bucket(dest_bucket_name)
    file_size = 1024
    self._insert_random_file(self.client, src_file_name, file_size)
    self.assertTrue(src_blob_name in src_bucket.blobs)
    self.assertFalse(dest_blob_name in dest_bucket.blobs)

    self.gcs.copy(src_file_name, dest_file_name)

    self.assertTrue(src_blob_name in src_bucket.blobs)
    self.assertTrue(dest_blob_name in dest_bucket.blobs)

    # Test copy of non-existent files.
    with self.assertRaises(NotFound):
      self.gcs.copy(
          'gs://gcsio-test/non-existent',
          'gs://gcsio-test/non-existent-destination')
  @staticmethod
  def _fake_batch_responses(status_codes):
    return mock.Mock(
        __enter__=mock.Mock(),
        __exit__=mock.Mock(),
        _responses=[
            mock.Mock(
                **{
                    'json.return_value': {
                        'error': {
                            'message': 'error'
                        }
                    },
                    'request.method': 'BATCH',
                    'request.url': 'contentid://None',
                },
                status_code=code,
            ) for code in status_codes
        ],
    )
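  # The side_effect list below scripts seven client.batch() calls. With
  # MAX_BATCH_OPERATION_SIZE patched to 3, the pairs are processed as
  # batches [file0-2], [file3-5], [file6]: file1 fails permanently with
  # 404, file2 is throttled (429) and retried until it succeeds, and file4
  # needs one retry after a transient 429.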
  @mock.patch('apache_beam.io.gcp.gcsio.MAX_BATCH_OPERATION_SIZE', 3)
  @mock.patch('time.sleep', mock.Mock())
  def test_copy_batch(self):
    src_dest_pairs = [
        (f'gs://source_bucket/file{i}.txt', f'gs://dest_bucket/file{i}.txt')
        for i in range(7)
    ]
    gcs_io = gcsio.GcsIO(
        storage_client=mock.Mock(
            batch=mock.Mock(
                side_effect=[
                    self._fake_batch_responses([200, 404, 429]),
                    self._fake_batch_responses([429]),
                    self._fake_batch_responses([429]),
                    self._fake_batch_responses([200]),
                    self._fake_batch_responses([200, 429, 200]),
                    self._fake_batch_responses([200]),
                    self._fake_batch_responses([200]),
                ]),
        ))

    results = gcs_io.copy_batch(src_dest_pairs)

    expected = [
        ('gs://source_bucket/file0.txt', 'gs://dest_bucket/file0.txt', None),
        ('gs://source_bucket/file1.txt', 'gs://dest_bucket/file1.txt', 404),
        ('gs://source_bucket/file2.txt', 'gs://dest_bucket/file2.txt', None),
        ('gs://source_bucket/file3.txt', 'gs://dest_bucket/file3.txt', None),
        ('gs://source_bucket/file4.txt', 'gs://dest_bucket/file4.txt', None),
        ('gs://source_bucket/file5.txt', 'gs://dest_bucket/file5.txt', None),
        ('gs://source_bucket/file6.txt', 'gs://dest_bucket/file6.txt', None),
    ]
    self.assertEqual(results, expected)
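  # Here time.monotonic is mocked to jump from 0 to 120 seconds between
  # checks, simulating the retry deadline being exhausted, so the throttled
  # (429) copy is reported as failed instead of being retried again.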
  @mock.patch('time.sleep', mock.Mock())
  @mock.patch('time.monotonic', mock.Mock(side_effect=[0, 120]))
  def test_copy_batch_timeout_exceeded(self):
    src_dest_pairs = [
        ('gs://source_bucket/file0.txt', 'gs://dest_bucket/file0.txt')
    ]
    gcs_io = gcsio.GcsIO(
        storage_client=mock.Mock(
            batch=mock.Mock(side_effect=[self._fake_batch_responses([429])])))
    results = gcs_io.copy_batch(src_dest_pairs)
    expected = [
        ('gs://source_bucket/file0.txt', 'gs://dest_bucket/file0.txt', 429),
    ]
    self.assertEqual(results, expected)
  def test_copytree(self):
    src_dir_name = 'gs://gcsio-test/source/'
    dest_dir_name = 'gs://gcsio-test/dest/'
    file_size = 1024
    paths = ['a', 'b/c', 'b/d']
    for path in paths:
      src_file_name = src_dir_name + path
      dest_file_name = dest_dir_name + path
      src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
      dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
      src_bucket = self.client.lookup_bucket(src_bucket_name)
      dest_bucket = self.client.lookup_bucket(dest_bucket_name)
      self._insert_random_file(self.client, src_file_name, file_size)
      self.assertTrue(src_blob_name in src_bucket.blobs)
      self.assertFalse(dest_blob_name in dest_bucket.blobs)

    self.gcs.copytree(src_dir_name, dest_dir_name)

    for path in paths:
      src_file_name = src_dir_name + path
      dest_file_name = dest_dir_name + path
      src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
      dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
      src_bucket = self.client.lookup_bucket(src_bucket_name)
      dest_bucket = self.client.lookup_bucket(dest_bucket_name)
      self.assertTrue(src_blob_name in src_bucket.blobs)
      self.assertTrue(dest_blob_name in dest_bucket.blobs)
  def test_rename(self):
    src_file_name = 'gs://gcsio-test/source'
    dest_file_name = 'gs://gcsio-test/dest'
    src_bucket_name, src_blob_name = gcsio.parse_gcs_path(src_file_name)
    dest_bucket_name, dest_blob_name = gcsio.parse_gcs_path(dest_file_name)
    file_size = 1024
    self._insert_random_file(self.client, src_file_name, file_size)
    src_bucket = self.client.lookup_bucket(src_bucket_name)
    dest_bucket = self.client.lookup_bucket(dest_bucket_name)
    self.assertTrue(src_blob_name in src_bucket.blobs)
    self.assertFalse(dest_blob_name in dest_bucket.blobs)

    self.gcs.rename(src_file_name, dest_file_name)

    self.assertFalse(src_blob_name in src_bucket.blobs)
    self.assertTrue(dest_blob_name in dest_bucket.blobs)
  def test_file_buffered_read_call(self):
    file_name = 'gs://gcsio-test/read_line_file'
    read_buffer_size = 1024
    self._insert_random_file(self.client, file_name, 10240)
    bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
    bucket = self.client.get_bucket(bucket_name)
    blob = bucket.get_blob(blob_name)

    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobReader') as reader:
      self.gcs.open(file_name, read_buffer_size=read_buffer_size)
      reader.assert_called_with(
          blob,
          chunk_size=read_buffer_size,
          enable_read_bucket_metric=False,
          retry=DEFAULT_RETRY_WITH_THROTTLING_COUNTER)

  def test_file_write_call(self):
    file_name = 'gs://gcsio-test/write_file'
    with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobWriter') as writer:
      self.gcs.open(file_name, 'w')
      writer.assert_called()
  def test_list_prefix(self):
    bucket_name = 'gcsio-test'
    objects = [
        ('cow/cat/fish', 2),
        ('cow/cat/blubber', 3),
        ('cow/dog/blubber', 4),
    ]
    for (object_name, size) in objects:
      file_name = 'gs://%s/%s' % (bucket_name, object_name)
      self._insert_random_file(self.client, file_name, size)
    test_cases = [
        (
            'gs://gcsio-test/c',
            [
                ('cow/cat/fish', 2),
                ('cow/cat/blubber', 3),
                ('cow/dog/blubber', 4),
            ]),
        (
            'gs://gcsio-test/cow/',
            [
                ('cow/cat/fish', 2),
                ('cow/cat/blubber', 3),
                ('cow/dog/blubber', 4),
            ]),
        ('gs://gcsio-test/cow/cat/fish', [
            ('cow/cat/fish', 2),
        ]),
    ]
    for file_pattern, expected_object_names in test_cases:
      expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
                             for (object_name, size) in expected_object_names]
      self.assertEqual(
          set(self.gcs.list_prefix(file_pattern).items()),
          set(expected_file_names))
  def test_downloader_fail_non_existent_object(self):
    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
    with self.assertRaises(NotFound):
      with self.gcs.open(file_name, 'r') as f:
        f.read(1)

  def test_blob_delete(self):
    file_name = 'gs://gcsio-test/delete_me'
    file_size = 1024
    bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
    # Test deletion of a non-existent file.
    bucket = self.client.get_bucket(bucket_name)
    self.gcs.delete(file_name)

    self._insert_random_file(self.client, file_name, file_size)
    self.assertTrue(blob_name in bucket.blobs)

    blob = bucket.get_blob(blob_name)
    self.assertIsNotNone(blob)

    blob.delete()
    self.assertFalse(blob_name in bucket.blobs)
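  # test_headers below relies on GcsIO translating the
  # --gcs_custom_audit_entry/--gcs_custom_audit_entries pipeline options
  # into x-goog-custom-audit-* request headers and appending the Beam
  # user agent to each storage request.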
  @mock.patch('google.cloud._http.JSONConnection._do_request')
  @mock.patch('apache_beam.internal.gcp.auth.get_service_credentials')
  def test_headers(self, mock_get_service_credentials, mock_do_request):
    from apache_beam.internal.gcp.auth import _ApitoolsCredentialsAdapter
    mock_get_service_credentials.return_value = _ApitoolsCredentialsAdapter(
        _make_credentials("test-project"))
    options = PipelineOptions([
        "--job_name=test-job-name",
        "--gcs_custom_audit_entry=user=test-user-id",
        "--gcs_custom_audit_entries={\"id\": \"1234\", \"status\": \"ok\"}"
    ])
    gcs = gcsio.GcsIO(pipeline_options=options)
    # No HTTP request is made when initializing GcsIO.
    mock_do_request.assert_not_called()

    import requests
    response = requests.Response()
    response.status_code = 200
    mock_do_request.return_value = response

    # get_bucket() is expected to send exactly one HTTP request.
    gcs.get_bucket("test-bucket")
    mock_do_request.assert_called_once()
    call_args = mock_do_request.call_args[0]
    # Headers are passed as the third positional argument of
    # google.cloud._http.JSONConnection._do_request.
    actual_headers = call_args[2]
    beam_user_agent = "apache-beam/%s (GPN:Beam)" % beam_version.__version__
    self.assertIn(beam_user_agent, actual_headers['User-Agent'])
    self.assertEqual(actual_headers['x-goog-custom-audit-job'], 'test-job-name')
    self.assertEqual(actual_headers['x-goog-custom-audit-user'], 'test-user-id')
    self.assertEqual(actual_headers['x-goog-custom-audit-id'], '1234')
    self.assertEqual(actual_headers['x-goog-custom-audit-status'], 'ok')
  @mock.patch('google.cloud._http.JSONConnection._do_request')
  @mock.patch('apache_beam.internal.gcp.auth.get_service_credentials')
  def test_create_default_bucket(
      self, mock_get_service_credentials, mock_do_request):
    from apache_beam.internal.gcp.auth import _ApitoolsCredentialsAdapter
    mock_get_service_credentials.return_value = _ApitoolsCredentialsAdapter(
        _make_credentials("test-project"))
    gcs = gcsio.GcsIO(pipeline_options={"job_name": "test-job-name"})
    # No HTTP request is made when initializing GcsIO.
    mock_do_request.assert_not_called()

    import requests
    response = requests.Response()
    response.status_code = 200
    mock_do_request.return_value = response

    # create_bucket() is expected to send exactly one HTTP request.
    gcs.create_bucket("test-bucket", "test-project")
    mock_do_request.assert_called_once()
    call_args = mock_do_request.call_args[0]
    # Request data is passed as the fourth positional argument of
    # google.cloud._http.JSONConnection._do_request.
    actual_request_data = call_args[3]

    import json
    request_data_json = json.loads(actual_request_data)
    # Verify that the soft delete policy is disabled by default in the
    # bucket creation request.
    self.assertEqual(
        request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0)
@mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket")
def test_is_soft_delete_enabled(self, mock_get_bucket):
bucket = mock.MagicMock()
mock_get_bucket.return_value = bucket
# soft delete policy enabled
bucket.soft_delete_policy.retention_duration_seconds = 1024
self.assertTrue(
self.gcs.is_soft_delete_enabled("gs://beam_with_soft_delete/tmp"))
# soft delete policy disabled
bucket.soft_delete_policy.retention_duration_seconds = 0
self.assertFalse(
self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp"))
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()