| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.apex.malhar.lib.join; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.apex.malhar.lib.streamquery.condition.Condition; |
| import org.apache.apex.malhar.lib.streamquery.index.Index; |
| import org.apache.hadoop.classification.InterfaceStability.Evolving; |
| |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.DefaultOutputPort; |
| import com.datatorrent.api.Operator; |
| import com.datatorrent.api.annotation.OperatorAnnotation; |
| |
| /** |
| * An implementation of Operator that reads table row data from two table data input ports. <br> |
| * <p> |
| * Operator semi-joins row on given condition and selected names, emits |
| * semi-joined result at output port. |
| * |
| * Note: A semi-join is not a left-join or left-outer-join. In semi-join only the joined rows |
| * from the left table are returned. However, in a left-outer-join, all the rows from left table |
| * will be returned (also padding with nulls for columns from the right table when not joined). |
| * |
| * For more information see {@link http://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_10002.htm#i2166436} |
| * |
| * <br> |
| * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br> |
| * <b>Partitions : No, </b> will yield wrong result(s). <br> |
| * <br> |
| * <b>Ports : </b> <br> |
| * <b> inport1 : </b> Input port for table 1, expects HashMap<String, Object> <br> |
| * <b> inport2 : </b> Input port for table 2, expects HashMap<String, Object> <br> |
| * <b> outport : </b> Output semi-joined row port, emits HashMap<String, ArrayList<Object>> <br> |
| * <br> |
| * <b> Properties : </b> |
| * <b> joinCondition : </b> Join condition for table rows. <br> |
| * <b> table1Columns : </b> Columns to be selected from table1. <br> |
| * <b> table2Columns : </b> Columns to be selected from table2. <br> |
| * <br> |
| * |
| * @displayName Semi join |
| * @category Join Manipulators |
| * @tags sql, semi join operator |
| * @since 0.3.3 |
| */ |
| @OperatorAnnotation(partitionable = false) |
| @Evolving |
| public class SemiJoinOperator implements Operator |
| { |
| |
| /** |
| * Join Condition. |
| */ |
| private Condition joinCondition; |
| |
| /** |
| * Table1 select columns. |
| * Note: only left table (Table1) will be output in an semi-join |
| */ |
| private ArrayList<Index> table1Columns = new ArrayList<>(); |
| |
| /** |
| * Collect data rows from input port 1. |
| */ |
| private List<Map<String, Object>> table1; |
| |
| /** |
| * Collect data from input port 2. |
| */ |
| private List<Map<String, Object>> table2; |
| |
| /** |
| * Input port 1 that takes a map of <string,object>. |
| */ |
| public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>() |
| { |
| @Override |
| public void process(Map<String, Object> tuple) |
| { |
| table1.add(tuple); |
| for (int j = 0; j < table2.size(); j++) { |
| if ((joinCondition != null) && (joinCondition.isValidJoin(tuple, table2.get(j)))) { |
| joinRows(tuple); |
| // row has been join, and can be removed now |
| table1.remove(tuple); |
| } |
| } |
| } |
| }; |
| |
| /** |
| * Input port 2 that takes a map of <string,object>. |
| */ |
| public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>() |
| { |
| @Override |
| public void process(Map<String, Object> tuple) |
| { |
| table2.add(tuple); |
| |
| for (int j = 0; j < table1.size(); j++) { |
| if ((joinCondition != null) |
| && (joinCondition.isValidJoin(table1.get(j), tuple))) { |
| joinRows(table1.get(j)); |
| table1.remove(table1.get(j)); |
| } |
| } |
| } |
| }; |
| |
| /** |
| * Output port that emits a map of <string,object>. |
| */ |
| public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<>(); |
| |
| @Override |
| public void setup(OperatorContext arg0) |
| { |
| table1 = new ArrayList<>(); |
| table2 = new ArrayList<>(); |
| } |
| |
| @Override |
| public void teardown() |
| { |
| } |
| |
| @Override |
| public void beginWindow(long arg0) |
| { |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| table1.clear(); |
| table2.clear(); |
| } |
| |
| /** |
| * @return the joinCondition |
| */ |
| public Condition getJoinCondition() |
| { |
| return joinCondition; |
| } |
| |
| /** |
| * Pick the supported condition. Currently only equal join is supported. |
| * |
| * @param joinCondition - join condition |
| */ |
| public void setJoinCondition(Condition joinCondition) |
| { |
| this.joinCondition = joinCondition; |
| } |
| |
| /** |
| * Select table1 column name. |
| */ |
| public void selectTable1Column(Index column) |
| { |
| table1Columns.add(column); |
| } |
| |
| /** |
| * Join row from table1 (only left table is used in semi-join). |
| */ |
| protected void joinRows(Map<String, Object> row) |
| { |
| // joined row |
| Map<String, Object> join = new HashMap<>(); |
| |
| // filter table1 columns |
| if (row != null) { |
| for (Index index : table1Columns) { |
| index.filter(row, join); |
| } |
| } |
| |
| // emit row |
| outport.emit(join); |
| } |
| } |