blob: 014c2689c680f569d6790d0f4515a3d30cf0056a [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.yahoofinance;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.DAG;
import com.datatorrent.demos.yahoofinance.StockTickInput;
import com.datatorrent.lib.chart.CandleStick;
import com.datatorrent.lib.chart.TimeSeriesAverageChartOperator;
import com.datatorrent.lib.chart.TimeSeriesCandleStickChartOperator;
import com.datatorrent.lib.chart.XYChartOperator.NumberType;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.util.KeyValPair;
/**
* <p>YahooFinanceApplication class
*
* This demo is used to demonstrate the usage of chart operators.
* The demo plots the stock prices against time and showcases 2 types of charts : AverageCha avergert and CandleStickChart.
*
* It has the following operators:
* StockTickInput operator: Feeds in the actual Yahoo finance ticks of stock prices.
* AverageChart Operator: Plots the average (mean) value of stock prices against time.
* CandleStick Operator: Plots the candle stick chart of stock price against time. Candle stick chart is a style of bar-chart.
* The tuple recording should be turned on to see the charts on UI.
*
* </p>
*
* @since 0.3.2
*/
@ApplicationAnnotation(name="YahooFinanceDemoWithChart")
public class YahooFinanceApplicationWithChart extends com.datatorrent.demos.yahoofinance.YahooFinanceApplication
{
public static class YahooFinanceTimeSeriesAverageChartOperator extends TimeSeriesAverageChartOperator<String>
{
@Override
public Number convertTupleToY(Object tuple)
{
KeyValPair<String, Double> kvp = (KeyValPair<String, Double>)tuple;
return kvp.getValue();
}
@Override
public String convertTupleToKey(Object tuple)
{
KeyValPair<String, Double> kvp = (KeyValPair<String, Double>)tuple;
return kvp.getKey();
}
}
public static class YahooFinanceTimeSeriesCandleStickChartOperator extends TimeSeriesCandleStickChartOperator<String>
{
@Override
public Number convertTupleToY(Object tuple)
{
KeyValPair<String, Double> kvp = (KeyValPair<String, Double>)tuple;
return kvp.getValue();
}
@Override
public String convertTupleToKey(Object tuple)
{
KeyValPair<String, Double> kvp = (KeyValPair<String, Double>)tuple;
return kvp.getKey();
}
}
TimeSeriesAverageChartOperator<String> getAverageChartOperator(String name, DAG dag)
{
YahooFinanceTimeSeriesAverageChartOperator op = new YahooFinanceTimeSeriesAverageChartOperator();
op.setyNumberType(NumberType.FLOAT);
return dag.addOperator(name, op);
}
TimeSeriesCandleStickChartOperator<String> getCandleStickChartOperator(String name, DAG dag)
{
YahooFinanceTimeSeriesCandleStickChartOperator op = new YahooFinanceTimeSeriesCandleStickChartOperator();
op.setyNumberType(NumberType.FLOAT);
return dag.addOperator(name, op);
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
tick.setOutputEvenIfZeroVolume(true);
DAG.StreamMeta stream = dag.addStream("price", tick.price);
TimeSeriesAverageChartOperator<String> averageChartOperator = getAverageChartOperator("AverageChart", dag);
TimeSeriesCandleStickChartOperator<String> candleStickChartOperator = getCandleStickChartOperator("CandleStickChart", dag);
DevNull<Map<String, Map<Number, Number>>> devnull1 = dag.addOperator("devnull1", new DevNull<Map<String, Map<Number, Number>>>());
DevNull<Map<String, Map<Number, CandleStick>>> devnull2 = dag.addOperator("devnull2", new DevNull<Map<String, Map<Number, CandleStick>>>());
stream.addSink(averageChartOperator.in1);
stream.addSink(candleStickChartOperator.in1);
dag.addStream("averageDummyStream", averageChartOperator.chart, devnull1.data);
dag.addStream("candleStickDummyStream", candleStickChartOperator.chart, devnull2.data);
}
}