blob: 9bde70ed0d09e1dde269dad099d0f9861990029e [file]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Download release binaries."""
import argparse
import concurrent.futures as cf
import functools
import json
import os
import random
import re
import subprocess
import time
import urllib.request
DEFAULT_PARALLEL_DOWNLOADS = 8
class Downloader:
def get_file_list(self, prefix, filter=None):
def traverse(directory, files, directories):
url = f"{self.URL_ROOT}/{directory}"
response = urllib.request.urlopen(url).read().decode()
paths = re.findall('<a href="(.+?)"', response)
for path in paths:
path = re.sub(f"^{re.escape(url)}", "", path)
if path == "../":
continue
resolved_path = f"{directory}{path}"
if filter and not filter(path):
continue
if path.endswith("/"):
directories.append(resolved_path)
else:
files.append(resolved_path)
files = []
if prefix != "" and not prefix.endswith("/"):
prefix += "/"
directories = [prefix]
while len(directories) > 0:
directory = directories.pop()
traverse(directory, files, directories)
return files
def download_files(self, files, dest=None, num_parallel=None, re_match=None):
"""
Download files from Bintray in parallel. If file already exists, will
overwrite if the checksum does not match what Bintray says it should be
Parameters
----------
files : List[Dict]
File listing from Bintray
dest : str, default None
Defaults to current working directory
num_parallel : int, default 8
Number of files to download in parallel. If set to None, uses
default
"""
if dest is None:
dest = os.getcwd()
if num_parallel is None:
num_parallel = DEFAULT_PARALLEL_DOWNLOADS
if re_match is not None:
files = self._filter_files(files, re_match)
if num_parallel == 1:
for path in files:
self._download_file(dest, path)
else:
parallel_map_terminate_early(
functools.partial(self._download_file,
dest), files, num_parallel
)
def _filter_files(self, files, re_match):
regex = re.compile(re_match)
return [x for x in files if regex.match(x)]
def _download_file(self, dest, path):
base, filename = os.path.split(path)
dest_dir = os.path.join(dest, base)
os.makedirs(dest_dir, exist_ok=True)
dest_path = os.path.join(dest_dir, filename)
print(f"Downloading {path} to {dest_path}")
url = f"{self.URL_ROOT}/{path}"
self._download_url(url, dest_path)
def _download_url(self, url, dest_path, *, extra_args=None):
cmd = [
"curl",
"--fail",
"--location",
"--retry",
"5",
*(extra_args or []),
"--output",
dest_path,
url,
]
# Retry subprocess in case it fails with OpenSSL Connection errors
# https://issues.apache.org/jira/browse/INFRA-25274
for attempt in range(5):
if attempt > 0:
delay = attempt * 3
print(f"Waiting {delay} seconds before retrying {url}")
time.sleep(delay)
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
try:
# Don't leave possibly partial file around
os.remove(dest_path)
except IOError:
pass
if b"OpenSSL" not in stderr:
# We assume curl has already retried on other errors.
break
else:
return
raise Exception(
f"Downloading {url} failed\n" f"stdout: {stdout}\nstderr: {stderr}"
)
def _curl_version(self):
cmd = ["curl", "--version"]
out = subprocess.run(cmd, capture_output=True, check=True).stdout
match = re.search(r"curl (\d+)\.(\d+)\.(\d+) ", out.decode())
return (int(match.group(1)), int(match.group(2)), int(match.group(3)))
class Artifactory(Downloader):
URL_ROOT = "https://packages.apache.org/artifactory/arrow"
class Maven(Downloader):
URL_ROOT = (
"https://repository.apache.org"
+ "/content/repositories/staging/org/apache/arrow"
)
class GitHub(Downloader):
def __init__(self, repository, tag):
super().__init__()
if repository is None:
raise ValueError("--repository is required")
if tag is None:
raise ValueError("--tag is required")
self._repository = repository
self._tag = tag
# use the same name as the gh CLI
self._token = os.environ.get("GH_TOKEN")
def get_file_list(self, prefix, filter=None):
url = (
f"https://api.github.com/repos/{self._repository}/"
f"releases/tags/{self._tag}"
)
print("Fetching release from", url)
headers = {
"Accept": "application/vnd.github+json",
}
if self._token:
headers["Authorization"] = f"Bearer {self._token}"
request = urllib.request.Request(
url,
method="GET",
headers=headers,
)
raw_response = urllib.request.urlopen(request).read().decode()
response = json.loads(raw_response)
files = []
for asset in response["assets"]:
if filter and not filter(asset["name"]):
continue
# Don't use the API URL since it has a fairly strict rate
# limit unless logged in, and we have a lot of tiny
# artifacts
url = (
f"https://github.com/{self._repository}/"
f"releases/download/{self._tag}/{asset['name']}"
)
files.append((asset["name"], url))
return files
def _filter_files(self, files, re_match):
regex = re.compile(re_match)
return [x for x in files if regex.match(x[0])]
def _download_file(self, dest, asset):
name, url = asset
os.makedirs(dest, exist_ok=True)
dest_path = os.path.join(dest, name)
print(f"Downloading {url} to {dest_path}")
if os.path.isfile(dest_path):
print("Already downloaded", dest_path)
return
delay = random.randint(0, 3)
print(f"Waiting {delay} seconds to avoid rate limit")
time.sleep(delay)
extra_args = [
"--header",
"Accept: application/octet-stream",
]
if self._curl_version() >= (7, 71, 0):
# Also retry 403s
extra_args.append("--retry-all-errors")
self._download_url(url, dest_path, extra_args=extra_args)
def parallel_map_terminate_early(f, iterable, num_parallel):
tasks = []
with cf.ProcessPoolExecutor(num_parallel) as pool:
for v in iterable:
tasks.append(pool.submit(functools.partial(f, v)))
for task in cf.as_completed(tasks):
if task.exception() is not None:
e = task.exception()
for task in tasks:
task.cancel()
raise e
ARROW_REPOSITORY_PACKAGE_TYPES = [
"almalinux",
"amazon-linux",
"centos",
"debian",
"ubuntu",
]
ARROW_STANDALONE_PACKAGE_TYPES = ["nuget", "python"]
ARROW_PACKAGE_TYPES = ARROW_REPOSITORY_PACKAGE_TYPES + ARROW_STANDALONE_PACKAGE_TYPES
def download_rc_binaries(
version,
rc_number,
re_match=None,
dest=None,
num_parallel=None,
target_package_type=None,
repository=None,
tag=None,
):
version_string = f'{version}-rc{rc_number}'
version_pattern = re.compile(r"\d+\.\d+\.\d+")
if target_package_type:
package_types = [target_package_type]
else:
package_types = ARROW_PACKAGE_TYPES
for package_type in package_types:
def is_target(path):
match = version_pattern.search(path)
if not match:
return True
return match[0] == version
filter = is_target
if package_type == "github" or package_type in ARROW_STANDALONE_PACKAGE_TYPES:
downloader = GitHub(repository, tag)
prefix = ""
filter = None
elif package_type in ARROW_REPOSITORY_PACKAGE_TYPES:
downloader = Artifactory()
prefix = f'{package_type}-rc'
else:
downloader = Artifactory()
prefix = f'{package_type}-rc/{version_string}'
filter = None
files = downloader.get_file_list(prefix, filter=filter)
downloader.download_files(
files, re_match=re_match, dest=dest, num_parallel=num_parallel
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Download release candidate binaries")
parser.add_argument("version", type=str, help="The version number")
parser.add_argument(
"rc_number", type=int, help="The release candidate number, e.g. 0, 1, etc"
)
parser.add_argument(
"-e",
"--regexp",
type=str,
default=None,
help=(
"Regular expression to match on file names "
"to only download certain files"
),
)
parser.add_argument(
"--dest",
type=str,
default=os.getcwd(),
help="The output folder for the downloaded files",
)
parser.add_argument(
"--num_parallel",
type=int,
default=DEFAULT_PARALLEL_DOWNLOADS,
help="The number of concurrent downloads to do",
)
parser.add_argument(
"--package_type",
type=str,
default=None,
help="The package type to be downloaded",
)
parser.add_argument(
"--repository",
type=str,
help=("The repository to pull from " "(required if --package_type=github)"),
)
parser.add_argument(
"--tag",
type=str,
help=("The release tag to download " "(required if --package_type=github)"),
)
args = parser.parse_args()
download_rc_binaries(
args.version,
args.rc_number,
dest=args.dest,
re_match=args.regexp,
num_parallel=args.num_parallel,
target_package_type=args.package_type,
repository=args.repository,
tag=args.tag,
)