blob: 8aa203fc2f7175c0b921f89ba548cf571f160804 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write.chunk;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/** a implementation of IChunkGroupWriter. */
public class NonAlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private static final Logger LOG = LoggerFactory.getLogger(NonAlignedChunkGroupWriterImpl.class);
private final IDeviceID deviceId;
/** Map(measurementID, ChunkWriterImpl). Aligned measurementId is empty. */
private final Map<String, ChunkWriterImpl> chunkWriters = new LinkedHashMap<>();
// measurementId -> lastTime
private Map<String, Long> lastTimeMap = new HashMap<>();
public NonAlignedChunkGroupWriterImpl(IDeviceID deviceId) {
this.deviceId = deviceId;
}
@Override
public void tryToAddSeriesWriter(MeasurementSchema schema) {
if (!chunkWriters.containsKey(schema.getMeasurementId())) {
this.chunkWriters.put(schema.getMeasurementId(), new ChunkWriterImpl(schema));
}
}
@Override
public void tryToAddSeriesWriter(List<MeasurementSchema> schemas) {
for (IMeasurementSchema schema : schemas) {
if (!chunkWriters.containsKey(schema.getMeasurementId())) {
this.chunkWriters.put(schema.getMeasurementId(), new ChunkWriterImpl(schema));
}
}
}
@Override
public int write(long time, List<DataPoint> data) throws IOException, WriteProcessException {
int pointCount = 0;
for (DataPoint point : data) {
checkIsHistoryData(point.getMeasurementId(), time);
if (pointCount == 0) {
pointCount++;
}
point.writeTo(
time, chunkWriters.get(point.getMeasurementId())); // write time and value to page
lastTimeMap.put(point.getMeasurementId(), time);
}
return pointCount;
}
@Override
public int write(Tablet tablet) throws WriteProcessException {
int maxPointCount = 0, pointCount;
List<MeasurementSchema> timeseries = tablet.getSchemas();
for (int column = 0; column < timeseries.size(); column++) {
String measurementId = timeseries.get(column).getMeasurementId();
TSDataType tsDataType = timeseries.get(column).getType();
pointCount = 0;
for (int row = 0; row < tablet.rowSize; row++) {
// check isNull in tablet
if (tablet.bitMaps != null
&& tablet.bitMaps[column] != null
&& tablet.bitMaps[column].isMarked(row)) {
continue;
}
long time = tablet.timestamps[row];
checkIsHistoryData(measurementId, time);
pointCount++;
switch (tsDataType) {
case INT32:
chunkWriters.get(measurementId).write(time, ((int[]) tablet.values[column])[row]);
break;
case INT64:
chunkWriters.get(measurementId).write(time, ((long[]) tablet.values[column])[row]);
break;
case FLOAT:
chunkWriters.get(measurementId).write(time, ((float[]) tablet.values[column])[row]);
break;
case DOUBLE:
chunkWriters.get(measurementId).write(time, ((double[]) tablet.values[column])[row]);
break;
case BOOLEAN:
chunkWriters.get(measurementId).write(time, ((boolean[]) tablet.values[column])[row]);
break;
case TEXT:
chunkWriters.get(measurementId).write(time, ((Binary[]) tablet.values[column])[row]);
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", tsDataType));
}
lastTimeMap.put(measurementId, time);
}
maxPointCount = Math.max(pointCount, maxPointCount);
}
return maxPointCount;
}
@Override
public long flushToFileWriter(TsFileIOWriter fileWriter) throws IOException {
LOG.debug("start flush device id:{}", deviceId);
// make sure all the pages have been compressed into buffers, so that we can get correct
// groupWriter.getCurrentChunkGroupSize().
sealAllChunks();
long currentChunkGroupSize = getCurrentChunkGroupSize();
for (IChunkWriter seriesWriter : chunkWriters.values()) {
seriesWriter.writeToFileWriter(fileWriter);
}
return currentChunkGroupSize;
}
@Override
public long updateMaxGroupMemSize() {
long bufferSize = 0;
for (IChunkWriter seriesWriter : chunkWriters.values()) {
bufferSize += seriesWriter.estimateMaxSeriesMemSize();
}
return bufferSize;
}
@Override
public long getCurrentChunkGroupSize() {
long size = 0;
for (IChunkWriter writer : chunkWriters.values()) {
size += writer.getSerializedChunkSize();
}
return size;
}
/** seal all the chunks which may has un-sealed pages in force. */
private void sealAllChunks() {
for (IChunkWriter writer : chunkWriters.values()) {
writer.sealCurrentPage();
}
}
private void checkIsHistoryData(String measurementId, long time) throws WriteProcessException {
if (time <= lastTimeMap.getOrDefault(measurementId, -1L)) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
+ ((PlainDeviceID) deviceId).toStringID()
+ TsFileConstant.PATH_SEPARATOR
+ measurementId
+ ", time should later than "
+ lastTimeMap.get(measurementId));
}
}
public Map<String, Long> getLastTimeMap() {
return this.lastTimeMap;
}
public void setLastTimeMap(Map<String, Long> lastTimeMap) {
this.lastTimeMap = lastTimeMap;
}
}