blob: b13e2d7bbb6439f714bbdcfdef64ac1b43237d02 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.appdata.query;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.appdata.QueueUtils.ConditionBarrier;
import com.datatorrent.lib.appdata.query.QueueList.QueueListNode;
/**
* This is an abstract implementation of a QueueManager which works in the following way.
* <br/>
* <br/>
* <ul>
* <li>All the queries are kept in a linked list.</li>
* <li>A pointer points to the next query to pop from the queue.</li>
* <li>If all the queries are popped within an application window, then no more queries can be popped from the queue</li>
* <li>When {@link #endWindow} is called</li>
* </ul>
* @param <QUERY_TYPE> The type of the query.
* @param <META_QUERY> The type of any metadata associated with the query.
* @param <QUEUE_CONTEXT> The type of the context used to manage the queueing of the query.
* @since 3.0.0
*/
public abstract class AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> implements QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>
{
/**
* The {@link QueueList} which is backing this {@link QueueManager}.
*/
protected QueueList<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> queryQueue = new QueueList<>();
/**
* A pointer to the current node in the {@link QueueList}.
*/
private QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> currentNode;
private boolean readCurrent = false;
private final Semaphore semaphore = new Semaphore(0);
private final ConditionBarrier conditionBarrier = new ConditionBarrier();
private final AtomicInteger numLeft = new AtomicInteger();
/**
* Creates a new QueueManager.
*/
public AbstractWindowEndQueueManager()
{
//Do nothing
}
@Override
public boolean enqueue(QUERY_TYPE query, META_QUERY metaQuery, QUEUE_CONTEXT context)
{
Preconditions.checkNotNull(query);
conditionBarrier.gate();
return enqueueHelper(query, metaQuery, context);
}
/**
* This is a helper method which enqueues the query.
* @param query The query to enqueue.
* @param metaQuery Any meta data associated with the query.
* @param context The context used to manage queuing the query.
* @return True if the query was successfully enqueued. False otherwise.
*/
private boolean enqueueHelper(QUERY_TYPE query, META_QUERY metaQuery, QUEUE_CONTEXT context)
{
QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queryQueueable = new QueryBundle<>(query, metaQuery, context);
QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> node = new QueueListNode<>(queryQueueable);
synchronized (numLeft) {
if (addingFilter(queryQueueable)) {
queryQueue.enqueue(node);
numLeft.getAndIncrement();
semaphore.release();
addedNode(node);
}
}
return true;
}
@Override
public QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> dequeue()
{
return dequeueHelper(false);
}
@Override
public QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> dequeueBlock()
{
return dequeueHelper(true);
}
/**
* This is a helper method for dequeueing queries.
* @return {@link QueryBundle} containing the dequeued query and its associated metadata.
*/
private QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> dequeueHelper(boolean block)
{
QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> qq = null;
boolean first = true;
if (block) {
acquire();
}
if (currentNode == null) {
currentNode = queryQueue.getHead();
readCurrent = false;
if (currentNode == null) {
return null;
}
} else {
if (readCurrent) {
QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> tempNode = currentNode.getNext();
if (tempNode != null) {
currentNode = tempNode;
readCurrent = false;
} else {
return null;
}
}
}
while (true) {
if (block && !first) {
acquire();
//TODO dedup this code
if (currentNode == null) {
currentNode = queryQueue.getHead();
readCurrent = false;
if (currentNode == null) {
return null;
}
} else {
if (readCurrent) {
QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> tempNode = currentNode.getNext();
if (tempNode != null) {
currentNode = tempNode;
readCurrent = false;
} else {
return null;
}
}
}
}
synchronized (numLeft) {
numLeft.getAndDecrement();
QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queryQueueable = currentNode.getPayload();
first = false;
QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> nextNode = currentNode.getNext();
if (removeBundle(queryQueueable)) {
queryQueue.removeNode(currentNode);
removedNode(currentNode);
if (block) {
if (nextNode == null) {
readCurrent = true;
} else {
currentNode = nextNode;
readCurrent = false;
}
} else {
if (nextNode == null) {
readCurrent = true;
break;
} else {
currentNode = nextNode;
}
}
} else {
qq = currentNode.getPayload();
if (nextNode == null) {
readCurrent = true;
} else {
currentNode = nextNode;
readCurrent = false;
}
break;
}
}
}
//Handle the case where non blocking dequeue is happening, semaphore needs to be synched up.
if (semaphore.availablePermits() > numLeft.get()) {
try {
semaphore.acquire(semaphore.availablePermits() - numLeft.get());
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
return qq;
}
//Dirty hack TODO fix QueueManager interface
public boolean isEmptyAndBlocked()
{
return numLeft.get() == 0 && semaphore.availablePermits() == 0 && semaphore.hasQueuedThreads();
}
private void acquire()
{
try {
semaphore.acquire();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
/**
* This filter is used to determine whether or not the given {@link QueryBundle} should be added to the queue or not.
* @param queryBundle The {@link QueryBundle} to determine whether or not to add to the queue.
* @return True if the given {@link QueryBundle} should be added to the queue. False otherwise.
*/
public abstract boolean addingFilter(QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queryBundle);
/**
* This callback is called when a new query is enqueued.
* @param queryQueueable The node that was added to the {@link QueueList} of the queue.
*/
public abstract void addedNode(QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> queryQueueable);
/**
* This callback is called with the given node is removed from the queue.
* @param queryQueueable The node that was removed from the {@link QueueList}.
*/
public abstract void removedNode(QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> queryQueueable);
/**
* This is called to determine if the given query should be removed from the queue.
* @param queryQueueable The {@link QueryBundle} whose removal from the queue needs to be determined.
* @return True if the given query should be remove. False otherwise.
*/
public abstract boolean removeBundle(QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queryQueueable);
@Override
public void setup(OperatorContext context)
{
}
@Override
public void beginWindow(long windowId)
{
synchronized (numLeft) {
currentNode = queryQueue.getHead();
readCurrent = false;
numLeft.set(queryQueue.getSize());
semaphore.drainPermits();
semaphore.release(queryQueue.getSize());
}
}
@Override
public void endWindow()
{
}
@Override
public void teardown()
{
}
@Override
public int getNumLeft()
{
return numLeft.get();
}
@VisibleForTesting
int getNumPermits()
{
return semaphore.availablePermits();
}
@Override
public void haltEnqueue()
{
conditionBarrier.lock();
}
@Override
public void resumeEnqueue()
{
conditionBarrier.unlock();
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindowEndQueueManager.class);
}