blob: d212f4ca7252675ea66d95928b994f219e1d1f40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.planner.plan;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.commons.partition.ExecutorType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstance.class);
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private final FragmentInstanceId id;
private final QueryType type;
// The reference of PlanFragment which this instance is generated from
private final PlanFragment fragment;
// Where the FragmentInstance should run
private ExecutorType executorType;
private TDataNodeLocation hostDataNode;
private final TimePredicate globalTimePredicate;
private final long timeOut;
private boolean isRoot;
private final SessionInfo sessionInfo;
// The num of all FI on the dispatched DataNode in this query
private int dataNodeFINum;
private boolean isHighestPriority;
// indicate which index we are retrying
private transient int nextRetryIndex = 0;
// If this query is an EXPLAIN ANALYZE query
// We need to cache and calculate the statistics of this FragmentInstance if it is.
private boolean isExplainAnalyze = false;
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
public FragmentInstance(
PlanFragment fragment,
FragmentInstanceId id,
TimePredicate globalTimePredicate,
QueryType type,
long timeOut,
SessionInfo sessionInfo) {
this.fragment = fragment;
this.globalTimePredicate = globalTimePredicate;
this.id = id;
this.type = type;
this.timeOut = timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold();
this.isRoot = false;
this.sessionInfo = sessionInfo;
}
public FragmentInstance(
PlanFragment fragment,
FragmentInstanceId id,
TimePredicate globalTimePredicate,
QueryType type,
long timeOut,
SessionInfo sessionInfo,
boolean isExplainAnalyze,
boolean isRoot) {
this(fragment, id, globalTimePredicate, type, timeOut, sessionInfo);
this.isRoot = isRoot;
this.isExplainAnalyze = isExplainAnalyze;
}
public FragmentInstance(
PlanFragment fragment,
FragmentInstanceId id,
TimePredicate globalTimePredicate,
QueryType type,
long timeOut,
SessionInfo sessionInfo,
int dataNodeFINum) {
this(fragment, id, globalTimePredicate, type, timeOut, sessionInfo);
this.dataNodeFINum = dataNodeFINum;
}
public void setExecutorAndHost(ExecutorType executorType) {
if (executorType == null) {
return;
}
this.executorType = executorType;
this.hostDataNode = executorType.getDataNodeLocation();
}
// Although the HostDataNode is set in method setDataRegionAndHost(),
// we still keep another method for customized needs
public void setHostDataNode(TDataNodeLocation hostDataNode) {
this.hostDataNode = hostDataNode;
}
public ExecutorType getExecutorType() {
return executorType;
}
@TestOnly
public void setExecutorType(ExecutorType executorType) {
this.executorType = executorType;
}
public TRegionReplicaSet getRegionReplicaSet() {
return executorType.getRegionReplicaSet();
}
public PlanFragment getFragment() {
return fragment;
}
public FragmentInstanceId getId() {
return id;
}
public boolean isRoot() {
return isRoot;
}
public boolean isHighestPriority() {
return isHighestPriority;
}
public void setHighestPriority(boolean highestPriority) {
isHighestPriority = highestPriority;
}
public TimePredicate getGlobalTimePredicate() {
return globalTimePredicate;
}
public QueryType getType() {
return type;
}
public int getDataNodeFINum() {
return dataNodeFINum;
}
public void setDataNodeFINum(int dataNodeFINum) {
this.dataNodeFINum = dataNodeFINum;
}
public String toString() {
StringBuilder ret = new StringBuilder();
ret.append(String.format("FragmentInstance-%s:", getId()));
ret.append(
String.format(
"Host: %s ",
getHostDataNode() == null
? "Not set"
: getHostDataNode().dataNodeId + " - " + getHostDataNode().internalEndPoint));
ret.append(
String.format(
"Region: %s ",
getRegionReplicaSet() == null ? "Not set" : getRegionReplicaSet().getRegionId()));
ret.append("\n---- Plan Node Tree ----\n");
ret.append(PlanNodeUtil.nodeToString(getFragment().getPlanNodeTree()));
ret.append(String.format("timeOut-%s:", getTimeOut()));
return ret.toString();
}
public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
PlanFragment planFragment = PlanFragment.deserialize(buffer);
long timeOut = ReadWriteIOUtils.readLong(buffer);
boolean hasSessionInfo = ReadWriteIOUtils.readBool(buffer);
SessionInfo sessionInfo = hasSessionInfo ? SessionInfo.deserializeFrom(buffer) : null;
boolean hasTimePredicate = ReadWriteIOUtils.readBool(buffer);
TimePredicate globalTimePredicate = hasTimePredicate ? TimePredicate.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
int dataNodeFINum = ReadWriteIOUtils.readInt(buffer);
FragmentInstance fragmentInstance =
new FragmentInstance(
planFragment, id, globalTimePredicate, queryType, timeOut, sessionInfo, dataNodeFINum);
boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
fragmentInstance.hostDataNode =
hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer);
return fragmentInstance;
}
public ByteBuffer serializeToByteBuffer() {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
id.serialize(outputStream);
fragment.serialize(outputStream);
ReadWriteIOUtils.write(timeOut, outputStream);
ReadWriteIOUtils.write(sessionInfo != null, outputStream);
if (sessionInfo != null) {
sessionInfo.serialize(outputStream);
}
ReadWriteIOUtils.write(globalTimePredicate != null, outputStream);
if (globalTimePredicate != null) {
globalTimePredicate.serialize(outputStream);
}
ReadWriteIOUtils.write(type.ordinal(), outputStream);
ReadWriteIOUtils.write(dataNodeFINum, outputStream);
ReadWriteIOUtils.write(hostDataNode != null, outputStream);
if (hostDataNode != null) {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream);
}
ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
} catch (IOException e) {
LOGGER.error("Unexpected error occurs when serializing this FragmentInstance.", e);
throw new SerializationRunTimeException(e);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FragmentInstance instance = (FragmentInstance) o;
return Objects.equals(id, instance.id)
&& type == instance.type
&& Objects.equals(fragment, instance.fragment)
&& Objects.equals(executorType, instance.executorType)
&& Objects.equals(hostDataNode, instance.hostDataNode)
&& Objects.equals(globalTimePredicate, instance.globalTimePredicate);
}
@Override
public int hashCode() {
return Objects.hash(id, type, fragment, executorType, hostDataNode, globalTimePredicate);
}
public TDataNodeLocation getHostDataNode() {
return hostDataNode;
}
public TDataNodeLocation getNextRetriedHostDataNode() {
nextRetryIndex =
(nextRetryIndex + 1) % executorType.getRegionReplicaSet().getDataNodeLocations().size();
this.hostDataNode =
executorType.getRegionReplicaSet().getDataNodeLocations().get(nextRetryIndex);
return hostDataNode;
}
public long getTimeOut() {
return timeOut;
}
public SessionInfo getSessionInfo() {
return sessionInfo;
}
public boolean isExplainAnalyze() {
return isExplainAnalyze;
}
}