| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import json |
| import logging |
| from typing import Any, Dict, Set |
| |
| from sqlalchemy.orm import Session |
| |
| from superset.models.dashboard import Dashboard |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| JSON_KEYS = {"position": "position_json", "metadata": "json_metadata"} |
| |
| |
| def find_chart_uuids(position: Dict[str, Any]) -> Set[str]: |
| return set(build_uuid_to_id_map(position)) |
| |
| |
| def build_uuid_to_id_map(position: Dict[str, Any]) -> Dict[str, int]: |
| return { |
| child["meta"]["uuid"]: child["meta"]["chartId"] |
| for child in position.values() |
| if ( |
| isinstance(child, dict) |
| and child["type"] == "CHART" |
| and "uuid" in child["meta"] |
| ) |
| } |
| |
| |
| def update_id_refs(config: Dict[str, Any], chart_ids: Dict[str, int]) -> Dict[str, Any]: |
| """Update dashboard metadata to use new IDs""" |
| fixed = config.copy() |
| |
| # build map old_id => new_id |
| old_ids = build_uuid_to_id_map(fixed["position"]) |
| id_map = {old_id: chart_ids[uuid] for uuid, old_id in old_ids.items()} |
| |
| # fix metadata |
| metadata = fixed.get("metadata", {}) |
| if "timed_refresh_immune_slices" in metadata: |
| metadata["timed_refresh_immune_slices"] = [ |
| id_map[old_id] for old_id in metadata["timed_refresh_immune_slices"] |
| ] |
| |
| if "filter_scopes" in metadata: |
| # in filter_scopes the key is the chart ID as a string; we need to udpate |
| # them to be the new ID as a string: |
| metadata["filter_scopes"] = { |
| str(id_map[int(old_id)]): columns |
| for old_id, columns in metadata["filter_scopes"].items() |
| } |
| |
| # now update columns to use new IDs: |
| for columns in metadata["filter_scopes"].values(): |
| for attributes in columns.values(): |
| attributes["immune"] = [ |
| id_map[old_id] for old_id in attributes["immune"] |
| ] |
| |
| if "expanded_slices" in metadata: |
| metadata["expanded_slices"] = { |
| str(id_map[int(old_id)]): value |
| for old_id, value in metadata["expanded_slices"].items() |
| } |
| |
| if "default_filters" in metadata: |
| default_filters = json.loads(metadata["default_filters"]) |
| metadata["default_filters"] = json.dumps( |
| { |
| str(id_map[int(old_id)]): value |
| for old_id, value in default_filters.items() |
| } |
| ) |
| |
| # fix position |
| position = fixed.get("position", {}) |
| for child in position.values(): |
| if ( |
| isinstance(child, dict) |
| and child["type"] == "CHART" |
| and "uuid" in child["meta"] |
| ): |
| child["meta"]["chartId"] = chart_ids[child["meta"]["uuid"]] |
| |
| return fixed |
| |
| |
| def import_dashboard( |
| session: Session, config: Dict[str, Any], overwrite: bool = False |
| ) -> Dashboard: |
| existing = session.query(Dashboard).filter_by(uuid=config["uuid"]).first() |
| if existing: |
| if not overwrite: |
| return existing |
| config["id"] = existing.id |
| |
| # TODO (betodealmeida): move this logic to import_from_dict |
| config = config.copy() |
| for key, new_name in JSON_KEYS.items(): |
| if config.get(key) is not None: |
| value = config.pop(key) |
| try: |
| config[new_name] = json.dumps(value) |
| except TypeError: |
| logger.info("Unable to encode `%s` field: %s", key, value) |
| |
| dashboard = Dashboard.import_from_dict(session, config, recursive=False) |
| if dashboard.id is None: |
| session.flush() |
| |
| return dashboard |