// blob: ca24f9704372e8adca5ef1aa9f285a683d5d6427 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.service;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.EqualityUtils;
import org.apache.pinot.util.TestUtils;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Verifies that a {@link QueryServer} passes the submitted stage plan and request metadata on to its
 * {@link QueryRunner}.
 *
 * <p>Every server is started on its own port with a Mockito mock as its runner. Each test submits one serialized
 * stage over gRPC and then checks that the mock received an equivalent plan, stage metadata and request id.
 */
public class QueryServerTest extends QueryTestSet {
  private static final Random RANDOM_REQUEST_ID_GEN = new Random();
  private static final int QUERY_SERVER_COUNT = 2;
  // Request metadata keys carrying the host and port of the server the request is meant for.
  private static final String KEY_OF_SERVER_INSTANCE_HOST = "pinot.query.runner.server.hostname";
  private static final String KEY_OF_SERVER_INSTANCE_PORT = "pinot.query.runner.server.port";

  // Thread pools handed out by the mocked runners; they are stopped in tearDown().
  private static final ExecutorService LEAF_WORKER_EXECUTOR_SERVICE =
      Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
          new NamedThreadFactory("QueryServerTest_LeafWorker"));
  private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE =
      Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
          new NamedThreadFactory("QueryServerTest_IntermWorker"));
  private static final ExecutorService RUNNER_EXECUTOR_SERVICE =
      Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
          new NamedThreadFactory("QueryServerTest_Runner"));

  // Both maps are keyed by the port the server listens on.
  private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
  private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();

  private QueryEnvironment _queryEnvironment;

  /**
   * Starts {@code QUERY_SERVER_COUNT} query servers, each on an available port with a mocked runner, and builds
   * the query environment from the first two ports.
   */
  @BeforeClass
  public void setUp()
      throws Exception {
    for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
      int availablePort = QueryTestUtils.getAvailablePort();
      QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
      Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE);
      Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE);
      Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
      QueryServer queryServer = new QueryServer(availablePort, queryRunner);
      queryServer.start();
      _queryServerMap.put(availablePort, queryServer);
      _queryRunnerMap.put(availablePort, queryRunner);
    }

    List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());

    // reducer port doesn't matter, we are testing the worker instance not GRPC.
    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, portList.get(0), portList.get(1),
        QueryEnvironmentTestBase.TABLE_SCHEMAS, QueryEnvironmentTestBase.SERVER1_SEGMENTS,
        QueryEnvironmentTestBase.SERVER2_SEGMENTS);
  }

  /**
   * Shuts down every started query server and then the thread pools created by this class.
   */
  @AfterClass
  public void tearDown() {
    try {
      for (QueryServer worker : _queryServerMap.values()) {
        worker.shutdown();
      }
    } finally {
      // The pools are created by this class, so stop them even if a server fails to shut down.
      RUNNER_EXECUTOR_SERVICE.shutdownNow();
      INTERM_WORKER_EXECUTOR_SERVICE.shutdownNow();
      LEAF_WORKER_EXECUTOR_SERVICE.shutdownNow();
    }
  }

  /**
   * Plans {@code sql}, submits one worker request per non-reduce stage, and verifies that the mocked runner on the
   * target port received an equivalent stage plan together with the same broker request id.
   *
   * @param sql query supplied by the {@code testSql} data provider
   */
  @SuppressWarnings("unchecked")
  @Test(dataProvider = "testSql")
  public void testWorkerAcceptsWorkerRequestCorrect(String sql)
      throws Exception {
    DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
    // Start at 1: we do not test reduce stage (stage 0).
    for (int stageId = 1; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
      // only get one worker request out.
      Worker.QueryRequest queryRequest = getQueryRequest(dispatchableSubPlan, stageId);

      // submit the request for testing.
      submitRequest(queryRequest);

      DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
      PlanNode expectedStageRoot = dispatchablePlanFragment.getPlanFragment().getFragmentRoot();
      StageMetadata stageMetadata = new StageMetadata.Builder()
          .setWorkerMetadataList(dispatchablePlanFragment.getWorkerMetadataList())
          .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();

      // ensure mock query runner received correctly deserialized payload.
      QueryRunner mockRunner =
          _queryRunnerMap.get(Integer.parseInt(queryRequest.getMetadataOrThrow(KEY_OF_SERVER_INSTANCE_PORT)));
      String requestIdStr = queryRequest.getMetadataOrThrow(QueryConfig.KEY_OF_BROKER_REQUEST_ID);

      // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
      TestUtils.waitForCondition(aVoid -> {
        try {
          Mockito.verify(mockRunner).processQuery(Mockito.argThat(
              distributedStagePlan -> isStageNodesEqual(expectedStageRoot, distributedStagePlan.getStageRoot())
                  && isStageMetadataEqual(stageMetadata, distributedStagePlan.getStageMetadata())),
              Mockito.argThat(requestMetadataMap -> requestIdStr.equals(
                  requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID))));
          return true;
        } catch (Throwable t) {
          // Any failure here is treated as "payload not received yet"; the condition simply reports false.
          return false;
        }
      }, 10000L, "Error verifying mock QueryRunner intercepted query payload!");

      // Forget the recorded calls before the next stage, but keep the stubbing installed in setUp().
      Mockito.clearInvocations(mockRunner);
    }
  }

  /**
   * Compares the table name, time boundary and per-worker metadata of two stage metadata objects.
   *
   * @return {@code true} when all compared parts are equal
   */
  private static boolean isStageMetadataEqual(StageMetadata expected, StageMetadata actual) {
    if (!EqualityUtils.isEqual(StageMetadata.getTableName(expected), StageMetadata.getTableName(actual))) {
      return false;
    }

    TimeBoundaryInfo expectedTimeBoundaryInfo = StageMetadata.getTimeBoundary(expected);
    TimeBoundaryInfo actualTimeBoundaryInfo = StageMetadata.getTimeBoundary(actual);
    // Exactly one side having a time boundary is a mismatch.
    if ((expectedTimeBoundaryInfo == null) != (actualTimeBoundaryInfo == null)) {
      return false;
    }
    // From here on either both are null or both are non-null.
    if (expectedTimeBoundaryInfo != null && (
        !EqualityUtils.isEqual(expectedTimeBoundaryInfo.getTimeColumn(), actualTimeBoundaryInfo.getTimeColumn())
            || !EqualityUtils.isEqual(expectedTimeBoundaryInfo.getTimeValue(),
            actualTimeBoundaryInfo.getTimeValue()))) {
      return false;
    }

    List<WorkerMetadata> expectedWorkerMetadataList = expected.getWorkerMetadataList();
    List<WorkerMetadata> actualWorkerMetadataList = actual.getWorkerMetadataList();
    if (expectedWorkerMetadataList.size() != actualWorkerMetadataList.size()) {
      return false;
    }
    for (int i = 0; i < expectedWorkerMetadataList.size(); i++) {
      if (!isWorkerMetadataEqual(expectedWorkerMetadataList.get(i), actualWorkerMetadataList.get(i))) {
        return false;
      }
    }
    return true;
  }

  /**
   * Compares two worker metadata objects by virtual server address (hostname, port, worker id) and by their
   * table-segments map.
   */
  private static boolean isWorkerMetadataEqual(WorkerMetadata expected, WorkerMetadata actual) {
    if (!expected.getVirtualServerAddress().hostname().equals(actual.getVirtualServerAddress().hostname())
        || expected.getVirtualServerAddress().port() != actual.getVirtualServerAddress().port()
        || expected.getVirtualServerAddress().workerId() != actual.getVirtualServerAddress().workerId()) {
      return false;
    }
    return EqualityUtils.isEqual(WorkerMetadata.getTableSegmentsMap(expected),
        WorkerMetadata.getTableSegmentsMap(actual));
  }

  /**
   * Recursively compares two plan trees by plan fragment id, node class and number of inputs.
   *
   * <p>Inputs are matched after ordering them by plan fragment id. The ordering is done on copies so that neither
   * tree is modified by the comparison.
   */
  private static boolean isStageNodesEqual(PlanNode left, PlanNode right) {
    // This only checks the stage tree structure is correct. because the input/stageId fields are not
    // part of the generic proto ser/de; which is tested in query planner.
    if (left.getPlanFragmentId() != right.getPlanFragmentId() || left.getClass() != right.getClass()
        || left.getInputs().size() != right.getInputs().size()) {
      return false;
    }

    Comparator<PlanNode> byPlanFragmentId = Comparator.comparingInt(PlanNode::getPlanFragmentId);
    List<PlanNode> leftInputs = Lists.newArrayList(left.getInputs());
    List<PlanNode> rightInputs = Lists.newArrayList(right.getInputs());
    leftInputs.sort(byPlanFragmentId);
    rightInputs.sort(byPlanFragmentId);

    for (int i = 0; i < leftInputs.size(); i++) {
      if (!isStageNodesEqual(leftInputs.get(i), rightInputs.get(i))) {
        return false;
      }
    }
    return true;
  }

  /**
   * Sends {@code queryRequest} to the host and port named in its metadata through a blocking gRPC stub and asserts
   * that the response metadata contains the OK status key.
   */
  private void submitRequest(Worker.QueryRequest queryRequest) {
    String host = queryRequest.getMetadataMap().get(KEY_OF_SERVER_INSTANCE_HOST);
    int port = Integer.parseInt(queryRequest.getMetadataMap().get(KEY_OF_SERVER_INSTANCE_PORT));
    ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    try {
      PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = PinotQueryWorkerGrpc.newBlockingStub(channel);
      Worker.QueryResponse resp = stub.submit(queryRequest);
      // TODO: validate meaningful return value
      Assert.assertNotNull(resp.getMetadataMap().get(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK));
    } finally {
      // Release the channel even when the call or the assertion fails.
      channel.shutdown();
    }
  }

  /**
   * Builds a worker request for {@code stageId} addressed to the first server instance of that stage and its first
   * worker id, attaching the request id, timeout and the target host/port as metadata.
   */
  private Worker.QueryRequest getQueryRequest(DispatchableSubPlan dispatchableSubPlan, int stageId) {
    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
        dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap();
    // this particular test set requires the request to have a single QueryServerInstance to dispatch to
    // as it is not testing the multi-tenancy dispatch (which is in the QueryDispatcherTest)
    Map.Entry<QueryServerInstance, List<Integer>> firstEntry =
        serverInstanceToWorkerIdMap.entrySet().iterator().next();
    QueryServerInstance serverInstance = firstEntry.getKey();
    int workerId = firstEntry.getValue().get(0);

    return Worker.QueryRequest.newBuilder().addStagePlan(
            QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, serverInstance, ImmutableList.of(workerId)))
        // the default configurations that must exist.
        .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
        .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
            String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS))
        // extra configurations we want to test also parsed out correctly.
        .putMetadata(KEY_OF_SERVER_INSTANCE_HOST, serverInstance.getHostname())
        .putMetadata(KEY_OF_SERVER_INSTANCE_PORT, String.valueOf(serverInstance.getQueryServicePort())).build();
  }
}