blob: ac192ef05c9fe68daf5c941fdd7bd23ed50ecaa5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.runtime.evaluators.functions;
import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER;
import java.io.IOException;
import java.util.List;
import org.apache.asterix.builders.AbvsBuilderFactory;
import org.apache.asterix.builders.ArrayListFactory;
import org.apache.asterix.builders.IAsterixListBuilder;
import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.builders.UnorderedListBuilder;
import org.apache.asterix.common.annotations.MissingNullInOutFunction;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.functions.IFunctionTypeInferer;
import org.apache.asterix.om.pointables.PointableAllocator;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AbstractCollectionType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.container.IObjectFactory;
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.asterix.om.util.container.ListObjectPool;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.asterix.runtime.evaluators.common.ListAccessor;
import org.apache.asterix.runtime.functions.FunctionTypeInferers;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
/**
* <pre>
* array_intersect(list1, list2, ...) returns a new list containing items that are present in all of the input
* lists. Null and missing items are ignored. It's case-sensitive to string items.
*
* array_intersect([null, 2, missing], [3,missing,2,null]) will result in [2].
*
* It throws an error at compile time if the number of arguments < 2
*
* It returns (or throws an error at runtime) in order:
* 1. missing, if any argument is missing.
* 2. an error if the input lists are not of the same type (one is an ordered list while the other is unordered).
* 3. null, if any input list is null or is not a list.
* 4. otherwise, a new list.
*
* </pre>
*/
@MissingNullInOutFunction
public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
private IAType[] argTypes;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
return new ArrayIntersectDescriptor();
}
@Override
public IFunctionTypeInferer createFunctionTypeInferer() {
return FunctionTypeInferers.SET_ARGUMENTS_TYPE;
}
};
@Override
public FunctionIdentifier getIdentifier() {
return BuiltinFunctions.ARRAY_INTERSECT;
}
@Override
public void setImmutableStates(Object... states) {
argTypes = (IAType[]) states;
}
@Override
public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
throws AlgebricksException {
return new IScalarEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
public IScalarEvaluator createScalarEvaluator(final IEvaluatorContext ctx) throws HyracksDataException {
return new ArrayIntersectEval(args, ctx);
}
};
}
protected class ValueListIndex implements IValueReference {
private IPointable value;
private int listIndex;
ValueListIndex() {
}
protected void set(IPointable value, int listIndex) {
this.value = value;
this.listIndex = listIndex;
}
@Override
public byte[] getByteArray() {
return value.getByteArray();
}
@Override
public int getStartOffset() {
return value.getStartOffset();
}
@Override
public int getLength() {
return value.getLength();
}
}
protected class ValueListIndexAllocator implements IObjectFactory<ValueListIndex, ATypeTag> {
ValueListIndexAllocator() {
}
@Override
public ValueListIndex create(ATypeTag arg) {
return new ValueListIndex();
}
}
public class ArrayIntersectEval implements IScalarEvaluator {
private final ListAccessor listAccessor;
private final IPointable pointable;
private final ArrayBackedValueStorage currentItemStorage;
private final IPointable[] listsArgs;
private final IScalarEvaluator[] listsEval;
private final IBinaryHashFunction binaryHashFunction;
private final Int2ObjectMap<List<ValueListIndex>> hashes;
private final PointableAllocator pointableAllocator;
private final IObjectPool<IMutableValueStorage, ATypeTag> storageAllocator;
private final IObjectPool<List<ValueListIndex>, ATypeTag> arrayListAllocator;
private final IObjectPool<ValueListIndex, ATypeTag> valueListIndexAllocator;
private final ArrayBackedValueStorage finalResult;
private final CastTypeEvaluator caster;
private final IBinaryComparator comp;
private IAsterixListBuilder orderedListBuilder;
private IAsterixListBuilder unorderedListBuilder;
ArrayIntersectEval(IScalarEvaluatorFactory[] args, IEvaluatorContext ctx) throws HyracksDataException {
orderedListBuilder = null;
unorderedListBuilder = null;
pointableAllocator = new PointableAllocator();
storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
valueListIndexAllocator = new ListObjectPool<>(new ValueListIndexAllocator());
hashes = new Int2ObjectOpenHashMap<>();
finalResult = new ArrayBackedValueStorage();
listAccessor = new ListAccessor();
caster = new CastTypeEvaluator();
// for functions that accept multiple lists arguments, they will be casted to open, hence item is ANY
comp = BinaryComparatorFactoryProvider.INSTANCE
.getBinaryComparatorFactory(BuiltinType.ANY, BuiltinType.ANY, true).createBinaryComparator();
listsArgs = new IPointable[args.length];
listsEval = new IScalarEvaluator[args.length];
pointable = new VoidPointable();
currentItemStorage = new ArrayBackedValueStorage();
for (int i = 0; i < args.length; i++) {
listsArgs[i] = new VoidPointable();
listsEval[i] = args[i].createScalarEvaluator(ctx);
}
binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE
.getBinaryHashFunctionFactory(BuiltinType.ANY).createBinaryHashFunction();
}
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
byte listArgType;
boolean isReturnNull = false;
AbstractCollectionType outList = null;
ATypeTag listTag;
int minListIndex = 0;
int minSize = -1;
int nextSize;
// evaluate all the lists first to make sure they're all actually lists and of the same list type
try {
for (int i = 0; i < listsEval.length; i++) {
listsEval[i].evaluate(tuple, pointable);
if (PointableHelper.checkAndSetMissingOrNull(result, pointable)) {
if (result.getByteArray()[0] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
return;
}
// null value, but check other arguments for missing first (higher priority)
isReturnNull = true;
}
if (!isReturnNull) {
listArgType = pointable.getByteArray()[pointable.getStartOffset()];
listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
if (!listTag.isListType()) {
isReturnNull = true;
} else if (outList != null && outList.getTypeTag() != listTag) {
throw new RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
} else {
if (outList == null) {
outList =
(AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
}
caster.resetAndAllocate(outList, argTypes[i], listsEval[i]);
caster.cast(pointable, listsArgs[i]);
nextSize = getNumItems(outList, listsArgs[i].getByteArray(), listsArgs[i].getStartOffset());
if (nextSize < minSize || minSize == -1) {
minSize = nextSize;
minListIndex = i;
}
}
}
}
if (isReturnNull) {
PointableHelper.setNull(result);
return;
}
IAsterixListBuilder listBuilder;
if (outList.getTypeTag() == ATypeTag.ARRAY) {
if (orderedListBuilder == null) {
orderedListBuilder = new OrderedListBuilder();
}
listBuilder = orderedListBuilder;
} else {
if (unorderedListBuilder == null) {
unorderedListBuilder = new UnorderedListBuilder();
}
listBuilder = unorderedListBuilder;
}
IPointable listArg;
hashes.clear();
// first, get distinct items of the most restrictive (smallest) list.
// values will be added to listBuilder after inspecting all input lists
listArg = listsArgs[minListIndex];
listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
buildRestrictiveList(listAccessor);
listBuilder.reset(outList);
if (!hashes.isEmpty()) {
// process each list one by one
for (int listIndex = 0; listIndex < listsArgs.length; listIndex++) {
// TODO(ali): find a way to avoid comparing the smallest list
listArg = listsArgs[listIndex];
listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
processList(listAccessor, listIndex, listBuilder);
}
}
finalResult.reset();
listBuilder.write(finalResult.getDataOutput(), true);
result.set(finalResult);
} catch (IOException e) {
throw HyracksDataException.create(e);
} finally {
caster.deallocatePointables();
valueListIndexAllocator.reset();
storageAllocator.reset();
arrayListAllocator.reset();
pointableAllocator.reset();
}
}
private int getNumItems(AbstractCollectionType listType, byte[] listBytes, int offset) {
if (listType.getTypeTag() == ATypeTag.ARRAY) {
return AOrderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
} else {
return AUnorderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
}
}
// puts all the items of the smallest list in "hashes"
private void buildRestrictiveList(ListAccessor listAccessor) throws IOException {
if (listAccessor.size() > 0) {
int hash;
List<ValueListIndex> sameHashes;
boolean itemInStorage;
IPointable item = pointableAllocator.allocateEmpty();
ArrayBackedValueStorage storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
storage.reset();
for (int j = 0; j < listAccessor.size(); j++) {
itemInStorage = listAccessor.getOrWriteItem(j, item, storage);
if (notNullAndMissing(item)) {
hash = binaryHashFunction.hash(item.getByteArray(), item.getStartOffset(), item.getLength());
sameHashes = hashes.get(hash);
if (addToSmallestList(item, hash, sameHashes)) {
// item has been added to intersect list and is being used, allocate new pointable
item = pointableAllocator.allocateEmpty();
if (itemInStorage) {
storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
storage.reset();
}
}
}
}
}
}
private void processList(ListAccessor listAccessor, int listIndex, IAsterixListBuilder listBuilder)
throws IOException {
int hash;
List<ValueListIndex> sameHashes;
for (int j = 0; j < listAccessor.size(); j++) {
listAccessor.getOrWriteItem(j, pointable, currentItemStorage);
if (notNullAndMissing(pointable)) {
// hash the item and look up to see if it is common
hash = binaryHashFunction.hash(pointable.getByteArray(), pointable.getStartOffset(),
pointable.getLength());
sameHashes = hashes.get(hash);
incrementIfCommonValue(pointable, sameHashes, listIndex, listBuilder);
}
}
}
// collects the items of the most restrictive list, it initializes the list index as -1. each successive list
// should stamp the value with its list index if the list has the item. It starts with list index = 0
private boolean addToSmallestList(IPointable item, int hash, List<ValueListIndex> sameHashes)
throws IOException {
// add if new item
if (sameHashes == null) {
List<ValueListIndex> newHashes = arrayListAllocator.allocate(null);
newHashes.clear();
ValueListIndex valueListIndex = valueListIndexAllocator.allocate(null);
valueListIndex.set(item, -1);
newHashes.add(valueListIndex);
hashes.put(hash, newHashes);
return true;
} else if (PointableHelper.findItem(item, sameHashes, comp) == null) {
ValueListIndex valueListIndex = valueListIndexAllocator.allocate(null);
valueListIndex.set(item, -1);
sameHashes.add(valueListIndex);
return true;
}
// else ignore for duplicate values in the same list
return false;
}
private void incrementIfCommonValue(IPointable item, List<ValueListIndex> sameHashes, int listIndex,
IAsterixListBuilder listBuilder) throws IOException {
if (sameHashes != null) {
// look for the same equal item, add to list builder when all lists have seen this item
incrementIfExists(sameHashes, item, listIndex, listBuilder);
}
}
private boolean notNullAndMissing(IPointable item) {
byte tag = item.getByteArray()[item.getStartOffset()];
return tag != ATypeTag.SERIALIZED_NULL_TYPE_TAG && tag != ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
}
private void incrementIfExists(List<ValueListIndex> sameHashes, IPointable item, int listIndex,
IAsterixListBuilder listBuilder) throws HyracksDataException {
ValueListIndex sameValue = PointableHelper.findItem(item, sameHashes, comp);
if (sameValue != null && listIndex - sameValue.listIndex == 1) {
// found the item, its stamp is OK (stamp saves the index of the last list that has seen this item)
// increment stamp of this item
sameValue.listIndex = listIndex;
if (listIndex == listsArgs.length - 1) {
// if this list is the last to stamp, then add to the final result
listBuilder.addItem(item);
}
}
}
}
}