blob: 5dd6cf583dbac6ba646daa208abf6dff9cbda916 [file] [log] [blame]
package backtype.storm.clojure;
import backtype.storm.generated.StreamInfo;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.RT;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ClojureBolt implements IRichBolt {
Map<Integer, StreamInfo> _fields;
String _namespace;
String _fnName;
List<Object> _args;
IFn _execute;
IFn _cleanup;
OutputCollector _collector;
public ClojureBolt(String namespace, String fnName, Map<Integer, StreamInfo> fields) {
this(namespace, fnName, new ArrayList<Object>(), fields);
}
public ClojureBolt(String namespace, String fnName, List<Object> args, Map<Integer, StreamInfo> fields) {
_namespace = namespace;
_fnName = fnName;
_args = args;
_fields = fields;
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
try {
clojure.lang.Compiler.eval(RT.readString("(require '" + _namespace + ")"));
} catch (Exception e) {
//if playing from the repl and defining functions, file won't exist
}
IFn hof = (IFn) RT.var(_namespace, _fnName).deref();
try {
Object fns = hof.applyTo(RT.seq(_args));
if(fns instanceof Map) {
Map fnMap = (Map) fns;
IFn prepare = (IFn) fnMap.get(Keyword.intern("prepare"));
if(prepare!=null)
prepare.invoke(stormConf, context, collector);
_execute = (IFn) fnMap.get(Keyword.intern("execute"));
_cleanup = (IFn) fnMap.get(Keyword.intern("cleanup"));
} else {
_execute = (IFn) fns;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void execute(Tuple input) {
try {
_execute.invoke(input, _collector);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void cleanup() {
if(_cleanup!=null) {
try {
_cleanup.invoke(_collector);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for(Integer stream: _fields.keySet()) {
StreamInfo info = _fields.get(stream);
declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
}
}
}