/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write.writer;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
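/**
 * Tests for the chunk-metadata memory control of {@link TsFileIOWriter}. When the in-memory
 * chunk metadata grows beyond the size threshold passed to the constructor, it is spilled to a
 * temp file (the data file path plus {@link TsFileIOWriter#CHUNK_METADATA_TEMP_FILE_SUFFIX}) and
 * merged back when the file is sealed, at which point the temp file is deleted. A minimal sketch
 * of the pattern exercised by these tests:
 *
 * <pre>{@code
 * try (TsFileIOWriter writer = new TsFileIOWriter(file, maxMetadataSizeInByte)) {
 *   writer.startChunkGroup(deviceId);
 *   chunkWriter.writeToFileWriter(writer); // flush chunk data to the file
 *   writer.endChunkGroup();
 *   writer.checkMetadataSizeAndMayFlush(); // may spill chunk metadata to the temp file
 *   writer.endFile(); // merges spilled metadata and removes the temp file
 * }
 * }</pre>
 */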
public class TsFileIOWriterMemoryControlTest {
private static File testFile = new File("target", "1-1-0-0.tsfile");
private static File emptyFile = new File("target", "temp");
private long TEST_CHUNK_SIZE = 1000;
private List<String> sortedSeriesId = new ArrayList<>();
private List<IDeviceID> sortedDeviceId = new ArrayList<>();
private boolean init = false;
@Before
public void setUp() throws IOException {
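// Generate the IDs only once and sort them, so that their order matches the order in which
// TSMIterator returns the metadata (series lexicographically, devices by IDeviceID order).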
if (!init) {
init = true;
for (int i = 0; i < 2048; ++i) {
sortedSeriesId.add("s" + i);
sortedDeviceId.add(new PlainDeviceID("root.sg.d" + i));
}
sortedSeriesId.sort(String::compareTo);
sortedDeviceId.sort(IDeviceID::compareTo);
}
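// Reset to the default chunk size; some tests shrink it and restore it in a finally block.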
TEST_CHUNK_SIZE = 1000;
}
@After
public void tearDown() throws IOException {
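// Delete the data file and any leftover chunk-metadata temp file between tests.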
if (testFile.exists()) {
FileUtils.delete(testFile);
}
if (new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX).exists()) {
FileUtils.delete(
new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX));
}
if (emptyFile.exists()) {
FileUtils.delete(emptyFile);
}
}
/** The following tests are for ChunkMetadata serialization and deserialization. */
@Test
public void testSerializeAndDeserializeChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
chunkWriter = generateIntData(j, 0L, new ArrayList<>());
break;
case 1:
chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
break;
case 2:
chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
break;
case 3:
chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
break;
case 4:
default:
chunkWriter = generateTextData(j, 0L, new ArrayList<>());
break;
}
chunkWriter.writeToFileWriter(writer);
}
originChunkMetadataList.addAll(writer.chunkMetadataList);
writer.endChunkGroup();
}
writer.sortAndFlushChunkMetadata();
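// Force the buffered chunk metadata down to the temp file so the iterator below can read it.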
writer.tempOutput.flush();
TSMIterator iterator =
TSMIterator.getTSMIteratorInDisk(
writer.chunkMetadataTempFile,
writer.chunkGroupMetadataList,
writer.endPosInCMTForDevice);
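// The iterator returns metadata ordered by device and then by measurement; every device wrote
// the same 5 series, so entry i must belong to series i % 5.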
for (int i = 0; iterator.hasNext(); ++i) {
Pair<Path, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId());
Assert.assertEquals(
originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTsDataType());
Assert.assertEquals(
originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
}
}
}
@Test
public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
chunkWriter.writeToFileWriter(writer);
originChunkMetadataList.addAll(writer.chunkMetadataList);
writer.endChunkGroup();
}
writer.sortAndFlushChunkMetadata();
writer.tempOutput.flush();
List<String> measurementIds = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
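// The time column of an aligned device is stored under an empty measurement ID, hence the
// bare "<device>." path expected before the value columns s1..s6.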
measurementIds.add(((PlainDeviceID) sortedDeviceId.get(i)).toStringID() + ".");
for (int j = 1; j <= 6; ++j) {
measurementIds.add(((PlainDeviceID) sortedDeviceId.get(i)).toStringID() + ".s" + j);
}
}
TSMIterator iterator =
TSMIterator.getTSMIteratorInDisk(
writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
for (int i = 0; iterator.hasNext(); ++i) {
Pair<Path, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
String fullPath = timeseriesMetadataPair.left.getFullPath();
TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
Assert.assertEquals(measurementIds.get(i), fullPath);
Assert.assertEquals(
originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTsDataType());
Assert.assertEquals(
originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
}
}
}
@Test
public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
List<String> seriesIds = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
if (i % 2 == 0) {
// write normal series
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
chunkWriter = generateIntData(j, 0L, new ArrayList<>());
break;
case 1:
chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
break;
case 2:
chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
break;
case 3:
chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
break;
case 4:
default:
chunkWriter = generateTextData(j, 0L, new ArrayList<>());
break;
}
chunkWriter.writeToFileWriter(writer);
seriesIds.add(((PlainDeviceID) deviceId).toStringID() + "." + sortedSeriesId.get(j));
}
} else {
// write vector
AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
chunkWriter.writeToFileWriter(writer);
seriesIds.add(((PlainDeviceID) deviceId).toStringID() + ".");
for (int l = 1; l <= 6; ++l) {
seriesIds.add(((PlainDeviceID) deviceId).toStringID() + ".s" + l);
}
}
originChunkMetadataList.addAll(writer.chunkMetadataList);
writer.endChunkGroup();
}
writer.sortAndFlushChunkMetadata();
writer.tempOutput.flush();
TSMIterator iterator =
TSMIterator.getTSMIteratorInDisk(
writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
for (int i = 0; i < originChunkMetadataList.size(); ++i) {
Pair<Path, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left.getFullPath());
Assert.assertEquals(
originChunkMetadataList.get(i).getDataType(),
timeseriesMetadataPair.right.getTsDataType());
Assert.assertEquals(
originChunkMetadataList.get(i).getStatistics(),
timeseriesMetadataPair.right.getStatistics());
}
}
}
/** The following tests are for writing normal series in different numbers. */
/**
* Write a file with 10 devices and 5 series in each device, one chunk per series. This test
* makes sure that the chunk metadata spilled to the temp file is restored correctly when the
* file is sealed.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithNormalChunk() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
chunkWriter = generateIntData(j, 0L, valList);
break;
case 1:
chunkWriter = generateBooleanData(j, 0L, valList);
break;
case 2:
chunkWriter = generateFloatData(j, 0L, valList);
break;
case 3:
chunkWriter = generateDoubleData(j, 0L, valList);
break;
case 4:
default:
chunkWriter = generateTextData(j, 0L, valList);
break;
}
chunkWriter.writeToFileWriter(writer);
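// The writer was created with a 1024-byte metadata threshold, so this spills the accumulated
// chunk metadata to the temp file once exceeded (asserted via hasChunkMetadataInDisk below).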
writer.checkMetadataSizeAndMayFlush();
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
writer.endChunkGroup();
}
Assert.assertTrue(writer.hasChunkMetadataInDisk);
writer.endFile();
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
/**
* Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
* for it. This test makes sure that the spilled chunk metadata is restored correctly when the
* file is sealed.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 1:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 2:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 3:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 4:
default:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endChunkGroup();
}
Assert.assertTrue(writer.hasChunkMetadataInDisk);
writer.endFile();
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
/**
* Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
* for it. We maintain some chunk metadata in memory when calling endFile().
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 1:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 2:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 3:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 4:
default:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
}
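// Skip the flush for the last device so that some chunk metadata is still in memory when
// endFile() is called, which is what this test is about.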
if (i < 9) {
writer.checkMetadataSizeAndMayFlush();
}
}
writer.endChunkGroup();
}
Assert.assertTrue(writer.hasChunkMetadataInDisk);
Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty());
writer.endFile();
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
/**
* Write a file with 2 devices and 5 series in each device. For each series, we write 1024 chunks
* for it. This test makes sure that an enormous number of chunks per series can still be spilled
* and restored correctly.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
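// Shrink each chunk to 10 points so that writing 1024 chunks per series stays fast.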
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
for (int k = 0; k < 1024; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 1:
for (int k = 0; k < 1024; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 2:
for (int k = 0; k < 1024; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 3:
for (int k = 0; k < 1024; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 4:
default:
for (int k = 0; k < 1024; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endChunkGroup();
}
Assert.assertTrue(writer.hasChunkMetadataInDisk);
writer.endFile();
} finally {
TEST_CHUNK_SIZE = originTestChunkSize;
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
/**
* Write a file with 2 devices and 1024 series in each device. For each series, we write 50
* chunks for it. This test makes sure that an enormous number of series can still be spilled
* and restored correctly.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes =
new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 1;
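// One point per chunk: this test is about the number of series, not the data volume.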
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 1024; ++j) {
ChunkWriterImpl chunkWriter;
switch (j % 5) {
case 0:
for (int k = 0; k < 50; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 1:
for (int k = 0; k < 50; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 2:
for (int k = 0; k < 50; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 3:
for (int k = 0; k < 50; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 4:
default:
for (int k = 0; k < 50; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endChunkGroup();
}
Assert.assertTrue(writer.hasChunkMetadataInDisk);
writer.endFile();
} finally {
TEST_CHUNK_SIZE = originTestChunkSize;
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
}
/**
* Write a file with 1024 devices and 5 series in each device. For each series, we write 10
* chunks for it. This test makes sure that an enormous number of devices can still be spilled
* and restored correctly.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes =
new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1024; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j % 5) {
case 0:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 1:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 2:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 3:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 4:
default:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originTimes
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endChunkGroup();
}
Assert.assertTrue(writer.hasChunkMetadataInDisk);
writer.endFile();
} finally {
TEST_CHUNK_SIZE = originTestChunkSize;
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
}
/** The following tests are for writing aligned series. */
/**
* Test writing 10 aligned devices, each with one aligned chunk of 6 components.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, valList, 6);
for (int j = 1; j <= 6; ++j) {
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent("s" + j, x -> new ArrayList<>())
.add(valList.get(j - 1));
}
chunkWriter.writeToFileWriter(writer);
writer.endChunkGroup();
writer.checkMetadataSizeAndMayFlush();
}
writer.endFile();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
/**
* Test writing one aligned device, for which we write 512 aligned chunks of 6 components each.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
int chunkNum = 512, seriesNum = 6;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
writer.startChunkGroup(deviceId);
List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
AlignedChunkWriterImpl chunkWriter =
generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
for (int j = 1; j <= seriesNum; ++j) {
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent("s" + j, x -> new ArrayList<>())
.add(valList.get(j - 1));
}
chunkWriter.writeToFileWriter(writer);
writer.endChunkGroup();
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endFile();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
/**
* Test writing aligned chunk metadata where each aligned chunk contains 1024 components.
*
* @throws IOException
*/
@Test
public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
int chunkNum = 5, seriesNum = 1024;
long originTestPointNum = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
writer.startChunkGroup(deviceId);
List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
AlignedChunkWriterImpl chunkWriter =
generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
for (int j = 1; j <= seriesNum; ++j) {
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent("s" + j, x -> new ArrayList<>())
.add(valList.get(j - 1));
}
chunkWriter.writeToFileWriter(writer);
writer.endChunkGroup();
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endFile();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
}
} finally {
TEST_CHUNK_SIZE = originTestPointNum;
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
@Test
public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
int chunkNum = 5, seriesNum = 12;
long originTestPointNum = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
int deviceNum = 1024;
try {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < deviceNum; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
writer.startChunkGroup(deviceId);
List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
AlignedChunkWriterImpl chunkWriter =
generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
for (int j = 1; j <= seriesNum; ++j) {
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent("s" + j, x -> new ArrayList<>())
.add(valList.get(j - 1));
}
chunkWriter.writeToFileWriter(writer);
writer.endChunkGroup();
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endFile();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
}
} finally {
TEST_CHUNK_SIZE = originTestPointNum;
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
@Test
public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue =
new HashMap<>();
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
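// Write the device column by column: all ten time chunks first, then the ten value chunks of
// each measurement, flushing the chunk metadata after every column.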
TSEncoding timeEncoding =
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
TimeChunkWriter timeChunkWriter =
new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
timeChunkWriter.write(j);
}
timeChunkWriter.writeToFileWriter(writer);
}
writer.sortAndFlushChunkMetadata();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
for (int k = 0; k < 1024; ++k) {
TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
builder.initFromProps(null);
for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
ValueChunkWriter chunkWriter =
new ValueChunkWriter(
sortedSeriesId.get(k),
CompressionType.SNAPPY,
TSDataType.DOUBLE,
TSEncoding.PLAIN,
builder.getEncoder(TSDataType.DOUBLE));
Random random = new Random();
List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
double val = random.nextDouble();
chunkWriter.write(j, val, false);
valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
}
chunkWriter.writeToFileWriter(writer);
originValue
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
.add(valueList);
}
writer.sortAndFlushChunkMetadata();
}
writer.endChunkGroup();
}
writer.endFile();
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
}
@Test
public void testWritingCompleteMixedFiles() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
for (int k = 0; k < 10; ++k) {
writer.startChunkGroup(deviceId);
List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6);
for (int j = 1; j <= 6; ++j) {
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent("s" + j, x -> new ArrayList<>())
.add(valList.get(j - 1));
}
chunkWriter.writeToFileWriter(writer);
writer.endChunkGroup();
}
writer.checkMetadataSizeAndMayFlush();
}
for (int i = 5; i < 10; ++i) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 1:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 2:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 3:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
case 4:
default:
for (int k = 0; k < 10; ++k) {
List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
chunkWriter.writeToFileWriter(writer);
originData
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
.add(valList);
}
break;
}
writer.checkMetadataSizeAndMayFlush();
}
writer.endChunkGroup();
}
writer.endFile();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
}
@Test
public void testWritingAlignedSeriesByColumn() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue =
new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
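// Same column-by-column pattern, but with a single chunk per column: one time chunk, then one
// value chunk per measurement.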
TSEncoding timeEncoding =
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
TimeChunkWriter timeChunkWriter =
new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
timeChunkWriter.write(j);
}
timeChunkWriter.writeToFileWriter(writer);
writer.sortAndFlushChunkMetadata();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
for (int k = 0; k < 5; ++k) {
TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
builder.initFromProps(null);
ValueChunkWriter chunkWriter =
new ValueChunkWriter(
sortedSeriesId.get(k),
CompressionType.SNAPPY,
TSDataType.DOUBLE,
TSEncoding.PLAIN,
builder.getEncoder(TSDataType.DOUBLE));
Random random = new Random();
List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
double val = random.nextDouble();
chunkWriter.write(j, val, false);
valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
}
chunkWriter.writeToFileWriter(writer);
originValue
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
.add(valueList);
writer.sortAndFlushChunkMetadata();
}
writer.endChunkGroup();
}
writer.endFile();
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
}
@Test
public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue =
new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
IDeviceID deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
TSEncoding timeEncoding =
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
TimeChunkWriter timeChunkWriter =
new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
timeChunkWriter.write(j);
}
timeChunkWriter.writeToFileWriter(writer);
}
writer.sortAndFlushChunkMetadata();
Assert.assertTrue(writer.hasChunkMetadataInDisk);
for (int k = 0; k < 5; ++k) {
TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
builder.initFromProps(null);
for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
ValueChunkWriter chunkWriter =
new ValueChunkWriter(
sortedSeriesId.get(k),
CompressionType.SNAPPY,
TSDataType.DOUBLE,
TSEncoding.PLAIN,
builder.getEncoder(TSDataType.DOUBLE));
Random random = new Random();
List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
double val = random.nextDouble();
chunkWriter.write(j, val, false);
valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
}
chunkWriter.writeToFileWriter(writer);
originValue
.computeIfAbsent(deviceId, x -> new HashMap<>())
.computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
.add(valueList);
}
writer.sortAndFlushChunkMetadata();
}
writer.endChunkGroup();
}
writer.endFile();
}
Assert.assertFalse(
new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
.exists());
TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
}
/** The following methods generate random data of each supported type for the tests above. */
private ChunkWriterImpl generateIntData(
int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
ChunkWriterImpl chunkWriter =
new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64));
Random random = new Random();
for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
long val = random.nextLong();
chunkWriter.write(i, val);
record.add(new Pair<>(i, new TsPrimitiveType.TsLong(val)));
}
return chunkWriter;
}
private ChunkWriterImpl generateFloatData(
int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
ChunkWriterImpl chunkWriter =
new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT));
Random random = new Random();
for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
float val = random.nextFloat();
chunkWriter.write(i, val);
record.add(new Pair<>(i, new TsPrimitiveType.TsFloat(val)));
}
return chunkWriter;
}
private ChunkWriterImpl generateDoubleData(
int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
ChunkWriterImpl chunkWriter =
new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE));
Random random = new Random();
for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
double val = random.nextDouble();
chunkWriter.write(i, val);
record.add(new Pair<>(i, new TsPrimitiveType.TsDouble(val)));
}
return chunkWriter;
}
private ChunkWriterImpl generateBooleanData(
int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
ChunkWriterImpl chunkWriter =
new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN));
Random random = new Random();
for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
boolean val = random.nextBoolean();
chunkWriter.write(i, val);
record.add(new Pair<>(i, new TsPrimitiveType.TsBoolean(val)));
}
return chunkWriter;
}
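/**
 * Generates one aligned chunk with {@code seriesNum} components (s1..sN), cycling through six
 * data types, and records the points written for each component into {@code record}.
 */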
private AlignedChunkWriterImpl generateVectorData(
long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.BOOLEAN,
TSDataType.TEXT
};
for (int i = 0; i < seriesNum; ++i) {
measurementSchemas.add(new MeasurementSchema("s" + (i + 1), dataTypes[i % 6]));
}
AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
Random random = new Random();
for (int i = 0; i < seriesNum; ++i) {
record.add(new ArrayList<>());
}
for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
TsPrimitiveType[] points = new TsPrimitiveType[seriesNum];
for (int j = 0; j < seriesNum; ++j) {
switch (j % 6) {
case 0:
points[j] = new TsPrimitiveType.TsInt(random.nextInt());
break;
case 1:
points[j] = new TsPrimitiveType.TsLong(random.nextLong());
break;
case 2:
points[j] = new TsPrimitiveType.TsFloat(random.nextFloat());
break;
case 3:
points[j] = new TsPrimitiveType.TsDouble(random.nextDouble());
break;
case 4:
points[j] = new TsPrimitiveType.TsBoolean(random.nextBoolean());
break;
case 5:
points[j] =
new TsPrimitiveType.TsBinary(
new Binary(String.valueOf(random.nextDouble()), TSFileConfig.STRING_CHARSET));
break;
}
}
for (int j = 0; j < seriesNum; ++j) {
record.get(j).add(new Pair<>(i, points[j]));
}
chunkWriter.write(i, points);
}
return chunkWriter;
}
private ChunkWriterImpl generateTextData(
int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
ChunkWriterImpl chunkWriter =
new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT));
Random random = new Random();
for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
Binary val = new Binary(String.valueOf(random.nextDouble()), TSFileConfig.STRING_CHARSET);
chunkWriter.write(i, val);
record.add(new Pair<>(i, new TsPrimitiveType.TsBinary(val)));
}
return chunkWriter;
}
}