blob: 871b8db7645556378f8a908b7591833c02e4dbfb [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler oss resource plugin."""
from __future__ import annotations
from urllib.parse import urljoin, urlparse
import oss2
from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.resources_plugin.base.bucket import Bucket, OSSFileInfo
class OSS(ResourcePlugin, Bucket):
    """OSS object, declare OSS resource plugin for task and workflow to dolphinscheduler.

    A file is addressed by joining ``prefix`` with a relative suffix; the
    resulting URL is split into an endpoint, a bucket name and an object key,
    and the object is then downloaded with the ``oss2`` SDK.

    :param prefix: A string representing the prefix of OSS.
    :param access_key_id: A string representing the ID of AccessKey for AliCloud OSS.
    :param access_key_secret: A string representing the secret of AccessKey for AliCloud OSS.
    """

    # Location of the most recently resolved file. Written by
    # get_bucket_file_info() and read back by read_file().
    _bucket_file_info: OSSFileInfo | None = None

    def __init__(
        self,
        prefix: str,
        access_key_id: str | None = None,
        access_key_secret: str | None = None,
        *args,
        **kwargs,
    ):
        super().__init__(prefix, *args, **kwargs)
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret

    def get_bucket_file_info(self, path: str) -> None:
        """Parse an OSS object URL and store its endpoint, bucket and object key.

        The first ``Symbol.POINT``-separated label of the URL's hostname is used
        as the bucket name; the remaining labels, prefixed with
        ``<scheme>://``, form the endpoint; the URL path without its leading
        character is used as the object key. The result is stored in
        ``self._bucket_file_info``.

        :param path: Full URL of the object in OSS.
        :raises ValueError: If ``path`` has no hostname to take the bucket from.
        """
        # The return value is discarded. Presumably Bucket.get_index validates
        # that ``path`` has enough Symbol.SLASH separators (raising otherwise).
        # NOTE(review): confirm against the Bucket base class.
        self.get_index(path, Symbol.SLASH, 3)
        parsed = urlparse(path)
        hostname = parsed.hostname
        if not hostname:
            raise ValueError(
                f"Cannot determine OSS bucket and endpoint from {path!r}: missing hostname"
            )
        # Split once at the first separator: the head is the bucket and the
        # tail is the endpoint host. This matches splitting on every
        # separator and re-joining elements[1:].
        bucket_name, _, endpoint_host = hostname.partition(Symbol.POINT)
        self._bucket_file_info = OSSFileInfo(
            endpoint=f"{parsed.scheme}://{endpoint_host}",
            bucket=bucket_name,
            file_path=parsed.path[1:],
        )

    def read_file(self, suf: str) -> str:
        """Get the content of the file.

        The address of the file is the prefix of the resource plugin plus the
        parameter suf, combined with :func:`urllib.parse.urljoin`.

        :param suf: Path of the file relative to ``self.prefix``.
        :return: The object's content, decoded with ``bytes.decode()`` defaults.
        """
        path = urljoin(self.prefix, suf)
        self.get_bucket_file_info(path)
        auth = oss2.Auth(self.access_key_id, self.access_key_secret)
        bucket = oss2.Bucket(
            auth, self._bucket_file_info.endpoint, self._bucket_file_info.bucket
        )
        # get_object() returns a readable result. Read and decode it exactly
        # once: the decoded value is a str, which has no .read() method.
        return bucket.get_object(self._bucket_file_info.file_path).read().decode()