blob: 52701cc21022b19481ef9f86b1f6825e052b95f4 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { logger } from "../utils/Logger";
/**
* Column encoding types matching Apache IoTDB ColumnEncoding enum
*/
export enum ColumnEncoding {
ByteArray = 0, // For BOOLEAN
Int32Array = 1, // For INT32, DATE, FLOAT
Int64Array = 2, // For INT64, TIMESTAMP, DOUBLE
BinaryArray = 3, // For TEXT, STRING, BLOB
Rle = 4, // Run-Length Encoding (compression)
}
/**
* Decoded column interface - stores all values for a column
*/
export interface Column {
dataType: number;
encoding: ColumnEncoding;
values: any[];
nullIndicators: boolean[] | null;
positionCount: number;
}
/**
* Column decoder interface
*/
export interface ColumnDecoder {
readColumn(
buffer: Buffer,
offset: number,
dataType: number,
positionCount: number,
): { column: Column; bytesRead: number };
}
/**
* Helper functions for deserializing column data
*/
export class ColumnDeserializer {
/**
* Deserialize null indicators (bitmap) for a column
* Format: 1 byte flag (0=no nulls, 1=has nulls) + optional bitmap
*/
static deserializeNullIndicators(
buffer: Buffer,
offset: number,
positionCount: number,
): { nullIndicators: boolean[] | null; bytesRead: number } {
const mayHaveNull = buffer.readUInt8(offset) !== 0;
let bytesRead = 1;
if (!mayHaveNull) {
return { nullIndicators: null, bytesRead };
}
// Read boolean array bitmap
const { values, bytesRead: bitmapBytes } = this.deserializeBooleanArray(
buffer,
offset + 1,
positionCount,
);
bytesRead += bitmapBytes;
return { nullIndicators: values, bytesRead };
}
/**
* Deserialize boolean array from packed bitmap
* Each byte stores 8 boolean values (1 bit per value)
*/
static deserializeBooleanArray(
buffer: Buffer,
offset: number,
size: number,
): { values: boolean[]; bytesRead: number } {
const packedSize = Math.ceil(size / 8);
const values: boolean[] = new Array(size);
let currentByte = 0;
const fullGroups = size & ~0b111; // Round down to nearest 8
// Process full groups of 8
for (let pos = 0; pos < fullGroups; pos += 8) {
const b = buffer.readUInt8(offset + currentByte++);
values[pos + 0] = (b & 0b10000000) !== 0;
values[pos + 1] = (b & 0b01000000) !== 0;
values[pos + 2] = (b & 0b00100000) !== 0;
values[pos + 3] = (b & 0b00010000) !== 0;
values[pos + 4] = (b & 0b00001000) !== 0;
values[pos + 5] = (b & 0b00000100) !== 0;
values[pos + 6] = (b & 0b00000010) !== 0;
values[pos + 7] = (b & 0b00000001) !== 0;
}
// Handle remaining bits
if (size % 8 !== 0) {
const b = buffer.readUInt8(offset + currentByte++);
let mask = 0b10000000;
for (let pos = fullGroups; pos < size; pos++) {
values[pos] = (b & mask) !== 0;
mask >>= 1;
}
}
return { values, bytesRead: packedSize };
}
}
/**
* Decoder for INT32 array encoding (encoding=1)
* Handles INT32, DATE, and FLOAT data types
*/
class Int32ArrayColumnDecoder implements ColumnDecoder {
readColumn(
buffer: Buffer,
offset: number,
dataType: number,
positionCount: number,
): { column: Column; bytesRead: number } {
let currentOffset = offset;
// Read null indicators
const { nullIndicators, bytesRead: nullBytes } =
ColumnDeserializer.deserializeNullIndicators(
buffer,
currentOffset,
positionCount,
);
currentOffset += nullBytes;
const values: any[] = new Array(positionCount);
switch (dataType) {
case 1: // INT32
case 9: // DATE
for (let i = 0; i < positionCount; i++) {
if (nullIndicators && nullIndicators[i]) {
values[i] = null;
continue;
}
values[i] = buffer.readInt32BE(currentOffset);
currentOffset += 4;
}
break;
case 3: // FLOAT
for (let i = 0; i < positionCount; i++) {
if (nullIndicators && nullIndicators[i]) {
values[i] = null;
continue;
}
values[i] = buffer.readFloatBE(currentOffset);
currentOffset += 4;
}
break;
default:
throw new Error(
`Invalid data type ${dataType} for Int32ArrayColumnDecoder`,
);
}
return {
column: {
dataType,
encoding: ColumnEncoding.Int32Array,
values,
nullIndicators,
positionCount,
},
bytesRead: currentOffset - offset,
};
}
}
/**
* Decoder for INT64 array encoding (encoding=2)
* Handles INT64, TIMESTAMP, and DOUBLE data types
*/
class Int64ArrayColumnDecoder implements ColumnDecoder {
readColumn(
buffer: Buffer,
offset: number,
dataType: number,
positionCount: number,
): { column: Column; bytesRead: number } {
let currentOffset = offset;
// Read null indicators
const { nullIndicators, bytesRead: nullBytes } =
ColumnDeserializer.deserializeNullIndicators(
buffer,
currentOffset,
positionCount,
);
currentOffset += nullBytes;
const values: any[] = new Array(positionCount);
switch (dataType) {
case 2: // INT64
case 8: // TIMESTAMP
for (let i = 0; i < positionCount; i++) {
if (nullIndicators && nullIndicators[i]) {
values[i] = null;
continue;
}
values[i] = buffer.readBigInt64BE(currentOffset);
currentOffset += 8;
}
break;
case 4: // DOUBLE
for (let i = 0; i < positionCount; i++) {
if (nullIndicators && nullIndicators[i]) {
values[i] = null;
continue;
}
values[i] = buffer.readDoubleBE(currentOffset);
currentOffset += 8;
}
break;
default:
throw new Error(
`Invalid data type ${dataType} for Int64ArrayColumnDecoder`,
);
}
return {
column: {
dataType,
encoding: ColumnEncoding.Int64Array,
values,
nullIndicators,
positionCount,
},
bytesRead: currentOffset - offset,
};
}
}
/**
* Decoder for BOOLEAN byte array encoding (encoding=0)
*/
class ByteArrayColumnDecoder implements ColumnDecoder {
readColumn(
buffer: Buffer,
offset: number,
dataType: number,
positionCount: number,
): { column: Column; bytesRead: number } {
if (dataType !== 0) {
// BOOLEAN
throw new Error(
`Invalid data type ${dataType} for ByteArrayColumnDecoder`,
);
}
let currentOffset = offset;
// Read null indicators
const { nullIndicators, bytesRead: nullBytes } =
ColumnDeserializer.deserializeNullIndicators(
buffer,
currentOffset,
positionCount,
);
currentOffset += nullBytes;
// Read boolean values
const { values, bytesRead: boolBytes } =
ColumnDeserializer.deserializeBooleanArray(
buffer,
currentOffset,
positionCount,
);
currentOffset += boolBytes;
return {
column: {
dataType,
encoding: ColumnEncoding.ByteArray,
values,
nullIndicators,
positionCount,
},
bytesRead: currentOffset - offset,
};
}
}
/**
* Decoder for BINARY array encoding (encoding=3)
* Handles TEXT, STRING, and BLOB data types with variable-length values
*/
class BinaryArrayColumnDecoder implements ColumnDecoder {
readColumn(
buffer: Buffer,
offset: number,
dataType: number,
positionCount: number,
): { column: Column; bytesRead: number } {
// Supports TEXT(5), BLOB(10), STRING(11)
if (dataType !== 5 && dataType !== 10 && dataType !== 11) {
throw new Error(
`Invalid data type ${dataType} for BinaryArrayColumnDecoder`,
);
}
let currentOffset = offset;
// Read null indicators
const { nullIndicators, bytesRead: nullBytes } =
ColumnDeserializer.deserializeNullIndicators(
buffer,
currentOffset,
positionCount,
);
currentOffset += nullBytes;
const values: any[] = new Array(positionCount);
for (let i = 0; i < positionCount; i++) {
if (nullIndicators && nullIndicators[i]) {
values[i] = null;
continue;
}
// Read length (INT32 BE - Java standard for string/binary lengths)
const length = buffer.readInt32BE(currentOffset);
currentOffset += 4;
// Read data
const data = buffer.slice(currentOffset, currentOffset + length);
currentOffset += length;
// Convert to appropriate type
if (dataType === 10) {
// BLOB - keep as Buffer
values[i] = data;
} else {
// TEXT/STRING - convert to UTF-8 string
values[i] = data.toString("utf8");
}
}
return {
column: {
dataType,
encoding: ColumnEncoding.BinaryArray,
values,
nullIndicators,
positionCount,
},
bytesRead: currentOffset - offset,
};
}
}
/**
* Decoder for Run-Length Encoding (encoding=4)
* Stores a single value that repeats for all positions (compression)
*/
class RunLengthColumnDecoder implements ColumnDecoder {
readColumn(
buffer: Buffer,
offset: number,
dataType: number,
positionCount: number,
): { column: Column; bytesRead: number } {
let currentOffset = offset;
// Read the inner encoding type (1 byte)
const innerEncoding: ColumnEncoding = buffer.readUInt8(currentOffset);
currentOffset += 1;
// Get decoder for inner encoding
const innerDecoder = BaseColumnDecoder.getDecoder(innerEncoding);
// Read single value (positionCount=1)
const { column: innerColumn, bytesRead: innerBytes } =
innerDecoder.readColumn(buffer, currentOffset, dataType, 1);
currentOffset += innerBytes;
// Expand the single value to fill all positions
const singleValue = innerColumn.values[0];
const values: any[] = new Array(positionCount).fill(singleValue);
// Check if the single value is null
const isNull = innerColumn.nullIndicators && innerColumn.nullIndicators[0];
const nullIndicators = isNull ? new Array(positionCount).fill(true) : null;
return {
column: {
dataType,
encoding: ColumnEncoding.Rle,
values,
nullIndicators,
positionCount,
},
bytesRead: currentOffset - offset,
};
}
}
/**
* Base column decoder factory (declared after decoder classes)
*/
export class BaseColumnDecoder {
private static decoders: Map<ColumnEncoding, ColumnDecoder> = new Map([
[ColumnEncoding.Int32Array, new Int32ArrayColumnDecoder()],
[ColumnEncoding.Int64Array, new Int64ArrayColumnDecoder()],
[ColumnEncoding.ByteArray, new ByteArrayColumnDecoder()],
[ColumnEncoding.BinaryArray, new BinaryArrayColumnDecoder()],
[ColumnEncoding.Rle, new RunLengthColumnDecoder()],
]);
static getDecoder(encoding: ColumnEncoding): ColumnDecoder {
const decoder = this.decoders.get(encoding);
if (!decoder) {
throw new Error(`Unsupported encoding: ${encoding}`);
}
return decoder;
}
}