blob: ef3be01f20c60f045ed930bab3013ea30250927a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.couchbase;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeMap;
import net.spy.memcached.internal.OperationCompletionListener;
import net.spy.memcached.internal.OperationFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.lib.db.AbstractAggregateTransactionableStoreOutputOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.netlet.util.DTThrowable;
/**
* AbstractCouchBaseOutputOperator which extends Transactionable Store Output Operator.
* Classes extending from this operator should implement the abstract functionality of generateKey, getValue and insertOrUpdate.
* Insert and Update couchbase operators extend from this class.
* If the processing mode is EXACTLY ONCE, then in case of update the functionality will always be correct since it will overwrite the same keys
* but for inserts there might be duplicates when operator fails.
*
* @since 2.0.0
*/
public abstract class AbstractCouchBaseOutputOperator<T> extends AbstractAggregateTransactionableStoreOutputOperator<T, CouchBaseWindowStore>
{
private static final transient Logger logger = LoggerFactory.getLogger(AbstractCouchBaseOutputOperator.class);
protected transient HashMap<OperationFuture<Boolean>, Long> mapFuture;
protected int numTuples;
protected CouchBaseSerializer serializer;
protected TreeMap<Long, T> mapTuples;
protected long id = 0;
private final transient CompletionListener listener;
private transient boolean failure;
private final transient Object syncObj;
public AbstractCouchBaseOutputOperator()
{
mapFuture = new HashMap<OperationFuture<Boolean>, Long>();
mapTuples = new TreeMap<Long, T>();
store = new CouchBaseWindowStore();
listener = new CompletionListener();
numTuples = 0;
syncObj = new Object();
}
@Override
@SuppressWarnings("unchecked")
public void setup(OperatorContext context)
{
ProcessingMode mode = context.getValue(OperatorContext.PROCESSING_MODE);
if (mode == ProcessingMode.AT_MOST_ONCE) {
//map must be cleared to avoid writing same data twice
if (numTuples == 0) {
mapTuples.clear();
}
}
numTuples = 0;
//atleast once, check leftovers in map and send them to couchbase.
if (!mapTuples.isEmpty()) {
Iterator<?> itr = mapTuples.values().iterator();
while (itr.hasNext()) {
processTuple((T)itr.next());
}
}
super.setup(context);
}
@Override
public void beginWindow(long windowId)
{
numTuples = 0;
super.beginWindow(windowId);
}
@Override
public void processTuple(T tuple)
{
if (failure) {
throw new RuntimeException("Operation Failed");
}
waitForQueueSize(store.queueSize - 1);
setKeyValueInCouchBase(tuple);
}
public void setKeyValueInCouchBase(T tuple)
{
id++;
String key = getKey(tuple);
Object value = getValue(tuple);
if(!(value instanceof Boolean) && !(value instanceof Integer) && !(value instanceof String) && !(value instanceof Float) && !(value instanceof Double) && !(value instanceof Character) && !(value instanceof Long) && !(value instanceof Short) && !(value instanceof Byte)){
if (serializer != null) {
value = serializer.serialize(value);
}
}
OperationFuture<Boolean> future = processKeyValue(key, value);
synchronized (syncObj) {
future.addListener(listener);
mapFuture.put(future, id);
if (!mapTuples.containsKey(id)) {
mapTuples.put(id, tuple);
}
numTuples++;
}
}
@Override
public void storeAggregate()
{
waitForQueueSize(0);
id = 0;
}
public void waitForQueueSize(int sizeOfQueue)
{
long startTms = System.currentTimeMillis();
long elapsedTime ;
while (numTuples > sizeOfQueue) {
synchronized (syncObj) {
if (numTuples > sizeOfQueue) {
try {
elapsedTime = System.currentTimeMillis() - startTms;
if (elapsedTime >= store.timeout) {
throw new RuntimeException("Timed out waiting for space in queue");
}
else {
syncObj.wait(store.timeout - elapsedTime);
}
}
catch (InterruptedException ex) {
DTThrowable.rethrow(ex);
}
}
}
elapsedTime = System.currentTimeMillis() - startTms;
if (elapsedTime >= store.timeout) {
throw new RuntimeException("Timed out waiting for space in queue");
}
}
}
protected class CompletionListener implements OperationCompletionListener
{
@Override
public void onComplete(OperationFuture<?> f) throws Exception
{
if (!((Boolean)f.get())) {
logger.error("Operation failed {}" , f);
failure = true;
return;
}
synchronized (syncObj) {
long idProcessed = mapFuture.get(f);
mapTuples.remove(idProcessed);
mapFuture.remove(f);
numTuples--;
syncObj.notify();
}
}
}
public CouchBaseSerializer getSerializer()
{
return serializer;
}
public void setSerializer(CouchBaseSerializer serializer)
{
this.serializer = serializer;
}
public abstract String getKey(T tuple);
public abstract Object getValue(T tuple);
protected abstract OperationFuture<Boolean> processKeyValue(String key, Object value);
}