blob: 0e08d2f0db4b3577f527d85e4c2f5fe5b804409c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.session.subscription;
import org.apache.iotdb.isession.ISessionDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
 * Row-oriented view over a single {@link Tablet}, exposed through {@link ISessionDataSet}.
 *
 * <p>Rows are handed out in ascending timestamp order. The ordering is built once, in the
 * constructor, by indexing the tablet's timestamps in a {@link TreeMap}; if two rows carry the
 * same timestamp, only the one with the larger row index is kept.
 *
 * <p>After {@link #close()} the tablet reference is dropped, so calling {@link #next()} (or the
 * column accessors, if they were never called before) will fail.
 */
public class SubscriptionSessionDataSet implements ISessionDataSet {

  private Tablet tablet;

  /**
   * Returns the tablet backing this data set.
   *
   * @return the wrapped tablet, or {@code null} once {@link #close()} has been called
   */
  public Tablet getTablet() {
    return tablet;
  }

  /**
   * Wraps the given tablet and precomputes the timestamp-ordered row iterator.
   *
   * @param tablet the tablet to iterate over
   */
  public SubscriptionSessionDataSet(Tablet tablet) {
    this.tablet = tablet;
    generateRowIterator();
  }

  /////////////////////////////// override ///////////////////////////////

  // Both lists are built lazily on first access and then cached.
  private List<String> columnNameList;
  private List<String> columnTypeList;

  /**
   * Returns the column names: {@code "Time"} followed by {@code deviceId.measurementId} for every
   * schema of the tablet, in schema order.
   *
   * @return the cached list of column names
   */
  @Override
  public List<String> getColumnNames() {
    if (Objects.nonNull(columnNameList)) {
      return columnNameList;
    }

    final String deviceId = tablet.deviceId;
    final List<MeasurementSchema> schemas = tablet.getSchemas();

    columnNameList = new ArrayList<>();
    columnNameList.add("Time");
    columnNameList.addAll(
        schemas.stream()
            .map(schema -> deviceId + "." + schema.getMeasurementId())
            .collect(Collectors.toList()));
    return columnNameList;
  }

  /**
   * Returns the column types as strings: {@code INT64} for the time column followed by the data
   * type of every schema of the tablet, in schema order.
   *
   * @return the cached list of column type names
   */
  @Override
  public List<String> getColumnTypes() {
    if (Objects.nonNull(columnTypeList)) {
      return columnTypeList;
    }

    final List<MeasurementSchema> schemas = tablet.getSchemas();

    columnTypeList = new ArrayList<>();
    columnTypeList.add(TSDataType.INT64.toString());
    columnTypeList.addAll(
        schemas.stream().map(schema -> schema.getType().toString()).collect(Collectors.toList()));
    return columnTypeList;
  }

  /**
   * Tells whether another row is available.
   *
   * @return {@code true} if {@link #next()} can return another row
   */
  public boolean hasNext() {
    return rowIterator.hasNext();
  }

  /**
   * Builds the next row in ascending timestamp order.
   *
   * <p>A cell whose bitmap bit is marked becomes a {@code Field} created with a {@code null} data
   * type; every other cell is copied out of the tablet's value column.
   *
   * @return the next row
   * @throws java.util.NoSuchElementException if no rows remain
   * @throws UnSupportedDataTypeException if a column has a data type this class cannot convert
   */
  @Override
  public RowRecord next() {
    final Map.Entry<Long, Integer> entry = rowIterator.next();
    final long timestamp = entry.getKey();
    final int rowIndex = entry.getValue();

    final List<MeasurementSchema> schemas = tablet.getSchemas();
    final int columnSize = schemas.size();
    final List<Field> fields = new ArrayList<>(columnSize);

    for (int columnIndex = 0; columnIndex < columnSize; ++columnIndex) {
      if (isNullValue(columnIndex, rowIndex)) {
        fields.add(new Field(null));
      } else {
        fields.add(
            generateFieldFromTabletValue(
                schemas.get(columnIndex).getType(), tablet.values[columnIndex], rowIndex));
      }
    }
    return new RowRecord(timestamp, fields);
  }

  /** Drops the reference to the wrapped tablet. */
  @Override
  public void close() throws Exception {
    // maybe friendly for gc
    tablet = null;
  }

  /////////////////////////////// utility ///////////////////////////////

  private Iterator<Map.Entry<Long, Integer>> rowIterator;

  /**
   * Tells whether the given cell is flagged in the tablet's bitmaps.
   *
   * <p>The bitmap array and its individual entries are checked for {@code null} first, so a tablet
   * without bitmaps (or without one for this column) is read as having no flagged cells instead of
   * failing with a {@link NullPointerException}.
   */
  private boolean isNullValue(int columnIndex, int rowIndex) {
    return tablet.bitMaps != null
        && tablet.bitMaps[columnIndex] != null
        && tablet.bitMaps[columnIndex].isMarked(rowIndex);
  }

  /** Indexes the tablet's rows by timestamp and stores an iterator over the sorted entries. */
  private void generateRowIterator() {
    // timestamp -> row index
    final long[] timestamps = tablet.timestamps;
    final TreeMap<Long, Integer> timestampToRowIndex = new TreeMap<>();

    // NOTE(review): assumes Tablet.rowSize is the number of rows actually written, while the
    // timestamps array may be allocated larger than that -- confirm against Tablet. Bounding by
    // both keeps unused trailing slots from showing up as rows and never reads past the array.
    final int rowCount = Math.min(tablet.rowSize, timestamps.length);
    for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
      // put() replaces any earlier row that has the same timestamp
      timestampToRowIndex.put(timestamps[rowIndex], rowIndex);
    }
    this.rowIterator = timestampToRowIndex.entrySet().iterator();
  }

  /**
   * Converts one cell of a tablet value column into a {@link Field}.
   *
   * @param dataType the data type of the column
   * @param value the column's backing array; its element type must match {@code dataType}
   * @param index the row index inside {@code value}
   * @return a field of the given type holding the cell's value
   * @throws UnSupportedDataTypeException if {@code dataType} is not one of BOOLEAN, INT32, INT64,
   *     FLOAT, DOUBLE or TEXT
   */
  private static Field generateFieldFromTabletValue(TSDataType dataType, Object value, int index) {
    final Field field = new Field(dataType);
    switch (dataType) {
      case BOOLEAN:
        field.setBoolV(((boolean[]) value)[index]);
        break;
      case INT32:
        field.setIntV(((int[]) value)[index]);
        break;
      case INT64:
        field.setLongV(((long[]) value)[index]);
        break;
      case FLOAT:
        field.setFloatV(((float[]) value)[index]);
        break;
      case DOUBLE:
        field.setDoubleV(((double[]) value)[index]);
        break;
      case TEXT:
        // wrap the bytes in a fresh Binary instead of sharing the tablet's instance
        field.setBinaryV(new Binary((((Binary[]) value)[index]).getValues()));
        break;
      default:
        throw new UnSupportedDataTypeException(
            String.format("Data type %s is not supported.", dataType));
    }
    return field;
  }
}