blob: 1ed10ffeaf0e2ddafdbe5077fb5528b833285788 [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Mostly copied from downloadablefilesource.py in buildstream.
#
import contextlib
import shutil
import os
import netrc
import urllib.parse
import urllib.request
import urllib.error
class _NetrcPasswordManager:
def __init__(self, netrc_config):
self.netrc = netrc_config
def add_password(self, realm, uri, user, passwd):
pass
def find_user_password(self, realm, authuri):
if not self.netrc:
return None, None
parts = urllib.parse.urlsplit(authuri)
entry = self.netrc.authenticators(parts.hostname)
if not entry:
return None, None
else:
login, _, password = entry
return login, password
def _parse_netrc():
netrc_config = None
try:
netrc_config = netrc.netrc()
except (OSError, netrc.NetrcParseError):
# If the .netrc file was not found, FileNotFoundError will be
# raised, but OSError will be raised directly by the netrc package
# in the case that $HOME is not set.
#
# This will catch both cases.
pass
return netrc_config
class _UrlOpenerCreator:
def __init__(self, netrc_config):
self.netrc_config = netrc_config
def get_url_opener(self, bearer_auth):
if self.netrc_config and not bearer_auth:
netrc_pw_mgr = _NetrcPasswordManager(self.netrc_config)
http_auth = urllib.request.HTTPBasicAuthHandler(netrc_pw_mgr)
return urllib.request.build_opener(http_auth)
return urllib.request.build_opener()
def download_file(url, etag, directory, auth_scheme):
opener_creator = _UrlOpenerCreator(_parse_netrc())
opener = opener_creator.get_url_opener(auth_scheme == "bearer")
default_name = os.path.basename(url)
request = urllib.request.Request(url)
request.add_header("Accept", "*/*")
request.add_header("User-Agent", "BuildStream/2")
if opener_creator.netrc_config and auth_scheme == "bearer":
parts = urllib.parse.urlsplit(url)
entry = opener_creator.netrc_config.authenticators(parts.hostname)
if entry:
_, _, password = entry
auth_header = "Bearer " + password
request.add_header("Authorization", auth_header)
if etag is not None:
request.add_header("If-None-Match", etag)
try:
with contextlib.closing(opener.open(request)) as response:
info = response.info()
# some servers don't honor the 'If-None-Match' header
if etag and info["ETag"] == etag:
return None, None, None
etag = info["ETag"]
length = info.get("Content-Length")
filename = info.get_filename(default_name)
filename = os.path.basename(filename)
local_file = os.path.join(directory, filename)
with open(local_file, "wb") as dest:
shutil.copyfileobj(response, dest)
actual_length = dest.tell()
if length and actual_length < int(length):
raise ValueError(f"Partial file {actual_length}/{length}")
except urllib.error.HTTPError as e:
if e.code == 304:
# 304 Not Modified.
# Because we use etag only for matching ref, currently specified ref is what
# we would have downloaded.
return None, None, None
return None, None, str(e)
except (urllib.error.URLError, OSError, ValueError) as e:
# Note that urllib.request.Request in the try block may throw a
# ValueError for unknown url types, so we handle it here.
return None, None, str(e)
return local_file, etag, None