blob: 73ea30132da6ed1792eb9f80fc8ae8e78cd92170 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
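# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.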
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from enum import Enum
from typing import Optional
import re
from sqlmodel import Session, Column, Text
from pydevlake import Field, Connection, TransformationRule
from pydevlake.model import ToolModel, ToolScope
from pydevlake.pipeline_tasks import RefDiffOptions
class AzureDevOpsConnection(Connection):
    """Connection model carrying a token and an optional organization."""

    # Required string credential.
    token: str

    # May be None.
    organization: Optional[str]
class AzureDevOpsTransformationRule(TransformationRule):
    """Transformation rule holding refdiff options and two optional regexes."""

    # Optional refdiff options.
    refdiff: Optional[RefDiffOptions]

    # Optional compiled regular expressions.
    # NOTE(review): presumably matched against job/build names to detect
    # deployments and production environments -- confirm against the code
    # that consumes this rule.
    deployment_pattern: Optional[re.Pattern]
    production_pattern: Optional[re.Pattern]
class GitRepository(ToolScope, table=True):
    """Scope model (and table) describing a Git repository."""

    url: str
    remote_url: str
    default_branch: Optional[str]
    project_id: str
    org_id: str

    # The leading '/' makes this path the same form as every other
    # `source=` path in this module (e.g. '/createdBy/id').
    # NOTE(review): these paths look like JSON pointers, which must start
    # with '/'; confirm against pydevlake's `Field` / extraction code.
    parent_repository_url: Optional[str] = Field(source='/parentRepository/url')
class GitPullRequest(ToolModel, table=True):
    """Tool-layer table model for a pull request.

    Fields declared with ``Field(source=...)`` name a path into the raw
    payload. NOTE(review): the paths look like JSON pointers handled by
    pydevlake's ``Field`` -- confirm there.
    """

    class PRStatus(Enum):
        """Values accepted for the ``status`` field."""

        Abandoned = "abandoned"
        Active = "active"
        Completed = "completed"

    pull_request_id: int = Field(primary_key=True)
    # Stored in a TEXT column rather than the default string column type.
    description: Optional[str] = Field(sa_column=Column(Text))
    status: PRStatus
    created_by_id: str = Field(source='/createdBy/id')
    created_by_name: str = Field(source='/createdBy/displayName')
    creation_date: datetime.datetime
    closed_date: Optional[datetime.datetime]
    source_commit_sha: str = Field(source='/lastMergeSourceCommit/commitId')
    target_commit_sha: str = Field(source='/lastMergeTargetCommit/commitId')
    merge_commit_sha: Optional[str] = Field(source='/lastMergeCommit/commitId')
    url: Optional[str]
    type: Optional[str] = Field(source='/labels/0/name')  # TODO: get this off transformation rules regex
    title: Optional[str]
    target_ref_name: Optional[str]
    source_ref_name: Optional[str]
    fork_repo_id: Optional[str] = Field(source='/forkSource/repository/id')

    def migrate(self, session: Session):
        """Alter an existing table so it matches this model.

        Changes ``description`` to a TEXT column and makes
        ``merge_commit_sha`` nullable, using MySQL or PostgreSQL syntax
        depending on the session's database dialect.

        Args:
            session: Session used to detect the dialect and run the DDL.

        Raises:
            Exception: If the dialect is neither 'mysql' nor 'postgresql'.
        """
        # NOTE(review): only the class-level `__tablename__` is read from
        # `self`; confirm whether the framework expects a classmethod here.
        dialect = session.get_bind().dialect.name
        if dialect not in ['mysql', 'postgresql']:
            raise Exception(f'Unsupported dialect {dialect}')
        table = self.__tablename__

        if dialect == 'mysql':
            session.execute(f'ALTER TABLE {table} MODIFY COLUMN description TEXT')
            session.execute(f'ALTER TABLE {table} MODIFY COLUMN merge_commit_sha VARCHAR(255) NULL')
        elif dialect == 'postgresql':
            session.execute(f'ALTER TABLE {table} ALTER COLUMN description TYPE TEXT')
            session.execute(f'ALTER TABLE {table} ALTER COLUMN merge_commit_sha DROP NOT NULL')
class GitPullRequestCommit(ToolModel, table=True):
    """Table model for a commit attached to a pull request, keyed by commit id."""

    commit_id: str = Field(primary_key=True)

    # NOTE(review): declared as `str` here while
    # `GitPullRequest.pull_request_id` is `int` -- confirm this is intended.
    pull_request_id: str

    # Author details are read from the nested `author` object of the payload.
    author_name: str = Field(source='/author/name')
    author_email: str = Field(source='/author/email')
    author_date: datetime.datetime = Field(source='/author/date')
class Build(ToolModel, table=True):
    """Tool-layer table model for a build."""

    class BuildStatus(Enum):
        """Values accepted for the ``status`` field."""

        Cancelling = "cancelling"
        Completed = "completed"
        InProgress = "inProgress"
        NotStarted = "notStarted"
        Postponed = "postponed"

    class BuildResult(Enum):
        """Values accepted for the ``result`` field."""

        Canceled = "canceled"
        Failed = "failed"
        # Spelled `Non` because `None` is a reserved word; the value is "none".
        Non = "none"
        PartiallySucceeded = "partiallySucceeded"
        Succeeded = "succeeded"

    id: int = Field(primary_key=True)
    name: str = Field(source='/definition/name')
    start_time: Optional[datetime.datetime]
    finish_time: Optional[datetime.datetime]
    status: BuildStatus
    result: Optional[BuildResult]
    source_branch: str
    source_version: str

    def migrate(self, session: Session):
        """Alter an existing table so the ``result`` column is nullable.

        Args:
            session: Session used to detect the dialect and run the DDL.

        Raises:
            Exception: If the dialect is neither 'mysql' nor 'postgresql'.
        """
        # NOTE(review): only the class-level `__tablename__` is read from
        # `self`; confirm whether the framework expects a classmethod here.
        dialect = session.get_bind().dialect.name
        if dialect not in ['mysql', 'postgresql']:
            raise Exception(f'Unsupported dialect {dialect}')
        table = self.__tablename__

        # Make column result nullable
        if dialect == 'mysql':
            session.execute(f'ALTER TABLE {table} MODIFY COLUMN result VARCHAR(255) NULL')
        elif dialect == 'postgresql':
            session.execute(f'ALTER TABLE {table} ALTER COLUMN "result" DROP NOT NULL')
class Job(ToolModel, table=True):
    """Tool-layer table model for a job, keyed by ``(id, build_id)``."""

    class JobState(Enum):
        """Values accepted for the ``state`` field."""

        Completed = "completed"
        InProgress = "inProgress"
        Pending = "pending"

    class JobResult(Enum):
        """Values accepted for the ``result`` field."""

        Abandoned = "abandoned"
        Canceled = "canceled"
        Failed = "failed"
        Skipped = "skipped"
        Succeeded = "succeeded"
        SucceededWithIssues = "succeededWithIssues"

    # Composite primary key: both `id` and `build_id` are key columns.
    id: str = Field(primary_key=True)
    build_id: str = Field(primary_key=True)
    name: str
    # The camelCase attribute names are also the column names used by the
    # migration below, so they must not be renamed.
    startTime: Optional[datetime.datetime]
    finishTime: Optional[datetime.datetime]
    state: JobState
    result: Optional[JobResult]

    def migrate(self, session: Session):
        """Alter an existing table so it matches this model.

        Replaces the primary key with the composite ``(id, build_id)`` key,
        then makes ``startTime``, ``finishTime`` and ``result`` nullable.

        Args:
            session: Session used to detect the dialect and run the DDL.

        Raises:
            Exception: If the dialect is neither 'mysql' nor 'postgresql'.
        """
        # NOTE(review): only the class-level `__tablename__` is read from
        # `self`; confirm whether the framework expects a classmethod here.
        dialect = session.get_bind().dialect.name
        if dialect not in ['mysql', 'postgresql']:
            raise Exception(f'Unsupported dialect {dialect}')
        table = self.__tablename__

        # Replace the existing primary key with the composite (id, build_id) key.
        # NOTE(review): no statement here adds the build_id column itself;
        # this assumes it already exists in the table -- confirm.
        if dialect == 'mysql':
            session.execute(f'ALTER TABLE {table} DROP PRIMARY KEY')
        elif dialect == 'postgresql':
            session.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey')
        session.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')

        # Make columns nullable
        if dialect == 'mysql':
            session.execute(f'ALTER TABLE {table} MODIFY COLUMN startTime DATETIME NULL')
            session.execute(f'ALTER TABLE {table} MODIFY COLUMN finishTime DATETIME NULL')
            session.execute(f'ALTER TABLE {table} MODIFY COLUMN result VARCHAR(255) NULL')
        elif dialect == 'postgresql':
            # Quoted because PostgreSQL folds unquoted identifiers to lower case.
            session.execute(f'ALTER TABLE {table} ALTER COLUMN "startTime" DROP NOT NULL')
            session.execute(f'ALTER TABLE {table} ALTER COLUMN "finishTime" DROP NOT NULL')
            session.execute(f'ALTER TABLE {table} ALTER COLUMN "result" DROP NOT NULL')