blob: 0d1aad7934557543c683a65c73aeae9bb95a90e9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorService;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class to reduce and set Selection results into the BrokerResponseNative
*/
public class SelectionDataTableReducer implements DataTableReducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SelectionDataTableReducer.class);
private final QueryContext _queryContext;
SelectionDataTableReducer(QueryContext queryContext) {
_queryContext = queryContext;
}
/**
* Reduces data tables and sets selection results into ResultTable.
*/
@Override
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
if (dataTableMap.isEmpty()) {
// For empty data table map, construct empty result using the cached data schema for selection query
List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns);
brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList()));
} else {
// For data table map with more than one data tables, remove conflicting data tables
if (dataTableMap.size() > 1) {
List<ServerRoutingInstance> droppedServers = removeConflictingResponses(dataSchema, dataTableMap);
if (!droppedServers.isEmpty()) {
String errorMessage = QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
+ " from servers: " + droppedServers + " got dropped due to data schema inconsistency.";
LOGGER.warn(errorMessage);
if (brokerMetrics != null) {
brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
}
brokerResponseNative.addToExceptions(
new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
}
}
int limit = _queryContext.getLimit();
if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
// Selection order-by
SelectionOperatorService selectionService = new SelectionOperatorService(_queryContext, dataSchema);
selectionService.reduceWithOrdering(dataTableMap.values(), _queryContext.isNullHandlingEnabled());
brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
} else {
// Selection only
List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
List<Object[]> reducedRows = SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
_queryContext.isNullHandlingEnabled());
brokerResponseNative.setResultTable(
SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, dataSchema, selectionColumns));
}
}
}
/**
* Given a data schema, remove data tables that are not compatible with this data schema.
* <p>Upgrade the data schema passed in to cover all remaining data schemas.
*
* @param dataSchema data schema.
* @param dataTableMap map from server to data table.
* @return list of server names where the data table got removed.
*/
private List<ServerRoutingInstance> removeConflictingResponses(DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap) {
List<ServerRoutingInstance> droppedServers = new ArrayList<>();
Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
assert dataSchemaToCompare != null;
if (!dataSchema.isTypeCompatibleWith(dataSchemaToCompare)) {
droppedServers.add(entry.getKey());
iterator.remove();
} else {
dataSchema.upgradeToCover(dataSchemaToCompare);
}
}
return droppedServers;
}
}