blob: a8cfa04c4104347aedc575f3a328d98d21ce5042 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class ExchangeNode extends SingleChildProcessNode {
// In current version, one ExchangeNode will only have one source.
// And the fragment which the sourceNode belongs to will only have one instance.
// Thus, by nodeId and endpoint, the ExchangeNode can know where its source from.
private TEndPoint upstreamEndpoint;
private FragmentInstanceId upstreamInstanceId;
private PlanNodeId upstreamPlanNodeId;
private List<String> outputColumnNames = new ArrayList<>();
/** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode it matches */
private int indexOfUpstreamSinkHandle = 0;
public ExchangeNode(PlanNodeId id) {
super(id);
}
@Override
public PlanNodeType getType() {
return PlanNodeType.EXCHANGE;
}
@Override
public int allowedChildCount() {
return CHILD_COUNT_NO_LIMIT;
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitExchange(this, context);
}
@Override
public PlanNode clone() {
ExchangeNode node = new ExchangeNode(getPlanNodeId());
node.setOutputColumnNames(outputColumnNames);
node.setIndexOfUpstreamSinkHandle(indexOfUpstreamSinkHandle);
return node;
}
@Override
public List<String> getOutputColumnNames() {
return outputColumnNames;
}
public void setOutputColumnNames(List<String> outputColumnNames) {
this.outputColumnNames = outputColumnNames;
}
public void setUpstream(TEndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
this.upstreamEndpoint = endPoint;
this.upstreamInstanceId = instanceId;
this.upstreamPlanNodeId = nodeId;
}
public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
TEndPoint endPoint =
new TEndPoint(
ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
FragmentInstanceId fragmentInstanceId = FragmentInstanceId.deserialize(byteBuffer);
PlanNodeId upstreamPlanNodeId = PlanNodeId.deserialize(byteBuffer);
int outputColumnNamesSize = ReadWriteIOUtils.readInt(byteBuffer);
List<String> outputColumnNames = new ArrayList<>(outputColumnNamesSize);
while (outputColumnNamesSize > 0) {
outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
outputColumnNamesSize--;
}
int index = ReadWriteIOUtils.readInt(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
exchangeNode.setOutputColumnNames(outputColumnNames);
exchangeNode.setIndexOfUpstreamSinkHandle(index);
return exchangeNode;
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.EXCHANGE.serialize(byteBuffer);
ReadWriteIOUtils.write(upstreamEndpoint.getIp(), byteBuffer);
ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
upstreamInstanceId.serialize(byteBuffer);
upstreamPlanNodeId.serialize(byteBuffer);
ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
for (String outputColumnName : outputColumnNames) {
ReadWriteIOUtils.write(outputColumnName, byteBuffer);
}
ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
PlanNodeType.EXCHANGE.serialize(stream);
ReadWriteIOUtils.write(upstreamEndpoint.getIp(), stream);
ReadWriteIOUtils.write(upstreamEndpoint.getPort(), stream);
upstreamInstanceId.serialize(stream);
upstreamPlanNodeId.serialize(stream);
ReadWriteIOUtils.write(outputColumnNames.size(), stream);
for (String outputColumnName : outputColumnNames) {
ReadWriteIOUtils.write(outputColumnName, stream);
}
ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, stream);
}
@Override
public String toString() {
return String.format(
"ExchangeNode-%s: [SourceAddress:%s]", getPlanNodeId(), getSourceAddress());
}
public String getSourceAddress() {
if (getUpstreamEndpoint() == null) {
return "Not assigned";
}
return String.format(
"%s/%s/%s",
getUpstreamEndpoint().getIp(), getUpstreamInstanceId(), getUpstreamPlanNodeId());
}
public int getIndexOfUpstreamSinkHandle() {
return indexOfUpstreamSinkHandle;
}
public void setIndexOfUpstreamSinkHandle(int indexOfUpstreamSinkHandle) {
this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
}
public TEndPoint getUpstreamEndpoint() {
return upstreamEndpoint;
}
public FragmentInstanceId getUpstreamInstanceId() {
return upstreamInstanceId;
}
public PlanNodeId getUpstreamPlanNodeId() {
return upstreamPlanNodeId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
ExchangeNode that = (ExchangeNode) o;
return Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
&& Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
&& Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
}
}