| /* | |
| * AbstractMetricsContext.java | |
| * | |
| * Licensed to the Apache Software Foundation (ASF) under one | |
| * or more contributor license agreements. See the NOTICE file | |
| * distributed with this work for additional information | |
| * regarding copyright ownership. The ASF licenses this file | |
| * to you under the Apache License, Version 2.0 (the | |
| * "License"); you may not use this file except in compliance | |
| * with the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| package org.apache.hadoop.metrics.spi; | |
| import java.io.IOException; | |
| import java.util.ArrayList; | |
| import java.util.Collection; | |
| import java.util.HashMap; | |
| import java.util.HashSet; | |
| import java.util.Iterator; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.Set; | |
| import java.util.Timer; | |
| import java.util.TimerTask; | |
| import java.util.TreeMap; | |
| import java.util.Map.Entry; | |
| import org.apache.hadoop.classification.InterfaceAudience; | |
| import org.apache.hadoop.classification.InterfaceStability; | |
| import org.apache.hadoop.metrics.ContextFactory; | |
| import org.apache.hadoop.metrics.MetricsContext; | |
| import org.apache.hadoop.metrics.MetricsException; | |
| import org.apache.hadoop.metrics.MetricsRecord; | |
| import org.apache.hadoop.metrics.Updater; | |
/**
 * The main class of the Service Provider Interface.  This class should be
 * extended in order to integrate the Metrics API with a specific metrics
 * client library.
 *
 * <p>This class implements the internal table of metric data, and the timer
 * on which data is to be sent to the metrics system.  Subclasses must
 * override the abstract <code>emitRecord</code> method in order to transmit
 * the data.
 */
| @InterfaceAudience.Public | |
| @InterfaceStability.Evolving | |
| public abstract class AbstractMetricsContext implements MetricsContext { | |
| private int period = MetricsContext.DEFAULT_PERIOD; | |
| private Timer timer = null; | |
| private Set<Updater> updaters = new HashSet<Updater>(1); | |
| private volatile boolean isMonitoring = false; | |
| private ContextFactory factory = null; | |
| private String contextName = null; | |
| @InterfaceAudience.Private | |
| public static class TagMap extends TreeMap<String,Object> { | |
| private static final long serialVersionUID = 3546309335061952993L; | |
| TagMap() { | |
| super(); | |
| } | |
| TagMap(TagMap orig) { | |
| super(orig); | |
| } | |
| /** | |
| * Returns true if this tagmap contains every tag in other. | |
| */ | |
| public boolean containsAll(TagMap other) { | |
| for (Map.Entry<String,Object> entry : other.entrySet()) { | |
| Object value = get(entry.getKey()); | |
| if (value == null || !value.equals(entry.getValue())) { | |
| // either key does not exist here, or the value is different | |
| return false; | |
| } | |
| } | |
| return true; | |
| } | |
| } | |
| @InterfaceAudience.Private | |
| public static class MetricMap extends TreeMap<String,Number> { | |
| private static final long serialVersionUID = -7495051861141631609L; | |
| MetricMap() { | |
| super(); | |
| } | |
| MetricMap(MetricMap orig) { | |
| super(orig); | |
| } | |
| } | |
| static class RecordMap extends HashMap<TagMap,MetricMap> { | |
| private static final long serialVersionUID = 259835619700264611L; | |
| } | |
| private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>(); | |
/**
 * Creates a new instance of AbstractMetricsContext.  The context name and
 * factory are supplied afterwards through
 * {@link #init(String, ContextFactory)}.
 */
protected AbstractMetricsContext() {
  // nothing to set up here; fields carry their own initial values
}
| /** | |
| * Initializes the context. | |
| */ | |
| public void init(String contextName, ContextFactory factory) | |
| { | |
| this.contextName = contextName; | |
| this.factory = factory; | |
| } | |
| /** | |
| * Convenience method for subclasses to access factory attributes. | |
| */ | |
| protected String getAttribute(String attributeName) { | |
| String factoryAttribute = contextName + "." + attributeName; | |
| return (String) factory.getAttribute(factoryAttribute); | |
| } | |
| /** | |
| * Returns an attribute-value map derived from the factory attributes | |
| * by finding all factory attributes that begin with | |
| * <i>contextName</i>.<i>tableName</i>. The returned map consists of | |
| * those attributes with the contextName and tableName stripped off. | |
| */ | |
| protected Map<String,String> getAttributeTable(String tableName) { | |
| String prefix = contextName + "." + tableName + "."; | |
| Map<String,String> result = new HashMap<String,String>(); | |
| for (String attributeName : factory.getAttributeNames()) { | |
| if (attributeName.startsWith(prefix)) { | |
| String name = attributeName.substring(prefix.length()); | |
| String value = (String) factory.getAttribute(attributeName); | |
| result.put(name, value); | |
| } | |
| } | |
| return result; | |
| } | |
| /** | |
| * Returns the context name. | |
| */ | |
| public String getContextName() { | |
| return contextName; | |
| } | |
| /** | |
| * Returns the factory by which this context was created. | |
| */ | |
| public ContextFactory getContextFactory() { | |
| return factory; | |
| } | |
| /** | |
| * Starts or restarts monitoring, the emitting of metrics records. | |
| */ | |
| public synchronized void startMonitoring() | |
| throws IOException { | |
| if (!isMonitoring) { | |
| startTimer(); | |
| isMonitoring = true; | |
| } | |
| } | |
| /** | |
| * Stops monitoring. This does not free buffered data. | |
| * @see #close() | |
| */ | |
| public synchronized void stopMonitoring() { | |
| if (isMonitoring) { | |
| stopTimer(); | |
| isMonitoring = false; | |
| } | |
| } | |
| /** | |
| * Returns true if monitoring is currently in progress. | |
| */ | |
| public boolean isMonitoring() { | |
| return isMonitoring; | |
| } | |
| /** | |
| * Stops monitoring and frees buffered data, returning this | |
| * object to its initial state. | |
| */ | |
| public synchronized void close() { | |
| stopMonitoring(); | |
| clearUpdaters(); | |
| } | |
| /** | |
| * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>. | |
| * Throws an exception if the metrics implementation is configured with a fixed | |
| * set of record names and <code>recordName</code> is not in that set. | |
| * | |
| * @param recordName the name of the record | |
| * @throws MetricsException if recordName conflicts with configuration data | |
| */ | |
| public final synchronized MetricsRecord createRecord(String recordName) { | |
| if (bufferedData.get(recordName) == null) { | |
| bufferedData.put(recordName, new RecordMap()); | |
| } | |
| return newRecord(recordName); | |
| } | |
| /** | |
| * Subclasses should override this if they subclass MetricsRecordImpl. | |
| * @param recordName the name of the record | |
| * @return newly created instance of MetricsRecordImpl or subclass | |
| */ | |
| protected MetricsRecord newRecord(String recordName) { | |
| return new MetricsRecordImpl(recordName, this); | |
| } | |
| /** | |
| * Registers a callback to be called at time intervals determined by | |
| * the configuration. | |
| * | |
| * @param updater object to be run periodically; it should update | |
| * some metrics records | |
| */ | |
| public synchronized void registerUpdater(final Updater updater) { | |
| if (!updaters.contains(updater)) { | |
| updaters.add(updater); | |
| } | |
| } | |
| /** | |
| * Removes a callback, if it exists. | |
| * | |
| * @param updater object to be removed from the callback list | |
| */ | |
| public synchronized void unregisterUpdater(Updater updater) { | |
| updaters.remove(updater); | |
| } | |
| private synchronized void clearUpdaters() { | |
| updaters.clear(); | |
| } | |
| /** | |
| * Starts timer if it is not already started | |
| */ | |
| private synchronized void startTimer() { | |
| if (timer == null) { | |
| timer = new Timer("Timer thread for monitoring " + getContextName(), | |
| true); | |
| TimerTask task = new TimerTask() { | |
| public void run() { | |
| try { | |
| timerEvent(); | |
| } | |
| catch (IOException ioe) { | |
| ioe.printStackTrace(); | |
| } | |
| } | |
| }; | |
| long millis = period * 1000; | |
| timer.scheduleAtFixedRate(task, millis, millis); | |
| } | |
| } | |
| /** | |
| * Stops timer if it is running | |
| */ | |
| private synchronized void stopTimer() { | |
| if (timer != null) { | |
| timer.cancel(); | |
| timer = null; | |
| } | |
| } | |
| /** | |
| * Timer callback. | |
| */ | |
| private void timerEvent() throws IOException { | |
| if (isMonitoring) { | |
| Collection<Updater> myUpdaters; | |
| synchronized (this) { | |
| myUpdaters = new ArrayList<Updater>(updaters); | |
| } | |
| // Run all the registered updates without holding a lock | |
| // on this context | |
| for (Updater updater : myUpdaters) { | |
| try { | |
| updater.doUpdates(this); | |
| } | |
| catch (Throwable throwable) { | |
| throwable.printStackTrace(); | |
| } | |
| } | |
| emitRecords(); | |
| } | |
| } | |
| /** | |
| * Emits the records. | |
| */ | |
| private synchronized void emitRecords() throws IOException { | |
| for (String recordName : bufferedData.keySet()) { | |
| RecordMap recordMap = bufferedData.get(recordName); | |
| synchronized (recordMap) { | |
| Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet (); | |
| for (Entry<TagMap, MetricMap> entry : entrySet) { | |
| OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); | |
| emitRecord(contextName, recordName, outRec); | |
| } | |
| } | |
| } | |
| flush(); | |
| } | |
| /** | |
| * Retrieves all the records managed by this MetricsContext. | |
| * Useful for monitoring systems that are polling-based. | |
| * @return A non-null collection of all monitoring records. | |
| */ | |
| public synchronized Map<String, Collection<OutputRecord>> getAllRecords() { | |
| Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>(); | |
| for (String recordName : bufferedData.keySet()) { | |
| RecordMap recordMap = bufferedData.get(recordName); | |
| synchronized (recordMap) { | |
| List<OutputRecord> records = new ArrayList<OutputRecord>(); | |
| Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet(); | |
| for (Entry<TagMap, MetricMap> entry : entrySet) { | |
| OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); | |
| records.add(outRec); | |
| } | |
| out.put(recordName, records); | |
| } | |
| } | |
| return out; | |
| } | |
| /** | |
| * Sends a record to the metrics system. | |
| */ | |
| protected abstract void emitRecord(String contextName, String recordName, | |
| OutputRecord outRec) throws IOException; | |
| /** | |
| * Called each period after all records have been emitted, this method does nothing. | |
| * Subclasses may override it in order to perform some kind of flush. | |
| */ | |
| protected void flush() throws IOException { | |
| } | |
| /** | |
| * Called by MetricsRecordImpl.update(). Creates or updates a row in | |
| * the internal table of metric data. | |
| */ | |
| protected void update(MetricsRecordImpl record) { | |
| String recordName = record.getRecordName(); | |
| TagMap tagTable = record.getTagTable(); | |
| Map<String,MetricValue> metricUpdates = record.getMetricTable(); | |
| RecordMap recordMap = getRecordMap(recordName); | |
| synchronized (recordMap) { | |
| MetricMap metricMap = recordMap.get(tagTable); | |
| if (metricMap == null) { | |
| metricMap = new MetricMap(); | |
| TagMap tagMap = new TagMap(tagTable); // clone tags | |
| recordMap.put(tagMap, metricMap); | |
| } | |
| Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet(); | |
| for (Entry<String, MetricValue> entry : entrySet) { | |
| String metricName = entry.getKey (); | |
| MetricValue updateValue = entry.getValue (); | |
| Number updateNumber = updateValue.getNumber(); | |
| Number currentNumber = metricMap.get(metricName); | |
| if (currentNumber == null || updateValue.isAbsolute()) { | |
| metricMap.put(metricName, updateNumber); | |
| } | |
| else { | |
| Number newNumber = sum(updateNumber, currentNumber); | |
| metricMap.put(metricName, newNumber); | |
| } | |
| } | |
| } | |
| } | |
| private synchronized RecordMap getRecordMap(String recordName) { | |
| return bufferedData.get(recordName); | |
| } | |
| /** | |
| * Adds two numbers, coercing the second to the type of the first. | |
| * | |
| */ | |
| private Number sum(Number a, Number b) { | |
| if (a instanceof Integer) { | |
| return Integer.valueOf(a.intValue() + b.intValue()); | |
| } | |
| else if (a instanceof Float) { | |
| return new Float(a.floatValue() + b.floatValue()); | |
| } | |
| else if (a instanceof Short) { | |
| return Short.valueOf((short)(a.shortValue() + b.shortValue())); | |
| } | |
| else if (a instanceof Byte) { | |
| return Byte.valueOf((byte)(a.byteValue() + b.byteValue())); | |
| } | |
| else if (a instanceof Long) { | |
| return Long.valueOf((a.longValue() + b.longValue())); | |
| } | |
| else { | |
| // should never happen | |
| throw new MetricsException("Invalid number type"); | |
| } | |
| } | |
| /** | |
| * Called by MetricsRecordImpl.remove(). Removes all matching rows in | |
| * the internal table of metric data. A row matches if it has the same | |
| * tag names and values as record, but it may also have additional | |
| * tags. | |
| */ | |
| protected void remove(MetricsRecordImpl record) { | |
| String recordName = record.getRecordName(); | |
| TagMap tagTable = record.getTagTable(); | |
| RecordMap recordMap = getRecordMap(recordName); | |
| synchronized (recordMap) { | |
| Iterator<TagMap> it = recordMap.keySet().iterator(); | |
| while (it.hasNext()) { | |
| TagMap rowTags = it.next(); | |
| if (rowTags.containsAll(tagTable)) { | |
| it.remove(); | |
| } | |
| } | |
| } | |
| } | |
| /** | |
| * Returns the timer period. | |
| */ | |
| public int getPeriod() { | |
| return period; | |
| } | |
| /** | |
| * Sets the timer period | |
| */ | |
| protected void setPeriod(int period) { | |
| this.period = period; | |
| } | |
| /** | |
| * If a period is set in the attribute passed in, override | |
| * the default with it. | |
| */ | |
| protected void parseAndSetPeriod(String attributeName) { | |
| String periodStr = getAttribute(attributeName); | |
| if (periodStr != null) { | |
| int period = 0; | |
| try { | |
| period = Integer.parseInt(periodStr); | |
| } catch (NumberFormatException nfe) { | |
| } | |
| if (period <= 0) { | |
| throw new MetricsException("Invalid period: " + periodStr); | |
| } | |
| setPeriod(period); | |
| } | |
| } | |
| } |