blob: 057bad68ba98e136399f67b57949890a5d84d831 [file] [log] [blame]
package com.clojurewerkz.cascading.mongodb;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.TupleEntrySchemeCollector;
import com.mongodb.DBObject;
import com.mongodb.hadoop.MongoOutput;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.output.MongoRecordWriter;
import org.apache.hadoop.mapred.*;
import org.bson.BSONObject;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * Cascading {@link TupleEntrySchemeCollector} that writes sink tuples through the Hadoop
 * {@link OutputFormat} configured on the sink {@link Tap}.
 *
 * <p>The collector registers itself as the scheme's {@link OutputCollector}. Every
 * {@link #collect(Object, Object)} call is forwarded to the {@link RecordWriter} that
 * {@link #prepare()} obtains from the configured output format.
 */
public class MongoDbCollector extends TupleEntrySchemeCollector<JobConf, OutputCollector> implements OutputCollector {
  /** Logger for this collector. */
  private static final Logger LOG = LoggerFactory.getLogger(MongoDbCollector.class);

  /** Private copy of the flow configuration; mutated by {@code tap.sinkConfInit}. */
  private final JobConf conf;

  /** Writer created in {@link #initialize()}; {@code null} before prepare and after close. */
  private RecordWriter writer;

  private final FlowProcess<JobConf> hadoopFlowProcess;

  private final Tap<JobConf, RecordReader, OutputCollector> tap;

  /** Reporter handed to the output format and writer; always {@link Reporter#NULL}. */
  private final Reporter reporter = Reporter.NULL;

  /**
   * Creates a collector for the given sink tap.
   *
   * @param flowProcess the current flow process; its configuration is copied, not shared
   * @param tap the sink tap whose scheme and output format are used
   * @throws IOException declared for callers; not thrown by the visible body
   */
  public MongoDbCollector(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap) throws IOException {
    super(flowProcess, tap.getScheme());
    this.hadoopFlowProcess = flowProcess;
    this.tap = tap;
    this.conf = new JobConf(flowProcess.getConfigCopy());
    this.setOutput(this);
  }

  /**
   * Initializes the underlying record writer and then delegates to the superclass.
   *
   * @throws TapException if the record writer cannot be created
   */
  @Override
  public void prepare() {
    try {
      initialize();
    } catch (IOException exception) {
      // TapException is unchecked, so callers catching RuntimeException still work.
      throw new TapException("unable to initialize MongoDbCollector for: " + tap, exception);
    }
    super.prepare();
  }

  /**
   * Lets the tap configure {@link #conf}, then obtains a record writer from the configured
   * output format and registers this collector as the sink call's output.
   *
   * @throws IOException if the output format cannot create a record writer
   */
  private void initialize() throws IOException {
    tap.sinkConfInit(hadoopFlowProcess, conf);
    OutputFormat outputFormat = conf.getOutputFormat();
    LOG.info("Output format class is: {}", outputFormat.getClass());
    writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), reporter);
    sinkCall.setOutput(this);
  }

  /**
   * Closes the record writer (if one was created) and then the superclass.
   * Safe to call more than once, and safe to call when {@link #prepare()} never succeeded.
   *
   * @throws TapException if closing the record writer fails
   */
  @Override
  public void close() {
    try {
      LOG.info("closing tap collector for: {}", tap);
      if (writer != null) {
        writer.close(reporter);
      }
    } catch (IOException exception) {
      // Pass the exception as the trailing argument so SLF4J logs its stack trace.
      LOG.warn("exception closing: {}", tap, exception);
      throw new TapException("exception closing MongoDbCollector", exception);
    } finally {
      // Drop the reference so a repeated close() does not close the writer twice.
      writer = null;
      super.close();
    }
  }

  /**
   * Forwards one key/value pair to the record writer, reporting progress when running
   * under a {@link HadoopFlowProcess}.
   *
   * @param writableComparable the key passed to the record writer
   * @param writable the value passed to the record writer
   * @throws IOException if the record writer fails
   * @throws IllegalStateException if called before {@link #prepare()} or after {@link #close()}
   */
  @Override
  public void collect(Object writableComparable, Object writable) throws IOException {
    if (writer == null) {
      throw new IllegalStateException("MongoDbCollector is not prepared or already closed: " + tap);
    }
    if (hadoopFlowProcess instanceof HadoopFlowProcess) {
      ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress();
    }
    // Use the RecordWriter interface directly; a MongoRecordWriter cast would reject
    // any other output format configured on the tap.
    writer.write(writableComparable, writable);
  }
}