/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.core.query.reduce;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorService;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Helper class to reduce and set Selection results into the BrokerResponseNative
 */
public class SelectionDataTableReducer implements DataTableReducer {
  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionDataTableReducer.class);

  private final QueryContext _queryContext;

  SelectionDataTableReducer(QueryContext queryContext) {
    _queryContext = queryContext;
  }

  /**
   * Reduces data tables and sets selection results into ResultTable.
   */
  @Override
  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
    if (dataTableMap.isEmpty()) {
      // For empty data table map, construct empty result using the cached data schema for selection query
      List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
      DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns);
      brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList()));
    } else {
      // For data table map with more than one data tables, remove conflicting data tables
      if (dataTableMap.size() > 1) {
        List<ServerRoutingInstance> droppedServers = removeConflictingResponses(dataSchema, dataTableMap);
        if (!droppedServers.isEmpty()) {
          String errorMessage = QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
              + " from servers: " + droppedServers + " got dropped due to data schema inconsistency.";
          LOGGER.warn(errorMessage);
          if (brokerMetrics != null) {
            brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
                BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
          }
          brokerResponseNative.addToExceptions(
              new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
        }
      }

      int limit = _queryContext.getLimit();
      if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
        // Selection order-by
        SelectionOperatorService selectionService = new SelectionOperatorService(_queryContext, dataSchema);
        selectionService.reduceWithOrdering(dataTableMap.values(), _queryContext.isNullHandlingEnabled());
        brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
      } else {
        // Selection only
        List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
        List<Object[]> reducedRows = SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
            _queryContext.isNullHandlingEnabled());
        brokerResponseNative.setResultTable(
            SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, dataSchema, selectionColumns));
      }
    }
  }

  /**
   * Given a data schema, remove data tables that are not compatible with this data schema.
   * <p>Upgrade the data schema passed in to cover all remaining data schemas.
   *
   * @param dataSchema data schema.
   * @param dataTableMap map from server to data table.
   * @return list of server names where the data table got removed.
   */
  private List<ServerRoutingInstance> removeConflictingResponses(DataSchema dataSchema,
      Map<ServerRoutingInstance, DataTable> dataTableMap) {
    List<ServerRoutingInstance> droppedServers = new ArrayList<>();
    Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
      DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
      assert dataSchemaToCompare != null;
      if (!dataSchema.isTypeCompatibleWith(dataSchemaToCompare)) {
        droppedServers.add(entry.getKey());
        iterator.remove();
      } else {
        dataSchema.upgradeToCover(dataSchemaToCompare);
      }
    }
    return droppedServers;
  }
}
