// Source blob: d6517c1bfbccf8cfcbb7125e23090a962d852dda
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey;
import org.apache.druid.segment.join.filter.JoinFilterSplit;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
 * A {@link StorageAdapter} that exposes a base (left-hand side) adapter joined against a list of
 * {@link JoinableClause}s (right-hand sides).
 *
 * Column-level lookups are routed by name: a column claimed by one of the clauses (see
 * {@link JoinableClause#includesColumn}) is answered by that clause's joinable, and every other column is delegated
 * to the base adapter. Time-related accessors always delegate to the base adapter.
 */
public class HashJoinSegmentStorageAdapter implements StorageAdapter
{
  private final StorageAdapter baseAdapter;
  private final List<JoinableClause> clauses;
  private final JoinFilterPreAnalysis joinFilterPreAnalysis;

  /**
   * @param baseAdapter           A StorageAdapter for the left-hand side base segment
   * @param clauses               The right-hand side clauses. The caller is responsible for ensuring that there are no
   *                              duplicate prefixes or prefixes that shadow each other across the clauses
   * @param joinFilterPreAnalysis Pre-analysis for the query we expect to run on this storage adapter
   */
  HashJoinSegmentStorageAdapter(
      final StorageAdapter baseAdapter,
      final List<JoinableClause> clauses,
      final JoinFilterPreAnalysis joinFilterPreAnalysis
  )
  {
    this.baseAdapter = baseAdapter;
    this.clauses = clauses;
    this.joinFilterPreAnalysis = joinFilterPreAnalysis;
  }

  /**
   * Returns the interval of the base adapter.
   */
  @Override
  public Interval getInterval()
  {
    return baseAdapter.getInterval();
  }

  /**
   * Returns the base adapter's dimensions followed by the prefixed columns of each clause, without duplicates and
   * in first-seen order.
   */
  @Override
  public Indexed<String> getAvailableDimensions()
  {
    // Use a Set since we may encounter duplicates, if a field from a Joinable shadows one of the base fields.
    final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>();

    baseAdapter.getAvailableDimensions().forEach(availableDimensions::add);

    for (JoinableClause clause : clauses) {
      availableDimensions.addAll(clause.getAvailableColumnsPrefixed());
    }

    return new ListIndexed<>(Lists.newArrayList(availableDimensions));
  }

  /**
   * Returns the metrics of the base adapter. Columns from the clauses are reported as dimensions only.
   */
  @Override
  public Iterable<String> getAvailableMetrics()
  {
    return baseAdapter.getAvailableMetrics();
  }

  /**
   * Returns the cardinality of "column", asking the owning clause's joinable (with the prefix removed) when the
   * column belongs to a clause, and the base adapter otherwise.
   */
  @Override
  public int getDimensionCardinality(String column)
  {
    final Optional<JoinableClause> maybeClause = getClauseForColumn(column);

    if (maybeClause.isPresent()) {
      final JoinableClause clause = maybeClause.get();
      return clause.getJoinable().getCardinality(clause.unprefix(column));
    } else {
      return baseAdapter.getDimensionCardinality(column);
    }
  }

  /**
   * Returns the minimum time of the base adapter.
   */
  @Override
  public DateTime getMinTime()
  {
    return baseAdapter.getMinTime();
  }

  /**
   * Returns the maximum time of the base adapter.
   */
  @Override
  public DateTime getMaxTime()
  {
    return baseAdapter.getMaxTime();
  }

  /**
   * Returns the base adapter's minimum value for "column", or null if the column belongs to a clause.
   */
  @Nullable
  @Override
  public Comparable getMinValue(String column)
  {
    if (isBaseColumn(column)) {
      return baseAdapter.getMinValue(column);
    } else {
      return null;
    }
  }

  /**
   * Returns the base adapter's maximum value for "column", or null if the column belongs to a clause.
   */
  @Nullable
  @Override
  public Comparable getMaxValue(String column)
  {
    if (isBaseColumn(column)) {
      return baseAdapter.getMaxValue(column);
    } else {
      return null;
    }
  }

  /**
   * Returns the capabilities of "column", asking the owning clause's joinable (with the prefix removed) when the
   * column belongs to a clause, and the base adapter otherwise.
   */
  @Nullable
  @Override
  public ColumnCapabilities getColumnCapabilities(String column)
  {
    final Optional<JoinableClause> maybeClause = getClauseForColumn(column);

    // Deliberately not Optional.map: a null result from the joinable must be returned as-is rather than falling
    // through to the base adapter.
    if (maybeClause.isPresent()) {
      final JoinableClause clause = maybeClause.get();
      return clause.getJoinable().getColumnCapabilities(clause.unprefix(column));
    } else {
      return baseAdapter.getColumnCapabilities(column);
    }
  }

  /**
   * Returns the type name of "column". For a clause column this is the string form of the type reported by the
   * joinable's capabilities, or null if the joinable reports no capabilities; for any other column the base adapter
   * is asked.
   */
  @Nullable
  @Override
  public String getColumnTypeName(String column)
  {
    final Optional<JoinableClause> maybeClause = getClauseForColumn(column);

    if (maybeClause.isPresent()) {
      final JoinableClause clause = maybeClause.get();
      final ColumnCapabilities capabilities = clause.getJoinable().getColumnCapabilities(clause.unprefix(column));
      return capabilities != null ? capabilities.getType().toString() : null;
    } else {
      return baseAdapter.getColumnTypeName(column);
    }
  }

  /**
   * Not supported for join segments.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public int getNumRows()
  {
    // Cannot determine the number of rows ahead of time for a join segment (rows may be added or removed based
    // on the join condition). At the time of this writing, this method is only used by the 'segmentMetadata' query,
    // which isn't meant to support join segments anyway.
    throw new UnsupportedOperationException("Cannot retrieve number of rows from join segment");
  }

  /**
   * Returns the maximum ingested event time of the base adapter.
   */
  @Override
  public DateTime getMaxIngestedEventTime()
  {
    return baseAdapter.getMaxIngestedEventTime();
  }

  /**
   * Not supported for join segments.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public Metadata getMetadata()
  {
    // Cannot get meaningful Metadata for this segment, since it isn't real. At the time of this writing, this method
    // is only used by the 'segmentMetadata' query, which isn't meant to support join segments anyway.
    throw new UnsupportedOperationException("Cannot retrieve metadata from join segment");
  }

  /**
   * Creates cursors over the joined data.
   *
   * The filter is split using the pre-analysis supplied at construction time: the base-table part (together with
   * the pre-join virtual columns) is handed to the base adapter, each base cursor is then wrapped once per clause,
   * and the remaining filter and post-join virtual columns are applied on top of the joined cursor.
   *
   * @throws ISE if the clauses, virtual columns and filter of this call do not match the key of the pre-analysis
   *             this adapter was constructed with
   */
  @Override
  public Sequence<Cursor> makeCursors(
      @Nullable final Filter filter,
      @Nonnull final Interval interval,
      @Nonnull final VirtualColumns virtualColumns,
      @Nonnull final Granularity gran,
      final boolean descending,
      @Nullable final QueryMetrics<?> queryMetrics
  )
  {
    // Filter pre-analysis key implied by the call to "makeCursors". We need to sanity-check that it matches
    // the actual pre-analysis that was done. Note: we can't infer a rewrite config from the "makeCursors" call (it
    // requires access to the query context) so we'll need to skip sanity-checking it, by re-using the one present
    // in the cached key.
    final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey();
    final JoinFilterPreAnalysisKey keyIn =
        new JoinFilterPreAnalysisKey(
            keyCached.getRewriteConfig(),
            clauses,
            virtualColumns,
            filter
        );

    if (!keyIn.equals(keyCached)) {
      // It is a bug if this happens. The implied key and the cached key should always match.
      throw new ISE("Pre-analysis mismatch, cannot execute query");
    }

    final List<VirtualColumn> preJoinVirtualColumns = new ArrayList<>();
    final List<VirtualColumn> postJoinVirtualColumns = new ArrayList<>();

    determineBaseColumnsWithPreAndPostJoinVirtualColumns(
        virtualColumns,
        preJoinVirtualColumns,
        postJoinVirtualColumns
    );

    final JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
    preJoinVirtualColumns.addAll(joinFilterSplit.getPushDownVirtualColumns());

    // The part of the filter that can be evaluated on the base table alone is pushed down to the base adapter
    // (null when the split produced no such part). Whatever remains is applied after the join, below.
    final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
        joinFilterSplit.getBaseTableFilter().orElse(null),
        interval,
        VirtualColumns.create(preJoinVirtualColumns),
        gran,
        descending,
        queryMetrics
    );

    final Filter postJoinFilter = joinFilterSplit.getJoinTableFilter().orElse(null);

    // Shared by all join cursors of this call and attached to the returned sequence as baggage.
    final Closer joinablesCloser = Closer.create();

    return Sequences.<Cursor, Cursor>map(
        baseCursorSequence,
        cursor -> {
          assert cursor != null;

          Cursor retVal = cursor;

          // Clauses are layered in order, so the last clause ends up outermost.
          for (JoinableClause clause : clauses) {
            retVal = HashJoinEngine.makeJoinCursor(retVal, clause, descending, joinablesCloser);
          }

          return PostJoinCursor.wrap(
              retVal,
              VirtualColumns.create(postJoinVirtualColumns),
              postJoinFilter
          );
        }
    ).withBaggage(joinablesCloser);
  }

  /**
   * Returns whether "column" will be selected from "baseAdapter". This is true if it is not shadowed by any joinables
   * (i.e. if it does not start with any of their prefixes).
   */
  public boolean isBaseColumn(final String column)
  {
    return !getClauseForColumn(column).isPresent();
  }

  /**
   * Return a String set containing the name of columns that belong to the base table (including any pre-join virtual
   * columns as well).
   *
   * Additionally, if the preJoinVirtualColumns and/or postJoinVirtualColumns arguments are provided, this method
   * will add each VirtualColumn in the provided virtualColumns to either preJoinVirtualColumns or
   * postJoinVirtualColumns based on whether the virtual column is pre-join or post-join. A virtual column is
   * pre-join when all of its required columns are base columns.
   *
   * @param virtualColumns         List of virtual columns from the query
   * @param preJoinVirtualColumns  If provided, virtual columns determined to be pre-join will be added to this list
   * @param postJoinVirtualColumns If provided, virtual columns determined to be post-join will be added to this list
   *
   * @return The set of base column names, including any pre-join virtual columns.
   */
  public Set<String> determineBaseColumnsWithPreAndPostJoinVirtualColumns(
      VirtualColumns virtualColumns,
      @Nullable List<VirtualColumn> preJoinVirtualColumns,
      @Nullable List<VirtualColumn> postJoinVirtualColumns
  )
  {
    final Set<String> baseColumns = new HashSet<>();
    baseColumns.add(ColumnHolder.TIME_COLUMN_NAME);
    Iterables.addAll(baseColumns, baseAdapter.getAvailableDimensions());
    Iterables.addAll(baseColumns, baseAdapter.getAvailableMetrics());

    for (VirtualColumn virtualColumn : virtualColumns.getVirtualColumns()) {
      // Virtual columns cannot depend on each other, so we don't need to check transitive dependencies.
      if (baseColumns.containsAll(virtualColumn.requiredColumns())) {
        // Since pre-join virtual columns can be computed using only base columns, we include them in the
        // base column set.
        baseColumns.add(virtualColumn.getOutputName());
        if (preJoinVirtualColumns != null) {
          preJoinVirtualColumns.add(virtualColumn);
        }
      } else {
        if (postJoinVirtualColumns != null) {
          postJoinVirtualColumns.add(virtualColumn);
        }
      }
    }

    return baseColumns;
  }

  /**
   * Returns the JoinableClause corresponding to a particular column, based on the clauses' prefixes.
   *
   * @param column column name
   *
   * @return the clause, or absent if the column does not correspond to any clause
   */
  private Optional<JoinableClause> getClauseForColumn(final String column)
  {
    // Check clauses in reverse, since "makeCursors" creates the cursor in such a way that the last clause
    // gets first dibs to claim a column.
    return Lists.reverse(clauses)
                .stream()
                .filter(clause -> clause.includesColumn(column))
                .findFirst();
  }
}