| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from dataclasses import dataclass |
| from datetime import datetime |
| from typing import Dict |
| |
| import pg8000 |
| |
| from otava.analysis import ChangePoint |
| from otava.test_config import PostgresTestConfig |
| |
| |
| @dataclass |
| class PostgresConfig: |
| NAME = "postgres" |
| |
| hostname: str |
| port: int |
| username: str |
| password: str |
| database: str |
| |
| @staticmethod |
| def add_parser_args(arg_group): |
| arg_group.add_argument("--postgres-hostname", help="PostgreSQL server hostname", env_var="POSTGRES_HOSTNAME") |
| arg_group.add_argument("--postgres-port", type=int, help="PostgreSQL server port", env_var="POSTGRES_PORT") |
| arg_group.add_argument("--postgres-username", help="PostgreSQL username", env_var="POSTGRES_USERNAME") |
| arg_group.add_argument("--postgres-password", help="PostgreSQL password", env_var="POSTGRES_PASSWORD") |
| arg_group.add_argument("--postgres-database", help="PostgreSQL database name", env_var="POSTGRES_DATABASE") |
| |
| @staticmethod |
| def from_parser_args(args): |
| return PostgresConfig( |
| hostname=getattr(args, 'postgres_hostname', None), |
| port=getattr(args, 'postgres_port', None), |
| username=getattr(args, 'postgres_username', None), |
| password=getattr(args, 'postgres_password', None), |
| database=getattr(args, 'postgres_database', None) |
| ) |
| |
| |
| @dataclass |
| class PostgresError(Exception): |
| message: str |
| |
| |
| class Postgres: |
| __conn = None |
| __config = None |
| |
| def __init__(self, config: PostgresConfig): |
| self.__config = config |
| |
| def __get_conn(self) -> pg8000.dbapi.Connection: |
| if self.__conn is None: |
| self.__conn = pg8000.dbapi.Connection( |
| host=self.__config.hostname, |
| port=self.__config.port, |
| user=self.__config.username, |
| password=self.__config.password, |
| database=self.__config.database, |
| ) |
| return self.__conn |
| |
| def fetch_data(self, query: str): |
| cursor = self.__get_conn().cursor() |
| cursor.execute(query) |
| columns = [c[0] for c in cursor.description] |
| return (columns, cursor.fetchall()) |
| |
| def insert_change_point( |
| self, |
| test: PostgresTestConfig, |
| metric_name: str, |
| attributes: Dict, |
| change_point: ChangePoint, |
| ): |
| cursor = self.__get_conn().cursor() |
| kwargs = {**attributes, **{test.time_column: datetime.utcfromtimestamp(change_point.time)}} |
| update_stmt = test.update_stmt.format(metric=metric_name, **kwargs) |
| cursor.execute( |
| update_stmt, |
| ( |
| change_point.forward_change_percent(), |
| change_point.backward_change_percent(), |
| change_point.stats.pvalue, |
| ), |
| ) |
| self.__get_conn().commit() |