blob: 9727850236afc7989e85eb53dbf6871838b564f3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Class that manages adding and removing collectors and their lifecycle. It
 * provides thread-safe access to the collectors inside.
 *
 * <p>The manager also owns a single {@link TimelineWriter}, handed to every
 * collector it registers, and a dedicated single-thread scheduler that
 * periodically flushes that writer.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TimelineCollectorManager extends CompositeService {
  private static final Logger LOG =
      LoggerFactory.getLogger(TimelineCollectorManager.class);

  /** How long serviceStop waits for an in-flight flush before closing. */
  private static final long FLUSHER_SHUTDOWN_TIMEOUT_SECONDS = 30;

  private TimelineWriter writer;
  private ScheduledExecutorService writerFlusher;
  private int flushInterval;
  private boolean writerFlusherRunning;

  // access to this map is synchronized with the map itself
  private final Map<ApplicationId, TimelineCollector> collectors =
      Collections.synchronizedMap(
          new HashMap<ApplicationId, TimelineCollector>());

  public TimelineCollectorManager(String name) {
    super(name);
  }

  /**
   * Creates and initializes the shared timeline writer and prepares the
   * flusher executor. The flush task itself is scheduled in
   * {@link #serviceStart()}.
   *
   * @param conf configuration supplying the writer class and flush interval.
   * @throws Exception if the writer or a child service fails to initialize.
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    writer = createTimelineWriter(conf);
    writer.init(conf);
    // create a single dedicated thread for flushing the writer on a periodic
    // basis
    writerFlusher = Executors.newSingleThreadScheduledExecutor();
    flushInterval = conf.getInt(
        YarnConfiguration.
            TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
        YarnConfiguration.
            DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
    super.serviceInit(conf);
  }

  /**
   * Reflectively instantiates the configured {@link TimelineWriter}.
   *
   * @param conf configuration naming the writer class.
   * @return a new, not yet initialized writer instance.
   * @throws YarnRuntimeException if the class cannot be found or does not
   *     implement {@link TimelineWriter}.
   */
  private TimelineWriter createTimelineWriter(final Configuration conf) {
    String timelineWriterClassName = conf.get(
        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_CLASS);
    LOG.info("Using TimelineWriter: {}", timelineWriterClassName);
    try {
      Class<?> timelineWriterClazz = Class.forName(timelineWriterClassName);
      if (TimelineWriter.class.isAssignableFrom(timelineWriterClazz)) {
        return (TimelineWriter) ReflectionUtils.newInstance(
            timelineWriterClazz, conf);
      } else {
        throw new YarnRuntimeException("Class: " + timelineWriterClassName
            + " not instance of " + TimelineWriter.class.getCanonicalName());
      }
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate TimelineWriter: "
          + timelineWriterClassName, e);
    }
  }

  /**
   * Starts child services and the writer, then schedules the periodic flush
   * task every {@code flushInterval} seconds.
   */
  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    if (writer != null) {
      writer.start();
    }
    // schedule the flush task
    writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
        flushInterval, flushInterval, TimeUnit.SECONDS);
    writerFlusherRunning = true;
  }

  protected TimelineWriter getWriter() {
    return writer;
  }

  /**
   * Put the collector into the collection if a collector mapped by id does
   * not exist.
   *
   * <p>A newly added collector is initialized with this service's config,
   * given the shared writer, started, registered, and then passed to
   * {@link #postPut(ApplicationId, TimelineCollector)}. All of this happens
   * while holding the map's lock.
   *
   * @param appId Application Id for which collector needs to be put.
   * @param collector timeline collector to be put.
   * @throws YarnRuntimeException if there was any exception in initializing
   *     and starting the app level service
   * @return the collector associated with id after the potential put.
   */
  public TimelineCollector putIfAbsent(ApplicationId appId,
      TimelineCollector collector) {
    TimelineCollector collectorInTable;
    synchronized (collectors) {
      collectorInTable = collectors.get(appId);
      if (collectorInTable == null) {
        try {
          // initialize, start, and add it to the collection so it can be
          // cleaned up when the parent shuts down
          collector.init(getConfig());
          collector.setWriter(writer);
          collector.start();
          collectors.put(appId, collector);
          LOG.info("the collector for {} was added", appId);
          collectorInTable = collector;
          // NOTE(review): if postPut throws, the collector stays registered
          // and started even though the caller receives an exception.
          postPut(appId, collectorInTable);
        } catch (Exception e) {
          throw new YarnRuntimeException(e);
        }
      } else {
        LOG.info("the collector for {} already exists!", appId);
      }
    }
    return collectorInTable;
  }

  /**
   * Callback handler for the timeline collector manager when a collector has
   * been added into the collector map.
   * @param appId Application id of the collector.
   * @param collector The actual timeline collector that has been added.
   */
  public void postPut(ApplicationId appId, TimelineCollector collector) {
    doPostPut(appId, collector);
    collector.setReadyToAggregate();
  }

  /**
   * A template method that will be called by
   * {@link #postPut(ApplicationId, TimelineCollector)}.
   * @param appId Application id of the collector.
   * @param collector The actual timeline collector that has been added.
   */
  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
  }

  /**
   * Removes the collector for the specified id. The collector is also stopped
   * as a result. If the collector does not exist, no change is made.
   *
   * @param appId Application Id to remove.
   * @return whether it was removed successfully
   */
  public boolean remove(ApplicationId appId) {
    TimelineCollector collector = collectors.remove(appId);
    if (collector == null) {
      LOG.error("the collector for {} does not exist!", appId);
    } else {
      synchronized (collector) {
        postRemove(appId, collector);
        // stop the service to do clean up
        collector.stop();
      }
      LOG.info("The collector service for {} was removed", appId);
    }
    return collector != null;
  }

  /**
   * Hook invoked by {@link #remove(ApplicationId)} after the collector has
   * left the map and before it is stopped. No-op by default.
   *
   * @param appId Application id of the removed collector.
   * @param collector The collector that was removed.
   */
  protected void postRemove(ApplicationId appId, TimelineCollector collector) {
  }

  /**
   * Returns the collector for the specified id.
   *
   * @param appId Application Id for which we need to get the collector.
   * @return the collector or null if it does not exist
   */
  public TimelineCollector get(ApplicationId appId) {
    return collectors.get(appId);
  }

  /**
   * Returns whether the collector for the specified id exists in this
   * collection.
   * @param appId Application Id.
   * @return true if collector for the app id is found, false otherwise.
   */
  public boolean containsTimelineCollector(ApplicationId appId) {
    return collectors.containsKey(appId);
  }

  /**
   * Stops all registered collectors, then the flusher, then closes the
   * writer. Each step is attempted even if an earlier one fails, so the
   * writer is always released.
   */
  @Override
  protected void serviceStop() throws Exception {
    // Use stop() rather than calling serviceStop() directly. This moves each
    // collector through the Service lifecycle to STOPPED, so a later remove()
    // does not run the collector's shutdown logic a second time. A failing
    // collector is logged and skipped so the writer cleanup below still runs.
    synchronized (collectors) {
      for (Map.Entry<ApplicationId, TimelineCollector> entry
          : collectors.entrySet()) {
        try {
          entry.getValue().stop();
        } catch (RuntimeException e) {
          LOG.warn("failed to stop the collector for " + entry.getKey(), e);
        }
      }
    }
    // stop the flusher before closing the writer so no periodic flush runs
    // concurrently with close()
    if (writerFlusher != null) {
      writerFlusher.shutdown();
      writerFlusherRunning = false;
      try {
        if (!writerFlusher.awaitTermination(
            FLUSHER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
          // in reality it should be ample time for the flusher task to finish
          // even if it times out, writers may be able to handle closing in
          // this situation fine
          // proceed to close the writer
          LOG.warn("failed to stop the flusher task in time. " +
              "will still proceed to close the writer.");
        }
      } catch (InterruptedException e) {
        // keep the interrupt visible to callers, but do not leak the writer
        Thread.currentThread().interrupt();
        LOG.warn("interrupted while waiting for the flusher task to stop. " +
            "will still proceed to close the writer.");
      }
    }
    if (writer != null) {
      writer.close();
    }
    super.serviceStop();
  }

  @VisibleForTesting
  boolean writerFlusherRunning() {
    return writerFlusherRunning;
  }

  /**
   * Task that invokes the flush operation on the timeline writer.
   */
  private static class WriterFlushTask implements Runnable {
    private final TimelineWriter writer;

    public WriterFlushTask(TimelineWriter writer) {
      this.writer = writer;
    }

    @Override
    public void run() {
      try {
        // synchronize on the writer object to avoid flushing timeline
        // entities placed on the buffer by synchronous putEntities
        // requests.
        synchronized (writer) {
          writer.flush();
        }
      } catch (Throwable th) {
        // we need to handle all exceptions or subsequent execution may be
        // suppressed
        LOG.error("exception during timeline writer flush!", th);
      }
    }
  }
}