blob: 3f9ffa2b0525acbc787f264efb097e6f7be66d7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.join.batch;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import java.io.Closeable;
import java.io.IOException;
/**
 * Gets two matched rows for full outer join.
 *
 * <p>The iterator walks two inputs in lock-step, one key group at a time. Each successful call to
 * {@link #nextOuterJoin()} prepares exactly one group, which callers then read through
 * {@link #getMatchKey()}, {@link #getBuffer1()} and {@link #getBuffer2()}:
 * <ul>
 *   <li>match: {@code getMatchKey()} is non-null and both buffers hold the rows of that key;</li>
 *   <li>outer group of input 1: {@code getMatchKey()} is {@code null}, buffer 1 holds the rows
 *       and buffer 2 has been reset;</li>
 *   <li>outer group of input 2: {@code getMatchKey()} is {@code null}, buffer 2 holds the rows
 *       and buffer 1 has been reset.</li>
 * </ul>
 *
 * <p>NOTE(review): the merge logic only produces correct groups if both inputs arrive sorted by
 * join key in the order defined by the supplied {@link RecordComparator}; this class does not
 * verify that.
 */
public class SortMergeFullOuterJoinIterator implements Closeable {

    private final Projection<BinaryRow, BinaryRow> projection1;
    private final Projection<BinaryRow, BinaryRow> projection2;
    private final RecordComparator keyComparator;
    private final MutableObjectIterator<BinaryRow> iterator1;
    private final MutableObjectIterator<BinaryRow> iterator2;

    /** Look-ahead row of input 1 and its projected key; both {@code null} once input 1 is exhausted. */
    private BinaryRow row1;
    private BinaryRow key1;

    /** Look-ahead row of input 2 and its projected key; both {@code null} once input 2 is exhausted. */
    private BinaryRow row2;
    private BinaryRow key2;

    /** Key of the current group if it is a match, {@code null} if the current group is an outer group. */
    private BinaryRow matchKey;

    private final ResettableExternalBuffer buffer1;
    private final ResettableExternalBuffer buffer2;

    private final int[] nullFilterKeys;
    private final boolean nullSafe;
    private final boolean filterAllNulls;

    /**
     * Creates the iterator and eagerly reads the first row of each input.
     *
     * @param serializer1 used to create the reusable row instance for input 1
     * @param serializer2 used to create the reusable row instance for input 2
     * @param projection1 extracts the join key from a row of input 1
     * @param projection2 extracts the join key from a row of input 2
     * @param keyComparator compares join keys of the two inputs
     * @param iterator1 rows of input 1
     * @param iterator2 rows of input 2
     * @param buffer1 receives the rows of the current key group of input 1
     * @param buffer2 receives the rows of the current key group of input 2
     * @param filterNulls per-key-field flags handed to {@code NullAwareJoinHelper} to derive the
     *     null-filtering configuration
     * @throws IOException if reading the first row of either input fails
     */
    public SortMergeFullOuterJoinIterator(
            BinaryRowSerializer serializer1, BinaryRowSerializer serializer2,
            Projection projection1, Projection projection2, RecordComparator keyComparator,
            MutableObjectIterator<BinaryRow> iterator1,
            MutableObjectIterator<BinaryRow> iterator2,
            ResettableExternalBuffer buffer1,
            ResettableExternalBuffer buffer2,
            boolean[] filterNulls) throws IOException {
        // The raw Projection parameters are kept as-is for source compatibility with callers.
        this.projection1 = projection1;
        this.projection2 = projection2;
        this.keyComparator = keyComparator;
        this.iterator1 = iterator1;
        this.iterator2 = iterator2;

        this.row1 = serializer1.createInstance();
        this.row2 = serializer2.createInstance();
        this.buffer1 = buffer1;
        this.buffer2 = buffer2;

        this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNulls);
        this.nullSafe = nullFilterKeys.length == 0;
        this.filterAllNulls = nullFilterKeys.length == filterNulls.length;

        // Prime the look-ahead rows so nextOuterJoin() always sees the head of both inputs.
        nextRow1();
        nextRow2();
    }

    /**
     * Delegates to {@code NullAwareJoinHelper.shouldFilter} with this iterator's null-filter
     * configuration; a key for which this returns {@code true} is never matched against the
     * other input.
     */
    private boolean shouldFilter(BinaryRow key) {
        return NullAwareJoinHelper.shouldFilter(nullSafe, filterAllNulls, nullFilterKeys, key);
    }

    /**
     * Advances to the next key group.
     *
     * @return {@code true} if a group (match or outer) was prepared and can be read through the
     *     getters, {@code false} if both inputs are exhausted
     * @throws IOException if reading from an input or writing to a buffer fails
     */
    public boolean nextOuterJoin() throws IOException {
        if (key1 != null && (shouldFilter(key1) || key2 == null)) {
            // Input 1 has a filtered key, or input 2 is exhausted: outer group of input 1.
            matchKey = null;
            bufferRows1();
            buffer2.reset();
            return true;
        }
        if (key2 != null && (shouldFilter(key2) || key1 == null)) {
            // Input 2 has a filtered key, or input 1 is exhausted: outer group of input 2.
            matchKey = null;
            buffer1.reset();
            bufferRows2();
            return true;
        }
        if (key1 == null || key2 == null) {
            // Given the two checks above, this is only reachable when both inputs are exhausted.
            return false;
        }

        int cmp = keyComparator.compare(key1, key2);
        if (cmp == 0) {
            // Match: publish the snapshot of the group key rather than key1 itself, because
            // buffering advances key1 to the look-ahead row of the next group.
            matchKey = bufferRows1();
            bufferRows2();
        } else if (cmp > 0) {
            // Input 2 is behind: outer group of input 2.
            matchKey = null;
            buffer1.reset();
            bufferRows2();
        } else {
            // Input 1 is behind: outer group of input 1.
            matchKey = null;
            buffer2.reset();
            bufferRows1();
        }
        return true;
    }

    /**
     * Buffer rows from iterator1 with same key.
     *
     * <p>On return, {@code row1}/{@code key1} point at the first row of the next group, or are
     * {@code null} if input 1 is exhausted.
     *
     * @return a copy of the key shared by the buffered rows, taken before the input was advanced
     */
    private BinaryRow bufferRows1() throws IOException {
        // Snapshot the key first: nextRow1() replaces key1 while we are still comparing against it.
        BinaryRow groupKey = key1.copy();
        buffer1.reset();
        do {
            buffer1.add(row1);
        } while (nextRow1() && keyComparator.compare(key1, groupKey) == 0);
        return groupKey;
    }

    /**
     * Buffer rows from iterator2 with same key.
     *
     * <p>On return, {@code row2}/{@code key2} point at the first row of the next group, or are
     * {@code null} if input 2 is exhausted.
     *
     * @return a copy of the key shared by the buffered rows, taken before the input was advanced
     */
    private BinaryRow bufferRows2() throws IOException {
        // Snapshot the key first: nextRow2() replaces key2 while we are still comparing against it.
        BinaryRow groupKey = key2.copy();
        buffer2.reset();
        do {
            buffer2.add(row2);
        } while (nextRow2() && keyComparator.compare(key2, groupKey) == 0);
        return groupKey;
    }

    /**
     * Reads the next row of input 1 into {@code row1} and projects its key into {@code key1}.
     *
     * @return {@code false} if input 1 is exhausted, in which case both fields are {@code null}
     */
    private boolean nextRow1() throws IOException {
        row1 = iterator1.next(row1);
        key1 = (row1 == null) ? null : projection1.apply(row1);
        return row1 != null;
    }

    /**
     * Reads the next row of input 2 into {@code row2} and projects its key into {@code key2}.
     *
     * @return {@code false} if input 2 is exhausted, in which case both fields are {@code null}
     */
    private boolean nextRow2() throws IOException {
        row2 = iterator2.next(row2);
        key2 = (row2 == null) ? null : projection2.apply(row2);
        return row2 != null;
    }

    /**
     * @return the key of the current group if the last {@link #nextOuterJoin()} produced a match,
     *     {@code null} if it produced an outer group
     */
    public BinaryRow getMatchKey() {
        return matchKey;
    }

    /** @return the buffer that receives the current key group of input 1 */
    public ResettableExternalBuffer getBuffer1() {
        return buffer1;
    }

    /** @return the buffer that receives the current key group of input 2 */
    public ResettableExternalBuffer getBuffer2() {
        return buffer2;
    }

    /**
     * Closes both buffers. The second buffer is closed even if closing the first one throws.
     */
    @Override
    public void close() {
        try {
            buffer1.close();
        } finally {
            buffer2.close();
        }
    }
}