import cgi
import copy
import importlib
import io
import logging
import mimetypes
import os
import warnings
from http import HTTPStatus
from urllib.parse import quote, unquote, urlencode, urlparse
import requests
from airavata.model.data.replica.ttypes import (
DataProductModel,
DataProductType,
DataReplicaLocationModel,
ReplicaLocationCategory,
ReplicaPersistentType
)
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.urls import reverse
from airavata_django_portal_sdk.user_storage.backends.base import (
ProvidesDownloadUrl
)
from ..util import convert_iso8601_to_datetime
logger = logging.getLogger(__name__)
TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
def get_user_storage_provider(request, owner_username=None, storage_resource_id=None):
# TODO: default the module_class_name to MFT provider
module_class_name = None
options = {}
    if not hasattr(settings, 'USER_STORAGES'):
        # make this backward compatible with the older settings
        module_class_name = 'airavata_django_portal_sdk.user_storage.backends.DjangoFileSystemProvider'
        storage_resource_id = settings.GATEWAY_DATA_STORE_RESOURCE_ID
        options = dict(directory=settings.GATEWAY_DATA_STORE_DIR)
        logger.warning("Please add the USER_STORAGES setting. Using legacy GATEWAY_DATA_STORE_RESOURCE_ID and GATEWAY_DATA_STORE_DIR settings.")
    elif storage_resource_id is None:
        conf = settings.USER_STORAGES["default"]
        module_class_name = conf['BACKEND']
        storage_resource_id = conf['STORAGE_RESOURCE_ID']
        options = conf.get('OPTIONS', {})
    else:
        for conf in settings.USER_STORAGES.values():
            if conf['STORAGE_RESOURCE_ID'] == storage_resource_id:
                module_class_name = conf['BACKEND']
                options = conf.get('OPTIONS', {})
                break
    if module_class_name is None:
        raise Exception(
            f"No USER_STORAGES backend configured for storage resource {storage_resource_id}")
    module_name, class_name = module_class_name.rsplit(".", 1)
module = importlib.import_module(module_name)
BackendClass = getattr(module, class_name)
authz_token = request.authz_token
context = {
'request': request,
'owner_username': owner_username,
}
instance = BackendClass(authz_token, storage_resource_id, context=context, **options)
return instance
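# Example (a sketch, not a required configuration): a minimal USER_STORAGES
# setting that this resolver understands. The storage resource id and directory
# values below are placeholders.
#
#     USER_STORAGES = {
#         'default': {
#             'BACKEND': 'airavata_django_portal_sdk.user_storage.backends.DjangoFileSystemProvider',
#             'STORAGE_RESOURCE_ID': 'example.storage.resource.id',
#             'OPTIONS': {
#                 'directory': '/var/www/portal/experiment-data',
#             },
#         },
#     }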
def save(request, path, file, name=None, content_type=None, storage_resource_id=None, experiment_id=None):
"""
    Save file at the given path in the user's storage and return a DataProduct.
    If `experiment_id` is provided, the path will be relative to the experiment
    data directory.
"""
if _is_remote_api():
if name is None and hasattr(file, 'name'):
name = os.path.basename(file.name)
files = {'file': (name, file, content_type)
if content_type is not None else (name, file)}
resp = _call_remote_api(request,
"/user-storage/~/{path}",
path_params={"path": path},
data={"experiment-id": experiment_id},
method="post",
files=files)
data = resp.json()
product_uri = data['uploaded']['productUri']
data_product = request.airavata_client.getDataProduct(
request.authz_token, product_uri)
return data_product
final_path = _get_final_path(request, path, experiment_id)
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
storage_resource_id, resource_path = backend.save(final_path, file, name=name, content_type=content_type)
data_product = _save_data_product(
request, resource_path, storage_resource_id, name=name, content_type=content_type, backend=backend
)
return data_product
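# Usage sketch for save(): assumes a Django view where the portal middleware has
# attached `authz_token` and `airavata_client` to the request; the view name and
# target path are hypothetical.
#
#     from django.http import JsonResponse
#
#     def upload_to_project(request):
#         uploaded = request.FILES['file']
#         data_product = save(request, 'MyProject', uploaded,
#                             content_type=uploaded.content_type)
#         return JsonResponse({'data-product-uri': data_product.productUri})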
def save_input_file(request, file, name=None, content_type=None, storage_resource_id=None):
"""Save input file in staging area for input files."""
if _is_remote_api():
if name is None and hasattr(file, 'name'):
name = os.path.basename(file.name)
files = {'file': (name, file, content_type)
if content_type is not None else (name, file)}
resp = _call_remote_api(request,
"/upload",
method="post",
files=files)
data = resp.json()
product_uri = data['data-product']['productUri']
data_product = request.airavata_client.getDataProduct(
request.authz_token, product_uri)
return data_product
else:
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
file_name = name if name is not None else os.path.basename(file.name)
storage_resource_id, resource_path = backend.save(
TMP_INPUT_FILE_UPLOAD_DIR, file, name=file_name)
data_product = _save_data_product(
request, resource_path, storage_resource_id, name=name, content_type=content_type, backend=backend
)
return data_product
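# Usage sketch for save_input_file(): stage an uploaded file as an experiment
# input before the experiment is created; the returned DataProduct's URI is what
# gets stored as the input value (variable names are illustrative).
#
#     uploaded = request.FILES['file']
#     dp = save_input_file(request, uploaded, content_type=uploaded.content_type)
#     input_value = dp.productUri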
def copy_input_file(request, data_product=None, data_product_uri=None, storage_resource_id=None):
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
source_storage_resource_id, source_resource_path = _get_replica_resource_id_and_filepath(data_product)
source_backend = get_user_storage_provider(request,
owner_username=data_product.ownerName,
storage_resource_id=source_storage_resource_id)
file = source_backend.open(source_resource_path)
name = data_product.productName
target_backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
storage_resource_id, full_path = target_backend.save(TMP_INPUT_FILE_UPLOAD_DIR, file, name=name)
return _save_copy_of_data_product(request, full_path, data_product, storage_resource_id)
def is_input_file(request, data_product=None, data_product_uri=None):
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
if _is_remote_api():
resp = _call_remote_api(
request,
"/data-products/",
params={'product-uri': data_product.productUri})
data = resp.json()
return data['isInputFileUpload']
# Check if file is one of user's files and in TMP_INPUT_FILE_UPLOAD_DIR
storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
backend = get_user_storage_provider(request,
owner_username=data_product.ownerName,
storage_resource_id=storage_resource_id)
if backend.exists(path):
directories, files = backend.get_metadata(path)
rel_path = files[0]['path']
return os.path.dirname(rel_path) == TMP_INPUT_FILE_UPLOAD_DIR
else:
return False
def move(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None):
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
source_storage_resource_id, source_path = _get_replica_resource_id_and_filepath(data_product)
source_backend = get_user_storage_provider(request,
owner_username=data_product.ownerName,
storage_resource_id=source_storage_resource_id)
file = source_backend.open(source_path)
file_name = data_product.productName
target_backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
storage_resource_id, full_path = target_backend.save(path, file, name=file_name)
data_product_copy = _save_copy_of_data_product(request, full_path, data_product, storage_resource_id)
# Remove the source file and data product metadata
source_backend.delete(source_path)
_delete_data_product(data_product.ownerName, source_path, storage_resource_id=source_storage_resource_id)
return data_product_copy
def move_input_file(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None):
warnings.warn("Use 'move' instead.", DeprecationWarning)
return move(request, data_product=data_product, path=path, data_product_uri=data_product_uri, storage_resource_id=storage_resource_id)
def get_download_url(request, data_product=None, data_product_uri=None, force_download=False, mime_type=None):
"Return URL for downloading data product. One of `data_product` or `data_product_uri` is required."
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
backend = get_user_storage_provider(request,
owner_username=data_product.ownerName,
storage_resource_id=storage_resource_id)
if isinstance(backend, ProvidesDownloadUrl):
return backend.get_download_url(path)
else:
# if backend doesn't provide a download url, then use default one
# that uses backend to read the file
params = {"data-product-uri": data_product.productUri}
if force_download:
params['download'] = ''
if mime_type is not None:
params['mime-type'] = mime_type
return f"{reverse('airavata_django_portal_sdk:download_file')}?{urlencode(params)}"
def get_lazy_download_url(request, data_product=None, data_product_uri=None):
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
# /download will call get_download_url and redirect to it
return (reverse("airavata_django_portal_sdk:download") + "?" +
urlencode({"data-product-uri": data_product.productUri}))
def open_file(request, data_product=None, data_product_uri=None):
"""
Return file object for replica if it exists in user storage. One of
`data_product` or `data_product_uri` is required.
"""
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
if _is_remote_api():
resp = _call_remote_api(
request,
"/download",
params={'data-product-uri': data_product.productUri},
# TODO: once /sdk/download is integrated with airavata-django-portal, this can be updated to
# base_url="/sdk"
base_url="/api")
file = io.BytesIO(resp.content)
disposition = resp.headers['Content-Disposition']
disp_value, disp_params = cgi.parse_header(disposition)
# Give the file object a name just like a real opened file object
file.name = disp_params['filename']
return file
else:
storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
backend = get_user_storage_provider(request,
owner_username=data_product.ownerName,
storage_resource_id=storage_resource_id)
return backend.open(path)
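# Usage sketch for open_file(): read the contents of a replica by data product
# URI; the returned object is file-like and read() generally yields bytes
# (output_uri is assumed to be a known data product URI).
#
#     f = open_file(request, data_product_uri=output_uri)
#     contents = f.read()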
def exists(request, data_product=None, data_product_uri=None):
"""
Return True if replica for data_product exists in user storage. One of
`data_product` or `data_product_uri` is required.
"""
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
if _is_remote_api():
resp = _call_remote_api(
request,
"/data-products/",
params={'product-uri': data_product.productUri},
raise_for_status=False)
if resp.status_code == HTTPStatus.NOT_FOUND:
return False
resp.raise_for_status()
data = resp.json()
return data['downloadURL'] is not None
else:
storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
backend = get_user_storage_provider(request,
owner_username=data_product.ownerName,
storage_resource_id=storage_resource_id)
return backend.exists(path)
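# Usage sketch for exists(): check that a replica is still present before
# offering a download link (product_uri is assumed to be a known data product
# URI).
#
#     if exists(request, data_product_uri=product_uri):
#         url = get_download_url(request, data_product_uri=product_uri)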
def dir_exists(request, path, storage_resource_id=None, experiment_id=None):
"""
    Return True if the path exists in the user's data store and is a directory.
    If `experiment_id` is provided, the path will be relative to the experiment
    data directory.
"""
if _is_remote_api():
resp = _call_remote_api(request,
"/user-storage/~/",
params={"path": path, "experiment-id": experiment_id},
raise_for_status=False)
if resp.status_code == HTTPStatus.NOT_FOUND:
return False
resp.raise_for_status()
return resp.json()['isDir']
else:
final_path, owner_username = _get_final_path_and_owner_username(request, path, experiment_id)
backend = get_user_storage_provider(
request, storage_resource_id=storage_resource_id, owner_username=owner_username)
return backend.is_dir(final_path)
def user_file_exists(request, path, storage_resource_id=None, experiment_id=None):
"""
    If the file exists, return its data product URI, else None. If `experiment_id`
    is provided, the path will be relative to the experiment data directory.
"""
if _is_remote_api():
resp = _call_remote_api(request,
"/user-storage/~/{path}",
path_params={"path": path},
params={"experiment-id": experiment_id},
raise_for_status=False)
        if resp.status_code == HTTPStatus.NOT_FOUND:
            return None
        resp.raise_for_status()
        data = resp.json()
        if data['isDir']:
            return None
        return data['files'][0]['dataProductURI']
final_path, owner_username = _get_final_path_and_owner_username(request, path, experiment_id)
backend = get_user_storage_provider(
request, storage_resource_id=storage_resource_id, owner_username=owner_username)
if backend.is_file(final_path):
_, files = backend.get_metadata(final_path)
full_path = files[0]['resource_path']
data_product_uri = _get_data_product_uri(request, full_path,
backend.resource_id, owner=owner_username)
return data_product_uri
else:
return None
def delete_dir(request, path, storage_resource_id=None, experiment_id=None):
"""
    Delete the path in the user's data store, if it exists. If `experiment_id` is
    provided, the path will be relative to the experiment data directory.
"""
if _is_remote_api():
resp = _call_remote_api(request,
"/user-storage/~/{path}",
path_params={"path": path},
data={"experiment-id": experiment_id},
method="delete",
raise_for_status=False)
_raise_404(resp, f"File path does not exist {path}")
resp.raise_for_status()
return
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
final_path = _get_final_path(request, path, experiment_id)
backend.delete(final_path)
def delete_user_file(request, path, storage_resource_id=None, experiment_id=None):
"""Delete file in user's data store, if it exists."""
if _is_remote_api():
resp = _call_remote_api(request,
"/user-storage/~/{path}",
path_params={"path": path},
data={"experiment-id": experiment_id},
method="delete",
raise_for_status=False)
_raise_404(resp, f"File path does not exist {path}")
resp.raise_for_status()
return
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
final_path = _get_final_path(request, path, experiment_id)
backend.delete(final_path)
def update_file_content(request, path, fileContentText, storage_resource_id=None):
if _is_remote_api():
_call_remote_api(request,
"/user-storage/~/{path}",
path_params={"path": path},
method="put",
data={"fileContentText": fileContentText}
)
return
else:
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
file = io.StringIO(fileContentText)
backend.update(path, file)
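# Usage sketch for update_file_content(): overwrite a text file in the user's
# storage with new content; the path shown is a placeholder relative to the
# user's storage root.
#
#     update_file_content(request, 'MyProject/params.txt', 'tolerance=0.01\n')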
def update_data_product_content(request, data_product=None, fileContentText="", data_product_uri=None):
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
if _is_remote_api():
_call_remote_api(request,
"/data-products/",
params={'product-uri': data_product.productUri},
method="put",
data={"fileContentText": fileContentText},
)
return
storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
update_file_content(request, path, fileContentText, storage_resource_id=storage_resource_id)
def get_file_metadata(request, path, storage_resource_id=None, experiment_id=None):
if _is_remote_api():
resp = _call_remote_api(request,
"/user-storage/~/{path}",
path_params={"path": path},
params={"experiment-id": experiment_id},
raise_for_status=False
)
_raise_404(resp, "User storage file path does not exist")
data = resp.json()
if data["isDir"]:
raise Exception("User storage path is a directory, not a file")
file = data['files'][0]
file['created_time'] = convert_iso8601_to_datetime(file['createdTime'])
file['mime_type'] = file['mimeType']
file['data-product-uri'] = file['dataProductURI']
return file
final_path, owner_username = _get_final_path_and_owner_username(request, path, experiment_id)
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id, owner_username=owner_username)
if backend.is_file(final_path):
_, files = backend.get_metadata(final_path)
file = files[0]
data_product_uri = _get_data_product_uri(request, file['resource_path'],
storage_resource_id=backend.resource_id,
owner=owner_username)
data_product = request.airavata_client.getDataProduct(
request.authz_token, data_product_uri)
mime_type = None
if 'mime-type' in data_product.productMetadata:
mime_type = data_product.productMetadata['mime-type']
file['data-product-uri'] = data_product_uri
file['mime_type'] = mime_type
# TODO: remove this, there's no need for hidden files
file['hidden'] = False
return file
else:
raise ObjectDoesNotExist("File does not exist at that path.")
def get_data_product_metadata(request, data_product=None, data_product_uri=None):
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
if _is_remote_api():
resp = _call_remote_api(
request,
"/data-products/",
params={'product-uri': data_product.productUri})
data = resp.json()
file = {
"name": os.path.basename(path),
# FIXME: since this isn't the true relative path, going to leave out for now
# "path": path,
"resource_path": path,
"created_time": convert_iso8601_to_datetime(data['creationTime'], microseconds=False),
"size": data['filesize']
}
mime_type = None
if 'mime-type' in data_product.productMetadata:
mime_type = data_product.productMetadata['mime-type']
file['data-product-uri'] = data_product_uri
file['mime_type'] = mime_type
# TODO: remove this, there's no need for hidden files
file['hidden'] = False
return file
backend = get_user_storage_provider(request,
owner_username=data_product.ownerName,
storage_resource_id=storage_resource_id)
if backend.is_file(path):
_, files = backend.get_metadata(path)
file = files[0]
mime_type = None
if 'mime-type' in data_product.productMetadata:
mime_type = data_product.productMetadata['mime-type']
file['data-product-uri'] = data_product_uri
file['mime_type'] = mime_type
# TODO: remove this, there's no need for hidden files
file['hidden'] = False
return file
else:
raise ObjectDoesNotExist("File does not exist at that path.")
def get_file(request, path, storage_resource_id=None, experiment_id=None):
warnings.warn("Use 'get_file_metadata' instead.", DeprecationWarning)
return get_file_metadata(request, path, storage_resource_id, experiment_id=experiment_id)
def delete(request, data_product=None, data_product_uri=None):
"""
Delete replica for data product in this data store. One of `data_product`
or `data_product_uri` is required.
"""
if data_product is None:
data_product = _get_data_product(request, data_product_uri)
if _is_remote_api():
_call_remote_api(
request,
"/delete-file",
params={'data-product-uri': data_product.productUri},
method="delete")
return
else:
storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
try:
backend.delete(path)
_delete_data_product(data_product.ownerName, path, storage_resource_id)
except Exception:
logger.exception(
"Unable to delete file {} for data product uri {}".format(
path, data_product.productUri
)
)
raise
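# Usage sketch for delete(): remove a replica by data product URI. Failures are
# logged and re-raised, so callers may want to catch them (this example assumes
# django.contrib.messages is in use).
#
#     from django.contrib import messages
#
#     try:
#         delete(request, data_product_uri=product_uri)
#     except Exception:
#         messages.error(request, 'Unable to delete file')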
def listdir(request, path, storage_resource_id=None, experiment_id=None):
"""
    Return a tuple of two lists, the first of directories and the second of
    files. If `experiment_id` is provided, the path will be relative to the
    experiment data directory.
"""
if _is_remote_api():
resp = _call_remote_api(request,
"/user-storage/~/",
params={"path": path, "experiment-id": experiment_id},
)
data = resp.json()
for directory in data['directories']:
# Convert JSON ISO8601 timestamp to datetime instance
directory['created_time'] = convert_iso8601_to_datetime(
directory['createdTime'])
for file in data['files']:
# Convert JSON ISO8601 timestamp to datetime instance
file['created_time'] = convert_iso8601_to_datetime(
file['createdTime'])
file['mime_type'] = file['mimeType']
file['data-product-uri'] = file['dataProductURI']
return data['directories'], data['files']
final_path, owner_username = _get_final_path_and_owner_username(request, path, experiment_id)
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id,
owner_username=owner_username)
directories, files = backend.get_metadata(final_path)
# Mark the TMP_INPUT_FILE_UPLOAD_DIR directory as hidden in the UI
for directory in directories:
directory['hidden'] = directory['path'] == TMP_INPUT_FILE_UPLOAD_DIR
# for each file, lookup or register a data product and enrich the file
# metadata with data-product-uri and mime-type
for file in files:
data_product_uri = _get_data_product_uri(request, file['resource_path'],
storage_resource_id=backend.resource_id,
owner=owner_username,
backend=backend)
data_product = request.airavata_client.getDataProduct(
request.authz_token, data_product_uri)
mime_type = None
if 'mime-type' in data_product.productMetadata:
mime_type = data_product.productMetadata['mime-type']
file['data-product-uri'] = data_product_uri
file['mime_type'] = mime_type
# TODO: remove this, there's no need for hidden files
file['hidden'] = False
return directories, files
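# Usage sketch for listdir(): list the top level of the user's storage; the
# 'mime_type' and 'data-product-uri' keys are populated on every file entry as
# shown above.
#
#     directories, files = listdir(request, '')
#     for f in files:
#         print(f['mime_type'], f['data-product-uri'])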
def list_experiment_dir(request, experiment_id, path="", storage_resource_id=None):
"""
List files, directories in experiment data directory. Returns a tuple,
see `listdir`.
"""
if _is_remote_api():
resp = _call_remote_api(request,
"/experiment-storage/{experiment_id}/{path}",
path_params={"path": path,
"experiment_id": experiment_id},
)
data = resp.json()
for directory in data['directories']:
# Convert JSON ISO8601 timestamp to datetime instance
directory['created_time'] = convert_iso8601_to_datetime(
directory['createdTime'])
for file in data['files']:
# Convert JSON ISO8601 timestamp to datetime instance
file['created_time'] = convert_iso8601_to_datetime(
file['createdTime'])
file['mime_type'] = file['mimeType']
file['data-product-uri'] = file['dataProductURI']
return data['directories'], data['files']
experiment = request.airavata_client.getExperiment(
request.authz_token, experiment_id)
backend = get_user_storage_provider(request,
storage_resource_id=storage_resource_id,
owner_username=experiment.userName)
exp_data_dir = experiment.userConfigurationData.experimentDataDir
exp_data_path = os.path.join(exp_data_dir, path)
if backend.exists(exp_data_path):
directories, files = backend.get_metadata(exp_data_path)
for directory in directories:
# construct the relative path of the directory within the experiment data dir
directory['path'] = os.path.relpath(directory['resource_path'], exp_data_dir)
# for each file, lookup or register a data product and enrich the file
# metadata with data-product-uri and mime-type
for file in files:
data_product_uri = _get_data_product_uri(request, file['resource_path'],
storage_resource_id=backend.resource_id,
backend=backend, owner=experiment.userName)
data_product = request.airavata_client.getDataProduct(
request.authz_token, data_product_uri)
mime_type = None
if 'mime-type' in data_product.productMetadata:
mime_type = data_product.productMetadata['mime-type']
file['data-product-uri'] = data_product_uri
file['mime_type'] = mime_type
# construct the relative path of the file within the experiment data dir
file['path'] = os.path.relpath(file['resource_path'], exp_data_dir)
# TODO: remove this, there's no need for hidden files
file['hidden'] = False
return directories, files
else:
raise ObjectDoesNotExist("Experiment data directory does not exist")
def experiment_dir_exists(request, experiment_id, path="", storage_resource_id=None):
"Returns True if the path exists in the given experiment's data directory."
if _is_remote_api():
resp = _call_remote_api(request,
"/experiment-storage/{experiment_id}/{path}",
path_params={"path": path,
"experiment_id": experiment_id},
raise_for_status=False)
if resp.status_code == HTTPStatus.NOT_FOUND:
return False
resp.raise_for_status()
return resp.json()['isDir']
experiment = request.airavata_client.getExperiment(
request.authz_token, experiment_id)
    exp_data_dir = experiment.userConfigurationData.experimentDataDir
    if exp_data_dir is None:
        return False
    exp_data_path = os.path.join(exp_data_dir, path)
    backend = get_user_storage_provider(request,
                                        storage_resource_id=storage_resource_id,
                                        owner_username=experiment.userName)
    return backend.exists(exp_data_path)
def get_experiment_dir(request, project_name=None, experiment_name=None, path=None, storage_resource_id=None):
warnings.warn("Use 'create_user_dir' instead.", DeprecationWarning)
storage_resource_id, resource_path = create_user_dir(request,
dir_names=[project_name, experiment_name],
create_unique=True,
storage_resource_id=storage_resource_id)
return resource_path
def create_user_dir(request, path="", dir_names=(), create_unique=False, storage_resource_id=None, experiment_id=None):
"""
Creates a directory, and intermediate directories if given, at the given
    path in the user's storage. `dir_names` should be either a list or tuple of
    directory names to create at the given path. If `create_unique` is True
and the given `dir_names` results in an already existing directory, the
`dir_names` will be modified (for example, random suffix added) until it
results in a name for a directory that doesn't exist and that directory will
get created. If `create_unique` is False (the default) and the directory
already exists, no directory will be created, but the directory resource
information will be returned. `dir_names` may also be modified to sanitize
them for file paths, for example, converting spaces to underscores. Returns
a tuple of the storage_resource_id and resource_path of the directory
resource.
    If `experiment_id` is provided, the path will be relative to the experiment
    data directory.
"""
if _is_remote_api():
logger.debug(f"path={path}")
resp = _call_remote_api(request,
"/user-storage/~/{path}",
path_params={"path": path},
data={"experiment-id": experiment_id},
method="post")
json = resp.json()
# 'path' is a new response attribute, for backwards compatibility check if it exists first
if 'path' in json:
path = json['path']
# FIXME: should use the storage_resource_id returned from remote API call
return storage_resource_id, path
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
# For backwards compatibility, manufacture the dir_names array as needed
final_path = _get_final_path(request, path, experiment_id)
if len(dir_names) == 0:
dir_names = []
while not backend.exists(final_path):
final_path, dir_name = os.path.split(final_path)
if dir_name == '':
break
dir_names.insert(0, dir_name)
storage_resource_id, resource_path = backend.create_dirs(final_path, dir_names=dir_names, create_unique=create_unique)
return storage_resource_id, resource_path
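# Usage sketch for create_user_dir(): create (or reuse) nested directories in the
# user's storage, mirroring how get_experiment_dir() below uses it; the names
# are placeholders.
#
#     storage_id, resource_path = create_user_dir(
#         request, dir_names=['MyProject', 'MyExperiment'], create_unique=True)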
def create_symlink(request, src_path, dest_path, storage_resource_id=None):
"""Create link named dest_path pointing to src_path on storage resource."""
if _is_remote_api():
logger.warning("create_symlink isn't supported in Remote API mode")
return
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
backend.create_symlink(src_path, dest_path)
def get_rel_experiment_dir(request, experiment_id, storage_resource_id=None):
"""Return experiment data dir path relative to user's directory."""
warnings.warn("Use 'list_experiment_dir' instead.", DeprecationWarning)
if _is_remote_api():
resp = _call_remote_api(request,
"/experiments/{experimentId}/",
path_params={"experimentId": experiment_id})
resp.raise_for_status()
return resp.json()['relativeExperimentDataDir']
experiment = request.airavata_client.getExperiment(
request.authz_token, experiment_id)
if (experiment.userConfigurationData and
experiment.userConfigurationData.experimentDataDir):
backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
data_dir = experiment.userConfigurationData.experimentDataDir
if backend.exists(data_dir):
directories, _ = backend.get_metadata(os.path.dirname(data_dir))
for directory in directories:
if directory['name'] == os.path.basename(data_dir):
return directory['path']
raise Exception(f"Could not find relative path to experiment data dir {data_dir}")
else:
return None
else:
return None
def _get_data_product_uri(request, full_path, storage_resource_id, owner=None, backend=None):
from airavata_django_portal_sdk import models
if owner is None:
owner = request.user.username
user_file = models.UserFiles.objects.filter(
username=owner, file_path=full_path, file_resource_id=storage_resource_id)
if user_file.exists():
product_uri = user_file[0].file_dpu
else:
data_product = _save_data_product(request, full_path, storage_resource_id, owner=owner, backend=backend)
product_uri = data_product.productUri
return product_uri
def _get_data_product(request, data_product_uri):
return request.airavata_client.getDataProduct(
request.authz_token, data_product_uri)
def _save_data_product(request, full_path, storage_resource_id, name=None, content_type=None, owner=None, backend=None):
"Create, register and record in DB a data product for full_path."
if owner is None:
owner = request.user.username
data_product = _create_data_product(
owner, full_path, storage_resource_id, name=name, content_type=content_type, backend=backend
)
product_uri = _register_data_product(request, full_path, data_product, storage_resource_id, owner=owner)
data_product.productUri = product_uri
return data_product
def _register_data_product(request, full_path, data_product, storage_resource_id, owner=None):
if owner is None:
owner = request.user.username
product_uri = request.airavata_client.registerDataProduct(
request.authz_token, data_product
)
from airavata_django_portal_sdk import models
user_file_instance = models.UserFiles(
username=owner,
file_path=full_path,
file_dpu=product_uri,
file_resource_id=storage_resource_id)
user_file_instance.save()
return product_uri
def _save_copy_of_data_product(request, full_path, data_product, storage_resource_id):
"""Save copy of a data product with a different path."""
data_product_copy = _copy_data_product(request, data_product, full_path, storage_resource_id)
product_uri = _register_data_product(request, full_path, data_product_copy, storage_resource_id)
data_product_copy.productUri = product_uri
return data_product_copy
def _copy_data_product(request, data_product, full_path, storage_resource_id):
"""Create an unsaved copy of a data product with different path."""
data_product_copy = copy.copy(data_product)
data_product_copy.productUri = None
data_product_copy.ownerName = request.user.username
data_replica_location = _create_replica_location(
full_path, data_product_copy.productName, storage_resource_id
)
data_product_copy.replicaLocations = [data_replica_location]
return data_product_copy
def _delete_data_product(username, full_path, storage_resource_id):
# TODO: call API to delete data product from replica catalog when it is
# available (not currently implemented)
from airavata_django_portal_sdk import models
user_file = models.UserFiles.objects.filter(
username=username, file_path=full_path, file_resource_id=storage_resource_id)
if user_file.exists():
user_file.delete()
def _create_data_product(username, full_path, storage_resource_id, name=None, content_type=None, backend=None):
data_product = DataProductModel()
data_product.gatewayId = settings.GATEWAY_ID
data_product.ownerName = username
if name is not None:
file_name = name
else:
file_name = os.path.basename(full_path)
data_product.productName = file_name
data_product.dataProductType = DataProductType.FILE
final_content_type = _determine_content_type(full_path, content_type, backend=backend)
if final_content_type is not None:
data_product.productMetadata = {"mime-type": final_content_type}
data_replica_location = _create_replica_location(full_path, file_name, storage_resource_id)
data_product.replicaLocations = [data_replica_location]
return data_product
def _determine_content_type(full_path, content_type=None, backend=None):
result = content_type
if result is None:
# Try to guess the content-type from file extension
guessed_type, encoding = mimetypes.guess_type(full_path)
result = guessed_type
if result is None or result == "application/octet-stream":
# Check if file is Unicode text by trying to read some of it
try:
if backend is not None:
file = backend.open(full_path)
# Try to decode the first kb as UTF8
file.read(1024).decode('utf-8')
result = "text/plain"
except UnicodeDecodeError:
logger.debug(f"Failed to read as Unicode text: {full_path}")
return result
def _create_replica_location(full_path, file_name, storage_resource_id):
data_replica_location = DataReplicaLocationModel()
data_replica_location.storageResourceId = storage_resource_id
data_replica_location.replicaName = "{} gateway data store copy".format(
file_name)
data_replica_location.replicaLocationCategory = (
ReplicaLocationCategory.GATEWAY_DATA_STORE
)
data_replica_location.replicaPersistentType = ReplicaPersistentType.TRANSIENT
data_replica_location.filePath = quote(full_path)
return data_replica_location
def _get_replica_filepath(data_product):
replica_filepaths = [
rep.filePath
for rep in data_product.replicaLocations
if rep.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE
]
replica_filepath = replica_filepaths[0] if len(
replica_filepaths) > 0 else None
if replica_filepath:
return unquote(urlparse(replica_filepath).path)
return None
def _get_replica_location(data_product, category=ReplicaLocationCategory.GATEWAY_DATA_STORE):
replica_locations = [
rep
for rep in data_product.replicaLocations
        if rep.replicaLocationCategory == category
]
return replica_locations[0] if len(replica_locations) > 0 else None
def _get_replica_resource_id_and_filepath(data_product):
replica_location = _get_replica_location(data_product)
if replica_location is not None:
return (replica_location.storageResourceId,
unquote(urlparse(replica_location.filePath).path))
else:
return None, None
def _get_final_path(request, path, experiment_id):
"If experiment_id is given, join the path to it's data directory"
final_path = path
if experiment_id is not None:
experiment = request.airavata_client.getExperiment(
request.authz_token, experiment_id)
exp_data_dir = experiment.userConfigurationData.experimentDataDir
final_path = os.path.join(exp_data_dir, path)
return final_path
def _get_final_path_and_owner_username(request, path, experiment_id):
"If experiment_id is given, join the path to it's data directory and also return owner username"
final_path = path
if experiment_id is not None:
experiment = request.airavata_client.getExperiment(
request.authz_token, experiment_id)
exp_data_dir = experiment.userConfigurationData.experimentDataDir
return os.path.join(exp_data_dir, path), experiment.userName
return final_path, None
def _is_remote_api():
return getattr(settings, 'GATEWAY_DATA_STORE_REMOTE_API', None) is not None
def _call_remote_api(
request,
path,
path_params=None,
method="get",
raise_for_status=True,
base_url="/api",
**kwargs):
headers = {
'Authorization': f'Bearer {request.authz_token.accessToken}'}
encoded_path_params = {}
if path_params is not None:
for pk, pv in path_params.items():
encoded_path_params[pk] = quote(pv)
encoded_path = path.format(**encoded_path_params)
logger.debug(f"encoded_path={encoded_path}")
remote_api_url = settings.GATEWAY_DATA_STORE_REMOTE_API
if remote_api_url.endswith("/api"):
warnings.warn(f"Set GATEWAY_DATA_STORE_REMOTE_API to \"{remote_api_url}\". /api is no longer needed.", DeprecationWarning)
remote_api_url = remote_api_url[0:remote_api_url.rfind("/api")]
r = requests.request(
method,
f'{remote_api_url}{base_url}{encoded_path}',
headers=headers,
**kwargs,
)
if raise_for_status:
r.raise_for_status()
return r
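# Remote API mode sketch: when user storage is delegated to another Django
# portal instance, GATEWAY_DATA_STORE_REMOTE_API points at that portal's base
# URL (no trailing /api); the hostname below is a placeholder.
#
#     GATEWAY_DATA_STORE_REMOTE_API = 'https://other-portal.example.org'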
def _raise_404(response, msg, exception_class=ObjectDoesNotExist):
if response.status_code == 404:
raise exception_class(msg)