ATLAS-4370: Persist Metrics for user to retrieve Metrics info at a past timestamp
Co-authored-by: Disha Talreja <dishatalreja@cloudera.com>
Signed-off-by: Sarath Subramanian <sarath@apache.org>
diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json
index 769d885..a4a9248 100644
--- a/addons/models/0000-Area0/0010-base_model.json
+++ b/addons/models/0000-Area0/0010-base_model.json
@@ -371,6 +371,48 @@
]
},
{
+ "name": "__AtlasMetricsStat",
+ "superTypes": [
+ "__internal"
+ ],
+ "serviceType": "atlas_core",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "metricsId",
+ "typeName": "string",
+ "isOptional": false,
+ "cardinality": "SINGLE",
+ "isUnique": true,
+ "isIndexable": true
+ },
+ {
+ "name": "metrics",
+ "typeName": "string",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ },
+ {
+ "name": "collectionTime",
+ "typeName": "long",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": true
+ },
+ {
+ "name": "timeToLiveMillis",
+ "typeName": "long",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": true
+ }
+ ]
+ },
+ {
"name": "__ExportImportAuditEntry",
"serviceType": "atlas_core",
"typeVersion": "1.0",
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index b63fab7..28fb68a 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -82,8 +82,8 @@
DEBUG_METRICS_ENABLED("atlas.debug.metrics.enabled", false),
TASKS_USE_ENABLED("atlas.tasks.enabled", true),
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
- UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true);
-
+ UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
+ METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336); // 14 days default
private static final Configuration APPLICATION_PROPERTIES;
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 7d09261..8bc7996 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -211,6 +211,7 @@
GLOSSARY_TERM_ALREADY_EXISTS(409, "ATLAS-409-00-009", "Glossary term with qualifiedName {0} already exists"),
GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with qualifiedName {0} already exists"),
GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
+ METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"),
// All internal errors go here
INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"),
diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsMapToChart.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsMapToChart.java
new file mode 100644
index 0000000..6e74f65
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsMapToChart.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.metrics;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.List;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * AtlasMetricsForChart is a formatted data type specifically for rendering the Stacked Area Chart.
+ * The Stacked Area Chart takes three String values as "key". For the Atlas Metrics entity, they are "Active", "Deleted", and "Shell".
+ * The Stacked Area Chart also takes a list of pairs (primitive values) as "values".
+ * The first element in the pair is collectionTime. It is used for rendering x-axis of the chart.
+ * The second element is the Atlas Metrics entity's count. It is used for rendering y-axis of the chart.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasMetricsMapToChart {
+ private String key;
+ private List<long[]> values;
+
+ public AtlasMetricsMapToChart(String key, List<long[]> values) {
+ this.key = key;
+ this.values = values;
+ }
+
+ public String getKey() { return key; }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public List<long[]> getValues() {
+ return values;
+ }
+
+ public void setValues(List<long[]> values) {
+ this.values = values;
+ }
+
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsStat.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsStat.java
new file mode 100644
index 0000000..5a30cad
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsStat.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.metrics;
+
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.model.AtlasBaseModelObject;
+import org.apache.atlas.utils.AtlasEntityUtil;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+
+/**
+ * Atlas MetricsStat which includes Metrics' collection time and time to live (TTL).
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasMetricsStat extends AtlasBaseModelObject implements Serializable {
+
+ public static final String METRICS_CATEGORY_GENERAL_PROPERTY = "general";
+ public static final String METRICS_COLLECTION_TIME_PROPERTY = "collectionTime";
+ public static final String METRICS_ID_PREFIX_PROPERTY = "atlas_metrics_";
+
+ private String metricsId;
+ private long collectionTime;
+ private long timeToLiveMillis;
+
+ private Map<String, Object> typeData;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private AtlasMetrics metrics;
+
+ public AtlasMetricsStat() {
+ }
+
+ public AtlasMetricsStat(String guid) {
+ setGuid(guid);
+ }
+
+ public AtlasMetricsStat(AtlasMetrics metrics){
+ this(metrics, null);
+ }
+
+ public AtlasMetricsStat(AtlasMetrics metrics,List<String> listOfTypeNames) {
+ this(metrics, TimeUnit.HOURS.toMillis(AtlasConfiguration.METRICS_TIME_TO_LIVE_HOURS.getInt()),listOfTypeNames);
+
+ }
+
+ public AtlasMetricsStat(AtlasMetrics metrics, long timeToLiveMillis, List<String> listOfTypeNames) {
+ collectionTime = metrics == null ?
+ System.currentTimeMillis() : (long) metrics.getMetric(METRICS_CATEGORY_GENERAL_PROPERTY, METRICS_COLLECTION_TIME_PROPERTY);
+ setCollectionTime(collectionTime);
+
+ setMetricsId(METRICS_ID_PREFIX_PROPERTY + getCollectionTime() + "@" + AtlasEntityUtil.getMetadataNamespace());
+
+ setTimeToLiveMillis(timeToLiveMillis);
+ setMetrics(metrics);
+ setGuid(getGuid());
+
+ this.typeData = CollectionUtils.isEmpty(listOfTypeNames) ? null : new HashMap<>();
+ AtlasEntityUtil.metricsToTypeData(metrics, listOfTypeNames, typeData);
+ }
+
+
+
+ public String getMetricsId() {
+ return metricsId;
+ }
+
+ public void setMetricsId(String metricsId) {
+ this.metricsId = metricsId;
+ }
+
+ public long getCollectionTime() {
+ return collectionTime;
+ }
+
+ public void setCollectionTime(long collectionTime) {
+ this.collectionTime = collectionTime;
+ }
+
+ public long getTimeToLiveMillis() {
+ return timeToLiveMillis;
+ }
+
+ public void setTimeToLiveMillis(long timeToLiveMillis) {
+ this.timeToLiveMillis = timeToLiveMillis;
+ }
+
+ public AtlasMetrics getMetrics() {
+ return metrics;
+ }
+
+ public void setMetrics(AtlasMetrics metrics) {
+ this.metrics = metrics;
+ }
+
+ public Map<String, Object> getTypeData() {
+ return typeData;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ AtlasMetricsStat that = (AtlasMetricsStat) o;
+ return Objects.equals(metrics, that.metrics);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), metrics);
+ }
+
+ @Override
+ protected StringBuilder toString(StringBuilder sb) {
+ sb.append(", metricsId=").append(metricsId);
+ sb.append(", collectionTime=").append(collectionTime);
+ sb.append(", timeToLiveMillis=").append(timeToLiveMillis);
+ sb.append(", metrics=");
+ if (metrics == null) {
+ sb.append("null");
+ } else {
+ sb.append(metrics);
+ }
+
+ return sb;
+ }
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
index 1e78e25..2b7846c 100644
--- a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
@@ -18,8 +18,11 @@
package org.apache.atlas.utils;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
@@ -41,6 +44,15 @@
private static final int SOFT_REFERENCE_FORMAT_INDEX_TYPE_NAME = 0;
private static final int SOFT_REFERENCE_FORMAT_INDEX_GUID = 1;
+ public static final String CONF_METADATA_NAMESPACE = "atlas.metadata.namespace";
+ public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
+ public static final String DEFAULT_CLUSTER_NAME = "default";
+
+ protected static final String ENTITY = "entity";
+ protected static final String ACTIVE = "Active";
+ protected static final String DELETED = "Deleted";
+ protected static final String SHELL = "Shell";
+ protected static final String[] STATUS_CATEGORY = {ACTIVE, DELETED, SHELL};
public static String formatSoftRefValue(String typeName, String guid) {
return String.format(SOFT_REF_FORMAT, typeName, guid);
@@ -166,4 +178,42 @@
return ret;
}
+ public static String getMetadataNamespace() {
+ String ret = StringUtils.EMPTY;
+ try {
+ ret = ApplicationProperties.get().getString(CONF_METADATA_NAMESPACE, StringUtils.EMPTY);
+ if (StringUtils.isEmpty(ret)) {
+ ret = ApplicationProperties.get().getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+ }
+ } catch (AtlasException e) {
+ LOG.info("Failed to load application properties", e);
+ }
+ return StringUtils.isNotEmpty(ret) ? ret : DEFAULT_CLUSTER_NAME;
+ }
+
+ public static void metricsToTypeData(AtlasMetrics metrics, String typeName, Map<String, Object> typeData) {
+ if (typeData == null) {
+ return;
+ }
+
+ Map<String, Integer> innerVal = new HashMap<>();
+
+ for (String status : STATUS_CATEGORY) {
+ Map<String, Integer> metricsMap = (Map<String, Integer>) metrics.getData().get(ENTITY).get(ENTITY + status);
+ innerVal.put(status, metricsMap.getOrDefault(typeName, 0));
+ }
+
+ typeData.put(typeName, innerVal);
+ }
+
+ public static void metricsToTypeData(AtlasMetrics metrics, List<String> typeNames, Map<String, Object> typeData) {
+ if (typeData == null || CollectionUtils.isEmpty(typeNames)) {
+ return;
+ }
+
+ for (String typeName : typeNames) {
+ metricsToTypeData(metrics, typeName, typeData);
+ }
+ }
+
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index d65bb1a..9924b2e 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -356,6 +356,13 @@
// index recovery
createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+ //metrics
+ createCommonVertexIndex(management," __AtlasMetricsStat.metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+ createCommonVertexIndex(management," __AtlasMetricsStat.__u_metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+ createCommonVertexIndex(management," __AtlasMetricsStat.metrics", UniqueKind.NONE, String.class, SINGLE, true, false);
+ createCommonVertexIndex(management," __AtlasMetricsStat.collectionTime", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+ createCommonVertexIndex(management," __AtlasMetricsStat.timeToLiveMillis", UniqueKind.NONE, String.class, SINGLE, true, false);
+
// create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/metrics/AtlasMetricsStatDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/metrics/AtlasMetricsStatDTO.java
new file mode 100644
index 0000000..6af935e
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/ogm/metrics/AtlasMetricsStatDTO.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.ogm.metrics;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.metrics.AtlasMetrics;
+import org.apache.atlas.model.metrics.AtlasMetricsStat;
+import org.apache.atlas.repository.impexp.AuditsWriter;
+import org.apache.atlas.repository.ogm.AbstractDataTransferObject;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.model.metrics.AtlasMetricsStat.METRICS_ID_PREFIX_PROPERTY;
+
+
+/**
+ * AtlasMetricsStatDTO is the bridge class in between AtlasMetricsStat and AtlasEntity.
+ */
+@Component
+public class AtlasMetricsStatDTO extends AbstractDataTransferObject<AtlasMetricsStat> {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasMetricsStatDTO.class);
+
+ public static final String METRICS_ENTITY_TYPE_NAME = "__AtlasMetricsStat";
+ public static final String METRICS_ID_PROPERTY = "metricsId";
+ private static final String METRICS_PROPERTY = "metrics";
+ private static final String COLLECTION_TIME_PROPERTY = "collectionTime";
+ private static final String TIME_TO_LIVE_PROPERTY = "timeToLiveMillis";
+ private static final String UNIQUE_NAME_PROPERTY = "uniqueName";
+
+ @Inject
+ public AtlasMetricsStatDTO(AtlasTypeRegistry typeRegistry) {
+ super(typeRegistry, AtlasMetricsStat.class, METRICS_ENTITY_TYPE_NAME);
+ }
+
+ @Override
+ public AtlasMetricsStat from(AtlasEntity entity) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasMetricsStatDTO.from({})", entity);
+ }
+
+ AtlasMetricsStat metricsStat = null;
+
+ String jsonMetrics = (String) entity.getAttribute(METRICS_PROPERTY);
+
+ if (StringUtils.isNotEmpty(jsonMetrics)) {
+ metricsStat = new AtlasMetricsStat(AtlasType.fromJson(jsonMetrics, AtlasMetrics.class));
+ }
+
+ if (metricsStat == null) {
+ LOG.error("MetricStat cannot be created without metric info. Null has been returned.");
+ } else {
+ metricsStat.setGuid(entity.getGuid());
+ metricsStat.setMetricsId((String) entity.getAttribute(METRICS_ID_PROPERTY));
+
+ metricsStat.setCollectionTime((long) entity.getAttribute(COLLECTION_TIME_PROPERTY));
+
+ metricsStat.setTimeToLiveMillis((long) entity.getAttribute(TIME_TO_LIVE_PROPERTY));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasMetricsStatDTO.from() : {}", metricsStat);
+ }
+
+ return metricsStat;
+ }
+
+ @Override
+ public AtlasMetricsStat from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasMetricsStatDTO.from({})", entityWithExtInfo);
+ }
+
+ AtlasMetricsStat ret = from(entityWithExtInfo.getEntity());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasMetricsStatDTO.from() : {}", ret);
+ }
+ return ret;
+ }
+
+ @Override
+ public AtlasEntity toEntity(AtlasMetricsStat obj) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasMetricsStatDTO.toEntity({})", obj);
+ }
+
+ AtlasEntity entity = getDefaultAtlasEntity(obj);
+
+ entity.setAttribute(METRICS_ID_PROPERTY, getUniqueValue(obj));
+
+ if (obj.getMetrics() != null) {
+ entity.setAttribute(METRICS_PROPERTY, AtlasType.toJson(obj.getMetrics()));
+ }
+
+ entity.setAttribute(COLLECTION_TIME_PROPERTY, obj.getCollectionTime());
+ entity.setAttribute(TIME_TO_LIVE_PROPERTY, obj.getTimeToLiveMillis());
+ entity.setAttribute(UNIQUE_NAME_PROPERTY, getUniqueValue(obj));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasMetricsStatDTO.toEntity() : {}", entity);
+ }
+ return entity;
+ }
+
+ @Override
+ public AtlasEntityWithExtInfo toEntityWithExtInfo(AtlasMetricsStat obj) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasMetricsStatDTO.toEntityWithExtInfo({})", obj);
+ }
+ AtlasEntityWithExtInfo ret = new AtlasEntityWithExtInfo(toEntity(obj));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasMetricsStatDTO.toEntityWithExtInfo() : {}", ret);
+ }
+ return ret;
+ }
+
+ @Override
+ public Map<String, Object> getUniqueAttributes(AtlasMetricsStat obj) {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put(METRICS_ID_PROPERTY, getUniqueValue(obj));
+ return ret;
+ }
+
+ private String getUniqueValue(AtlasMetricsStat obj) {
+ return METRICS_ID_PREFIX_PROPERTY + obj.getCollectionTime() + "@" + AuditsWriter.getCurrentClusterName();
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index ceb2528..9ec2cd2 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -17,40 +17,62 @@
*/
package org.apache.atlas.services;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasTypesDefFilterRequest;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.metrics.AtlasMetrics;
+import org.apache.atlas.model.metrics.AtlasMetricsMapToChart;
+import org.apache.atlas.model.metrics.AtlasMetricsStat;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasMetricJVMUtil;
import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static org.apache.atlas.discovery.SearchProcessor.AND_STR;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.*;
+import static org.apache.atlas.repository.ogm.metrics.AtlasMetricsStatDTO.METRICS_ENTITY_TYPE_NAME;
+import static org.apache.atlas.repository.ogm.metrics.AtlasMetricsStatDTO.METRICS_ID_PROPERTY;
@AtlasService
public class MetricsService {
private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class);
+ private final DataAccess dataAccess;
+
// Query Category constants
public static final String TYPE = "type";
public static final String TYPE_SUBTYPES = "typeAndSubTypes";
@@ -76,6 +98,7 @@
protected static final String METRIC_ENTITY_ACTIVE_INCL_SUBTYPES = ENTITY + "Active"+"-"+TYPE_SUBTYPES;
protected static final String METRIC_ENTITY_DELETED_INCL_SUBTYPES = ENTITY + "Deleted"+"-"+TYPE_SUBTYPES;
protected static final String METRIC_ENTITY_SHELL_INCL_SUBTYPES = ENTITY + "Shell"+"-"+TYPE_SUBTYPES;
+ protected static final String[] STATUS_CATEGORY = {"Active", "Deleted", "Shell"};
private final AtlasGraph atlasGraph;
private final AtlasTypeRegistry typeRegistry;
@@ -83,10 +106,12 @@
private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix();
@Inject
- public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) {
+ public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil,
+ DataAccess dataAccess) {
this.atlasGraph = graph;
this.typeRegistry = typeRegistry;
this.metricsUtil = metricsUtil;
+ this.dataAccess = dataAccess;
}
@SuppressWarnings("unchecked")
@@ -134,7 +159,6 @@
}
}
-
for (AtlasEntityDef entityDef : entityDefs) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityDef.getName());
@@ -262,4 +286,240 @@
return ret;
}
+ public AtlasMetricsStat saveMetricsStat(AtlasMetricsStat metricsStat) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> MetricsService.saveMetricsStat({})", metricsStat);
+ }
+
+ if (Objects.isNull(metricsStat) || StringUtils.isEmpty(metricsStat.getMetricsId())) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "MetricsStat definition missing.");
+ }
+
+ if (metricsStatExists(metricsStat)) {
+ throw new AtlasBaseException(AtlasErrorCode.METRICSSTAT_ALREADY_EXISTS, String.valueOf(metricsStat.getCollectionTime()));
+ }
+
+ AtlasMetricsStat storeObject = dataAccess.save(metricsStat);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== MetricsService.saveMetricsStat() : {}", storeObject);
+ }
+
+ return storeObject;
+ }
+
+ public void purgeMetricsStats() throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> MetricsService.purgeMetricsStats()");
+ }
+
+ long currentTimeMillis = System.currentTimeMillis();
+
+ List<AtlasMetricsStat> metricsStats = getAllMetricsStats(true)
+ .stream()
+ .filter(c -> c.getCollectionTime() + c.getTimeToLiveMillis() < currentTimeMillis)
+ .collect(Collectors.toList());
+
+ for (AtlasMetricsStat a : metricsStats) {
+ long collectedTime = a.getCollectionTime();
+ deleteMetricsStatByCollectionTime(String.valueOf(collectedTime));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== MetricsService.purgeMetricsStats() : {}", metricsStats);
+ }
+ }
+
+ @GraphTransaction
+ public AtlasMetricsStat getMetricsStatByCollectionTime(final String collectionTime) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> MetricsService.getMetricsStatByCollectionTime({})", collectionTime);
+ }
+
+ if (StringUtils.isBlank(collectionTime)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "collectionTime is null/empty");
+ }
+
+ AtlasMetricsStat ret;
+
+ AtlasMetricsStat metricsStat = new AtlasMetricsStat();
+ metricsStat.setCollectionTime(Long.parseLong(collectionTime));
+
+ ret = dataAccess.load(metricsStat);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== MetricsService.getMetricsStatByCollectionTime() : {}", ret);
+ }
+
+ return ret;
+ }
+
+ @GraphTransaction
+ public void deleteMetricsStatByCollectionTime(final String collectionTime) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> MetricsService.deleteMetricsStatByCollectionTime({})", collectionTime);
+ }
+
+ if (StringUtils.isEmpty(collectionTime)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, collectionTime);
+ }
+
+ AtlasMetricsStat deleteStat = getMetricsStatByCollectionTime(collectionTime);
+
+ dataAccess.delete(deleteStat.getGuid());
+
+ // delete log
+ if (LOG.isDebugEnabled()) {
+ long currTime = System.currentTimeMillis();
+ long collectedTime = deleteStat.getCollectionTime();
+
+ LOG.info("MetricsService.deleteMetricsStatByCollectionTime(): At {}, metricsStat with collectionTime: {}, persisted hours: {}, is deleted. ",
+ Instant.ofEpochMilli(currTime),
+ Instant.ofEpochMilli(collectedTime),
+ TimeUnit.MILLISECONDS.toHours(currTime - collectedTime)
+ );
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== MetricsService.deleteMetricsStatByCollectionTime({})", collectionTime);
+ }
+
+ }
+
+ @GraphTransaction
+ public List<AtlasMetricsStat> getAllMetricsStats(Boolean minInfo) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> MetricsService.getAllMetricsStats()");
+ }
+
+ List<AtlasMetricsStat> ret = new ArrayList<>();
+ // SortOrder.ASCENDING is a necessary input parameter. It only sorts GUIDs, but not collectionTime.
+ List<String> guids = AtlasGraphUtilsV2.findEntityGUIDsByType(METRICS_ENTITY_TYPE_NAME, SortOrder.ASCENDING);
+
+ if (CollectionUtils.isNotEmpty(guids)) {
+ List<AtlasMetricsStat> metricsToLoad = guids.stream()
+ .map(AtlasMetricsStat::new)
+ .collect(Collectors.toList());
+
+ Iterable<AtlasMetricsStat> metricsStats = dataAccess.load(metricsToLoad);
+
+
+ ret = StreamSupport.stream(metricsStats.spliterator(), false)
+ .sorted((a, b) -> (int) (b.getCollectionTime() - a.getCollectionTime()))
+ .map(m -> {
+ if(minInfo) {
+ m.setMetrics(null);
+ }
+ return m;
+ }).collect(Collectors.toList());
+
+ } else {
+ ret = Collections.emptyList();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== MetricsService.getAllMetricsStats() : {}", ret);
+ }
+
+ return ret;
+ }
+
+
+ public List<AtlasMetricsStat> getMetricsInRangeByTypeNames(long startTime,
+ long endTime,
+ List<String> typeNames) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> MetricsService.getMetricsInRangeByTypeNames({}, {}, {})", startTime, endTime, String.join(", ", typeNames));
+ }
+
+ if (startTime >= endTime) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS,
+ "startTime: '" + startTime + "', should be less than, endTime: '" + endTime + "'");
+ }
+
+ List<AtlasMetricsStat> metricsInRange;
+ List<AtlasMetricsStat> allMetrics = getAllMetricsStats(false);
+
+ metricsInRange = allMetrics.stream()
+ .filter(m -> m.getCollectionTime() >= startTime && m.getCollectionTime() <= endTime)
+ .map(m -> {
+ m = new AtlasMetricsStat(m.getMetrics(), typeNames);
+ m.setMetrics(null);
+ return m;
+ })
+ .collect(Collectors.toList());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== MetricsService.getMetricsInRangeByTypeNames() : {}", metricsInRange);
+ }
+
+ return metricsInRange;
+ }
+
+ public Map<String, List<AtlasMetricsMapToChart>> getMetricsForChartByTypeNames(long startTime,
+ long endTime,
+ List<String> typeNames) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> MetricsService.getMetricsForChartByTypeNames({}, {}, {})", startTime, endTime, typeNames);
+ }
+
+ // Calling getMetricsInRangeByTypeNames() and constructing AtlasMetricsStat with list of typeNames, to retrieve JanusGraph only once.
+ Map<String, List<AtlasMetricsMapToChart>> ret = new HashMap<>();
+
+ // Returned metrics were sorted by collectionTime descendingly. Reverse it to ascending order to match stacked area chart's required input format.
+ List<AtlasMetricsStat> metrics = getMetricsInRangeByTypeNames(startTime, endTime, typeNames);
+ Collections.reverse(metrics);
+
+ for (String typeName : typeNames) {
+ Map<String, List<long[]>> statusCategory = mapToStatusCategoryByOneType(metrics, typeName);
+
+ if (MapUtils.isNotEmpty(statusCategory)) {
+ ret.put(typeName, statusCategory.entrySet()
+ .stream()
+ .map(c -> new AtlasMetricsMapToChart(c.getKey(), c.getValue()))
+ .collect(Collectors.toList())
+ );
+ } else {
+ LOG.info("MetricsService.getMetricsForChartByTypeNames() : data of typeName:{} cannot be found.", typeName);
+ ret.put(typeName, Collections.emptyList());
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== MetricsService.getMetricsForChartByTypeNames() : {}", ret);
+ }
+
+ return ret;
+ }
+
+ /** Mapping each typeName's counting info in AtlasMetricsStat to the required format to render the stacked area chart.
+ * Keys: 3 categories: Active, Deleted & Shell.
+ * Values: a list of pair with the first element as collectionTime, and the second element as count.
+ */
+ private Map<String, List<long[]>> mapToStatusCategoryByOneType(List<AtlasMetricsStat> metrics, String typeName) {
+ // Use LinkedHashMap to make sure the status are in order as Active, Deleted and Shell for rendering chart
+ Map<String, List<long[]>> statusCategory = new LinkedHashMap<>();
+
+ for (AtlasMetricsStat metric : metrics) {
+ Map<String, Integer> metricsMap = null;
+ if (metric.getTypeData() != null) {
+ metricsMap = (Map<String, Integer>) metric.getTypeData().get(typeName);
+ }
+
+ for (String status : STATUS_CATEGORY) {
+ long statusCnt = metricsMap == null? (long) 0: metricsMap.get(status);
+ statusCategory.computeIfAbsent(status, a -> new ArrayList<>()).add(new long[]{metric.getCollectionTime(), statusCnt});
+ }
+ }
+
+ return statusCategory;
+ }
+
+ private boolean metricsStatExists(AtlasMetricsStat metricsStat) {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(typeRegistry.getEntityTypeByName(METRICS_ENTITY_TYPE_NAME), new HashMap<String, Object>() {{
+ put(METRICS_ID_PROPERTY, metricsStat.getMetricsId());
+ }});
+ return Objects.nonNull(vertex);
+ }
+
}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index 8dda208..a0a6354 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -52,6 +52,7 @@
import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryCategoryDTO;
import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryDTO;
import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryTermDTO;
+import org.apache.atlas.repository.ogm.metrics.AtlasMetricsStatDTO;
import org.apache.atlas.repository.ogm.profiles.AtlasSavedSearchDTO;
import org.apache.atlas.repository.ogm.profiles.AtlasUserProfileDTO;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
@@ -178,6 +179,7 @@
availableDTOs.addBinding().to(AtlasServerDTO.class);
availableDTOs.addBinding().to(ExportImportAuditEntryDTO.class);
availableDTOs.addBinding().to(AtlasAuditEntryDTO.class);
+ availableDTOs.addBinding().to(AtlasMetricsStatDTO.class);
bind(DTORegistry.class).asEagerSingleton();
bind(DataAccess.class).asEagerSingleton();
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index 0405921..d114bf5 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.atlas.services;
+import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.metrics.AtlasMetrics;
+import org.apache.atlas.model.metrics.AtlasMetricsStat;
import org.apache.atlas.repository.AtlasTestBase;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
@@ -46,18 +48,9 @@
import java.util.Map;
import static org.apache.atlas.model.metrics.AtlasMetrics.*;
+import static org.apache.atlas.services.MetricsService.*;
import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
-import static org.apache.atlas.services.MetricsService.ENTITY;
-import static org.apache.atlas.services.MetricsService.GENERAL;
-import static org.apache.atlas.services.MetricsService.METRIC_ENTITIES_PER_TAG;
-import static org.apache.atlas.services.MetricsService.METRIC_ENTITY_ACTIVE;
-import static org.apache.atlas.services.MetricsService.METRIC_ENTITY_COUNT;
-import static org.apache.atlas.services.MetricsService.METRIC_ENTITY_DELETED;
-import static org.apache.atlas.services.MetricsService.METRIC_TAG_COUNT;
-import static org.apache.atlas.services.MetricsService.METRIC_TYPE_COUNT;
-import static org.apache.atlas.services.MetricsService.METRIC_TYPE_UNUSED_COUNT;
-import static org.apache.atlas.services.MetricsService.TAG;
import static org.testng.Assert.*;
@Guice(modules = TestModules.TestOnlyModule.class)
@@ -118,6 +111,9 @@
put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, 1L);
}};
+ private AtlasMetrics metrics;
+ private AtlasMetricsStat blankMetricsStat, metricsStatInGraph;
+
@BeforeClass
public void setup() throws Exception {
RequestContext.clear();
@@ -145,9 +141,9 @@
super.cleanup();
}
- @Test
+ @Test(groups = "Metrics.CREATE")
public void testGetMetrics() {
- AtlasMetrics metrics = metricsService.getMetrics();
+ metrics = metricsService.getMetrics();
assertNotNull(metrics);
@@ -171,6 +167,64 @@
assertEquals(deletedEntityMetricsActual, deletedEntityMetricsExpected);
}
+ @Test(groups = "Metrics.CREATE", dependsOnMethods = "testGetMetrics")
+ public void testSaveMetricsStat() {
+ try {
+ blankMetricsStat = new AtlasMetricsStat(metrics);
+ metricsStatInGraph = metricsService.saveMetricsStat(blankMetricsStat);
+ } catch (AtlasBaseException e) {
+ fail("Save metricsStat should've succeeded", e);
+ }
+
+ // Duplicate create calls should fail
+ try {
+ AtlasMetricsStat blankMetricsStatDup = new AtlasMetricsStat(metrics);
+ metricsService.saveMetricsStat(blankMetricsStatDup);
+ fail("Save duplicate metricsStat should've failed");
+ } catch (AtlasBaseException e) {
+ assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.METRICSSTAT_ALREADY_EXISTS);
+ }
+ }
+
+ @Test(groups = "Metrics.CREATE", dependsOnMethods = "testSaveMetricsStat")
+ public void testGetMetricsStatByCollectionTime() {
+ // collectionTime is empty string
+ try {
+ AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(" ");
+ fail("Get metricsStat by collectionTime should've failed, when collectionTime is empty.");
+ } catch (AtlasBaseException e) {
+ assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_PARAMETERS);
+ }
+
+ // collectionTime is null
+ try {
+ AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(null);
+ fail("Get metricsStat by collectionTime should've failed, when collectionTime is null.");
+ } catch (AtlasBaseException e) {
+ assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_PARAMETERS);
+ }
+
+ // collectionTime is NOT existed
+ try {
+ Long collectionTimeInGraph = System.currentTimeMillis();
+ AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(String.valueOf(collectionTimeInGraph));
+ fail("Get metricsStat by collectionTime should've failed, when collectionTime is NOT existed.");
+ } catch (AtlasBaseException e) {
+ assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND);
+ }
+
+ // collectionTime is correct
+ try {
+ Long collectionTimeInGraph = (Long) metrics.getMetric(GENERAL, METRIC_COLLECTION_TIME);
+ AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(String.valueOf(collectionTimeInGraph));
+ assertNotNull(metricsStatRet);
+ assertEquals(metricsStatRet.getGuid(), metricsStatInGraph.getGuid());
+ assertEquals(metricsStatRet.getMetricsId(), metricsStatInGraph.getMetricsId());
+ } catch (AtlasBaseException e) {
+ fail("Get metricsStat by valid collectionTime in Graph should've succeeded.");
+ }
+ }
+
@Test
public void testNotificationMetrics() {
Instant now = Clock.systemUTC().instant();
diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
index 3341e71..fa38e72 100644
--- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml
+++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
@@ -445,7 +445,7 @@
-->
<lst name="defaults">
<str name="defType">edismax</str>
- <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 3z0l_t 4bnp_s 4a2t_s 47ph_s 464l_s 49ad_t 4h6t_t 4d8l_t 4eth_l 4flx_t 4lxh_t 4kcl_t 4nid_t 4umd_t 505h_t 54w5_t 52it_t 53b9_t 51qd_t 5b7p_t 5af9_t 5j45_l 5kp1_l 5hj9_t 5fyd_t 5m9x_t 5c05_t 5csl_t 5edh_t 5wjp_t 5xc5_l 5tdx_t 5q85_l 5rt1_l 5n2d_t 5uyt_t 8mit_t 8emd_t 8o3p_t 8k5h_t 8nb9_t 8lqd_t 8xl1_t 8pol_t 8zyd_t 8v7p_t 8wsl_t 8z5x_t 99fp_t 91j9_t 972d_t 98n9_t crut_t ckqt_t cphh_t cr2d_t cnwl_t cj5x_l cmbp_t cop1_t cjyd_l ctfp_l cy6d_t cyyt_t csn9_l cu85_t cv0l_i cxdx_t d24l_t d1c5_t d2x1_t d4hx_i d8g5_t da11_t ddz9_t derp_i dced_t dgcl_t dfk5_t dl39_t dngl_t dipx_t dlvp_t dkat_t dh51_l dmo5_t dhxh_l fcat_t fo5h_l fpqd_i fimd_t fqit_i frb9_i fs3p_f fyf9_d fxmt_l</str>
+ <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 1ssl_t 1udh_t 1wqt_t 4d8l_t 4pvp_s 4oat_s 4lxh_s 4kcl_s 4nid_t 4vet_t 4rgl_t 4t1h_l 4ttx_t 505h_t 4ykl_t 51qd_t 58ud_t 5edh_t 5j45_t 5gqt_t 5hj9_t 5fyd_t 5pfp_t 5on9_t 5wjp_l 5y4l_l 5q85_t 5vr9_t 66th_l 68ed_l 658l_t 63np_t 69z9_t 5zph_t 60hx_t 622t_t 6k91_t 6l1h_l 6h39_t 6dxh_l 6fid_l 6arp_t 6io5_t 9a85_t 92bp_t 9bt1_t 97ut_t 9b0l_t 99fp_t 9lad_t 9ddx_t 9nnp_t 9ix1_t 9khx_t 9mv9_t 9x51_t 9p8l_t 9urp_t 9wcl_t dfk5_t d8g5_t dd6t_t derp_t dblx_t d6v9_l da11_t dced_t d7np_l dh51_l dlvp_t dmo5_t dgcl_l dhxh_t dipx_i dl39_t dptx_t dp1h_t dqmd_t ds79_i dw5h_t dxqd_t e1ol_t e2h1_i e03p_t e41x_t e39h_t e8sl_t eb5x_t e6f9_t e9l1_t e805_t e4ud_l eadh_t e5mt_l g005_t gbut_l gdfp_i g6bp_t ge85_i gf0l_i gft1_f gm4l_d glc5_l</str>
<str name="hl.fl">*</str>
<bool name="hl.requireFieldMatch">true</bool>
<bool name="lowercaseOperators">true</bool>
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 0580f7f..d55ada7 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -47,6 +47,8 @@
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.metrics.AtlasMetrics;
+import org.apache.atlas.model.metrics.AtlasMetricsMapToChart;
+import org.apache.atlas.model.metrics.AtlasMetricsStat;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.audit.AtlasAuditService;
@@ -78,6 +80,9 @@
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -125,6 +130,7 @@
@Path("admin")
@Singleton
@Service
+@EnableScheduling
public class AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(AdminResource.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("AdminResource");
@@ -145,6 +151,9 @@
private static final String OPERATION_STATUS = "operationStatus";
private static final List TIMEZONE_LIST = Arrays.asList(TimeZone.getAvailableIDs());
+ private static final String METRICS_PERSIST_INTERVAL = "atlas.metrics.persist.schedule";
+ private static final String METRICS_PERSIST_INTERVAL_DEFAULT = "0 0 0/1 * * *"; // 1 hour interval
+
@Context
private HttpServletRequest httpServletRequest;
@@ -409,8 +418,156 @@
return metrics;
}
- private void releaseExportImportLock() {
- importExportOperationLock.unlock();
+ /** Auto-scheduling API for both creating a Metrics entity and saving it to the database at in preset time interval,
+ * and sweeping through entities that are outside of the valid ttl hours.
+ * @throws AtlasBaseException when the MetricsStat entity has already existed.
+ */
+ @Scheduled(cron="#{getCronExpression}")
+ public void scheduleSaveAndDeleteMetrics() throws AtlasBaseException {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AdminResource.scheduleSaveAndDeleteMetrics()");
+ }
+
+ // auto persist
+ saveMetrics();
+
+ // auto purge
+ metricsService.purgeMetricsStats();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AdminResource.scheduleSaveAndDeleteMetrics()");
+ }
+ }
+
+ /**
+ * Bulk retrieval API for getting all MetricsStats, with mininfo flag return metrics with specific details, or with minimal information.
+ * @return all MetricsStats in Atlas.
+ * @throws AtlasBaseException when there is no MetricsStats entity in the database.
+ */
+ @GET
+ @Path("metricsstats")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public List<AtlasMetricsStat> getAllMetrics(@QueryParam("mininfo") @DefaultValue("true") Boolean minInfo) throws AtlasBaseException {
+ AtlasPerfTracer perf = null;
+
+ try {
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.getAllMetrics()");
+ }
+
+ return metricsService.getAllMetricsStats(minInfo);
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
+ /**
+ * Retrieval API for retrieving the MetricsStat with a specific collectionTime.
+ * @return the MetricsStat with the specific collectionTime.
+ * @throws AtlasBaseException when the MetricsStat entity with this specific collectionTime cannot be found.
+ */
+ @GET
+ @Path("metricsstat/{collectionTime}")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public AtlasMetricsStat getMetricsByCollectionTime(@PathParam("collectionTime") String collectionTime) throws AtlasBaseException {
+ Servlets.validateQueryParamLength("collectionTime", collectionTime);
+
+ AtlasPerfTracer perf = null;
+
+ try {
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer
+ .getPerfTracer(PERF_LOG,
+ "AdminResource.getMetricsByCollectionTime(collectionTime=" + collectionTime + ")");
+ }
+
+ return metricsService.getMetricsStatByCollectionTime(collectionTime);
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
+ /** Retrieval API for retrieving persisted MetricsStats with collectionTime within range of startTime and endTime.
+ * @param startTime start timestamp of the time range.
+ * @param endTime end timestamp of the time range.
+ * @param typeNames a list of typeNames with their counting information, as well as their metrics' minimal information.
+ * @return persisted Metrics with its collectionTime within time range, in the form of minimal information.
+ * @throws AtlasBaseException when the input of startTime and endTime is null or invalid.
+ */
+ @GET
+ @Path("metricsstats/range")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public List<AtlasMetricsStat> getMetricsInTimeRange(@QueryParam("startTime") String startTime,
+ @QueryParam("endTime") String endTime,
+ @QueryParam("typeName") List<String> typeNames) throws AtlasBaseException {
+ if (StringUtils.isBlank(startTime) || StringUtils.isBlank(endTime)) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "startTime or endTime is null/empty.");
+ }
+
+ Servlets.validateQueryParamLength("startTime", startTime);
+ Servlets.validateQueryParamLength("endTime", endTime);
+ for (String typeName : typeNames) {
+ Servlets.validateQueryParamLength("typeName", typeName);
+ }
+
+ AtlasPerfTracer perf = null;
+ try {
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG,
+ "AdminResource.getMetricsInTimeRange(startTime=" + startTime + ", " +
+ "endTime=" + endTime + ", " +
+ "listOfTypeNames=" + String.join(", ", typeNames) + ")" );
+ }
+
+ return metricsService.getMetricsInRangeByTypeNames(Long.parseLong(startTime), Long.parseLong(endTime), typeNames);
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
+ /** Retrieval API for retrieving & formatting MetricsStats (within valid range) to render stacked area chart. The process contains:
+ * 1. retrieve persisted MetricsStats with collectionTime within range of startTime and endTime by one typeName
+ * 2. map the returned MetricsStats to the required format for rendering stacked area chart
+ * Currently, one typeName corresponds to one chart. The API can take multiple typeNames. The returned JSON file can be used to render multiple charts.
+ * @param startTime start timestamp of the time range.
+ * @param endTime end timestamp of the time range.
+ * @param typeNames a list of typeNames with their counting information, as well as their metrics' minimal information.
+ * @return formatted metrics to render one or multiple stacked area charts.
+ * @throws AtlasBaseException when the input of startTime and endTime is null or invalid.
+ */
+ @GET
+ @Path("metricsstats/charts")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public Map<String, List<AtlasMetricsMapToChart>> getMetricsForChartByTypeNames(
+ @QueryParam("startTime") String startTime,
+ @QueryParam("endTime") String endTime,
+ @QueryParam("typeName") List<String> typeNames) throws AtlasBaseException {
+
+ if (StringUtils.isBlank(startTime) || StringUtils.isBlank(endTime)) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "startTime or endTime is null/empty.");
+ }
+
+ Servlets.validateQueryParamLength("startTime", startTime);
+ Servlets.validateQueryParamLength("endTime", endTime);
+ for (String typeName : typeNames) {
+ Servlets.validateQueryParamLength("typeName", typeName);
+ }
+
+ AtlasPerfTracer perf = null;
+ try {
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG,
+ "AdminResource.getMetricsForChartByTypeNames(" +
+ "startTime=" + startTime + ", " +
+ "endTime=" + endTime + ", " +
+ "listOfTypeNames=" + String.join(", ", typeNames) + ")" );
+ }
+
+ return metricsService.getMetricsForChartByTypeNames(Long.parseLong(startTime), Long.parseLong(endTime), typeNames);
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
}
@POST
@@ -867,4 +1024,39 @@
auditService.add(auditOperation, params, AtlasJson.toJson(entityCountByType), resultCount);
}
+
+ private void releaseExportImportLock() {
+ importExportOperationLock.unlock();
+ }
+
+ /** Get customized time interval to persist metrics in CM, or use default persist hour (1hr interval).
+ * There are 6 fields. Default 1 hr interval: 0 0 0/1 * * *
+ */
+ @Bean
+ private String getCronExpression() {
+ if (atlasProperties != null) {
+ return atlasProperties.getString(METRICS_PERSIST_INTERVAL, METRICS_PERSIST_INTERVAL_DEFAULT);
+ } else {
+ return METRICS_PERSIST_INTERVAL_DEFAULT;
+ }
+ }
+
+ /** Save an AtlasMetrics as AtlasMetricsStat to db.
+ * @throws AtlasBaseException when the AtlasMetricsStat is null or when the AtlasMetricsStat already exists in db.
+ */
+ private void saveMetrics() throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AdminResource.saveMetrics()");
+ }
+
+ AtlasMetrics metrics = metricsService.getMetrics();
+
+ AtlasMetricsStat metricsStat = new AtlasMetricsStat(metrics);
+ metricsService.saveMetricsStat(metricsStat);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AdminResource.saveMetrics()");
+ }
+ }
+
}