blob: c1f6ef8d2366137eb9aa3c6f68d8c432599ab7df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The <code>StreamingReduceService</code> class provides service to reduce gRPC responses gathered from multiple
 * servers to {@link BrokerResponseNative}.
 *
 * <p>Each server's response stream is drained as a separate task on the reduce executor. Blocks carrying a data
 * schema are handed to a {@link StreamingReducer}; metadata-only blocks are handed to an
 * {@link ExecutionStatsAggregator}.
 */
@ThreadSafe
public class StreamingReduceService extends BaseReduceService {
  private static final Logger LOGGER = LoggerFactory.getLogger(StreamingReduceService.class);

  /**
   * Creates the service, delegating all configuration handling to the base reduce service.
   *
   * @param config configuration passed through to {@link BaseReduceService}
   */
  public StreamingReduceService(PinotConfiguration config) {
    super(config);
  }

  /**
   * Reduces the streamed responses of all servers into a single broker response.
   *
   * @param brokerRequest broker request whose {@link PinotQuery} describes the query being reduced
   * @param serverResponseMap per-server iterators over the streamed response blocks
   * @param reduceTimeOutMs maximum time, in milliseconds, to wait for all server streams to be consumed
   * @param brokerMetrics broker metrics handed to the stats aggregator, may be {@code null}
   * @return the sealed broker response, or {@link BrokerResponseNative#empty()} when there are no server responses
   * @throws IOException if consuming the server streams fails, is interrupted or does not finish in time; the
   *     original failure is attached as the cause
   */
  public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest,
      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
      @Nullable BrokerMetrics brokerMetrics)
      throws IOException {
    if (serverResponseMap.isEmpty()) {
      // Empty response.
      return BrokerResponseNative.empty();
    }

    // Prepare contextual info for reduce.
    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
    Map<String, String> queryOptions = pinotQuery.getQueryOptions();
    boolean enableTrace =
        queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));

    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
    String tableName = queryContext.getTableName();
    String rawTableName = TableNameBuilder.extractRawTableName(tableName);

    // Collects the metadata-only blocks while the reducer collects the data blocks.
    ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);

    // Process server response.
    DataTableReducerContext dataTableReducerContext =
        new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
            _groupByTrimThreshold);
    StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext);
    streamingReducer.init(dataTableReducerContext);

    try {
      processIterativeServerResponse(streamingReducer, _reduceExecutorService, serverResponseMap, reduceTimeOutMs,
          aggregator);
    } catch (Exception e) {
      LOGGER.error("Unable to process streaming query response!", e);
      throw new IOException("Unable to process streaming query response!", e);
    }

    // Seal the streaming response.
    BrokerResponseNative brokerResponseNative = streamingReducer.seal();

    // Set execution statistics and update broker metrics.
    aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);

    updateAlias(queryContext, brokerResponseNative);
    return brokerResponseNative;
  }

  /**
   * Consumes every server's response stream concurrently and blocks until all of them are fully consumed.
   *
   * <p>One task per server is scheduled on {@code executorService}. If the wait fails for any reason (a task threw,
   * the timeout elapsed, or the calling thread was interrupted), every per-server future is cancelled before the
   * exception is propagated. Cancelling only the {@link CompletableFuture#allOf} aggregate would not reach the
   * per-server futures, so tasks still queued on the executor would start and consume streams for a query that has
   * already failed. Note that {@link CompletableFuture#cancel(boolean)} does not interrupt a task that is already
   * running; such a task keeps going until its iterator is exhausted or throws.
   *
   * @param reducer receives every block that carries a data schema
   * @param executorService executor the per-server tasks run on
   * @param serverResponseMap per-server iterators over the streamed response blocks
   * @param reduceTimeOutMs maximum time, in milliseconds, to wait for all tasks to complete
   * @param aggregator receives every metadata-only block (no data schema)
   * @throws Exception {@link java.util.concurrent.ExecutionException} if any task failed,
   *     {@link java.util.concurrent.TimeoutException} if the wait timed out, or {@link InterruptedException} if the
   *     calling thread was interrupted (its interrupt status is restored before rethrowing)
   */
  @VisibleForTesting
  static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService,
      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
      ExecutionStatsAggregator aggregator)
      throws Exception {
    // A wildcard-typed array avoids unchecked generic array creation; allOf() accepts CompletableFuture<?>...
    CompletableFuture<?>[] futures = new CompletableFuture<?>[serverResponseMap.size()];
    int cnt = 0;
    for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry : serverResponseMap.entrySet()) {
      futures[cnt++] = CompletableFuture.runAsync(
          () -> consumeServerResponses(entry.getKey(), entry.getValue(), reducer, aggregator), executorService);
    }

    CompletableFuture<Void> syncWaitPoint = CompletableFuture.allOf(futures);
    try {
      syncWaitPoint.get(reduceTimeOutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
      throw e;
    } finally {
      // No-op for futures that already completed; prevents not-yet-started tasks from running after a failure.
      for (CompletableFuture<?> future : futures) {
        future.cancel(true);
      }
    }
  }

  /**
   * Drains one server's response stream, routing each block to the reducer or to the stats aggregator.
   *
   * @throws RuntimeException wrapping any failure raised while reading, decoding or reducing a block
   */
  private static void consumeServerResponses(ServerRoutingInstance serverRoutingInstance,
      Iterator<Server.ServerResponse> streamingResponses, StreamingReducer reducer,
      ExecutionStatsAggregator aggregator) {
    try {
      while (streamingResponses.hasNext()) {
        Server.ServerResponse streamingResponse = streamingResponses.next();
        DataTable dataTable = DataTableFactory.getDataTable(streamingResponse.getPayload().asReadOnlyByteBuffer());
        // null dataSchema is a metadata-only block.
        if (dataTable.getDataSchema() != null) {
          reducer.reduce(serverRoutingInstance, dataTable);
        } else {
          aggregator.aggregate(serverRoutingInstance, dataTable);
        }
      }
    } catch (Exception e) {
      LOGGER.error("Unable to process streaming response. Failure occurred!", e);
      throw new RuntimeException("Unable to process streaming response. Failure occurred!", e);
    }
  }

  /**
   * Shuts down the reduce executor immediately via {@link ExecutorService#shutdownNow()}.
   */
  public void shutDown() {
    _reduceExecutorService.shutdownNow();
  }
}