blob: cecf30d2e139bd6c8ded2714189c303db22c55cb [file] [log] [blame]
import datetime
from typing import List, Literal, Optional
from ninja import ModelSchema, Schema
from pydantic import BaseModel
from trackingserver_run_tracking.models import DAGRun, ExecutionStatus, NodeRun, NodeRunAttribute
from trackingserver_template.schema import CodeArtifactOut, NodeTemplateOut
class DAGRunIn(ModelSchema):
class Meta:
model = DAGRun
exclude = ["id", "created_at", "updated_at", "dag_template"]
class DAGRunUpdate(Schema):
# TODO -- find a better way of representing this
run_status: Literal[tuple(ExecutionStatus.values)]
run_end_time: datetime.datetime
upsert_tags: Optional[dict] = None
class DAGRunOut(ModelSchema):
class Meta:
model = DAGRun
fields = "__all__"
username_resolved: Optional[str] = None
dag_template_id: int
@classmethod
def create_with_username(cls, orm_model: DAGRun) -> "DAGRunOut":
return DAGRunOut(
**{
**DAGRunOut.from_orm(orm_model).dict(),
**{
"username_resolved": (
orm_model.launched_by.email if orm_model.launched_by else None
),
"dag_template_id": orm_model.dag_template_id,
},
}
)
class NodeRunIn(ModelSchema):
class Meta:
model = NodeRun
exclude = ["id", "created_at", "updated_at", "dag_run"]
class NodeRunOut(ModelSchema):
class Meta:
model = NodeRun
fields = "__all__"
class NodeRunAttributeIn(ModelSchema):
class Meta:
model = NodeRunAttribute
exclude = ["id", "created_at", "updated_at", "dag_run"]
class NodeRunAttributeOut(ModelSchema):
class Meta:
model = NodeRunAttribute
fields = "__all__"
class NodeRunOutWithAttributes(NodeRunOut):
attributes: List[NodeRunAttributeOut]
dag_run_id: int
@classmethod
def from_data(cls, node_run: NodeRun, attributes: List[NodeRunAttributeOut]):
return NodeRunOutWithAttributes(
**{
**NodeRunOut.from_orm(node_run).dict(),
**{"attributes": attributes, "dag_run_id": node_run.dag_run_id},
}
)
class DAGRunOutWithData(DAGRunOut):
node_runs: List[NodeRunOutWithAttributes]
dag_template_id: int
@classmethod
def from_data(cls, dag_run: DAGRun, node_runs: List[NodeRunOutWithAttributes]):
return DAGRunOutWithData(
**{
**DAGRunOut.from_orm(dag_run).dict(),
# Not sure why this isn't showing up -- todo, clean this up across the board
**{"node_runs": node_runs, "dag_template_id": dag_run.dag_template_id},
}
)
class DagRunsBulkRequest(BaseModel):
attributes: List[NodeRunAttributeIn]
task_updates: List[NodeRunIn]
class NodeRunOutWithExtraData(NodeRunOut, BaseModel):
dag_template_id: int
dag_run_id: int
@classmethod
def from_orm(cls, obj, dag_template_id):
node_run_out = NodeRunOut.from_orm(obj)
return NodeRunOutWithExtraData(
**node_run_out.dict(),
dag_template_id=dag_template_id,
dag_run_id=obj.dag_run_id,
)
# This is probably not the best
# We're doing a weird join
# We should probably just have the right ofreign key
class CatalogZoomResponse(BaseModel):
node_runs: List[NodeRunOutWithExtraData]
node_templates: List[NodeTemplateOut]
code_artifacts: List[CodeArtifactOut]