blob: 8e1e498f59193715e29dd3986e86e8e404186a4e [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Generic, List, Optional
from pypaimon.common.json_util import T, json_field
from pypaimon.common.options import Options
from pypaimon.schema.schema import Schema
@dataclass
class PagedList(Generic[T]):
elements: List[T]
next_page_token: Optional[str] = None
class RESTResponse(ABC):
"""RESTResponse"""
@dataclass
class ErrorResponse(RESTResponse):
resource_type: Optional[str] = json_field("resourceType", default=None)
resource_name: Optional[str] = json_field("resourceName", default=None)
message: Optional[str] = json_field("message", default=None)
code: Optional[int] = json_field("code", default=None)
def __init__(
self,
resource_type: Optional[str] = None,
resource_name: Optional[str] = None,
message: Optional[str] = None,
code: Optional[int] = None,
):
self.resource_type = resource_type
self.resource_name = resource_name
self.message = message
self.code = code
@dataclass
class AuditRESTResponse(RESTResponse):
FIELD_OWNER = "owner"
FIELD_CREATED_AT = "createdAt"
FIELD_CREATED_BY = "createdBy"
FIELD_UPDATED_AT = "updatedAt"
FIELD_UPDATED_BY = "updatedBy"
owner: Optional[str] = json_field(FIELD_OWNER, default=None)
created_at: Optional[int] = json_field(FIELD_CREATED_AT, default=None)
created_by: Optional[str] = json_field(FIELD_CREATED_BY, default=None)
updated_at: Optional[int] = json_field(FIELD_UPDATED_AT, default=None)
updated_by: Optional[str] = json_field(FIELD_UPDATED_BY, default=None)
def get_owner(self) -> Optional[str]:
return self.owner
def get_created_at(self) -> Optional[int]:
return self.created_at
def get_created_by(self) -> Optional[str]:
return self.created_by
def get_updated_at(self) -> Optional[int]:
return self.updated_at
def get_updated_by(self) -> Optional[str]:
return self.updated_by
def put_audit_options_to(self, options: Dict[str, str]) -> None:
"""Puts audit-related options into the provided dictionary."""
options[self.FIELD_OWNER] = self.get_owner()
options[self.FIELD_CREATED_BY] = str(self.get_created_by())
options[self.FIELD_CREATED_AT] = str(self.get_created_at())
options[self.FIELD_UPDATED_BY] = str(self.get_updated_by())
options[self.FIELD_UPDATED_AT] = str(self.get_updated_at())
class PagedResponse(RESTResponse, Generic[T]):
FIELD_NEXT_PAGE_TOKEN = "nextPageToken"
@abstractmethod
def data(self) -> List[T]:
"""data"""
@abstractmethod
def get_next_page_token(self) -> str:
"""get_next_page_token"""
@dataclass
class ListDatabasesResponse(PagedResponse[str]):
FIELD_DATABASES = "databases"
databases: List[str] = json_field(FIELD_DATABASES)
next_page_token: str = json_field(PagedResponse.FIELD_NEXT_PAGE_TOKEN)
def data(self) -> List[str]:
return self.databases
def get_next_page_token(self) -> str:
return self.next_page_token
@dataclass
class ListTablesResponse(PagedResponse[str]):
FIELD_TABLES = "tables"
tables: Optional[List[str]] = json_field(FIELD_TABLES)
next_page_token: Optional[str] = json_field(
PagedResponse.FIELD_NEXT_PAGE_TOKEN)
def data(self) -> Optional[List[str]]:
return self.tables
def get_next_page_token(self) -> Optional[str]:
return self.next_page_token
@dataclass
class GetTableResponse(AuditRESTResponse):
"""Response for getting table"""
# Field constants for JSON serialization
FIELD_ID = "id"
FIELD_NAME = "name"
FIELD_PATH = "path"
FIELD_IS_EXTERNAL = "isExternal"
FIELD_SCHEMA_ID = "schemaId"
FIELD_SCHEMA = "schema"
id: Optional[str] = json_field(FIELD_ID, default=None)
name: Optional[str] = json_field(FIELD_NAME, default=None)
path: Optional[str] = json_field(FIELD_PATH, default=None)
is_external: Optional[bool] = json_field(FIELD_IS_EXTERNAL, default=None)
schema_id: Optional[int] = json_field(FIELD_SCHEMA_ID, default=None)
schema: Optional[Schema] = json_field(FIELD_SCHEMA, default=None)
def __init__(
self,
id: str,
name: str,
path: str,
is_external: bool,
schema_id: int,
schema: Schema,
owner: Optional[str] = None,
created_at: Optional[int] = None,
created_by: Optional[str] = None,
updated_at: Optional[int] = None,
updated_by: Optional[str] = None,
):
super().__init__(owner, created_at, created_by, updated_at, updated_by)
self.id = id
self.name = name
self.path = path
self.is_external = is_external
self.schema_id = schema_id
self.schema = schema
def get_id(self) -> str:
return self.id
def get_name(self) -> str:
return self.name
def get_path(self) -> str:
return self.path
def get_is_external(self) -> bool:
return self.is_external
def get_schema_id(self) -> int:
return self.schema_id
def get_schema(self) -> Schema:
return self.schema
@dataclass
class GetDatabaseResponse(AuditRESTResponse):
FIELD_ID = "id"
FIELD_NAME = "name"
FIELD_LOCATION = "location"
FIELD_OPTIONS = "options"
id: Optional[str] = json_field(FIELD_ID, default=None)
name: Optional[str] = json_field(FIELD_NAME, default=None)
location: Optional[str] = json_field(FIELD_LOCATION, default=None)
options: Optional[Dict[str, str]] = json_field(
FIELD_OPTIONS, default_factory=Dict)
def __init__(
self,
id: Optional[str] = None,
name: Optional[str] = None,
location: Optional[str] = None,
options: Optional[Dict[str, str]] = None,
owner: Optional[str] = None,
created_at: Optional[int] = None,
created_by: Optional[str] = None,
updated_at: Optional[int] = None,
updated_by: Optional[str] = None,
):
super().__init__(owner, created_at, created_by, updated_at, updated_by)
self.id = id
self.name = name
self.location = location
self.options = options or {}
def get_id(self) -> Optional[str]:
return self.id
def get_name(self) -> Optional[str]:
return self.name
def get_location(self) -> Optional[str]:
return self.location
def get_options(self) -> Dict[str, str]:
return self.options or {}
@dataclass
class ConfigResponse(RESTResponse):
FILED_DEFAULTS = "defaults"
defaults: Dict[str, str] = json_field(FILED_DEFAULTS)
def merge(self, options: Options) -> Options:
merged = options.copy()
merged.data.update(self.defaults)
return merged
@dataclass
class GetTableTokenResponse(RESTResponse):
FIELD_TOKEN = "token"
FIELD_EXPIRES_AT_MILLIS = "expiresAtMillis"
token: Dict[str, str] = json_field(FIELD_TOKEN, default=None)
expires_at_millis: Optional[int] = json_field(FIELD_EXPIRES_AT_MILLIS, default=None)
@dataclass
class CommitTableResponse(RESTResponse):
FIELD_SUCCESS = "success"
success: bool = json_field(FIELD_SUCCESS, default=False)
def is_success(self) -> bool:
return self.success