A Data Access Object (DAO) is a pattern that provides an abstract interface to the SQLAlchemy Object Relational Mapper (ORM). DAOs are critical because they form the building blocks of the application and are wrapped by the associated commands and RESTful API endpoints.
Currently there are numerous inconsistencies and violations of the DRY principle within the codebase as it relates to DAOs and ORMs—unnecessary commits, non-ACID transactions, etc.—which make the code unnecessarily complex and convoluted. Addressing the underlying issues with the DAOs should help simplify the downstream operations and improve the developer experience.
To ensure consistency the following rules should be adhered to:
All database operations (including testing) should be defined within a DAO, i.e., there should not be any explicit db.session.add, db.session.merge, etc. calls outside of a DAO.
A DAO should use create, update, delete, upsert terms—typical database operations which ensure consistency with commands—rather than action-based terms like save, saveas, override, etc.
Sessions should be managed via a context manager which auto-commits on success and rolls back on failure, i.e., there should be no explicit db.session.commit or db.session.rollback calls within the DAO.
There should be a single atomic transaction representing the entirety of the operation, i.e., when creating a dataset with associated columns and metrics either all the changes succeed when the transaction is committed, or all the changes are undone when the transaction is rolled back. SQLAlchemy supports nested transactions via the begin_nested method, and these can themselves be nested—in line with how DAOs are invoked.
The database layer should adopt a “shift left” mentality, i.e., uniqueness/foreign key constraints, relationships, cascades, etc. should all be defined in the database layer rather than being enforced in the application layer.
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from superset.extensions import db
from superset.models.dashboard import Dashboard


class DashboardDAO:
    """Class-level CRUD helpers for ``Dashboard`` rows, all going through ``db.session``."""

    # NOTE(review): ``Session.begin()`` raises if the session has already begun
    # a transaction (e.g. autobegin triggered by an earlier query on the same
    # session) -- confirm whether ``begin_nested()`` is intended for the write
    # methods below.

    @classmethod
    def find_by_id(cls, dashboard_id: int) -> Optional[Dashboard]:
        """Return the first dashboard whose ``id`` equals ``dashboard_id``, else ``None``."""
        matches = db.session.query(Dashboard).filter_by(id=dashboard_id)
        return matches.first()

    @classmethod
    def find_by_ids(cls, dashboard_ids: List[int]) -> List[Dashboard]:
        """Return every dashboard whose ``id`` appears in ``dashboard_ids``."""
        id_filter = Dashboard.id.in_(dashboard_ids)
        return db.session.query(Dashboard).filter(id_filter).all()

    @classmethod
    def create(cls, properties: Dict[str, Any]) -> Dashboard:
        """Build a ``Dashboard`` from ``properties`` and persist it in one transaction."""
        with db.session.begin():
            new_dashboard = Dashboard(**properties)
            db.session.add(new_dashboard)
            # Flush inside the transaction so the generated ID is available.
            db.session.flush()
        return new_dashboard

    @classmethod
    def update(cls, dashboard: Dashboard, properties: Dict[str, Any]) -> Dashboard:
        """Assign each entry of ``properties`` onto ``dashboard`` in one transaction."""
        with db.session.begin():
            for attr_name in properties:
                setattr(dashboard, attr_name, properties[attr_name])
        return dashboard

    @classmethod
    def delete(cls, dashboard: Dashboard) -> None:
        """Remove ``dashboard`` in one transaction."""
        with db.session.begin():
            db.session.delete(dashboard)
class DatasetDAO:
    """Data Access Object for Dataset operations."""

    @classmethod
    def create_with_columns_and_metrics(
        cls,
        dataset_properties: Dict[str, Any],
        columns: List[Dict[str, Any]],
        metrics: List[Dict[str, Any]],
    ) -> Dataset:
        """Create a dataset with its columns and metrics in a single transaction.

        Args:
            dataset_properties: Keyword arguments passed to ``Dataset``.
            columns: One dict of ``TableColumn`` keyword arguments per column.
            metrics: One dict of ``SqlMetric`` keyword arguments per metric.

        Returns:
            The newly created dataset.

        The input dicts are left untouched: ``dataset_id`` is merged into a
        copy of each child's properties (overriding any value supplied by the
        caller) rather than written back into the caller's dict.
        """
        with db.session.begin():
            dataset = Dataset(**dataset_properties)
            db.session.add(dataset)
            # Flush so dataset.id is populated before the children reference it.
            db.session.flush()

            for column_props in columns:
                column = TableColumn(**{**column_props, "dataset_id": dataset.id})
                db.session.add(column)

            for metric_props in metrics:
                metric = SqlMetric(**{**metric_props, "dataset_id": dataset.id})
                db.session.add(metric)

            return dataset

    @classmethod
    def bulk_delete(cls, dataset_ids: List[int]) -> int:
        """Delete every dataset whose id is in ``dataset_ids``.

        Returns:
            The value returned by the bulk ``Query.delete`` call (the number
            of rows matched).
        """
        with db.session.begin():
            # synchronize_session=False: objects already loaded in the session
            # are not reconciled with this bulk statement.
            count = db.session.query(Dataset).filter(
                Dataset.id.in_(dataset_ids)
            ).delete(synchronize_session=False)
            return count
from typing import Tuple


class DashboardDAO:
    """Read-only lookup and search helpers for ``Dashboard`` rows."""

    @classmethod
    def find_by_slug(cls, slug: str) -> Optional[Dashboard]:
        """Return the first dashboard whose ``slug`` matches, or ``None``."""
        return db.session.query(Dashboard).filter_by(slug=slug).first()

    @classmethod
    def find_by_owner(cls, owner_id: int) -> List[Dashboard]:
        """Return all dashboards whose ``created_by_fk`` equals ``owner_id``.

        NOTE(review): this matches on the creator column; confirm that is the
        intended meaning of "owner" here.
        """
        return db.session.query(Dashboard).filter_by(
            created_by_fk=owner_id
        ).all()

    @classmethod
    def search(
        cls,
        query: str,
        page: int = 0,
        page_size: int = 25
    ) -> Tuple[List[Dashboard], int]:
        """Search dashboards by title with pagination.

        Args:
            query: Substring matched case-insensitively against
                ``dashboard_title``.
            page: Zero-based page index.
            page_size: Maximum number of dashboards per page.

        Returns:
            A ``(dashboards, total_count)`` tuple, where ``total_count`` is
            the number of matches across all pages.
        """
        base_query = db.session.query(Dashboard).filter(
            Dashboard.dashboard_title.ilike(f"%{query}%")
        )

        total_count = base_query.count()
        # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
        # could overlap or skip rows between calls; order by id to keep
        # pagination stable.
        dashboards = (
            base_query.order_by(Dashboard.id)
            .offset(page * page_size)
            .limit(page_size)
            .all()
        )

        return dashboards, total_count
from sqlalchemy.exc import IntegrityError
from superset.exceptions import DAOCreateFailedError, DAODeleteFailedError


class DashboardDAO:
    """Dashboard writes that translate ``IntegrityError`` into DAO exceptions."""

    @classmethod
    def create(cls, properties: Dict[str, Any]) -> Dashboard:
        """Persist a new dashboard built from ``properties``.

        Raises:
            DAOCreateFailedError: if the transaction raises ``IntegrityError``.
        """
        try:
            with db.session.begin():
                new_dashboard = Dashboard(**properties)
                db.session.add(new_dashboard)
                db.session.flush()
        except IntegrityError as ex:
            raise DAOCreateFailedError(str(ex)) from ex
        return new_dashboard

    @classmethod
    def delete(cls, dashboard: Dashboard) -> None:
        """Remove ``dashboard``.

        Raises:
            DAODeleteFailedError: if the transaction raises ``IntegrityError``.
        """
        try:
            with db.session.begin():
                db.session.delete(dashboard)
        except IntegrityError as ex:
            message = f"Cannot delete dashboard {dashboard.id}: {str(ex)}"
            raise DAODeleteFailedError(message) from ex
All DAO methods should be class methods (@classmethod) rather than instance methods:
# ✅ Good
class DashboardDAO:
    @classmethod
    def find_by_id(cls, dashboard_id: int) -> Optional[Dashboard]:
        """Class method: callable as ``DashboardDAO.find_by_id(...)`` with no instance."""
        lookup = db.session.query(Dashboard).filter_by(id=dashboard_id)
        return lookup.first()


# ❌ Avoid
class DashboardDAO:
    def find_by_id(self, dashboard_id: int) -> Optional[Dashboard]:
        """Instance method: requires a ``DashboardDAO`` object although ``self`` is unused."""
        lookup = db.session.query(Dashboard).filter_by(id=dashboard_id)
        return lookup.first()
Always use context managers to ensure proper transaction handling:
# ✅ Good - automatic commit/rollback
@classmethod
def create(cls, properties: Dict[str, Any]) -> Dashboard:
    """Persist a new dashboard; the ``with`` block owns the transaction."""
    with db.session.begin():
        new_dashboard = Dashboard(**properties)
        db.session.add(new_dashboard)
    return new_dashboard


# ❌ Avoid - manual commit/rollback
@classmethod
def create(cls, properties: Dict[str, Any]) -> Dashboard:
    """Persist a new dashboard with hand-written commit and rollback calls."""
    try:
        new_dashboard = Dashboard(**properties)
        db.session.add(new_dashboard)
        db.session.commit()
    except Exception:
        db.session.rollback()
        raise
    return new_dashboard
Method names should clearly indicate the operation:
# ✅ Good - clear CRUD operations
create()
update()
delete()
find_by_id()
find_by_slug()

# ❌ Avoid - ambiguous names
save()
remove()
get()
Always include type hints for parameters and return values:
@classmethod
def find_by_ids(cls, dashboard_ids: List[int]) -> List[Dashboard]:
    """Return every dashboard whose ``id`` appears in ``dashboard_ids``."""
    id_filter = Dashboard.id.in_(dashboard_ids)
    return db.session.query(Dashboard).filter(id_filter).all()
Provide efficient batch operations when needed:
@classmethod
def bulk_update_published_status(
    cls,
    dashboard_ids: List[int],
    published: bool
) -> int:
    """Set ``published`` on all dashboards in ``dashboard_ids`` with one bulk update.

    Returns the value produced by ``Query.update`` (the matched row count).
    """
    with db.session.begin():
        targets = db.session.query(Dashboard).filter(
            Dashboard.id.in_(dashboard_ids)
        )
        new_values = {Dashboard.published: published}
        # synchronize_session=False: in-session objects are not reconciled
        # with this bulk statement.
        updated = targets.update(new_values, synchronize_session=False)
    return updated
import pytest
from superset.dashboards.dao import DashboardDAO
from superset.models.dashboard import Dashboard


def test_dashboard_create(session):
    """Creating through the DAO assigns an ID and stores the given properties."""
    properties = {
        "dashboard_title": "Test Dashboard",
        "slug": "test-dashboard"
    }

    dashboard = DashboardDAO.create(properties)

    assert dashboard.id is not None
    assert dashboard.dashboard_title == "Test Dashboard"
    assert dashboard.slug == "test-dashboard"


def test_dashboard_find_by_slug(session):
    """A dashboard created with a slug can be found again by that slug."""
    # Set up test data through the DAO: tests, like application code, should
    # not call session.add / session.commit directly.
    DashboardDAO.create(
        {
            "dashboard_title": "Test Dashboard",
            "slug": "test-dashboard"
        }
    )

    # Test the DAO method
    found_dashboard = DashboardDAO.find_by_slug("test-dashboard")

    assert found_dashboard is not None
    assert found_dashboard.dashboard_title == "Test Dashboard"


def test_dashboard_delete(session):
    """A deleted dashboard can no longer be found by its ID."""
    # Set up through the DAO rather than with explicit session calls.
    dashboard = DashboardDAO.create({"dashboard_title": "Test Dashboard"})
    dashboard_id = dashboard.id

    DashboardDAO.delete(dashboard)

    deleted_dashboard = DashboardDAO.find_by_id(dashboard_id)
    assert deleted_dashboard is None
def test_create_dataset_with_columns_atomic(app_context):
    """Creating a dataset together with two columns yields a dataset holding both."""
    column_specs = [{"column_name": name} for name in ("col1", "col2")]
    table_props = {"table_name": "test_table"}

    # The DAO call either persists the dataset and all columns, or nothing.
    created = DatasetDAO.create_with_columns_and_metrics(
        table_props,
        column_specs,
        [],
    )

    assert created.id is not None
    assert len(created.columns) == 2