| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; |
| |
| import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; |
| import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.commons.utils.TimePartitionUtils; |
| import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; |
| import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; |
| import org.apache.iotdb.db.utils.QueryDataSetUtils; |
| import org.apache.iotdb.tsfile.exception.NotImplementedException; |
| import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.utils.Binary; |
| import org.apache.iotdb.tsfile.utils.BitMap; |
| import org.apache.iotdb.tsfile.utils.BytesUtils; |
| import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| |
| public class InsertTabletNode extends InsertNode implements WALEntryValue { |
| |
| private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported."; |
| |
| private long[] times; // times should be sorted. It is done in the session API. |
| |
| private BitMap[] bitMaps; |
| private Object[] columns; |
| |
| private int rowCount = 0; |
| |
| // When this plan is sub-plan split from another InsertTabletNode, this indicates the original |
| // positions of values in |
| // this plan. For example, if the plan contains 5 timestamps, and range = [1,4,10,12], then it |
| // means that the first 3 |
| // timestamps in this plan are from range[1,4) of the parent plan, and the last 2 timestamps are |
| // from range[10,12) |
| // of the parent plan. |
| // this is usually used to back-propagate exceptions to the parent plan without losing their |
| // proper positions. |
| private List<Integer> range; |
| |
| public InsertTabletNode(PlanNodeId id) { |
| super(id); |
| } |
| |
| @TestOnly |
| public InsertTabletNode( |
| PlanNodeId id, |
| PartialPath devicePath, |
| boolean isAligned, |
| String[] measurements, |
| TSDataType[] dataTypes, |
| long[] times, |
| BitMap[] bitMaps, |
| Object[] columns, |
| int rowCount) { |
| super(id, devicePath, isAligned, measurements, dataTypes); |
| this.times = times; |
| this.bitMaps = bitMaps; |
| this.columns = columns; |
| this.rowCount = rowCount; |
| } |
| |
| public InsertTabletNode( |
| PlanNodeId id, |
| PartialPath devicePath, |
| boolean isAligned, |
| String[] measurements, |
| TSDataType[] dataTypes, |
| MeasurementSchema[] measurementSchemas, |
| long[] times, |
| BitMap[] bitMaps, |
| Object[] columns, |
| int rowCount) { |
| super(id, devicePath, isAligned, measurements, dataTypes); |
| this.measurementSchemas = measurementSchemas; |
| this.times = times; |
| this.bitMaps = bitMaps; |
| this.columns = columns; |
| this.rowCount = rowCount; |
| } |
| |
| public long[] getTimes() { |
| return times; |
| } |
| |
| public void setTimes(long[] times) { |
| this.times = times; |
| } |
| |
| public BitMap[] getBitMaps() { |
| return bitMaps; |
| } |
| |
| public void setBitMaps(BitMap[] bitMaps) { |
| this.bitMaps = bitMaps; |
| } |
| |
| public Object[] getColumns() { |
| return columns; |
| } |
| |
| public void setColumns(Object[] columns) { |
| this.columns = columns; |
| } |
| |
| public int getRowCount() { |
| return rowCount; |
| } |
| |
| public void setRowCount(int rowCount) { |
| this.rowCount = rowCount; |
| } |
| |
| public List<Integer> getRange() { |
| return range; |
| } |
| |
| public void setRange(List<Integer> range) { |
| this.range = range; |
| } |
| |
| @Override |
| public List<PlanNode> getChildren() { |
| return null; |
| } |
| |
| @Override |
| public void addChild(PlanNode child) {} |
| |
| @Override |
| public PlanNodeType getType() { |
| return PlanNodeType.INSERT_TABLET; |
| } |
| |
| @Override |
| public PlanNode clone() { |
| throw new NotImplementedException("clone of Insert is not implemented"); |
| } |
| |
| @Override |
| public int allowedChildCount() { |
| return NO_CHILD_ALLOWED; |
| } |
| |
| @Override |
| public List<String> getOutputColumnNames() { |
| return null; |
| } |
| |
| @Override |
| public List<WritePlanNode> splitByPartition(Analysis analysis) { |
| // only single device in single database |
| List<WritePlanNode> result = new ArrayList<>(); |
| if (times.length == 0) { |
| return Collections.emptyList(); |
| } |
| long upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[0]); |
| TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[0]); |
| int startLoc = 0; // included |
| |
| List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>(); |
| // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ... |
| List<Integer> ranges = new ArrayList<>(); |
| for (int i = 1; i < times.length; i++) { // times are sorted in session API. |
| if (times[i] >= upperBoundOfTimePartition) { |
| // a new range. |
| ranges.add(startLoc); // included |
| ranges.add(i); // excluded |
| timePartitionSlots.add(timePartitionSlot); |
| // next init |
| startLoc = i; |
| upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[i]); |
| timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]); |
| } |
| } |
| |
| // the final range |
| ranges.add(startLoc); // included |
| ranges.add(times.length); // excluded |
| timePartitionSlots.add(timePartitionSlot); |
| |
| // data region for each time partition |
| List<TRegionReplicaSet> dataRegionReplicaSets = |
| analysis |
| .getDataPartitionInfo() |
| .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots); |
| |
| // collect redirectInfo |
| analysis.addEndPointToRedirectNodeList( |
| dataRegionReplicaSets |
| .get(dataRegionReplicaSets.size() - 1) |
| .getDataNodeLocations() |
| .get(0) |
| .getClientRpcEndPoint()); |
| |
| Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>(); |
| for (int i = 0; i < dataRegionReplicaSets.size(); i++) { |
| List<Integer> sub_ranges = |
| splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new ArrayList<>()); |
| sub_ranges.add(ranges.get(2 * i)); |
| sub_ranges.add(ranges.get(2 * i + 1)); |
| } |
| |
| List<Integer> locs; |
| for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) { |
| // generate a new times and values |
| locs = entry.getValue(); |
| // Avoid using system arraycopy when there is no need to split |
| if (splitMap.size() == 1 && locs.size() == 2) { |
| setRange(locs); |
| setDataRegionReplicaSet(entry.getKey()); |
| result.add(this); |
| return result; |
| } |
| for (int i = 0; i < locs.size(); i += 2) { |
| int start = locs.get(i); |
| int end = locs.get(i + 1); |
| int count = end - start; |
| long[] subTimes = new long[count]; |
| int destLoc = 0; |
| Object[] values = initTabletValues(dataTypes.length, count, dataTypes); |
| BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count); |
| System.arraycopy(times, start, subTimes, destLoc, end - start); |
| for (int k = 0; k < values.length; k++) { |
| if (dataTypes[k] != null) { |
| System.arraycopy(columns[k], start, values[k], destLoc, end - start); |
| } |
| if (bitMaps != null && this.bitMaps[k] != null) { |
| BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start); |
| } |
| } |
| InsertTabletNode subNode = |
| new InsertTabletNode( |
| getPlanNodeId(), |
| devicePath, |
| isAligned, |
| measurements, |
| dataTypes, |
| measurementSchemas, |
| subTimes, |
| bitMaps, |
| values, |
| subTimes.length); |
| subNode.setFailedMeasurementNumber(getFailedMeasurementNumber()); |
| subNode.setRange(locs); |
| subNode.setDataRegionReplicaSet(entry.getKey()); |
| result.add(subNode); |
| } |
| } |
| return result; |
| } |
| |
| @TestOnly |
| public List<TTimePartitionSlot> getTimePartitionSlots() { |
| List<TTimePartitionSlot> result = new ArrayList<>(); |
| long upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[0]); |
| TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[0]); |
| for (int i = 1; i < times.length; i++) { // times are sorted in session API. |
| if (times[i] >= upperBoundOfTimePartition) { |
| result.add(timePartitionSlot); |
| // next init |
| upperBoundOfTimePartition = TimePartitionUtils.getTimePartitionUpperBound(times[i]); |
| timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]); |
| } |
| } |
| result.add(timePartitionSlot); |
| return result; |
| } |
| |
| private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) { |
| Object[] values = new Object[columnSize]; |
| for (int i = 0; i < values.length; i++) { |
| if (dataTypes[i] != null) { |
| switch (dataTypes[i]) { |
| case TEXT: |
| values[i] = new Binary[rowSize]; |
| break; |
| case FLOAT: |
| values[i] = new float[rowSize]; |
| break; |
| case INT32: |
| values[i] = new int[rowSize]; |
| break; |
| case INT64: |
| values[i] = new long[rowSize]; |
| break; |
| case DOUBLE: |
| values[i] = new double[rowSize]; |
| break; |
| case BOOLEAN: |
| values[i] = new boolean[rowSize]; |
| break; |
| } |
| } |
| } |
| return values; |
| } |
| |
| private BitMap[] initBitmaps(int columnSize, int rowSize) { |
| BitMap[] bitMaps = new BitMap[columnSize]; |
| for (int i = 0; i < columnSize; i++) { |
| bitMaps[i] = new BitMap(rowSize); |
| } |
| return bitMaps; |
| } |
| |
| @Override |
| public void markFailedMeasurement(int index) { |
| if (measurements[index] == null) { |
| return; |
| } |
| measurements[index] = null; |
| dataTypes[index] = null; |
| columns[index] = null; |
| } |
| |
| @Override |
| public long getMinTime() { |
| return times[0]; |
| } |
| |
| @Override |
| protected void serializeAttributes(ByteBuffer byteBuffer) { |
| PlanNodeType.INSERT_TABLET.serialize(byteBuffer); |
| subSerialize(byteBuffer); |
| } |
| |
| @Override |
| protected void serializeAttributes(DataOutputStream stream) throws IOException { |
| PlanNodeType.INSERT_TABLET.serialize(stream); |
| subSerialize(stream); |
| } |
| |
| void subSerialize(ByteBuffer buffer) { |
| ReadWriteIOUtils.write(devicePath.getFullPath(), buffer); |
| writeMeasurementsOrSchemas(buffer); |
| writeDataTypes(buffer); |
| writeTimes(buffer); |
| writeBitMaps(buffer); |
| writeValues(buffer); |
| ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), buffer); |
| } |
| |
| void subSerialize(DataOutputStream stream) throws IOException { |
| ReadWriteIOUtils.write(devicePath.getFullPath(), stream); |
| writeMeasurementsOrSchemas(stream); |
| writeDataTypes(stream); |
| writeTimes(stream); |
| writeBitMaps(stream); |
| writeValues(stream); |
| ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), stream); |
| } |
| |
| /** Serialize measurements or measurement schemas, ignoring failed time series */ |
| private void writeMeasurementsOrSchemas(ByteBuffer buffer) { |
| ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer); |
| ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), buffer); |
| |
| for (int i = 0; i < measurements.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| // serialize measurement schemas when exist |
| if (measurementSchemas != null) { |
| measurementSchemas[i].serializeTo(buffer); |
| } else { |
| ReadWriteIOUtils.write(measurements[i], buffer); |
| } |
| } |
| } |
| |
| /** Serialize measurements or measurement schemas, ignoring failed time series */ |
| private void writeMeasurementsOrSchemas(DataOutputStream stream) throws IOException { |
| ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream); |
| ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), stream); |
| |
| for (int i = 0; i < measurements.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| // serialize measurement schemas when exist |
| if (measurementSchemas != null) { |
| measurementSchemas[i].serializeTo(stream); |
| } else { |
| ReadWriteIOUtils.write(measurements[i], stream); |
| } |
| } |
| } |
| |
| /** Serialize data types, ignoring failed time series */ |
| private void writeDataTypes(ByteBuffer buffer) { |
| for (int i = 0; i < dataTypes.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| dataTypes[i].serializeTo(buffer); |
| } |
| } |
| |
| /** Serialize data types, ignoring failed time series */ |
| private void writeDataTypes(DataOutputStream stream) throws IOException { |
| for (int i = 0; i < dataTypes.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| dataTypes[i].serializeTo(stream); |
| } |
| } |
| |
| private void writeTimes(ByteBuffer buffer) { |
| ReadWriteIOUtils.write(rowCount, buffer); |
| for (long time : times) { |
| ReadWriteIOUtils.write(time, buffer); |
| } |
| } |
| |
| private void writeTimes(DataOutputStream stream) throws IOException { |
| ReadWriteIOUtils.write(rowCount, stream); |
| for (long time : times) { |
| ReadWriteIOUtils.write(time, stream); |
| } |
| } |
| |
| /** Serialize bitmaps, ignoring failed time series */ |
| private void writeBitMaps(ByteBuffer buffer) { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), buffer); |
| if (bitMaps != null) { |
| for (int i = 0; i < bitMaps.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| |
| if (bitMaps[i] == null) { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(false), buffer); |
| } else { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(true), buffer); |
| buffer.put(bitMaps[i].getByteArray()); |
| } |
| } |
| } |
| } |
| |
| /** Serialize bitmaps, ignoring failed time series */ |
| private void writeBitMaps(DataOutputStream stream) throws IOException { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream); |
| if (bitMaps != null) { |
| for (int i = 0; i < bitMaps.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| |
| if (bitMaps[i] == null) { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream); |
| } else { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream); |
| stream.write(bitMaps[i].getByteArray()); |
| } |
| } |
| } |
| } |
| |
| /** Serialize values, ignoring failed time series */ |
| private void writeValues(ByteBuffer buffer) { |
| for (int i = 0; i < columns.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| serializeColumn(dataTypes[i], columns[i], buffer); |
| } |
| } |
| |
| /** Serialize values, ignoring failed time series */ |
| private void writeValues(DataOutputStream stream) throws IOException { |
| for (int i = 0; i < columns.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| serializeColumn(dataTypes[i], columns[i], stream); |
| } |
| } |
| |
| private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buffer) { |
| switch (dataType) { |
| case INT32: |
| int[] intValues = (int[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(intValues[j], buffer); |
| } |
| break; |
| case INT64: |
| long[] longValues = (long[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(longValues[j], buffer); |
| } |
| break; |
| case FLOAT: |
| float[] floatValues = (float[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(floatValues[j], buffer); |
| } |
| break; |
| case DOUBLE: |
| double[] doubleValues = (double[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(doubleValues[j], buffer); |
| } |
| break; |
| case BOOLEAN: |
| boolean[] boolValues = (boolean[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), buffer); |
| } |
| break; |
| case TEXT: |
| Binary[] binaryValues = (Binary[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(binaryValues[j], buffer); |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType)); |
| } |
| } |
| |
| private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream) |
| throws IOException { |
| switch (dataType) { |
| case INT32: |
| int[] intValues = (int[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(intValues[j], stream); |
| } |
| break; |
| case INT64: |
| long[] longValues = (long[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(longValues[j], stream); |
| } |
| break; |
| case FLOAT: |
| float[] floatValues = (float[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(floatValues[j], stream); |
| } |
| break; |
| case DOUBLE: |
| double[] doubleValues = (double[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(doubleValues[j], stream); |
| } |
| break; |
| case BOOLEAN: |
| boolean[] boolValues = (boolean[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream); |
| } |
| break; |
| case TEXT: |
| Binary[] binaryValues = (Binary[]) column; |
| for (int j = 0; j < rowCount; j++) { |
| ReadWriteIOUtils.write(binaryValues[j], stream); |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType)); |
| } |
| } |
| |
| public static InsertTabletNode deserialize(ByteBuffer byteBuffer) { |
| InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId("")); |
| insertNode.subDeserialize(byteBuffer); |
| insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer)); |
| return insertNode; |
| } |
| |
| public void subDeserialize(ByteBuffer buffer) { |
| try { |
| devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer)); |
| } catch (IllegalPathException e) { |
| throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e); |
| } |
| |
| int measurementSize = buffer.getInt(); |
| measurements = new String[measurementSize]; |
| |
| boolean hasSchema = buffer.get() == 1; |
| if (hasSchema) { |
| this.measurementSchemas = new MeasurementSchema[measurementSize]; |
| for (int i = 0; i < measurementSize; i++) { |
| measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer); |
| measurements[i] = measurementSchemas[i].getMeasurementId(); |
| } |
| } else { |
| for (int i = 0; i < measurementSize; i++) { |
| measurements[i] = ReadWriteIOUtils.readString(buffer); |
| } |
| } |
| |
| dataTypes = new TSDataType[measurementSize]; |
| for (int i = 0; i < measurementSize; i++) { |
| dataTypes[i] = TSDataType.deserialize(buffer.get()); |
| } |
| |
| rowCount = buffer.getInt(); |
| times = new long[rowCount]; |
| times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount); |
| |
| boolean hasBitMaps = BytesUtils.byteToBool(buffer.get()); |
| if (hasBitMaps) { |
| bitMaps = |
| QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rowCount).orElse(null); |
| } |
| columns = |
| QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rowCount); |
| isAligned = buffer.get() == 1; |
| } |
| |
| // region serialize & deserialize methods for WAL |
| /** Serialized size for wal */ |
| @Override |
| public int serializedSize() { |
| return serializedSize(0, rowCount); |
| } |
| |
| /** Serialized size for wal */ |
| public int serializedSize(int start, int end) { |
| return Short.BYTES + subSerializeSize(start, end); |
| } |
| |
| int subSerializeSize(int start, int end) { |
| int size = 0; |
| size += Long.BYTES; |
| size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath()); |
| // measurements size |
| size += Integer.BYTES; |
| size += serializeMeasurementSchemasSize(); |
| // times size |
| size += Integer.BYTES; |
| size += Long.BYTES * (end - start); |
| // bitmaps size |
| size += Byte.BYTES; |
| if (bitMaps != null) { |
| for (int i = 0; i < bitMaps.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| |
| size += Byte.BYTES; |
| if (bitMaps[i] != null) { |
| int len = end - start; |
| BitMap partBitMap = new BitMap(len); |
| BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len); |
| size += partBitMap.getByteArray().length; |
| } |
| } |
| } |
| // values size |
| for (int i = 0; i < dataTypes.length; i++) { |
| if (columns[i] != null) { |
| size += getColumnSize(dataTypes[i], columns[i], start, end); |
| } |
| } |
| |
| size += Byte.BYTES; |
| return size; |
| } |
| |
| private int getColumnSize(TSDataType dataType, Object column, int start, int end) { |
| int size = 0; |
| switch (dataType) { |
| case INT32: |
| size += Integer.BYTES * (end - start); |
| break; |
| case INT64: |
| size += Long.BYTES * (end - start); |
| break; |
| case FLOAT: |
| size += Float.BYTES * (end - start); |
| break; |
| case DOUBLE: |
| size += Double.BYTES * (end - start); |
| break; |
| case BOOLEAN: |
| size += Byte.BYTES * (end - start); |
| break; |
| case TEXT: |
| Binary[] binaryValues = (Binary[]) column; |
| for (int j = start; j < end; j++) { |
| size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]); |
| } |
| break; |
| } |
| return size; |
| } |
| |
| /** |
| * Compared with {@link this#serialize(ByteBuffer)}, more info: search index and data types, less |
| * info: isNeedInferType |
| */ |
| @Override |
| public void serializeToWAL(IWALByteBufferView buffer) { |
| serializeToWAL(buffer, 0, rowCount); |
| } |
| |
| public void serializeToWAL(IWALByteBufferView buffer, int start, int end) { |
| buffer.putShort(PlanNodeType.INSERT_TABLET.getNodeType()); |
| subSerialize(buffer, start, end); |
| } |
| |
| void subSerialize(IWALByteBufferView buffer, int start, int end) { |
| buffer.putLong(searchIndex); |
| WALWriteUtils.write(devicePath.getFullPath(), buffer); |
| // data types are serialized in measurement schemas |
| writeMeasurementSchemas(buffer); |
| writeTimes(buffer, start, end); |
| writeBitMaps(buffer, start, end); |
| writeValues(buffer, start, end); |
| buffer.put((byte) (isAligned ? 1 : 0)); |
| } |
| |
| /** Serialize measurement schemas, ignoring failed time series */ |
| private void writeMeasurementSchemas(IWALByteBufferView buffer) { |
| buffer.putInt(measurements.length - getFailedMeasurementNumber()); |
| serializeMeasurementSchemasToWAL(buffer); |
| } |
| |
| private void writeTimes(IWALByteBufferView buffer, int start, int end) { |
| buffer.putInt(end - start); |
| for (int i = start; i < end; i++) { |
| buffer.putLong(times[i]); |
| } |
| } |
| |
| /** Serialize bitmaps, ignoring failed time series */ |
| private void writeBitMaps(IWALByteBufferView buffer, int start, int end) { |
| buffer.put(BytesUtils.boolToByte(bitMaps != null)); |
| if (bitMaps != null) { |
| for (int i = 0; i < bitMaps.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| |
| if (bitMaps[i] == null) { |
| buffer.put(BytesUtils.boolToByte(false)); |
| } else { |
| buffer.put(BytesUtils.boolToByte(true)); |
| int len = end - start; |
| BitMap partBitMap = new BitMap(len); |
| BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len); |
| buffer.put(partBitMap.getByteArray()); |
| } |
| } |
| } |
| } |
| |
| /** Serialize values, ignoring failed time series */ |
| private void writeValues(IWALByteBufferView buffer, int start, int end) { |
| for (int i = 0; i < columns.length; i++) { |
| // ignore failed partial insert |
| if (measurements[i] == null) { |
| continue; |
| } |
| serializeColumn(dataTypes[i], columns[i], buffer, start, end); |
| } |
| } |
| |
| private void serializeColumn( |
| TSDataType dataType, Object column, IWALByteBufferView buffer, int start, int end) { |
| switch (dataType) { |
| case INT32: |
| int[] intValues = (int[]) column; |
| for (int j = start; j < end; j++) { |
| buffer.putInt(intValues[j]); |
| } |
| break; |
| case INT64: |
| long[] longValues = (long[]) column; |
| for (int j = start; j < end; j++) { |
| buffer.putLong(longValues[j]); |
| } |
| break; |
| case FLOAT: |
| float[] floatValues = (float[]) column; |
| for (int j = start; j < end; j++) { |
| buffer.putFloat(floatValues[j]); |
| } |
| break; |
| case DOUBLE: |
| double[] doubleValues = (double[]) column; |
| for (int j = start; j < end; j++) { |
| buffer.putDouble(doubleValues[j]); |
| } |
| break; |
| case BOOLEAN: |
| boolean[] boolValues = (boolean[]) column; |
| for (int j = start; j < end; j++) { |
| buffer.put(BytesUtils.boolToByte(boolValues[j])); |
| } |
| break; |
| case TEXT: |
| Binary[] binaryValues = (Binary[]) column; |
| for (int j = start; j < end; j++) { |
| buffer.putInt(binaryValues[j].getLength()); |
| buffer.put(binaryValues[j].getValues()); |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType)); |
| } |
| } |
| |
| /** Deserialize from wal */ |
| public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws IOException { |
| // we do not store plan node id in wal entry |
| InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId("")); |
| insertNode.subDeserializeFromWAL(stream); |
| return insertNode; |
| } |
| |
| private void subDeserializeFromWAL(DataInputStream stream) throws IOException { |
| searchIndex = stream.readLong(); |
| try { |
| devicePath = |
| DataNodeDevicePathCache.getInstance().getPartialPath(ReadWriteIOUtils.readString(stream)); |
| } catch (IllegalPathException e) { |
| throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e); |
| } |
| |
| int measurementSize = stream.readInt(); |
| measurements = new String[measurementSize]; |
| measurementSchemas = new MeasurementSchema[measurementSize]; |
| dataTypes = new TSDataType[measurementSize]; |
| deserializeMeasurementSchemas(stream); |
| |
| rowCount = stream.readInt(); |
| times = new long[rowCount]; |
| times = QueryDataSetUtils.readTimesFromStream(stream, rowCount); |
| |
| boolean hasBitMaps = BytesUtils.byteToBool(stream.readByte()); |
| if (hasBitMaps) { |
| bitMaps = |
| QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize, rowCount).orElse(null); |
| } |
| columns = |
| QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rowCount); |
| isAligned = stream.readByte() == 1; |
| } |
| |
| public static InsertTabletNode deserializeFromWAL(ByteBuffer buffer) { |
| // we do not store plan node id in wal entry |
| InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId("")); |
| insertNode.subDeserializeFromWAL(buffer); |
| return insertNode; |
| } |
| |
| private void subDeserializeFromWAL(ByteBuffer buffer) { |
| searchIndex = buffer.getLong(); |
| try { |
| devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer)); |
| } catch (IllegalPathException e) { |
| throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e); |
| } |
| |
| int measurementSize = buffer.getInt(); |
| measurements = new String[measurementSize]; |
| measurementSchemas = new MeasurementSchema[measurementSize]; |
| deserializeMeasurementSchemas(buffer); |
| |
| // data types are serialized in measurement schemas |
| dataTypes = new TSDataType[measurementSize]; |
| for (int i = 0; i < measurementSize; i++) { |
| dataTypes[i] = measurementSchemas[i].getType(); |
| } |
| |
| rowCount = buffer.getInt(); |
| times = new long[rowCount]; |
| times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount); |
| |
| boolean hasBitMaps = BytesUtils.byteToBool(buffer.get()); |
| if (hasBitMaps) { |
| bitMaps = |
| QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rowCount).orElse(null); |
| } |
| columns = |
| QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rowCount); |
| isAligned = buffer.get() == 1; |
| } |
| // endregion |
| |
| @Override |
| public int hashCode() { |
| int result = Objects.hash(super.hashCode(), rowCount, range); |
| result = 31 * result + Arrays.hashCode(times); |
| result = 31 * result + Arrays.hashCode(bitMaps); |
| result = 31 * result + Arrays.deepHashCode(columns); |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| if (!super.equals(o)) return false; |
| InsertTabletNode that = (InsertTabletNode) o; |
| return rowCount == that.rowCount |
| && Arrays.equals(times, that.times) |
| && Arrays.equals(bitMaps, that.bitMaps) |
| && equals(that.columns) |
| && Objects.equals(range, that.range); |
| } |
| |
| private boolean equals(Object[] columns) { |
| if (this.columns == columns) { |
| return true; |
| } |
| |
| if (columns == null || this.columns == null || columns.length != this.columns.length) { |
| return false; |
| } |
| |
| for (int i = 0; i < columns.length; i++) { |
| if (dataTypes[i] != null) { |
| switch (dataTypes[i]) { |
| case INT32: |
| if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) { |
| return false; |
| } |
| break; |
| case INT64: |
| if (!Arrays.equals((long[]) this.columns[i], (long[]) columns[i])) { |
| return false; |
| } |
| break; |
| case FLOAT: |
| if (!Arrays.equals((float[]) this.columns[i], (float[]) columns[i])) { |
| return false; |
| } |
| break; |
| case DOUBLE: |
| if (!Arrays.equals((double[]) this.columns[i], (double[]) columns[i])) { |
| return false; |
| } |
| break; |
| case BOOLEAN: |
| if (!Arrays.equals((boolean[]) this.columns[i], (boolean[]) columns[i])) { |
| return false; |
| } |
| break; |
| case TEXT: |
| if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) columns[i])) { |
| return false; |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format(DATATYPE_UNSUPPORTED, dataTypes[i])); |
| } |
| } else if (!columns[i].equals(columns)) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| @Override |
| public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { |
| return visitor.visitInsertTablet(this, context); |
| } |
| |
| public TimeValuePair composeLastTimeValuePair(int measurementIndex) { |
| if (measurementIndex >= columns.length) { |
| return null; |
| } |
| |
| // get non-null value |
| int lastIdx = rowCount - 1; |
| if (bitMaps != null && bitMaps[measurementIndex] != null) { |
| BitMap bitMap = bitMaps[measurementIndex]; |
| while (lastIdx >= 0) { |
| if (!bitMap.isMarked(lastIdx)) { |
| break; |
| } |
| lastIdx--; |
| } |
| } |
| if (lastIdx < 0) { |
| return null; |
| } |
| |
| TsPrimitiveType value; |
| switch (dataTypes[measurementIndex]) { |
| case INT32: |
| int[] intValues = (int[]) columns[measurementIndex]; |
| value = new TsPrimitiveType.TsInt(intValues[lastIdx]); |
| break; |
| case INT64: |
| long[] longValues = (long[]) columns[measurementIndex]; |
| value = new TsPrimitiveType.TsLong(longValues[lastIdx]); |
| break; |
| case FLOAT: |
| float[] floatValues = (float[]) columns[measurementIndex]; |
| value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]); |
| break; |
| case DOUBLE: |
| double[] doubleValues = (double[]) columns[measurementIndex]; |
| value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]); |
| break; |
| case BOOLEAN: |
| boolean[] boolValues = (boolean[]) columns[measurementIndex]; |
| value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]); |
| break; |
| case TEXT: |
| Binary[] binaryValues = (Binary[]) columns[measurementIndex]; |
| value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]); |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex])); |
| } |
| return new TimeValuePair(times[lastIdx], value); |
| } |
| } |