blob: f857cedab7e654210ccc07272149a02a17580279 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.groovy.ginq.provider.collection.runtime;
import groovy.lang.GroovyRuntimeException;
import groovy.lang.Tuple;
import groovy.lang.Tuple2;
import groovy.lang.Tuple3;
import groovy.transform.Internal;
import org.apache.groovy.internal.util.Supplier;
import org.apache.groovy.util.SystemUtil;
import org.codehaus.groovy.runtime.DefaultGroovyMethods;
import org.codehaus.groovy.runtime.dgmimpl.NumberNumberMinus;
import org.codehaus.groovy.runtime.typehandling.NumberMath;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
import static groovy.lang.Tuple.tuple;
import static java.lang.Math.sqrt;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.nullsFirst;
import static java.util.Comparator.nullsLast;
import static java.util.Comparator.reverseOrder;
import static org.apache.groovy.ginq.provider.collection.runtime.Queryable.from;
import static org.apache.groovy.ginq.provider.collection.runtime.WindowImpl.composeOrders;
import static org.codehaus.groovy.runtime.typehandling.NumberMath.toBigDecimal;
* Represents the queryable collections
* @param <T> the type of Queryable element
* @since 4.0.0
class QueryableCollection<T> implements Queryable<T>, Serializable {
/**
 * Creates a queryable whose elements come from an {@link Iterable}.
 *
 * @param sourceIterable the iterable supplying the elements
 */
QueryableCollection(Iterable<T> sourceIterable) {
    this.sourceIterable = sourceIterable;
}
/**
 * Creates a queryable whose elements come from a {@code Stream}.
 *
 * @param sourceStream the stream supplying the elements
 */
QueryableCollection(Stream<T> sourceStream) {
    this.sourceStream = sourceStream;
}
/**
 * Returns an iterator over the elements, taken from the iterable source when
 * one is set and from the stream source otherwise.
 *
 * @return an iterator over the elements
 */
public Iterator<T> iterator() {
try {
if (null != sourceIterable) {
return sourceIterable.iterator();
// no iterable source is set: fall back to the stream source
return sourceStream.iterator();
} finally {
// NOTE(review): the body of this finally block is not visible in this view
/**
 * Inner join: emits {@code tuple(p, c)} for the candidate pairs accepted by {@code joiner}.
 * NOTE(review): the stream source expressions are truncated in this view
 * (the line after {@code stream =} and the line before {@code .filter}).
 *
 * @param queryable the other side of the join
 * @param joiner predicate tested as {@code joiner.test(p, c)}
 * @return a queryable created via {@code from(stream)}
 */
public <U> Queryable<Tuple2<T, U>> innerJoin(Queryable<? extends U> queryable, BiPredicate<? super T, ? super U> joiner) {
Stream<Tuple2<T, U>> stream =
.flatMap(p -> {
if (queryable instanceof QueryableCollection) {
// back the other side with a collected list so stream() can hand out a new stream per use
((QueryableCollection) queryable).makeReusable();
.filter(c -> joiner.test(p, c))
.map(c -> tuple(p, c));
return from(stream);
/**
 * Inner join that matches elements through a hash table built from {@code queryable}.
 * The table is obtained inside the per-element lambda via {@code buildHashTable} and then
 * probed with the current element.
 * NOTE(review): the stream source on the {@code stream =} line is truncated in this view.
 *
 * @param queryable the other side of the join
 * @param fieldsExtractor1 extracts the join fields from elements of this queryable
 * @param fieldsExtractor2 extracts the join fields from elements of {@code queryable}
 * @return a queryable created via {@code from(stream)}
 */
public <U> Queryable<Tuple2<T, U>> innerHashJoin(Queryable<? extends U> queryable, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
// holder and supplier are created once per join and shared by every probe
final ConcurrentObjectHolder<Map<Integer, List<U>>> hashTableHolder = new ConcurrentObjectHolder<>();
final Supplier<Map<Integer, List<U>>> hashTableSupplier = createHashTableSupplier(queryable, fieldsExtractor2);
Stream<Tuple2<T, U>> stream = -> {
// build hash table
Map<Integer, List<U>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
// probe the hash table
return probeHashTable(hashTable, p, fieldsExtractor1, fieldsExtractor2);
return from(stream);
/**
 * Creates the supplier that builds the join hash table for {@code queryable}.
 * Elements are keyed by {@code hash(fieldsExtractor2.apply(c))}.
 * NOTE(review): the collector call and parts of the merge function are truncated in this view.
 *
 * @param queryable the queryable whose elements populate the table
 * @param fieldsExtractor2 extracts the join fields from an element
 * @return a supplier producing the hash table
 */
private static <U> Supplier<Map<Integer, List<U>>> createHashTableSupplier(Queryable<? extends U> queryable, Function<? super U, ?> fieldsExtractor2) {
return () ->
// bucket key: the bounded hash of the extracted join fields
c -> hash(fieldsExtractor2.apply(c)),
// merge function for two values that land in the same bucket
(oldList, newList) -> {
// switch the bucket to an ArrayList when it is not one already
if (!(oldList instanceof ArrayList)) {
List<U> tmpList = new ArrayList<>(HASHTABLE_BUCKET_INITIAL_SIZE);
oldList = tmpList;
return oldList;
// Modulus applied by hash(); read from system property `groovy.ginq.hashtable.max.size`
// (128 is presumably the fallback when the property is unset — confirm against SystemUtil).
private static final int HASHTABLE_MAX_SIZE = SystemUtil.getIntegerSafe("groovy.ginq.hashtable.max.size", 128);
// Initial capacity of the ArrayList buckets; read from `groovy.ginq.hashtable.bucket.initial.size`
// (16 is presumably the fallback — confirm against SystemUtil).
private static final int HASHTABLE_BUCKET_INITIAL_SIZE = SystemUtil.getIntegerSafe("groovy.ginq.hashtable.bucket.initial.size", 16);
private static Integer hash(Object obj) {
return Objects.hash(obj) % HASHTABLE_MAX_SIZE; // mod `HASHTABLE_MAX_SIZE` to limit the size of hash table
public <U> Queryable<Tuple2<T, U>> leftJoin(Queryable<? extends U> queryable, BiPredicate<? super T, ? super U> joiner) {
return outerJoin(this, queryable, joiner);
public <U> Queryable<Tuple2<T, U>> leftHashJoin(Queryable<? extends U> queryable, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
return outerHashJoin(this, queryable, fieldsExtractor1, fieldsExtractor2);
public <U> Queryable<Tuple2<T, U>> rightJoin(Queryable<? extends U> queryable, BiPredicate<? super T, ? super U> joiner) {
return outerJoin(queryable, this, (a, b) -> joiner.test(b, a)).select((e, q) -> tuple(e.getV2(), e.getV1()));
public <U> Queryable<Tuple2<T, U>> rightHashJoin(Queryable<? extends U> queryable, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
return outerHashJoin(queryable, this, fieldsExtractor2, fieldsExtractor1).select((e, q) -> tuple(e.getV2(), e.getV1()));
public <U> Queryable<Tuple2<T, U>> fullJoin(Queryable<? extends U> queryable, BiPredicate<? super T, ? super U> joiner) {
if (queryable instanceof QueryableCollection) {
((QueryableCollection) queryable).makeReusable();
Queryable<Tuple2<T, U>> lj = this.leftJoin(queryable, joiner);
Queryable<Tuple2<T, U>> rj = this.rightJoin(queryable, joiner);
return lj.union(rj);
public <U> Queryable<Tuple2<T, U>> fullHashJoin(Queryable<? extends U> queryable, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
if (queryable instanceof QueryableCollection) {
((QueryableCollection) queryable).makeReusable();
Queryable<Tuple2<T, U>> lj = this.leftHashJoin(queryable, fieldsExtractor1, fieldsExtractor2);
Queryable<Tuple2<T, U>> rj = this.rightHashJoin(queryable, fieldsExtractor1, fieldsExtractor2);
return lj.union(rj);
/**
 * Cross join: emits {@code tuple(p, c)} with no join predicate applied.
 * NOTE(review): the stream source expressions are truncated in this view.
 *
 * @param queryable the other side of the join
 * @return a queryable created via {@code from(stream)}
 */
public <U> Queryable<Tuple2<T, U>> crossJoin(Queryable<? extends U> queryable) {
Stream<Tuple2<T, U>> stream =
.flatMap(p -> {
if (queryable instanceof QueryableCollection) {
// back the other side with a collected list so stream() can hand out a new stream per use
((QueryableCollection) queryable).makeReusable();
.map(c -> tuple(p, c));
return from(stream);
/**
 * Returns a queryable created from a stream of this queryable's elements.
 * NOTE(review): the initializer of {@code stream} is truncated in this view; presumably it
 * applies {@code filter} to the elements — confirm against the full source.
 *
 * @param filter the predicate for the elements to keep
 * @return a queryable created via {@code from(stream)}
 */
public Queryable<T> where(Predicate<? super T> filter) {
Stream<T> stream =;
return from(stream);
/**
 * Groups the elements by {@code classifier} and emits one {@code tuple(key, from(values))}
 * per group accepted by {@code having}.
 * NOTE(review): the expression that applies the collector is truncated in this view.
 *
 * @param classifier computes the grouping key of an element
 * @param having optional filter on the groups; {@code null} accepts every group
 * @return the groups, wrapped via {@code Group.of}
 */
public Queryable<Tuple2<?, Queryable<T>>> groupBy(Function<? super T, ?> classifier, Predicate<? super Tuple2<?, Queryable<? extends T>>> having) {
// use the concurrent collector when parallel querying is enabled
Collector<T, ?, ? extends Map<?, List<T>>> groupingBy =
isParallel() ? Collectors.groupingByConcurrent(classifier, Collectors.toList())
: Collectors.groupingBy(classifier, Collectors.toList());
Stream<Tuple2<?, Queryable<T>>> stream =
.filter(m -> null == having || having.test(tuple(m.getKey(), from(m.getValue()))))
.map(m -> tuple(m.getKey(), from(m.getValue())));
return Group.of(stream);
/**
 * Orders the elements by the given orders; returns this queryable unchanged when
 * {@code makeComparator} yields no comparator (no orders supplied).
 * NOTE(review): the argument of the final {@code from(} call is truncated in this view.
 *
 * @param orders the sort criteria, applied in the given sequence
 * @return this queryable, or a queryable created via {@code from(...)}
 */
public <U extends Comparable<? super U>> Queryable<T> orderBy(Order<? super T, ? extends U>... orders) {
Comparator<T> comparator = makeComparator(orders);
if (null == comparator) {
return this;
return from(;
/**
 * List-based convenience overload: converts {@code orders} to an array and delegates to the
 * varargs {@code makeComparator}.
 *
 * @param orders the sort criteria
 * @return the composed comparator, or {@code null} when the varargs overload returns {@code null}
 */
protected static <T, U extends Comparable<? super U>> Comparator<T> makeComparator(List<? extends Order<? super T, ? extends U>> orders) {
return makeComparator(orders.toArray(Order.EMPTY_ARRAY));
protected static <T, U extends Comparable<? super U>> Comparator<T> makeComparator(Order<? super T, ? extends U>... orders) {
if (null == orders || 0 == orders.length) {
return null;
Comparator<T> comparator = null;
for (int i = 0, n = orders.length; i < n; i++) {
Order<? super T, ? extends U> order = orders[i];
Comparator<U> ascOrDesc = order.isAsc() ? naturalOrder() : reverseOrder();
Comparator<U> nullsLastOrFirst = order.isNullsLast() ? nullsLast(ascOrDesc) : nullsFirst(ascOrDesc);
comparator =
0 == i
? Comparator.comparing(order.getKeyExtractor(), nullsLastOrFirst)
: comparator.thenComparing(order.getKeyExtractor(), nullsLastOrFirst);
return comparator;
/**
 * Returns a queryable created from a stream of this queryable's elements.
 * NOTE(review): the initializer of {@code stream} is truncated in this view; presumably it
 * skips {@code offset} elements and keeps at most {@code size} — confirm against the full source.
 *
 * @param offset presumably the number of leading elements to skip — TODO confirm
 * @param size presumably the maximum number of elements to return — TODO confirm
 * @return a queryable created via {@code from(stream)}
 */
public Queryable<T> limit(long offset, long size) {
Stream<T> stream =;
return from(stream);
/**
 * Projects each element through {@code mapper}, which receives the element and this queryable.
 * The {@code parallel} variable is switched to "false" while the projection is set up and
 * restored afterwards.
 * NOTE(review): closing braces and several expressions are missing in this view, so the exact
 * nesting of the branches below cannot be confirmed here.
 *
 * @param mapper maps (element, this queryable) to the projected value
 * @return a queryable created via {@code from(stream)}
 */
public <U> Queryable<U> select(BiFunction<? super T, ? super Queryable<? extends T>, ? extends U> mapper) {
// remember the caller's setting so it can be restored before returning
final String originalParallel = QueryableHelper.getVar(PARALLEL);
QueryableHelper.setVar(PARALLEL, FALSE_STR); // ensure the row number is generated sequentially
boolean useWindowFunction = TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION));
if (useWindowFunction) {
Stream<U> stream = null;
if (this instanceof Group) {
// an empty group still yields one row, mapped from a placeholder tuple
if (0 == this.count()) {
stream = Stream.of((T) tuple(Collections.emptyMap(), EMPTY_QUERYABLE)).map((T t) -> mapper.apply(t, this));
if (null == stream) {
stream = t) -> mapper.apply(t, this));
if (TRUE_STR.equals(originalParallel)) {
// invoke `collect` to trigger the intermediate operator, which will create `CompletableFuture` instances
stream = stream.collect(Collectors.toList()).parallelStream().map((U u) -> {
try {
// each mapped value is treated as a CompletableFuture and resolved here
return (U) ((CompletableFuture) u).get();
} catch (InterruptedException | ExecutionException ex) {
throw new GroovyRuntimeException(ex);
QueryableHelper.setVar(PARALLEL, originalParallel);
return from(stream);
/**
 * Returns a queryable created from a stream of this queryable's elements.
 * NOTE(review): the initializer of {@code stream} is truncated in this view; presumably it
 * removes duplicate elements — confirm against the full source.
 *
 * @return a queryable created via {@code from(stream)}
 */
public Queryable<T> distinct() {
Stream<T> stream =;
return from(stream);
/**
 * Concatenates two streams with {@code Stream.concat} and wraps the result.
 * NOTE(review): both arguments of {@code Stream.concat} are truncated in this view; presumably
 * they are the streams of this queryable and of {@code queryable} — confirm.
 *
 * @param queryable the queryable to append
 * @return a queryable created via {@code from(stream)}
 */
public Queryable<T> unionAll(Queryable<? extends T> queryable) {
Stream<T> stream = Stream.concat(,;
return from(stream);
/**
 * Set-style operation against {@code queryable} that compares elements with {@code equals}.
 * NOTE(review): the stream source and the matching call are truncated in this view; the
 * visible text is identical to {@code minus}, so the distinguishing call cannot be seen here.
 *
 * @param queryable the queryable to compare against
 * @return a queryable created via {@code from(stream)}
 */
public Queryable<T> intersect(Queryable<? extends T> queryable) {
Stream<T> stream = -> {
if (queryable instanceof QueryableCollection) {
// back the other side with a collected list so stream() can hand out a new stream per use
((QueryableCollection) queryable).makeReusable();
return -> b.equals(a));
return from(stream);
/**
 * Set-style operation against {@code queryable} that compares elements with {@code equals}.
 * NOTE(review): the stream source and the matching call are truncated in this view; the
 * visible text is identical to {@code intersect}, so the distinguishing call cannot be seen here.
 *
 * @param queryable the queryable to compare against
 * @return a queryable created via {@code from(stream)}
 */
public Queryable<T> minus(Queryable<? extends T> queryable) {
Stream<T> stream = -> {
if (queryable instanceof QueryableCollection) {
// back the other side with a collected list so stream() can hand out a new stream per use
((QueryableCollection) queryable).makeReusable();
return -> b.equals(a));
return from(stream);
// ------------------------------ BEGIN AGGREGATE FUNCTIONS --------------------------------
/**
 * Aggregate computed through {@code agg}.
 * NOTE(review): the lambda body is truncated in this view; presumably it counts all
 * elements — confirm against the full source.
 *
 * @return the aggregate result as a {@code Long}
 */
public Long count() {
return agg(q ->;
/**
 * Aggregate computed through {@code agg}, taking a value mapper.
 * NOTE(review): the lambda body is truncated in this view, so how {@code mapper} results
 * are counted (e.g. whether nulls are skipped) cannot be seen here.
 *
 * @param mapper extracts the value considered for counting
 * @return the aggregate result as a {@code Long}
 */
public <U> Long count(Function<? super T, ? extends U> mapper) {
return agg(q ->
/**
 * Sums the mapped values as {@code BigDecimal}, starting from {@code BigDecimal.ZERO};
 * a {@code null} mapped value contributes zero.
 * NOTE(review): the stream source after {@code agg(q ->} is truncated in this view.
 *
 * @param mapper extracts the number to add for an element
 * @return the sum
 */
public BigDecimal sum(Function<? super T, ? extends Number> mapper) {
return agg(q ->
.map(e -> {
Number n = mapper.apply(e);
// treat a missing value as zero
if (null == n) return BigDecimal.ZERO;
return toBigDecimal(n);
}).reduce(BigDecimal.ZERO, BigDecimal::add));
/**
 * Computes the average as accumulated sum divided by accumulated count, rounded to
 * 16 decimal places with {@code RoundingMode.HALF_UP}.
 * NOTE(review): the stream source and mapping steps before {@code .reduce} are truncated in
 * this view. If the accumulated count is 0, the division below would throw
 * {@code ArithmeticException} — confirm how empty input is meant to be handled.
 *
 * @param mapper extracts the number to average for an element
 * @return the average
 */
public BigDecimal avg(Function<? super T, ? extends Number> mapper) {
Object[] result = agg(q ->
// accumulator layout: [0] = element count (Long), [1] = running sum (BigDecimal)
.reduce(new Object[] {0L, BigDecimal.ZERO}, (r, e) -> {
r[0] = (Long) r[0] + 1;
r[1] = ((BigDecimal) r[1]).add(e);
return r;
// the combiner keeps only the first accumulator
}, (o1, o2) -> o1)
return ((BigDecimal) result[1]).divide(toBigDecimal((Long) result[0]), 16, RoundingMode.HALF_UP);
/**
 * Aggregate computed through {@code agg}.
 * NOTE(review): the lambda body is truncated in this view; presumably it yields the smallest
 * mapped value — confirm against the full source.
 *
 * @param mapper extracts the comparable value of an element
 * @return the aggregate result
 */
public <U extends Comparable<? super U>> U min(Function<? super T, ? extends U> mapper) {
return agg(q ->
/**
 * Aggregate computed through {@code agg}.
 * NOTE(review): the lambda body is truncated in this view; presumably it yields the largest
 * mapped value — confirm against the full source.
 *
 * @param mapper extracts the comparable value of an element
 * @return the aggregate result
 */
public <U extends Comparable<? super U>> U max(Function<? super T, ? extends U> mapper) {
return agg(q ->
/**
 * Computes the median from a list of numbers: the middle element for an odd size, the mean
 * of the two middle elements for an even size, and {@code null} for an empty list.
 * NOTE(review): the expression that produces {@code sortedNumList} is truncated in this view;
 * the variable name suggests it is sorted — confirm against the full source.
 *
 * @param mapper extracts the number of an element
 * @return the median, or {@code null} when there are no values
 */
public BigDecimal median(Function<? super T, ? extends Number> mapper) {
List<BigDecimal> sortedNumList = agg(q ->
int size = sortedNumList.size();
if (0 == size) {
return null;
// for an even size this is the upper of the two middle positions
int index = size / 2;
BigDecimal num = sortedNumList.get(index);
if (0 == size % 2) {
// even size: average the two middle values
return num.add(sortedNumList.get(index - 1)).divide(BD_TWO);
return num;
public BigDecimal stdev(Function<? super T, ? extends Number> mapper) {
return sd(mapper, 0);
public BigDecimal stdevp(Function<? super T, ? extends Number> mapper) {
return sd(mapper, 1);
public BigDecimal var(Function<? super T, ? extends Number> mapper) {
return vr(mapper, 0);
public BigDecimal varp(Function<? super T, ? extends Number> mapper) {
return vr(mapper, 1);
/**
 * Computes a variance: the sum of squared deviations from the average, divided by
 * (element count - {@code diff}), rounded to 16 decimal places with {@code RoundingMode.HALF_UP}.
 * NOTE(review): the stream source and mapping steps before the first {@code .map} are
 * truncated in this view. When the count equals {@code diff} the divisor is 0 and the
 * division would throw {@code ArithmeticException} — confirm the intended handling.
 *
 * @param mapper extracts the number of an element
 * @param diff value subtracted from the element count to form the divisor
 * @return the variance
 */
private BigDecimal vr(Function<? super T, ? extends Number> mapper, int diff) {
BigDecimal avg = this.avg(mapper);
Object[] result = agg(q ->
// squared deviation of each value from the average
.map(e -> toBigDecimal(NumberNumberMinus.minus(e, avg)).pow(2))
// accumulator layout: [0] = element count (Long), [1] = running sum (BigDecimal)
.reduce(new Object[]{0L, BigDecimal.ZERO}, (r, e) -> {
r[0] = (Long) r[0] + 1;
r[1] = ((BigDecimal) r[1]).add(e);
return r;
}, (o1, o2) -> o1));
return ((BigDecimal) result[1]).divide(toBigDecimal((Long) result[0] - diff), 16, RoundingMode.HALF_UP);
private BigDecimal sd(Function<? super T, ? extends Number> mapper, int diff) {
// `BigDecimal.sqrt` is introduced since Java9, so we can not use it for now.
return toBigDecimal(sqrt(vr(mapper, diff).doubleValue()));
public <U> U agg(Function<? super Queryable<? extends T>, ? extends U> mapper) {
return mapper.apply(this);
// ------------------------------ END AGGREGATE FUNCTIONS --------------------------------
/**
 * Outer join of {@code queryable1} with {@code queryable2}: an element {@code p} with no
 * match is emitted once as {@code tuple(p, null)}.
 * NOTE(review): the stream sources, the collecting call and the non-empty branch of the final
 * conditional are truncated in this view.
 *
 * @param queryable1 the side whose elements are always kept
 * @param queryable2 the side searched for matches
 * @param joiner predicate tested as {@code joiner.test(p, c)}
 * @return a queryable created via {@code from(stream)}
 */
private static <T, U> Queryable<Tuple2<T, U>> outerJoin(Queryable<? extends T> queryable1, Queryable<? extends U> queryable2, BiPredicate<? super T, ? super U> joiner) {
Stream<Tuple2<T, U>> stream =
.flatMap(p -> {
if (queryable2 instanceof QueryableCollection) {
// back the searched side with a collected list so stream() can hand out a new stream per use
((QueryableCollection) queryable2).makeReusable();
List<Tuple2<T, U>> joinResultList =
.filter(c -> joiner.test(p, c))
.map(c -> tuple((T) p, (U) c))
// no match: keep the element, paired with null
return joinResultList.isEmpty() ? Stream.of(tuple(p, null)) :;
return from(stream);
/**
 * Outer join that finds matches through a hash table built from {@code queryable2}:
 * an element {@code p} with no match is emitted once as {@code tuple(p, null)}.
 * NOTE(review): the stream source and the non-empty branch of the final conditional are
 * truncated in this view.
 *
 * @param queryable1 the side whose elements are always kept
 * @param queryable2 the side loaded into the hash table
 * @param fieldsExtractor1 extracts the join fields from elements of {@code queryable1}
 * @param fieldsExtractor2 extracts the join fields from elements of {@code queryable2}
 * @return a queryable created via {@code from(stream)}
 */
private static <T, U> Queryable<Tuple2<T, U>> outerHashJoin(Queryable<? extends T> queryable1, Queryable<? extends U> queryable2, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
// holder and supplier are created once per join and shared by every probe
final ConcurrentObjectHolder<Map<Integer, List<U>>> hashTableHolder = new ConcurrentObjectHolder<>();
final Supplier<Map<Integer, List<U>>> hashTableSupplier = createHashTableSupplier(queryable2, fieldsExtractor2);
Stream<Tuple2<T, U>> stream = -> {
// build hash table
Map<Integer, List<U>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
// probe the hash table
List<Tuple2<T, U>> joinResultList =
probeHashTable(hashTable, (T) p, fieldsExtractor1, fieldsExtractor2).collect(Collectors.toList());
// no match: keep the element, paired with null
return joinResultList.isEmpty() ? Stream.of(tuple(p, null)) :;
return from(stream);
private static <U> Map<Integer, List<U>> buildHashTable(final ConcurrentObjectHolder<Map<Integer, List<U>>> hashTableHolder, final Supplier<Map<Integer, List<U>>> hashTableSupplier) {
return hashTableHolder.getObject(hashTableSupplier);
/**
 * Looks up the matches for {@code p} in the hash table: selects the entries whose key equals
 * the hash of {@code p}'s join fields, then keeps the candidates whose own join fields are
 * equal to them, emitting {@code tuple(p, c)}.
 * NOTE(review): the expression that streams {@code candidateList} is truncated in this view.
 *
 * @param hashTable the table built from the other side of the join
 * @param p the element being probed
 * @param fieldsExtractor1 extracts the join fields from {@code p}
 * @param fieldsExtractor2 extracts the join fields from a candidate
 * @return the stream of matching tuples
 */
private static <T, U> Stream<Tuple2<T, U>> probeHashTable(Map<Integer, List<U>> hashTable, T p, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
final Object otherFields = fieldsExtractor1.apply(p);
// scans all entries and compares keys, rather than calling hashTable.get(...)
return hashTable.entrySet().stream()
.filter(entry -> hash(otherFields).equals(entry.getKey()))
.flatMap(entry -> {
List<U> candidateList = entry.getValue();
// a shared bucket may hold non-matching elements, so compare the actual fields
.filter(c -> Objects.equals(otherFields, fieldsExtractor2.apply(c)))
.map(c -> tuple(p, c));
/**
 * Returns the elements as a list. When the iterable source already is a {@code List} it is
 * returned directly (not copied); otherwise the stream is collected and the resulting list
 * is stored as the new iterable source.
 *
 * @return the elements as a list
 */
public List<T> toList() {
try {
if (sourceIterable instanceof List) {
return (List<T>) sourceIterable;
final List<T> result = stream().collect(Collectors.toList());
// remember the collected list so later calls can use it as the iterable source
sourceIterable = result;
return result;
} finally {
// NOTE(review): the body of this finally block is not visible in this view
public long size() {
return stream().count();
/**
 * Returns a stream over the elements. When an iterable source is available, a new stream is
 * created from it on every call; the stream is switched to parallel mode when parallel
 * querying is enabled and it is not parallel yet.
 *
 * @return the stream of elements
 */
public Stream<T> stream() {
try {
if (isReusable()) {
sourceStream = toStream(sourceIterable); // we have to create new stream every time because Java stream can not be reused
if (!sourceStream.isParallel() && isParallel()) {
sourceStream = sourceStream.parallel();
return sourceStream;
} finally {
// NOTE(review): the body of this finally block is not visible in this view
/**
 * Creates the window for {@code currentRecord} under {@code windowDefinition}. The partition
 * holding the record and its sorted form are looked up in, or added to, the per-instance
 * caches.
 * NOTE(review): several expressions (the indexed-list construction, the grouping call and the
 * first argument of the sorted-partition lookup) are truncated in this view.
 *
 * @param currentRecord the current element paired with a {@code Long} (presumably its row number — confirm)
 * @param windowDefinition the window definition providing ids, partitioning and ordering
 * @return the window created via {@code Window.of}
 */
public <U extends Comparable<? super U>> Window<T> over(Tuple2<T, Long> currentRecord, WindowDefinition<T, U> windowDefinition) {
final Tuple3<String, String, String> idTuple = (Tuple3<String, String, String>) windowDefinition.getId(); // (partitionId, orderId, windowDefinitionId)
final String partitionId = idTuple.getV1();
// cache key: the record's partition key together with the partition id
Partition<Tuple2<T, Long>> partition = partitionCache.computeIfAbsent(
new PartitionCacheKey(windowDefinition.partitionBy().apply(currentRecord.getV1()), partitionId),
// on a miss, hash-join the single current record against all partitions for this partition id
partitionCacheKey -> from(Collections.singletonList(currentRecord)).innerHashJoin(
allPartitionCache.computeIfAbsent(partitionId, pid -> {
// single-element array so the lambda below can increment the counter
long[] rn = new long[]{0L};
List<Tuple2<T, Long>> listWithIndex =
.map(e -> Tuple.tuple(e, rn[0]++))
final Queryable<Tuple2<?, Partition<Tuple2<T, Long>>>> q =
.select((e, x) -> Tuple.tuple(e.getV1(), Partition.of(e.getV2().toList())));
// the cached queryable is joined against repeatedly, so it must be re-streamable
if (q instanceof QueryableCollection) {
((QueryableCollection) q).makeReusable();
return q;
}), a -> partitionCacheKey.partitionKey, Tuple2::getV1
).select((e, q) -> e.getV2().getV2())
final String orderId = idTuple.getV2();
final SortedPartitionCacheKey<T> sortedPartitionCacheKey = new SortedPartitionCacheKey<>(partition, orderId);
Partition<Tuple2<T, Long>> sortedPartition = sortedPartitionCache.computeIfAbsent(
// on a miss, sort the partition by the window definition's orders
sortedPartitionId -> Partition.of(partition.orderBy(composeOrders(windowDefinition)).toList())
return Window.of(currentRecord, sortedPartition, windowDefinition);
private static class PartitionCacheKey {
private final Object partitionKey;
private final String partitionId;
public PartitionCacheKey(Object partitionKey, String partitionId) {
this.partitionKey = partitionKey;
this.partitionId = partitionId;
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PartitionCacheKey)) return false;
PartitionCacheKey that = (PartitionCacheKey) o;
return partitionKey.equals(that.partitionKey) && partitionId.equals(that.partitionId);
public int hashCode() {
return Objects.hash(partitionKey, partitionId);
private static class SortedPartitionCacheKey<T> {
private final Partition<Tuple2<T, Long>> partition;
private final String orderId;
public SortedPartitionCacheKey(Partition<Tuple2<T, Long>> partition, String orderId) {
this.partition = partition;
this.orderId = orderId;
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SortedPartitionCacheKey)) return false;
SortedPartitionCacheKey that = (SortedPartitionCacheKey) o;
return partition == that.partition && orderId.equals(that.orderId);
public int hashCode() {
return Objects.hash(partition.size(), orderId);
/**
 * Creates a stream from the given iterable, passing {@code isParallel()} as the last argument
 * of the creating call.
 * NOTE(review): the call expression is truncated in this view; presumably it builds the
 * stream from the iterable's spliterator — confirm against the full source.
 *
 * @param sourceIterable the iterable to stream
 * @return a stream over the iterable's elements
 */
private static <T> Stream<T> toStream(Iterable<T> sourceIterable) {
return, isParallel());
private static boolean isParallel() {
return QueryableHelper.isParallel();
/**
 * Tells whether this queryable has an iterable source, i.e. whether a new stream can be
 * created from it on demand.
 *
 * @return {@code true} when {@code sourceIterable} is set
 */
private boolean isReusable() {
try {
return null != sourceIterable;
} finally {
// NOTE(review): the body of this finally block is not visible in this view
/**
 * Makes this queryable reusable: if no iterable source is set, the stream source is collected
 * into a list, which becomes the iterable source. Does nothing when an iterable source is
 * already set.
 */
private void makeReusable() {
// fast path: already backed by an iterable
if (null != this.sourceIterable) return;
try {
// second check inside the try block
// NOTE(review): presumably guarded by a lock acquired on a line not visible in this view — confirm
if (null != this.sourceIterable) return;
this.sourceIterable = this.sourceStream.collect(Collectors.toList());
} finally {
// NOTE(review): the body of this finally block is not visible in this view
public Object asType(Class<?> clazz) {
if (List.class == clazz || Collection.class == clazz || Iterable.class == clazz) {
return toList();
if (clazz.isArray()) {
return DefaultGroovyMethods.asType(toList(), clazz);
if (Set.class == clazz) {
return new LinkedHashSet<>(toList());
if (Stream.class == clazz) {
return stream();
if (Iterator.class == clazz) {
return iterator();
return DefaultGroovyMethods.asType(this, clazz);
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof QueryableCollection)) return false;
QueryableCollection<?> that = (QueryableCollection<?>) o;
return toList().equals(that.toList());
public int hashCode() {
return Objects.hash(toList());
public String toString() {
return AsciiTableMaker.makeAsciiTable(this);
// Window-function caches used by over():
// all partitions of a partition definition, keyed by partition id
private final Map<String, Queryable<Tuple2<?, Partition<Tuple2<T, Long>>>>> allPartitionCache = new ConcurrentHashMap<>(4);
// a single partition, keyed by (partition key, partition id)
private final Map<PartitionCacheKey, Partition<Tuple2<T, Long>>> partitionCache = new ConcurrentHashMap<>(4);
// a sorted partition, keyed by (partition instance, order id)
private final Map<SortedPartitionCacheKey<T>, Partition<Tuple2<T, Long>>> sortedPartitionCache = new ConcurrentHashMap<>(4);
// stream source; replaced by a new stream in stream() when an iterable source exists
private Stream<T> sourceStream;
// iterable source; when set, the queryable can be streamed repeatedly
private volatile Iterable<T> sourceIterable;
// NOTE(review): no lock()/unlock() calls on these locks are visible in this view; the
// try/finally blocks with missing bodies presumably used them — confirm against the full source
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();
// divisor used by median() to average the two middle values
private static final BigDecimal BD_TWO = BigDecimal.valueOf(2);
// names and values of the variables read/written through QueryableHelper.getVar/setVar
private static final String USE_WINDOW_FUNCTION = "useWindowFunction";
private static final String PARALLEL = "parallel";
private static final String TRUE_STR = "true";
private static final String FALSE_STR = "false";
private static final long serialVersionUID = -5067092453136522893L;