blob: 8df60ab15a53ae9000b9e7e2e330ec4520e264c3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Cache item for timeline server v1.5 reader cache. Each cache item has a
* TimelineStore that can be filled with data within one entity group.
*/
public class EntityCacheItem {
  private static final Logger LOG
      = LoggerFactory.getLogger(EntityCacheItem.class);

  /**
   * Minimum age, in milliseconds of the monotonic clock, that the previous
   * refresh must exceed before {@link #refreshCache} reloads the data.
   */
  private static final long REFRESH_INTERVAL_MS = 10000L;

  private final TimelineEntityGroupId groupId;
  private final Configuration config;
  private TimelineStore store;
  private EntityGroupFSTimelineStore.AppLogs appLogs;
  // Monotonic timestamp of the last completed refresh. Only meaningful once
  // refreshed is true.
  private long lastRefresh;
  // Whether a refresh has ever completed. Tracked explicitly because a
  // monotonic clock has an arbitrary origin, so comparing against the default
  // lastRefresh of 0 cannot reliably detect a never-refreshed item.
  private boolean refreshed;

  /**
   * Create an empty cache item for the given entity group. No store is
   * created and no application logs are attached until later calls.
   *
   * @param gId Entity group id this cache item holds data for
   * @param config Configuration used to create and initialize the store
   */
  public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) {
    this.groupId = gId;
    this.config = config;
  }

  /**
   * @return The application log associated to this cache item, may be null.
   */
  public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() {
    return this.appLogs;
  }

  /**
   * Set the application logs to this cache item. The entity group should be
   * associated with this application.
   *
   * @param incomingAppLogs Application logs this cache item mapped to
   */
  public synchronized void setAppLogs(
      EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
    this.appLogs = incomingAppLogs;
  }

  /**
   * @return The timeline store, either loaded or unloaded, of this cache item.
   * May be null if no store has been created yet or after
   * {@link #forceRelease()}. This method will not hold the storage from being
   * reclaimed.
   */
  public synchronized TimelineStore getStore() {
    return store;
  }

  /**
   * Refresh this cache item if it needs refresh. This will enforce an appLogs
   * rescan and then load new data. The refresh process is synchronized with
   * other operations on the same cache item.
   * <p>
   * The application logs must have been attached with {@link #setAppLogs}
   * before a refresh is attempted.
   *
   * @param aclManager ACL manager for the timeline storage
   * @param metrics Metrics to trace the status of the entity group store
   * @return a {@link TimelineStore} object filled with all entities in the
   * group, or null if no store has been created because no detail logs were
   * available.
   * @throws IOException if scanning, parsing or loading the logs fails
   */
  public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
      EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
    if (!needRefresh()) {
      LOG.debug("Cache new enough, skip refreshing");
      metrics.incrNoRefreshCacheRead();
      return store;
    }

    long startTime = Time.monotonicNow();
    // If an application is not finished, we only update summary logs (and put
    // new entities into summary storage).
    // Otherwise, since the application is done, we can update detail logs.
    if (!appLogs.isDone()) {
      appLogs.parseSummaryLogs();
    } else if (appLogs.getDetailLogs().isEmpty()) {
      appLogs.scanForLogs();
    }

    if (!appLogs.getDetailLogs().isEmpty()) {
      if (store == null) {
        store = createStartedStore();
      } else {
        // Store is not null, the refresh is triggered by stale storage.
        metrics.incrCacheStaleRefreshes();
      }
      try (TimelineDataManager tdm =
               new TimelineDataManager(store, aclManager)) {
        tdm.init(config);
        tdm.start();
        // Load data from appLogs to tdm
        appLogs.loadDetailLog(tdm, groupId);
      }
    }
    updateRefreshTimeToNow();
    metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
    return store;
  }

  /**
   * Force releasing the cache item for the given group id, even though there
   * may be active references. The store is closed on a best-effort basis (a
   * failure to close is logged, not propagated) and the offsets of this
   * group's detail logs are reset.
   */
  public synchronized void forceRelease() {
    try {
      if (store != null) {
        store.close();
      }
    } catch (IOException e) {
      LOG.warn("Error closing timeline store", e);
    }
    store = null;
    // appLogs may never have been attached; releasing must still succeed.
    if (appLogs != null) {
      // reset offsets so next time logs are re-parsed
      for (LogInfo log : appLogs.getDetailLogs()) {
        if (log.getFilename().contains(groupId.toString())) {
          log.setOffset(0);
        }
      }
    }
    LOG.debug("Cache for group {} released. ", groupId);
  }

  /**
   * Instantiate the configured cache store class (defaulting to
   * {@link MemoryTimelineStore}), then initialize and start it.
   *
   * @return a store that has been initialized and started
   */
  private TimelineStore createStartedStore() {
    TimelineStore newStore = ReflectionUtils.newInstance(config.getClass(
        YarnConfiguration
            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CACHE_STORE,
        MemoryTimelineStore.class, TimelineStore.class),
        config);
    // The caller publishes the store only after both steps succeed, so a
    // failure here does not leave a half-initialized store cached.
    newStore.init(config);
    newStore.start();
    return newStore;
  }

  /**
   * @return true if this item was never refreshed, or if the last refresh is
   * older than {@link #REFRESH_INTERVAL_MS}.
   */
  private boolean needRefresh() {
    return !refreshed
        || Time.monotonicNow() - lastRefresh > REFRESH_INTERVAL_MS;
  }

  /** Record that a refresh has just completed. */
  private void updateRefreshTimeToNow() {
    this.lastRefresh = Time.monotonicNow();
    this.refreshed = true;
  }
}