blob: 91e3b173453664ae21a5d2380300e6452c3d7680 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.couchbase;
import java.io.IOException;
import java.util.Iterator;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.apex.malhar.lib.db.AbstractStoreInputOperator;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.couchbase.client.protocol.views.Query;
import com.couchbase.client.protocol.views.Stale;
import com.couchbase.client.protocol.views.View;
import com.couchbase.client.protocol.views.ViewResponse;
import com.couchbase.client.protocol.views.ViewRow;
import com.datatorrent.api.Context.OperatorContext;
/**
* <p>
* CouchBasePOJOInputOperator</p>
* A generic implementation of AbstractStoreInputOperator that fetches rows of data from Couchbase store and emits them as POJOs.
* Each row is converted to a POJO.User needs to specify the design document name and view name against which he wants to query.
* User should also provide a mapping function to fetch the specific fields from database.The query is generated using the mapping
* function on top of the view.User has the option to specify the start key and limit of number of documents he wants to view.
* He can also specify whether he wants to view results in descending order or not.
* The start value is continuously updated with the value of the key of the last row from the result of the previous run of the query.
* Example:
* function (doc) {
* emit(doc._id, [doc.username, doc.first_name, doc.last_name, doc.last_login]);
* }
*
* @displayName Couchbase Input Operator
* @category Input
* @tags database, nosql, pojo, couchbase
* @since 3.0.0
*/
@Evolving
public class CouchBasePOJOInputOperator extends AbstractStoreInputOperator<Object, CouchBaseStore>
{
private transient Class<?> className = null;
private transient Query query;
private final transient ObjectMapper objectMapper;
//User has the option to specify a start key.
private String startkey;
@Min(1)
private int limit = 10;
private String startDocId;
@NotNull
private String designDocumentName;
@NotNull
private String viewName;
private int skip = 0;
private transient View view;
/*
* POJO class which is generated as output from this operator.
* Example:
* public class TestPOJO{ int intfield; public int getInt(){} public void setInt(){} }
* outputClass = TestPOJO
* POJOs will be generated on fly in later implementation.
*/
private String outputClass;
//User gets the option to specify the order of documents.
private boolean descending;
public boolean isDescending()
{
return descending;
}
public void setDescending(boolean descending)
{
this.descending = descending;
}
public String getOutputClass()
{
return outputClass;
}
public void setOutputClass(String outputClass)
{
this.outputClass = outputClass;
}
public String getStartDocId()
{
return startDocId;
}
public void setStartDocId(String startDocId)
{
this.startDocId = startDocId;
}
/*
* Name of the design document in which the view to be queried is added.
*/
public String getDesignDocumentName()
{
return designDocumentName;
}
public void setDesignDocumentName(String designDocumentName)
{
this.designDocumentName = designDocumentName;
}
/*
* Name of the view against which a user wants to query.
*/
public String getViewName()
{
return viewName;
}
public void setViewName(String viewName)
{
this.viewName = viewName;
}
public int getLimit()
{
return limit;
}
public void setLimit(int limit)
{
this.limit = limit;
}
public String getStartkey()
{
return startkey;
}
public void setStartkey(String startkey)
{
this.startkey = startkey;
}
public CouchBasePOJOInputOperator()
{
objectMapper = new ObjectMapper();
store = new CouchBaseStore();
}
@Override
public void setup(OperatorContext context)
{
super.setup(context);
try {
className = Class.forName(outputClass);
} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
view = store.getInstance().getView(designDocumentName, viewName);
query = new Query();
query.setStale(Stale.FALSE);
query.setIncludeDocs(true);
query.setLimit(limit);
query.setDescending(descending);
}
@Override
public void emitTuples()
{
if (startkey != null) {
query.setRangeStart(startkey);
}
if (skip == 1) {
query.setSkip(skip);
}
ViewResponse result = store.getInstance().query(view, query);
Iterator<ViewRow> iterRow = result.iterator();
while (iterRow.hasNext()) {
ViewRow row = iterRow.next();
Object document = row.getDocument();
Object outputObj = null;
try {
outputObj = objectMapper.readValue(document.toString(), className);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
outputPort.emit(outputObj);
//Update start key if it is last row
if (!iterRow.hasNext()) {
startkey = row.getKey();
skip = 1;
}
}
}
}