blob: 21087963b6f462880953875c913ab3e45cb794b4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.io.fs;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator.CheckpointNotificationListener;
import com.datatorrent.api.Operator.IdleTimeHandler;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.DTThrowable;
/**
* This base operator queues input tuples for each window and asynchronously processes them after the window is committed.
*
* The operator holds all the tuple info in memory until the committed window and then calls the processCommittedData method
* to give an opportunity to process tuple info from each committed window.
*
* This operator can be implemented to asynchronously read and process data that is being written by current application.
*
* Use case examples: write to relational database, write to an external queue etc without blocking the dag i/o.
*
* @param <INPUT> input type
* @param <QUEUETUPLE> tuple enqueued each window to be processed after window is committed
* @since 2.0.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointNotificationListener, IdleTimeHandler
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class);
public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
{
@Override
public void process(INPUT input)
{
processTuple(input);
}
};
protected transient ExecutorService executorService;
protected long currentWindowId;
protected transient int spinningTime = 10;
// this stores the mapping from the window to the list of enqueued tuples
private Map<Long, List<QUEUETUPLE>> currentWindowTuples = Maps.newConcurrentMap();
private Queue<Long> currentWindows = Queues.newLinkedBlockingQueue();
protected Queue<QUEUETUPLE> committedTuples = Queues.newLinkedBlockingQueue();
protected transient Queue<QUEUETUPLE> doneTuples = Queues.newLinkedBlockingQueue();
private transient Queue<QUEUETUPLE> waitingTuples = Queues.newLinkedBlockingQueue();
private transient volatile boolean execute;
private transient AtomicReference<Throwable> cause;
@AutoMetric
private long queueLength;
@Override
public void setup(Context.OperatorContext context)
{
if (context != null) {
spinningTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
}
execute = true;
cause = new AtomicReference<Throwable>();
waitingTuples.addAll(committedTuples);
executorService = Executors.newSingleThreadExecutor(new NameableThreadFactory("Reconciler-Helper"));
executorService.submit(processEnqueuedData());
}
@Override
public void beginWindow(long windowId)
{
currentWindowId = windowId;
currentWindowTuples.put(currentWindowId, new ArrayList<QUEUETUPLE>());
currentWindows.add(windowId);
}
@Override
public void endWindow()
{
while (doneTuples.peek() != null) {
committedTuples.remove(doneTuples.poll());
}
}
@Override
public void handleIdleTime()
{
if (execute) {
try {
Thread.sleep(spinningTime);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
} else {
logger.error("Exception: ", cause);
DTThrowable.rethrow(cause.get());
}
}
@Override
public void beforeCheckpoint(long l)
{
}
@Override
public void checkpointed(long l)
{
}
@Override
public void committed(long l)
{
logger.debug(" current committed window {}", l);
if (currentWindows.isEmpty()) {
return;
}
long processedWindowId = currentWindows.peek();
while (processedWindowId <= l) {
List<QUEUETUPLE> outputDataList = currentWindowTuples.get(processedWindowId);
if (outputDataList != null && !outputDataList.isEmpty()) {
committedTuples.addAll(outputDataList);
waitingTuples.addAll(outputDataList);
}
currentWindows.remove();
currentWindowTuples.remove(processedWindowId);
if (currentWindows.isEmpty()) {
return;
}
processedWindowId = currentWindows.peek();
}
}
@Override
public void teardown()
{
execute = false;
executorService.shutdownNow();
}
private Runnable processEnqueuedData()
{
return new Runnable()
{
@Override
public void run()
{
try {
while (execute) {
while (waitingTuples.isEmpty()) {
Thread.sleep(spinningTime);
}
QUEUETUPLE output = waitingTuples.remove();
processCommittedData(output);
--queueLength;
doneTuples.add(output);
}
} catch (Throwable e) {
cause.set(e);
execute = false;
}
}
};
}
/**
* The implementation class should call this method to enqueue output once input is converted to queue input.
*
* The queueTuple is processed once the window in which queueTuple is enqueued is committed.
*
* @param queueTuple
*/
protected void enqueueForProcessing(QUEUETUPLE queueTuple)
{
currentWindowTuples.get(currentWindowId).add(queueTuple);
++queueLength;
}
/**
* Process input tuple
*
* @param input
*/
protected abstract void processTuple(INPUT input);
/**
* This method is called once the window in which queueTuple was created is committed.
* Implement this method to define the functionality to synchronize data.
*
* @param queueInput
*/
protected abstract void processCommittedData(QUEUETUPLE queueInput);
}