| /* |
| * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.demos.yahoofinance; |
| |
| import au.com.bytecode.opencsv.CSVReader; |
| |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.lib.io.SimpleSinglePortInputOperator; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import org.apache.commons.httpclient.HttpClient; |
| import org.apache.commons.httpclient.HttpStatus; |
| import org.apache.commons.httpclient.cookie.CookiePolicy; |
| import org.apache.commons.httpclient.methods.GetMethod; |
| import org.apache.commons.httpclient.params.DefaultHttpParams; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Grabs Yahoo Finance quotes data and emits HashMap, with key equals the format name (e.g. "s0") <p> |
| * |
| * @since 0.3.2 |
| */ |
| public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<HashMap<String, Object>> implements Runnable |
| { |
| private static final Logger logger = LoggerFactory.getLogger(YahooFinanceCSVInputOperator.class); |
| /** |
| * Timeout interval for reading from server. 0 or negative indicates no timeout. |
| */ |
| private int readIntervalMillis = 500; |
| |
| /** |
| * The URL of the web service resource for the POST request. |
| */ |
| private String url; |
| private transient HttpClient client; |
| private transient GetMethod method; |
| |
| private ArrayList<String> symbolList = new ArrayList<String>(); |
| private ArrayList<String> parameterList = new ArrayList<String>(); |
| |
| public void addSymbol(String symbol) |
| { |
| symbolList.add(symbol); |
| } |
| |
| public void addFormat(String format) |
| { |
| parameterList.add(format); |
| } |
| |
| public ArrayList<String> getSymbolList() |
| { |
| return symbolList; |
| } |
| |
| public ArrayList<String> getParameterList() |
| { |
| return parameterList; |
| } |
| |
| public int getReadIntervalMillis() |
| { |
| return readIntervalMillis; |
| } |
| |
| public void setReadIntervalMillis(int readIntervalMillis) |
| { |
| this.readIntervalMillis = readIntervalMillis; |
| } |
| |
| /** |
| * Prepare URL from symbols and parameters. |
| * URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=GOOG,FB,YHOO&f=sl1vt1&e=.csv |
| * @return |
| */ |
| private String prepareURL() |
| { |
| String str = "http://download.finance.yahoo.com/d/quotes.csv?"; |
| |
| str += "s="; |
| for (int i = 0; i < symbolList.size(); i++) { |
| if (i != 0) { |
| str += ","; |
| } |
| str += symbolList.get(i); |
| } |
| str += "&f="; |
| for (String format: parameterList) { |
| str += format; |
| } |
| str += "&e=.csv"; |
| return str; |
| } |
| |
| @Override |
| public void setup(OperatorContext context) |
| { |
| url = prepareURL(); |
| client = new HttpClient(); |
| method = new GetMethod(url); |
| DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY); |
| } |
| |
| @Override |
| public void run() |
| { |
| while (true) { |
| try { |
| int statusCode = client.executeMethod(method); |
| if (statusCode != HttpStatus.SC_OK) { |
| System.err.println("Method failed: " + method.getStatusLine()); |
| } |
| else { |
| InputStream istream; |
| istream = method.getResponseBodyAsStream(); |
| // Process response |
| InputStreamReader isr = new InputStreamReader(istream); |
| CSVReader reader = new CSVReader(isr); |
| List<String[]> myEntries; |
| myEntries = reader.readAll(); |
| for (String[] stringArr: myEntries) { |
| HashMap<String,Object> hm = new HashMap<String,Object>(); |
| for (int i = 0; i < parameterList.size(); i++) { |
| hm.put(parameterList.get(i), stringArr[i]); |
| } |
| outputPort.emit(hm); // send out one symbol at a time |
| } |
| } |
| Thread.sleep(readIntervalMillis); |
| } |
| catch (InterruptedException ex) { |
| logger.debug(ex.toString()); |
| } |
| catch (IOException ex) { |
| logger.debug(ex.toString()); |
| } |
| } |
| } |
| } |