// blob: df7e7f3ba640b7eded0d06b3949fdd7a4b7f61ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.utils.JvmUtils;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* A wrapper class over {@link JoinableFactory} for working with {@link Joinable} related classes.
*/
public class JoinableFactoryWrapper
{
  // Leading byte handed to the CacheKeyBuilder for join data source cache keys.
  private static final byte JOIN_OPERATION = 0x1;
  private static final Logger log = new Logger(JoinableFactoryWrapper.class);

  private final JoinableFactory joinableFactory;

  /**
   * @param joinableFactory factory passed to {@link JoinableClauses#createClauses} and asked for per-clause cache
   *                        keys; must not be null
   */
  public JoinableFactoryWrapper(final JoinableFactory joinableFactory)
  {
    Preconditions.checkNotNull(joinableFactory, "joinableFactory");
    this.joinableFactory = joinableFactory;
  }

  /**
   * Builds the function that turns a base segment into a {@link HashJoinSegment}. When there are no join clauses
   * no wrapping is needed and {@link Function#identity()} is returned instead.
   *
   * @param clauses            Pre-joinable clauses
   * @param cpuTimeAccumulator An accumulator that we will add CPU nanos to; this is part of the function to encourage
   *                           callers to remember to track metrics on CPU time required for creation of Joinables
   * @param query              The query that will be run on the mapped segments. Usually this should be
   *                           {@code analysis.getBaseQuery().orElse(query)}, where "analysis" is a
   *                           {@link DataSourceAnalysis} and "query" is the original query from the end user.
   *
   * @return the segment mapping function
   */
  public Function<SegmentReference, SegmentReference> createSegmentMapFn(
      final List<PreJoinableClause> clauses,
      final AtomicLong cpuTimeAccumulator,
      final Query<?> query
  )
  {
    // Everything, including the emptiness check, runs inside the supplier handed to JvmUtils together
    // with the accumulator.
    return JvmUtils.safeAccumulateThreadCpuTime(
        cpuTimeAccumulator,
        () -> buildSegmentMapFn(clauses, query)
    );
  }

  /**
   * Does the actual work of {@link #createSegmentMapFn}: creates the joinable clauses, runs the join filter
   * pre-analysis once up front, and returns a function that reuses both for every base segment it is given.
   */
  private Function<SegmentReference, SegmentReference> buildSegmentMapFn(
      final List<PreJoinableClause> clauses,
      final Query<?> query
  )
  {
    if (clauses.isEmpty()) {
      return Function.identity();
    }

    final JoinableClauses resolvedClauses = JoinableClauses.createClauses(clauses, joinableFactory);

    // The pre-analysis key is built from the query's rewrite config, virtual columns and filter.
    final JoinFilterPreAnalysisKey preAnalysisKey = new JoinFilterPreAnalysisKey(
        JoinFilterRewriteConfig.forQuery(query),
        resolvedClauses.getJoinableClauses(),
        query.getVirtualColumns(),
        Filters.toFilter(query.getFilter())
    );
    final JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(preAnalysisKey);

    return baseSegment -> new HashJoinSegment(baseSegment, resolvedClauses.getJoinableClauses(), preAnalysis);
  }

  /**
   * Compute a cache key prefix for data sources that participate in the RHS of a join. This key prefix
   * can be used in segment level cache or result level cache. The returned Optional is one of:
   * <ul>
   * <li>Non-empty byte array - every clause is cacheable. For each {@link PreJoinableClause}, in order, the key
   * contains the cache key returned by the joinable factory, the original join condition expression, the clause
   * prefix and the join type name.</li>
   * <li>Empty - there is a join but caching is not possible, because the joinable factory returned no cache key
   * for at least one of the clauses.</li>
   * </ul>
   *
   * @param dataSourceAnalysis for the join datasource
   *
   * @return the optional cache key to be used as part of query cache key
   *
   * @throws IAE if this operation is called on a non-join data source
   */
  public Optional<byte[]> computeJoinDataSourceCacheKey(
      final DataSourceAnalysis dataSourceAnalysis
  )
  {
    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
    if (clauses.isEmpty()) {
      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
    }

    final CacheKeyBuilder keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
    for (final PreJoinableClause clause : clauses) {
      final Optional<byte[]> clauseKey =
          joinableFactory.computeJoinCacheKey(clause.getDataSource(), clause.getCondition());

      if (clauseKey.isPresent()) {
        keyBuilder.appendByteArray(clauseKey.get())
                  .appendString(clause.getCondition().getOriginalExpression())
                  .appendString(clause.getPrefix())
                  .appendString(clause.getJoinType().name());
      } else {
        // One non-cacheable clause makes the whole join non-cacheable, so stop here.
        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
        return Optional.empty();
      }
    }
    return Optional.of(keyBuilder.build());
  }
}