blob: a3365a06160be0037d34d6bb47a2a9b145b51de3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sis.internal.storage;
import java.util.Map;
import java.util.List;
import java.util.Arrays;
import java.util.Collections;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.opengis.util.GenericName;
import org.apache.sis.feature.FeatureOperations;
import org.apache.sis.feature.DefaultFeatureType;
import org.apache.sis.feature.DefaultAssociationRole;
import org.apache.sis.internal.feature.AttributeConvention;
import org.apache.sis.internal.storage.query.SimpleQuery;
import org.apache.sis.storage.FeatureSet;
import org.apache.sis.storage.DataStoreException;
import org.apache.sis.storage.event.StoreListeners;
import org.apache.sis.util.ArraysExt;
import org.apache.sis.util.collection.BackingStoreException;
import org.apache.sis.util.collection.Containers;
// Branch-dependent imports
import org.opengis.feature.Feature;
import org.opengis.feature.FeatureType;
import org.opengis.feature.Operation;
import org.opengis.feature.PropertyType;
import org.opengis.filter.Filter;
import org.opengis.filter.FilterFactory;
import org.opengis.filter.PropertyIsEqualTo;
import org.opengis.filter.expression.Expression;
import org.apache.sis.filter.DefaultFilterFactory;
/**
* Features containing association to features from two different sources, joined by a SQL-like {@code JOIN} condition.
* Each feature in this {@code FeatureSet} contains two or three properties:
*
* <ul>
* <li>An optional identifier created from the identifiers of the left and right features.</li>
* <li>Zero or one association to a "left" feature.</li>
* <li>Zero or one association to a "right" feature.</li>
* </ul>
*
* The left and right features appear together in an {@code JoinFeatureSet} instance when a value from
* {@code leftProperty} in the first feature is equal to a value from {@code rightProperty} in the second feature.
*
* <h2>Implementation note</h2>
* If iterations in one feature set is cheaper than iterations in the other feature set, then the "costly" or larger
* {@code FeatureSet} should be on the left side and the "cheap" {@code FeatureSet} should be on the right side.
*
* <p>This implementation is read-only.</p>
*
* @author Johann Sorel (Geomatys)
* @author Martin Desruisseaux (Geomatys)
* @version 1.0
* @since 1.0
* @module
*/
public class JoinFeatureSet extends AggregatedFeatureSet {
/**
* Specifies whether values on both sides are required (inner join), or only one side (outer join).
*/
public enum Type {
/**
* Only records having a value on both side will be included.
* The {@link JoinFeatureSet} {@code "left"} and {@code "right"} properties will never be null.
*/
INNER(false, false),
/**
* All records from the left side will be included. If there is no matching feature on the right side,
* then the {@link JoinFeatureSet} {@code "right"} property will be {@code null}.
*/
LEFT_OUTER(true, false),
/**
* All records from the right side will be included. If there is no matching feature on the left side,
* then the {@link JoinFeatureSet} {@code "left"} property will be {@code null}.
*/
RIGHT_OUTER(true, true);
/**
* Whether to include all "main" feature instances even if there is no match in the other side.
* This is {@code true} for outer joins and {@code false} for inner joins.
*/
final boolean isOuterJoin;
/**
* {@code true} if the "main" side is the right side instead than the left side.
* See {@link JoinFeatureSet.Iterator} for a definition of "main side".
*/
final boolean swapSides;
/**
* Creates an enumeration.
*/
private Type(final boolean isOuterJoin, final boolean swapSides) {
this.isOuterJoin = isOuterJoin;
this.swapSides = swapSides;
}
/**
* Returns the minimum occurrences for properties on the left or right side.
*
* @param right {@code false} for the left side, or {@code true} for the right side.
*/
final int minimumOccurs(final boolean right) {
return !isOuterJoin | (swapSides == right) ? 1 : 0;
}
/**
* Returns the enumeration value for the given characteristics.
*/
static Type valueOf(final boolean isOuterJoin, final boolean swapSides) {
return isOuterJoin ? (swapSides ? RIGHT_OUTER : LEFT_OUTER) : INNER;
}
}
/**
* The type of features included in this set. Contains two associations as described in class javadoc.
*/
private final FeatureType type;
/**
* The first source of features.
*/
public final FeatureSet left;
/**
* The second source of features.
*/
public final FeatureSet right;
/**
* Name of the associations to the {@link #left} features.
* This may be the name of the {@link #left} feature type, but not necessarily.
*/
private final String leftName;
/**
* Name of the associations to the {@link #right} features.
* This may be the name of the {@link #right} feature type, but not necessarily.
*/
private final String rightName;
/**
* {@code true} if the "main" side is the right side instead than the left side.
* See {@link JoinFeatureSet.Iterator} for a definition of "main side".
*/
private final boolean swapSides;
/**
* Whether to include all "main" feature instances even if there is no match in the other side.
* This is {@code true} for outer joins and {@code false} for inner joins.
*/
private final boolean isOuterJoin;
/**
* The join condition in the form <var>property from left feature</var> = <var>property from right feature</var>.
* This condition specifies also if the comparison is {@linkplain PropertyIsEqualTo#isMatchingCase() case sensitive}
* and {@linkplain PropertyIsEqualTo#getMatchAction() how to compare multi-values}.
*/
public final PropertyIsEqualTo condition;
/**
* The factory to use for creating {@code Query} expressions for retrieving subsets of feature sets.
*/
private final FilterFactory factory;
/**
* Creates a new feature set joining the two given sets. The {@code featureInfo} map defines the name,
* description or other information for the {@code FeatureType} created by this method. It can contain all
* the properties described in {@link org.apache.sis.feature.DefaultFeatureType} plus the following ones:
*
* <ul>
* <li>{@code "identifierDelimiter"} — string to insert between left and right identifiers in the identifiers
* generated by the join operation. If this property is not specified, then no identifier will be generated.</li>
* <li>{@code "identifierPrefix"} — string to insert at the beginning of join identifiers (optional).</li>
* <li>{@code "identifierSuffix"} — string to insert at the end of join identifiers (optional).</li>
* </ul>
*
* @param parent listeners of the parent resource, or {@code null} if none.
* @param left the first source of features. This is often (but not necessarily) the largest set.
* @param leftAlias name of the associations to the {@code left} features, or {@code null} for a default name.
* @param right the second source of features. Should be the set in which iterations are cheapest.
* @param rightAlias name of the associations to the {@code right} features, or {@code null} for a default name.
* @param joinType whether values on both sides are required (inner join), or only one side (outer join).
* @param condition join condition as <var>property from left feature</var> = <var>property from right feature</var>.
* @param featureInfo information about the {@link FeatureType} of this
* @throws DataStoreException if an error occurred while creating the feature set.
*/
public JoinFeatureSet(final StoreListeners parent,
final FeatureSet left, String leftAlias,
final FeatureSet right, String rightAlias,
final Type joinType, final PropertyIsEqualTo condition,
Map<String,?> featureInfo)
throws DataStoreException
{
super(parent);
final FeatureType leftType = left.getType();
final FeatureType rightType = right.getType();
final GenericName leftName = leftType.getName();
final GenericName rightName = rightType.getName();
if (leftAlias == null) leftAlias = leftName.toString();
if (rightAlias == null) rightAlias = rightName.toString();
this.left = left;
this.right = right;
this.leftName = leftAlias;
this.rightName = rightAlias;
this.swapSides = joinType.swapSides;
this.isOuterJoin = joinType.isOuterJoin;
this.condition = condition;
this.factory = new DefaultFilterFactory(); // TODO: replace by some static instance?
/*
* We could build the FeatureType only when first needed, but the type is required by the iterators.
* Since we are going to need the type for any use of this JoinFeatureSet, better to create it now.
*/
PropertyType[] properties = new PropertyType[] {
new DefaultAssociationRole(name(leftAlias), leftType, joinType.minimumOccurs(false), 1),
new DefaultAssociationRole(name(rightAlias), rightType, joinType.minimumOccurs(true), 1)
};
final String identifierDelimiter = Containers.property(featureInfo, "identifierDelimiter", String.class);
if (identifierDelimiter != null && AttributeConvention.hasIdentifier(leftType)
&& AttributeConvention.hasIdentifier(rightType))
{
final Operation identifier = FeatureOperations.compound(
name(AttributeConvention.IDENTIFIER_PROPERTY), identifierDelimiter,
Containers.property(featureInfo, "identifierPrefix", String.class),
Containers.property(featureInfo, "identifierSuffix", String.class), properties);
properties = ArraysExt.insert(properties, 0, 1);
properties[0] = identifier;
}
if (featureInfo == null) {
featureInfo = name(leftName.tip().toString() + '-' + rightName.tip());
}
type = new DefaultFeatureType(featureInfo, false, null, properties);
}
/**
* Creates a minimal {@code properties} map for feature type or property type constructors.
* This minimalist map contain only the mandatory entry, which is the name.
*/
private static Map<String,?> name(final Object name) {
return Collections.singletonMap(DefaultFeatureType.NAME_KEY, name);
}
/**
* Returns the two feature sets used by this {@code JoinFeatureSet}.
* The "main" dependency is at index 0 and the other dependency at index 1.
*
* @return the dependencies in a list of size 2 with the "main" dependency first.
*/
@Override
final List<FeatureSet> dependencies() {
final FeatureSet[] sets = new FeatureSet[] {left, right};
if (swapSides) ArraysExt.swap(sets, 0, 1);
return Arrays.asList(sets);
}
/**
* Specifies whether values on both sides are required (inner join), or only one side (outer join).
*
* @return whether values on both sides are required (inner join), or only one side (outer join).
*/
public Type getJoinType() {
return Type.valueOf(isOuterJoin, swapSides);
}
/**
* Returns a description of properties that are common to all features in this dataset.
* This type may contain one identifier and always contains two associations,
* to the {@linkplain #left} and {@link #right} set of features respectively.
*
* @return a description of properties that are common to all features in this dataset.
*/
@Override
public FeatureType getType() {
return type;
}
/**
* Returns a stream of all features contained in this dataset.
*
* @param parallel {@code true} for a parallel stream (if supported), or {@code false} for a sequential stream.
* @return all features contained in this dataset.
* @throws DataStoreException if an error occurred while creating the stream.
*/
@Override
public Stream<Feature> features(final boolean parallel) throws DataStoreException {
final Iterator it = new Iterator();
return StreamSupport.stream(it, parallel).onClose(it);
}
/**
* Creates a new features containing an association to the two given features.
* The {@code main} feature can not be null (this is not verified).
*/
private Feature join(Feature main, Feature filtered) {
if (swapSides) {
final Feature t = main;
main = filtered;
filtered = t;
}
final Feature f = type.newInstance();
f.setPropertyValue(leftName, main);
f.setPropertyValue(rightName, filtered);
return f;
}
/**
* Iterator over the features resulting from the inner or outer join operation.
* The {@link #run()} method disposes the resources.
*/
private final class Iterator implements Spliterator<Feature>, Consumer<Feature>, Runnable {
/**
* The main stream or a split iterator to close when the {@link #run()} method will be invoked.
* This is initially the stream from which {@link #mainIterator} has been created. However if
* {@link #trySplit()} has been invoked, then this handler may be the other {@code Iterator}
* instance which itself contains a reference to the stream to close, thus forming a chain.
*/
private Runnable mainCloseHandler;
/**
* An iterator over all features in the "main" (usually left) side. The "main" side is the side which
* may include all features: in a "left outer join" this is the left side, and in a "right outer join"
* this is the right side. For inner join we arbitrarily take the left side in accordance with public
* class javadoc, which suggests to put the most costly or larger set on the left side.
*
* <p>Only one iteration will be performed on those features, contrarily to the other side where we may
* iterate over the same elements many times.</p>
*/
private final Spliterator<Feature> mainIterator;
/**
* A feature fetched from the {@link #mainIterator}. The join operation will match this feature with
* zero, one or more features from the other side. A {@code null} value means that this feature needs
* to be retrieved with {@code mainIterator.tryAdvance(…)}.
*/
private Feature mainFeature;
/**
* The stream over features in the other (usually right) side. A new stream will be created every time a new
* feature from the main side is processed. For this reason, it should be the cheapest stream if possible.
*/
private Stream<Feature> filteredStream;
/**
* Iterator for the {@link #filteredStream}. A new iterator will be recreated every time a new feature
* from the main side is processed.
*/
private Spliterator<Feature> filteredIterator;
/**
* A feature fetched from the {@link #filteredIterator}, or {@code null} if none.
*/
private Feature filteredFeature;
/**
* Creates a new iterator. We do not use parallelized {@code mainStream} here because the {@code accept(…)}
* methods used by this {@code Iterator} can not be invoked concurrently by different threads. It does not
* present parallelization at a different level since this {@code Iterator} supports {@link #trySplit()},
* so the {@link Stream} wrapping it can use parallelization.
*/
Iterator() throws DataStoreException {
final Stream<Feature> mainStream = (swapSides ? right : left).features(false);
mainCloseHandler = mainStream::close;
mainIterator = mainStream.spliterator();
}
/**
* Creates an iterator resulting from the call to {@link #trySplit()}.
*/
private Iterator(final Spliterator<Feature> it) {
mainIterator = it;
}
/**
* If this iterator can be partitioned, returns a spliterator covering a prefix of the feature set.
* Upon return from this method, this iterator will cover a suffix of the feature set.
* Returns {@code null} if this iterator can not be partitioned.
*/
@Override
public Spliterator<Feature> trySplit() {
final Spliterator<Feature> s = mainIterator.trySplit();
if (s == null) {
return null;
}
final Iterator it = new Iterator(s);
it.mainCloseHandler = mainCloseHandler;
mainCloseHandler = it;
return it;
}
/**
* Specifies that the iterator will return only non-null elements. Whether those elements will
* be ordered depends on whether the main iterator provides ordered elements in the first place.
*
* <p><b>NOTE:</b> to be strict, we should check if the "filtered" stream is also ordered. But this
* is more difficult to check. Current implementation assumes that if the "mean" stream is ordered,
* then the other stream is ordered too. Furthermore the {@link #trySplit()} method works only on
* the main stream, so at least the {@code trySplit} requirement about prefix and suffix order is
* still fulfill even if the other stream is unordered.</p>
*/
@Override
public int characteristics() {
return (mainIterator.characteristics() & ORDERED) | NONNULL;
}
/**
* Estimated size is unknown.
*/
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
/**
* Closes the streams used by this iterator, together with the streams used by any spliterator
* created by {@link #trySplit()}. This method is registered to {@link Stream#onClose(Runnable)}.
*/
@Override
public void run() {
closeFilteredIterator();
final Runnable toClose = mainCloseHandler;
if (toClose != null) {
mainCloseHandler = null; // Cleared first in case of error.
toClose.run();
}
}
/**
* Invoked when iteration on the filtered stream ended, before to move on the next feature of the main stream.
* This method is idempotent: it has no effect if the stream is already closed.
*/
private void closeFilteredIterator() {
final Stream<Feature> stream = filteredStream;
filteredStream = null; // Cleared before call to close() in case of error.
filteredIterator = null;
filteredFeature = null; // Used as a sentinel value by this.forEachRemaining(…).
mainFeature = null; // Indicate that we will need to advance in mainIterator.
if (stream != null) {
stream.close();
}
}
/**
* Creates a new iterator over the filtered set of features (usually the right side).
* The filtering condition is determined by the current {@link #mainFeature}.
*/
private void createFilteredIterator() {
Expression expression1 = condition.getExpression1();
Expression expression2 = condition.getExpression2();
FeatureSet filteredSet = right;
if (swapSides) {
filteredSet = left;
Expression t = expression2;
expression2 = expression1;
expression1 = t;
}
final Object mainValue = expression1.evaluate(mainFeature);
final Filter filter;
if (mainValue != null) {
filter = factory.equal(expression2, factory.literal(mainValue),
condition.isMatchingCase(), condition.getMatchAction());
} else {
filter = factory.isNull(expression2);
}
final SimpleQuery query = new SimpleQuery();
query.setFilter(filter);
try {
filteredStream = filteredSet.subset(query).features(false);
} catch (DataStoreException e) {
throw new BackingStoreException(e);
}
filteredIterator = filteredStream.spliterator();
}
/**
* Executes the given action on all remaining features in the {@code JoinFeatureSet}.
*/
@Override
public void forEachRemaining(final Consumer<? super Feature> action) {
final Consumer<Feature> forFiltered = (final Feature feature) -> {
if (feature != null) {
action.accept(join(mainFeature, filteredFeature = feature));
}
};
final Consumer<Feature> forMain = (final Feature feature) -> {
if (feature != null) {
mainFeature = feature;
createFilteredIterator();
filteredIterator.forEachRemaining(forFiltered);
final boolean none = (filteredFeature == null);
closeFilteredIterator();
if (none && isOuterJoin) {
action.accept(join(feature, null));
}
// Do not close the main stream since it may be in use by other Spliterators.
}
};
forMain.accept(mainFeature); // In case some 'tryAdvance' has been invoked before.
mainIterator.forEachRemaining(forMain);
}
/**
* Callback for {@code Spliterator.tryAdvance(this)} on {@link #filteredIterator}.
* Used by {@link #tryAdvance(Consumer)} implementation only.
*/
@Override
public void accept(final Feature feature) {
filteredFeature = feature;
}
/**
* Executes the given action on the next feature in the {@code JoinFeatureSet}.
*/
@Override
public boolean tryAdvance​(final Consumer<? super Feature> action) {
for (;;) {
if (mainFeature == null) {
do if (!mainIterator.tryAdvance(this)) {
return false;
} while (filteredFeature == null);
mainFeature = filteredFeature;
filteredFeature = null;
}
if (filteredIterator == null) {
createFilteredIterator();
}
final boolean none = (filteredFeature == null);
while (filteredIterator.tryAdvance(this)) {
if (filteredFeature != null) {
action.accept(join(mainFeature, filteredFeature));
return true;
}
}
final Feature feature = mainFeature;
closeFilteredIterator();
if (none && isOuterJoin) {
action.accept(join(feature, null));
return true;
}
}
}
}
}