blob: cbe73d886ae94b0c07b707fd92307c5ebe2685d5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from typing import Optional
import pandas as pd
from sqlalchemy.orm import Session
from superset import jinja_context
from superset.models.alerts import Alert, SQLObservation
logger = logging.getLogger("tasks.email_reports")
# Session needs to be passed along in the celery workers and db.session cannot be used.
# For more info see: https://github.com/apache/superset/issues/10530
def observe(alert_id: int, session: Session) -> Optional[str]:
"""Collect observations for the alert.
Returns an error message if the observer value was not valid
"""
alert = session.query(Alert).filter_by(id=alert_id).one()
value = None
tp = jinja_context.get_template_processor(database=alert.database)
rendered_sql = tp.process_template(alert.sql)
df = alert.database.get_df(rendered_sql)
error_msg = validate_observer_result(df, alert.id, alert.label)
if not error_msg and not df.empty and df.to_records()[0][1] is not None:
value = float(df.to_records()[0][1])
observation = SQLObservation(
alert_id=alert_id, dttm=datetime.utcnow(), value=value, error_msg=error_msg,
)
session.add(observation)
session.commit()
return error_msg
def validate_observer_result(
sql_result: pd.DataFrame, alert_id: int, alert_label: str
) -> Optional[str]:
"""
Verifies if a DataFrame SQL query result to see if
it contains a valid value for a SQLObservation.
Returns an error message if the result is invalid.
"""
try:
if sql_result.empty:
# empty results are used for the not null validator
return None
rows = sql_result.to_records()
assert (
len(rows) == 1
), f"Observer for alert <{alert_id}:{alert_label}> returned more than 1 row"
assert (
len(rows[0]) == 2
), f"Observer for alert <{alert_id}:{alert_label}> returned more than 1 column"
if rows[0][1] is None:
return None
float(rows[0][1])
except AssertionError as error:
return str(error)
except (TypeError, ValueError):
return (
f"Observer for alert <{alert_id}:{alert_label}> returned a non-number value"
)
return None