add FLEA (#15773)

diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index 7d10356..c2de083 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -217,7 +217,7 @@
         GORILLA = (char)8,
         ZIGZAG = (char)9,
         FREQ = (char)10,
-        PERIOD = (char)14
+        FLEA = (char)14
     };
 }
 
diff --git a/client-py/iotdb/utils/IoTDBConstants.py b/client-py/iotdb/utils/IoTDBConstants.py
index eda947e..00c4d8b 100644
--- a/client-py/iotdb/utils/IoTDBConstants.py
+++ b/client-py/iotdb/utils/IoTDBConstants.py
@@ -61,7 +61,7 @@
     GORILLA = 8
     ZIGZAG = 9
     FREQ = 10
-    PERIOD = 14
+    FLEA = 14
 
     # this method is implemented to avoid the issue reported by:
     # https://bugs.python.org/issue30545
diff --git a/docs/UserGuide/Data-Concept/Encoding.md b/docs/UserGuide/Data-Concept/Encoding.md
index 6fe412f..4771b1c 100644
--- a/docs/UserGuide/Data-Concept/Encoding.md
+++ b/docs/UserGuide/Data-Concept/Encoding.md
@@ -64,11 +64,9 @@
   
 ZIGZAG encoding maps signed integers to unsigned integers so that numbers with a small absolute value (for instance, -1) have a small variant encoded value too. It does this in a way that "zig-zags" back and forth through the positive and negative integers.
 
-* PERIOD
+* FLEA
 
-PERIOD encoding is suitable for encoding data that is periodic in nature. By calculating the periodicity of the data, it is possible to use lossy frequency-domain data to preserve the periodic character of the data, thus applying a smaller space for storing the residual data.
-
-For the residual data, since most of the residual data is small and a small portion of the residual data is large, PERIOD encoding employs a separate storage method, storing the high and low bits of the residual data separately.
+FLEA (Frequency-based Lossless Encoding Algorithm) is a novel lossless compression algorithm designed specifically for patterned time series. It uniquely leverages frequency-domain analysis within a rate-optimal framework, decomposing data into frequency and residual components whose encoding costs are jointly minimized by adaptively selecting a quantization parameter. By further employing a bi-regional encoding strategy with specialized coders for dense and sparse frequency data, alongside a hybrid residual encoder, FLEA achieves state-of-the-art compression ratios, particularly on data exhibiting periodic or cyclical structures.
 
 ## Correspondence between data type and encoding
 
diff --git a/iotdb-client/client-cpp/src/main/Session.h b/iotdb-client/client-cpp/src/main/Session.h
index 24b2622..6ab03d8 100644
--- a/iotdb-client/client-cpp/src/main/Session.h
+++ b/iotdb-client/client-cpp/src/main/Session.h
@@ -231,7 +231,7 @@
         CHIMP = (char)11,
         SPRINTZ = (char)12,
         RLBE = (char)13,
-        PERIOD = (char)14
+        FLEA = (char)14
     };
 }
 
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
index 3dfbe3c..85ceeed 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
@@ -54,7 +54,7 @@
     CHIMP = 11
     SPRINTZ = 12
     RLBE = 13
-    PERIOD = 14
+    FLEA = 14
 
 
 @unique
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index 6f35f51..5aa715a 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -238,7 +238,7 @@
       "ON",
       "OFF",
       "OF",
-      "PERIOD",
+      "FLEA",
       "PROCESSLIST",
       "PREVIOUS",
       "PREVIOUSUNTILLAST",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 714689a..26a2a03 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -59,7 +59,7 @@
     intSet.add(TSEncoding.CHIMP);
     intSet.add(TSEncoding.SPRINTZ);
     intSet.add(TSEncoding.RLBE);
-    intSet.add(TSEncoding.PERIOD);
+    intSet.add(TSEncoding.FLEA);
 
     schemaChecker.put(TSDataType.INT32, intSet);
     schemaChecker.put(TSDataType.INT64, intSet);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index 3df1318..c4044a3 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -66,8 +66,8 @@
   private static final String METHOD_NOT_SUPPORTED_STRING = "Method not supported";
   // when running the program in IDE, we can not get the version info using
   // getImplementationVersion()
-  private static final String DATABASE_VERSION =
-      IoTDBDatabaseMetadata.class.getPackage().getImplementationVersion() != null
+  private static final String DATABASE_VERSION = IoTDBDatabaseMetadata.class.getPackage()
+      .getImplementationVersion() != null
           ? IoTDBDatabaseMetadata.class.getPackage().getImplementationVersion()
           : "UNKNOWN";
   private long sessionId;
@@ -84,382 +84,382 @@
 
   static {
     String[] allIotdbSQLKeywords = {
-      "ALTER",
-      "ADD",
-      "ALIAS",
-      "ALL",
-      "AVG",
-      "ALIGN",
-      "ATTRIBUTES",
-      "AS",
-      "ASC",
-      "BY",
-      "BOOLEAN",
-      "BITMAP",
-      "CREATE",
-      "CONFIGURATION",
-      "COMPRESSOR",
-      "CHILD",
-      "COUNT",
-      "COMPRESSION",
-      "CLEAR",
-      "CACHE",
-      "CONTAIN",
-      "CONCAT",
-      "DELETE",
-      "DEVICE",
-      "DESCRIBE",
-      "DATATYPE",
-      "DOUBLE",
-      "DIFF",
-      "DROP",
-      "DEVICES",
-      "DISABLE",
-      "DESC",
-      "ENCODING",
-      "FROM",
-      "FILL",
-      "FLOAT",
-      "FLUSH",
-      "FIRST_VALUE",
-      "FULL",
-      "FALSE",
-      "FOR",
-      "FUNCTION",
-      "FUNCTIONS",
-      "GRANT",
-      "GROUP",
-      "GORILLA",
-      "GLOBAL",
-      "GZIP",
-      "INSERT",
-      "INTO",
-      "INT32",
-      "INT64",
-      "INDEX",
-      "INFO",
-      "KILL",
-      "LIMIT",
-      "LINEAR",
-      "LABEL",
-      "LINK",
-      "LIST",
-      "LOAD",
-      "LEVEL",
-      "LAST_VALUE",
-      "LAST",
-      "LZO",
-      "LZ4",
-      "LATEST",
-      "LIKE",
-      "METADATA",
-      "MERGE",
-      "MOVE",
-      "MIN_TIME",
-      "MAX_TIME",
-      "MIN_VALUE",
-      "MAX_VALUE",
-      "NOW",
-      "NODES",
-      "ORDER",
-      "OFFSET",
-      "ON",
-      "OFF",
-      "OF",
-      "PERIOD",
-      "PROCESSLIST",
-      "PREVIOUS",
-      "PREVIOUSUNTILLAST",
-      "PROPERTY",
-      "PLAIN",
-      "PLAIN_DICTIONARY",
-      "PRIVILEGES",
-      "PASSWORD",
-      "PATHS",
-      "PAA",
-      "PLA",
-      "PARTITION",
-      "QUERY",
-      "ROOT",
-      "RLE",
-      "REGULAR",
-      "ROLE",
-      "REVOKE",
-      "REMOVE",
-      "RENAME",
-      "SELECT",
-      "SHOW",
-      "SET",
-      "SLIMIT",
-      "SOFFSET",
-      "STORAGE",
-      "SUM",
-      "SNAPPY",
-      "SNAPSHOT",
-      "SCHEMA",
-      "TO",
-      "TIMESERIES",
-      "TIMESTAMP",
-      "TEXT",
-      "TS_2DIFF",
-      "TRACING",
-      "TTL",
-      "TASK",
-      "TIME",
-      "TAGS",
-      "TRUE",
-      "TEMPORARY",
-      "TOP",
-      "TOLERANCE",
-      "UPDATE",
-      "UNLINK",
-      "UPSERT",
-      "USING",
-      "USER",
-      "UNSET",
-      "UNCOMPRESSED",
-      "VALUES",
-      "VERSION",
-      "WHERE",
-      "WITH",
-      "WATERMARK_EMBEDDING"
+        "ALTER",
+        "ADD",
+        "ALIAS",
+        "ALL",
+        "AVG",
+        "ALIGN",
+        "ATTRIBUTES",
+        "AS",
+        "ASC",
+        "BY",
+        "BOOLEAN",
+        "BITMAP",
+        "CREATE",
+        "CONFIGURATION",
+        "COMPRESSOR",
+        "CHILD",
+        "COUNT",
+        "COMPRESSION",
+        "CLEAR",
+        "CACHE",
+        "CONTAIN",
+        "CONCAT",
+        "DELETE",
+        "DEVICE",
+        "DESCRIBE",
+        "DATATYPE",
+        "DOUBLE",
+        "DIFF",
+        "DROP",
+        "DEVICES",
+        "DISABLE",
+        "DESC",
+        "ENCODING",
+        "FROM",
+        "FILL",
+        "FLOAT",
+        "FLUSH",
+        "FIRST_VALUE",
+        "FULL",
+        "FALSE",
+        "FOR",
+        "FUNCTION",
+        "FUNCTIONS",
+        "GRANT",
+        "GROUP",
+        "GORILLA",
+        "GLOBAL",
+        "GZIP",
+        "INSERT",
+        "INTO",
+        "INT32",
+        "INT64",
+        "INDEX",
+        "INFO",
+        "KILL",
+        "LIMIT",
+        "LINEAR",
+        "LABEL",
+        "LINK",
+        "LIST",
+        "LOAD",
+        "LEVEL",
+        "LAST_VALUE",
+        "LAST",
+        "LZO",
+        "LZ4",
+        "LATEST",
+        "LIKE",
+        "METADATA",
+        "MERGE",
+        "MOVE",
+        "MIN_TIME",
+        "MAX_TIME",
+        "MIN_VALUE",
+        "MAX_VALUE",
+        "NOW",
+        "NODES",
+        "ORDER",
+        "OFFSET",
+        "ON",
+        "OFF",
+        "OF",
+        "FLEA",
+        "PROCESSLIST",
+        "PREVIOUS",
+        "PREVIOUSUNTILLAST",
+        "PROPERTY",
+        "PLAIN",
+        "PLAIN_DICTIONARY",
+        "PRIVILEGES",
+        "PASSWORD",
+        "PATHS",
+        "PAA",
+        "PLA",
+        "PARTITION",
+        "QUERY",
+        "ROOT",
+        "RLE",
+        "REGULAR",
+        "ROLE",
+        "REVOKE",
+        "REMOVE",
+        "RENAME",
+        "SELECT",
+        "SHOW",
+        "SET",
+        "SLIMIT",
+        "SOFFSET",
+        "STORAGE",
+        "SUM",
+        "SNAPPY",
+        "SNAPSHOT",
+        "SCHEMA",
+        "TO",
+        "TIMESERIES",
+        "TIMESTAMP",
+        "TEXT",
+        "TS_2DIFF",
+        "TRACING",
+        "TTL",
+        "TASK",
+        "TIME",
+        "TAGS",
+        "TRUE",
+        "TEMPORARY",
+        "TOP",
+        "TOLERANCE",
+        "UPDATE",
+        "UNLINK",
+        "UPSERT",
+        "USING",
+        "USER",
+        "UNSET",
+        "UNCOMPRESSED",
+        "VALUES",
+        "VERSION",
+        "WHERE",
+        "WITH",
+        "WATERMARK_EMBEDDING"
     };
     String[] sql92Keywords = {
-      "ABSOLUTE",
-      "EXEC",
-      "OVERLAPS",
-      "ACTION",
-      "EXECUTE",
-      "PAD",
-      "ADA",
-      "EXISTS",
-      "PARTIAL",
-      "ADD",
-      "EXTERNAL",
-      "PASCAL",
-      "ALL",
-      "EXTRACT",
-      "POSITION",
-      "ALLOCATE",
-      "FALSE",
-      "PRECISION",
-      "ALTER",
-      "FETCH",
-      "PREPARE",
-      "AND",
-      "FIRST",
-      "PRESERVE",
-      "ANY",
-      "FLOAT",
-      "PRIMARY",
-      "ARE",
-      "FOR",
-      "PRIOR",
-      "AS",
-      "FOREIGN",
-      "PRIVILEGES",
-      "ASC",
-      "FORTRAN",
-      "PROCEDURE",
-      "ASSERTION",
-      "FOUND",
-      "PUBLIC",
-      "AT",
-      "FROM",
-      "READ",
-      "AUTHORIZATION",
-      "FULL",
-      "REAL",
-      "AVG",
-      "GET",
-      "REFERENCES",
-      "BEGIN",
-      "GLOBAL",
-      "RELATIVE",
-      "BETWEEN",
-      "GO",
-      "RESTRICT",
-      "BIT",
-      "GOTO",
-      "REVOKE",
-      "BIT_LENGTH",
-      "GRANT",
-      "RIGHT",
-      "BOTH",
-      "GROUP",
-      "ROLLBACK",
-      "BY",
-      "HAVING",
-      "ROWS",
-      "CASCADE",
-      "HOUR",
-      "SCHEMA",
-      "CASCADED",
-      "IDENTITY",
-      "SCROLL",
-      "CASE",
-      "IMMEDIATE",
-      "SECOND",
-      "CAST",
-      "IN",
-      "SECTION",
-      "CATALOG",
-      "INCLUDE",
-      "SELECT",
-      "CHAR",
-      "INDEX",
-      "SESSION",
-      "CHAR_LENGTH",
-      "INDICATOR",
-      "SESSION_USER",
-      "CHARACTER",
-      "INITIALLY",
-      "SET",
-      "CHARACTER_LENGTH",
-      "INNER",
-      "SIZE",
-      "CHECK",
-      "INPUT",
-      "SMALLINT",
-      "CLOSE",
-      "INSENSITIVE",
-      "SOME",
-      "COALESCE",
-      "INSERT",
-      "SPACE",
-      "COLLATE",
-      "INT",
-      "SQL",
-      "COLLATION",
-      "INTEGER",
-      "SQLCA",
-      "COLUMN",
-      "INTERSECT",
-      "SQLCODE",
-      "COMMIT",
-      "INTERVAL",
-      "SQLERROR",
-      "CONNECT",
-      "INTO",
-      "SQLSTATE",
-      "CONNECTION",
-      "IS",
-      "SQLWARNING",
-      "CONSTRAINT",
-      "ISOLATION",
-      "SUBSTRING",
-      "CONSTRAINTS",
-      "JOIN",
-      "SUM",
-      "CONTINUE",
-      "KEY",
-      "SYSTEM_USER",
-      "CONVERT",
-      "LANGUAGE",
-      "TABLE",
-      "CORRESPONDING",
-      "LAST",
-      "TEMPORARY",
-      "COUNT",
-      "LEADING",
-      "THEN",
-      "CREATE",
-      "LEFT",
-      "TIME",
-      "CROSS",
-      "LEVEL",
-      "TIMESTAMP",
-      "CURRENT",
-      "LIKE",
-      "TIMEZONE_HOUR",
-      "CURRENT_DATE",
-      "LOCAL",
-      "TIMEZONE_MINUTE",
-      "CURRENT_TIME",
-      "LOWER",
-      "TO",
-      "CURRENT_TIMESTAMP",
-      "MATCH",
-      "TRAILING",
-      "CURRENT_USER",
-      "MAX",
-      "TRANSACTION",
-      "CURSOR",
-      "MIN",
-      "TRANSLATE",
-      "DATE",
-      "MINUTE",
-      "TRANSLATION",
-      "DAY",
-      "MODULE",
-      "TRIM",
-      "DEALLOCATE",
-      "MONTH",
-      "TRUE",
-      "DEC",
-      "NAMES",
-      "UNION",
-      "DECIMAL",
-      "NATIONAL",
-      "UNIQUE",
-      "DECLARE",
-      "NATURAL",
-      "UNKNOWN",
-      "DEFAULT",
-      "NCHAR",
-      "UPDATE",
-      "DEFERRABLE",
-      "NEXT",
-      "UPPER",
-      "DEFERRED",
-      "NO",
-      "USAGE",
-      "DELETE",
-      "NONE",
-      "USER",
-      "DESC",
-      "NOT",
-      "USING",
-      "DESCRIBE",
-      "NULL",
-      "VALUE",
-      "DESCRIPTOR",
-      "NULLIF",
-      "VALUES",
-      "DIAGNOSTICS",
-      "NUMERIC",
-      "VARCHAR",
-      "DISCONNECT",
-      "OCTET_LENGTH",
-      "VARYING",
-      "DISTINCT",
-      "OF",
-      "VIEW",
-      "DOMAIN",
-      "ON",
-      "WHEN",
-      "DOUBLE",
-      "ONLY",
-      "WHENEVER",
-      "DROP",
-      "OPEN",
-      "WHERE",
-      "ELSE",
-      "OPTION",
-      "WITH",
-      "END",
-      "OR",
-      "WORK",
-      "END-EXEC",
-      "ORDER",
-      "WRITE",
-      "ESCAPE",
-      "OUTER",
-      "YEAR",
-      "EXCEPT",
-      "OUTPUT",
-      "ZONE",
-      "EXCEPTION"
+        "ABSOLUTE",
+        "EXEC",
+        "OVERLAPS",
+        "ACTION",
+        "EXECUTE",
+        "PAD",
+        "ADA",
+        "EXISTS",
+        "PARTIAL",
+        "ADD",
+        "EXTERNAL",
+        "PASCAL",
+        "ALL",
+        "EXTRACT",
+        "POSITION",
+        "ALLOCATE",
+        "FALSE",
+        "PRECISION",
+        "ALTER",
+        "FETCH",
+        "PREPARE",
+        "AND",
+        "FIRST",
+        "PRESERVE",
+        "ANY",
+        "FLOAT",
+        "PRIMARY",
+        "ARE",
+        "FOR",
+        "PRIOR",
+        "AS",
+        "FOREIGN",
+        "PRIVILEGES",
+        "ASC",
+        "FORTRAN",
+        "PROCEDURE",
+        "ASSERTION",
+        "FOUND",
+        "PUBLIC",
+        "AT",
+        "FROM",
+        "READ",
+        "AUTHORIZATION",
+        "FULL",
+        "REAL",
+        "AVG",
+        "GET",
+        "REFERENCES",
+        "BEGIN",
+        "GLOBAL",
+        "RELATIVE",
+        "BETWEEN",
+        "GO",
+        "RESTRICT",
+        "BIT",
+        "GOTO",
+        "REVOKE",
+        "BIT_LENGTH",
+        "GRANT",
+        "RIGHT",
+        "BOTH",
+        "GROUP",
+        "ROLLBACK",
+        "BY",
+        "HAVING",
+        "ROWS",
+        "CASCADE",
+        "HOUR",
+        "SCHEMA",
+        "CASCADED",
+        "IDENTITY",
+        "SCROLL",
+        "CASE",
+        "IMMEDIATE",
+        "SECOND",
+        "CAST",
+        "IN",
+        "SECTION",
+        "CATALOG",
+        "INCLUDE",
+        "SELECT",
+        "CHAR",
+        "INDEX",
+        "SESSION",
+        "CHAR_LENGTH",
+        "INDICATOR",
+        "SESSION_USER",
+        "CHARACTER",
+        "INITIALLY",
+        "SET",
+        "CHARACTER_LENGTH",
+        "INNER",
+        "SIZE",
+        "CHECK",
+        "INPUT",
+        "SMALLINT",
+        "CLOSE",
+        "INSENSITIVE",
+        "SOME",
+        "COALESCE",
+        "INSERT",
+        "SPACE",
+        "COLLATE",
+        "INT",
+        "SQL",
+        "COLLATION",
+        "INTEGER",
+        "SQLCA",
+        "COLUMN",
+        "INTERSECT",
+        "SQLCODE",
+        "COMMIT",
+        "INTERVAL",
+        "SQLERROR",
+        "CONNECT",
+        "INTO",
+        "SQLSTATE",
+        "CONNECTION",
+        "IS",
+        "SQLWARNING",
+        "CONSTRAINT",
+        "ISOLATION",
+        "SUBSTRING",
+        "CONSTRAINTS",
+        "JOIN",
+        "SUM",
+        "CONTINUE",
+        "KEY",
+        "SYSTEM_USER",
+        "CONVERT",
+        "LANGUAGE",
+        "TABLE",
+        "CORRESPONDING",
+        "LAST",
+        "TEMPORARY",
+        "COUNT",
+        "LEADING",
+        "THEN",
+        "CREATE",
+        "LEFT",
+        "TIME",
+        "CROSS",
+        "LEVEL",
+        "TIMESTAMP",
+        "CURRENT",
+        "LIKE",
+        "TIMEZONE_HOUR",
+        "CURRENT_DATE",
+        "LOCAL",
+        "TIMEZONE_MINUTE",
+        "CURRENT_TIME",
+        "LOWER",
+        "TO",
+        "CURRENT_TIMESTAMP",
+        "MATCH",
+        "TRAILING",
+        "CURRENT_USER",
+        "MAX",
+        "TRANSACTION",
+        "CURSOR",
+        "MIN",
+        "TRANSLATE",
+        "DATE",
+        "MINUTE",
+        "TRANSLATION",
+        "DAY",
+        "MODULE",
+        "TRIM",
+        "DEALLOCATE",
+        "MONTH",
+        "TRUE",
+        "DEC",
+        "NAMES",
+        "UNION",
+        "DECIMAL",
+        "NATIONAL",
+        "UNIQUE",
+        "DECLARE",
+        "NATURAL",
+        "UNKNOWN",
+        "DEFAULT",
+        "NCHAR",
+        "UPDATE",
+        "DEFERRABLE",
+        "NEXT",
+        "UPPER",
+        "DEFERRED",
+        "NO",
+        "USAGE",
+        "DELETE",
+        "NONE",
+        "USER",
+        "DESC",
+        "NOT",
+        "USING",
+        "DESCRIBE",
+        "NULL",
+        "VALUE",
+        "DESCRIPTOR",
+        "NULLIF",
+        "VALUES",
+        "DIAGNOSTICS",
+        "NUMERIC",
+        "VARCHAR",
+        "DISCONNECT",
+        "OCTET_LENGTH",
+        "VARYING",
+        "DISTINCT",
+        "OF",
+        "VIEW",
+        "DOMAIN",
+        "ON",
+        "WHEN",
+        "DOUBLE",
+        "ONLY",
+        "WHENEVER",
+        "DROP",
+        "OPEN",
+        "WHERE",
+        "ELSE",
+        "OPTION",
+        "WITH",
+        "END",
+        "OR",
+        "WORK",
+        "END-EXEC",
+        "ORDER",
+        "WRITE",
+        "ESCAPE",
+        "OUTER",
+        "YEAR",
+        "EXCEPT",
+        "OUTPUT",
+        "ZONE",
+        "EXCEPTION"
     };
     TreeMap myKeywordMap = new TreeMap();
     for (int i = 0; i < allIotdbSQLKeywords.length; i++) {
@@ -487,12 +487,11 @@
 
   private WatermarkEncoder getWatermarkEncoder() {
     try {
-      groupedLSBWatermarkEncoder =
-          new GroupedLSBWatermarkEncoder(
-              client.getProperties().getWatermarkSecretKey(),
-              client.getProperties().getWatermarkBitString(),
-              client.getProperties().getWatermarkParamMarkRate(),
-              client.getProperties().getWatermarkParamMaxRightBit());
+      groupedLSBWatermarkEncoder = new GroupedLSBWatermarkEncoder(
+          client.getProperties().getWatermarkSecretKey(),
+          client.getProperties().getWatermarkBitString(),
+          client.getProperties().getWatermarkParamMarkRate(),
+          client.getProperties().getWatermarkParamMaxRightBit());
     } catch (TException e) {
       e.printStackTrace();
     }
@@ -878,8 +877,7 @@
       for (Map<String, Object> map : listPaths) {
         TSDataType columnType = (TSDataType) map.get("type");
         Object val = map.get("val");
-        org.apache.iotdb.tsfile.read.common.Field field =
-            new org.apache.iotdb.tsfile.read.common.Field(columnType);
+        org.apache.iotdb.tsfile.read.common.Field field = new org.apache.iotdb.tsfile.read.common.Field(columnType);
         switch (columnType) {
           case TEXT:
             field.setBinaryV(new Binary(val.toString()));
@@ -922,8 +920,8 @@
     fields[1] = new Field("", "MAX_LEN", "INT32");
     fields[2] = new Field("", "DEFAULT_VALUE", "INT32");
     fields[3] = new Field("", "DESCRIPTION", "TEXT");
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32, TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32,
+        TSDataType.TEXT);
     List<Object> values = Arrays.asList("fetch_size", 10, 10, "");
 
     List<String> columnNameList = new ArrayList<>();
@@ -981,7 +979,7 @@
       sql = sql + " " + schemaPattern;
     }
     if (((catalog != null && catalog.length() > 0)
-            || schemaPattern != null && schemaPattern.length() > 0)
+        || schemaPattern != null && schemaPattern.length() > 0)
         && tableNamePattern != null
         && tableNamePattern.length() > 0) {
       if (tableNamePattern.contains("%")) {
@@ -991,7 +989,7 @@
     }
 
     if (((catalog != null && catalog.length() > 0)
-            || schemaPattern != null && schemaPattern.length() > 0)
+        || schemaPattern != null && schemaPattern.length() > 0)
         && tableNamePattern != null
         && tableNamePattern.length() > 0
         && columnNamePattern != null
@@ -1008,16 +1006,15 @@
     fields[5] = new Field("", "GRANTEE", "TEXT");
     fields[6] = new Field("", "PRIVILEGE", "TEXT");
     fields[7] = new Field("", "IS_GRANTABLE", "TEXT");
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT);
 
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
@@ -1272,25 +1269,24 @@
     fields[14] = new Field("", "ORDINAL_POSITION", "INT32");
     fields[15] = new Field("", "IS_NULLABLE", "TEXT");
     fields[16] = new Field("", "SPECIFIC_NAME", "TEXT");
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.TEXT);
 
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
@@ -1352,14 +1348,13 @@
     fields[3] = new Field("", "REMARKS", "TEXT");
     fields[4] = new Field("", "FUNCTION_TYPE", "INT32");
     fields[5] = new Field("", "SPECIFIC_NAME", "TEXT");
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.TEXT);
 
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
@@ -1673,14 +1668,13 @@
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
     Map<String, Integer> columnNameIndex = new HashMap<>();
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.TEXT);
     Field[] fields = new Field[6];
     fields[0] = new Field("", "TABLE_CAT", "TEXT");
     fields[1] = new Field("", "TABLE_SCHEM", "TEXT");
@@ -1841,24 +1835,22 @@
     fields[9] = new Field("", "REMARKS", "TEXT");
     fields[10] = new Field("", "CHAR_OCTET_LENGTH", "INT32");
     fields[11] = new Field("", "IS_NULLABLE", "TEXT");
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.TEXT);
 
-    List<Object> value =
-        Arrays.asList(
-            catalog, catalog, tableNamePattern, "times", Types.BIGINT, 1, 0, 2, "", "", 13, "NO");
+    List<Object> value = Arrays.asList(
+        catalog, catalog, tableNamePattern, "times", Types.BIGINT, 1, 0, 2, "", "", 13, "NO");
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
     Map<String, Integer> columnNameIndex = new HashMap<>();
@@ -2106,7 +2098,7 @@
       sql = sql + " " + schemaPattern;
     }
     if (((catalog != null && catalog.length() > 0)
-            || schemaPattern != null && schemaPattern.length() > 0)
+        || schemaPattern != null && schemaPattern.length() > 0)
         && tableNamePattern != null
         && tableNamePattern.length() > 0) {
       if (tableNamePattern.contains("%")) {
@@ -2125,16 +2117,15 @@
     fields[5] = new Field("", "GRANTEE", "TEXT");
     fields[6] = new Field("", "PRIVILEGE", "TEXT");
     fields[7] = new Field("", "IS_GRANTABLE", "TEXT");
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT);
 
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
@@ -2247,7 +2238,7 @@
       sql = sql + " " + schemaPattern;
     }
     if (((catalog != null && catalog.length() > 0)
-            || schemaPattern != null && schemaPattern.length() > 0)
+        || schemaPattern != null && schemaPattern.length() > 0)
         && tableNamePattern != null
         && tableNamePattern.length() > 0) {
       if (tableNamePattern.contains("%")) {
@@ -2257,7 +2248,7 @@
     }
 
     if (((catalog != null && catalog.length() > 0)
-            || schemaPattern != null && schemaPattern.length() > 0)
+        || schemaPattern != null && schemaPattern.length() > 0)
         && tableNamePattern != null
         && tableNamePattern.length() > 0
         && columnNamePattern != null
@@ -2291,32 +2282,31 @@
     fields[22] = new Field("", "IS_AUTOINCREMENT", "TEXT");
     fields[23] = new Field("", "IS_GENERATEDCOLUMN", "TEXT");
 
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.TEXT);
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
     Map<String, Integer> columnNameIndex = new HashMap<>();
@@ -2495,7 +2485,7 @@
       sql = sql + " " + schemaPattern;
     }
     if (((catalog != null && catalog.length() > 0)
-            || schemaPattern != null && schemaPattern.length() > 0)
+        || schemaPattern != null && schemaPattern.length() > 0)
         && tableNamePattern != null
         && tableNamePattern.length() > 0) {
       if (tableNamePattern.contains("%")) {
@@ -2516,18 +2506,17 @@
     fields[8] = new Field("", "SELF_REFERENCING_COL_NAME", "TEXT");
     fields[9] = new Field("", "REF_GENERATION", "TEXT");
 
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT);
 
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
@@ -2604,149 +2593,141 @@
     fields[15] = new Field("", "SQL_DATA_TYPE", "INT32");
     fields[16] = new Field("", "SQL_DATETIME_SUB", "INT32");
     fields[17] = new Field("", "NUM_PREC_RADIX", "INT32");
-    List<TSDataType> tsDataTypeList =
-        Arrays.asList(
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.BOOLEAN,
-            TSDataType.TEXT,
-            TSDataType.BOOLEAN,
-            TSDataType.BOOLEAN,
-            TSDataType.BOOLEAN,
-            TSDataType.TEXT,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32,
-            TSDataType.INT32);
-    List<Object> listValSub_1 =
-        Arrays.asList(
-            "INT32",
-            Types.INTEGER,
-            10,
-            "",
-            "",
-            "",
-            1,
-            true,
-            "",
-            false,
-            true,
-            false,
-            "",
-            0,
-            10,
-            0,
-            0,
-            10);
-    List<Object> listValSub_2 =
-        Arrays.asList(
-            "INT64",
-            Types.BIGINT,
-            19,
-            "",
-            "",
-            "",
-            1,
-            true,
-            "",
-            false,
-            true,
-            false,
-            "",
-            0,
-            10,
-            0,
-            0,
-            10);
-    List<Object> listValSub_3 =
-        Arrays.asList(
-            "BOOLEAN",
-            Types.BOOLEAN,
-            1,
-            "",
-            "",
-            "",
-            1,
-            true,
-            "",
-            false,
-            true,
-            false,
-            "",
-            0,
-            10,
-            0,
-            0,
-            10);
-    List<Object> listValSub_4 =
-        Arrays.asList(
-            "FLOAT",
-            Types.FLOAT,
-            38,
-            "",
-            "",
-            "",
-            1,
-            true,
-            "",
-            false,
-            true,
-            false,
-            "",
-            0,
-            10,
-            0,
-            0,
-            10);
-    List<Object> listValSub_5 =
-        Arrays.asList(
-            "DOUBLE",
-            Types.DOUBLE,
-            308,
-            "",
-            "",
-            "",
-            1,
-            true,
-            "",
-            false,
-            true,
-            false,
-            "",
-            0,
-            10,
-            0,
-            0,
-            10);
-    List<Object> listValSub_6 =
-        Arrays.asList(
-            "TEXT",
-            Types.LONGVARCHAR,
-            64,
-            "",
-            "",
-            "",
-            1,
-            true,
-            "",
-            false,
-            true,
-            false,
-            "",
-            0,
-            10,
-            0,
-            0,
-            10);
-    List<List<Object>> valuesList =
-        Arrays.asList(
-            listValSub_1, listValSub_2, listValSub_3, listValSub_4, listValSub_5, listValSub_6);
+    List<TSDataType> tsDataTypeList = Arrays.asList(
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.BOOLEAN,
+        TSDataType.TEXT,
+        TSDataType.BOOLEAN,
+        TSDataType.BOOLEAN,
+        TSDataType.BOOLEAN,
+        TSDataType.TEXT,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32,
+        TSDataType.INT32);
+    List<Object> listValSub_1 = Arrays.asList(
+        "INT32",
+        Types.INTEGER,
+        10,
+        "",
+        "",
+        "",
+        1,
+        true,
+        "",
+        false,
+        true,
+        false,
+        "",
+        0,
+        10,
+        0,
+        0,
+        10);
+    List<Object> listValSub_2 = Arrays.asList(
+        "INT64",
+        Types.BIGINT,
+        19,
+        "",
+        "",
+        "",
+        1,
+        true,
+        "",
+        false,
+        true,
+        false,
+        "",
+        0,
+        10,
+        0,
+        0,
+        10);
+    List<Object> listValSub_3 = Arrays.asList(
+        "BOOLEAN",
+        Types.BOOLEAN,
+        1,
+        "",
+        "",
+        "",
+        1,
+        true,
+        "",
+        false,
+        true,
+        false,
+        "",
+        0,
+        10,
+        0,
+        0,
+        10);
+    List<Object> listValSub_4 = Arrays.asList(
+        "FLOAT",
+        Types.FLOAT,
+        38,
+        "",
+        "",
+        "",
+        1,
+        true,
+        "",
+        false,
+        true,
+        false,
+        "",
+        0,
+        10,
+        0,
+        0,
+        10);
+    List<Object> listValSub_5 = Arrays.asList(
+        "DOUBLE",
+        Types.DOUBLE,
+        308,
+        "",
+        "",
+        "",
+        1,
+        true,
+        "",
+        false,
+        true,
+        false,
+        "",
+        0,
+        10,
+        0,
+        0,
+        10);
+    List<Object> listValSub_6 = Arrays.asList(
+        "TEXT",
+        Types.LONGVARCHAR,
+        64,
+        "",
+        "",
+        "",
+        1,
+        true,
+        "",
+        false,
+        true,
+        false,
+        "",
+        0,
+        10,
+        0,
+        0,
+        10);
+    List<List<Object>> valuesList = Arrays.asList(
+        listValSub_1, listValSub_2, listValSub_3, listValSub_4, listValSub_5, listValSub_6);
     List<String> columnNameList = new ArrayList<>();
     List<String> columnTypeList = new ArrayList<>();
     Map<String, Integer> columnNameIndex = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 7b303e3..a9c25f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -50,10 +50,10 @@
 
 public class SchemaUtils {
 
-  private SchemaUtils() {}
+  private SchemaUtils() {
+  }
 
-  private static final Map<TSDataType, Set<TSEncoding>> schemaChecker =
-      new EnumMap<>(TSDataType.class);
+  private static final Map<TSDataType, Set<TSEncoding>> schemaChecker = new EnumMap<>(TSDataType.class);
 
   static {
     Set<TSEncoding> booleanSet = new HashSet<>();
@@ -68,7 +68,7 @@
     intSet.add(TSEncoding.GORILLA);
     intSet.add(TSEncoding.ZIGZAG);
     intSet.add(TSEncoding.FREQ);
-    intSet.add(TSEncoding.PERIOD);
+    intSet.add(TSEncoding.FLEA);
     schemaChecker.put(TSDataType.INT32, intSet);
     schemaChecker.put(TSDataType.INT64, intSet);
 
@@ -118,8 +118,10 @@
   }
 
   /**
-   * If the datatype of 'aggregation' depends on 'measurementDataType' (min_value, max_value),
-   * return 'measurementDataType' directly, or return a list whose elements are all the datatype of
+   * If the datatype of 'aggregation' depends on 'measurementDataType' (min_value,
+   * max_value),
+   * return 'measurementDataType' directly, or return a list whose elements are
+   * all the datatype of
    * 'aggregation' and its length is the same as 'measurementDataType'.
    *
    * @param measurementDataType
@@ -186,7 +188,8 @@
   }
 
   /**
-   * judge whether the order of aggregation calculation is consistent with the order of traversing
+   * judge whether the order of aggregation calculation is consistent with the
+   * order of traversing
    * data
    */
   public static boolean isConsistentWithScanOrder(
@@ -213,7 +216,8 @@
   }
 
   /**
-   * If e or one of its recursive causes is a PathNotExistException or StorageGroupNotSetException,
+   * If e or one of its recursive causes is a PathNotExistException or
+   * StorageGroupNotSetException,
    * return such an exception or null if it cannot be found.
    *
    * @param currEx
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/Decoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/Decoder.java
index a459f98..69d4f12 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/Decoder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/Decoder.java
@@ -123,11 +123,11 @@
         }
       case FREQ:
         return new FreqDecoder();
-      case PERIOD:
+      case FLEA:
         switch (dataType) {
           case INT32:
             // Spade TODO: replace plain with period parameter
-            return new PeriodDecoder();
+            return new FLEADecoder();
           default:
             throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
         }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/FLEADecoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/FLEADecoder.java
new file mode 100644
index 0000000..d98132b
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/FLEADecoder.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.encoding.decoder;
+
+import org.apache.iotdb.tsfile.exception.encoding.TsFileDecodingException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.math3.complex.Complex;
+import org.apache.commons.math3.transform.DftNormalization;
+import org.apache.commons.math3.transform.FastFourierTransformer;
+import org.apache.commons.math3.transform.TransformType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+public class FLEADecoder extends Decoder {
+
+  private static final Logger logger = LoggerFactory.getLogger(FLEADecoder.class);
+  private Queue<Integer> decodedValues;
+
+  public FLEADecoder() {
+    super(TSEncoding.FLEA);
+    this.decodedValues = new ArrayDeque<>();
+  }
+
+  @Override
+  public int readInt(ByteBuffer buffer) {
+    if (!hasNext(buffer)) {
+      throw new TsFileDecodingException("No more values to read.");
+    }
+    return decodedValues.poll();
+  }
+
+  @Override
+  public boolean hasNext(ByteBuffer buffer) {
+    if (!decodedValues.isEmpty()) {
+      return true;
+    }
+    if (buffer.remaining() > 0) {
+      decodeNextBlock(buffer);
+      return !decodedValues.isEmpty();
+    }
+    return false;
+  }
+
+  private void decodeNextBlock(ByteBuffer buffer) {
+    if (buffer.remaining() < 4) {
+      return;
+    }
+    int blockLength = buffer.getInt();
+
+    if (buffer.remaining() < blockLength) {
+      throw new TsFileDecodingException(
+          "Error when decoding FLEA block: expected "
+              + blockLength
+              + " bytes, but only "
+              + buffer.remaining()
+              + " are available.");
+    }
+
+    byte[] blockBytes = new byte[blockLength];
+    buffer.get(blockBytes);
+
+    try {
+      int[] blockOfInts = decodeBlock(blockBytes);
+      for (int val : blockOfInts) {
+        decodedValues.add(val);
+      }
+    } catch (IOException e) {
+      throw new TsFileDecodingException("Error when decoding FLEA block.", e);
+    }
+  }
+
+  private int[] decodeBlock(byte[] blockBytes) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(blockBytes);
+
+    OptimalParams params = decodeMetadata(bais);
+    int n = params.n;
+
+    Complex[] F_hat_complex_dequant = decodeFrequencyComponent(params, bais);
+
+    int[] R_star = decodeResidualComponent(params, bais);
+
+    Complex[] F_hat_full = new Complex[n];
+    for (int j = 0; j < F_hat_complex_dequant.length; j++) {
+      F_hat_full[j] = F_hat_complex_dequant[j];
+    }
+    for (int j = 1; j < F_hat_complex_dequant.length; j++) {
+      if (n - j < n) {
+        F_hat_full[n - j] = F_hat_complex_dequant[j].conjugate();
+      }
+    }
+
+    FastFourierTransformer fft = new FastFourierTransformer(DftNormalization.STANDARD);
+    Complex[] reconstructedTimeComplex = fft.transform(F_hat_full, TransformType.INVERSE);
+
+    int[] finalValues = new int[n];
+    for (int k = 0; k < n; k++) {
+      finalValues[k] = (int) Math.round(reconstructedTimeComplex[k].getReal() + R_star[k]);
+    }
+
+    return finalValues;
+  }
+
+  private static class OptimalParams {
+    int n;
+    int betaStar;
+    int pReStar;
+    int pImStar;
+    int dResidualStar;
+  }
+
+  private OptimalParams decodeMetadata(ByteArrayInputStream bais) throws IOException {
+
+    final int METADATA_SIZE_BYTES = 20;
+    byte[] metaBytes = new byte[METADATA_SIZE_BYTES];
+
+    int bytesRead = bais.read(metaBytes);
+    if (bytesRead != METADATA_SIZE_BYTES) {
+      throw new IOException(
+          "Failed to read complete metadata block. Expected "
+              + METADATA_SIZE_BYTES
+              + " bytes, but got "
+              + bytesRead);
+    }
+
+    ByteBuffer metaBuffer = ByteBuffer.wrap(metaBytes);
+
+    OptimalParams params = new OptimalParams();
+    params.n = metaBuffer.getInt();
+    params.betaStar = metaBuffer.getInt();
+    params.pReStar = metaBuffer.getInt();
+    params.pImStar = metaBuffer.getInt();
+    params.dResidualStar = metaBuffer.getInt();
+
+    logger.debug(
+        "Decoded metadata: n={}, beta={}, p_re={}, p_im={}, d_res={}",
+        params.n,
+        params.betaStar,
+        params.pReStar,
+        params.pImStar,
+        params.dResidualStar);
+
+    if (params.n <= 0
+        || params.betaStar < 0
+        || params.pReStar < 0
+        || params.pImStar < 0
+        || params.dResidualStar < 0) {
+      throw new TsFileDecodingException("Invalid metadata decoded: " + params);
+    }
+
+    return params;
+  }
+
+  private Complex[] decodeFrequencyComponent(OptimalParams params, ByteArrayInputStream bais)
+      throws IOException {
+    BitInputStream bitIn = new BitInputStream(bais);
+
+    int n_half = params.n / 2 + 1;
+
+    int[] F_re_desc = decodeGVLE(params.pReStar, bitIn);
+    int[] F_re_sparse = decodeDBP(n_half - params.pReStar, bitIn);
+
+    int[] F_im_desc = decodeGVLE(params.pImStar, bitIn);
+    int[] F_im_sparse = decodeDBP(n_half - params.pImStar, bitIn);
+
+    bitIn.close();
+
+    int[] F_hat_re = new int[n_half];
+    System.arraycopy(F_re_desc, 0, F_hat_re, 0, F_re_desc.length);
+    if (F_re_sparse.length > 0)
+      System.arraycopy(F_re_sparse, 0, F_hat_re, F_re_desc.length, F_re_sparse.length);
+    int[] F_hat_im = new int[n_half];
+    System.arraycopy(F_im_desc, 0, F_hat_im, 0, F_im_desc.length);
+    if (F_im_sparse.length > 0)
+      System.arraycopy(F_im_sparse, 0, F_hat_im, F_im_desc.length, F_im_sparse.length);
+
+    double quantizationFactor = Math.pow(2, params.betaStar);
+    Complex[] dequantizedFreq = new Complex[n_half];
+    for (int j = 0; j < n_half; j++) {
+      dequantizedFreq[j] =
+          new Complex(
+              (double) F_hat_re[j] * quantizationFactor, (double) F_hat_im[j] * quantizationFactor);
+    }
+    return dequantizedFreq;
+  }
+
+  private int[] decodeGVLE(int totalValuesToRead, BitInputStream bitIn) throws IOException {
+    if (totalValuesToRead == 0) {
+      return new int[0];
+    }
+
+    List<Integer> decodedList = new ArrayList<>(totalValuesToRead);
+    int valuesRead = 0;
+
+    while (valuesRead < totalValuesToRead) {
+      int sharedWidth = bitIn.read(5);
+      int groupLength = bitIn.read(11);
+
+      if (valuesRead + groupLength > totalValuesToRead) {
+        throw new TsFileDecodingException(
+            "GVLE decoding error: group length "
+                + groupLength
+                + " exceeds remaining values to read "
+                + (totalValuesToRead - valuesRead));
+      }
+
+      for (int i = 0; i < groupLength; i++) {
+        int sign = bitIn.read(1);
+
+        int magnitude = 0;
+        if (sharedWidth > 0) {
+          magnitude = bitIn.read(sharedWidth);
+        }
+
+        int value = (sign == 1) ? -magnitude : magnitude;
+        decodedList.add(value);
+      }
+
+      valuesRead += groupLength;
+    }
+
+    return decodedList.stream().mapToInt(i -> i).toArray();
+  }
+
+  private static class DBPValue {
+    int value;
+    int position;
+
+    DBPValue(int value, int position) {
+      this.value = value;
+      this.position = position;
+    }
+  }
+
+  private int[] decodeDBP(int segmentLength, BitInputStream bitIn) throws IOException {
+    int nonZeroCount = bitIn.read(16);
+
+    if (nonZeroCount == 0) {
+      return new int[segmentLength];
+    }
+
+    List<Integer> positions = new ArrayList<>(nonZeroCount);
+    int posWidth = (segmentLength == 0) ? 0 : 32 - Integer.numberOfLeadingZeros(segmentLength - 1);
+    if (posWidth == 0) posWidth = 1;
+
+    for (int i = 0; i < nonZeroCount; i++) {
+      positions.add(bitIn.read(posWidth));
+    }
+
+    List<Integer> values = new ArrayList<>(nonZeroCount);
+
+    int prevValWidth = bitIn.read(5);
+
+    int sign = bitIn.read(1);
+    int magnitude = bitIn.read(prevValWidth);
+    int firstVal = (sign == 1) ? -magnitude : magnitude;
+    values.add(firstVal);
+
+    for (int i = 1; i < nonZeroCount; i++) {
+      int currentSign = bitIn.read(1);
+      int currentMagnitude = bitIn.read(prevValWidth);
+      int currentVal = (currentSign == 1) ? -currentMagnitude : currentMagnitude;
+      values.add(currentVal);
+
+      int currentAbsVal = Math.abs(currentVal);
+      prevValWidth = (currentAbsVal == 0) ? 1 : 32 - Integer.numberOfLeadingZeros(currentAbsVal);
+    }
+
+    int[] sequence = new int[segmentLength];
+    for (int i = 0; i < nonZeroCount; i++) {
+      int pos = positions.get(i);
+      int val = values.get(i);
+      if (pos < segmentLength) {
+        sequence[pos] = val;
+      } else {
+        throw new TsFileDecodingException(
+            "Decoded position " + pos + " is out of bounds for segment length " + segmentLength);
+      }
+    }
+
+    return sequence;
+  }
+
+  private int[] decodeResidualComponent(OptimalParams params, ByteArrayInputStream bais)
+      throws IOException {
+    BitInputStream bitIn = new BitInputStream(bais);
+    int[] residuals = decodeHybridResiduals(params.n, params.dResidualStar, bitIn);
+    return residuals;
+  }
+
+  private int[] decodeHybridResiduals(int n, int optimalD, BitInputStream bitIn)
+      throws IOException {
+    if (n == 0) {
+      return new int[0];
+    }
+
+    int[] residuals = new int[n];
+    int lowBitMask = (1 << optimalD) - 1;
+
+    for (int i = 0; i < n; i++) {
+      int sign = bitIn.read(1);
+
+      int lowMagnitude = 0;
+      if (optimalD > 0) {
+        lowMagnitude = bitIn.read(optimalD);
+      }
+
+      residuals[i] = (sign == 1) ? -lowMagnitude : lowMagnitude;
+    }
+
+    int[] highBitSequence = decodeDBP(n, bitIn);
+
+    for (int i = 0; i < n; i++) {
+      if (highBitSequence[i] != 0) {
+        int signedHighPart = highBitSequence[i];
+        int absHighPart = Math.abs(signedHighPart);
+
+        int absLowPart = Math.abs(residuals[i]);
+        int fullMagnitude = (absHighPart << optimalD) | absLowPart;
+
+        residuals[i] = (signedHighPart < 0) ? -fullMagnitude : fullMagnitude;
+      }
+    }
+
+    return residuals;
+  }
+
+  private static class BitInputStream {
+
+    private final ByteArrayInputStream byteInput;
+    private int bitBuffer;
+    private int bitsAvailable;
+
+    public BitInputStream(ByteArrayInputStream byteInput) {
+      this.byteInput = byteInput;
+      this.bitBuffer = 0;
+      this.bitsAvailable = 0;
+    }
+
+    public int read(int numBits) throws IOException {
+      if (numBits <= 0 || numBits > 32) {
+        throw new IllegalArgumentException("Number of bits must be between 1 and 32.");
+      }
+
+      while (bitsAvailable < numBits) {
+        int nextByte = byteInput.read();
+        if (nextByte == -1) {
+          throw new IOException(
+              "Unexpected end of stream while trying to read " + numBits + " bits.");
+        }
+        bitBuffer = (bitBuffer << 8) | (nextByte & 0xFF);
+        bitsAvailable += 8;
+      }
+
+      int value = (bitBuffer >> (bitsAvailable - numBits));
+
+      bitsAvailable -= numBits;
+
+      int mask = (1 << bitsAvailable) - 1;
+      bitBuffer &= mask;
+
+      return value;
+    }
+
+    public void close() {}
+  }
+
+  @Override
+  public boolean readBoolean(ByteBuffer buffer) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public short readShort(ByteBuffer buffer) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public long readLong(ByteBuffer buffer) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public float readFloat(ByteBuffer buffer) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public double readDouble(ByteBuffer buffer) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public Binary readBinary(ByteBuffer buffer) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public BigDecimal readBigDecimal(ByteBuffer buffer) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public void reset() {
+    decodedValues.clear();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/PeriodDecoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/PeriodDecoder.java
deleted file mode 100644
index e45a2b7..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/PeriodDecoder.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.tsfile.encoding.decoder;
-
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
-import org.apache.commons.math3.complex.Complex;
-import org.apache.commons.math3.transform.DftNormalization;
-import org.apache.commons.math3.transform.FastFourierTransformer;
-import org.apache.commons.math3.transform.TransformType;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public class PeriodDecoder extends Decoder {
-
-  private static final int MAX_SIZE = 0x7FFFFFFF;
-  private static final int MAX_VALUE = 0x7FFFFFFF;
-  private static final int GROUP_SIZE = 8;
-
-  private static final FastFourierTransformer transformer =
-      new FastFourierTransformer(DftNormalization.STANDARD);
-
-  private static double[] irfft(Complex[] input, int n) {
-    Complex[] conjugates = new Complex[n];
-
-    // 构造逆FFT所需的复数数组
-    for (int i = 0; i < input.length; i++) {
-      if (i == 0 || (i == input.length - 1 && n % 2 == 0)) {
-        conjugates[i] = input[i];
-      } else {
-        conjugates[i] = input[i];
-        conjugates[n - i] = input[i].conjugate();
-      }
-    }
-
-    // 进行逆FFT,并除以长度进行归一化
-    Complex[] inverse = transformer.transform(conjugates, TransformType.INVERSE);
-    double[] result = new double[n];
-    for (int i = 0; i < n; i++) {
-      result[i] = inverse[i].getReal() / n;
-    }
-
-    return result;
-  }
-
-  private static Complex[] compRoundInverse(int[] ret, int beta) {
-    Complex[] result = new Complex[ret.length / 2];
-
-    for (int i = 0; i < ret.length; i += 2) {
-      double real = ret[i] * Math.pow(2, beta);
-      double imag = ret[i + 1] * Math.pow(2, beta);
-      result[i / 2] = new Complex(real, imag);
-    }
-
-    return result;
-  }
-
-  private static int bitLength(int value) {
-    return Integer.SIZE - Integer.numberOfLeadingZeros(value);
-  }
-
-  private int decode(ByteBuffer buffer, int bits) {
-    int value = 0;
-    for (int i = 0; i < bits; i++) {
-      value = (value << 1) | (buffer.get() & 1);
-    }
-    return value;
-  }
-
-  private List<Integer> descendingBitPackingDecode(ByteBuffer buffer) {
-    boolean sgn = decode(buffer, 1) == 1;
-    int dataLength = decode(buffer, bitLength(MAX_SIZE));
-    List<Integer> data = new ArrayList<>(dataLength);
-    for (int i = 0; i < dataLength; i++) {
-      data.add(0);
-    }
-    int indexLength = decode(buffer, bitLength(MAX_SIZE));
-    List<Integer> index = new ArrayList<>(indexLength);
-    for (int i = 0; i < indexLength; i++) {
-      index.add(0);
-    }
-    if (indexLength == 0) {
-      return data;
-    }
-    for (int i = 0; i < indexLength; i += GROUP_SIZE) {
-      int maxLen = decode(buffer, bitLength(bitLength(MAX_SIZE)));
-      for (int j = i; j < Math.min(i + GROUP_SIZE, indexLength); j++) {
-        index.set(j, decode(buffer, maxLen));
-      }
-    }
-    int firstLen = decode(buffer, bitLength(bitLength(MAX_VALUE)));
-    int currentLen = firstLen;
-    for (int i = 0; i < indexLength; i++) {
-      boolean currentSgn = sgn && decode(buffer, 1) == 1;
-      int value = decode(buffer, currentLen);
-      data.set(index.get(i), currentSgn ? -value : value);
-      currentLen = bitLength(value);
-    }
-    return data;
-  }
-
-  private List<Integer> separateStorageDecode(ByteBuffer buffer) {
-    int dataLength = decode(buffer, bitLength(MAX_SIZE));
-    List<Integer> sgn = new ArrayList<>(dataLength);
-    List<Integer> data = new ArrayList<>(dataLength);
-    for (int i = 0; i < dataLength; i++) {
-      sgn.add(0);
-      data.add(0);
-    }
-    int D = decode(buffer, bitLength(bitLength(MAX_VALUE)));
-    for (int i = 0; i < dataLength; i++) {
-      sgn.set(i, decode(buffer, 1));
-      data.set(i, decode(buffer, D));
-    }
-    List<Integer> high = descendingBitPackingDecode(buffer);
-    for (int i = 0; i < dataLength; i++) {
-      data.set(i, (sgn.get(i) == 0 ? 1 : -1) * ((high.get(i) << D) | data.get(i)));
-    }
-    return data;
-  }
-
-  public List<Integer> periodDecode(ByteBuffer buffer) throws IOException {
-    int p = decode(buffer, bitLength(MAX_SIZE));
-    if (p == 0) {
-      return separateStorageDecode(buffer);
-    }
-    boolean betaSgn = decode(buffer, 1) == 1;
-    int beta = decode(buffer, bitLength(bitLength(MAX_VALUE)));
-    if (betaSgn) {
-      beta *= -1;
-    }
-    Complex[] dataf =
-        compRoundInverse(
-            descendingBitPackingDecode(buffer).stream().mapToInt(i -> i).toArray(), beta);
-    List<Integer> res = cumulativeSum(separateStorageDecode(buffer));
-    int k = (res.size() + p - 1) / p;
-    double[] tmp = irfft(dataf, p);
-    List<Integer> tiledList = new ArrayList<>();
-    for (int j = 0; j < k; j++) for (int i = 0; i < tmp.length; i++) tiledList.add((int) tmp[i]);
-    return IntStream.range(0, res.size())
-        .map(i -> tiledList.get(i) - res.get(i))
-        .boxed()
-        .collect(Collectors.toList());
-  }
-
-  // 辅助方法: 累积求和
-  private List<Integer> cumulativeSum(List<Integer> list) {
-    int sum = 0;
-    for (int i = 0; i < list.size(); i++) {
-      sum += list.get(i);
-      list.set(i, sum);
-    }
-    return list;
-  }
-
-  List<Integer> data;
-  int curPos;
-  boolean isRead;
-
-  public PeriodDecoder() {
-    super(TSEncoding.PERIOD);
-    reset();
-  }
-
-  // @Override
-  // public boolean readBoolean(ByteBuffer buffer) {
-  // return buffer.get() != 0;
-  // }
-
-  // @Override
-  // public short readShort(ByteBuffer buffer) {
-  // return buffer.getShort();
-  // }
-
-  @Override
-  public int readInt(ByteBuffer buffer) {
-    try {
-      if (isRead == false) {
-        data = periodDecode(buffer);
-        isRead = true;
-        curPos = 0;
-      }
-      curPos++;
-      return data.get(curPos - 1);
-    } catch (Exception e) {
-      return -1;
-    }
-  }
-
-  // @Override
-  // public long readLong(ByteBuffer buffer) {
-  // return buffer.getLong();
-  // }
-
-  // @Override
-  // public float readFloat(ByteBuffer buffer) {
-  // return buffer.getFloat();
-  // }
-
-  // @Override
-  // public double readDouble(ByteBuffer buffer) {
-  // return buffer.getDouble();
-  // }
-
-  // @Override
-  // public Binary readBinary(ByteBuffer buffer) {
-  // int length = readInt(buffer);
-  // byte[] buf = new byte[length];
-  // buffer.get(buf, 0, buf.length);
-  // return new Binary(buf);
-  // }
-
-  @Override
-  public boolean hasNext(ByteBuffer buffer) throws IOException {
-    if (isRead == false) {
-      data = periodDecode(buffer);
-      isRead = true;
-      curPos = 0;
-    }
-    return curPos < data.size();
-  }
-
-  // @Override
-  // public BigDecimal readBigDecimal(ByteBuffer buffer) {
-  // throw new TsFileDecodingException("Method readBigDecimal is not supported by
-  // PeriodDecoder");
-  // }
-
-  @Override
-  public void reset() {
-    isRead = false;
-  }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/FLEAEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/FLEAEncoder.java
new file mode 100644
index 0000000..95b44f9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/FLEAEncoder.java
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.encoding.encoder;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.math3.complex.Complex;
+import org.apache.commons.math3.transform.DftNormalization;
+import org.apache.commons.math3.transform.FastFourierTransformer;
+import org.apache.commons.math3.transform.TransformType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class FLEAEncoder extends Encoder {
+
+  private static final Logger logger = LoggerFactory.getLogger(FLEAEncoder.class);
+  private final TSDataType dataType;
+
+  private final List<Integer> valuesBuffer;
+  private final int blockSize;
+
+  private static final int DEFAULT_BLOCK_SIZE = 256;
+  private static final int MAX_BETA_CANDIDATES = 10;
+
+  public FLEAEncoder() {
+    this(TSDataType.INT32, DEFAULT_BLOCK_SIZE);
+  }
+
+  public FLEAEncoder(TSDataType dataType, int blockSize) {
+    super(TSEncoding.FLEA);
+    if (dataType != TSDataType.INT32) {
+      throw new UnsupportedOperationException(
+          "FLEAEncoder currently only supports INT32 data type.");
+    }
+    this.dataType = dataType;
+    this.valuesBuffer = new ArrayList<>();
+    this.blockSize = blockSize;
+    logger.info("FLEAEncoder initialized for INT32 with block size: {}", this.blockSize);
+  }
+
+  @Override
+  public void encode(int value, ByteArrayOutputStream out) {
+    valuesBuffer.add(value);
+    if (valuesBuffer.size() >= blockSize) {
+      try {
+        flush(out);
+      } catch (IOException e) {
+        logger.error("FLEAEncoder: IOException during flush operation for INT32.", e);
+      }
+    }
+  }
+
+  @Override
+  public void flush(ByteArrayOutputStream out) throws IOException {
+    if (valuesBuffer.isEmpty()) {
+      return;
+    }
+
+    logger.debug("FLEAEncoder: Flushing {} INT32 values.", valuesBuffer.size());
+
+    double[] blockData = valuesBuffer.stream().mapToDouble(Integer::doubleValue).toArray();
+    int n = blockData.length;
+
+    OptimalParams optimalParams = findOptimalBetaAndSplitPoints(blockData);
+
+    byte[] encodedData = performActualEncoding(blockData, optimalParams, n);
+
+    out.write(ByteBuffer.allocate(4).putInt(encodedData.length).array());
+    out.write(encodedData);
+
+    valuesBuffer.clear();
+  }
+
+  private static class OptimalParams {
+    int betaStar;
+    int pReStar;
+    int pImStar;
+    int dResidualStar;
+
+    OptimalParams(int beta, int pRe, int pIm) {
+      this.betaStar = beta;
+      this.pReStar = pRe;
+      this.pImStar = pIm;
+      this.dResidualStar = 0;
+    }
+  }
+
+  private OptimalParams findOptimalBetaAndSplitPoints(double[] blockData) {
+    logger.debug("Finding optimal beta and split points for block of size {}", blockData.length);
+
+    final int n = blockData.length;
+    if (n == 0) {
+      return new OptimalParams(0, 0, 0);
+    }
+    final FastFourierTransformer fft = new FastFourierTransformer(DftNormalization.STANDARD);
+    final Complex[] originalFreqComplex = fft.transform(blockData, TransformType.FORWARD);
+
+    final int n_half = (n / 2) + 1;
+
+    double minTotalCost = Double.MAX_VALUE;
+    OptimalParams bestParams = new OptimalParams(0, 0, 0);
+
+    for (int betaCandidate = 8; betaCandidate < 24; betaCandidate += 2) {
+
+      int[] quantizedFreqRe = new int[n_half];
+      int[] quantizedFreqIm = new int[n_half];
+      double quantizationFactor = Math.pow(2, betaCandidate);
+
+      for (int j = 0; j < n_half; j++) {
+        quantizedFreqRe[j] =
+            (int) Math.round(originalFreqComplex[j].getReal() / quantizationFactor);
+        quantizedFreqIm[j] =
+            (int) Math.round(originalFreqComplex[j].getImaginary() / quantizationFactor);
+      }
+
+      double costResidual =
+          estimateResidualCost(
+              n, originalFreqComplex, quantizedFreqRe, quantizedFreqIm, betaCandidate);
+
+      CostAndSplits freqCostResult = estimateFrequencyCost(quantizedFreqRe, quantizedFreqIm);
+
+      double currentTotalCost = costResidual + freqCostResult.cost;
+      logger.trace(
+          "For beta={}: FreqCost={}, ResidCost={}, TotalCost={}",
+          betaCandidate,
+          freqCostResult.cost,
+          costResidual,
+          currentTotalCost);
+
+      if (currentTotalCost < minTotalCost) {
+        minTotalCost = currentTotalCost;
+        bestParams.betaStar = betaCandidate;
+        bestParams.pReStar = freqCostResult.pRe;
+        bestParams.pImStar = freqCostResult.pIm;
+      }
+    }
+
+    logger.info(
+        "Found optimal parameters: beta={}, p_re={}, p_im={}",
+        bestParams.betaStar,
+        bestParams.pReStar,
+        bestParams.pImStar);
+
+    return bestParams;
+  }
+
+  private static class CostAndSplits {
+    double cost;
+    int pRe;
+    int pIm;
+
+    CostAndSplits(double c, int pr, int pi) {
+      this.cost = c;
+      this.pRe = pr;
+      this.pIm = pi;
+    }
+  }
+
+  private CostAndSplits estimateFrequencyCost(int[] quantizedFreqRe, int[] quantizedFreqIm) {
+    logger.debug("Estimating frequency cost...");
+
+    SplitResult reResult = findOptimalSplitForSequence(quantizedFreqRe);
+    SplitResult imResult = findOptimalSplitForSequence(quantizedFreqIm);
+
+    double totalCost = reResult.minCost + imResult.minCost;
+    return new CostAndSplits(totalCost, reResult.pStar, imResult.pStar);
+  }
+
+  private static class SplitResult {
+    double minCost;
+    int pStar;
+
+    SplitResult(double mc, int ps) {
+      this.minCost = mc;
+      this.pStar = ps;
+    }
+  }
+
+  private SplitResult findOptimalSplitForSequence(int[] sequence) {
+    logger.debug("Finding optimal split for sequence of length {}", sequence.length);
+    final int n = sequence.length;
+    if (n == 0) {
+      return new SplitResult(0, 0);
+    }
+
+    double[] costPrefixGVLE = new double[n + 1];
+    costPrefixGVLE[0] = 0;
+    for (int p = 1; p <= n; p++) {
+      costPrefixGVLE[p] = costGVLE(Arrays.copyOfRange(sequence, 0, p));
+    }
+
+    double[] costSuffixDBP = new double[n + 1];
+    costSuffixDBP[n] = 0;
+    for (int p = n - 1; p >= 0; p--) {
+      costSuffixDBP[p] = costDBP(Arrays.copyOfRange(sequence, p, n));
+    }
+
+    double minTotalCost = Double.MAX_VALUE;
+    int bestP = 0;
+    for (int p = 0; p <= n; p++) {
+      double currentTotalCost = costPrefixGVLE[p] + costSuffixDBP[p];
+      if (currentTotalCost < minTotalCost) {
+        minTotalCost = currentTotalCost;
+        bestP = p;
+      }
+    }
+
+    logger.trace("Found best split point p*={} with cost={}", bestP, minTotalCost);
+    return new SplitResult(minTotalCost, bestP);
+  }
+
+  private double estimateResidualCost(
+      int n, Complex[] originalFreq, int[] quantizedFreqRe, int[] quantizedFreqIm, int beta) {
+    logger.debug("Estimating residual cost for beta={}", beta);
+    double quantizationFactor = Math.pow(2, beta);
+    double eFreq = 0;
+
+    for (int j = 0; j < quantizedFreqRe.length; j++) {
+      double dequantReal = (double) quantizedFreqRe[j] * quantizationFactor;
+      double dequantImag = (double) quantizedFreqIm[j] * quantizationFactor;
+      double errorReal = dequantReal - originalFreq[j].getReal();
+      double errorImag = dequantImag - originalFreq[j].getImaginary();
+
+      double errorMagnitudeSq = errorReal * errorReal + errorImag * errorImag;
+
+      if (j > 0 && j < (n / 2)) {
+        eFreq += 2 * errorMagnitudeSq;
+      } else {
+        eFreq += errorMagnitudeSq;
+      }
+    }
+
+    if (n == 0 || eFreq < 1e-9) return 0;
+
+    double eR = (1.0 / n) * Math.sqrt(eFreq);
+    if (eR < 1e-9) eR = 1e-9;
+
+    int de = (int) Math.ceil(Math.log(eR) / Math.log(2));
+    int ds = de + 1;
+
+    return (double) n * (ds + 1);
+  }
+
+  private double costGVLE(int[] sequence) {
+    final int m = sequence.length;
+    if (m == 0) {
+      return 0;
+    }
+
+    int[] wSuffix = new int[m];
+    wSuffix[m - 1] =
+        (sequence[m - 1] == 0) ? 0 : (32 - Integer.numberOfLeadingZeros(Math.abs(sequence[m - 1])));
+    for (int i = m - 2; i >= 0; i--) {
+      int currentWidth =
+          (sequence[i] == 0) ? 0 : (32 - Integer.numberOfLeadingZeros(Math.abs(sequence[i])));
+      wSuffix[i] = Math.max(currentWidth, wSuffix[i + 1]);
+    }
+
+    double totalCost = 0;
+    final int HEADER_COST = 16;
+
+    int currentIdx = 0;
+    while (currentIdx < m) {
+      int sharedWidth = wSuffix[currentIdx];
+      int groupStartIdx = currentIdx;
+
+      while (currentIdx < m && wSuffix[currentIdx] == sharedWidth) {
+        currentIdx++;
+      }
+      int groupLength = currentIdx - groupStartIdx;
+
+      totalCost += HEADER_COST;
+      totalCost += (double) groupLength * (sharedWidth + 1);
+    }
+
+    return totalCost;
+  }
+
+  private double costDBP(int[] sequence) {
+    final int m_prime = sequence.length;
+    if (m_prime == 0) {
+      return 0;
+    }
+
+    final int pos_cost = (m_prime == 0) ? 0 : (32 - Integer.numberOfLeadingZeros(m_prime));
+    double totalCost = 0;
+
+    for (int val : sequence) {
+      if (val != 0) {
+        int absVal = Math.abs(val);
+        int val_cost = (absVal == 0) ? 0 : (32 - Integer.numberOfLeadingZeros(absVal));
+
+        totalCost += pos_cost + val_cost + 1;
+      }
+    }
+
+    return totalCost;
+  }
+
+  private byte[] performActualEncoding(double[] blockData, OptimalParams params, int n)
+      throws IOException {
+    logger.info(
+        "Performing actual encoding with beta={}, p_re={}, p_im={}",
+        params.betaStar,
+        params.pReStar,
+        params.pImStar);
+
+    final FastFourierTransformer fft = new FastFourierTransformer(DftNormalization.STANDARD);
+    final Complex[] freqComplex = fft.transform(blockData, TransformType.FORWARD);
+    final int n_half = (n / 2) + 1;
+    final double quantizationFactor = Math.pow(2, params.betaStar);
+
+    int[] F_hat_re = new int[n_half];
+    int[] F_hat_im = new int[n_half];
+    Complex[] F_hat_complex_dequant = new Complex[n];
+
+    for (int j = 0; j < n_half; j++) {
+      F_hat_re[j] = (int) Math.round(freqComplex[j].getReal() / quantizationFactor);
+      F_hat_im[j] = (int) Math.round(freqComplex[j].getImaginary() / quantizationFactor);
+
+      F_hat_complex_dequant[j] =
+          new Complex(
+              (double) F_hat_re[j] * quantizationFactor, (double) F_hat_im[j] * quantizationFactor);
+    }
+
+    for (int j = 1; j < n_half; j++) {
+      if (n - j < n) {
+        F_hat_complex_dequant[n - j] = F_hat_complex_dequant[j].conjugate();
+      }
+    }
+
+    final Complex[] reconstructedTimeComplex =
+        fft.transform(F_hat_complex_dequant, TransformType.INVERSE);
+    final int[] R_star = new int[n];
+    for (int k = 0; k < n; k++) {
+      R_star[k] = (int) Math.round(blockData[k] - reconstructedTimeComplex[k].getReal());
+    }
+
+    params.dResidualStar = findOptimalDForResiduals(R_star);
+    logger.debug("Found optimal D* for residuals: {}", params.dResidualStar);
+
+    ByteArrayOutputStream metadataBaos = new ByteArrayOutputStream();
+    ByteArrayOutputStream freqBaos = new ByteArrayOutputStream();
+    ByteArrayOutputStream residBaos = new ByteArrayOutputStream();
+
+    encodeMetadata(params, n, metadataBaos);
+    encodeFrequencyComponent(F_hat_re, F_hat_im, params.pReStar, params.pImStar, freqBaos);
+    encodeResidualComponent(R_star, params.dResidualStar, residBaos);
+
+    ByteArrayOutputStream finalBaos = new ByteArrayOutputStream();
+    finalBaos.write(metadataBaos.toByteArray());
+    finalBaos.write(freqBaos.toByteArray());
+    finalBaos.write(residBaos.toByteArray());
+
+    return finalBaos.toByteArray();
+  }
+
+  private void encodeMetadata(OptimalParams params, int n, ByteArrayOutputStream out)
+      throws IOException {
+    // IMPORTANT: 'n' must be encoded for the decoder.
+
+    // 5 integers * 4 bytes/int = 20 bytes
+    ByteBuffer metaBuffer = ByteBuffer.allocate(20);
+    metaBuffer.putInt(n); // Add 'n' to the metadata
+    metaBuffer.putInt(params.betaStar);
+    metaBuffer.putInt(params.pReStar);
+    metaBuffer.putInt(params.pImStar);
+    metaBuffer.putInt(params.dResidualStar);
+    out.write(metaBuffer.array());
+    logger.debug(
+        "Encoded metadata: n={}, beta={}, p_re={}, p_im={}, d_res={}",
+        n,
+        params.betaStar,
+        params.pReStar,
+        params.pImStar,
+        params.dResidualStar);
+  }
+
+  private int findOptimalDForResiduals(int[] residuals) {
+    int bestD = 0;
+    double minCost = Double.MAX_VALUE;
+
+    for (int d = 0; d < 16; d++) {
+      double currentCost = 0;
+      currentCost += (double) residuals.length * (d + 1);
+
+      List<Integer> highBitValues = new ArrayList<>();
+      int threshold = 1 << d;
+      for (int r : residuals) {
+        if (Math.abs(r) >= threshold) {
+          highBitValues.add(r >> d);
+        }
+      }
+
+      currentCost += costDBP(highBitValues.stream().mapToInt(i -> i).toArray());
+
+      if (currentCost < minCost) {
+        minCost = currentCost;
+        bestD = d;
+      }
+    }
+    return bestD;
+  }
+
+  private void encodeFrequencyComponent(
+      int[] F_re, int[] F_im, int p_re, int p_im, ByteArrayOutputStream out) throws IOException {
+    logger.debug("Encoding frequency components: p_re={}, p_im={}", p_re, p_im);
+
+    encodeGVLE(Arrays.copyOfRange(F_re, 0, p_re), out);
+    encodeDBP(Arrays.copyOfRange(F_re, p_re, F_re.length), out);
+
+    encodeGVLE(Arrays.copyOfRange(F_im, 0, p_im), out);
+    encodeDBP(Arrays.copyOfRange(F_im, p_im, F_im.length), out);
+  }
+
+  private void encodeGVLE(int[] sequence, ByteArrayOutputStream out) throws IOException {
+    final int m = sequence.length;
+    if (m == 0) {
+      return;
+    }
+    logger.trace("Actual GVLE encoding for seq length {}", m);
+
+    BitOutputStream bitOut = new BitOutputStream(out);
+
+    int[] wSuffix = new int[m];
+    wSuffix[m - 1] =
+        (sequence[m - 1] == 0) ? 0 : 32 - Integer.numberOfLeadingZeros(Math.abs(sequence[m - 1]));
+    for (int i = m - 2; i >= 0; i--) {
+      int currentWidth =
+          (sequence[i] == 0) ? 0 : 32 - Integer.numberOfLeadingZeros(Math.abs(sequence[i]));
+      wSuffix[i] = Math.max(currentWidth, wSuffix[i + 1]);
+    }
+
+    int currentIdx = 0;
+    while (currentIdx < m) {
+      int sharedWidth = wSuffix[currentIdx];
+      int groupStartIdx = currentIdx;
+      while (currentIdx < m && wSuffix[currentIdx] == sharedWidth) {
+        currentIdx++;
+      }
+      int groupLength = currentIdx - groupStartIdx;
+
+      bitOut.write(sharedWidth, 5);
+      bitOut.write(groupLength, 11);
+
+      if (sharedWidth > 0) {
+        for (int i = groupStartIdx; i < currentIdx; i++) {
+          int val = sequence[i];
+          bitOut.write(val >= 0 ? 0 : 1, 1);
+          bitOut.write(Math.abs(val), sharedWidth);
+        }
+      }
+    }
+
+    bitOut.flush();
+  }
+
+  private static class DBPValue {
+    int value;
+    int position;
+
+    DBPValue(int value, int position) {
+      this.value = value;
+      this.position = position;
+    }
+  }
+
+  private void encodeDBP(int[] sequence, ByteArrayOutputStream out) throws IOException {
+    final int m = sequence.length;
+    if (m == 0) {
+      return;
+    }
+    logger.trace("Actual DBP encoding for seq length {}", m);
+
+    List<DBPValue> nonZeros = new ArrayList<>();
+    for (int i = 0; i < m; i++) {
+      if (sequence[i] != 0) {
+        nonZeros.add(new DBPValue(sequence[i], i));
+      }
+    }
+
+    if (nonZeros.isEmpty()) {
+      return;
+    }
+
+    nonZeros.sort((a, b) -> Integer.compare(Math.abs(b.value), Math.abs(a.value)));
+
+    BitOutputStream bitOut = new BitOutputStream(out);
+
+    int countWidth = (m == 0) ? 0 : 32 - Integer.numberOfLeadingZeros(m);
+    bitOut.write(nonZeros.size(), 16);
+
+    int posWidth = (m == 0) ? 0 : 32 - Integer.numberOfLeadingZeros(m - 1);
+    if (posWidth == 0) posWidth = 1;
+
+    for (DBPValue dbpVal : nonZeros) {
+      bitOut.write(dbpVal.position, posWidth);
+    }
+
+    int firstVal = nonZeros.get(0).value;
+    int firstValAbs = Math.abs(firstVal);
+    int prevValWidth = (firstValAbs == 0) ? 1 : 32 - Integer.numberOfLeadingZeros(firstValAbs);
+
+    bitOut.write(prevValWidth, 5);
+
+    bitOut.write(firstVal >= 0 ? 0 : 1, 1);
+    bitOut.write(firstValAbs, prevValWidth);
+
+    for (int i = 1; i < nonZeros.size(); i++) {
+      int currentVal = nonZeros.get(i).value;
+      int currentValAbs = Math.abs(currentVal);
+
+      bitOut.write(currentVal >= 0 ? 0 : 1, 1);
+      bitOut.write(currentValAbs, prevValWidth);
+
+      prevValWidth = (currentValAbs == 0) ? 1 : 32 - Integer.numberOfLeadingZeros(currentValAbs);
+    }
+
+    bitOut.flush();
+  }
+
+  private void encodeResidualComponent(int[] residuals, int optimalD, ByteArrayOutputStream out)
+      throws IOException {
+    final int n = residuals.length;
+    if (n == 0) {
+      return;
+    }
+    logger.debug("Actual Hybrid Residual Encoding for {} residuals with optimalD={}", n, optimalD);
+
+    BitOutputStream bitOut = new BitOutputStream(out);
+
+    List<DBPValue> highBitNonZeros = new ArrayList<>();
+    int lowBitMask = (1 << optimalD) - 1;
+
+    for (int i = 0; i < n; i++) {
+      int r = residuals[i];
+      int absR = Math.abs(r);
+
+      bitOut.write(r >= 0 ? 0 : 1, 1);
+
+      if (optimalD > 0) {
+        bitOut.write(absR & lowBitMask, optimalD);
+      }
+
+      if (absR >= (1 << optimalD)) {
+        int highBitValue = absR >> optimalD;
+        int signedHighBitValue = (r < 0) ? -highBitValue : highBitValue;
+
+        highBitNonZeros.add(new DBPValue(signedHighBitValue, i));
+      }
+    }
+
+    bitOut.flush();
+
+    ByteArrayOutputStream dbpBaos = new ByteArrayOutputStream();
+
+    int[] highBitSequence = new int[n];
+    for (DBPValue dbpVal : highBitNonZeros) {
+      highBitSequence[dbpVal.position] = dbpVal.value;
+    }
+
+    encodeDBP(highBitSequence, dbpBaos);
+
+    out.write(dbpBaos.toByteArray());
+  }
+
+  @Override
+  public void encode(boolean value, ByteArrayOutputStream out) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public void encode(short value, ByteArrayOutputStream out) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public void encode(long value, ByteArrayOutputStream out) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public void encode(float value, ByteArrayOutputStream out) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public void encode(double value, ByteArrayOutputStream out) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public void encode(Binary value, ByteArrayOutputStream out) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public void encode(BigDecimal value, ByteArrayOutputStream out) {
+    throw new UnsupportedOperationException("FLEA only supports INT32.");
+  }
+
+  @Override
+  public int getOneItemMaxSize() {
+    if (dataType == TSDataType.INT32) {
+      return 4 + 1;
+    }
+    throw new UnsupportedOperationException(dataType.toString());
+  }
+
+  @Override
+  public long getMaxByteSize() {
+    return (long) blockSize * 4 + 64;
+  }
+
+  private static class BitOutputStream {
+
+    private final ByteArrayOutputStream byteOutput;
+    private int bitBuffer;
+    private int bitsInBuffer;
+
+    public BitOutputStream(ByteArrayOutputStream byteOutput) {
+      this.byteOutput = byteOutput;
+      this.bitBuffer = 0;
+      this.bitsInBuffer = 0;
+    }
+
+    public void write(int value, int numBits) throws IOException {
+      if (numBits <= 0 || numBits > 32) {
+        throw new IllegalArgumentException("Number of bits must be between 1 and 32.");
+      }
+
+      long mask = (1L << numBits) - 1;
+      bitBuffer = (bitBuffer << numBits) | ((int) value & (int) mask);
+      bitsInBuffer += numBits;
+
+      while (bitsInBuffer >= 8) {
+        int byteToWrite = (bitBuffer >> (bitsInBuffer - 8));
+        byteOutput.write(byteToWrite);
+
+        bitsInBuffer -= 8;
+      }
+    }
+
+    public void flush() throws IOException {
+      if (bitsInBuffer > 0) {
+        int byteToWrite = (bitBuffer << (8 - bitsInBuffer));
+        byteOutput.write(byteToWrite);
+
+        bitsInBuffer = 0;
+        bitBuffer = 0;
+      }
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/PeriodEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/PeriodEncoder.java
deleted file mode 100644
index 10b094d..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/PeriodEncoder.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.tsfile.encoding.encoder;
-
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
-import org.apache.commons.math3.complex.Complex;
-import org.apache.commons.math3.transform.DftNormalization;
-import org.apache.commons.math3.transform.FastFourierTransformer;
-import org.apache.commons.math3.transform.TransformType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class PeriodEncoder extends Encoder {
-
-  private static final Logger logger = LoggerFactory.getLogger(PeriodEncoder.class);
-  private TSDataType dataType;
-  private int maxStringLength;
-
-  private static Object[] compRound(Complex[] dataf, int beta) {
-    Complex[] newDataf = new Complex[dataf.length];
-    int[] ret = new int[dataf.length * 2];
-
-    for (int i = 0; i < dataf.length; i++) {
-      int a = (int) Math.round(dataf[i].getReal() / Math.pow(2, beta));
-      int b = (int) Math.round(dataf[i].getImaginary() / Math.pow(2, beta));
-      ret[2 * i] = a;
-      ret[2 * i + 1] = b;
-
-      double newA = a * Math.pow(2, beta);
-      double newB = b * Math.pow(2, beta);
-      newDataf[i] = new Complex(newA, newB);
-    }
-    return new Object[] {newDataf, ret};
-  }
-
-  private static final FastFourierTransformer transformer =
-      new FastFourierTransformer(DftNormalization.STANDARD);
-
-  private static Complex[] fft(double[] input) {
-    Complex[] transformed = transformer.transform(input, TransformType.FORWARD);
-    return transformed;
-  }
-
-  private static double[] ifft(Complex[] input) {
-
-    // 进行逆FFT,并除以长度进行归一化
-    Complex[] inverse = transformer.transform(input, TransformType.INVERSE);
-    double[] result = new double[input.length];
-    for (int i = 0; i < input.length; i++) {
-      result[i] = inverse[i].getReal() / input.length;
-    }
-
-    return result;
-  }
-
-  private static Complex[] rfft(double[] input) {
-    Complex[] transformed = transformer.transform(input, TransformType.FORWARD);
-
-    // 只保留一半的结果(正频率部分)
-    int n = input.length;
-    Complex[] rfftResult = new Complex[n / 2 + 1];
-    for (int i = 0; i < n / 2 + 1; i++) {
-      rfftResult[i] = transformed[i];
-    }
-
-    return rfftResult;
-  }
-
-  private static double[] irfft(Complex[] input, int n) {
-    Complex[] conjugates = new Complex[n];
-
-    // 构造逆FFT所需的复数数组
-    for (int i = 0; i < input.length; i++) {
-      if (i == 0 || (i == input.length - 1 && n % 2 == 0)) {
-        conjugates[i] = input[i];
-      } else {
-        conjugates[i] = input[i];
-        conjugates[n - i] = input[i].conjugate();
-      }
-    }
-
-    // 进行逆FFT,并除以长度进行归一化
-    Complex[] inverse = transformer.transform(conjugates, TransformType.INVERSE);
-    double[] result = new double[n];
-    for (int i = 0; i < n; i++) {
-      result[i] = inverse[i].getReal() / n;
-    }
-
-    return result;
-  }
-
-  // 计算多项式乘法
-  private static double[] polyMul(double[] p1, double[] p2) {
-    int n = (int) Math.pow(2, Math.ceil(Math.log(p1.length + p2.length - 1) / Math.log(2)));
-    p1 = padArray(p1, n);
-    p2 = padArray(p2, n);
-    Complex[] p1Transformed = fft(p1);
-    Complex[] p2Transformed = fft(p2);
-    Complex[] resultTransformed = new Complex[n];
-    for (int i = 0; i < n; i++) {
-      resultTransformed[i] = p1Transformed[i].multiply(p2Transformed[i]);
-    }
-    return ifft(resultTransformed);
-  }
-
-  // 数组填充
-  private static double[] padArray(double[] array, int length) {
-    double[] padded = new double[length];
-    System.arraycopy(array, 0, padded, 0, array.length);
-    return padded;
-  }
-
-  // 归一化
-  private static double[] normalize(double[] x) {
-    double mean = mean(x);
-    double std = std(x);
-    double[] normalized = new double[x.length];
-    for (int i = 0; i < x.length; i++) {
-      normalized[i] = (x[i] - mean) / std;
-    }
-    return normalized;
-  }
-
-  // 计算均值
-  private static double mean(double[] x) {
-    double sum = 0;
-    for (double v : x) {
-      sum += v;
-    }
-    return sum / x.length;
-  }
-
-  // 计算标准差
-  private static double std(double[] x) {
-    double mean = mean(x);
-    double sum = 0;
-    for (double v : x) {
-      sum += Math.pow(v - mean, 2);
-    }
-    return Math.sqrt(sum / x.length);
-  }
-
-  // 用fft加速计算自相关函数
-  private static double[] selfCorrFast(double[] x) {
-    x = normalize(x);
-    int N = x.length;
-    double[] xSelf = polyMul(x, reverseArray(x));
-    double[] result = new double[(int) (N * (2.0 / 3.0))];
-    for (int i = 0; i < result.length; i++) {
-      result[i] = xSelf[N - 1 - i] / (N - i);
-    }
-    return result;
-  }
-
-  // 数组反转
-  private static double[] reverseArray(double[] x) {
-    double[] reversed = new double[x.length];
-    for (int i = 0; i < x.length; i++) {
-      reversed[i] = x[x.length - 1 - i];
-    }
-    return reversed;
-  }
-
-  // 给出数组中是邻近k个内最大的数的下标数组
-  private static int[] pinkLocalMax(double[] data) {
-    final int k = 8; // 直接引入常数
-    int[] result = new int[data.length];
-    int count = 0;
-    for (int i = k; i < data.length - k; i++) {
-      double maxVal = data[i];
-      for (int j = Math.max(0, i - k); j < Math.min(data.length, i + k); j++) {
-        maxVal = Math.max(maxVal, data[j]);
-      }
-      if (maxVal == data[i]) {
-        result[count++] = i;
-      }
-    }
-    return Arrays.copyOf(result, count);
-  }
-
-  // 借助fft求出周期长度
-  private static int getPeriod(double[] data) {
-    final double p = 0.5; // 直接引入常数
-    double[] dataCorr = selfCorrFast(data);
-    int[] points = pinkLocalMax(dataCorr);
-    for (int point : points) {
-      if (point > 0 && dataCorr[point] > p) {
-        return point;
-      }
-    }
-    return 0;
-  }
-
-  static class ByteOutToys {
-    private final ByteArrayOutputStream outputStream;
-
-    public ByteOutToys(ByteArrayOutputStream outputStream) {
-      this.outputStream = outputStream;
-    }
-
-    public void encode(int value, int bits) throws IOException {
-      for (int i = bits - 1; i >= 0; i--) {
-        outputStream.write((value >> i) & 1);
-      }
-    }
-  }
-
-  private static final int MAX_SIZE = 0x7FFFFFFF;
-  private static final int MAX_VALUE = 0x7FFFFFFF;
-  private static final int GROUP_SIZE = 8;
-
-  private static int bitLength(int value) {
-    return Integer.SIZE - Integer.numberOfLeadingZeros(value);
-  }
-
-  private static int[] getCnt(int[] data) {
-    int[] cnt = new int[bitLength(MAX_VALUE) + 1];
-    for (int i = 0; i < data.length; i++) {
-      if (data[i] != 0) {
-        cnt[bitLength(Math.abs(data[i]))]++;
-      }
-    }
-    return cnt;
-  }
-
-  private static int[] bitLengthOrder(int[] data) {
-    int[] cnt = getCnt(data);
-    cnt[0] = 0;
-    for (int i = cnt.length - 2; i >= 0; i--) {
-      cnt[i] += cnt[i + 1];
-    }
-    int n = cnt[0];
-    int[] result = new int[n];
-    for (int i = data.length - 1; i >= 0; i--) {
-      if (data[i] != 0) {
-        result[cnt[bitLength(Math.abs(data[i]))] - 1] = i;
-        cnt[bitLength(Math.abs(data[i]))]--;
-      }
-    }
-    return result;
-  }
-
-  private static void descendingBitPacking(ByteOutToys stream, int[] data, boolean sgn)
-      throws IOException {
-    stream.encode(sgn ? 1 : 0, 1);
-    int[] index = bitLengthOrder(data);
-    stream.encode(data.length, bitLength(MAX_SIZE));
-    stream.encode(index.length, bitLength(MAX_SIZE));
-    if (index.length == 0) {
-      return;
-    }
-    for (int i = 0; i < index.length; i += GROUP_SIZE) {
-      int maxLen = 0;
-      for (int j = i; j < Math.min(i + GROUP_SIZE, index.length); j++) {
-        maxLen = Math.max(maxLen, bitLength(index[j]));
-      }
-      stream.encode(maxLen, bitLength(bitLength(MAX_SIZE)));
-      for (int j = i; j < Math.min(i + GROUP_SIZE, index.length); j++) {
-        stream.encode(index[j], maxLen);
-      }
-    }
-    int firstLen = bitLength(Math.abs(data[index[0]]));
-    int currentLen = firstLen;
-    stream.encode(firstLen, bitLength(bitLength(MAX_VALUE)));
-    for (int i : index) {
-      if (sgn) {
-        stream.encode(data[i] < 0 ? 1 : 0, 1);
-      }
-      stream.encode(Math.abs(data[i]), currentLen);
-      currentLen = bitLength(Math.abs(data[i]));
-    }
-  }
-
-  private static int descendingBitPackingEstimate(int[] cnt, int n, boolean sgn) {
-    int sum1 = 0;
-    int sum2 = 0;
-    for (int i = 1; i < cnt.length; i++) {
-      sum1 += cnt[i];
-      sum2 += cnt[i] * (i + (sgn ? 1 : 0));
-    }
-    return bitLength(n) * sum1 + sum2;
-  }
-
-  private static int calcSeparateStorageLength(int[] cnt, int n, int D) {
-    return n * (D + 1)
-        + descendingBitPackingEstimate(Arrays.copyOfRange(cnt, D, cnt.length), n, false);
-  }
-
-  private static int[] separateStorageEstimate(int[] data) {
-    int[] cnt = getCnt(data);
-    int result = calcSeparateStorageLength(cnt, data.length, 0);
-    int D = 0;
-    for (int current_D = 1; current_D <= bitLength(MAX_VALUE); current_D++) {
-      int tmp = calcSeparateStorageLength(cnt, data.length, current_D);
-      if (tmp < result) {
-        result = tmp;
-        D = current_D;
-      }
-    }
-    return new int[] {result, D};
-  }
-
-  private static void separateStorage(ByteOutToys stream, int[] data) throws IOException {
-    int[] result = separateStorageEstimate(data);
-    int n = data.length;
-    int D = result[1];
-
-    stream.encode(n, bitLength(MAX_SIZE));
-    stream.encode(D, bitLength(MAX_VALUE));
-
-    // low-bit part
-    for (int i : data) {
-      stream.encode(i < 0 ? 1 : 0, 1); // sgn bit
-      stream.encode(Math.abs(i) & ((1 << D) - 1), D); // low bits
-    }
-
-    // high-bit part
-    int[] highBits = new int[n];
-    for (int i = 0; i < n; i++) {
-      highBits[i] = Math.abs(data[i]) >> D;
-    }
-    descendingBitPacking(stream, highBits, false);
-  }
-
-  public static int[] getRes(int[] data, Complex[] dataf, int p, int k) {
-
-    double[] irfftResult = irfft(dataf, p);
-
-    // 进行四舍五入并缩放到整数
-    int[] rounded = new int[irfftResult.length];
-    for (int i = 0; i < irfftResult.length; i++) {
-      rounded[i] = (int) Math.round(irfftResult[i] / k);
-    }
-
-    // 复制 k 次并截取到与原始数据相同的长度
-    int[] repeated = new int[data.length];
-    for (int i = 0; i < data.length; i++) {
-      repeated[i] = rounded[i % rounded.length];
-    }
-
-    // 计算残差
-    int[] res = new int[data.length];
-    for (int i = 0; i < data.length; i++) {
-      res[i] = repeated[i] - data[i];
-    }
-
-    return res;
-  }
-
-  private static Object[] getDatafAndRes(int[] data, Complex[] dataf, int p, int k, int beta) {
-    Object[] roundResult = compRound(dataf, beta);
-    Complex[] roundedDataf = (Complex[]) roundResult[0];
-    int[] ret = (int[]) roundResult[1];
-
-    int[] res = getRes(data, roundedDataf, p, k);
-
-    return new Object[] {roundedDataf, ret, res};
-  }
-
-  private static void encodeWithBeta(
-      ByteOutToys stream, int[] data, Complex[] dataf, int p, int k, int beta) throws IOException {
-    Object[] result = getDatafAndRes(data, dataf, p, k, beta);
-    Complex[] roundedDataf = (Complex[]) result[0];
-    int[] ret = (int[]) result[1];
-    int[] res = (int[]) result[2];
-
-    descendingBitPacking(stream, ret, true);
-
-    int[] diffRes = new int[res.length];
-    diffRes[0] = res[0];
-    for (int i = 1; i < res.length; i++) {
-      diffRes[i] = res[i] - res[i - 1];
-    }
-
-    separateStorage(stream, diffRes);
-  }
-
-  private static int encodeWithBetaEstimate(int[] data, Complex[] dataf, int p, int k, int beta) {
-    Object[] roundResult = compRound(dataf, beta);
-    Complex[] roundedDataf = (Complex[]) roundResult[0];
-    int[] ret = (int[]) roundResult[1];
-
-    int maxLen = 0;
-    for (int x : ret) {
-      maxLen = Math.max(maxLen, bitLength(x));
-    }
-    if (maxLen > bitLength(MAX_VALUE)) {
-      return -1;
-    }
-
-    int result = descendingBitPackingEstimate(getCnt(ret), ret.length, true);
-
-    int[] res = getRes(data, roundedDataf, p, k);
-    int[] diffRes = new int[res.length];
-    diffRes[0] = res[0];
-    for (int i = 1; i < res.length; i++) {
-      diffRes[i] = res[i] - res[i - 1];
-    }
-    int[] separateResult = separateStorageEstimate(diffRes);
-    result += (int) separateResult[0];
-
-    return result;
-  }
-
-  private static int getBeta(int[] data, Complex[] dataf, int p, int k) {
-    int result = encodeWithBetaEstimate(data, dataf, p, k, 0);
-    int beta = 0;
-    for (int currentBeta = -bitLength(MAX_VALUE) / 2;
-        currentBeta <= bitLength(MAX_VALUE);
-        currentBeta++) {
-      if (currentBeta != 0) {
-        int tmp = encodeWithBetaEstimate(data, dataf, p, k, currentBeta);
-        if (result == -1 || (tmp != -1 && tmp < result)) {
-          result = tmp;
-          beta = currentBeta;
-        }
-      }
-    }
-    return beta;
-  }
-
-  private static void periodEncode(int[] data, ByteArrayOutputStream out) throws IOException {
-    ByteOutToys stream = new ByteOutToys(out);
-    double[] dataDouble = new double[data.length];
-    for (int i = 0; i < data.length; i++) dataDouble[i] = (double) data[i];
-    int p = getPeriod(dataDouble);
-    if (p == 0) {
-      stream.encode(p, bitLength(MAX_SIZE));
-      separateStorage(stream, data);
-    } else {
-      int k = (data.length + p - 1) / p;
-      int[] dataFull;
-      if (data.length % p == 0) {
-        dataFull = data;
-      } else {
-        dataFull = new int[p * k];
-        for (int i = 0; i < data.length; i++) dataFull[i] = data[i];
-        for (int i = data.length; i < p * k; i++) dataFull[i] = data[i - p];
-      }
-      double[] dataFullDouble = new double[dataFull.length];
-      for (int i = 0; i < dataFull.length; i++) dataFullDouble[i] = (double) dataFull[i];
-      Complex[] datafPre = rfft(dataFullDouble);
-      Complex[] dataf = new Complex[(datafPre.length + k - 1) / k];
-      for (int i = 0; i < dataf.length; i++) dataf[i] = datafPre[i * k];
-      int result = encodeWithBetaEstimate(data, dataf, p, k, 0);
-      int beta = getBeta(data, dataf, p, k);
-      stream.encode(p, bitLength(MAX_SIZE));
-      stream.encode(beta >= 0 ? 0 : 1, 1);
-      stream.encode(Math.abs(beta), bitLength(bitLength(MAX_VALUE)));
-      encodeWithBeta(stream, data, dataf, p, k, beta);
-    }
-    return;
-  }
-
-  List<Integer> data;
-
-  public PeriodEncoder() {
-    super(TSEncoding.PERIOD);
-    data = new ArrayList<>();
-    // this.dataType = dataType;
-    // this.maxStringLength = maxStringLength;
-  }
-
-  // @Override
-  // public void encode(boolean value, ByteArrayOutputStream out) {
-  // if (value) {
-  // out.write(1);
-  // } else {
-  // out.write(0);
-  // }
-  // }
-
-  // @Override
-  // public void encode(short value, ByteArrayOutputStream out) {
-  // out.write((value >> 8) & 0xFF);
-  // out.write(value & 0xFF);
-  // }
-
-  @Override
-  public void encode(int value, ByteArrayOutputStream out) {
-    data.add(value);
-    // ReadWriteForEncodingUtils.writeVarInt(value, out);
-  }
-
-  // @Override
-  // public void encode(long value, ByteArrayOutputStream out) {
-  // for (int i = 7; i >= 0; i--) {
-  // out.write((byte) (((value) >> (i * 8)) & 0xFF));
-  // }
-  // }
-
-  // @Override
-  // public void encode(float value, ByteArrayOutputStream out) {
-  // int floatInt = Float.floatToIntBits(value);
-  // out.write((floatInt >> 24) & 0xFF);
-  // out.write((floatInt >> 16) & 0xFF);
-  // out.write((floatInt >> 8) & 0xFF);
-  // out.write(floatInt & 0xFF);
-  // }
-
-  // @Override
-  // public void encode(double value, ByteArrayOutputStream out) {
-  // encode(Double.doubleToLongBits(value), out);
-  // }
-
-  // @Override
-  // public void encode(Binary value, ByteArrayOutputStream out) {
-  // try {
-  // // write the length of the bytes
-  // encode(value.getLength(), out);
-  // // write value
-  // out.write(value.getValues());
-  // } catch (IOException e) {
-  // logger.error(
-  // "tsfile-encoding PlainEncoder: error occurs when encode Binary value {}",
-  // value, e);
-  // }
-  // }
-
-  @Override
-  public void flush(ByteArrayOutputStream out) throws IOException {
-    periodEncode(data.stream().mapToInt(i -> i).toArray(), out);
-  }
-
-  @Override
-  public int getOneItemMaxSize() {
-    switch (dataType) {
-        // case BOOLEAN:
-        // return 1;
-      case INT32:
-        return 4;
-        // case INT64:
-        // return 8;
-        // case FLOAT:
-        // return 4;
-        // case DOUBLE:
-        // return 8;
-        // case TEXT:
-        // // refer to encode(Binary,ByteArrayOutputStream)
-        // return 4 + TSFileConfig.BYTE_SIZE_PER_CHAR * maxStringLength;
-      default:
-        throw new UnsupportedOperationException(dataType.toString());
-    }
-  }
-
-  @Override
-  public long getMaxByteSize() {
-    return 0;
-  }
-
-  // @Override
-  // public void encode(BigDecimal value, ByteArrayOutputStream out) {
-  // throw new TsFileEncodingException(
-  // "tsfile-encoding PlainEncoder: current version does not support BigDecimal
-  // value encoding");
-  // }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/TSEncodingBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/TSEncodingBuilder.java
index b4febd2..fa5c6a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/TSEncodingBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -73,8 +73,8 @@
         return new Freq();
       case ZIGZAG:
         return new Zigzag();
-      case PERIOD:
-        return new Period();
+      case FLEA:
+        return new Flea();
       default:
         throw new UnsupportedOperationException(type.toString());
     }
@@ -403,7 +403,7 @@
   }
 
   /** for all TSDataType. */
-  public static class Period extends TSEncodingBuilder {
+  public static class Flea extends TSEncodingBuilder {
     // private int maxStringLength =
     // TSFileDescriptor.getInstance().getConfig().getMaxStringLength();
 
@@ -411,9 +411,9 @@
     public Encoder getEncoder(TSDataType type) {
       switch (type) {
         case INT32:
-          return new PeriodEncoder();
+          return new FLEAEncoder();
         default:
-          throw new UnSupportedDataTypeException("PERIOD doesn't support data type: " + type);
+          throw new UnSupportedDataTypeException("FLEA doesn't support data type: " + type);
       }
     }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSEncoding.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSEncoding.java
index 4f9d29b..8658a3b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSEncoding.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSEncoding.java
@@ -30,7 +30,7 @@
   GORILLA((byte) 8),
   ZIGZAG((byte) 9),
   FREQ((byte) 10),
-  PERIOD((byte) 14);
+  FLEA((byte) 14);
 
   private final byte type;
 
@@ -73,7 +73,7 @@
       case 10:
         return TSEncoding.FREQ;
       case 14:
-        return TSEncoding.PERIOD;
+        return TSEncoding.FLEA;
       default:
         throw new IllegalArgumentException("Invalid input: " + encoding);
     }