blob: cd4c1d6fd78b4d38769f43de40b9ce5f3af5a8dd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.plan.serde;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.StageMetadata;
/**
* This utility class serializes and deserializes between {@link Worker.StagePlan} proto elements and planner elements.
*/
public class QueryPlanSerDeUtils {
  /**
   * Wire format of a virtual server address: {@code <virtualid>@<host>:<port>}. The virtual id and the port are
   * decimal digits and the host may not contain a {@code ':'}.
   */
  private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
      "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");

  private QueryPlanSerDeUtils() {
    // do not instantiate.
  }

  /**
   * Deserializes every {@link Worker.StagePlan} carried by the given request.
   *
   * @param request the query request holding the serialized stage plans.
   * @return one {@link DistributedStagePlan} per (stage plan, worker id) pair, in request order.
   * @throws IllegalArgumentException if a server address or virtual address in the request is malformed.
   */
  public static List<DistributedStagePlan> deserializeStagePlan(Worker.QueryRequest request) {
    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
    for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
      distributedStagePlans.addAll(deserializeStagePlan(stagePlan));
    }
    return distributedStagePlans;
  }

  /**
   * Serializes one stage of a dispatchable sub-plan for a specific server.
   *
   * @param dispatchableSubPlan the sub-plan that contains the stage.
   * @param stageId the stage id; also used as the index into {@code dispatchableSubPlan.getQueryStageList()}.
   * @param queryServerInstance the server whose hostname and query mailbox port are recorded in the metadata.
   * @param workerIds the worker ids recorded in the stage metadata.
   * @return the serialized stage plan.
   */
  public static Worker.StagePlan serialize(DispatchableSubPlan dispatchableSubPlan, int stageId,
      QueryServerInstance queryServerInstance, List<Integer> workerIds) {
    // Look the fragment up once; it supplies both the plan root and the stage metadata.
    DispatchablePlanFragment planFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
    return Worker.StagePlan.newBuilder()
        .setStageId(stageId)
        .setStageRoot(StageNodeSerDeUtils.serializeStageNode(
            (AbstractPlanNode) planFragment.getPlanFragment().getFragmentRoot()))
        .setStageMetadata(toProtoStageMetadata(planFragment, queryServerInstance, workerIds))
        .build();
  }

  /**
   * Parses a virtual server address of the form {@code <virtualid>@<host>:<port>}.
   *
   * @param virtualAddressStr the serialized address, as produced by {@link #addressToProto(VirtualServerAddress)}.
   * @return the parsed address.
   * @throws IllegalArgumentException if the string does not match the expected format.
   */
  public static VirtualServerAddress protoToAddress(String virtualAddressStr) {
    Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(virtualAddressStr);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Unexpected virtualAddressStr '" + virtualAddressStr + "'. This might "
          + "happen if you are upgrading from an old version of the multistage engine to the current one in a rolling "
          + "fashion.");
    }
    // Skipped netty and grpc port as they are not used in worker instance.
    return new VirtualServerAddress(matcher.group("host"),
        Integer.parseInt(matcher.group("port")), Integer.parseInt(matcher.group("virtualid")));
  }

  /**
   * Formats a virtual server address as {@code <workerId>@<hostname>:<port>}, the format that
   * {@link #protoToAddress(String)} parses.
   *
   * @param serverAddress the address to serialize.
   * @return the serialized address.
   */
  public static String addressToProto(VirtualServerAddress serverAddress) {
    return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port());
  }

  /**
   * Expands one serialized stage plan into one {@link DistributedStagePlan} per worker id listed in its metadata.
   * All returned plans share the same deserialized stage root and stage metadata instances.
   *
   * @throws IllegalArgumentException if the metadata's server address is not of the form {@code <hostname>:<port>}.
   */
  private static List<DistributedStagePlan> deserializeStagePlan(Worker.StagePlan stagePlan) {
    Worker.StageMetadata protoStageMetadata = stagePlan.getStageMetadata();
    String serverAddress = protoStageMetadata.getServerAddress();
    String[] hostPort = StringUtils.split(serverAddress, ':');
    // Fail with the offending value instead of a bare ArrayIndexOutOfBoundsException (or a silent misparse when
    // the address has extra ':' separated tokens).
    if (hostPort == null || hostPort.length != 2) {
      throw new IllegalArgumentException("Unexpected serverAddress '" + serverAddress + "' for stage "
          + stagePlan.getStageId() + ", expected '<hostname>:<port>'");
    }
    String hostname = hostPort[0];
    int port = Integer.parseInt(hostPort[1]);
    AbstractPlanNode stageRoot = StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot());
    StageMetadata stageMetadata = fromProtoStageMetadata(protoStageMetadata);
    List<Integer> workerIds = protoStageMetadata.getWorkerIdsList();
    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>(workerIds.size());
    for (int workerId : workerIds) {
      DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
      distributedStagePlan.setServer(new VirtualServerAddress(hostname, port, workerId));
      distributedStagePlan.setStageRoot(stageRoot);
      distributedStagePlan.setStageMetadata(stageMetadata);
      distributedStagePlans.add(distributedStagePlan);
    }
    return distributedStagePlans;
  }

  /**
   * Converts proto stage metadata into {@link StageMetadata}: the worker metadata list and the custom properties.
   * The server address and worker ids of the proto are not part of the result; they are consumed by the caller.
   */
  private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
    List<Worker.WorkerMetadata> protoWorkerMetadataList = protoStageMetadata.getWorkerMetadataList();
    List<WorkerMetadata> workerMetadataList = new ArrayList<>(protoWorkerMetadataList.size());
    for (Worker.WorkerMetadata protoWorkerMetadata : protoWorkerMetadataList) {
      workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata));
    }
    StageMetadata.Builder builder = new StageMetadata.Builder();
    builder.setWorkerMetadataList(workerMetadataList);
    builder.putAllCustomProperties(protoStageMetadata.getCustomPropertyMap());
    return builder.build();
  }

  /**
   * Converts proto worker metadata into {@link WorkerMetadata}: virtual address, mailbox metadata map and custom
   * properties.
   */
  private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) {
    WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
    builder.setVirtualServerAddress(protoToAddress(protoWorkerMetadata.getVirtualAddress()));
    builder.putAllMailBoxInfosMap(fromProtoMailboxMetadataMap(protoWorkerMetadata.getMailboxMetadataMap()));
    builder.putAllCustomProperties(protoWorkerMetadata.getCustomPropertyMap());
    return builder.build();
  }

  /**
   * Converts every value of the proto mailbox metadata map, keeping the keys unchanged.
   */
  private static Map<Integer, MailboxMetadata> fromProtoMailboxMetadataMap(
      Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap) {
    Map<Integer, MailboxMetadata> mailboxMap = new HashMap<>();
    for (Map.Entry<Integer, Worker.MailboxMetadata> entry : mailboxMetadataMap.entrySet()) {
      mailboxMap.put(entry.getKey(), fromProtoMailbox(entry.getValue()));
    }
    return mailboxMap;
  }

  /**
   * Converts proto mailbox metadata into {@link MailboxMetadata}. Mailbox ids and virtual addresses are read as
   * parallel lists: the i-th virtual address is paired with the i-th mailbox id.
   */
  private static MailboxMetadata fromProtoMailbox(Worker.MailboxMetadata protoMailboxMetadata) {
    int numMailboxes = protoMailboxMetadata.getMailboxIdCount();
    List<String> mailboxIds = new ArrayList<>(numMailboxes);
    List<VirtualServerAddress> virtualAddresses = new ArrayList<>(numMailboxes);
    for (int i = 0; i < numMailboxes; i++) {
      mailboxIds.add(protoMailboxMetadata.getMailboxId(i));
      virtualAddresses.add(protoToAddress(protoMailboxMetadata.getVirtualAddress(i)));
    }
    return new MailboxMetadata(mailboxIds, virtualAddresses, protoMailboxMetadata.getCustomPropertyMap());
  }

  /**
   * Builds the proto stage metadata for one plan fragment: its worker metadata and custom properties, plus the
   * target server address ({@code <hostname>:<queryMailboxPort>}) and the given worker ids.
   */
  private static Worker.StageMetadata toProtoStageMetadata(DispatchablePlanFragment planFragment,
      QueryServerInstance queryServerInstance, List<Integer> workerIds) {
    Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
    for (WorkerMetadata workerMetadata : planFragment.getWorkerMetadataList()) {
      builder.addWorkerMetadata(toProtoWorkerMetadata(workerMetadata));
    }
    builder.putAllCustomProperty(planFragment.getCustomProperties());
    // Plain concatenation rather than String.format("%d"): %d applies the default locale's digit localization,
    // which has no place in a wire format.
    builder.setServerAddress(queryServerInstance.getHostname() + ":" + queryServerInstance.getQueryMailboxPort());
    builder.addAllWorkerIds(workerIds);
    return builder.build();
  }

  /**
   * Converts {@link WorkerMetadata} into its proto form: virtual address, mailbox metadata map and custom
   * properties.
   */
  private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) {
    Worker.WorkerMetadata.Builder builder = Worker.WorkerMetadata.newBuilder();
    builder.setVirtualAddress(addressToProto(workerMetadata.getVirtualServerAddress()));
    builder.putAllMailboxMetadata(toProtoMailboxMap(workerMetadata.getMailBoxInfosMap()));
    builder.putAllCustomProperty(workerMetadata.getCustomProperties());
    return builder.build();
  }

  /**
   * Converts every value of the mailbox metadata map into its proto form, keeping the keys unchanged.
   */
  private static Map<Integer, Worker.MailboxMetadata> toProtoMailboxMap(
      Map<Integer, MailboxMetadata> mailBoxInfosMap) {
    Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap = new HashMap<>();
    for (Map.Entry<Integer, MailboxMetadata> entry : mailBoxInfosMap.entrySet()) {
      mailboxMetadataMap.put(entry.getKey(), toProtoMailbox(entry.getValue()));
    }
    return mailboxMetadataMap;
  }

  /**
   * Converts {@link MailboxMetadata} into its proto form. Mailbox ids and virtual addresses are written as parallel
   * lists, index by index.
   */
  private static Worker.MailboxMetadata toProtoMailbox(MailboxMetadata mailboxMetadata) {
    Worker.MailboxMetadata.Builder builder = Worker.MailboxMetadata.newBuilder();
    int numMailboxes = mailboxMetadata.getMailBoxIdList().size();
    for (int i = 0; i < numMailboxes; i++) {
      builder.addMailboxId(mailboxMetadata.getMailBoxId(i));
      // Use the same encoder as worker metadata so the string is guaranteed to be parseable by protoToAddress,
      // instead of relying on the format of VirtualServerAddress#toString().
      builder.addVirtualAddress(addressToProto(mailboxMetadata.getVirtualAddress(i)));
    }
    builder.putAllCustomProperty(mailboxMetadata.getCustomProperties());
    return builder.build();
  }
}