package com.clojurewerkz.cascading.mongodb;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.FieldsResolverException;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import com.google.common.base.Joiner;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;
import com.mongodb.hadoop.mapred.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
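
/**
 * Cascading {@link Scheme} for reading from and writing to MongoDB collections
 * via the mongo-hadoop connector. Source tuples are built from the configured
 * column field names; sink tuples are written back as {@link BasicDBObject}s
 * using the supplied field mappings.
 *
 * A minimal construction sketch (host, port, database, collection, and field
 * names below are illustrative values, not part of any fixed API):
 * <pre>{@code
 * List<String> columns = Arrays.asList("_id", "name");
 * Map<String, String> mappings = new HashMap<String, String>();
 * mappings.put("_id", "id");
 * mappings.put("name", "name");
 * MongoDBScheme scheme = new MongoDBScheme(new String[]{"127.0.0.1"}, new Integer[]{27017},
 *     "mydb", "mycollection", columns, mappings);
 * }</pre>
 */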
@SuppressWarnings("rawtypes")
public class MongoDBScheme extends Scheme<JobConf, RecordReader, OutputCollector, BSONWritable[], BSONWritable[]> {
    /**
     * Field logger
     */
    private static final Logger logger = LoggerFactory.getLogger(MongoDBScheme.class);

    private String pathUUID;
    public String mongoUri;
    public List<String> columnFieldNames;
    public Map<String, String> fieldMappings;
    public String keyColumnName;
    public int splitSize = 64; // input split size, in megabytes

    private String host;
    private Integer port;
    private String database;
    private String collection;
    private DBObject query;

    // Convenience constructor: default "_id" key column, no query filter
    public MongoDBScheme(String[] hosts, Integer[] ports, String database, String collection, List<String> columnFieldNames, Map<String, String> fieldMappings) {
        this(hosts, ports, database, collection, "_id", columnFieldNames, fieldMappings);
    }

    // Convenience constructor: default "_id" key column, with a query filter
    public MongoDBScheme(String[] hosts, Integer[] ports, String database, String collection, List<String> columnFieldNames, Map<String, String> fieldMappings, DBObject query) {
        this(hosts, ports, database, collection, "_id", columnFieldNames, fieldMappings, query);
    }

    // Convenience constructor: explicit key column, no query filter
    public MongoDBScheme(String[] hosts, Integer[] ports, String database, String collection, String keyColumnName, List<String> columnFieldNames, Map<String, String> fieldMappings) {
        this(hosts, ports, database, collection, keyColumnName, columnFieldNames, fieldMappings, new BasicDBObject());
    }
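
    /**
     * Primary constructor. Builds a MongoDB connection URI of the form
     * {@code mongodb://host1:port1,host2:port2/database.collection} from the given
     * hosts and ports (the two arrays are expected to be the same length).
     *
     * @param hosts            MongoDB hosts to connect to
     * @param ports            ports corresponding to each host
     * @param database         database name
     * @param collection       collection to read from or write to
     * @param keyColumnName    name of the key column (typically "_id")
     * @param columnFieldNames names of the fields to read or write
     * @param fieldMappings    mapping of MongoDB field names to tuple field names
     * @param query            query used to filter source documents
     */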
    public MongoDBScheme(String[] hosts, Integer[] ports, String database, String collection, String keyColumnName, List<String> columnFieldNames, Map<String, String> fieldMappings, DBObject query) {
        String[] hostsAndPorts = new String[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            hostsAndPorts[i] = hosts[i] + ":" + ports[i];
        }
        this.mongoUri = String.format("mongodb://%s/%s.%s", Joiner.on(",").join(hostsAndPorts), database, collection);
        this.pathUUID = UUID.randomUUID().toString();
        this.columnFieldNames = columnFieldNames;
        this.fieldMappings = fieldMappings;
        this.keyColumnName = keyColumnName;
        // Keep the first host and port for use in the identifier
        this.host = hosts[0];
        this.port = ports[0];
        this.database = database;
        this.collection = collection;
        this.query = query;
    }

    /**
     * @return a unique, randomly generated path for this scheme instance
     */
    public Path getPath() {
        return new Path(pathUUID);
    }

    /**
     * @return an identifier for this scheme, composed of the host, port, database, and collection
     */
    public String getIdentifier() {
        return String.format("%s_%d_%s_%s", this.host, this.port, this.database, this.collection);
    }

    /**
     * Configures the given JobConf for reading from MongoDB.
     *
     * @param process the current flow process
     * @param tap     the tap this scheme is associated with
     * @param conf    the job configuration to populate
     */
    @Override
    public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        MongoConfigUtil.setInputURI(conf, this.mongoUri);
        FileInputFormat.setInputPaths(conf, this.getIdentifier());
        conf.setInputFormat(MongoInputFormat.class);
        MongoConfigUtil.setQuery(conf, this.query);
        MongoConfigUtil.setSplitSize(conf, splitSize);
        // TODO: MongoConfigUtil.setFields(conf, fields);
    }

    /**
     * Configures the given JobConf for writing to MongoDB.
     *
     * @param process the current flow process
     * @param tap     the tap this scheme is associated with
     * @param conf    the job configuration to populate
     */
    @Override
    public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
                             JobConf conf) {
        conf.setOutputFormat(MongoOutputFormat.class);
        MongoConfigUtil.setOutputURI(conf, this.mongoUri);
    }

    /**
     * Allocates the reusable key/value pair that the record reader fills on each
     * call to {@link #source(FlowProcess, SourceCall)}.
     *
     * @param flowProcess the current flow process
     * @param sourceCall  the source call whose context is initialized
     */
    @Override
    public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<BSONWritable[], RecordReader> sourceCall) {
        sourceCall.setContext(new BSONWritable[2]);
        sourceCall.getContext()[0] = (BSONWritable) sourceCall.getInput().createKey();
        sourceCall.getContext()[1] = (BSONWritable) sourceCall.getInput().createValue();
    }

    /**
     * Reads the next document from MongoDB and converts it into a tuple, one entry
     * per configured column field name.
     *
     * @param flowProcess the current flow process
     * @param sourceCall  the source call holding the record reader and its context
     * @return true if a document was read, false when the input is exhausted
     * @throws IOException if reading from the record reader fails
     */
    @Override
    public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<BSONWritable[], RecordReader> sourceCall) throws IOException {
        Tuple result = new Tuple();
        BSONWritable key = sourceCall.getContext()[0];
        BSONWritable value = sourceCall.getContext()[1];

        if (!sourceCall.getInput().next(key, value)) {
            logger.info("Nothing left to read, exiting");
            return false;
        }

        for (String columnFieldName : columnFieldNames) {
            Object fieldValue = value.getDoc().get(columnFieldName);
            if (fieldValue != null) {
                result.add(fieldValue);
            } else if (!columnFieldName.equals(this.keyColumnName)) {
                // Pad missing non-key fields with null so tuple arity stays constant
                result.add(null);
            }
        }

        sourceCall.getIncomingEntry().setTuple(result);
        return true;
    }

    /**
     * Writes the outgoing tuple entry to MongoDB as a {@link BasicDBObject}, using
     * the field mappings to translate tuple field names into document field names.
     *
     * @param flowProcess the current flow process
     * @param sinkCall    the sink call holding the outgoing entry and output collector
     * @throws IOException if writing to the output collector fails
     */
    @Override
    public void sink(FlowProcess<JobConf> flowProcess, SinkCall<BSONWritable[], OutputCollector> sinkCall) throws IOException {
        TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
        OutputCollector outputCollector = sinkCall.getOutput();

        String keyFieldName = this.fieldMappings.get(this.keyColumnName);
        Object key;
        // If fieldMappings has no entry for the key column ("_id"), emit a null key
        // and let MongoDB generate an ObjectId for the document.
        if (keyFieldName == null) {
            key = null;
        } else {
            key = new Text((String) (tupleEntry.selectTuple(new Fields(keyFieldName)).get(0)));
        }

        BasicDBObject dbObject = new BasicDBObject();
        for (String columnFieldName : columnFieldNames) {
            String columnFieldMapping = fieldMappings.get(columnFieldName);
            Object tupleEntryValue = null;

            try {
                // columnFieldMapping is null when no corresponding field name is defined
                // in the mappings; only mapped fields are written back to MongoDB.
                if (columnFieldMapping != null) {
                    tupleEntryValue = tupleEntry.get(columnFieldMapping);
                }
            } catch (FieldsResolverException e) {
                logger.error("Couldn't resolve field: {}", columnFieldName);
            }

            if (tupleEntryValue != null && !columnFieldName.equals(keyColumnName)) {
                dbObject.put(columnFieldName, tupleEntryValue);
            }
        }

        outputCollector.collect(key, new BSONWritable(dbObject));
    }
}