blob: 17bc698190d7976dee9acd4775e1c7f2f7dda985 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.read;
import org.apache.iotdb.tsfile.exception.TsFileSequenceReaderTimeseriesMetadataIteratorException;
import org.apache.iotdb.tsfile.file.IMetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.DeviceMetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
public class TsFileSequenceReaderTimeseriesMetadataIterator
implements Iterator<Map<IDeviceID, List<TimeseriesMetadata>>> {
private static final int DEFAULT_TIMESERIES_BATCH_READ_NUMBER = 4000;
private final TsFileSequenceReader reader;
private final boolean needChunkMetadata;
private final int timeseriesBatchReadNumber;
private ByteBuffer currentBuffer = null;
private long currentEndOffset = Long.MIN_VALUE;
private final Deque<MetadataIndexEntryInfo> metadataIndexEntryStack = new ArrayDeque<>();
private IDeviceID currentDeviceId;
private int currentTimeseriesMetadataCount = 0;
public TsFileSequenceReaderTimeseriesMetadataIterator(
TsFileSequenceReader reader, boolean needChunkMetadata, int timeseriesBatchReadNumber)
throws IOException {
this.reader = reader;
this.needChunkMetadata = needChunkMetadata;
this.timeseriesBatchReadNumber = timeseriesBatchReadNumber;
if (this.reader.tsFileMetaData == null) {
this.reader.readFileMetadata();
}
final MetadataIndexNode metadataIndexNode = reader.tsFileMetaData.getMetadataIndex();
long curEntryEndOffset = metadataIndexNode.getEndOffset();
List<IMetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();
for (int i = metadataIndexEntryList.size() - 1; i >= 0; i--) {
metadataIndexEntryStack.push(
new MetadataIndexEntryInfo(
metadataIndexEntryList.get(i), metadataIndexNode.getNodeType(), curEntryEndOffset));
curEntryEndOffset = metadataIndexEntryList.get(i).getOffset();
}
}
public TsFileSequenceReaderTimeseriesMetadataIterator(
TsFileSequenceReader reader, boolean needChunkMetadata) throws IOException {
this(reader, needChunkMetadata, DEFAULT_TIMESERIES_BATCH_READ_NUMBER);
}
@Override
public boolean hasNext() {
return !metadataIndexEntryStack.isEmpty()
|| (currentBuffer != null && currentBuffer.hasRemaining());
}
@Override
public Map<IDeviceID, List<TimeseriesMetadata>> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
final Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>();
while (currentTimeseriesMetadataCount < timeseriesBatchReadNumber) {
// 1. Check Buffer
// currentTimeseriesMetadataCount has reached the limit in the previous
// loop and maybe there is still some data that remains in the buffer.
if (currentBuffer != null && currentBuffer.hasRemaining()) {
timeseriesMetadataMap
.computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
.addAll(deserializeTimeseriesMetadata());
} else if (currentEndOffset > Long.MIN_VALUE) {
try {
timeseriesMetadataMap
.computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
.addAll(deserializeTimeseriesMetadataUsingTsFileInput(currentEndOffset));
} catch (IOException e) {
throw new TsFileSequenceReaderTimeseriesMetadataIteratorException(
String.format(
"TsFileSequenceReaderTimeseriesMetadataIterator: deserializeTimeseriesMetadataUsingTsFileInput failed, "
+ "currentEndOffset: %d, "
+ e.getMessage(),
currentEndOffset));
}
}
if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber
|| metadataIndexEntryStack.isEmpty()) {
break;
}
// 2. Deserialize MetadataIndexEntry
final MetadataIndexEntryInfo indexEntryInfo = metadataIndexEntryStack.pop();
try {
deserializeMetadataIndexEntry(indexEntryInfo, timeseriesMetadataMap);
} catch (IOException e) {
throw new TsFileSequenceReaderTimeseriesMetadataIteratorException(
String.format(
"TsFileSequenceReaderTimeseriesMetadataIterator: deserializeMetadataIndexEntry failed, "
+ "MetadataIndexEntryInfo: %s, "
+ e.getMessage(),
indexEntryInfo));
}
}
// 3. Reset currentTimeseriesMetadataCount
if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber) {
currentTimeseriesMetadataCount = 0;
}
return timeseriesMetadataMap;
}
private void deserializeMetadataIndexEntry(
MetadataIndexEntryInfo metadataIndexEntryInfo,
Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap)
throws IOException {
if (metadataIndexEntryInfo
.getMetadataIndexNodeType()
.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
deserializeLeafMeasurement(
metadataIndexEntryInfo.getMetadataIndexEntry(),
metadataIndexEntryInfo.getEndOffset(),
timeseriesMetadataMap);
} else {
deserializeInternalNode(
metadataIndexEntryInfo.getMetadataIndexEntry(),
metadataIndexEntryInfo.getEndOffset(),
metadataIndexEntryInfo.getMetadataIndexNodeType());
}
}
private void deserializeLeafMeasurement(
IMetadataIndexEntry metadataIndexEntry,
long endOffset,
Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap)
throws IOException {
if (currentBuffer != null && currentBuffer.hasRemaining()) {
throw new TsFileSequenceReaderTimeseriesMetadataIteratorException(
"currentBuffer still has some data left before deserializeLeafMeasurement");
}
if (endOffset - metadataIndexEntry.getOffset() < Integer.MAX_VALUE) {
currentBuffer = reader.readData(metadataIndexEntry.getOffset(), endOffset);
timeseriesMetadataMap
.computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
.addAll(deserializeTimeseriesMetadata());
} else {
currentEndOffset = endOffset;
reader.position(metadataIndexEntry.getOffset());
timeseriesMetadataMap
.computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
.addAll(deserializeTimeseriesMetadataUsingTsFileInput(endOffset));
}
}
private List<TimeseriesMetadata> deserializeTimeseriesMetadata() {
final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (currentBuffer.hasRemaining()
&& currentTimeseriesMetadataCount < timeseriesBatchReadNumber) {
timeseriesMetadataList.add(
TimeseriesMetadata.deserializeFrom(currentBuffer, needChunkMetadata));
currentTimeseriesMetadataCount++;
}
return timeseriesMetadataList;
}
private List<TimeseriesMetadata> deserializeTimeseriesMetadataUsingTsFileInput(long endOffset)
throws IOException {
final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (reader.position() < endOffset
&& currentTimeseriesMetadataCount < DEFAULT_TIMESERIES_BATCH_READ_NUMBER) {
timeseriesMetadataList.add(
TimeseriesMetadata.deserializeFrom(reader.tsFileInput, needChunkMetadata));
currentTimeseriesMetadataCount++;
}
if (reader.position() >= endOffset) {
currentEndOffset = Long.MIN_VALUE;
}
return timeseriesMetadataList;
}
private void deserializeInternalNode(
IMetadataIndexEntry metadataIndexEntry, long endOffset, MetadataIndexNodeType type)
throws IOException {
if (MetadataIndexNodeType.LEAF_DEVICE.equals(type)) {
currentDeviceId = ((DeviceMetadataIndexEntry) metadataIndexEntry).getDeviceID();
}
boolean currentChildLevelIsDevice = MetadataIndexNodeType.INTERNAL_DEVICE.equals(type);
final MetadataIndexNode metadataIndexNode =
MetadataIndexNode.deserializeFrom(
reader.readData(metadataIndexEntry.getOffset(), endOffset), currentChildLevelIsDevice);
MetadataIndexNodeType metadataIndexNodeType = metadataIndexNode.getNodeType();
List<IMetadataIndexEntry> children = metadataIndexNode.getChildren();
long curEntryEndOffset = metadataIndexNode.getEndOffset();
for (int i = children.size() - 1; i >= 0; i--) {
metadataIndexEntryStack.push(
new MetadataIndexEntryInfo(children.get(i), metadataIndexNodeType, curEntryEndOffset));
curEntryEndOffset = children.get(i).getOffset();
}
}
private static class MetadataIndexEntryInfo {
private final IMetadataIndexEntry metadataIndexEntry;
private final MetadataIndexNodeType metadataIndexNodeType;
private final long endOffset;
public MetadataIndexEntryInfo(
IMetadataIndexEntry metadataIndexEntry,
MetadataIndexNodeType metadataIndexNodeType,
long endOffset) {
this.metadataIndexEntry = metadataIndexEntry;
this.metadataIndexNodeType = metadataIndexNodeType;
this.endOffset = endOffset;
}
public IMetadataIndexEntry getMetadataIndexEntry() {
return metadataIndexEntry;
}
public MetadataIndexNodeType getMetadataIndexNodeType() {
return metadataIndexNodeType;
}
public long getEndOffset() {
return endOffset;
}
}
}