blob: 453da80bcfdd1f535d09b5e255daafb6f8751bf4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.join;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.joda.time.Duration;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Maps;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
/**
* An abstract inner join operator that extends AbstractInnerJoinOperator and keeps the
* tuples of both input streams in managed state.
*
* <b>Properties:</b><br>
* <b>noOfBuckets</b>: Number of buckets required for Managed state. <br>
* <b>bucketSpanTime</b>: Indicates the length of the time bucket. <br>
*
* @since 3.5.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements
Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
{
/** Directory name that setup() places under the application path to hold this operator's managed state. */
public static final String stateDir = "managedState";
/** Last segment of the stream1 store's base path; also used as the suffix of a checkpoint state path in setup(). */
public static final String stream1State = "stream1Data";
/** Last segment of the stream2 store's base path; also used as the suffix of a checkpoint state path in setup(). */
public static final String stream2State = "stream2Data";
/**
* Tuples whose lookup in the opposite store had not completed when they arrived, mapped to the
* pending lookup. Backed by a linked hash map, so iteration follows insertion order.
* Declared transient.
*/
private transient Map<JoinEvent<K,T>, Future<List>> waitingEvents = Maps.newLinkedHashMap();
/** Number of buckets applied to each store in createStores(); defaults to 1. */
private int noOfBuckets = 1;
/** Time-bucket span in milliseconds; when null, createStores() does not set a span. */
private Long bucketSpanTime;
/** Managed state created in createStores() and wrapped by stream1Data. */
protected ManagedTimeStateImpl stream1Store;
/** Managed state created in createStores() and wrapped by stream2Data. */
protected ManagedTimeStateImpl stream2Store;
/**
 * Builds the managed-state store of each stream and the multi-value view on top of it.
 *
 * Both stores receive {@code noOfBuckets} buckets. The optional bucket span and, for a
 * {@link MovingBoundaryTimeBucketAssigner}, the expiry time are applied through
 * {@code stream1Store}'s time bucket assigner only.
 *
 * NOTE(review): the assertion expects both stores to return the same assigner instance; if
 * they do not, stream2Store's assigner is left unconfigured - confirm against
 * ManagedTimeStateImpl.
 */
@Override
public void createStores()
{
  stream1Store = new ManagedTimeStateImpl();
  stream1Store.setNumBuckets(noOfBuckets);
  stream2Store = new ManagedTimeStateImpl();
  stream2Store.setNumBuckets(noOfBuckets);

  assert stream1Store.getTimeBucketAssigner() == stream2Store.getTimeBucketAssigner();

  // A span is applied only when one was explicitly configured.
  if (null != bucketSpanTime) {
    final Duration span = Duration.millis(bucketSpanTime);
    stream1Store.getTimeBucketAssigner().setBucketSpan(span);
  }

  // Expiry is only available on the moving-boundary assigner.
  final boolean movingBoundary = stream1Store.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner;
  if (movingBoundary) {
    final MovingBoundaryTimeBucketAssigner assigner =
        (MovingBoundaryTimeBucketAssigner)stream1Store.getTimeBucketAssigner();
    assigner.setExpireBefore(Duration.millis(getExpiryTime()));
  }

  stream1Data = new ManagedTimeStateMultiValue(stream1Store, !isLeftKeyPrimary());
  stream2Data = new ManagedTimeStateMultiValue(stream2Store, !isRightKeyPrimary());
}
/**
 * Handles one tuple arriving on either input port:
 * <ol>
 * <li>extract the key and the time of the tuple;</li>
 * <li>store the tuple in the state of its own stream (stream1Data for stream1, stream2Data
 * otherwise); if the store reports that the tuple was not inserted, stop here;</li>
 * <li>start an asynchronous lookup of the key in the state of the opposite stream;</li>
 * <li>if the lookup has already completed, join the tuple with the values found, otherwise
 * park the tuple in waitingEvents until the lookup completes.</li>
 * </ol>
 *
 * @param tuple given tuple
 * @param isStream1Data whether the tuple was received on stream1
 * @throws RuntimeException wrapping the failure if the completed lookup cannot be read
 */
@Override
protected void processTuple(T tuple, boolean isStream1Data)
{
  final Spillable.SpillableListMultimap<K, T> ownData = isStream1Data ? stream1Data : stream2Data;
  final K joinKey = extractKey(tuple, isStream1Data);
  final long eventTime = extractTime(tuple, isStream1Data);

  final boolean stored = ((ManagedTimeStateMultiValue)ownData).put(joinKey, tuple, eventTime);
  if (!stored) {
    // The tuple was not inserted, so it must not take part in the join.
    return;
  }

  final Spillable.SpillableListMultimap<K, T> oppositeData;
  if (isStream1Data) {
    oppositeData = stream2Data;
  } else {
    oppositeData = stream1Data;
  }
  final Future<List> lookup = ((ManagedTimeStateMultiValue)oppositeData).getAsync(joinKey);

  if (!lookup.isDone()) {
    // Result not available yet: remember the tuple and join it once the lookup finishes.
    waitingEvents.put(new JoinEvent<>(joinKey, tuple, isStream1Data), lookup);
    return;
  }

  try {
    joinStream(tuple, isStream1Data, lookup.get());
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Uses idle time to make progress on parked tuples: when any event is waiting, runs one
 * non-blocking pass over the waiting events.
 */
@Override
public void handleIdleTime()
{
  if (waitingEvents.isEmpty()) {
    return;
  }
  processWaitEvents(false);
}
/**
* Forwards the before-checkpoint callback to both managed-state stores, stream1 first.
* @param l value handed unchanged to each store (presumably the window id; confirm against the listener API)
*/
@Override
public void beforeCheckpoint(long l)
{
stream1Store.beforeCheckpoint(l);
stream2Store.beforeCheckpoint(l);
}
/**
* Forwards the checkpointed callback to both managed-state stores, stream1 first.
* @param l value handed unchanged to each store (presumably the window id; confirm against the listener API)
*/
@Override
public void checkpointed(long l)
{
stream1Store.checkpointed(l);
stream2Store.checkpointed(l);
}
/**
* Forwards the committed callback to both managed-state stores, stream1 first.
* @param l value handed unchanged to each store (presumably the window id; confirm against the listener API)
*/
@Override
public void committed(long l)
{
stream1Store.committed(l);
stream2Store.committed(l);
}
/**
 * Sets up the operator, then points each managed-state store at its own storage locations
 * and sets the store up.
 *
 * Base paths have the form
 * {@code <application path>/managedState/<operator id>/<stream1Data|stream2Data>}, and each
 * store's checkpoint manager gets the state path {@code managed_state_<stream name>}.
 *
 * Fix: the second state path used to be set on stream1Store again, which overwrote stream1's
 * path with the stream2 name and never configured stream2Store. Each store now gets its own
 * state path. NOTE(review): this changes where checkpoint state is looked up, so confirm the
 * impact on applications restarted from state written by the previous behaviour.
 *
 * @param context operator context supplying the application path and the operator id
 */
@Override
public void setup(Context.OperatorContext context)
{
  super.setup(context);

  // Directory shared by both stores; only the last path segment differs per stream.
  final String operatorStateDir = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + stateDir
      + Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR;
  ((FileAccessFSImpl)stream1Store.getFileAccess()).setBasePath(operatorStateDir + stream1State);
  ((FileAccessFSImpl)stream2Store.getFileAccess()).setBasePath(operatorStateDir + stream2State);

  stream1Store.getCheckpointManager().setStatePath("managed_state_" + stream1State);
  stream2Store.getCheckpointManager().setStatePath("managed_state_" + stream2State);

  // The stores are set up only after their paths are configured.
  stream1Store.setup(context);
  stream2Store.setup(context);
}
/**
* Begins the window on both managed-state stores and then on the superclass.
* @param windowId identifier of the window being started, passed through unchanged
*/
@Override
public void beginWindow(long windowId)
{
stream1Store.beginWindow(windowId);
stream2Store.beginWindow(windowId);
super.beginWindow(windowId);
}
/**
 * Joins tuples that were parked because their lookup in the opposite store was still pending.
 * Events are visited in the iteration order of waitingEvents and removed once joined.
 *
 * @param finalize when {@code true}, every waiting event is joined, blocking on lookups that
 *        have not finished; when {@code false}, at most one event whose lookup has already
 *        completed is joined and the method returns
 * @throws RuntimeException wrapping the cause if a lookup failed or the wait was interrupted;
 *         on interruption the thread's interrupt status is restored first
 */
private void processWaitEvents(boolean finalize)
{
  Iterator<Map.Entry<JoinEvent<K,T>, Future<List>>> waitIterator = waitingEvents.entrySet().iterator();
  while (waitIterator.hasNext()) {
    Map.Entry<JoinEvent<K,T>, Future<List>> waitingEvent = waitIterator.next();
    Future<List> future = waitingEvent.getValue();
    if (!future.isDone() && !finalize) {
      // Still pending and the caller does not want to block: leave it parked.
      continue;
    }
    JoinEvent<K,T> event = waitingEvent.getKey();
    try {
      joinStream(event.value, event.isStream1Data, future.get());
    } catch (InterruptedException e) {
      // Keep the interruption visible to the caller instead of swallowing it in the wrapper.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for the lookup of a waiting join event", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Lookup for a waiting join event failed", e);
    }
    waitIterator.remove();
    if (!finalize) {
      // Non-blocking mode handles a single event per call.
      break;
    }
  }
}
/**
* Joins every waiting event (blocking on lookups that have not finished), then ends the window
* on both managed-state stores and finally on the superclass.
*/
@Override
public void endWindow()
{
processWaitEvents(true);
stream1Store.endWindow();
stream2Store.endWindow();
super.endWindow();
}
/**
* Tears down both managed-state stores and then the superclass.
*/
@Override
public void teardown()
{
stream1Store.teardown();
stream2Store.teardown();
super.teardown();
}
/**
* Returns the number of buckets that createStores() applies to each managed-state store.
* @return the noOfBuckets (1 unless changed through the setter)
*/
public int getNoOfBuckets()
{
return noOfBuckets;
}
/**
* Sets the number of buckets required for managed state. The value is applied to both stores
* when createStores() runs.
* @param noOfBuckets number of buckets for each store
*/
public void setNoOfBuckets(int noOfBuckets)
{
this.noOfBuckets = noOfBuckets;
}
/**
* Returns the length of the time bucket in milliseconds.
* @return the bucketSpanTime, or null when no span has been set
*/
public Long getBucketSpanTime()
{
return bucketSpanTime;
}
/**
* Sets the length of the time bucket required for managed state, in milliseconds. The value is
* applied when createStores() runs; null means no span is set there.
* @param bucketSpanTime given bucketSpanTime in milliseconds, may be null
*/
public void setBucketSpanTime(Long bucketSpanTime)
{
this.bucketSpanTime = bucketSpanTime;
}
/**
* Holder for a tuple whose join has been postponed until the lookup in the opposite store
* completes. Instances serve as keys of waitingEvents; the class does not override
* equals/hashCode, so every instance is a distinct key even for identical contents.
*
* @param <K> type of the join key
* @param <T> type of the tuple
*/
public static class JoinEvent<K,T>
{
// Key extracted from the tuple.
public K key;
// The tuple waiting to be joined.
public T value;
// True when the tuple was received on stream1.
public boolean isStream1Data;
/**
* Creates an event from the given key, tuple and originating stream.
*/
public JoinEvent(K key, T value, boolean isStream1Data)
{
this.key = key;
this.value = value;
this.isStream1Data = isStream1Data;
}
}
}