blob: 7eaf7cd6ea039020ac05670f2c9245292604a694 [file] [log] [blame]
/*
* Copyright 2009-2011 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.external.dataset.adapter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
import edu.uci.ics.asterix.external.data.parser.IDataParser;
import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
import edu.uci.ics.asterix.feed.intake.FeedStream;
import edu.uci.ics.asterix.feed.intake.IFeedClient;
import edu.uci.ics.asterix.feed.intake.RSSFeedClient;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public class RSSFeedAdapter extends AbstractDatasourceAdapter implements IDatasourceAdapter, IManagedFeedAdapter,
IMutableFeedAdapter {
private List<String> feedURLs = new ArrayList<String>();
private boolean isStopRequested = false;
private boolean isAlterRequested = false;
private Map<String, String> alteredParams = new HashMap<String, String>();
private String id_prefix = "";
public static final String KEY_RSS_URL = "url";
public static final String KEY_INTERVAL = "interval";
public boolean isStopRequested() {
return isStopRequested;
}
public void setStopRequested(boolean isStopRequested) {
this.isStopRequested = isStopRequested;
}
@Override
public IDataParser getDataParser(int partition) throws Exception {
IDataParser dataParser = new ManagedDelimitedDataStreamParser();
((IManagedDataParser) dataParser).setAdapter(this);
dataParser.configure(configuration);
dataParser.initialize((ARecordType) atype, ctx);
IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
FeedStream feedStream = new FeedStream(feedClient, ctx);
((IDataStreamParser) dataParser).setInputStream(feedStream);
return dataParser;
}
@Override
public void alter(Map<String, String> properties) throws Exception {
isAlterRequested = true;
this.alteredParams = properties;
reconfigure(properties);
}
public void postAlteration() {
alteredParams = null;
isAlterRequested = false;
}
@Override
public void beforeSuspend() throws Exception {
// TODO Auto-generated method stub
}
@Override
public void beforeResume() throws Exception {
// TODO Auto-generated method stub
}
@Override
public void beforeStop() throws Exception {
// TODO Auto-generated method stub
}
@Override
public void stop() throws Exception {
isStopRequested = true;
}
@Override
public AdapterDataFlowType getAdapterDataFlowType() {
return AdapterDataFlowType.PULL;
}
@Override
public AdapterType getAdapterType() {
return AdapterType.READ;
}
@Override
public void configure(Map<String, String> arguments, IAType atype) throws Exception {
configuration = arguments;
this.atype = atype;
String rssURLProperty = configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
initializeFeedURLs(rssURLProperty);
configurePartitionConstraints();
}
private void initializeFeedURLs(String rssURLProperty) {
feedURLs.clear();
String[] feedURLProperty = rssURLProperty.split(",");
for (String feedURL : feedURLProperty) {
feedURLs.add(feedURL);
}
}
protected void reconfigure(Map<String, String> arguments) {
String rssURLProperty = configuration.get(KEY_RSS_URL);
if (rssURLProperty != null) {
initializeFeedURLs(rssURLProperty);
}
}
protected void configurePartitionConstraints() {
partitionConstraint = new AlgebricksCountPartitionConstraint(feedURLs.size());
}
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
}
public boolean isAlterRequested() {
return isAlterRequested;
}
public Map<String, String> getAlteredParams() {
return alteredParams;
}
}