blob: c07258a9b6c611dbe373c52e24c04b2e248c397d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowBuilder;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
/** HashJoin implementor. */
public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNode<RowT> {
final Map<RowWrapper<RowT>, TouchedCollection<RowT>> hashStore = new Object2ObjectOpenHashMap<>();
protected final RowHandler<RowT> handler;
private final List<Integer> leftJoinPositions;
private final List<Integer> rightJoinPositions;
final boolean touchResults;
Iterator<RowT> rightIt = Collections.emptyIterator();
private final RowSchema rightJoinRelatedRowSchema;
private final RowSchema leftJoinRelatedRowSchema;
private final RowBuilder<RowT> leftRowBuilder;
private final RowBuilder<RowT> rightRowBuilder;
private HashJoinNode(ExecutionContext<RowT> ctx, JoinInfo joinInfo, boolean touch,
RelDataType leftRowType, RelDataType rightRowType) {
super(ctx);
handler = ctx.rowHandler();
touchResults = touch;
leftJoinPositions = joinInfo.leftKeys.toIntegerList();
rightJoinPositions = joinInfo.rightKeys.toIntegerList();
assert leftJoinPositions.size() == rightJoinPositions.size();
ImmutableIntList rightKeys = joinInfo.rightKeys;
List<RelDataType> rightTypes = new ArrayList<>(rightKeys.size());
List<RelDataTypeField> rightFields = rightRowType.getFieldList();
for (int rightPos : rightKeys) {
rightTypes.add(rightFields.get(rightPos).getType());
}
rightJoinRelatedRowSchema = rowSchemaFromRelTypes(rightTypes);
ImmutableIntList leftKeys = joinInfo.leftKeys;
List<RelDataType> leftTypes = new ArrayList<>(leftKeys.size());
List<RelDataTypeField> leftFields = leftRowType.getFieldList();
for (int leftPos : leftKeys) {
leftTypes.add(leftFields.get(leftPos).getType());
}
leftJoinRelatedRowSchema = rowSchemaFromRelTypes(leftTypes);
RowFactory<RowT> leftRowFactory = handler.factory(leftJoinRelatedRowSchema);
leftRowBuilder = leftRowFactory.rowBuilder();
RowFactory<RowT> rightRowFactory = handler.factory(rightJoinRelatedRowSchema);
rightRowBuilder = rightRowFactory.rowBuilder();
}
@Override
protected void rewindInternal() {
rightIt = Collections.emptyIterator();
hashStore.clear();
super.rewindInternal();
}
/** Supplied algorithm implementation. */
public static <RowT> HashJoinNode<RowT> create(ExecutionContext<RowT> ctx, RelDataType outputRowType,
RelDataType leftRowType, RelDataType rightRowType, JoinRelType joinType, JoinInfo joinInfo) {
switch (joinType) {
case INNER:
return new InnerHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
case LEFT: {
RowSchema rightRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rightRowType));
RowHandler.RowFactory<RowT> rightRowFactory = ctx.rowHandler().factory(rightRowSchema);
return new LeftHashJoin<>(ctx, rightRowFactory, joinInfo, leftRowType, rightRowType);
}
case RIGHT: {
RowSchema leftRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(leftRowType));
RowHandler.RowFactory<RowT> leftRowFactory = ctx.rowHandler().factory(leftRowSchema);
return new RightHashJoin<>(ctx, leftRowFactory, joinInfo, leftRowType, rightRowType);
}
case FULL: {
RowSchema leftRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(leftRowType));
RowSchema rightRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rightRowType));
RowHandler.RowFactory<RowT> leftRowFactory = ctx.rowHandler().factory(leftRowSchema);
RowHandler.RowFactory<RowT> rightRowFactory = ctx.rowHandler().factory(rightRowSchema);
return new FullOuterHashJoin<>(ctx, leftRowFactory, rightRowFactory, joinInfo, leftRowType, rightRowType);
}
case SEMI:
return new SemiHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
case ANTI:
return new AntiHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
default:
throw new IllegalStateException("Join type \"" + joinType + "\" is not supported yet");
}
}
private static class InnerHashJoin<RowT> extends HashJoinNode<RowT> {
private InnerHashJoin(
ExecutionContext<RowT> ctx,
JoinInfo joinInfo,
RelDataType leftRowType,
RelDataType rightRowType
) {
super(ctx, joinInfo, false, leftRowType, rightRowType);
}
@Override
protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
if (!rightIt.hasNext()) {
left = leftInBuf.remove();
Collection<RowT> rightRows = lookup(left, touchResults);
rightIt = rightRows.iterator();
}
if (rightIt.hasNext()) {
while (rightIt.hasNext()) {
checkState();
RowT right = rightIt.next();
--requested;
RowT row = handler.concat(left, right);
downstream().push(row);
if (requested == 0) {
break;
}
}
}
if (!rightIt.hasNext()) {
left = null;
}
}
} finally {
inLoop = false;
}
}
getMoreOrEnd();
}
}
private static class LeftHashJoin<RowT> extends HashJoinNode<RowT> {
/** Right row factory. */
private final RowHandler.RowFactory<RowT> rightRowFactory;
private LeftHashJoin(
ExecutionContext<RowT> ctx,
RowHandler.RowFactory<RowT> rightRowFactory,
JoinInfo joinInfo,
RelDataType leftRowType,
RelDataType rightRowType
) {
super(ctx, joinInfo, false, leftRowType, rightRowType);
this.rightRowFactory = rightRowFactory;
}
/** {@inheritDoc} */
@Override
protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
checkState();
if (!rightIt.hasNext()) {
left = leftInBuf.remove();
Collection<RowT> rightRows = lookup(left, touchResults);
if (rightRows.isEmpty()) {
requested--;
downstream().push(handler.concat(left, rightRowFactory.create()));
}
rightIt = rightRows.iterator();
}
if (rightIt.hasNext()) {
while (rightIt.hasNext()) {
checkState();
RowT right = rightIt.next();
--requested;
RowT row = handler.concat(left, right);
downstream().push(row);
if (requested == 0) {
break;
}
}
}
if (!rightIt.hasNext()) {
left = null;
}
}
} finally {
inLoop = false;
}
}
getMoreOrEnd();
}
}
private static class RightHashJoin<RowT> extends HashJoinNode<RowT> {
/** Left row factory. */
private final RowHandler.RowFactory<RowT> leftRowFactory;
private RightHashJoin(
ExecutionContext<RowT> ctx,
RowHandler.RowFactory<RowT> leftRowFactory,
JoinInfo joinInfo,
RelDataType leftRowType,
RelDataType rightRowType
) {
super(ctx, joinInfo, true, leftRowType, rightRowType);
this.leftRowFactory = leftRowFactory;
}
@Override
protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
checkState();
if (!rightIt.hasNext()) {
left = leftInBuf.remove();
Collection<RowT> rightRows = lookup(left, touchResults);
rightIt = rightRows.iterator();
}
if (rightIt.hasNext()) {
while (rightIt.hasNext()) {
checkState();
RowT right = rightIt.next();
--requested;
RowT row = handler.concat(left, right);
downstream().push(row);
if (requested == 0) {
break;
}
}
}
if (!rightIt.hasNext()) {
left = null;
}
}
} finally {
inLoop = false;
}
}
if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) {
inLoop = true;
try {
if (!rightIt.hasNext()) {
rightIt = getUntouched(hashStore);
}
RowT emptyLeft = leftRowFactory.create();
while (rightIt.hasNext()) {
checkState();
RowT right = rightIt.next();
RowT row = handler.concat(emptyLeft, right);
--requested;
downstream().push(row);
if (requested == 0) {
break;
}
}
} finally {
inLoop = false;
}
}
getMoreOrEnd();
}
}
private static class FullOuterHashJoin<RowT> extends HashJoinNode<RowT> {
/** Left row factory. */
private final RowHandler.RowFactory<RowT> leftRowFactory;
/** Right row factory. */
private final RowHandler.RowFactory<RowT> rightRowFactory;
private FullOuterHashJoin(
ExecutionContext<RowT> ctx,
RowHandler.RowFactory<RowT> leftRowFactory,
RowHandler.RowFactory<RowT> rightRowFactory,
JoinInfo joinInfo,
RelDataType leftRowType,
RelDataType rightRowType
) {
super(ctx, joinInfo, true, leftRowType, rightRowType);
this.leftRowFactory = leftRowFactory;
this.rightRowFactory = rightRowFactory;
}
/** {@inheritDoc} */
@Override
protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
checkState();
if (!rightIt.hasNext()) {
left = leftInBuf.remove();
Collection<RowT> rightRows = lookup(left, touchResults);
if (rightRows.isEmpty()) {
requested--;
downstream().push(handler.concat(left, rightRowFactory.create()));
}
rightIt = rightRows.iterator();
}
if (rightIt.hasNext()) {
while (rightIt.hasNext()) {
checkState();
RowT right = rightIt.next();
--requested;
RowT row = handler.concat(left, right);
downstream().push(row);
if (requested == 0) {
break;
}
}
}
if (!rightIt.hasNext()) {
left = null;
}
}
} finally {
inLoop = false;
}
}
if (left == null && !rightIt.hasNext() && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING
&& waitingRight == NOT_WAITING && requested > 0) {
inLoop = true;
try {
if (!rightIt.hasNext()) {
rightIt = getUntouched(hashStore);
}
RowT emptyLeft = leftRowFactory.create();
while (rightIt.hasNext()) {
checkState();
RowT right = rightIt.next();
RowT row = handler.concat(emptyLeft, right);
--requested;
downstream().push(row);
if (requested == 0) {
break;
}
}
} finally {
inLoop = false;
}
}
getMoreOrEnd();
}
}
private static class SemiHashJoin<RowT> extends HashJoinNode<RowT> {
private SemiHashJoin(
ExecutionContext<RowT> ctx,
JoinInfo joinInfo,
RelDataType leftRowType,
RelDataType rightRowType
) {
super(ctx, joinInfo, false, leftRowType, rightRowType);
}
/** {@inheritDoc} */
@Override
protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
checkState();
left = leftInBuf.remove();
Collection<RowT> rightRows = lookup(left, touchResults);
if (!rightRows.isEmpty()) {
requested--;
downstream().push(left);
if (requested == 0) {
break;
}
}
left = null;
}
} finally {
inLoop = false;
}
}
getMoreOrEnd();
}
}
private static class AntiHashJoin<RowT> extends HashJoinNode<RowT> {
private AntiHashJoin(
ExecutionContext<RowT> ctx,
JoinInfo joinInfo,
RelDataType leftRowType,
RelDataType rightRowType
) {
super(ctx, joinInfo, false, leftRowType, rightRowType);
}
/** {@inheritDoc} */
@Override
protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
checkState();
left = leftInBuf.remove();
Collection<RowT> rightRows = lookup(left, touchResults);
if (rightRows.isEmpty()) {
requested--;
downstream().push(left);
if (requested == 0) {
break;
}
}
left = null;
}
} finally {
inLoop = false;
}
}
getMoreOrEnd();
}
}
Collection<RowT> lookup(RowT row, boolean processTouched) {
Collection<RowT> coll = Collections.emptyList();
for (Integer entry : leftJoinPositions) {
Object ent = handler.get(entry, row);
if (ent == null) {
leftRowBuilder.reset();
return Collections.emptyList();
}
leftRowBuilder.addField(ent);
}
RowWrapper<RowT> row0 = new RowWrapper<>(leftRowBuilder.buildAndReset(), handler, leftJoinPositions.size());
TouchedCollection<RowT> found = hashStore.get(row0);
if (found != null) {
coll = found.items();
if (processTouched) {
found.touched = true;
}
}
return coll;
}
private static <RowT> Iterator<RowT> getUntouched(Map<RowWrapper<RowT>, TouchedCollection<RowT>> entries) {
return new Iterator<RowT>() {
private final Iterator<TouchedCollection<RowT>> it = entries.values().iterator();
private Iterator<RowT> innerIt = Collections.emptyIterator();
@Override
public boolean hasNext() {
if (innerIt.hasNext()) {
return true;
}
advance();
return innerIt.hasNext();
}
@Override
public RowT next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return innerIt.next();
}
void advance() {
while (it.hasNext()) {
TouchedCollection<RowT> coll = it.next();
if (!coll.touched && !coll.items().isEmpty()) {
innerIt = coll.items().iterator();
break;
}
}
}
};
}
@Override
protected void pushRight(RowT row) throws Exception {
assert downstream() != null;
assert waitingRight > 0;
checkState();
waitingRight--;
for (Integer entry : rightJoinPositions) {
Object ent = handler.get(entry, row);
rightRowBuilder.addField(ent);
}
RowWrapper<RowT> row0 = new RowWrapper<>(rightRowBuilder.buildAndReset(), handler, rightJoinPositions.size());
TouchedCollection<RowT> raw = hashStore.computeIfAbsent(row0, k -> new TouchedCollection<>());
raw.add(row);
if (waitingRight == 0) {
rightSource().request(waitingRight = inBufSize);
}
}
private static class RowWrapper<RowT> {
RowT row;
RowHandler<RowT> handler;
int itemsCount;
RowWrapper(RowT row, RowHandler<RowT> handler, int itemsCount) {
this.row = row;
this.handler = handler;
this.itemsCount = itemsCount;
}
@Override
public int hashCode() {
int hashCode = 0;
for (int i = 0; i < itemsCount; ++i) {
Object entHold = handler.get(i, row);
hashCode += Objects.hashCode(entHold);
}
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
RowWrapper<RowT> row0 = (RowWrapper<RowT>) obj;
for (int i = 0; i < itemsCount; ++i) {
Object input = handler.get(i, row0.row);
Object current = handler.get(i, row);
boolean comp = Objects.equals(input, current);
if (!comp) {
return comp;
}
}
return true;
}
}
void getMoreOrEnd() throws Exception {
if (waitingRight == 0) {
rightSource().request(waitingRight = inBufSize);
}
if (waitingLeft == 0 && leftInBuf.isEmpty()) {
leftSource().request(waitingLeft = inBufSize);
}
if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && leftInBuf.isEmpty() && left == null
&& !rightIt.hasNext()) {
requested = 0;
downstream().end();
}
}
private static class TouchedCollection<RowT> {
Collection<RowT> coll;
boolean touched;
TouchedCollection() {
this.coll = new ArrayList<>();
}
void add(RowT row) {
coll.add(row);
}
Collection<RowT> items() {
return coll;
}
}
}