blob: cf0fc7260ce877c7ac79a51958091a68d903d1a3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides base classes for working with storage
"""
# Backward compatibility for Python 2.5
from __future__ import with_statement
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Type
import os.path # pylint: disable-msg=W0404
import hashlib
import warnings
import errno
from os.path import join as pjoin
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b
import libcloud.utils.files
from libcloud.common.types import LibcloudError
from libcloud.common.base import Connection
from libcloud.common.base import ConnectionUserAndKey, BaseDriver
from libcloud.storage.types import ObjectDoesNotExistError
__all__ = ["Object", "Container", "StorageDriver", "CHUNK_SIZE", "DEFAULT_CONTENT_TYPE"]
CHUNK_SIZE = 8096
# Default Content-Type which is sent when uploading an object if one is not
# supplied and can't be detected when using non-strict mode.
DEFAULT_CONTENT_TYPE = "application/octet-stream"
class Object(object):
"""
Represents an object (BLOB).
"""
def __init__(
self,
name, # type: str
size, # type: int
hash, # type: str
extra, # type: dict
meta_data, # type: dict
container, # type: Container
driver, # type: StorageDriver
):
"""
:param name: Object name (must be unique per container).
:type name: ``str``
:param size: Object size in bytes.
:type size: ``int``
:param hash: Object hash.
:type hash: ``str``
:param container: Object container.
:type container: :class:`libcloud.storage.base.Container`
:param extra: Extra attributes.
:type extra: ``dict``
:param meta_data: Optional object meta data.
:type meta_data: ``dict``
:param driver: StorageDriver instance.
:type driver: :class:`libcloud.storage.base.StorageDriver`
"""
self.name = name
self.size = size
self.hash = hash
self.container = container
self.extra = extra or {}
self.meta_data = meta_data or {}
self.driver = driver
def get_cdn_url(self):
# type: () -> str
return self.driver.get_object_cdn_url(obj=self)
def enable_cdn(self):
# type: () -> bool
return self.driver.enable_object_cdn(obj=self)
def download(
self, destination_path, overwrite_existing=False, delete_on_failure=True
):
# type: (str, bool, bool) -> bool
return self.driver.download_object(
obj=self,
destination_path=destination_path,
overwrite_existing=overwrite_existing,
delete_on_failure=delete_on_failure,
)
def as_stream(self, chunk_size=None):
# type: (Optional[int]) -> Iterator[bytes]
return self.driver.download_object_as_stream(obj=self, chunk_size=chunk_size)
def download_range(
self,
destination_path,
start_bytes,
end_bytes=None,
overwrite_existing=False,
delete_on_failure=True,
):
# type: (str, int, Optional[int], bool, bool) -> bool
return self.driver.download_object_range(
obj=self,
destination_path=destination_path,
start_bytes=start_bytes,
end_bytes=end_bytes,
overwrite_existing=overwrite_existing,
delete_on_failure=delete_on_failure,
)
def range_as_stream(self, start_bytes, end_bytes=None, chunk_size=None):
# type: (int, Optional[int], Optional[int]) -> Iterator[bytes]
return self.driver.download_object_range_as_stream(
obj=self,
start_bytes=start_bytes,
end_bytes=end_bytes,
chunk_size=chunk_size,
)
def delete(self):
# type: () -> bool
return self.driver.delete_object(self)
def __repr__(self):
return "<Object: name=%s, size=%s, hash=%s, provider=%s ...>" % (
self.name,
self.size,
self.hash,
self.driver.name,
)
class Container(object):
"""
Represents a container (bucket) which can hold multiple objects.
"""
def __init__(
self,
name, # type: str
extra, # type: dict
driver, # type: StorageDriver
):
"""
:param name: Container name (must be unique).
:type name: ``str``
:param extra: Extra attributes.
:type extra: ``dict``
:param driver: StorageDriver instance.
:type driver: :class:`libcloud.storage.base.StorageDriver`
"""
self.name = name
self.extra = extra or {}
self.driver = driver
def iterate_objects(self, prefix=None, ex_prefix=None):
# type: (Optional[str], Optional[str]) -> Iterator[Object]
return self.driver.iterate_container_objects(
container=self, prefix=prefix, ex_prefix=ex_prefix
)
def list_objects(self, prefix=None, ex_prefix=None):
# type: (Optional[str], Optional[str]) -> List[Object]
return self.driver.list_container_objects(
container=self, prefix=prefix, ex_prefix=ex_prefix
)
def get_cdn_url(self):
# type: () -> str
return self.driver.get_container_cdn_url(container=self)
def enable_cdn(self):
# type: () -> bool
return self.driver.enable_container_cdn(container=self)
def get_object(self, object_name):
# type: (str) -> Object
return self.driver.get_object(container_name=self.name, object_name=object_name)
def upload_object(
self, file_path, object_name, extra=None, verify_hash=True, headers=None
):
# type: (str, str, Optional[dict], bool, Optional[Dict[str, str]]) -> Object # noqa: E501
return self.driver.upload_object(
file_path,
self,
object_name,
extra=extra,
verify_hash=verify_hash,
headers=headers,
)
def upload_object_via_stream(self, iterator, object_name, extra=None, headers=None):
# type: (Iterator[bytes], str, Optional[dict], Optional[Dict[str, str]]) -> Object # noqa: E501
return self.driver.upload_object_via_stream(
iterator, self, object_name, extra=extra, headers=headers
)
def download_object(
self, obj, destination_path, overwrite_existing=False, delete_on_failure=True
):
# type: (Object, str, bool, bool) -> bool
return self.driver.download_object(
obj,
destination_path,
overwrite_existing=overwrite_existing,
delete_on_failure=delete_on_failure,
)
def download_object_as_stream(self, obj, chunk_size=None):
# type: (Object, Optional[int]) -> Iterator[bytes]
return self.driver.download_object_as_stream(obj, chunk_size)
def download_object_range(
self,
obj,
destination_path,
start_bytes,
end_bytes=None,
overwrite_existing=False,
delete_on_failure=True,
):
# type: (Object, str, int, Optional[int], bool, bool) -> bool
return self.driver.download_object_range(
obj=obj,
destination_path=destination_path,
start_bytes=start_bytes,
end_bytes=end_bytes,
overwrite_existing=overwrite_existing,
delete_on_failure=delete_on_failure,
)
def download_object_range_as_stream(
self, obj, start_bytes, end_bytes=None, chunk_size=None
):
# type: (Object, int, Optional[int], Optional[int]) -> Iterator[bytes]
return self.driver.download_object_range_as_stream(
obj=obj, start_bytes=start_bytes, end_bytes=end_bytes, chunk_size=chunk_size
)
def delete_object(self, obj):
# type: (Object) -> bool
return self.driver.delete_object(obj)
def delete(self):
# type: () -> bool
return self.driver.delete_container(self)
def __repr__(self):
return "<Container: name=%s, provider=%s>" % (self.name, self.driver.name)
class StorageDriver(BaseDriver):
"""
A base StorageDriver to derive from.
"""
connectionCls = ConnectionUserAndKey # type: Type[Connection]
name = None # type: str
hash_type = "md5" # type: str
supports_chunked_encoding = False # type: bool
# When strict mode is used, exception will be thrown if no content type is
# provided and none can be detected when uploading an object
strict_mode = False # type: bool
def iterate_containers(self):
# type: () -> Iterator[Container]
"""
Return a iterator of containers for the given account
:return: A iterator of Container instances.
:rtype: ``iterator`` of :class:`libcloud.storage.base.Container`
"""
raise NotImplementedError("iterate_containers not implemented for this driver")
def list_containers(self):
# type: () -> List[Container]
"""
Return a list of containers.
:return: A list of Container instances.
:rtype: ``list`` of :class:`Container`
"""
return list(self.iterate_containers())
def iterate_container_objects(self, container, prefix=None, ex_prefix=None):
# type: (Container, Optional[str], Optional[str]) -> Iterator[Object]
"""
Return a iterator of objects for the given container.
:param container: Container instance
:type container: :class:`libcloud.storage.base.Container`
:param prefix: Filter objects starting with a prefix.
:type prefix: ``str``
:param ex_prefix: (Deprecated.) Filter objects starting with a prefix.
:type ex_prefix: ``str``
:return: A iterator of Object instances.
:rtype: ``iterator`` of :class:`libcloud.storage.base.Object`
"""
raise NotImplementedError(
"iterate_container_objects not implemented for this driver"
)
def list_container_objects(self, container, prefix=None, ex_prefix=None):
# type: (Container, Optional[str], Optional[str]) -> List[Object]
"""
Return a list of objects for the given container.
:param container: Container instance.
:type container: :class:`libcloud.storage.base.Container`
:param prefix: Filter objects starting with a prefix.
:type prefix: ``str``
:param ex_prefix: (Deprecated.) Filter objects starting with a prefix.
:type ex_prefix: ``str``
:return: A list of Object instances.
:rtype: ``list`` of :class:`libcloud.storage.base.Object`
"""
return list(
self.iterate_container_objects(
container, prefix=prefix, ex_prefix=ex_prefix
)
)
def _normalize_prefix_argument(self, prefix, ex_prefix):
if ex_prefix:
warnings.warn(
"The ``ex_prefix`` argument is deprecated - "
"please update code to use ``prefix``",
DeprecationWarning,
)
return ex_prefix
return prefix
def _filter_listed_container_objects(self, objects, prefix):
if prefix is not None:
warnings.warn(
"Driver %s does not implement native object "
"filtering; falling back to filtering the full "
"object stream." % self.__class__.__name__
)
for obj in objects:
if prefix is None or obj.name.startswith(prefix):
yield obj
def get_container(self, container_name):
# type: (str) -> Container
"""
Return a container instance.
:param container_name: Container name.
:type container_name: ``str``
:return: :class:`Container` instance.
:rtype: :class:`libcloud.storage.base.Container`
"""
raise NotImplementedError("get_object not implemented for this driver")
def get_container_cdn_url(self, container):
# type: (Container) -> str
"""
Return a container CDN URL.
:param container: Container instance
:type container: :class:`libcloud.storage.base.Container`
:return: A CDN URL for this container.
:rtype: ``str``
"""
raise NotImplementedError(
"get_container_cdn_url not implemented for this driver"
)
def get_object(self, container_name, object_name):
# type: (str, str) -> Object
"""
Return an object instance.
:param container_name: Container name.
:type container_name: ``str``
:param object_name: Object name.
:type object_name: ``str``
:return: :class:`Object` instance.
:rtype: :class:`libcloud.storage.base.Object`
"""
raise NotImplementedError("get_object not implemented for this driver")
def get_object_cdn_url(self, obj):
# type: (Object) -> str
"""
Return an object CDN URL.
:param obj: Object instance
:type obj: :class:`libcloud.storage.base.Object`
:return: A CDN URL for this object.
:rtype: ``str``
"""
raise NotImplementedError("get_object_cdn_url not implemented for this driver")
def enable_container_cdn(self, container):
# type: (Container) -> bool
"""
Enable container CDN.
:param container: Container instance
:type container: :class:`libcloud.storage.base.Container`
:rtype: ``bool``
"""
raise NotImplementedError(
"enable_container_cdn not implemented for this driver"
)
def enable_object_cdn(self, obj):
# type: (Object) -> bool
"""
Enable object CDN.
:param obj: Object instance
:type obj: :class:`libcloud.storage.base.Object`
:rtype: ``bool``
"""
raise NotImplementedError("enable_object_cdn not implemented for this driver")
def download_object(
self, obj, destination_path, overwrite_existing=False, delete_on_failure=True
):
# type: (Object, str, bool, bool) -> bool
"""
Download an object to the specified destination path.
:param obj: Object instance.
:type obj: :class:`libcloud.storage.base.Object`
:param destination_path: Full path to a file or a directory where the
incoming file will be saved.
:type destination_path: ``str``
:param overwrite_existing: True to overwrite an existing file,
defaults to False.
:type overwrite_existing: ``bool``
:param delete_on_failure: True to delete a partially downloaded file if
the download was not successful (hash
mismatch / file size).
:type delete_on_failure: ``bool``
:return: True if an object has been successfully downloaded, False
otherwise.
:rtype: ``bool``
"""
raise NotImplementedError("download_object not implemented for this driver")
def download_object_as_stream(self, obj, chunk_size=None):
# type: (Object, Optional[int]) -> Iterator[bytes]
"""
Return a iterator which yields object data.
:param obj: Object instance
:type obj: :class:`libcloud.storage.base.Object`
:param chunk_size: Optional chunk size (in bytes).
:type chunk_size: ``int``
:rtype: ``iterator`` of ``bytes``
"""
raise NotImplementedError(
"download_object_as_stream not implemented for this driver"
)
def download_object_range(
self,
obj,
destination_path,
start_bytes,
end_bytes=None,
overwrite_existing=False,
delete_on_failure=True,
):
# type: (Object, str, int, Optional[int], bool, bool) -> bool
"""
Download part of an object.
:param obj: Object instance.
:type obj: :class:`libcloud.storage.base.Object`
:param destination_path: Full path to a file or a directory where the
incoming file will be saved.
:type destination_path: ``str``
:param start_bytes: Start byte offset (inclusive) for the range
download. Offset is 0 index based so the first
byte in file file is "0".
:type start_bytes: ``int``
:param end_bytes: End byte offset (non-inclusive) for the range
download. If not provided, it will default to the
end of the file.
:type end_bytes: ``int``
:param overwrite_existing: True to overwrite an existing file,
defaults to False.
:type overwrite_existing: ``bool``
:param delete_on_failure: True to delete a partially downloaded file if
the download was not successful (hash
mismatch / file size).
:type delete_on_failure: ``bool``
:return: True if an object has been successfully downloaded, False
otherwise.
:rtype: ``bool``
"""
raise NotImplementedError(
"download_object_range not implemented for this driver"
)
def download_object_range_as_stream(
self, obj, start_bytes, end_bytes=None, chunk_size=None
):
# type: (Object, int, Optional[int], Optional[int]) -> Iterator[bytes]
"""
Return a iterator which yields range / part of the object data.
:param obj: Object instance
:type obj: :class:`libcloud.storage.base.Object`
:param start_bytes: Start byte offset (inclusive) for the range
download. Offset is 0 index based so the first
byte in file file is "0".
:type start_bytes: ``int``
:param end_bytes: End byte offset (non-inclusive) for the range
download. If not provided, it will default to the
end of the file.
:type end_bytes: ``int``
:param chunk_size: Optional chunk size (in bytes).
:type chunk_size: ``int``
:rtype: ``iterator`` of ``bytes``
"""
raise NotImplementedError(
"download_object_range_as_stream not implemented for this driver"
)
def upload_object(
self,
file_path,
container,
object_name,
extra=None,
verify_hash=True,
headers=None,
):
# type: (str, Container, str, Optional[dict], bool, Optional[Dict[str, str]]) -> Object # noqa: E501
"""
Upload an object currently located on a disk.
:param file_path: Path to the object on disk.
:type file_path: ``str``
:param container: Destination container.
:type container: :class:`libcloud.storage.base.Container`
:param object_name: Object name.
:type object_name: ``str``
:param verify_hash: Verify hash
:type verify_hash: ``bool``
:param extra: Extra attributes (driver specific). (optional)
:type extra: ``dict``
:param headers: (optional) Additional request headers,
such as CORS headers. For example:
headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'}
:type headers: ``dict``
:rtype: :class:`libcloud.storage.base.Object`
"""
raise NotImplementedError("upload_object not implemented for this driver")
def upload_object_via_stream(
self, iterator, container, object_name, extra=None, headers=None
):
# type: (Iterator[bytes], Container, str, Optional[dict], Optional[Dict[str, str]]) -> Object # noqa: E501
"""
Upload an object using an iterator.
If a provider supports it, chunked transfer encoding is used and you
don't need to know in advance the amount of data to be uploaded.
Otherwise if a provider doesn't support it, iterator will be exhausted
so a total size for data to be uploaded can be determined.
Note: Exhausting the iterator means that the whole data must be
buffered in memory which might result in memory exhausting when
uploading a very large object.
If a file is located on a disk you are advised to use upload_object
function which uses fs.stat function to determine the file size and it
doesn't need to buffer whole object in the memory.
:param iterator: An object which implements the iterator interface.
:type iterator: :class:`object`
:param container: Destination container.
:type container: :class:`libcloud.storage.base.Container`
:param object_name: Object name.
:type object_name: ``str``
:param extra: (optional) Extra attributes (driver specific). Note:
This dictionary must contain a 'content_type' key which represents
a content type of the stored object.
:type extra: ``dict``
:param headers: (optional) Additional request headers,
such as CORS headers. For example:
headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'}
:type headers: ``dict``
:rtype: ``libcloud.storage.base.Object``
"""
raise NotImplementedError(
"upload_object_via_stream not implemented for this driver"
)
def delete_object(self, obj):
# type: (Object) -> bool
"""
Delete an object.
:param obj: Object instance.
:type obj: :class:`libcloud.storage.base.Object`
:return: ``bool`` True on success.
:rtype: ``bool``
"""
raise NotImplementedError("delete_object not implemented for this driver")
def create_container(self, container_name):
# type: (str) -> Container
"""
Create a new container.
:param container_name: Container name.
:type container_name: ``str``
:return: Container instance on success.
:rtype: :class:`libcloud.storage.base.Container`
"""
raise NotImplementedError("create_container not implemented for this driver")
def delete_container(self, container):
# type: (Container) -> bool
"""
Delete a container.
:param container: Container instance
:type container: :class:`libcloud.storage.base.Container`
:return: ``True`` on success, ``False`` otherwise.
:rtype: ``bool``
"""
raise NotImplementedError("delete_container not implemented for this driver")
def _get_object(
self, obj, callback, callback_kwargs, response, success_status_code=None
):
"""
Call passed callback and start transfer of the object'
:param obj: Object instance.
:type obj: :class:`Object`
:param callback: Function which is called with the passed
callback_kwargs
:type callback: :class:`function`
:param callback_kwargs: Keyword arguments which are passed to the
callback.
:type callback_kwargs: ``dict``
:param response: Response instance.
:type response: :class:`Response`
:param success_status_code: Status code which represents a successful
transfer (defaults to httplib.OK)
:type success_status_code: ``int``
:return: ``True`` on success, ``False`` otherwise.
:rtype: ``bool``
"""
success_status_code = success_status_code or httplib.OK
if not isinstance(success_status_code, (list, tuple)):
success_status_codes = [success_status_code]
else:
success_status_codes = success_status_code
if response.status in success_status_codes:
return callback(**callback_kwargs)
elif response.status == httplib.NOT_FOUND:
raise ObjectDoesNotExistError(object_name=obj.name, value="", driver=self)
raise LibcloudError(
value="Unexpected status code: %s" % (response.status), driver=self
)
def _save_object(
self,
response,
obj,
destination_path,
overwrite_existing=False,
delete_on_failure=True,
chunk_size=None,
partial_download=False,
):
"""
Save object to the provided path.
:param response: RawResponse instance.
:type response: :class:`RawResponse`
:param obj: Object instance.
:type obj: :class:`Object`
:param destination_path: Destination directory.
:type destination_path: ``str``
:param delete_on_failure: True to delete partially downloaded object if
the download fails.
:type delete_on_failure: ``bool``
:param overwrite_existing: True to overwrite a local path if it already
exists.
:type overwrite_existing: ``bool``
:param chunk_size: Optional chunk size
(defaults to ``libcloud.storage.base.CHUNK_SIZE``, 8kb)
:type chunk_size: ``int``
:param partial_download: True if this is a range (partial) save,
False otherwise.
:type partial_download: ``bool``
:return: ``True`` on success, ``False`` otherwise.
:rtype: ``bool``
"""
chunk_size = chunk_size or CHUNK_SIZE
base_name = os.path.basename(destination_path)
if not base_name and not os.path.exists(destination_path):
raise LibcloudError(
value="Path %s does not exist" % (destination_path), driver=self
)
if not base_name:
file_path = pjoin(destination_path, obj.name)
else:
file_path = destination_path
if os.path.exists(file_path) and not overwrite_existing:
raise LibcloudError(
value="File %s already exists, but " % (file_path)
+ "overwrite_existing=False",
driver=self,
)
bytes_transferred = 0
with open(file_path, "wb") as file_handle:
for chunk in response._response.iter_content(chunk_size):
file_handle.write(b(chunk))
bytes_transferred += len(chunk)
if not partial_download and int(obj.size) != int(bytes_transferred):
# Transfer failed, support retry?
# NOTE: We only perform this check if this is a regular and not a
# partial / range download
if delete_on_failure:
try:
os.unlink(file_path)
except Exception:
pass
return False
return True
def _upload_object(
self,
object_name,
content_type,
request_path,
request_method="PUT",
headers=None,
file_path=None,
stream=None,
chunked=False,
multipart=False,
):
"""
Helper function for setting common request headers and calling the
passed in callback which uploads an object.
"""
headers = headers or {}
if file_path and not os.path.exists(file_path):
raise OSError("File %s does not exist" % (file_path))
if (
stream is not None
and not hasattr(stream, "next")
and not hasattr(stream, "__next__")
):
raise AttributeError("iterator object must implement next() " + "method.")
headers["Content-Type"] = self._determine_content_type(
content_type, object_name, file_path=file_path
)
if stream:
response = self.connection.request(
request_path,
method=request_method,
data=stream,
headers=headers,
raw=True,
)
stream_hash, stream_length = self._hash_buffered_stream(
stream, self._get_hash_function()
)
else:
with open(file_path, "rb") as file_stream:
response = self.connection.request(
request_path,
method=request_method,
data=file_stream,
headers=headers,
raw=True,
)
with open(file_path, "rb") as file_stream:
stream_hash, stream_length = self._hash_buffered_stream(
file_stream, self._get_hash_function()
)
if not response.success():
response.parse_error()
return {
"response": response,
"bytes_transferred": stream_length,
"data_hash": stream_hash,
}
def _determine_content_type(self, content_type, object_name, file_path=None):
if content_type:
return content_type
name = file_path or object_name
content_type, _ = libcloud.utils.files.guess_file_mime_type(name)
if self.strict_mode and not content_type:
raise AttributeError(
"File content-type could not be guessed for "
'"%s" and no content_type value is provided' % name
)
return content_type or DEFAULT_CONTENT_TYPE
def _hash_buffered_stream(self, stream, hasher, blocksize=65536):
total_len = 0
if hasattr(stream, "__next__") or hasattr(stream, "next"):
# Ensure we start from the begining of a stream in case stream is
# not at the beginning
if hasattr(stream, "seek"):
try:
stream.seek(0)
except OSError as e:
if e.errno != errno.ESPIPE:
# This represents "OSError: [Errno 29] Illegal seek"
# error. This could either mean that the underlying
# handle doesn't support seek operation (e.g. pipe) or
# that the invalid seek position is provided. Sadly
# there is no good robust way to distinghuish that so
# we simply ignore all the "Illeal seek" errors so
# this function works correctly with pipes.
# See https://github.com/apache/libcloud/pull/1427 for
# details
raise e
for chunk in libcloud.utils.files.read_in_chunks(iterator=stream):
hasher.update(b(chunk))
total_len += len(chunk)
return (hasher.hexdigest(), total_len)
if not hasattr(stream, "__exit__"):
for s in stream:
hasher.update(s)
total_len = total_len + len(s)
return (hasher.hexdigest(), total_len)
with stream:
buf = stream.read(blocksize)
while len(buf) > 0:
total_len = total_len + len(buf)
hasher.update(buf)
buf = stream.read(blocksize)
return (hasher.hexdigest(), total_len)
def _get_hash_function(self):
"""
Return instantiated hash function for the hash type supported by
the provider.
"""
try:
func = getattr(hashlib, self.hash_type)()
except AttributeError:
raise RuntimeError(
"Invalid or unsupported hash type: %s" % (self.hash_type)
)
return func
def _validate_start_and_end_bytes(self, start_bytes, end_bytes=None):
# type: (int, Optional[int]) -> bool
"""
Method which validates that start_bytes and end_bytes arguments contain
valid values.
"""
if start_bytes < 0:
raise ValueError("start_bytes must be greater than 0")
if end_bytes is not None:
if start_bytes > end_bytes:
raise ValueError("start_bytes must be smaller than end_bytes")
elif start_bytes == end_bytes:
raise ValueError(
"start_bytes and end_bytes can't be the "
"same. end_bytes is non-inclusive"
)
return True
def _get_standard_range_str(
self, start_bytes, end_bytes=None, end_bytes_inclusive=False
):
# type: (int, Optional[int], bool) -> str
"""
Return range string which is used as a Range header value for range
requests for drivers which follow standard Range header notation
This returns range string in the following format:
bytes=<start_bytes>-<end bytes>.
For example:
bytes=1-10
bytes=0-2
bytes=5-
bytes=100-5000
:param end_bytes_inclusive: True if "end_bytes" offset should be
inclusive (aka opposite from the Python indexing behavior where the end
index is not inclusive).
"""
range_str = "bytes=%s-" % (start_bytes)
if end_bytes is not None:
if end_bytes_inclusive:
range_str += str(end_bytes)
else:
range_str += str(end_bytes - 1)
return range_str