blob: 1155d12600c41b5a7517b40a7f34d55eaabec222 [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.yahoofinance;
import au.com.bytecode.opencsv.CSVReader;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.cookie.CookiePolicy;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.DefaultHttpParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Input operator that periodically downloads quotes from the Yahoo Finance CSV
 * endpoint and emits one {@code HashMap} per CSV row on {@code outputPort}.
 * <p>
 * Each emitted map is keyed by the format names registered through
 * {@link #addFormat(String)} (e.g. "s0"); the value for the i-th format is the
 * i-th column of the row, as the raw string returned by the server.
 * <p>
 * The polling loop lives in {@link #run()} and stops when its thread is interrupted.
 *
 * @since 0.3.2
 */
public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<HashMap<String, Object>> implements Runnable
{
  private static final Logger logger = LoggerFactory.getLogger(YahooFinanceCSVInputOperator.class);
  /**
   * Fixed prefix of the quotes resource; the query string is appended by {@link #prepareURL()}.
   */
  private static final String BASE_URL = "http://download.finance.yahoo.com/d/quotes.csv?";
  /**
   * Pause between two successive polls of the server, in milliseconds.
   * 0 or negative means the next poll starts immediately, without pausing.
   */
  private int readIntervalMillis = 500;
  /**
   * The URL of the web service resource for the GET request. Computed in {@link #setup(OperatorContext)}.
   */
  private String url;
  private transient HttpClient client;
  private transient GetMethod method;
  /**
   * Ticker symbols to query, in the order they were added.
   */
  private ArrayList<String> symbolList = new ArrayList<String>();
  /**
   * Format names to request; they also serve as the keys of every emitted map.
   */
  private ArrayList<String> parameterList = new ArrayList<String>();

  /**
   * Adds a ticker symbol to the query.
   *
   * @param symbol symbol to request, appended verbatim to the URL
   */
  public void addSymbol(String symbol)
  {
    symbolList.add(symbol);
  }

  /**
   * Adds a format name to the query.
   *
   * @param format format name, appended verbatim to the URL and used as a map key
   */
  public void addFormat(String format)
  {
    parameterList.add(format);
  }

  /**
   * @return the live (not copied) list of symbols
   */
  public ArrayList<String> getSymbolList()
  {
    return symbolList;
  }

  /**
   * @return the live (not copied) list of format names
   */
  public ArrayList<String> getParameterList()
  {
    return parameterList;
  }

  /**
   * @return the pause between polls in milliseconds
   */
  public int getReadIntervalMillis()
  {
    return readIntervalMillis;
  }

  /**
   * @param readIntervalMillis pause between polls in milliseconds; 0 or negative disables the pause
   */
  public void setReadIntervalMillis(int readIntervalMillis)
  {
    this.readIntervalMillis = readIntervalMillis;
  }

  /**
   * Prepare URL from symbols and parameters.
   * URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=GOOG,FB,YHOO&f=sl1vt1&e=.csv
   *
   * @return the request URL: symbols joined by commas, format names concatenated without a separator
   */
  private String prepareURL()
  {
    StringBuilder sb = new StringBuilder(BASE_URL);
    sb.append("s=");
    for (int i = 0; i < symbolList.size(); i++) {
      if (i != 0) {
        sb.append(',');
      }
      sb.append(symbolList.get(i));
    }
    sb.append("&f=");
    for (String format : parameterList) {
      sb.append(format);
    }
    sb.append("&e=.csv");
    return sb.toString();
  }

  /**
   * Builds the request URL from the configured symbols and formats and creates the HTTP client objects.
   *
   * @param context operator context (not used here)
   */
  @Override
  public void setup(OperatorContext context)
  {
    url = prepareURL();
    client = new HttpClient();
    method = new GetMethod(url);
    DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY);
  }

  /**
   * Polls the server until the running thread is interrupted.
   * <p>
   * An I/O failure is logged and the loop carries on with the next poll after the
   * usual pause, so a failing server is not hammered in a tight loop.
   */
  @Override
  public void run()
  {
    // The flag is also checked here because with a non-positive interval there is no sleep to be interrupted.
    while (!Thread.currentThread().isInterrupted()) {
      try {
        fetchAndEmit();
      }
      catch (IOException ex) {
        logger.warn("Failed to fetch quotes from " + url, ex);
      }

      // Thread.sleep rejects negative values, so only pause for a positive interval.
      if (readIntervalMillis > 0) {
        try {
          Thread.sleep(readIntervalMillis);
        }
        catch (InterruptedException ex) {
          // NOTE(review): presumably the framework interrupts this thread on shutdown - confirm
          // against SimpleSinglePortInputOperator. Either way an interrupt is a request to stop.
          logger.debug("Polling thread interrupted, stopping", ex);
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /**
   * Executes one GET request and emits the rows of the CSV response.
   * The connection is released whether or not the request succeeds.
   *
   * @throws IOException if the request fails or the response cannot be read
   */
  private void fetchAndEmit() throws IOException
  {
    try {
      int statusCode = client.executeMethod(method);
      if (statusCode != HttpStatus.SC_OK) {
        logger.warn("Method failed: {}", method.getStatusLine());
        return;
      }

      InputStream istream = method.getResponseBodyAsStream();
      if (istream == null) {
        logger.warn("No response body received from {}", url);
        return;
      }

      // Decode with the charset reported by the response rather than the platform default.
      CSVReader reader = new CSVReader(new InputStreamReader(istream, method.getResponseCharSet()));
      try {
        emitRows(reader.readAll());
      }
      finally {
        reader.close();
      }
    }
    finally {
      method.releaseConnection();
    }
  }

  /**
   * Emits one map per CSV row, keyed by format name. Rows with fewer columns than
   * there are format names are skipped; surplus columns are ignored.
   *
   * @param rows parsed CSV rows
   */
  private void emitRows(List<String[]> rows)
  {
    final int expected = parameterList.size();
    for (String[] row : rows) {
      if (row.length < expected) {
        logger.warn("Skipping row with {} columns, expected at least {}", row.length, expected);
        continue;
      }
      HashMap<String, Object> hm = new HashMap<String, Object>();
      for (int i = 0; i < expected; i++) {
        hm.put(parameterList.get(i), row[i]);
      }
      outputPort.emit(hm); // send out one symbol at a time
    }
  }
}