/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.service;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link QueryServer} is the gRPC server that accepts query plan submission and cancellation requests sent from
 * {@link QueryDispatcher}, and hands the submitted stage plans over to the local {@link QueryRunner} for execution.
 */
public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
  private static final Logger LOGGER = LoggerFactory.getLogger(QueryServer.class);
  // TODO: Inbound messages can get quite large because we send the entire stage metadata map in each call.
  //   See https://github.com/apache/pinot/issues/10331
  private static final int MAX_INBOUND_MESSAGE_SIZE = 64 * 1024 * 1024;

  private final int _port;
  private final QueryRunner _queryRunner;
  // The query submission executor service is currently only used for plan submission.
  // TODO: with more complex query submission logic, allow query submission to return asynchronously instead of
  //   returning directly from the submission response observer.
  private final ExecutorService _querySubmissionExecutorService;

  private Server _server = null;
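
  /**
   * Creates a {@code QueryServer} that listens on the given port and delegates execution of the submitted stage
   * plans to the given {@link QueryRunner}. Plan submissions are processed on a dedicated cached thread pool.
   */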
  public QueryServer(int port, QueryRunner queryRunner) {
    _port = port;
    _queryRunner = queryRunner;
    _querySubmissionExecutorService = Executors.newCachedThreadPool(
        new NamedThreadFactory("query_submission_executor_on_" + _port + "_port"));
  }
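
  /**
   * Builds the gRPC server on the configured port (if it has not been built yet), starts the underlying
   * {@link QueryRunner}, and then starts accepting requests.
   */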
  public void start() {
    LOGGER.info("Starting QueryServer");
    try {
      if (_server == null) {
        _server = ServerBuilder.forPort(_port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
        LOGGER.info("Initialized QueryServer on port: {}", _port);
      }
      _queryRunner.start();
      _server.start();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
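
  /**
   * Shuts down the {@link QueryRunner} and the gRPC server, blocking until the server has terminated.
   */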
  public void shutdown() {
    LOGGER.info("Shutting down QueryServer");
    try {
      _queryRunner.shutDown();
      if (_server != null) {
        _server.shutdown();
        _server.awaitTermination();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
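
  /**
   * Handles a query submission from the dispatcher: deserializes the distributed stage plans from the request,
   * submits each plan to the {@link QueryRunner} asynchronously, and immediately responds with an OK status.
   * Deserialization failures are reported back as {@code INVALID_ARGUMENT} errors.
   */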
  @Override
  public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
    // Deserialize the request
    List<DistributedStagePlan> distributedStagePlans;
    Map<String, String> requestMetadataMap = request.getMetadataMap();
    long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
    try {
      distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request);
    } catch (Exception e) {
      LOGGER.error("Caught exception while deserializing the request: {}, payload: {}", requestId, request, e);
      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
      return;
    }
    // TODO: allow thrown exceptions to be returned to the broker asynchronously.
    distributedStagePlans.forEach(distributedStagePlan -> _querySubmissionExecutorService.submit(() -> {
      try {
        _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
      } catch (Throwable t) {
        LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId,
            distributedStagePlan.getStageId(), t);
      }
    }));
    responseObserver.onNext(Worker.QueryResponse.newBuilder()
        .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build());
    responseObserver.onCompleted();
  }
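
  /**
   * Handles a cancel request from the dispatcher by asking the {@link QueryRunner} to cancel the given request id.
   * The response is always completed, even if the cancel attempt fails.
   */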
  @Override
  public void cancel(Worker.CancelRequest request, StreamObserver<Worker.CancelResponse> responseObserver) {
    try {
      _queryRunner.cancel(request.getRequestId());
    } catch (Throwable t) {
      LOGGER.error("Caught exception while cancelling opChain for request: {}", request.getRequestId(), t);
    }
    // We always return completed even if the cancel attempt fails; the server will clean itself up in this case.
    responseObserver.onCompleted();
  }
}