#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for Google Cloud Storage client."""
# pytype: skip-file
import datetime
import errno
import io
import logging
import os
import random
import time
import unittest
from email.message import Message

import httplib2
import mock

# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName

try:
  from apache_beam.io.gcp import gcsio, resource_identifiers
  from apache_beam.io.gcp.internal.clients import storage
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = None
# pylint: enable=wrong-import-order, wrong-import-position

DEFAULT_GCP_PROJECT = 'apache-beam-testing'
DEFAULT_PROJECT_NUMBER = 1


class FakeGcsClient(object):
  # Fake storage client. Usage in gcsio.py is client.objects.Get(...) and
  # client.objects.Insert(...).

  def __init__(self):
    self.objects = FakeGcsObjects()
    self.buckets = FakeGcsBuckets()
    # Referenced in GcsIO.copy_batch() and GcsIO.delete_batch().
    self._http = object()
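
# A minimal usage sketch of the fakes above, mirroring setUp() in TestGCSIO
# below: the fake client is handed straight to GcsIO in place of the real
# apitools storage client, so every test runs entirely in memory.
#
#   client = FakeGcsClient()
#   gcs = gcsio.GcsIO(client)
#   with gcs.open('gs://bucket/object', 'wb') as f:
#     f.write(b'data')
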
class FakeFile(object):
  def __init__(
      self, bucket, obj, contents, generation, crc32c=None, last_updated=None):
    self.bucket = bucket
    self.object = obj
    self.contents = contents
    self.generation = generation
    self.crc32c = crc32c
    self.last_updated = last_updated

  def get_metadata(self):
    last_updated_datetime = None
    if self.last_updated:
      last_updated_datetime = datetime.datetime.utcfromtimestamp(
          self.last_updated)

    return storage.Object(
        bucket=self.bucket,
        name=self.object,
        generation=self.generation,
        size=len(self.contents),
        crc32c=self.crc32c,
        updated=last_updated_datetime)


class FakeGcsBuckets(object):
  def __init__(self):
    pass

  def get_bucket(self, bucket):
    return storage.Bucket(name=bucket, projectNumber=DEFAULT_PROJECT_NUMBER)

  def Get(self, get_request):
    return self.get_bucket(get_request.bucket)


class FakeGcsObjects(object):
  def __init__(self):
    self.files = {}
    # Store the last generation used for a given object name. Note that this
    # has to persist even past the deletion of the object.
    self.last_generation = {}
    self.list_page_tokens = {}

  def add_file(self, f):
    self.files[(f.bucket, f.object)] = f
    self.last_generation[(f.bucket, f.object)] = f.generation

  def get_file(self, bucket, obj):
    return self.files.get((bucket, obj), None)

  def delete_file(self, bucket, obj):
    del self.files[(bucket, obj)]

  def get_last_generation(self, bucket, obj):
    return self.last_generation.get((bucket, obj), 0)

  def Get(self, get_request, download=None):  # pylint: disable=invalid-name
    f = self.get_file(get_request.bucket, get_request.object)
    if f is None:
      # Fail with an HTTP 404 if the file does not exist.
      raise HttpError({'status': 404}, None, None)
    if download is None:
      return f.get_metadata()
    else:
      stream = download.stream

      def get_range_callback(start, end):
        if not 0 <= start <= end < len(f.contents):
          raise ValueError(
              'start=%d end=%d len=%s' % (start, end, len(f.contents)))
        stream.write(f.contents[start:end + 1])

      download.GetRange = get_range_callback
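
  # Note: rather than returning data itself, Get() emulates the apitools
  # download protocol by monkeypatching download.GetRange with a callback
  # that writes the requested byte range of the fake file's contents into
  # the download stream; this is the hook gcsio's reader relies on.
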
  def Insert(self, insert_request, upload=None):  # pylint: disable=invalid-name
    assert upload is not None
    generation = self.get_last_generation(
        insert_request.bucket, insert_request.name) + 1
    f = FakeFile(insert_request.bucket, insert_request.name, b'', generation)

    # Stream data into file.
    stream = upload.stream
    data_list = []
    while True:
      data = stream.read(1024 * 1024)
      if not data:
        break
      data_list.append(data)
    f.contents = b''.join(data_list)

    self.add_file(f)

  REWRITE_TOKEN = 'test_token'

  def Rewrite(self, rewrite_request):  # pylint: disable=invalid-name
    if rewrite_request.rewriteToken == self.REWRITE_TOKEN:
      dest_object = storage.Object()
      return storage.RewriteResponse(
          done=True,
          objectSize=100,
          resource=dest_object,
          totalBytesRewritten=100)

    src_file = self.get_file(
        rewrite_request.sourceBucket, rewrite_request.sourceObject)
    if not src_file:
      raise HttpError(
          httplib2.Response({'status': '404'}),
          '404 Not Found',
          'https://fake/url')
    generation = self.get_last_generation(
        rewrite_request.destinationBucket,
        rewrite_request.destinationObject) + 1
    dest_file = FakeFile(
        rewrite_request.destinationBucket,
        rewrite_request.destinationObject,
        src_file.contents,
        generation)
    self.add_file(dest_file)
    time.sleep(10)  # time.sleep and time.time are mocked below.
    return storage.RewriteResponse(
        done=False,
        objectSize=100,
        rewriteToken=self.REWRITE_TOKEN,
        totalBytesRewritten=5)
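
  # Note: Rewrite() emulates GCS's resumable, multi-call rewrite protocol.
  # The first call performs the copy but reports done=False along with
  # REWRITE_TOKEN; the caller is expected to call Rewrite() again with that
  # token, at which point done=True is returned. This exercises the
  # rewrite-continuation loop in GcsIO.copy().
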
  def Delete(self, delete_request):  # pylint: disable=invalid-name
    # Here, we emulate the behavior of the GCS service in raising a 404 error
    # if the object does not exist.
    if self.get_file(delete_request.bucket, delete_request.object):
      self.delete_file(delete_request.bucket, delete_request.object)
    else:
      raise HttpError(
          httplib2.Response({'status': '404'}),
          '404 Not Found',
          'https://fake/url')

  def List(self, list_request):  # pylint: disable=invalid-name
    bucket = list_request.bucket
    prefix = list_request.prefix or ''
    matching_files = []

    for file_bucket, file_name in sorted(iter(self.files)):
      if bucket == file_bucket and file_name.startswith(prefix):
        file_object = self.files[(file_bucket, file_name)].get_metadata()
        matching_files.append(file_object)

    # Handle pagination.
    items_per_page = 5
    if not list_request.pageToken:
      range_start = 0
    else:
      if list_request.pageToken not in self.list_page_tokens:
        raise ValueError('Invalid page token.')
      range_start = self.list_page_tokens[list_request.pageToken]
      del self.list_page_tokens[list_request.pageToken]

    result = storage.Objects(
        items=matching_files[range_start:range_start + items_per_page])
    if range_start + items_per_page < len(matching_files):
      next_range_start = range_start + items_per_page
      next_page_token = '_page_token_%s_%s_%d' % (
          bucket, prefix, next_range_start)
      self.list_page_tokens[next_page_token] = next_range_start
      result.nextPageToken = next_page_token
    return result
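
  # Note: List() returns results in fixed pages of items_per_page (5)
  # objects, and page tokens are single-use: a token is deleted as soon as
  # it is consumed, so presenting a stale or unknown token raises ValueError
  # instead of silently returning the wrong page.
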
class FakeApiCall(object):
  def __init__(self, exception, response):
    self.exception = exception
    self.is_error = exception is not None
    # Response for Rewrite:
    self.response = response


class FakeBatchApiRequest(object):
  def __init__(self, **unused_kwargs):
    self.operations = []

  def Add(self, service, method, request):  # pylint: disable=invalid-name
    self.operations.append((service, method, request))

  def Execute(self, unused_http, **unused_kwargs):  # pylint: disable=invalid-name
    api_calls = []
    for service, method, request in self.operations:
      exception = None
      response = None
      try:
        response = getattr(service, method)(request)
      except Exception as e:  # pylint: disable=broad-except
        exception = e
      api_calls.append(FakeApiCall(exception, response))
    return api_calls
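
# Note: FakeBatchApiRequest stands in for apitools' BatchApiRequest. Tests
# such as test_delete_batch and test_copy_batch below patch
# apache_beam.io.gcp.gcsio.BatchApiRequest with this class; Execute() then
# dispatches each queued operation synchronously via getattr(service, method)
# and wraps each outcome in a FakeApiCall, mirroring the real batch API's
# per-call success/failure reporting.
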
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestGCSPathParser(unittest.TestCase):

  BAD_GCS_PATHS = [
      'gs://',
      'gs://bucket',
      'gs:///name',
      'gs:///',
      'gs:/blah/bucket/name',
  ]

  def test_gcs_path(self):
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/name'), ('bucket', 'name'))
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/name/sub'), ('bucket', 'name/sub'))

  def test_bad_gcs_path(self):
    for path in self.BAD_GCS_PATHS:
      self.assertRaises(ValueError, gcsio.parse_gcs_path, path)
    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs://bucket/')

  def test_gcs_path_object_optional(self):
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/name', object_optional=True),
        ('bucket', 'name'))
    self.assertEqual(
        gcsio.parse_gcs_path('gs://bucket/', object_optional=True),
        ('bucket', ''))

  def test_bad_gcs_path_object_optional(self):
    for path in self.BAD_GCS_PATHS:
      self.assertRaises(ValueError, gcsio.parse_gcs_path, path, True)
class SampleOptions(object):
  def __init__(self, project, region, kms_key=None):
    self.project = project
    self.region = region
    self.dataflow_kms_key = kms_key
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
@mock.patch.multiple(
    'time', time=mock.MagicMock(side_effect=range(100)), sleep=mock.MagicMock())
class TestGCSIO(unittest.TestCase):
  def _insert_random_file(
      self, client, path, size, generation=1, crc32c=None, last_updated=None):
    bucket, name = gcsio.parse_gcs_path(path)
    f = FakeFile(
        bucket,
        name,
        os.urandom(size),
        generation,
        crc32c=crc32c,
        last_updated=last_updated)
    client.objects.add_file(f)
    return f

  def setUp(self):
    self.client = FakeGcsClient()
    self.gcs = gcsio.GcsIO(self.client)

  def test_default_bucket_name(self):
    self.assertEqual(
        gcsio.default_gcs_bucket_name(DEFAULT_GCP_PROJECT, "us-central1"),
        'dataflow-staging-us-central1-77b801c0838aee13391c0d1885860494')

  def test_default_bucket_name_failure(self):
    self.assertEqual(
        gcsio.get_or_create_default_gcs_bucket(
            SampleOptions(
                DEFAULT_GCP_PROJECT, "us-central1", kms_key="kmskey!")),
        None)

  def test_num_retries(self):
    # BEAM-7424: update num_retries accordingly if storage_client is
    # regenerated.
    self.assertEqual(gcsio.GcsIO().client.num_retries, 20)

  def test_retry_func(self):
    # BEAM-7667: update retry_func accordingly if storage_client is
    # regenerated.
    self.assertIsNotNone(gcsio.GcsIO().client.retry_func)

  def test_exists(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    self._insert_random_file(self.client, file_name, file_size)
    self.assertFalse(self.gcs.exists(file_name + 'xyz'))
    self.assertTrue(self.gcs.exists(file_name))

  @mock.patch.object(FakeGcsObjects, 'Get')
  def test_exists_failure(self, mock_get):
    # Raise an error other than 404; a 404 is a valid outcome for exists(),
    # which simply returns False.
    mock_get.side_effect = HttpError({'status': 400}, None, None)
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    self._insert_random_file(self.client, file_name, file_size)
    with self.assertRaises(HttpError) as cm:
      self.gcs.exists(file_name)
    self.assertEqual(400, cm.exception.status_code)
  def test_checksum(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    checksum = 'deadbeef'
    self._insert_random_file(self.client, file_name, file_size, crc32c=checksum)
    self.assertTrue(self.gcs.exists(file_name))
    self.assertEqual(checksum, self.gcs.checksum(file_name))

  def test_size(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    self._insert_random_file(self.client, file_name, file_size)
    self.assertTrue(self.gcs.exists(file_name))
    self.assertEqual(1234, self.gcs.size(file_name))

  def test_last_updated(self):
    file_name = 'gs://gcsio-test/dummy_file'
    file_size = 1234
    last_updated = 123456.78
    self._insert_random_file(
        self.client, file_name, file_size, last_updated=last_updated)
    self.assertTrue(self.gcs.exists(file_name))
    self.assertEqual(last_updated, self.gcs.last_updated(file_name))

  def test_file_mode(self):
    file_name = 'gs://gcsio-test/dummy_mode_file'
    with self.gcs.open(file_name, 'wb') as f:
      assert f.mode == 'wb'
    with self.gcs.open(file_name, 'rb') as f:
      assert f.mode == 'rb'

  def test_bad_file_modes(self):
    file_name = 'gs://gcsio-test/dummy_mode_file'
    with self.assertRaises(ValueError):
      self.gcs.open(file_name, 'w+')
    with self.assertRaises(ValueError):
      self.gcs.open(file_name, 'r+b')

  def test_empty_batches(self):
    self.assertEqual([], self.gcs.copy_batch([]))
    self.assertEqual([], self.gcs.delete_batch([]))

  def test_delete(self):
    file_name = 'gs://gcsio-test/delete_me'
    file_size = 1024

    # Test deletion of non-existent file.
    self.gcs.delete(file_name)

    self._insert_random_file(self.client, file_name, file_size)
    self.assertTrue(
        gcsio.parse_gcs_path(file_name) in self.client.objects.files)

    self.gcs.delete(file_name)
    self.assertFalse(
        gcsio.parse_gcs_path(file_name) in self.client.objects.files)

  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
  def test_delete_batch(self, *unused_args):
    gcsio.BatchApiRequest = FakeBatchApiRequest
    file_name_pattern = 'gs://gcsio-test/delete_me_%d'
    file_size = 1024
    num_files = 10

    # Test deletion of non-existent files.
    result = self.gcs.delete_batch(
        [file_name_pattern % i for i in range(num_files)])
    self.assertTrue(result)
    for i, (file_name, exception) in enumerate(result):
      self.assertEqual(file_name, file_name_pattern % i)
      self.assertEqual(exception, None)
      self.assertFalse(self.gcs.exists(file_name_pattern % i))

    # Insert some files.
    for i in range(num_files):
      self._insert_random_file(self.client, file_name_pattern % i, file_size)

    # Check files inserted properly.
    for i in range(num_files):
      self.assertTrue(self.gcs.exists(file_name_pattern % i))

    # Execute batch delete.
    self.gcs.delete_batch([file_name_pattern % i for i in range(num_files)])

    # Check files deleted properly.
    for i in range(num_files):
      self.assertFalse(self.gcs.exists(file_name_pattern % i))
  def test_copy(self):
    src_file_name = 'gs://gcsio-test/source'
    dest_file_name = 'gs://gcsio-test/dest'
    file_size = 1024
    self._insert_random_file(self.client, src_file_name, file_size)
    self.assertTrue(
        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
    self.assertFalse(
        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)

    self.gcs.copy(src_file_name, dest_file_name, dest_kms_key_name='kms_key')

    self.assertTrue(
        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
    self.assertTrue(
        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)

    # Test copy of non-existent files.
    with self.assertRaisesRegex(HttpError, r'Not Found'):
      self.gcs.copy(
          'gs://gcsio-test/non-existent',
          'gs://gcsio-test/non-existent-destination')

  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
  def test_copy_batch(self, *unused_args):
    gcsio.BatchApiRequest = FakeBatchApiRequest
    from_name_pattern = 'gs://gcsio-test/copy_me_%d'
    to_name_pattern = 'gs://gcsio-test/destination_%d'
    file_size = 1024
    num_files = 10

    # Test copy of non-existent files.
    result = self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
                                  for i in range(num_files)],
                                 dest_kms_key_name='kms_key')
    self.assertTrue(result)
    for i, (src, dest, exception) in enumerate(result):
      self.assertEqual(src, from_name_pattern % i)
      self.assertEqual(dest, to_name_pattern % i)
      self.assertTrue(isinstance(exception, IOError))
      self.assertEqual(exception.errno, errno.ENOENT)
      self.assertFalse(self.gcs.exists(from_name_pattern % i))
      self.assertFalse(self.gcs.exists(to_name_pattern % i))

    # Insert some files.
    for i in range(num_files):
      self._insert_random_file(self.client, from_name_pattern % i, file_size)

    # Check files inserted properly.
    for i in range(num_files):
      self.assertTrue(self.gcs.exists(from_name_pattern % i))

    # Execute batch copy.
    self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
                         for i in range(num_files)])

    # Check files copied properly.
    for i in range(num_files):
      self.assertTrue(self.gcs.exists(from_name_pattern % i))
      self.assertTrue(self.gcs.exists(to_name_pattern % i))

  def test_copytree(self):
    src_dir_name = 'gs://gcsio-test/source/'
    dest_dir_name = 'gs://gcsio-test/dest/'
    file_size = 1024
    paths = ['a', 'b/c', 'b/d']
    for path in paths:
      src_file_name = src_dir_name + path
      dest_file_name = dest_dir_name + path
      self._insert_random_file(self.client, src_file_name, file_size)
      self.assertTrue(
          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
      self.assertFalse(
          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)

    self.gcs.copytree(src_dir_name, dest_dir_name)

    for path in paths:
      src_file_name = src_dir_name + path
      dest_file_name = dest_dir_name + path
      self.assertTrue(
          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
      self.assertTrue(
          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)

  def test_rename(self):
    src_file_name = 'gs://gcsio-test/source'
    dest_file_name = 'gs://gcsio-test/dest'
    file_size = 1024
    self._insert_random_file(self.client, src_file_name, file_size)
    self.assertTrue(
        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
    self.assertFalse(
        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)

    self.gcs.rename(src_file_name, dest_file_name)

    self.assertFalse(
        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
    self.assertTrue(
        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
  def test_full_file_read(self):
    file_name = 'gs://gcsio-test/full_file'
    file_size = 5 * 1024 * 1024 + 100
    random_file = self._insert_random_file(self.client, file_name, file_size)
    f = self.gcs.open(file_name)
    self.assertEqual(f.mode, 'r')
    f.seek(0, os.SEEK_END)
    self.assertEqual(f.tell(), file_size)
    self.assertEqual(f.read(), b'')
    f.seek(0)
    self.assertEqual(f.read(), random_file.contents)

  def test_file_random_seek(self):
    file_name = 'gs://gcsio-test/seek_file'
    file_size = 5 * 1024 * 1024 - 100
    random_file = self._insert_random_file(self.client, file_name, file_size)

    f = self.gcs.open(file_name)
    random.seed(0)
    for _ in range(0, 10):
      a = random.randint(0, file_size - 1)
      b = random.randint(0, file_size - 1)
      start, end = min(a, b), max(a, b)
      f.seek(start)
      self.assertEqual(f.tell(), start)
      self.assertEqual(
          f.read(end - start + 1), random_file.contents[start:end + 1])
      self.assertEqual(f.tell(), end + 1)

  def test_file_iterator(self):
    file_name = 'gs://gcsio-test/iterating_file'
    lines = []
    line_count = 10
    for _ in range(line_count):
      line_length = random.randint(100, 500)
      line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
      lines.append(line)

    contents = b''.join(lines)
    bucket, name = gcsio.parse_gcs_path(file_name)
    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))

    f = self.gcs.open(file_name)

    read_lines = 0
    for line in f:
      read_lines += 1

    self.assertEqual(read_lines, line_count)

  def test_file_read_line(self):
    file_name = 'gs://gcsio-test/read_line_file'
    lines = []

    # Set a small buffer size to exercise refilling the buffer.
    # First line is carefully crafted so the newline falls as the last
    # character of the buffer to exercise this code path.
    read_buffer_size = 1024
    lines.append(b'x' * 1023 + b'\n')

    for _ in range(1, 1000):
      line_length = random.randint(100, 500)
      line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
      lines.append(line)
    contents = b''.join(lines)

    file_size = len(contents)
    bucket, name = gcsio.parse_gcs_path(file_name)
    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))

    f = self.gcs.open(file_name, read_buffer_size=read_buffer_size)

    # Test read of first two lines.
    f.seek(0)
    self.assertEqual(f.readline(), lines[0])
    self.assertEqual(f.tell(), len(lines[0]))
    self.assertEqual(f.readline(), lines[1])

    # Test read at line boundary.
    f.seek(file_size - len(lines[-1]) - 1)
    self.assertEqual(f.readline(), b'\n')

    # Test read at end of file.
    f.seek(file_size)
    self.assertEqual(f.readline(), b'')

    # Test reads at random positions.
    random.seed(0)
    for _ in range(0, 10):
      start = random.randint(0, file_size - 1)
      line_index = 0
      # Find line corresponding to start index.
      chars_left = start
      while True:
        next_line_length = len(lines[line_index])
        if chars_left - next_line_length < 0:
          break
        chars_left -= next_line_length
        line_index += 1
      f.seek(start)
      self.assertEqual(f.readline(), lines[line_index][chars_left:])
  def test_file_write(self):
    file_name = 'gs://gcsio-test/write_file'
    file_size = 5 * 1024 * 1024 + 2000
    contents = os.urandom(file_size)
    f = self.gcs.open(file_name, 'w')
    self.assertEqual(f.mode, 'w')
    f.write(contents[0:1000])
    f.write(contents[1000:1024 * 1024])
    f.write(contents[1024 * 1024:])
    f.close()
    bucket, name = gcsio.parse_gcs_path(file_name)
    self.assertEqual(
        self.client.objects.get_file(bucket, name).contents, contents)

  def test_file_close(self):
    file_name = 'gs://gcsio-test/close_file'
    file_size = 5 * 1024 * 1024 + 2000
    contents = os.urandom(file_size)
    f = self.gcs.open(file_name, 'w')
    self.assertEqual(f.mode, 'w')
    f.write(contents)
    f.close()
    f.close()  # This should not crash.
    bucket, name = gcsio.parse_gcs_path(file_name)
    self.assertEqual(
        self.client.objects.get_file(bucket, name).contents, contents)

  def test_file_flush(self):
    file_name = 'gs://gcsio-test/flush_file'
    file_size = 5 * 1024 * 1024 + 2000
    contents = os.urandom(file_size)
    bucket, name = gcsio.parse_gcs_path(file_name)
    f = self.gcs.open(file_name, 'w')
    self.assertEqual(f.mode, 'w')
    f.write(contents[0:1000])
    f.flush()
    f.write(contents[1000:1024 * 1024])
    f.flush()
    f.flush()  # Should be a NOOP.
    f.write(contents[1024 * 1024:])
    f.close()  # This should already call the equivalent of flush() in its body.
    self.assertEqual(
        self.client.objects.get_file(bucket, name).contents, contents)

  def test_context_manager(self):
    # Test writing with a context manager.
    file_name = 'gs://gcsio-test/context_manager_file'
    file_size = 1024
    contents = os.urandom(file_size)
    with self.gcs.open(file_name, 'w') as f:
      f.write(contents)
    bucket, name = gcsio.parse_gcs_path(file_name)
    self.assertEqual(
        self.client.objects.get_file(bucket, name).contents, contents)

    # Test reading with a context manager.
    with self.gcs.open(file_name) as f:
      self.assertEqual(f.read(), contents)

    # Test that exceptions are not swallowed by the context manager.
    with self.assertRaises(ZeroDivisionError):
      with self.gcs.open(file_name) as f:
        f.read(0 // 0)
  def test_list_prefix(self):
    bucket_name = 'gcsio-test'
    objects = [
        ('cow/cat/fish', 2),
        ('cow/cat/blubber', 3),
        ('cow/dog/blubber', 4),
    ]

    for (object_name, size) in objects:
      file_name = 'gs://%s/%s' % (bucket_name, object_name)
      self._insert_random_file(self.client, file_name, size)

    test_cases = [
        (
            'gs://gcsio-test/c',
            [
                ('cow/cat/fish', 2),
                ('cow/cat/blubber', 3),
                ('cow/dog/blubber', 4),
            ]),
        (
            'gs://gcsio-test/cow/',
            [
                ('cow/cat/fish', 2),
                ('cow/cat/blubber', 3),
                ('cow/dog/blubber', 4),
            ]),
        ('gs://gcsio-test/cow/cat/fish', [
            ('cow/cat/fish', 2),
        ]),
    ]

    for file_pattern, expected_object_names in test_cases:
      expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
                             for (object_name, size) in expected_object_names]
      self.assertEqual(
          set(self.gcs.list_prefix(file_pattern).items()),
          set(expected_file_names))

  def test_mime_binary_encoding(self):
    # This test verifies that the MIME email_generator library works properly
    # and does not corrupt '\r\n' during uploads (the patch to apitools in
    # Python 3 is applied in io/gcp/__init__.py).
    from apitools.base.py.transfer import email_generator
    generator_cls = email_generator.BytesGenerator
    output_buffer = io.BytesIO()
    generator = generator_cls(output_buffer)
    test_msg = 'a\nb\r\nc\n\r\n\n\nd'
    message = Message()
    message.set_payload(test_msg)
    generator._handle_text(message)
    self.assertEqual(test_msg.encode('ascii'), output_buffer.getvalue())
  def test_downloader_monitoring_info(self):
    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
    file_size = 5 * 1024 * 1024 + 100
    random_file = self._insert_random_file(self.client, file_name, file_size)
    self.gcs.open(file_name, 'r')

    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
    labels = {
        monitoring_infos.SERVICE_LABEL: 'Storage',
        monitoring_infos.METHOD_LABEL: 'Objects.get',
        monitoring_infos.RESOURCE_LABEL: resource,
        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
        monitoring_infos.GCS_PROJECT_ID_LABEL: DEFAULT_PROJECT_NUMBER,
        monitoring_infos.STATUS_LABEL: 'ok'
    }

    metric_name = MetricName(
        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
    metric_value = MetricsEnvironment.process_wide_container().get_counter(
        metric_name).get_cumulative()

    self.assertEqual(metric_value, 2)

  def test_uploader_monitoring_info(self):
    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
    file_size = 5 * 1024 * 1024 + 100
    random_file = self._insert_random_file(self.client, file_name, file_size)
    f = self.gcs.open(file_name, 'w')

    resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
    labels = {
        monitoring_infos.SERVICE_LABEL: 'Storage',
        monitoring_infos.METHOD_LABEL: 'Objects.insert',
        monitoring_infos.RESOURCE_LABEL: resource,
        monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
        monitoring_infos.GCS_PROJECT_ID_LABEL: DEFAULT_PROJECT_NUMBER,
        monitoring_infos.STATUS_LABEL: 'ok'
    }

    f.close()
    metric_name = MetricName(
        None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
    metric_value = MetricsEnvironment.process_wide_container().get_counter(
        metric_name).get_cumulative()

    self.assertEqual(metric_value, 1)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()