blob: e1982e6b9c491f9f6a19a6bb198620fdcef4ac66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.physical.sys;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.utils.StatusUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
/**
 * Physical plan that creates multiple time series in one operation. The plan may be split into
 * several sub-plans executed in different data groups; the creation outcome of each individual
 * series is collected in {@link #results}, keyed by the index of the series within {@link #paths}.
 *
 * <p>Wire format (both stream and buffer variants, kept byte-compatible):
 * type byte, path count, dataType count, paths, dataType ordinals, encoding ordinals,
 * compressor ordinals, optional sections (flag byte + payload each), plan index.
 */
public class CreateMultiTimeSeriesPlan extends PhysicalPlan {

  private List<PartialPath> paths;
  private List<TSDataType> dataTypes;
  private List<TSEncoding> encodings;
  private List<CompressionType> compressors;

  // Optional per-series metadata. A null list means "not specified" and is serialized as a
  // single 0 flag byte; a non-null list is serialized as a 1 flag byte followed by the payload.
  private List<String> alias = null;
  private List<Map<String, String>> props = null;
  private List<Map<String, String>> tags = null;
  private List<Map<String, String>> attributes = null;

  /** Records the result of the creation of each time series, keyed by its index in {@link #paths}. */
  private Map<Integer, TSStatus> results = new TreeMap<>();

  private List<Integer> indexes;

  public CreateMultiTimeSeriesPlan() {
    super(false, Operator.OperatorType.CREATE_MULTI_TIMESERIES);
  }

  @Override
  public List<PartialPath> getPaths() {
    return paths;
  }

  @Override
  public void setPaths(List<PartialPath> paths) {
    this.paths = paths;
  }

  public List<TSDataType> getDataTypes() {
    return dataTypes;
  }

  public void setDataTypes(List<TSDataType> dataTypes) {
    this.dataTypes = dataTypes;
  }

  public List<TSEncoding> getEncodings() {
    return encodings;
  }

  public void setEncodings(List<TSEncoding> encodings) {
    this.encodings = encodings;
  }

  public List<CompressionType> getCompressors() {
    return compressors;
  }

  public void setCompressors(List<CompressionType> compressors) {
    this.compressors = compressors;
  }

  public List<String> getAlias() {
    return alias;
  }

  public void setAlias(List<String> alias) {
    this.alias = alias;
  }

  public List<Map<String, String>> getProps() {
    return props;
  }

  public void setProps(List<Map<String, String>> props) {
    this.props = props;
  }

  public List<Map<String, String>> getTags() {
    return tags;
  }

  public void setTags(List<Map<String, String>> tags) {
    this.tags = tags;
  }

  public List<Map<String, String>> getAttributes() {
    return attributes;
  }

  public void setAttributes(List<Map<String, String>> attributes) {
    this.attributes = attributes;
  }

  public List<Integer> getIndexes() {
    return indexes;
  }

  public void setIndexes(List<Integer> indexes) {
    this.indexes = indexes;
  }

  public Map<Integer, TSStatus> getResults() {
    return results;
  }

  /** Builds a status array of size {@code paths.size()} from {@link #results} via StatusUtils. */
  public TSStatus[] getFailingStatus() {
    return StatusUtils.getFailingStatus(results, paths.size());
  }

  public void setResults(Map<Integer, TSStatus> results) {
    this.results = results;
  }

  @Override
  public void serialize(DataOutputStream stream) throws IOException {
    int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
    stream.write(type);
    stream.writeInt(paths.size());
    // dataTypes/encodings may have a different count than paths for aligned timeseries
    stream.writeInt(dataTypes.size());
    for (PartialPath path : paths) {
      putString(stream, path.getFullPath());
    }
    // Enum ordinals are written as single bytes; deserialize relies on this exact layout.
    for (TSDataType dataType : dataTypes) {
      stream.write(dataType.ordinal());
    }
    for (TSEncoding encoding : encodings) {
      stream.write(encoding.ordinal());
    }
    for (CompressionType compressor : compressors) {
      stream.write(compressor.ordinal());
    }
    serializeOptional(stream);
    stream.writeLong(index);
  }

  /** Writes the four optional sections, each prefixed with a 1 (present) or 0 (absent) flag byte. */
  private void serializeOptional(DataOutputStream stream) throws IOException {
    if (alias != null) {
      stream.write(1);
      putStrings(stream, alias);
    } else {
      stream.write(0);
    }
    if (props != null) {
      stream.write(1);
      ReadWriteIOUtils.write(props, stream);
    } else {
      stream.write(0);
    }
    if (tags != null) {
      stream.write(1);
      ReadWriteIOUtils.write(tags, stream);
    } else {
      stream.write(0);
    }
    if (attributes != null) {
      stream.write(1);
      ReadWriteIOUtils.write(attributes, stream);
    } else {
      stream.write(0);
    }
  }

  @Override
  public void serialize(ByteBuffer buffer) {
    int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
    buffer.put((byte) type);
    buffer.putInt(paths.size());
    // dataTypes/encodings may have a different count than paths for aligned timeseries
    buffer.putInt(dataTypes.size());
    for (PartialPath path : paths) {
      putString(buffer, path.getFullPath());
    }
    // Enum ordinals are written as single bytes; deserialize relies on this exact layout.
    for (TSDataType dataType : dataTypes) {
      buffer.put((byte) dataType.ordinal());
    }
    for (TSEncoding encoding : encodings) {
      buffer.put((byte) encoding.ordinal());
    }
    for (CompressionType compressor : compressors) {
      buffer.put((byte) compressor.ordinal());
    }
    serializeOptional(buffer);
    buffer.putLong(index);
  }

  /** Buffer variant of {@link #serializeOptional(DataOutputStream)}; same flag-byte layout. */
  private void serializeOptional(ByteBuffer buffer) {
    if (alias != null) {
      buffer.put((byte) 1);
      putStrings(buffer, alias);
    } else {
      buffer.put((byte) 0);
    }
    if (props != null) {
      buffer.put((byte) 1);
      ReadWriteIOUtils.write(props, buffer);
    } else {
      buffer.put((byte) 0);
    }
    if (tags != null) {
      buffer.put((byte) 1);
      ReadWriteIOUtils.write(tags, buffer);
    } else {
      buffer.put((byte) 0);
    }
    if (attributes != null) {
      buffer.put((byte) 1);
      ReadWriteIOUtils.write(attributes, buffer);
    } else {
      buffer.put((byte) 0);
    }
  }

  @Override
  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
    int totalSize = buffer.getInt();
    int dataTypeSize = buffer.getInt();
    paths = new ArrayList<>(totalSize);
    for (int i = 0; i < totalSize; i++) {
      paths.add(new PartialPath(readString(buffer)));
    }
    // dataTypes and encodings carry dataTypeSize entries (may differ from totalSize for
    // aligned timeseries); presize with the count actually read.
    dataTypes = new ArrayList<>(dataTypeSize);
    for (int i = 0; i < dataTypeSize; i++) {
      dataTypes.add(TSDataType.values()[buffer.get()]);
    }
    encodings = new ArrayList<>(dataTypeSize);
    for (int i = 0; i < dataTypeSize; i++) {
      encodings.add(TSEncoding.values()[buffer.get()]);
    }
    compressors = new ArrayList<>(totalSize);
    for (int i = 0; i < totalSize; i++) {
      compressors.add(CompressionType.values()[buffer.get()]);
    }
    deserializeOptional(buffer, totalSize);
    this.index = buffer.getLong();
  }

  /** Reads the four optional sections; each is present only when its flag byte is 1. */
  private void deserializeOptional(ByteBuffer buffer, int totalSize) {
    if (buffer.get() == 1) {
      alias = readStrings(buffer, totalSize);
    }
    if (buffer.get() == 1) {
      props = ReadWriteIOUtils.readMaps(buffer, totalSize);
    }
    if (buffer.get() == 1) {
      tags = ReadWriteIOUtils.readMaps(buffer, totalSize);
    }
    if (buffer.get() == 1) {
      attributes = ReadWriteIOUtils.readMaps(buffer, totalSize);
    }
  }

  // Equality is intentionally based only on the core series definition
  // (paths/dataTypes/encodings/compressors), not on optional metadata or results.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    CreateMultiTimeSeriesPlan that = (CreateMultiTimeSeriesPlan) o;
    return Objects.equals(paths, that.paths)
        && Objects.equals(dataTypes, that.dataTypes)
        && Objects.equals(encodings, that.encodings)
        && Objects.equals(compressors, that.compressors);
  }

  @Override
  public int hashCode() {
    return Objects.hash(paths, dataTypes, encodings, compressors);
  }

  /**
   * Validates that the plan is well formed before execution.
   *
   * @throws QueryProcessException if paths are missing, counts mismatch, or a path is null
   */
  @Override
  public void checkIntegrity() throws QueryProcessException {
    if (paths == null || paths.isEmpty()) {
      throw new QueryProcessException("sub timeseries are empty");
    }
    // Guard against a null dataTypes list instead of letting size() throw an NPE.
    int dataTypeCount = dataTypes == null ? 0 : dataTypes.size();
    if (paths.size() != dataTypeCount) {
      throw new QueryProcessException(
          String.format(
              "Measurements length [%d] does not match datatype length [%d]",
              paths.size(), dataTypeCount));
    }
    for (PartialPath path : paths) {
      if (path == null) {
        throw new QueryProcessException("Paths contain null: " + Arrays.toString(paths.toArray()));
      }
    }
  }
}