| from flask.ext.appbuilder import Model |
| from datetime import timedelta |
| from flask.ext.appbuilder.models.mixins import AuditMixin |
| from flask import request, redirect, flash, Response |
| from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime |
| from sqlalchemy import create_engine, MetaData, desc |
| from sqlalchemy import Table as sqlaTable |
| from sqlalchemy.orm import relationship |
| from dateutil.parser import parse |
| from pydruid import client |
| from pydruid.utils.filters import Dimension, Filter |
| from pandas import read_sql_query |
| from sqlalchemy.sql import table, literal_column |
| from sqlalchemy import select, and_, text, String |
| |
| from copy import deepcopy, copy |
| from collections import namedtuple |
| from datetime import datetime |
| import logging |
| import json |
| import sqlparse |
| import requests |
| import textwrap |
| |
| from panoramix import db, get_session |
| |
| QueryResult = namedtuple('namedtuple', ['df', 'query', 'duration']) |
| |
| |
| class Queryable(object): |
| @property |
| def column_names(self): |
| return sorted([c.column_name for c in self.columns]) |
| |
| @property |
| def groupby_column_names(self): |
| return sorted([c.column_name for c in self.columns if c.groupby]) |
| |
| @property |
| def filterable_column_names(self): |
| return sorted([c.column_name for c in self.columns if c.filterable]) |
| |
| class Database(Model, AuditMixin): |
| __tablename__ = 'dbs' |
| id = Column(Integer, primary_key=True) |
| database_name = Column(String(255), unique=True) |
| sqlalchemy_uri = Column(String(1024)) |
| |
| |
| def __repr__(self): |
| return self.database_name |
| |
| def get_sqla_engine(self): |
| return create_engine(self.sqlalchemy_uri) |
| |
| def get_table(self, table_name): |
| meta = MetaData() |
| return sqlaTable( |
| table_name, meta, |
| autoload=True, |
| autoload_with=self.get_sqla_engine()) |
| |
| |
| class Table(Model, Queryable, AuditMixin): |
| __tablename__ = 'tables' |
| id = Column(Integer, primary_key=True) |
| table_name = Column(String(255), unique=True) |
| main_datetime_column_id = Column(Integer, ForeignKey('table_columns.id')) |
| main_datetime_column = relationship( |
| 'TableColumn', foreign_keys=[main_datetime_column_id]) |
| default_endpoint = Column(Text) |
| database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False) |
| database = relationship( |
| 'Database', backref='tables', foreign_keys=[database_id]) |
| |
| baselink = "tableview" |
| |
| @property |
| def name(self): |
| return self.table_name |
| |
| @property |
| def table_link(self): |
| url = "/panoramix/table/{}/".format(self.id) |
| return '<a href="{url}">{self.table_name}</a>'.format(**locals()) |
| |
| @property |
| def metrics_combo(self): |
| return sorted( |
| [ |
| (m.metric_name, m.verbose_name) |
| for m in self.metrics], |
| key=lambda x: x[1]) |
| |
| def query_bkp( |
| self, groupby, metrics, |
| granularity, |
| from_dttm, to_dttm, |
| limit_spec=None, |
| filter=None, |
| is_timeseries=True, |
| timeseries_limit=15, row_limit=None): |
| """ |
| Unused, legacy way of querying by building a SQL string without |
| using the sqlalchemy expression API (new approach which supports |
| all dialects) |
| """ |
| from pandas import read_sql_query |
| qry_start_dttm = datetime.now() |
| metrics_exprs = [ |
| "{} AS {}".format(m.expression, m.metric_name) |
| for m in self.metrics if m.metric_name in metrics] |
| from_dttm_iso = from_dttm.isoformat() |
| to_dttm_iso = to_dttm.isoformat() |
| |
| if metrics: |
| main_metric_expr = [m.expression for m in self.metrics if m.metric_name == metrics[0]][0] |
| else: |
| main_metric_expr = "COUNT(*)" |
| |
| select_exprs = [] |
| groupby_exprs = [] |
| |
| if groupby: |
| select_exprs = copy(groupby) |
| groupby_exprs = [s for s in groupby] |
| inner_groupby_exprs = [s for s in groupby] |
| select_exprs += metrics_exprs |
| if granularity != "all": |
| select_exprs += ['ds as timestamp'] |
| groupby_exprs += ['ds'] |
| |
| select_exprs = ",\n".join(select_exprs) |
| groupby_exprs = ",\n".join(groupby_exprs) |
| |
| where_clause = [ |
| "ds >= '{from_dttm_iso}'", |
| "ds < '{to_dttm_iso}'" |
| ] |
| for col, op, eq in filter: |
| if op in ('in', 'not in'): |
| l = ["'{}'".format(s) for s in eq.split(",")] |
| l = ", ".join(l) |
| op = op.upper() |
| where_clause.append( |
| "{col} {op} ({l})".format(**locals()) |
| ) |
| where_clause = " AND\n".join(where_clause).format(**locals()) |
| on_clause = " AND ".join(["{g} = __{g}".format(g=g) for g in groupby]) |
| limiting_join = "" |
| if timeseries_limit and groupby: |
| inner_select = ", ".join(["{g} as __{g}".format(g=g) for g in inner_groupby_exprs]) |
| inner_groupby_exprs = ", ".join(inner_groupby_exprs) |
| limiting_join = ( |
| "JOIN ( \n" |
| " SELECT {inner_select} \n" |
| " FROM {self.table_name} \n" |
| " WHERE \n" |
| " {where_clause}\n" |
| " GROUP BY {inner_groupby_exprs}\n" |
| " ORDER BY {main_metric_expr} DESC\n" |
| " LIMIT {timeseries_limit}\n" |
| ") z ON {on_clause}\n" |
| ).format(**locals()) |
| |
| sql = ( |
| "SELECT\n" |
| " {select_exprs}\n" |
| "FROM {self.table_name}\n" |
| "{limiting_join}" |
| "WHERE\n" |
| " {where_clause}\n" |
| "GROUP BY\n" |
| " {groupby_exprs}\n" |
| ).format(**locals()) |
| df = read_sql_query( |
| sql=sql, |
| con=self.database.get_sqla_engine() |
| ) |
| textwrap.dedent(sql) |
| |
| return QueryResult( |
| df=df, duration=datetime.now() - qry_start_dttm, query=sql) |
| |
| def query( |
| self, groupby, metrics, |
| granularity, |
| from_dttm, to_dttm, |
| limit_spec=None, |
| filter=None, |
| is_timeseries=True, |
| timeseries_limit=15, row_limit=None): |
| |
| qry_start_dttm = datetime.now() |
| timestamp = literal_column( |
| self.main_datetime_column.column_name).label('timestamp') |
| metrics_exprs = [ |
| literal_column(m.expression).label(m.metric_name) |
| for m in self.metrics if m.metric_name in metrics] |
| |
| if metrics: |
| main_metric_expr = literal_column( |
| [m.expression for m in self.metrics if m.metric_name == metrics[0]][0]) |
| else: |
| main_metric_expr = literal_column("COUNT(*)") |
| |
| select_exprs = [] |
| groupby_exprs = [] |
| |
| if groupby: |
| select_exprs = [literal_column(s) for s in groupby] |
| groupby_exprs = [literal_column(s) for s in groupby] |
| inner_groupby_exprs = [literal_column(s).label('__' + s) for s in groupby] |
| if granularity != "all": |
| select_exprs += [timestamp] |
| groupby_exprs += [timestamp] |
| |
| select_exprs += metrics_exprs |
| qry = select(select_exprs) |
| from_clause = table(self.table_name) |
| qry = qry.group_by(*groupby_exprs) |
| |
| where_clause_and = [ |
| timestamp >= from_dttm.isoformat(), |
| timestamp < to_dttm.isoformat(), |
| ] |
| for col, op, eq in filter: |
| if op in ('in', 'not in'): |
| values = eq.split(",") |
| cond = literal_column(col).in_(values) |
| if op == 'not in': |
| cond = ~cond |
| where_clause_and.append(cond) |
| qry = qry.where(and_(*where_clause_and)) |
| qry = qry.order_by(desc(main_metric_expr)) |
| qry = qry.limit(row_limit) |
| |
| if timeseries_limit and groupby: |
| subq = select(inner_groupby_exprs) |
| subq = subq.select_from(table(self.table_name)) |
| subq = subq.where(and_(*where_clause_and)) |
| subq = subq.group_by(*inner_groupby_exprs) |
| subq = subq.order_by(desc(main_metric_expr)) |
| subq = subq.limit(timeseries_limit) |
| on_clause = [] |
| for gb in groupby: |
| on_clause.append(literal_column(gb)==literal_column("__" + gb)) |
| |
| from_clause = from_clause.join(subq.alias(), and_(*on_clause)) |
| |
| qry = qry.select_from(from_clause) |
| |
| engine = self.database.get_sqla_engine() |
| sql = str(qry.compile(engine, compile_kwargs={"literal_binds": True})) |
| df = read_sql_query( |
| sql=sql, |
| con=engine |
| ) |
| sql = sqlparse.format(sql, reindent=True) |
| return QueryResult( |
| df=df, duration=datetime.now() - qry_start_dttm, query=sql) |
| |
| |
| def fetch_metadata(self): |
| try: |
| table = self.database.get_table(self.table_name) |
| except Exception as e: |
| flash(str(e)) |
| flash( |
| "Table doesn't seem to exist in the specified database, " |
| "couldn't fetch column information", "danger") |
| return |
| |
| TC = TableColumn |
| M = SqlMetric |
| metrics = [] |
| any_date_col = None |
| for col in table.columns: |
| try: |
| datatype = str(col.type) |
| except Exception as e: |
| datatype = "UNKNOWN" |
| dbcol = ( |
| db.session |
| .query(TC) |
| .filter(TC.table==self) |
| .filter(TC.column_name==col.name) |
| .first() |
| ) |
| db.session.flush() |
| if not dbcol: |
| dbcol = TableColumn(column_name=col.name) |
| |
| if ( |
| str(datatype).startswith('VARCHAR') or |
| str(datatype).startswith('STRING')): |
| dbcol.groupby = True |
| dbcol.filterable = True |
| db.session.merge(self) |
| self.columns.append(dbcol) |
| |
| if not any_date_col and 'date' in datatype.lower(): |
| any_date_col = dbcol |
| |
| if dbcol.sum: |
| metrics.append(M( |
| metric_name='sum__' + dbcol.column_name, |
| verbose_name='sum__' + dbcol.column_name, |
| metric_type='sum', |
| expression="SUM({})".format(dbcol.column_name) |
| )) |
| if dbcol.max: |
| metrics.append(M( |
| metric_name='max__' + dbcol.column_name, |
| verbose_name='max__' + dbcol.column_name, |
| metric_type='max', |
| expression="MAX({})".format(dbcol.column_name) |
| )) |
| if dbcol.min: |
| metrics.append(M( |
| metric_name='min__' + dbcol.column_name, |
| verbose_name='min__' + dbcol.column_name, |
| metric_type='min', |
| expression="MIN({})".format(dbcol.column_name) |
| )) |
| if dbcol.count_distinct: |
| metrics.append(M( |
| metric_name='count_distinct__' + dbcol.column_name, |
| verbose_name='count_distinct__' + dbcol.column_name, |
| metric_type='count_distinct', |
| expression="COUNT(DISTINCT {})".format(dbcol.column_name) |
| )) |
| dbcol.type = datatype |
| db.session.merge(self) |
| db.session.commit() |
| |
| metrics.append(M( |
| metric_name='count', |
| verbose_name='COUNT(*)', |
| metric_type='count', |
| expression="COUNT(*)" |
| )) |
| for metric in metrics: |
| m = ( |
| db.session.query(M) |
| .filter(M.metric_name==metric.metric_name) |
| .filter(M.table==self) |
| .first() |
| ) |
| metric.table = self |
| if not m: |
| db.session.add(metric) |
| db.session.commit() |
| if not self.main_datetime_column: |
| self.main_datetime_column = any_date_col |
| |
| |
| |
| |
| class SqlMetric(Model, AuditMixin): |
| __tablename__ = 'sql_metrics' |
| id = Column(Integer, primary_key=True) |
| metric_name = Column(String(512)) |
| verbose_name = Column(String(1024)) |
| metric_type = Column(String(32)) |
| table_id = Column(Integer,ForeignKey('tables.id')) |
| table = relationship( |
| 'Table', backref='metrics', foreign_keys=[table_id]) |
| expression = Column(Text) |
| description = Column(Text) |
| |
| |
| class TableColumn(Model, AuditMixin): |
| __tablename__ = 'table_columns' |
| id = Column(Integer, primary_key=True) |
| table_id = Column(Integer, ForeignKey('tables.id')) |
| table = relationship('Table', backref='columns', foreign_keys=[table_id]) |
| column_name = Column(String(256)) |
| is_dttm = Column(Boolean, default=True) |
| is_active = Column(Boolean, default=True) |
| type = Column(String(32), default='') |
| groupby = Column(Boolean, default=False) |
| count_distinct = Column(Boolean, default=False) |
| sum = Column(Boolean, default=False) |
| max = Column(Boolean, default=False) |
| min = Column(Boolean, default=False) |
| filterable = Column(Boolean, default=False) |
| description = Column(Text, default='') |
| |
| def __repr__(self): |
| return self.column_name |
| |
| class Cluster(Model, AuditMixin): |
| __tablename__ = 'clusters' |
| id = Column(Integer, primary_key=True) |
| cluster_name = Column(String(255), unique=True) |
| coordinator_host = Column(String(256)) |
| coordinator_port = Column(Integer) |
| coordinator_endpoint = Column(String(256)) |
| broker_host = Column(String(256)) |
| broker_port = Column(Integer) |
| broker_endpoint = Column(String(256)) |
| metadata_last_refreshed = Column(DateTime) |
| |
| def __repr__(self): |
| return self.cluster_name |
| |
| def get_pydruid_client(self): |
| cli = client.PyDruid( |
| "http://{0}:{1}/".format(self.broker_host, self.broker_port), |
| self.broker_endpoint) |
| return cli |
| |
| def refresh_datasources(self): |
| endpoint = ( |
| "http://{self.coordinator_host}:{self.coordinator_port}/" |
| "{self.coordinator_endpoint}/datasources" |
| ).format(self=self) |
| |
| datasources = json.loads(requests.get(endpoint).text) |
| for datasource in datasources: |
| Datasource.sync_to_db(datasource, self) |
| |
| |
| class Datasource(Model, AuditMixin, Queryable): |
| |
| baselink = "datasourcemodelview" |
| |
| __tablename__ = 'datasources' |
| id = Column(Integer, primary_key=True) |
| datasource_name = Column(String(255), unique=True) |
| is_featured = Column(Boolean, default=False) |
| is_hidden = Column(Boolean, default=False) |
| description = Column(Text) |
| default_endpoint = Column(Text) |
| user_id = Column(Integer, ForeignKey('ab_user.id')) |
| owner = relationship('User', backref='datasources', foreign_keys=[user_id]) |
| cluster_name = Column(String(255), |
| ForeignKey('clusters.cluster_name')) |
| cluster = relationship('Cluster', backref='datasources', foreign_keys=[cluster_name]) |
| |
| @property |
| def metrics_combo(self): |
| return sorted( |
| [(m.metric_name, m.verbose_name) for m in self.metrics], |
| key=lambda x: x[1]) |
| |
| @property |
| def name(self): |
| return self.datasource_name |
| |
| def __repr__(self): |
| return self.datasource_name |
| |
| @property |
| def datasource_link(self): |
| url = "/panoramix/datasource/{}/".format(self.datasource_name) |
| return '<a href="{url}">{self.datasource_name}</a>'.format(**locals()) |
| |
| def get_metric_obj(self, metric_name): |
| return [ |
| m.json_obj for m in self.metrics |
| if m.metric_name == metric_name |
| ][0] |
| |
| def latest_metadata(self): |
| client = self.cluster.get_pydruid_client() |
| results = client.time_boundary(datasource=self.datasource_name) |
| if not results: |
| return |
| max_time = results[0]['result']['minTime'] |
| max_time = parse(max_time) |
| intervals = (max_time - timedelta(seconds=1)).isoformat() + '/' |
| intervals += (max_time + timedelta(seconds=1)).isoformat() |
| segment_metadata = client.segment_metadata( |
| datasource=self.datasource_name, |
| intervals=intervals) |
| if segment_metadata: |
| return segment_metadata[-1]['columns'] |
| |
| def generate_metrics(self): |
| for col in self.columns: |
| col.generate_metrics() |
| |
| @classmethod |
| def sync_to_db(cls, name, cluster): |
| session = get_session() |
| datasource = session.query(cls).filter_by(datasource_name=name).first() |
| if not datasource: |
| datasource = cls(datasource_name=name) |
| session.add(datasource) |
| datasource.cluster = cluster |
| |
| cols = datasource.latest_metadata() |
| if not cols: |
| return |
| for col in cols: |
| col_obj = ( |
| session |
| .query(Column) |
| .filter_by(datasource_name=name, column_name=col) |
| .first() |
| ) |
| datatype = cols[col]['type'] |
| if not col_obj: |
| col_obj = Column(datasource_name=name, column_name=col) |
| session.add(col_obj) |
| if datatype == "STRING": |
| col_obj.groupby = True |
| col_obj.filterable = True |
| if col_obj: |
| col_obj.type = cols[col]['type'] |
| col_obj.datasource = datasource |
| col_obj.generate_metrics() |
| #session.commit() |
| |
| def query( |
| self, groupby, metrics, |
| granularity, |
| from_dttm, to_dttm, |
| limit_spec=None, |
| filter=None, |
| is_timeseries=True, |
| timeseries_limit=None, |
| row_limit=None): |
| qry_start_dttm = datetime.now() |
| |
| query_str = "" |
| aggregations = { |
| m.metric_name: m.json_obj |
| for m in self.metrics if m.metric_name in metrics |
| } |
| if not isinstance(granularity, basestring): |
| granularity = {"type": "duration", "duration": granularity} |
| |
| qry = dict( |
| datasource=self.datasource_name, |
| dimensions=groupby, |
| aggregations=aggregations, |
| granularity=granularity, |
| intervals= from_dttm.isoformat() + '/' + to_dttm.isoformat(), |
| ) |
| filters = None |
| for col, op, eq in filter: |
| cond = None |
| if op == '==': |
| cond = Dimension(col)==eq |
| elif op == '!=': |
| cond = ~(Dimension(col)==eq) |
| elif op in ('in', 'not in'): |
| fields = [] |
| splitted = eq.split(',') |
| if len(splitted) > 1: |
| for s in eq.split(','): |
| s = s.strip() |
| fields.append(Filter.build_filter(Dimension(col)==s)) |
| cond = Filter(type="or", fields=fields) |
| else: |
| cond = Dimension(col)==eq |
| if op == 'not in': |
| cond = ~cond |
| if filters: |
| filters = Filter(type="and", fields=[ |
| Filter.build_filter(cond), |
| Filter.build_filter(filters) |
| ]) |
| else: |
| filters = cond |
| |
| if filters: |
| qry['filter'] = filters |
| |
| client = self.cluster.get_pydruid_client() |
| orig_filters = filters |
| if timeseries_limit and is_timeseries: |
| # Limit on the number of timeseries, doing a two-phases query |
| pre_qry = deepcopy(qry) |
| pre_qry['granularity'] = "all" |
| pre_qry['limit_spec'] = { |
| "type": "default", |
| "limit": timeseries_limit, |
| "columns": [{ |
| "dimension": metrics[0] if metrics else self.metrics[0], |
| "direction": "descending", |
| }], |
| } |
| client.groupby(**pre_qry) |
| query_str += "// Two phase query\n// Phase 1\n" |
| query_str += json.dumps(client.query_dict, indent=2) + "\n" |
| query_str += "//\nPhase 2 (built based on phase one's results)\n" |
| df = client.export_pandas() |
| if not df is None and not df.empty: |
| dims = qry['dimensions'] |
| filters = [] |
| for index, row in df.iterrows(): |
| fields = [] |
| for dim in dims: |
| f = Filter.build_filter(Dimension(dim) == row[dim]) |
| fields.append(f) |
| if len(fields) > 1: |
| filt = Filter(type="and", fields=fields) |
| filters.append(Filter.build_filter(filt)) |
| elif fields: |
| filters.append(fields[0]) |
| |
| if filters: |
| ff = Filter(type="or", fields=filters) |
| if not orig_filters: |
| qry['filter'] = ff |
| else: |
| qry['filter'] = Filter(type="and", fields=[ |
| Filter.build_filter(ff), |
| Filter.build_filter(orig_filters)]) |
| qry['limit_spec'] = None |
| if row_limit: |
| qry['limit_spec'] = { |
| "type": "default", |
| "limit": row_limit, |
| "columns": [{ |
| "dimension": metrics[0] if metrics else self.metrics[0], |
| "direction": "descending", |
| }], |
| } |
| client.groupby(**qry) |
| query_str += json.dumps(client.query_dict, indent=2) |
| df = client.export_pandas() |
| return QueryResult( |
| df=df, |
| query=query_str, |
| duration=datetime.now() - qry_start_dttm) |
| |
| |
| #class Metric(Model, AuditMixin): |
| class Metric(Model): |
| __tablename__ = 'metrics' |
| id = Column(Integer, primary_key=True) |
| metric_name = Column(String(512)) |
| verbose_name = Column(String(1024)) |
| metric_type = Column(String(32)) |
| datasource_name = Column( |
| String(256), |
| ForeignKey('datasources.datasource_name')) |
| datasource = relationship('Datasource', backref='metrics') |
| json = Column(Text) |
| description = Column(Text) |
| |
| @property |
| def json_obj(self): |
| try: |
| obj = json.loads(self.json) |
| except Exception as e: |
| obj = {} |
| return obj |
| |
| |
| class Column(Model, AuditMixin): |
| __tablename__ = 'columns' |
| id = Column(Integer, primary_key=True) |
| datasource_name = Column( |
| String(256), |
| ForeignKey('datasources.datasource_name')) |
| datasource = relationship('Datasource', backref='columns') |
| column_name = Column(String(256)) |
| is_active = Column(Boolean, default=True) |
| type = Column(String(32)) |
| groupby = Column(Boolean, default=False) |
| count_distinct = Column(Boolean, default=False) |
| sum = Column(Boolean, default=False) |
| max = Column(Boolean, default=False) |
| min = Column(Boolean, default=False) |
| filterable = Column(Boolean, default=False) |
| description = Column(Text) |
| |
| def __repr__(self): |
| return self.column_name |
| |
| @property |
| def isnum(self): |
| return self.type in ('LONG', 'DOUBLE', 'FLOAT') |
| |
| def generate_metrics(self): |
| M = Metric |
| metrics = [] |
| metrics.append(Metric( |
| metric_name='count', |
| verbose_name='COUNT(*)', |
| metric_type='count', |
| json=json.dumps({'type': 'count', 'name': 'count'}) |
| )) |
| # Somehow we need to reassign this for UDAFs |
| corrected_type = 'DOUBLE' if self.type in ('DOUBLE', 'FLOAT') else self.type |
| |
| if self.sum and self.isnum: |
| mt = corrected_type.lower() + 'Sum' |
| name='sum__' + self.column_name |
| metrics.append(Metric( |
| metric_name=name, |
| metric_type='sum', |
| verbose_name='SUM({})'.format(self.column_name), |
| json=json.dumps({ |
| 'type': mt, 'name': name, 'fieldName': self.column_name}) |
| )) |
| if self.min and self.isnum: |
| mt = corrected_type.lower() + 'Min' |
| name='min__' + self.column_name |
| metrics.append(Metric( |
| metric_name=name, |
| metric_type='min', |
| verbose_name='MIN({})'.format(self.column_name), |
| json=json.dumps({ |
| 'type': mt, 'name': name, 'fieldName': self.column_name}) |
| )) |
| if self.max and self.isnum: |
| mt = corrected_type.lower() + 'Max' |
| name='max__' + self.column_name |
| metrics.append(Metric( |
| metric_name=name, |
| metric_type='max', |
| verbose_name='MAX({})'.format(self.column_name), |
| json=json.dumps({ |
| 'type': mt, 'name': name, 'fieldName': self.column_name}) |
| )) |
| if self.count_distinct: |
| mt = 'count_distinct' |
| name='count_distinct__' + self.column_name |
| metrics.append(Metric( |
| metric_name=name, |
| verbose_name='COUNT(DISTINCT {})'.format(self.column_name), |
| metric_type='count_distinct', |
| json=json.dumps({ |
| 'type': 'cardinality', |
| 'name': name, |
| 'fieldNames': [self.column_name]}) |
| )) |
| session = get_session() |
| for metric in metrics: |
| m = ( |
| session.query(M) |
| .filter(M.metric_name==metric.metric_name) |
| .filter(M.datasource_name==self.datasource_name) |
| .filter(Cluster.cluster_name==self.datasource.cluster_name) |
| .first() |
| ) |
| metric.datasource_name = self.datasource_name |
| if not m: |
| session.add(metric) |
| session.commit() |