blob: d25623c6986dd6326d53ebec669b129e18b1c5be [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
/**
* The <code>BrokerReduceService</code> class provides service to reduce data tables gathered from multiple servers
* to {@link BrokerResponseNative}.
*/
@ThreadSafe
public class BrokerReduceService extends BaseReduceService {
public BrokerReduceService(PinotConfiguration config) {
super(config);
}
public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest,
Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs, @Nullable BrokerMetrics brokerMetrics) {
if (dataTableMap.isEmpty()) {
// Empty response.
return BrokerResponseNative.empty();
}
Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions();
boolean enableTrace =
queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
// Cache a data schema from data tables (try to cache one with data rows associated with it).
DataSchema cachedDataSchema = null;
// Process server response metadata.
Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
DataTable dataTable = entry.getValue();
// aggregate metrics
aggregator.aggregate(entry.getKey(), dataTable);
// After processing the metadata, remove data tables without data rows inside.
DataSchema dataSchema = dataTable.getDataSchema();
if (dataSchema == null) {
iterator.remove();
} else {
// Try to cache a data table with data rows inside, or cache one with data schema inside.
if (dataTable.getNumberOfRows() == 0) {
if (cachedDataSchema == null) {
cachedDataSchema = dataSchema;
}
iterator.remove();
} else {
cachedDataSchema = dataSchema;
}
}
}
String tableName = serverBrokerRequest.getQuerySource().getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
// Set execution statistics and Update broker metrics.
aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
// NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
// response with metadata only.
if (cachedDataSchema == null) {
return brokerResponseNative;
}
QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
_groupByTrimThreshold), brokerMetrics);
QueryContext queryContext;
if (brokerRequest == serverBrokerRequest) {
queryContext = serverQueryContext;
} else {
queryContext = QueryContextConverterUtils.getQueryContext(brokerRequest.getPinotQuery());
GapfillUtils.GapfillType gapfillType = GapfillUtils.getGapfillType(queryContext);
BaseGapfillProcessor gapfillProcessor =
GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
gapfillProcessor.process(brokerResponseNative);
}
if (!serverQueryContext.isExplain()) {
updateAlias(queryContext, brokerResponseNative);
}
return brokerResponseNative;
}
public void shutDown() {
_reduceExecutorService.shutdownNow();
}
}