blob: bca92095fd9fa888c5cc58707dbfc744c88c8d4a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.physical.crud;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public abstract class InsertPlan extends PhysicalPlan {
protected PartialPath prefixPath;
protected PartialPath originalPrefixPath;
protected boolean isAligned;
protected String[] measurements;
// get from client
protected TSDataType[] dataTypes;
// get from MManager
protected IMeasurementMNode[] measurementMNodes;
// record the failed measurements, their reasons, and positions in "measurements"
List<String> failedMeasurements;
private List<Exception> failedExceptions;
List<Integer> failedIndices;
public InsertPlan(Operator.OperatorType operatorType) {
super(false, operatorType);
super.canBeSplit = false;
}
public PartialPath getPrefixPath() {
return prefixPath;
}
public void setPrefixPath(PartialPath prefixPath) {
this.prefixPath = prefixPath;
}
/*
the original prefixPath needs to be recorded and recovered by recoverFromFailure because cluster may try to execute this plan twice
*/
public void setPrefixPathForAlignTimeSeries(PartialPath prefixPath) {
this.originalPrefixPath = this.prefixPath;
this.prefixPath = prefixPath;
}
public String[] getMeasurements() {
return this.measurements;
}
public void setMeasurements(String[] measurements) {
this.measurements = measurements;
}
public TSDataType[] getDataTypes() {
return dataTypes;
}
public void setDataTypes(TSDataType[] dataTypes) {
this.dataTypes = dataTypes;
}
public IMeasurementMNode[] getMeasurementMNodes() {
return measurementMNodes;
}
public void setMeasurementMNodes(IMeasurementMNode[] mNodes) {
this.measurementMNodes = mNodes;
}
public List<String> getFailedMeasurements() {
return failedMeasurements;
}
public List<Exception> getFailedExceptions() {
return failedExceptions;
}
public int getFailedMeasurementNumber() {
return failedMeasurements == null ? 0 : failedMeasurements.size();
}
public boolean isAligned() {
return isAligned;
}
public void setAligned(boolean aligned) {
isAligned = aligned;
}
public abstract long getMinTime();
/** @param index failed measurement index */
public void markFailedMeasurementInsertion(int index, Exception e) {
if (measurements[index] == null) {
return;
}
if (failedMeasurements == null) {
failedMeasurements = new ArrayList<>();
failedExceptions = new ArrayList<>();
failedIndices = new ArrayList<>();
}
failedMeasurements.add(measurements[index]);
failedExceptions.add(e);
failedIndices.add(index);
measurements[index] = null;
}
public void markFailedMeasurementAlignedInsertion(Exception e) {
if (failedMeasurements == null) {
failedMeasurements = new ArrayList<>();
failedExceptions = new ArrayList<>();
failedIndices = new ArrayList<>();
}
for (int i = 0; i < measurements.length; i++) {
if (measurements[i] == null) {
continue;
}
failedMeasurements.add(measurements[i]);
failedExceptions.add(e);
failedIndices.add(i);
measurements[i] = null;
}
}
/**
* Reconstruct this plan with the failed measurements.
*
* @return the plan itself, with measurements replaced with the previously failed ones.
*/
public InsertPlan getPlanFromFailed() {
if (failedMeasurements == null) {
return null;
}
measurements = failedMeasurements.toArray(new String[0]);
failedMeasurements = null;
if (dataTypes != null) {
TSDataType[] temp = dataTypes.clone();
dataTypes = new TSDataType[failedIndices.size()];
for (int i = 0; i < failedIndices.size(); i++) {
dataTypes[i] = temp[failedIndices.get(i)];
}
}
if (measurementMNodes != null) {
IMeasurementMNode[] temp = measurementMNodes.clone();
measurementMNodes = new IMeasurementMNode[failedIndices.size()];
for (int i = 0; i < failedIndices.size(); i++) {
measurementMNodes[i] = temp[failedIndices.get(i)];
}
}
failedIndices = null;
failedExceptions = null;
return this;
}
/** Reset measurements from failed measurements (if any), as if no failure had ever happened. */
public void recoverFromFailure() {
if (isAligned && originalPrefixPath != null) {
prefixPath = originalPrefixPath;
}
if (failedMeasurements == null) {
return;
}
for (int i = 0; i < failedMeasurements.size(); i++) {
int index = failedIndices.get(i);
measurements[index] = failedMeasurements.get(i);
}
failedIndices = null;
failedExceptions = null;
failedMeasurements = null;
}
@Override
public void checkIntegrity() throws QueryProcessException {
if (prefixPath == null) {
throw new QueryProcessException("DeviceId is null");
}
if (measurements == null) {
throw new QueryProcessException("Measurements are null");
}
for (String measurement : measurements) {
if (measurement == null || measurement.isEmpty()) {
throw new QueryProcessException(
"Measurement contains null or empty string: " + Arrays.toString(measurements));
}
}
}
}