blob: 816ca58ebd2e00b620c9e4c14f94c6957d1dcd30 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.join;
import java.util.Arrays;
import java.util.List;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
import com.google.common.base.Preconditions;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.common.util.BaseOperator;
/**
* <p>
* An abstract implementation of an inner join operator. The operator receives tuples from two streams,
* applies the join operation based on a constraint, and emits the joined values.
* Concrete classes should provide implementations of the extractKey, extractTime, mergeTuples and emitTuple methods.
*
* <b>Properties:</b><br>
* <b>includeFieldStr</b>: Fields to be added to the output tuple, given as two semicolon-separated lists
* (stream1 first, then stream2) of comma-separated field names. Ex: Field1,Field2;Field3,Field4<br>
* <b>leftKeyExpression</b>: key field expression for stream1.<br>
* <b>rightKeyExpression</b>: key field expression for stream2.<br>
* <b>timeFieldsStr</b>: Comma-separated time fields for the two streams. Ex: Field1,Field2<br>
* <b>expiryTime</b>: Expiry time in milliseconds for stored tuples that come from both streams<br>
* <b>leftKeyPrimary</b>: Specifies whether the left key (stream1 key) is primary or not<br>
* <b>rightKeyPrimary</b>: Specifies whether the right key (stream2 key) is primary or not<br>
*
* <b> Example: </b> <br>
* The left input port receives customer details and the right input port receives order details.
* Let the schema for Customer be {ID, Name, CTime}
* and the schema for Order be {OID, CID, OTime}.
* Now join the tuples of the Customer and Order streams where Customer.ID = Order.CID, with the constraint that
* matched tuples must have timestamps within 5 minutes of each other.
* Here, leftKeyExpression = ID, rightKeyExpression = CID, timeFieldsStr = CTime,OTime and
* expiryTime = 5 minutes. <br>
*
* @displayName Abstract Inner Join Operator
* @tags join
*
* @since 3.5.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
@OperatorAnnotation(checkpointableWithinAppWindow = false)
public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
{
  @NotNull
  private String leftKeyExpression;
  @NotNull
  private String rightKeyExpression;
  // Parsed from includeFieldStr in setup(): index 0 holds the stream1 fields, index 1 the stream2 fields.
  protected transient String[][] includeFields;
  // [leftKeyExpression, rightKeyExpression], rebuilt in setup().
  protected transient List<String> keyFieldExpressions;
  // Parsed from timeFieldsStr in setup(); left untouched when timeFieldsStr is null.
  protected transient List<String> timeFields;
  // Joined tuples emitted in the last window divided by the window length in seconds.
  @AutoMetric
  private long tuplesJoinedPerSec;
  // Application window length in seconds, computed in setup() from the operator context.
  private double windowTimeSec;
  // Number of joined tuples emitted in the current window.
  private int tuplesCount;
  @NotNull
  private String includeFieldStr;
  private String timeFieldsStr;
  @NotNull
  private Long expiryTime;
  private boolean isLeftKeyPrimary = false;
  private boolean isRightKeyPrimary = false;
  protected SpillableComplexComponent component;
  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;

  /**
   * Process a tuple received on one of the input ports:
   * 1) Extract the key from the given tuple.
   * 2) Insert {@code (key, tuple)} into the store of the stream the tuple came from
   *    (stream1Data for stream1, stream2Data for stream2).
   * 3) Look up the values stored under the same key in the opposite store.
   * 4) Merge the given tuple with each of the values found in step (3).
   * @param tuple given tuple
   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
   */
  protected void processTuple(T tuple, boolean isStream1Data)
  {
    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
    K key = extractKey(tuple,isStream1Data);
    // If the store reports that it was not modified, the tuple is not joined either.
    if (!store.put(key, tuple)) {
      return;
    }
    Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
    joinStream(tuple,isStream1Data, valuestore.get(key));
  }

  /**
   * Merge the given tuple with each element of the given list and emit every non-null result.
   * The stream1 tuple is always passed as the first argument of {@link #mergeTuples}.
   * @param tuple given tuple
   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
   * @param value list of tuples from the opposite stream; may be null, in which case nothing is emitted
   */
  protected void joinStream(T tuple, boolean isStream1Data, List<T> value)
  {
    if (value == null) {
      return;
    }
    // Join the input tuple with each matching tuple from the other stream
    for (T joinedValue : value) {
      T result = isStream1Data ? mergeTuples(tuple, joinedValue) :
          mergeTuples(joinedValue, tuple);
      if (result != null) {
        tuplesCount++;
        emitTuple(result);
      }
    }
  }

  /**
   * Create the spillable component and stores and parse the configured properties.
   * @param context operator context
   * @throws IllegalArgumentException if includeFieldStr contains more than two ';'-separated lists
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    component = new SpillableComplexComponentImpl(new InMemSpillableStateStore());
    // NOTE(review): a fresh component is created on every setup, but the stores are only
    // created when both are null. If stream1Data/stream2Data are restored from a checkpoint,
    // confirm they are still driven by a live component.
    if (stream1Data == null && stream2Data == null) {
      createStores();
    }
    component.setup(context);
    keyFieldExpressions = Arrays.asList(leftKeyExpression,rightKeyExpression);
    // Check the configured string, not the transient derived list: timeFields is only ever
    // assigned here, so testing it would prevent timeFieldsStr from ever being parsed.
    if (timeFieldsStr != null) {
      timeFields = Arrays.asList(timeFieldsStr.split(","));
    }
    String[] streamFields = includeFieldStr.split(";");
    // includeFields holds exactly one entry per stream; fail with a clear message
    // instead of an ArrayIndexOutOfBoundsException.
    Preconditions.checkArgument(streamFields.length <= 2,
        "includeFieldStr must contain at most two ';'-separated field lists (one per stream), got: %s",
        includeFieldStr);
    includeFields = new String[2][];
    for (int i = 0; i < streamFields.length; i++) {
      includeFields[i] = streamFields[i].split(",");
    }
    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
  }

  /**
   * Forward the window start to the component and reset the per-window join counters.
   * @param windowId id of the window being started
   */
  @Override
  public void beginWindow(long windowId)
  {
    component.beginWindow(windowId);
    tuplesJoinedPerSec = 0;
    tuplesCount = 0;
  }

  /**
   * Forward the window end to the component and compute the joined-tuples-per-second metric.
   */
  @Override
  public void endWindow()
  {
    component.endWindow();
    tuplesJoinedPerSec = (long)(tuplesCount / windowTimeSec);
  }

  /**
   * Tear down the spillable component.
   */
  @Override
  public void teardown()
  {
    component.teardown();
  }

  /**
   * Extract the key from the given tuple
   * @param tuple given tuple
   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
   * @return the key
   */
  public abstract K extractKey(T tuple, boolean isStream1Data);

  /**
   * Extract the time from the given tuple
   * @param tuple given tuple
   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
   * @return the time
   */
  public abstract long extractTime(T tuple, boolean isStream1Data);

  /**
   * Merge the given tuples
   * @param tuple1 tuple belonging to stream1
   * @param tuple2 tuple belonging to stream2
   * @return the merged tuple, or null if nothing should be emitted
   */
  public abstract T mergeTuples(T tuple1, T tuple2);

  /**
   * Emit the given tuple
   * @param tuple given tuple
   */
  public abstract void emitTuple(T tuple);

  /**
   * Create stores for both the streams from the current component.
   */
  public void createStores()
  {
    stream1Data = component.newSpillableByteArrayListMultimap(0,null,null);
    stream2Data = component.newSpillableByteArrayListMultimap(0,null,null);
  }

  /**
   * Get the left key expression
   * @return the leftKeyExpression
   */
  public String getLeftKeyExpression()
  {
    return leftKeyExpression;
  }

  /**
   * Set the left key expression
   * @param leftKeyExpression given leftKeyExpression
   */
  public void setLeftKeyExpression(String leftKeyExpression)
  {
    this.leftKeyExpression = leftKeyExpression;
  }

  /**
   * Get the right key expression
   * @return the rightKeyExpression
   */
  public String getRightKeyExpression()
  {
    return rightKeyExpression;
  }

  /**
   * Set the right key expression
   * @param rightKeyExpression given rightKeyExpression
   */
  public void setRightKeyExpression(String rightKeyExpression)
  {
    this.rightKeyExpression = rightKeyExpression;
  }

  /**
   * Return the include fields of two streams
   * @return the includeFieldStr
   */
  public String getIncludeFieldStr()
  {
    return includeFieldStr;
  }

  /**
   * Set the fields to be added to the output tuple: two ';'-separated lists
   * (stream1 first, then stream2) of ','-separated field names.
   * @param includeFieldStr given includeFieldStr
   * @throws NullPointerException if includeFieldStr is null
   */
  public void setIncludeFieldStr(@NotNull String includeFieldStr)
  {
    this.includeFieldStr = Preconditions.checkNotNull(includeFieldStr);
  }

  /**
   * Return the time fields for both the streams
   * @return the timeFieldsStr
   */
  public String getTimeFieldsStr()
  {
    return timeFieldsStr;
  }

  /**
   * Set the time fields as comma separated for both the streams
   * @param timeFieldsStr given timeFieldsStr
   */
  public void setTimeFieldsStr(String timeFieldsStr)
  {
    this.timeFieldsStr = timeFieldsStr;
  }

  /**
   * Returns the expiry time
   * @return the expiryTime
   */
  public Long getExpiryTime()
  {
    return expiryTime;
  }

  /**
   * Sets the expiry time
   * @param expiryTime given expiryTime
   * @throws NullPointerException if expiryTime is null
   */
  public void setExpiryTime(@NotNull Long expiryTime)
  {
    this.expiryTime = Preconditions.checkNotNull(expiryTime);
  }

  /**
   * Return whether the left key is primary or not
   * @return the isLeftKeyPrimary
   */
  public boolean isLeftKeyPrimary()
  {
    return isLeftKeyPrimary;
  }

  /**
   * Set the leftKeyPrimary
   * @param leftKeyPrimary given leftKeyPrimary
   */
  public void setLeftKeyPrimary(boolean leftKeyPrimary)
  {
    isLeftKeyPrimary = leftKeyPrimary;
  }

  /**
   * Return whether the right key is primary or not
   * @return the isRightKeyPrimary
   */
  public boolean isRightKeyPrimary()
  {
    return isRightKeyPrimary;
  }

  /**
   * Set the rightKeyPrimary
   * @param rightKeyPrimary given rightKeyPrimary
   */
  public void setRightKeyPrimary(boolean rightKeyPrimary)
  {
    isRightKeyPrimary = rightKeyPrimary;
  }
}