/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.logs;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang.mutable.MutableDouble;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.util.KeyValPair;

/**
 * This class aggregates the value of given dimension across windows.
 * <p></p>
 * @displayName Multi Window Dimension Aggregation
 * @category Stats and Aggregations
 * @tags aggregation
 *
 * @since 0.3.4
 */
public class MultiWindowDimensionAggregation implements Operator
{

  @SuppressWarnings("unused")
  private static final Logger logger = LoggerFactory.getLogger(MultiWindowDimensionAggregation.class);

  public enum AggregateOperation
  {
    SUM, AVERAGE
  }

  private int windowSize = 2;
  private int currentWindow = 0;
  private String timeBucket = "m";
  private String dimensionKeyVal = "0";
  private List<String> dimensionArrayString;
  @NotNull
  private List<int[]> dimensionArray;
  private AggregateOperation operationType = AggregateOperation.SUM;
  private Map<String, Map<String, KeyValPair<MutableDouble, Integer>>> outputMap;
  private Map<Integer, Map<String, Map<String, Number>>> cacheOject;

  private transient List<Pattern> patternList;
  private transient int applicationWindowSize = 500;
  /**
   * This is the output port which emits aggregated dimensions.
   */
  public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> output = new DefaultOutputPort<Map<String, DimensionObject<String>>>();
  /**
   * This is the input port which receives multi dimensional data.
   */
  public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>()
  {
    @Override
    public void process(Map<String, Map<String, Number>> tuple)
    {
      cacheOject.put(currentWindow, tuple);
      for (Map.Entry<String, Map<String, Number>> tupleEntry : tuple.entrySet()) {
        String tupleKey = tupleEntry.getKey();
        Map<String, Number> tupleValue = tupleEntry.getValue();
        int currentPattern = 0;
        for (Pattern pattern : patternList) {
          Matcher matcher = pattern.matcher(tupleKey);
          if (matcher.matches()) {
            String currentPatternString = dimensionArrayString.get(currentPattern);
            Map<String, KeyValPair<MutableDouble, Integer>> currentPatternMap = outputMap.get(currentPatternString);
            if (currentPatternMap == null) {
              currentPatternMap = new HashMap<String, KeyValPair<MutableDouble, Integer>>();
              outputMap.put(currentPatternString, currentPatternMap);
            }
            StringBuilder builder = new StringBuilder(matcher.group(2));
            for (int i = 1; i < dimensionArray.get(currentPattern).length; i++) {
              builder.append("," + matcher.group(i + 2));
            }

            KeyValPair<MutableDouble, Integer> currentDimensionKeyValPair = currentPatternMap.get(builder.toString());
            if (currentDimensionKeyValPair == null) {
              currentDimensionKeyValPair = new KeyValPair<MutableDouble, Integer>(new MutableDouble(tupleValue.get(dimensionKeyVal)), 1);
              currentPatternMap.put(builder.toString(), currentDimensionKeyValPair);
            } else {
              currentDimensionKeyValPair.getKey().add(tupleValue.get(dimensionKeyVal));
              currentDimensionKeyValPair.setValue(currentDimensionKeyValPair.getValue() + 1);
            }
            break;
          }
          currentPattern++;
        }

      }
    }
  };

  public String getDimensionKeyVal()
  {
    return dimensionKeyVal;
  }

  public void setDimensionKeyVal(String dimensionKeyVal)
  {
    this.dimensionKeyVal = dimensionKeyVal;
  }

  public String getTimeBucket()
  {
    return timeBucket;
  }

  public void setTimeBucket(String timeBucket)
  {
    this.timeBucket = timeBucket;
  }

  public List<int[]> getDimensionArray()
  {
    return dimensionArray;
  }

  public void setDimensionArray(List<int[]> dimensionArray)
  {
    this.dimensionArray = dimensionArray;
    dimensionArrayString = new ArrayList<String>();
    for (int[] e : dimensionArray) {
      StringBuilder builder = new StringBuilder("" + e[0]);
      for (int i = 1; i < e.length; i++) {
        builder.append("," + e[i]);
      }
      dimensionArrayString.add(builder.toString());

    }

  }

  public int getWindowSize()
  {
    return windowSize;
  }

  public void setWindowSize(int windowSize)
  {
    this.windowSize = windowSize;
  }

  @Override
  public void setup(OperatorContext arg0)
  {
    if (arg0 != null) {
      applicationWindowSize = arg0.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
    }
    if (cacheOject == null) {
      cacheOject = new HashMap<Integer, Map<String, Map<String, Number>>>(windowSize);
    }
    if (outputMap == null) {
      outputMap = new HashMap<String, Map<String, KeyValPair<MutableDouble, Integer>>>();
    }
    setUpPatternList();

  }

  private void setUpPatternList()
  {
    patternList = new ArrayList<Pattern>();
    for (int[] e : dimensionArray) {
      Pattern pattern;
      StringBuilder builder = new StringBuilder(timeBucket + "\\|(\\d+)");
      for (int i = 0; i < e.length; i++) {
        builder.append("\\|" + e[i] + ":([^\\|]+)");
      }

      pattern = Pattern.compile(builder.toString());
      patternList.add(pattern);
    }
  }

  @Override
  public void teardown()
  {

  }

  @Override
  public void beginWindow(long arg0)
  {
    Map<String, Map<String, Number>> currentWindowMap = cacheOject.get(currentWindow);
    if (currentWindowMap == null) {
      currentWindowMap = new HashMap<String, Map<String, Number>>();
    } else {
      for (Map.Entry<String, Map<String, Number>> tupleEntry : currentWindowMap.entrySet()) {
        String tupleKey = tupleEntry.getKey();
        Map<String, Number> tupleValue = tupleEntry.getValue();
        int currentPattern = 0;
        for (Pattern pattern : patternList) {
          Matcher matcher = pattern.matcher(tupleKey);
          if (matcher.matches()) {
            String currentPatternString = dimensionArrayString.get(currentPattern);
            Map<String, KeyValPair<MutableDouble, Integer>> currentPatternMap = outputMap.get(currentPatternString);
            if (currentPatternMap != null) {
              StringBuilder builder = new StringBuilder(matcher.group(2));
              for (int i = 1; i < dimensionArray.get(currentPattern).length; i++) {
                builder.append("," + matcher.group(i + 2));
              }
              KeyValPair<MutableDouble, Integer> currentDimensionKeyValPair = currentPatternMap.get(builder.toString());
              if (currentDimensionKeyValPair != null) {
                currentDimensionKeyValPair.getKey().add(0 - tupleValue.get(dimensionKeyVal).doubleValue());
                currentDimensionKeyValPair.setValue(currentDimensionKeyValPair.getValue() - 1);
                if (currentDimensionKeyValPair.getKey().doubleValue() == 0.0) {
                  currentPatternMap.remove(builder.toString());
                }
              }
            }
            break;
          }
          currentPattern++;
        }

      }
    }
    currentWindowMap.clear();
    if (patternList == null || patternList.isEmpty()) {
      setUpPatternList();
    }

  }

  @Override
  public void endWindow()
  {
    int totalWindowsOccupied = cacheOject.size();
    for (Map.Entry<String, Map<String, KeyValPair<MutableDouble, Integer>>> e : outputMap.entrySet()) {
      for (Map.Entry<String, KeyValPair<MutableDouble, Integer>> dimensionValObj : e.getValue().entrySet()) {
        Map<String, DimensionObject<String>> outputData = new HashMap<String, DimensionObject<String>>();
        KeyValPair<MutableDouble, Integer> keyVal = dimensionValObj.getValue();
        if (operationType == AggregateOperation.SUM) {
          outputData.put(e.getKey(), new DimensionObject<String>(keyVal.getKey(), dimensionValObj.getKey()));
        } else if (operationType == AggregateOperation.AVERAGE) {
          if (keyVal.getValue() != 0) {
            double totalCount = ((double)(totalWindowsOccupied * applicationWindowSize)) / 1000;
            outputData.put(e.getKey(), new DimensionObject<String>(new MutableDouble(keyVal.getKey().doubleValue() / totalCount), dimensionValObj.getKey()));
          }
        }
        if (!outputData.isEmpty()) {
          output.emit(outputData);
        }
      }
    }
    currentWindow = (currentWindow + 1) % windowSize;

  }

  public AggregateOperation getOperationType()
  {
    return operationType;
  }

  public void setOperationType(AggregateOperation operationType)
  {
    this.operationType = operationType;
  }

}
