// blob: 0172b489f4bbc8475085125756a0cfe0ada1639f [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.frauddetect.operator;
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.Context;
import javax.validation.constraints.NotNull;
import com.datatorrent.api.DefaultInputPort;
import com.mongodb.*;
import com.mongodb.util.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
/**
 * Operator that writes JSON tuples into a MongoDB collection.
 *
 * <p>Each incoming tuple is parsed into a {@link DBObject} and buffered in
 * memory; the buffer is inserted into the configured collection in a single
 * call at the end of every streaming window and then cleared.
 *
 * <p>{@code hostName}, {@code dataBase} and {@code collection} are required.
 * Authentication is attempted only when both {@code userName} and
 * {@code passWord} are set.
 *
 * @since 0.9.0
 */
public class MongoDBOutputOperator extends BaseOperator {
  /** Host passed to the {@link MongoClient} constructor. */
  @NotNull
  protected String hostName;
  /** Name of the database to write to. */
  @NotNull
  protected String dataBase;
  /** Name of the collection to insert into. */
  @NotNull
  protected String collection;
  /** Write concern used for every insert; defaults to {@link WriteConcern#ACKNOWLEDGED}. */
  protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
  /** Optional user name; authentication is skipped unless the password is set too. */
  protected String userName;
  /** Optional password; authentication is skipped unless the user name is set too. */
  protected String passWord;
  // Connection handles are created in setup() and released in teardown().
  protected transient MongoClient mongoClient;
  protected transient DB db;
  protected transient DBCollection dbCollection;
  /** Tuples received since the last insert. Unlike the connection handles, this is not transient. */
  protected List<DBObject> dataList = new ArrayList<DBObject>();

  public MongoDBOutputOperator() {
  }

  /**
   * Takes a JSON formatted string, converts it to a {@link DBObject} and
   * buffers it until the end of the window.
   *
   * <p>The parse result is cast to {@code DBObject}, so a tuple that parses
   * to anything else fails with a {@link ClassCastException}.
   */
  public transient final DefaultInputPort<String> inputPort = new DefaultInputPort<String>() {
    @Override
    public void process(String tuple) {
      dataList.add((DBObject)JSON.parse(tuple));
    }
  };

  /**
   * Opens the MongoDB connection, authenticates if credentials were supplied
   * and resolves the target collection.
   *
   * @param context operator context, forwarded to the superclass
   * @throws IllegalStateException if the configured host cannot be resolved
   * @throws IllegalArgumentException if authentication is rejected
   */
  @Override
  public void setup(Context.OperatorContext context) {
    super.setup(context);
    try {
      mongoClient = new MongoClient(hostName);
    }
    catch (UnknownHostException ex) {
      // Fail fast: continuing would leave dbCollection null and surface later
      // as an unrelated NullPointerException in endWindow().
      throw new IllegalStateException("Unable to resolve MongoDB host: " + hostName, ex);
    }
    db = mongoClient.getDB(dataBase);
    if (userName != null && passWord != null) {
      if (!db.authenticate(userName, passWord.toCharArray())) {
        // Do not leak the client that was just opened.
        releaseConnection();
        throw new IllegalArgumentException("MongoDB authentication failed. Illegal username and password for MongoDB!!");
      }
    }
    dbCollection = db.getCollection(collection);
  }

  @Override
  public void beginWindow(long windowId) {
    // nothing
  }

  /**
   * Inserts everything buffered during the window using the configured
   * write concern, then clears the buffer. Does nothing to the database
   * when no tuples were received.
   */
  @Override
  public void endWindow() {
    logger.debug("mongo datalist size: {}", dataList.size());
    if (!dataList.isEmpty()) {
      WriteResult result = dbCollection.insert(dataList, writeConcern);
      logger.debug("Result for MongoDB insert: {}", result);
      dataList.clear();
    }
  }

  /** Closes the MongoDB client if one was opened. */
  @Override
  public void teardown() {
    releaseConnection();
  }

  /** Closes the client, if any, and drops the handles derived from it. */
  private void releaseConnection() {
    if (mongoClient != null) {
      mongoClient.close();
    }
    mongoClient = null;
    db = null;
    dbCollection = null;
  }

  public String getHostName() {
    return hostName;
  }

  public void setHostName(String hostName) {
    this.hostName = hostName;
  }

  public String getDataBase() {
    return dataBase;
  }

  public void setDataBase(String dataBase) {
    this.dataBase = dataBase;
  }

  public String getCollection() {
    return collection;
  }

  public void setCollection(String collection) {
    this.collection = collection;
  }

  public String getUserName() {
    return userName;
  }

  public void setUserName(String userName) {
    this.userName = userName;
  }

  public String getPassWord() {
    return passWord;
  }

  public void setPassWord(String passWord) {
    this.passWord = passWord;
  }

  public WriteConcern getWriteConcern() {
    return writeConcern;
  }

  public void setWriteConcern(WriteConcern writeConcern) {
    this.writeConcern = writeConcern;
  }

  private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperator.class);
}