/*
 * AbstractMetricsContext.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics.spi;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.Map.Entry;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.Updater;

/**
 * The main class of the Service Provider Interface.  This class should be
 * extended in order to integrate the Metrics API with a specific metrics
 * client library. <p/>
 *
 * This class implements the internal table of metric data, and the timer
 * on which data is to be sent to the metrics system.  Subclasses must
 * override the abstract <code>emitRecord</code> method in order to transmit
 * the data. <p/>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class AbstractMetricsContext implements MetricsContext {
    
  private int period = MetricsContext.DEFAULT_PERIOD;
  private Timer timer = null;
    
  private Set<Updater> updaters = new HashSet<Updater>(1);
  private volatile boolean isMonitoring = false;
    
  private ContextFactory factory = null;
  private String contextName = null;
    
  @InterfaceAudience.Private
  public static class TagMap extends TreeMap<String,Object> {
    private static final long serialVersionUID = 3546309335061952993L;
    TagMap() {
      super();
    }
    TagMap(TagMap orig) {
      super(orig);
    }
    /**
     * Returns true if this tagmap contains every tag in other.
     */
    public boolean containsAll(TagMap other) {
      for (Map.Entry<String,Object> entry : other.entrySet()) {
        Object value = get(entry.getKey());
        if (value == null || !value.equals(entry.getValue())) {
          // either key does not exist here, or the value is different
          return false;
        }
      }
      return true;
    }
  }
  
  @InterfaceAudience.Private
  public static class MetricMap extends TreeMap<String,Number> {
    private static final long serialVersionUID = -7495051861141631609L;
    MetricMap() {
      super();
    }
    MetricMap(MetricMap orig) {
      super(orig);
    }
  }
            
  static class RecordMap extends HashMap<TagMap,MetricMap> {
    private static final long serialVersionUID = 259835619700264611L;
  }
    
  private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
    

  /**
   * Creates a new instance of AbstractMetricsContext
   */
  protected AbstractMetricsContext() {
  }
    
  /**
   * Initializes the context.
   */
  public void init(String contextName, ContextFactory factory) 
  {
    this.contextName = contextName;
    this.factory = factory;
  }
    
  /**
   * Convenience method for subclasses to access factory attributes.
   */
  protected String getAttribute(String attributeName) {
    String factoryAttribute = contextName + "." + attributeName;
    return (String) factory.getAttribute(factoryAttribute);  
  }
    
  /**
   * Returns an attribute-value map derived from the factory attributes
   * by finding all factory attributes that begin with 
   * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
   * those attributes with the contextName and tableName stripped off.
   */
  protected Map<String,String> getAttributeTable(String tableName) {
    String prefix = contextName + "." + tableName + ".";
    Map<String,String> result = new HashMap<String,String>();
    for (String attributeName : factory.getAttributeNames()) {
      if (attributeName.startsWith(prefix)) {
        String name = attributeName.substring(prefix.length());
        String value = (String) factory.getAttribute(attributeName);
        result.put(name, value);
      }
    }
    return result;
  }
    
  /**
   * Returns the context name.
   */
  public String getContextName() {
    return contextName;
  }
    
  /**
   * Returns the factory by which this context was created.
   */
  public ContextFactory getContextFactory() {
    return factory;
  }
    
  /**
   * Starts or restarts monitoring, the emitting of metrics records.
   */
  public synchronized void startMonitoring()
    throws IOException {
    if (!isMonitoring) {
      startTimer();
      isMonitoring = true;
    }
  }
    
  /**
   * Stops monitoring.  This does not free buffered data. 
   * @see #close()
   */
  public synchronized void stopMonitoring() {
    if (isMonitoring) {
      stopTimer();
      isMonitoring = false;
    }
  }
    
  /**
   * Returns true if monitoring is currently in progress.
   */
  public boolean isMonitoring() {
    return isMonitoring;
  }
    
  /**
   * Stops monitoring and frees buffered data, returning this
   * object to its initial state.  
   */
  public synchronized void close() {
    stopMonitoring();
    clearUpdaters();
  } 
    
  /**
   * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
   * Throws an exception if the metrics implementation is configured with a fixed
   * set of record names and <code>recordName</code> is not in that set.
   * 
   * @param recordName the name of the record
   * @throws MetricsException if recordName conflicts with configuration data
   */
  public final synchronized MetricsRecord createRecord(String recordName) {
    if (bufferedData.get(recordName) == null) {
      bufferedData.put(recordName, new RecordMap());
    }
    return newRecord(recordName);
  }
    
  /**
   * Subclasses should override this if they subclass MetricsRecordImpl.
   * @param recordName the name of the record
   * @return newly created instance of MetricsRecordImpl or subclass
   */
  protected MetricsRecord newRecord(String recordName) {
    return new MetricsRecordImpl(recordName, this);
  }
    
  /**
   * Registers a callback to be called at time intervals determined by
   * the configuration.
   *
   * @param updater object to be run periodically; it should update
   * some metrics records 
   */
  public synchronized void registerUpdater(final Updater updater) {
    if (!updaters.contains(updater)) {
      updaters.add(updater);
    }
  }
    
  /**
   * Removes a callback, if it exists.
   *
   * @param updater object to be removed from the callback list
   */
  public synchronized void unregisterUpdater(Updater updater) {
    updaters.remove(updater);
  }
    
  private synchronized void clearUpdaters() {
    updaters.clear();
  }
    
  /**
   * Starts timer if it is not already started
   */
  private synchronized void startTimer() {
    if (timer == null) {
      timer = new Timer("Timer thread for monitoring " + getContextName(), 
                        true);
      TimerTask task = new TimerTask() {
          public void run() {
            try {
              timerEvent();
            }
            catch (IOException ioe) {
              ioe.printStackTrace();
            }
          }
        };
      long millis = period * 1000;
      timer.scheduleAtFixedRate(task, millis, millis);
    }
  }
    
  /**
   * Stops timer if it is running
   */
  private synchronized void stopTimer() {
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }
    
  /**
   * Timer callback.
   */
  private void timerEvent() throws IOException {
    if (isMonitoring) {
      Collection<Updater> myUpdaters;
      synchronized (this) {
        myUpdaters = new ArrayList<Updater>(updaters);
      }     
      // Run all the registered updates without holding a lock
      // on this context
      for (Updater updater : myUpdaters) {
        try {
          updater.doUpdates(this);
        }
        catch (Throwable throwable) {
          throwable.printStackTrace();
        }
      }
      emitRecords();
    }
  }
    
  /**
   *  Emits the records.
   */
  private synchronized void emitRecords() throws IOException {
    for (String recordName : bufferedData.keySet()) {
      RecordMap recordMap = bufferedData.get(recordName);
      synchronized (recordMap) {
        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
        for (Entry<TagMap, MetricMap> entry : entrySet) {
          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
          emitRecord(contextName, recordName, outRec);
        }
      }
    }
    flush();
  }
  
  /**
   * Retrieves all the records managed by this MetricsContext.
   * Useful for monitoring systems that are polling-based.
   * @return A non-null collection of all monitoring records.
   */
  public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
    Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
    for (String recordName : bufferedData.keySet()) {
      RecordMap recordMap = bufferedData.get(recordName);
      synchronized (recordMap) {
        List<OutputRecord> records = new ArrayList<OutputRecord>();
        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
        for (Entry<TagMap, MetricMap> entry : entrySet) {
          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
          records.add(outRec);
        }
        out.put(recordName, records);
      }
    }
    return out;
  }

  /**
   * Sends a record to the metrics system.
   */
  protected abstract void emitRecord(String contextName, String recordName, 
                                     OutputRecord outRec) throws IOException;
    
  /**
   * Called each period after all records have been emitted, this method does nothing.
   * Subclasses may override it in order to perform some kind of flush.
   */
  protected void flush() throws IOException {
  }
    
  /**
   * Called by MetricsRecordImpl.update().  Creates or updates a row in
   * the internal table of metric data.
   */
  protected void update(MetricsRecordImpl record) {
    String recordName = record.getRecordName();
    TagMap tagTable = record.getTagTable();
    Map<String,MetricValue> metricUpdates = record.getMetricTable();
        
    RecordMap recordMap = getRecordMap(recordName);
    synchronized (recordMap) {
      MetricMap metricMap = recordMap.get(tagTable);
      if (metricMap == null) {
        metricMap = new MetricMap();
        TagMap tagMap = new TagMap(tagTable); // clone tags
        recordMap.put(tagMap, metricMap);
      }

      Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
      for (Entry<String, MetricValue> entry : entrySet) {
        String metricName = entry.getKey ();
        MetricValue updateValue = entry.getValue ();
        Number updateNumber = updateValue.getNumber();
        Number currentNumber = metricMap.get(metricName);
        if (currentNumber == null || updateValue.isAbsolute()) {
          metricMap.put(metricName, updateNumber);
        }
        else {
          Number newNumber = sum(updateNumber, currentNumber);
          metricMap.put(metricName, newNumber);
        }
      }
    }
  }
    
  private synchronized RecordMap getRecordMap(String recordName) {
    return bufferedData.get(recordName);
  }
    
  /**
   * Adds two numbers, coercing the second to the type of the first.
   *
   */
  private Number sum(Number a, Number b) {
    if (a instanceof Integer) {
      return Integer.valueOf(a.intValue() + b.intValue());
    }
    else if (a instanceof Float) {
      return new Float(a.floatValue() + b.floatValue());
    }
    else if (a instanceof Short) {
      return Short.valueOf((short)(a.shortValue() + b.shortValue()));
    }
    else if (a instanceof Byte) {
      return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
    }
    else if (a instanceof Long) {
      return Long.valueOf((a.longValue() + b.longValue()));
    }
    else {
      // should never happen
      throw new MetricsException("Invalid number type");
    }
            
  }
    
  /**
   * Called by MetricsRecordImpl.remove().  Removes all matching rows in
   * the internal table of metric data.  A row matches if it has the same
   * tag names and values as record, but it may also have additional
   * tags.
   */    
  protected void remove(MetricsRecordImpl record) {
    String recordName = record.getRecordName();
    TagMap tagTable = record.getTagTable();
        
    RecordMap recordMap = getRecordMap(recordName);
    synchronized (recordMap) {
      Iterator<TagMap> it = recordMap.keySet().iterator();
      while (it.hasNext()) {
        TagMap rowTags = it.next();
        if (rowTags.containsAll(tagTable)) {
          it.remove();
        }
      }
    }
  }
    
  /**
   * Returns the timer period.
   */
  public int getPeriod() {
    return period;
  }
    
  /**
   * Sets the timer period
   */
  protected void setPeriod(int period) {
    this.period = period;
  }
  
  /**
   * If a period is set in the attribute passed in, override
   * the default with it.
   */
  protected void parseAndSetPeriod(String attributeName) {
    String periodStr = getAttribute(attributeName);
    if (periodStr != null) {
      int period = 0;
      try {
        period = Integer.parseInt(periodStr);
      } catch (NumberFormatException nfe) {
      }
      if (period <= 0) {
        throw new MetricsException("Invalid period: " + periodStr);
      }
      setPeriod(period);
    }
  }
}
