blob: 75306a42a40840f6fe7fb4f15baa302cf9bf244e [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.mongodb;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.mongodb.*;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.Min;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the base implementation for a non transactional output operator for MongoDB. 
* Subclasses should implement the column mapping for writing tuples out to MongoDB.
* <p>
* <br>
* Ports:<br>
* <b>Input</b>: Can have one input port <br>
* <b>Output</b>: no output port<br>
* <br>
* Properties:<br>
* <b>maxWindowTable</b>:the table to save the most recent inserted windowId, operatorId information for recovery use<br>
* <b>maxWindowCollection</b>:mongoDB collection of the maxWindowTable<br>
* <b>windowId</b>:Id of current window<br>
* <b>operatorId</b>:Id of the operator<br>
* <b>batchSize</b>:size for each batch insert, default value is 1000<br>
* <b>lastWindowId</b>:last inserted windowId, is obtained at setup from maxWindowTable with specific operatorId<br>
* <b>ignoreWindow</b>:the flag to indicate ignoring out of date window <br>
* <b>tupleId</b>:the Id of the tuple, incrementing at each tuple process, start from 1 at beginWindow()<br>
* <b>windowIdColumnName</b>:the name of the windowId column in maxWindowTable, should be set by the user<br>
* <b>operatorIdColumnName</b>:the name of the operatorId column in maxWindowTable, should be set by the user<br>
* <b>tableList</b>: the list of all the tables of the mapping<br>
* <b>tableToDocument</b>:each tuple corresponds to one document for one collection to be inserted<br>
* <b>tableToDocumentList</b>:for bulk insert, each table has a document list to insert. This is table and document list map <br>
* <b>tupleId</b>:the Id of the tuple, incrementing at each tuple process, start from 1 at beginWindow()<br>
* <b>queryFunction</b>:corresponding to the option for the ObjectId of 12 bytes format saving. The windowId, tupleId, operatorId of each tuple are saved in each collection as the column ObjectId for recovery<br>
* It Currently has 3 format for the ObjectId. When the operator recovers, it will remove the document which has the same windowId, operatorId as maxWindowTable in the collections, and insert the documents again<br>
* <br>
* Compile time checks:<br>
* None<br>
* <br>
* Run time checks:<br>
* hostName
* batchSize <br>
* <b>data type:</br>the insertion data can support all the Objects mongoDB supports<br>
*
* <b>Benchmarks</b>:
* <br>
* </p>
* @displayName MongoDB Output
* @category Database
* @tags output operator
* @since 0.3.2
*/
public abstract class MongoDBOutputOperator<T> extends MongoDBConnectable implements Operator
{
private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperator.class);
protected static final int DEFAULT_BATCH_SIZE = 1000;
@Min(1)
protected long batchSize = DEFAULT_BATCH_SIZE;
protected transient ArrayList<String> tableList = new ArrayList<String>(); // all the tables in the mapping
protected transient HashMap<String, BasicDBObject> tableToDocument = new HashMap<String, BasicDBObject>(); // each table has one document to insert
protected transient HashMap<String, List<DBObject>> tableToDocumentList = new HashMap<String, List<DBObject>>();
protected String maxWindowTable;
protected transient DBCollection maxWindowCollection;
protected transient long windowId;
protected transient int operatorId;
// protected transient String applicationId;
protected transient long lastWindowId;
protected transient boolean ignoreWindow;
protected String windowIdColumnName;
protected String operatorIdColumnName;
protected transient int tupleId;
protected int queryFunction;
/**
* Implement how to process tuple in derived class based on HashMap or ArrayList.
* The tuple values are binded with SQL prepared statement to be inserted to database.
*
* @param tuple
* @throws SQLException
*/
public abstract void processTuple(T tuple);
/**
* This input port receives tuples which will be written to MongoDB.
*/
public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
{
@Override
public void process(T tuple)
{
if (ignoreWindow) {
return; // ignore
}
try {
processTuple(tuple);
}
catch (Exception ex) {
throw new RuntimeException("Exception during process tuple", ex);
}
}
};
/**
* init last completed windowId information with operatorId, read from maxWindowTable.
* If the table is empty, insert a default value document
*/
public void initLastWindowInfo()
{
maxWindowCollection = db.getCollection(maxWindowTable);
BasicDBObject query = new BasicDBObject();
query.put(operatorIdColumnName, operatorId);
// query.put(applicationIdName, "0");
DBCursor cursor = maxWindowCollection.find(query);
if (cursor.hasNext()) {
Object obj = cursor.next().get(windowIdColumnName);
lastWindowId = (Long)obj;
}
else {
BasicDBObject doc = new BasicDBObject();
doc.put(windowIdColumnName, (long)0);
// doc.put(applicationIdName, 0);
doc.put(operatorIdColumnName, operatorId);
maxWindowCollection.save(doc);
}
System.out.println("last windowid:" + lastWindowId);
}
/**
* Implement Operator Interface.
* If windowId is less than the last completed windowId, then ignore the window.
* If windowId is equal to the last completed windowId, then remove the documents with same windowId of the operatorId, and insert the documents later
* If windowId is greater then the last completed windowId, then process the window
*
* @param windowId
*/
@Override
public void beginWindow(long windowId)
{
this.windowId = windowId;
tupleId = 1;
if (windowId < lastWindowId) {
ignoreWindow = true;
}
else if (windowId == lastWindowId) {
ignoreWindow = false;
BasicDBObject query = new BasicDBObject();
// query.put(windowIdColumnName, windowId);
// query.put(operatorIdColumnName, operatorId);
ByteBuffer bb = ByteBuffer.allocate(12);
bb.order(ByteOrder.BIG_ENDIAN);
StringBuilder low = new StringBuilder();
StringBuilder high = new StringBuilder();
if (queryFunction == 1) {
queryFunction1(bb, high, low);
}
else if (queryFunction == 2) {
queryFunction2(bb, high, low);
}
else if (queryFunction == 3) {
queryFunction3(bb, high, low);
}
else {
throw new RuntimeException("unknown queryFunction type:" + queryFunction);
}
query.put("_id", new BasicDBObject("$gte", new ObjectId(low.toString())).append("$lte", new ObjectId(high.toString())));
// query.put(applicationIdName, 0);
for (String table : tableList) {
db.getCollection(table).remove(query);
}
}
else {
ignoreWindow = false;
}
}
/**
* At endWindow, if not ignoring window, then insert bulk document list
*/
@Override
public void endWindow()
{
if (ignoreWindow) {
return;
}
BasicDBObject where = new BasicDBObject(); // update maxWindowTable for windowId information
where.put(operatorIdColumnName, operatorId);
BasicDBObject value = new BasicDBObject();
value.put(operatorIdColumnName, operatorId);
value.put(windowIdColumnName, windowId);
maxWindowCollection.update(where, value);
for (String table : tableList) {
List<DBObject> docList = tableToDocumentList.get(table);
db.getCollection(table).insert(docList);
}
}
/**
* At setup time, init last completed windowId from maxWindowTable
*
* @param context
*/
@Override
public void setup(OperatorContext context)
{
operatorId = context.getId();
try {
mongoClient = new MongoClient(hostName);
db = mongoClient.getDB(dataBase);
if (userName != null && passWord != null) {
db.authenticate(userName, passWord.toCharArray());
}
initLastWindowInfo();
for (String table : tableList) {
tableToDocumentList.put(table, new ArrayList<DBObject>());
tableToDocument.put(table, new BasicDBObject());
}
}
catch (UnknownHostException ex) {
logger.debug(ex.toString());
}
}
abstract public void setColumnMapping(String[] mapping);
@Override
public void teardown()
{
}
/**
* shared processTuple for HashMap and ArrayList output Operator.
*/
public void processTupleCommon()
{
ByteBuffer bb = ByteBuffer.allocate(12);
bb.order(ByteOrder.BIG_ENDIAN);
if (queryFunction == 1) {
insertFunction1(bb);
}
else if (queryFunction == 2) {
insertFunction2(bb);
}
else if (queryFunction == 3) {
insertFunction3(bb);
}
else {
throw new RuntimeException("unknown insertFunction type:" + queryFunction);
}
// String str = Hex.encodeHexString(bb.array());
StringBuilder objStr = new StringBuilder();
for (byte b : bb.array()) {
objStr.append(String.format("%02x", b & 0xff));
}
BasicDBObject doc = null;
for (Map.Entry<String, BasicDBObject> entry : tableToDocument.entrySet()) {
String table = entry.getKey();
doc = entry.getValue();
doc.put("_id", new ObjectId(objStr.toString()));
List<DBObject> docList = tableToDocumentList.get(table);
docList.add(doc);
if (tupleId % batchSize == 0) { // do batch insert here
BasicDBObject where = new BasicDBObject(); // update maxWindowTable for windowId information
where.put(operatorIdColumnName, operatorId);
BasicDBObject value = new BasicDBObject();
value.put(operatorIdColumnName, operatorId);
value.put(windowIdColumnName, windowId);
maxWindowCollection.update(where, value);
db.getCollection(table).insert(docList);
tableToDocumentList.put(table, new ArrayList<DBObject>());
}
else {
tableToDocumentList.put(table, docList);
}
}
++tupleId;
}
/**
* 8B windowId | 1B opratorId | 3B tupleId
*/
public void queryFunction1(ByteBuffer bb, StringBuilder high, StringBuilder low)
{
bb.putLong(windowId);
byte opId = (byte)(operatorId);
bb.put(opId);
ByteBuffer lowbb = bb;
lowbb.put((byte)0);
lowbb.put((byte)0);
lowbb.put((byte)0);
// String str = Hex.encodeHexString(lowbb.array());
for (byte b : lowbb.array()) {
low.append(String.format("02x", b & 0xff));
}
ByteBuffer highbb = bb;
highbb.put((byte)0xff);
highbb.put((byte)0xff);
highbb.put((byte)0xff);
for (byte b : highbb.array()) {
high.append(String.format("02x", b & 0xff));
}
}
/**
* 4B baseSec | 2B windowId | 3B operatorId | 3B tupleId
*/
public void queryFunction2(ByteBuffer bb, StringBuilder high, StringBuilder low)
{
int baseSec = (int)(windowId >> 32);
bb.putInt(baseSec);
short winId = (short)(windowId & 0xffff);
bb.putShort(winId);
Integer operId = operatorId;
for (int i = 0; i < 3; i++) {
byte num = (byte)(operId >> 8 * (2 - i));
bb.put(num);
}
ByteBuffer lowbb = bb.duplicate();
lowbb.put((byte)0);
lowbb.put((byte)0);
lowbb.put((byte)0);
for (byte b : lowbb.array()) {
low.append(String.format("%02x", b & 0xff));
}
ByteBuffer highbb = bb.duplicate();
highbb.put((byte)0xff);
highbb.put((byte)0xff);
highbb.put((byte)0xff);
for (byte b : highbb.array()) {
high.append(String.format("%02x", b & 0xff));
}
}
/**
* 4B baseSec | 3B operatorId | 2B windowId | 3B tupleId
*/
public void queryFunction3(ByteBuffer bb, StringBuilder high, StringBuilder low)
{
int baseSec = (int)(windowId >> 32);
bb.putInt(baseSec);
Integer operId = operatorId;
for (int i = 0; i < 3; i++) {
byte num = (byte)(operId >> 8 * (2 - i));
bb.put(num);
}
short winId = (short)(windowId & 0xffff);
bb.putShort(winId);
ByteBuffer lowbb = bb.duplicate();
lowbb.put((byte)0);
lowbb.put((byte)0);
lowbb.put((byte)0);
for (byte b : lowbb.array()) {
low.append(String.format("%02x", b & 0xff));
}
ByteBuffer highbb = bb.duplicate();
highbb.put((byte)0xff);
highbb.put((byte)0xff);
highbb.put((byte)0xff);
for (byte b : highbb.array()) {
high.append(String.format("%02x", b & 0xff));
}
}
/**
* 8B windowId | 1B operatorId | 3B tupleId
*/
void insertFunction1(ByteBuffer bb)
{
bb.putLong(windowId);
byte oid = (byte)(operatorId);
bb.put(oid);
for (int i = 0; i < 3; i++) {
byte num = (byte)(tupleId >> 8 * (2 - i));
bb.put(num);
}
}
/**
* 4B baseSec | 3B operatorId | 2B windowId | 3B tupleId
*/
void insertFunction2(ByteBuffer bb)
{
int baseSec = (int)(windowId >> 32);
bb.putInt(baseSec);
Integer operId = operatorId;
for (int i = 0; i < 3; i++) {
byte num = (byte)(operId >> 8 * (2 - i));
bb.put(num);
}
bb.putShort((short)(windowId & 0xffff));
for (int i = 0; i < 3; i++) {
byte num = (byte)(tupleId >> 8 * (2 - i));
bb.put(num);
}
}
/**
* 4B baseSec | 2B windowId | 3B operatorId | 3B tupleId
*/
void insertFunction3(ByteBuffer bb)
{
int baseSec = (int)(windowId >> 32);
bb.putInt(baseSec);
short winId = (short)(windowId & 0xffff);
bb.putShort(winId);
Integer operId = operatorId;
for (int i = 0; i < 3; i++) {
byte num = (byte)(operId >> 8 * (2 - i));
bb.put(num);
}
for (int i = 0; i < 3; i++) {
byte num = (byte)(tupleId >> 8 * (2 - i));
bb.put(num);
}
}
public void addTable(String table)
{
tableList.add(table);
}
public ArrayList<String> getTableList()
{
return tableList;
}
public void setQueryFunction(int queryFunction)
{
this.queryFunction = queryFunction;
}
public long getBatchSize()
{
return batchSize;
}
public void setBatchSize(long batchSize)
{
this.batchSize = batchSize;
}
public String getMaxWindowTable()
{
return maxWindowTable;
}
public void setMaxWindowTable(String maxWindowTable)
{
this.maxWindowTable = maxWindowTable;
}
public String getWindowIdColumnName()
{
return windowIdColumnName;
}
public void setWindowIdColumnName(String windowIdColumnName)
{
this.windowIdColumnName = windowIdColumnName;
}
public String getOperatorIdColumnName()
{
return operatorIdColumnName;
}
public void setOperatorIdColumnName(String operatorIdColumnName)
{
this.operatorIdColumnName = operatorIdColumnName;
}
public long getLastWindowId()
{
return lastWindowId;
}
public void setLastWindowId(long lastWindowId)
{
this.lastWindowId = lastWindowId;
}
}