blob: 0ffff76eba70e8b81f080ff63b3141ad496cf285 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.core.timeline.discovery;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
/**
 * Sync metadata info with the store.
 * <p>
 * Each {@link #run()} first pushes cached metric metadata, hosted apps and
 * hosted instances to the store. When the cache manager reports that
 * distributed mode is enabled, it then reads the same three data sets back
 * from the store and adds any entries that are missing from the local caches.
 */
public class TimelineMetricMetadataSync implements Runnable {
  private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataSync.class);

  /** Provides access to both the in-memory caches and the backing store. */
  private final TimelineMetricMetadataManager cacheManager;

  /**
   * @param cacheManager manager whose caches are synced with the store
   */
  public TimelineMetricMetadataSync(TimelineMetricMetadataManager cacheManager) {
    this.cacheManager = cacheManager;
  }

  /**
   * Persist cached metadata to the store and, in distributed mode, refresh
   * the caches with entries found only in the store.
   */
  @Override
  public void run() {
    LOG.debug("Persisting metric metadata...");
    persistMetricMetadata();
    LOG.debug("Persisting hosted apps metadata...");
    persistHostAppsMetadata();
    LOG.debug("Persisting hosted instance metadata...");
    persistHostInstancesMetadata();
    if (cacheManager.isDistributedModeEnabled()) {
      LOG.debug("Refreshing metric metadata...");
      refreshMetricMetadata();
      LOG.debug("Refreshing hosted apps metadata...");
      refreshHostAppsMetadata();
      LOG.debug("Refreshing hosted instances metadata...");
      refreshHostedInstancesMetadata();
    }
  }

  /**
   * Find metrics not persisted to store and persist them.
   * <p>
   * Entries are flagged as persisted only after the store write succeeded;
   * on failure they stay unflagged and are picked up again by the next call.
   */
  private void persistMetricMetadata() {
    List<TimelineMetricMetadata> metadataToPersist = new ArrayList<>();
    // Find all entries to persist
    for (TimelineMetricMetadata metadata : cacheManager.getMetadataCache().values()) {
      if (!metadata.isPersisted()) {
        metadataToPersist.add(metadata);
      }
    }
    if (metadataToPersist.isEmpty()) {
      return;
    }

    try {
      cacheManager.persistMetadata(metadataToPersist);
    } catch (SQLException e) {
      LOG.warn("Error persisting metadata.", e);
      return; // Nothing was written, so do not mark anything as persisted
    }

    // Mark corresponding entries as persisted to skip on next run
    for (TimelineMetricMetadata metadata : metadataToPersist) {
      TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(
        metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId()
      );
      // Mark entry as being persisted
      metadata.setIsPersisted(true);
      // Update cache
      cacheManager.getMetadataCache().put(key, metadata);
    }
  }

  /**
   * Read all metric metadata and update cached values - HA mode.
   * <p>
   * Only keys absent from the cache are added; existing cache entries are
   * never overwritten. A store read failure is logged and leaves the cache
   * untouched.
   */
  private void refreshMetricMetadata() {
    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataFromStore = null;
    try {
      metadataFromStore = cacheManager.getMetadataFromStore();
    } catch (SQLException e) {
      LOG.warn("Error refreshing metadata from store.", e);
    }
    if (metadataFromStore != null) {
      addMissingEntries(metadataFromStore, cacheManager.getMetadataCache());
    }
  }

  /**
   * Sync hosted apps data if needed.
   * <p>
   * Runs only when the cache manager signals that a sync is required. A
   * cached host is written when the store has no record for it, or when the
   * stored set of hosted app names does not contain every cached app name.
   */
  private void persistHostAppsMetadata() {
    if (cacheManager.syncHostedAppsMetadata()) {
      Map<String, TimelineMetricHostMetadata> persistedData = null;
      try {
        persistedData = cacheManager.getHostedAppsFromStore();
      } catch (SQLException e) {
        LOG.warn("Failed on fetching hosted apps data from store.", e);
        return; // Something wrong with store
      }
      Map<String, TimelineMetricHostMetadata> cachedData = cacheManager.getHostedAppsCache();
      Map<String, TimelineMetricHostMetadata> dataToSync = new HashMap<>();
      if (cachedData != null && !cachedData.isEmpty()) {
        for (Map.Entry<String, TimelineMetricHostMetadata> cacheEntry : cachedData.entrySet()) {
          // No persistence / stale data in store
          if (persistedData == null || persistedData.isEmpty() ||
              !persistedData.containsKey(cacheEntry.getKey()) ||
              !persistedData.get(cacheEntry.getKey()).getHostedApps().keySet()
                .containsAll(cacheEntry.getValue().getHostedApps().keySet())) {
            dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
          }
        }
        try {
          cacheManager.persistHostedAppsMetadata(dataToSync);
          // Reached only when the write above did not throw
          cacheManager.markSuccessOnSyncHostedAppsMetadata();
        } catch (SQLException e) {
          LOG.warn("Error persisting hosted apps metadata.", e);
        }
      }
    }
  }

  /**
   * Sync apps instances data if needed.
   * <p>
   * Runs only when the cache manager signals that a sync is required. A
   * cached key is written when the store has no record for it, or when the
   * stored set does not contain every cached value for that key.
   */
  private void persistHostInstancesMetadata() {
    if (cacheManager.syncHostedInstanceMetadata()) {
      Map<String, Set<String>> persistedData = null;
      try {
        persistedData = cacheManager.getHostedInstancesFromStore();
      } catch (SQLException e) {
        LOG.warn("Failed on fetching hosted instances data from store.", e);
        return; // Something wrong with store
      }
      Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache();
      Map<String, Set<String>> dataToSync = new HashMap<>();
      if (cachedData != null && !cachedData.isEmpty()) {
        for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) {
          // No persistence / stale data in store
          if (persistedData == null || persistedData.isEmpty() ||
              !persistedData.containsKey(cacheEntry.getKey()) ||
              !persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) {
            dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
          }
        }
        try {
          cacheManager.persistHostedInstanceMetadata(dataToSync);
          // Reached only when the write above did not throw
          cacheManager.markSuccessOnSyncHostedInstanceMetadata();
        } catch (SQLException e) {
          // Message names hosted instances so this failure is not mistaken
          // for a hosted apps failure in the logs
          LOG.warn("Error persisting hosted instances metadata.", e);
        }
      }
    }
  }

  /**
   * Read all hosted apps metadata and update cached values - HA.
   * <p>
   * Only keys absent from the cache are added. A store read failure is
   * logged and leaves the cache untouched.
   */
  private void refreshHostAppsMetadata() {
    Map<String, TimelineMetricHostMetadata> hostedAppsDataFromStore = null;
    try {
      hostedAppsDataFromStore = cacheManager.getHostedAppsFromStore();
    } catch (SQLException e) {
      LOG.warn("Error refreshing metadata from store.", e);
    }
    if (hostedAppsDataFromStore != null) {
      addMissingEntries(hostedAppsDataFromStore, cacheManager.getHostedAppsCache());
    }
  }

  /**
   * Read all hosted instances metadata and update cached values.
   * <p>
   * Only keys absent from the cache are added. A store read failure is
   * logged and leaves the cache untouched.
   */
  private void refreshHostedInstancesMetadata() {
    Map<String, Set<String>> hostedInstancesFromStore = null;
    try {
      hostedInstancesFromStore = cacheManager.getHostedInstancesFromStore();
    } catch (SQLException e) {
      LOG.warn("Error refreshing metadata from store.", e);
    }
    if (hostedInstancesFromStore != null) {
      addMissingEntries(hostedInstancesFromStore, cacheManager.getHostedInstanceCache());
    }
  }

  /**
   * Copy every entry of {@code source} whose key is not yet present in
   * {@code target}; entries already in {@code target} are left as they are.
   *
   * @param source entries read from the store
   * @param target cache to be filled in
   */
  private static <K, V> void addMissingEntries(Map<K, V> source, Map<K, V> target) {
    for (Map.Entry<K, V> entry : source.entrySet()) {
      if (!target.containsKey(entry.getKey())) {
        target.put(entry.getKey(), entry.getValue());
      }
    }
  }
}