blob: 9eaa0021a769bb8a814a9d81bce6cd0625740006 [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.yahoofinance;
import java.util.Map;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.smtp.SmtpIdempotentOutputOperator;
import com.datatorrent.lib.streamquery.DerbySqlStreamOperator;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Maps;
/**
 * Yahoo Finance demo that raises an alert when a stock's last trade price drops below a
 * configured threshold.
 * <p>
 * The DAG built by {@link #populateDAG(DAG, Configuration)} is:
 * {@link YahooFinanceCSVInputOperator} (symbol and last trade) &rarr;
 * {@link DerbySqlStreamOperator} (one threshold query per symbol) &rarr;
 * {@link YahooFinanceAlertEscalationOperator} &rarr; either a
 * {@link SmtpIdempotentOutputOperator} (when every SMTP setting is configured) or a
 * {@link ConsoleOutputOperator}.
 * <p>
 * All configuration keys are prefixed with this class's fully qualified name followed by a dot:
 * {@code tickerSymbols}, {@code tickerThreshold}, {@code alertInterval}, {@code From},
 * {@code To}, {@code smtpHost}, {@code smtpPort}, {@code smtpUser}, {@code smtpPasswd} and
 * {@code useSsl}.
 *
 * @since 0.3.2
 */
@ApplicationAnnotation(name = "YahooFinanceWithAlertDemo")
public class ApplicationWithAlert implements StreamingApplication
{
  /** Prefix shared by every configuration key read by this application. */
  private static final String CONF_PREFIX = ApplicationWithAlert.class.getName() + ".";

  /** Settings that must all be non-empty for alerts to be sent by mail. */
  private static final String[] REQUIRED_SMTP_KEYS = {"From", "To", "smtpHost", "smtpPort", "smtpUser", "smtpPasswd"};

  /**
   * Builds the alerting DAG from the ticker symbols, price thresholds and (optional) SMTP
   * settings found in {@code conf}.
   *
   * @param dag the DAG to which operators and streams are added
   * @param conf the configuration holding the application settings
   * @throws IllegalArgumentException if fewer thresholds than symbols are configured
   * @throws NumberFormatException if a threshold, the alert interval or the SMTP port is not numeric
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    String[] symbols = splitAndTrim(conf.get(CONF_PREFIX + "tickerSymbols", "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM"));
    String[] priceThreshold = splitAndTrim(conf.get(CONF_PREFIX + "tickerThreshold", "39.00, 1100.00, 540.00, 73.00, 365.00, 448.00, 185.00"));

    // Thresholds are matched to symbols by position, so every symbol needs its own threshold.
    if (priceThreshold.length < symbols.length) {
      throw new IllegalArgumentException(CONF_PREFIX + "tickerThreshold has " + priceThreshold.length
          + " value(s) but " + CONF_PREFIX + "tickerSymbols has " + symbols.length + " symbol(s)");
    }

    double[] tickerThreshold = new double[priceThreshold.length];
    for (int i = 0; i < priceThreshold.length; i++) {
      tickerThreshold[i] = Double.parseDouble(priceThreshold[i]);
    }

    YahooFinanceCSVInputOperator input1 = dag.addOperator("input1", new YahooFinanceCSVInputOperator());
    DerbySqlStreamOperator sqlOper = dag.addOperator("sqlOper", new DerbySqlStreamOperator());
    YahooFinanceAlertEscalationOperator alertOper = dag.addOperator("alert", new YahooFinanceAlertEscalationOperator());

    // Default is 60000, i.e. 60 seconds.
    int alertInterval = Integer.parseInt(conf.get(CONF_PREFIX + "alertInterval", "60000"));
    alertOper.setAlertInterval(alertInterval);

    for (String symbol : symbols) {
      input1.addSymbol(symbol);
    }
    input1.addFormat("s0"); // symbol
    input1.addFormat("l1"); // last trade

    DerbySqlStreamOperator.InputSchema inputSchema1 = new DerbySqlStreamOperator.InputSchema("t1");
    inputSchema1.setColumnInfo("s0", "varchar(100)", true); // symbol
    inputSchema1.setColumnInfo("l1", "float", false); // last trade
    sqlOper.setInputSchema(0, inputSchema1);

    // One query per symbol: it selects the quote only when the last trade is below the threshold.
    for (int i = 0; i < symbols.length; i++) {
      // Double any single quote so the symbol cannot terminate the SQL string literal early.
      String quotedSymbol = symbols[i].replace("'", "''");
      sqlOper.addExecStatementString("SELECT SESSION.t1.s0 AS symbol, SESSION.t1.l1 AS last_trade FROM SESSION.t1 WHERE SESSION.t1.s0 = '" + quotedSymbol + "' AND SESSION.t1.l1 < " + tickerThreshold[i]);
    }

    dag.addStream("input1_sql", input1.outputPort, sqlOper.in1);
    dag.addStream("sql_alert", sqlOper.result, alertOper.in);

    // Mail the alerts when SMTP is fully configured; otherwise fall back to the console.
    if (validateSmtpParams(conf)) {
      SmtpIdempotentOutputOperator mailOper = dag.addOperator("mail", getSMTPOperator(conf));
      dag.addStream("alert_mail", alertOper.alert, mailOper.input);
    }
    else {
      ConsoleOutputOperator consoleOperator = new ConsoleOutputOperator();
      dag.addOperator("console", consoleOperator);
      dag.addStream("alert_console", alertOper.alert, consoleOperator.input);
    }
  }

  /**
   * Splits a comma-separated value and strips the whitespace around each element, so that
   * both "A,B" and "A, B" yield the same elements.
   *
   * @param csv the comma-separated text
   * @return the trimmed elements, in their original order
   */
  private static String[] splitAndTrim(String csv)
  {
    String[] parts = csv.split(",");
    for (int i = 0; i < parts.length; i++) {
      parts[i] = parts[i].trim();
    }
    return parts;
  }

  /**
   * Checks whether every setting needed to send mail is present.
   *
   * @param conf the configuration to inspect
   * @return true only if From, To, smtpHost, smtpPort, smtpUser and smtpPasswd are all non-empty
   */
  private boolean validateSmtpParams(Configuration conf)
  {
    for (String key : REQUIRED_SMTP_KEYS) {
      if (!isNotEmpty(conf.get(CONF_PREFIX + key))) {
        return false;
      }
    }
    return true;
  }

  /**
   * Creates the mail operator from the SMTP settings in {@code conf}. Callers are expected to
   * have checked the settings with {@link #validateSmtpParams(Configuration)} first.
   *
   * @param conf the configuration holding the SMTP settings
   * @return a configured, not yet registered, mail output operator
   * @throws NumberFormatException if smtpPort is not an integer
   */
  private SmtpIdempotentOutputOperator getSMTPOperator(Configuration conf)
  {
    String sender = conf.get(CONF_PREFIX + "From");
    String recipient = conf.get(CONF_PREFIX + "To");
    int smtpPort = Integer.parseInt(conf.get(CONF_PREFIX + "smtpPort").trim());
    // A missing useSsl setting parses as false.
    boolean useSsl = Boolean.parseBoolean(conf.get(CONF_PREFIX + "useSsl"));

    Map<String, String> recipients = Maps.newHashMap();
    recipients.put("To", recipient);

    SmtpIdempotentOutputOperator mailOper = new SmtpIdempotentOutputOperator();
    mailOper.setFrom(sender);
    mailOper.setRecipients(recipients);
    mailOper.setContent("Threshold breach: {}\nThis is an auto-generated message. Do not reply.");
    mailOper.setSubject("ALERT: Price drops below threshold");
    mailOper.setSmtpHost(conf.get(CONF_PREFIX + "smtpHost"));
    mailOper.setSmtpPort(smtpPort);
    mailOper.setSmtpUserName(conf.get(CONF_PREFIX + "smtpUser"));
    mailOper.setSmtpPassword(conf.get(CONF_PREFIX + "smtpPasswd"));
    mailOper.setUseSsl(useSsl);
    return mailOper;
  }

  /**
   * Tells whether a configuration value was supplied.
   *
   * @param str the value to test, possibly null
   * @return true if {@code str} is neither null nor the empty string
   */
  private boolean isNotEmpty(String str)
  {
    return str != null && str.length() > 0;
  }
}