/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.yahoofinance;

import org.apache.apex.malhar.lib.io.ConsoleOutputOperator;
import org.apache.apex.malhar.lib.math.RangeKeyVal;
import org.apache.apex.malhar.lib.math.SumKeyVal;
import org.apache.apex.malhar.lib.multiwindow.SimpleMovingAverage;
import org.apache.apex.malhar.lib.stream.ConsolidatorKeyVal;
import org.apache.apex.malhar.lib.util.BaseKeyValueOperator.DefaultPartitionCodec;
import org.apache.apex.malhar.lib.util.HighLow;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;

/**
 * Yahoo! Finance Application Example :<br>
 * Get Yahoo finance feed and calculate minute price range, minute volume,
 * simple moving average of 5 minutes. <br>
 * <br>
 * Functional Description : <br>
 * Application samples yahoo finance ticker every 200ms. All data points in one
 * second are streamed from input adapter. <br>
 * <br>
 *
 * Application calculates following Real Time Value(s):<br>
 * <ul>
 * <li>Quotes for IBM, Google, Apple, Yahoo stocks price/volume/time displayed
 * every second.</li>
 * <li>Charts for Stocks in terms for high/low price vs volume for last minute.</li>
 * <li>Simple moving average over last 5 minutes for IBM, Google, Apple, Yahoo
 * stocks.</li>
 * </ul>
 * <br>
 * <br>
 *
 * Custom Attribute : <br>
 * <ul>
 * <li>Application streaming window size(STREAMING_WINDOW_SIZE_MILLIS) = 1 sec,
 * since we are only interested in quotes every second.</li>
 * <li>Range/Minute Volume operator's window size(APPLICATION_WINDOW_COUNT) =
 * 60, aggregate over one minute.</li>
 * <li>Sum operator window length : 300, sliding average over last 5 minutes.</li>
 * </ul>
 * <br>
 *
 * Input Adapter : <br>
 * Stock Tick input operator get yahoo finance real time stock quotes data and
 * pushes application. <br>
 * <br>
 *
 * Output Adapter : <br>
 * Output values are written to console through ConsoleOutputOerator<br>
 * if you need to change write to HDFS,HTTP .. instead of console, <br>
 * Please refer to {@link org.apache.apex.malhar.lib.io.HttpOutputOperator} or
 * {@link org.apache.apex.malhar.lib.io.fs.HdfsOutputOperator}. <br>
 * <br>
 *
 * Run Sample Application : <br>
 * <p>
 * Running Java Test or Main app in IDE:
 *
 * <pre>
 * LocalMode.runApp(new Application(), 600000); // 10 min run
 * </pre>
 *
 * Run Success : <br>
 * For successful deployment and run, user should see following output on
 * console:
 *
 * <pre>
 *  Price SMA: AAPL=435.965
 * Price SMA: GOOG=877.0
 * QUOTE: {AABA=[26.37, 9760360, 4:00pm, null, null], IBM=[203.77, 2899698, 4:00pm, null, null], GOOG=[877.0, 2069614, 4:00pm, null, null], AAPL=[435.965, 10208099, 4:00pm, null, null]}
 * Price SMA: AABA=26.37
 * </pre>
 *
 * Scaling Options : <br>
 * <ul>
 * <li>Volume operator can be replicated using a {@link StatelessPartitioner}
 * on an operator.</li>
 * <li>Range value operator can replicated but using proper unifier
 * operator(read App Dev Guide).</li>
 * <li>Slinging window operator can be replicated with proper unifier operator.</li>
 * </ul>
 * <br>
 *
 * Application DAG : <br>
 * <img src="doc-files/Application.gif" width=600px > <br>
 * <br>
 *
 * Streaming Window Size : 1000 ms(1 Sec) <br>
 * Operator Details : <br>
 * <ul>
 * <li>
 * <p>
 * <b>The operator DailyVolume:</b> This operator reads from the input port,
 * which contains the incremental volume tuples from StockTickInput, and
 * aggregates the data to provide the cumulative volume. It just utilizes the
 * library class SumKeyVal<K,V> provided in math package. In this case,
 * SumKeyVal<String,Long>, where K is the stock symbol, V is the aggregated
 * volume, with cumulative set to true. (Otherwise if cumulative was set to
 * false, SumKeyVal would provide the sum for the application window.) The platform
 * provides a number of built-in operators for simple operations like this so
 * that application developers do not have to write them. More examples to
 * follow. This operator assumes that the application restarts before market
 * opens every day.
 * </p>
 * Class : {@link org.apache.apex.malhar.lib.math.SumKeyVal} <br>
 * Operator Application Window Count : 1 <br>
 * StateFull : Yes, volume gets aggregated every window count.</li>
 *
 * <li>
 * <p>
 * <b>The operator MinuteVolume:</b> This operator reads from the input port,
 * which contains the volume tuples from StockTickInput, and aggregates the data
 * to provide the sum of the volume within one minute. Like the operator
 * DailyVolume, this operator is also SumKeyVal<String,Long>, but with
 * cumulative set to false. Application Window is set to 1 minute. We will
 * explain how to set this later. <br>
 * Class : {@link org.apache.apex.malhar.lib.math.SumKeyVal} <br>
 * Operator App Window Count : 60 (1 Minute) <br>
 * StateFull : Yes, aggregate over last 60 windows.</li>
 *
 * <li>
 * <p>
 * <b>The operator Quote:</b> This operator has three input ports, which are
 * price (from StockTickInput), daily_vol (from Daily Volume), and time (from
 * StockTickInput). This operator just consolidates the three data and and emits
 * the consolidated data. It utilizes the class ConsolidatorKeyVal<K> from
 * stream package.<br>
 * Class : {@link org.apache.apex.malhar.lib.stream.ConsolidatorKeyVal} <br>
 * Operator App Window Count : 1 <br>
 * StateFull : No</li>
 *
 * <li>
 * <p>
 * <b>The operator Chart:</b> This operator is very similar to the operator
 * Quote, except that it takes inputs from High Low and Minute Vol and outputs
 * the consolidated tuples to the output port. <br>
 * Class : {@link org.apache.apex.malhar.lib.stream.ConsolidatorKeyVal} <br>
 * StateFull : No<br>
 * Operator App Window Count : 1</li>
 *
 *
 * <li>
 * <p>
 * <b>The operator PriceSMA:</b> SMA stands for - Simple Moving Average. It
 * reads from the input port, which contains the price tuples from
 * StockTickInput, and provides the moving average price of the stock. It
 * utilizes SimpleMovingAverage<String,Double>, which is provided in multiwindow
 * package. SimpleMovingAverage keeps track of the data of the previous N
 * application windows in a sliding manner. For each end window event, it
 * provides the average of the data in those application windows. <br>
 * Class : {@link org.apache.apex.malhar.lib.multiwindow.SimpleMovingAverage} <br>
 * StateFull : Yes, stores values across application window. <br>
 * Operator App Window : 1 <br>
 * Operator Sliding Window : 300 (5 mins).</li>
 *
 * <li>
 * <p>
 * <b>The operator Console: </b> This operator just outputs the input tuples to
 * the console (or stdout). In this example, there are four console operators,
 * which connect to the output of Quote, Chart, PriceSMA and VolumeSMA. In
 * practice, they should be replaced by operators which would do something about
 * the data, like drawing charts. </li>
 *
 * </ul>
 * <br>
 *
 * @since 0.3.2
 */
@ApplicationAnnotation(name = "YahooFinanceExample")
public class YahooFinanceApplication implements StreamingApplication
{
  protected int streamingWindowSizeMilliSeconds = 1000; // 1 second
  protected int appWindowCountMinute = 60;   // 1 minute
  protected int appWindowCountSMA = 300;  // 5 minute
  //protected String[] tickers = {"IBM", "GOOG", "AAPL", "AABA"};

  /**
   * Instantiate stock input operator for actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded price.
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @return StockTickInput instance.
   */
  public StockTickInput getStockTickInputOperator(String name, DAG dag)
  {
    StockTickInput oper = dag.addOperator(name, StockTickInput.class);
    oper.readIntervalMillis = 200;
    //oper.symbols = tickers;
    return oper;
  }

  /**
   * Instantiate {@link org.apache.apex.malhar.lib.math.SumKeyVal} operator
   * to sends total daily volume by adding volumes from each ticks.
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @return SumKeyVal instance.
   */
  public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag)
  {
    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
    oper.setType(Long.class);
    oper.setCumulative(true);
    return oper;
  }

  /**
   * Instantiate {@link org.apache.apex.malhar.lib.math.SumKeyVal} operator
   * Get aggregated volume of 1 minute and send at the end window of 1 minute.
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @param appWindowCount Operator aggregate window size.
   * @return SumKeyVal instance.
   */
  public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
  {
    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
    oper.setType(Long.class);
    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
    return oper;
  }

  /**
   * Instantiate {@link org.apache.apex.malhar.lib.math.RangeKeyVal} operator to get high/low
   * value for each key within given application window.
   * Get High-low range for 1 minute.
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @param appWindowCount Operator aggregate window size.
   * @return RangeKeyVal instance.
   */
  public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount)
  {
    RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>());
    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
    oper.setType(Double.class);
    return oper;
  }

  /**
   * Instantiate {@link org.apache.apex.malhar.lib.stream.ConsolidatorKeyVal} to send
   * Quote (Merge price, daily volume, time)
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @return ConsolidatorKeyVal instance.
   */
  public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag)
  {
    ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());
    return oper;
  }

  /**
   * Instantiate {@link org.apache.apex.malhar.lib.stream.ConsolidatorKeyVal} to send
   * Chart (Merge minute volume and minute high-low)
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @return ConsolidatorKeyVal instance.
   */
  public ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> getChartOperator(String name, DAG dag)
  {
    ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow<Double>,Long,Object,Object,Object>());
    return oper;
  }

  /**
   * Instantiate {@link org.apache.apex.malhar.lib.multiwindow.SimpleMovingAverage} to calculate moving average for price
   * over given window size. Sliding window size is 1.
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @param appWindowCount Operator aggregate window size.
   * @return SimpleMovingAverage instance.
   */
  public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
  {
    SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>());
    oper.setWindowSize(appWindowCount);
    oper.setType(Double.class);
    return oper;
  }

  /**
   * Get console for output operator.
   * @param name  Operator name
   * @param dag   Application DAG graph.
   * @return input port for console output.
   */
  public InputPort<Object> getConsole(String name, /*String nodeName,*/ DAG dag, String prefix)
  {
    // hack to output to HTTP based on actual environment
        /*
     String serverAddr = System.getenv("MALHAR_AJAXSERVER_ADDRESS");
     if (serverAddr != null) {
     HttpOutputOperator<Object> oper = dag.addOperator(name, new HttpOutputOperator<Object>());
     oper.setResourceURL(URI.create("http://" + serverAddr + "/channel/" + nodeName));
     return oper.input;
     }
     */

    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
    oper.setStringFormat(prefix + ": %s");
    return oper.input;
  }

  /**
   * Populate Yahoo Finance Example Application DAG.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds);

    StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
    SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag);
    ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag);

    RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute);
    SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute);
    ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> chartOperator = getChartOperator("Chart", dag);

    SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA);
    DefaultPartitionCodec<String, Double> codec = new DefaultPartitionCodec<String, Double>();
    dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);
    dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);
    dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
    dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data);
    dag.addStream("time", tick.time, quoteOperator.in3);
    dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);

    dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));

    dag.addStream("high_low", highlow.range, chartOperator.in1);
    dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
    dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));

    dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));

  }

}
