blob: 749ac11c5b5d374b1a32dd4d0b005ad26006a734 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window.accumulation;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.apex.malhar.lib.util.KeyValPair;
import org.apache.apex.malhar.lib.util.PojoUtils;
import org.apache.apex.malhar.lib.window.MergeAccumulation;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import static org.apache.apex.malhar.lib.window.accumulation.AbstractPojoJoin.STREAM.LEFT;
/**
* Join Accumulation for Pojo Streams.
*
*
* @since 3.7.0
*/
@InterfaceStability.Evolving
public abstract class AbstractPojoJoin<InputT1, InputT2>
implements MergeAccumulation<InputT1, InputT2, List<Multimap<List<Object>, Object>>, List<?>>
{
protected String[] keys;
protected Class<?> outClass;
private transient Map<String,PojoUtils.Getter> gettersStream1;
private transient Map<String,PojoUtils.Getter> gettersStream2;
protected transient Map<String,PojoUtils.Setter> setters;
protected transient Map<String, KeyValPair<STREAM, String>> outputToInputMap;
protected transient String[] leftKeys;
protected transient String[] rightKeys;
public enum STREAM
{
LEFT, RIGHT
}
public AbstractPojoJoin()
{
leftKeys = new String[]{};
rightKeys = new String[]{};
outClass = null;
}
/**
* This constructor will be used when the user wants to include all the fields of Output POJO
* and the field names of output POJO match the field names of POJO coming on input streams.
*/
public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
{
if (leftKeys.length != rightKeys.length) {
throw new IllegalArgumentException("Number of keys in left stream should match in right stream");
}
this.leftKeys = leftKeys;
this.rightKeys = rightKeys;
this.outClass = outClass;
}
/**
* This constructor will be used when the user wants to include some specific
* fields of the output POJO and/or wants to have a mapping of the fields of output
* POJO to POJO coming on input streams.
*/
public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
{
this(outClass,leftKeys,rightKeys);
this.outputToInputMap = outputToInputMap;
}
private void createSetters()
{
Field[] fields = outClass.getDeclaredFields();
setters = new HashMap<>();
for (Field field : fields) {
Class outputField = ClassUtils.primitiveToWrapper(field.getType());
String fieldName = field.getName();
setters.put(fieldName,PojoUtils.createSetter(outClass,fieldName,outputField));
}
}
private Map<String,PojoUtils.Getter> createGetters(Class<?> input)
{
Field[] fields = input.getDeclaredFields();
Map<String,PojoUtils.Getter> getters = new HashMap<>();
for (Field field : fields) {
Class inputField = ClassUtils.primitiveToWrapper(field.getType());
String fieldName = field.getName();
getters.put(fieldName,PojoUtils.createGetter(input, fieldName, inputField));
}
return getters;
}
@Override
public List<Multimap<List<Object>, Object>> accumulate(List<Multimap<List<Object>, Object>> accumulatedValue, InputT1 input)
{
if (gettersStream1 == null) {
gettersStream1 = createGetters(input.getClass());
}
try {
return accumulateWithIndex(0, accumulatedValue, input);
} catch (NoSuchFieldException e) {
throw Throwables.propagate(e);
}
}
@Override
public List<Multimap<List<Object>, Object>> accumulate2(List<Multimap<List<Object>, Object>> accumulatedValue, InputT2 input)
{
if (gettersStream2 == null) {
gettersStream2 = createGetters(input.getClass());
}
try {
return accumulateWithIndex(1, accumulatedValue, input);
} catch (NoSuchFieldException e) {
throw Throwables.propagate(e);
}
}
@Override
public List<Multimap<List<Object>, Object>> defaultAccumulatedValue()
{
List<Multimap<List<Object>, Object>> accu = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Multimap<List<Object>, Object> mMap = ArrayListMultimap.create();
accu.add(mMap);
}
return accu;
}
private List<Multimap<List<Object>, Object>> accumulateWithIndex(int index,
List<Multimap<List<Object>, Object>> accu, Object input) throws NoSuchFieldException
{
// TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected.
Multimap<List<Object>, Object> curMap = accu.get(index);
List<Object> key = getKeyForMultiMap(input,index);
curMap.put(key,input);
return accu;
}
private List<Object> getKeyForMultiMap(Object input, int index)
{
List<Object> key = new ArrayList<>();
String[] reqKeys = index == 0 ? leftKeys : rightKeys;
Map<String,PojoUtils.Getter> gettersStream = index == 0 ? gettersStream1 : gettersStream2;
for (String lkey : reqKeys ) {
key.add(gettersStream.get(lkey).get(input));
}
return key;
}
@Override
public List<Multimap<List<Object>, Object>> merge(List<Multimap<List<Object>, Object>> accumulatedValue1, List<Multimap<List<Object>, Object>> accumulatedValue2)
{
for (int i = 0; i < 2; i++) {
Multimap<List<Object>, Object> mMap1 = accumulatedValue1.get(i);
Multimap<List<Object>, Object> mMap2 = accumulatedValue2.get(i);
for (Map.Entry<List<Object>, Object> entry : mMap2.entries()) {
mMap1.put(entry.getKey(),entry.getValue());
}
}
return accumulatedValue1;
}
@Override
public List<?> getOutput(List<Multimap<List<Object>, Object>> accumulatedValue)
{
if (setters == null) {
createSetters();
}
// TODO: May need to revisit (use state manager).
return getAllCombo(accumulatedValue);
}
protected void setObjectForResult(Map<String,PojoUtils.Getter> stream, Object input, Object output)
{
for (Map.Entry<String, PojoUtils.Getter> getter : stream.entrySet()) {
if (setters.containsKey(getter.getKey())) {
setters.get(getter.getKey()).set(output, getter.getValue().get(input));
}
}
}
/**
* This function takes the required join on the 2 input streams for matching keys
* and allows the derived classes to implement the logic in case of non matching keys.
*
* It is designed such that for outer joins it will always assume that it is
* a left outer join and hence it considers right stream as left in case of
* right outer join keeping the code and logic the same.
*
* For each key in the left stream a corresponding key is searched in the right stream
* if a match is found then the all the objects with that key are added to Output list,
* also that key is removed from right stream as it will no longer be required in any join
* scenario.If a match is not found then it relies on derived class implementation to handle it.
*
* @param accu which is the main accumulation data structure
* @return List of objects got after joining the streams
*/
private List<Object> getAllCombo(List<Multimap<List<Object>, Object>> accu)
{
List<Object> result = new ArrayList<>();
int leftStreamIndex = getLeftStreamIndex();
Multimap<List<Object>, Object> leftStream = accu.get(leftStreamIndex);
Multimap<List<Object>, Object> rightStream = ArrayListMultimap.create(accu.get((leftStreamIndex + 1) % 2));
Map<String,PojoUtils.Getter> leftGettersStream = leftStreamIndex == 0 ? gettersStream1 : gettersStream2;
Map<String,PojoUtils.Getter> rightGettersStream = leftStreamIndex == 1 ? gettersStream1 : gettersStream2;
for (List lMap : leftStream.keySet()) {
Collection<Object> left = leftStream.get(lMap);
if (rightStream.containsKey(lMap)) {
Collection<Object> right = rightStream.get(lMap);
Object o;
try {
o = outClass.newInstance();
} catch (Throwable e) {
throw Throwables.propagate(e);
}
for (Object lObj:left) {
for (Object rObj:right) {
if (outputToInputMap != null) {
for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
KeyValPair<STREAM,String> kv = entry.getValue();
Object reqObject;
Map<String,PojoUtils.Getter> reqStream;
if (kv.getKey() == LEFT) {
reqObject = leftStreamIndex == 0 ? lObj : rObj;
reqStream = leftStreamIndex == 0 ? leftGettersStream : rightGettersStream;
} else {
reqObject = leftStreamIndex == 0 ? rObj : lObj;
reqStream = leftStreamIndex == 0 ? rightGettersStream : leftGettersStream;
}
setters.get(entry.getKey()).set(o,reqStream.get(entry.getValue().getValue()).get(reqObject));
}
} else {
setObjectForResult(leftGettersStream, lObj, o);
setObjectForResult(rightGettersStream, rObj, o);
}
}
result.add(o);
}
rightStream.removeAll(lMap);
} else {
addNonMatchingResult(left, leftGettersStream, result);
}
}
addNonMatchingRightStream(rightStream, rightGettersStream, result);
return result;
}
/**
* This function defines the strategy to be used when no matching key is found.
*/
protected abstract void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result);
/**
* This function defines the strategy to be used when the join is interested to add POJO's
* from right stream when no matching key is found.
*/
protected abstract void addNonMatchingRightStream(Multimap<List<Object>, Object> rightStream, Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result);
/**
* This function lets the join decide which is the left stream and which is the right stream.
*/
protected abstract int getLeftStreamIndex();
@Override
public List<?> getRetraction(List<?> value)
{
return null;
}
}