| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to you under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.calcite.runtime; |
| |
| import org.apache.calcite.interpreter.Row; |
| import org.apache.calcite.linq4j.AbstractEnumerable; |
| import org.apache.calcite.linq4j.Enumerable; |
| import org.apache.calcite.linq4j.Enumerator; |
| import org.apache.calcite.linq4j.function.Function1; |
| |
| import org.checkerframework.checker.nullness.qual.Nullable; |
| |
| import java.util.ArrayDeque; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| |
| /** |
| * Utilities for processing {@link org.apache.calcite.linq4j.Enumerable} |
| * collections. |
| * |
| * <p>This class is a place to put things not yet added to linq4j. |
| * Methods are subject to removal without notice. |
| */ |
| public class Enumerables { |
| |
| private Enumerables() {} |
| |
| /** Converts an enumerable over singleton arrays into the enumerable of their |
| * first elements. */ |
| public static <E> Enumerable<E> slice0(Enumerable<E[]> enumerable) { |
| return enumerable.select(elements -> elements[0]); |
| } |
| |
| /** Converts an {@link Enumerable} over object arrays into an |
| * {@link Enumerable} over {@link Row} objects. */ |
| public static Enumerable<Row> toRow(final Enumerable<@Nullable Object[]> enumerable) { |
| return enumerable.select((Function1<@Nullable Object[], Row>) Row::asCopy); |
| } |
| |
| /** Converts a supplier of an {@link Enumerable} over object arrays into a |
| * supplier of an {@link Enumerable} over {@link Row} objects. */ |
| public static Supplier<Enumerable<Row>> toRow( |
| final Supplier<Enumerable<@Nullable Object[]>> supplier) { |
| return () -> toRow(supplier.get()); |
| } |
| |
| @SuppressWarnings("Guava") |
| @Deprecated // to be removed before 2.0 |
| public static com.google.common.base.Supplier<Enumerable<Row>> toRow( |
| final com.google.common.base.Supplier<Enumerable<@Nullable Object[]>> supplier) { |
| return () -> toRow(supplier.get()); |
| } |
| |
| public static <E, TKey, TResult> Enumerable<TResult> match( |
| Enumerable<E> enumerable, |
| final Function1<E, TKey> keySelector, |
| Matcher<E> matcher, |
| Emitter<E, TResult> emitter, int history, int future) { |
| return new AbstractEnumerable<TResult>() { |
| @Override public Enumerator<TResult> enumerator() { |
| return new Enumerator<TResult>() { |
| final Enumerator<E> inputEnumerator = enumerable.enumerator(); |
| |
| // State of each partition. |
| final Map<TKey, Matcher.PartitionState<E>> partitionStates = |
| new HashMap<>(); |
| |
| int inputRow = -1; |
| |
| final Deque<TResult> emitRows = new ArrayDeque<>(); |
| |
| /** Current result row. Null if no row is ready. */ |
| @Nullable TResult resultRow; |
| |
| /** Match counter is 1-based in Oracle. */ |
| final AtomicInteger matchCounter = new AtomicInteger(1); |
| |
| @Override public TResult current() { |
| Objects.requireNonNull(resultRow, "resultRow"); |
| return resultRow; |
| } |
| |
| @Override public boolean moveNext() { |
| for (;;) { |
| resultRow = emitRows.pollFirst(); |
| if (resultRow != null) { |
| return true; |
| } |
| // No rows are currently read to emit. Read the next input row, |
| // see whether it completes a match (or matches), and if so, add |
| // the resulting rows to the buffer. |
| if (!inputEnumerator.moveNext()) { |
| return false; |
| } |
| ++inputRow; |
| final E row = inputEnumerator.current(); |
| final TKey key = keySelector.apply(row); |
| final Matcher.PartitionState<E> partitionState = |
| partitionStates.computeIfAbsent(key, |
| k -> matcher.createPartitionState(history, future)); |
| |
| partitionState.getMemoryFactory().add(row); |
| |
| |
| matcher.matchOne(partitionState.getRows(), partitionState, |
| // TODO 26.12.18 jf: add row states (whatever this is?) |
| matches -> emitter.emit(matches.rows, null, matches.symbols, |
| matchCounter.getAndIncrement(), emitRows::add)); |
| /* |
| recentRows.add(e); |
| int earliestRetainedRow = Integer.MAX_VALUE; |
| for (int i = 0; i < partitionState.incompleteMatches.size(); i++) { |
| MatchState match = partitionState.incompleteMatches.get(i); |
| earliestRetainedRow = Math.min(earliestRetainedRow, match.firstRow); |
| final int state = automaton.nextState(match.state, e); |
| switch (state) { |
| case Automaton.ACCEPT: |
| final List<E> matchedRows = |
| recentRows.subList(0, 0); // TODO: |
| final List<Integer> rowStates = ImmutableList.of(); // TODO: |
| emitRows.addAll( |
| emitter.emit(matchedRows, rowStates, |
| partitionState.matchCount++)); |
| // fall through |
| case Automaton.FAIL: |
| partitionState.incompleteMatches.remove(i--); |
| break; |
| default: |
| match.state = state; |
| } |
| } |
| // Try to start a match based on the current row |
| final int state = automaton.nextState(Automaton.START_STATE, e); |
| switch (state) { |
| case Automaton.ACCEPT: |
| final List<E> matchedRows = ImmutableList.of(e); |
| final List<Integer> rowStates = ImmutableList.of(state); |
| emitRows.addAll( |
| emitter.emit(matchedRows, rowStates, |
| partitionState.matchCount++)); |
| // fall through |
| case Automaton.FAIL: |
| // since it immediately succeeded or failed, don't add |
| // it to the queue |
| break; |
| default: |
| partitionState.incompleteMatches.add( |
| new MatchState(inputRow, state)); |
| } |
| */ |
| } |
| } |
| |
| @Override public void reset() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override public void close() { |
| inputEnumerator.close(); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** Given a match (a list of rows, and their states) produces a list |
| * of rows to be output. |
| * |
| * @param <E> element type |
| * @param <TResult> result type */ |
| public interface Emitter<E, TResult> { |
| void emit(List<E> rows, @Nullable List<Integer> rowStates, List<String> rowSymbols, int match, |
| Consumer<TResult> consumer); |
| } |
| } |