| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # Security patch for CVE-2026-XXXXX (path traversal vulnerability leading to RCE) |
| # Author: Mohammed Tanveer (threatpointer) |
| # Date: 2026-01-12 |
| # |
| |
| import logging |
| import datetime |
| import asyncio |
| import os |
| import uuid |
| import re |
| import pandas as pd |
| from typing import Dict, Any, List, Union |
| |
| from iotdb.Session import Session |
| from iotdb.SessionPool import SessionPool, PoolConfig |
| from iotdb.utils.SessionDataSet import SessionDataSet |
| from iotdb.table_session import TableSession |
| from iotdb.table_session_pool import TableSessionPool, TableSessionPoolConfig |
| from fastmcp import FastMCP |
| from mcp.types import TextContent |
| |
| from iotdb_mcp_server.config import Config |
| |
# Initialize FastMCP server
mcp = FastMCP("iotdb_mcp_server")

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

logger = logging.getLogger("iotdb_mcp_server")

config = Config.from_env_arguments()

# Effective connection settings taken from Config.
db_config = {
    "host": config.host,
    "port": config.port,
    "user": config.user,
    "password": config.password,
    "database": config.database,
    "sql_dialect": config.sql_dialect,
    "export_path": config.export_path,
}

# Maximum number of pooled sessions, used by whichever dialect's pool is
# created below.
max_pool_size = 100

# Log the configuration with the password masked so credentials never end up
# in log files.
logger.info("IoTDB Config: %s", {**db_config, "password": "******"})

# Ensure export directory exists. isdir (not exists) so that a regular file in
# the way is reported instead of silently skipped; exist_ok avoids failing if
# another process creates the directory between the check and makedirs.
if not os.path.isdir(config.export_path):
    try:
        os.makedirs(config.export_path, exist_ok=True)
        logger.info(f"Created export directory: {config.export_path}")
    except OSError as e:
        logger.warning(
            f"Failed to create export directory {config.export_path}: {str(e)}"
        )
| |
| |
def sanitize_filename(filename: str, base_dir: str) -> str:
    """
    Sanitize and validate filename to prevent path traversal attacks.

    Security patch for CVE-2026-XXXXX
    Author: Mohammed Tanveer (threatpointer)
    Date: 2026-01-12

    Args:
        filename: The user-provided filename; must be a bare name with no
            directory component.
        base_dir: The export directory the file must live in. A relative path
            is resolved against the current working directory.

    Returns:
        The absolute, symlink-resolved path of ``filename`` inside ``base_dir``.

    Raises:
        ValueError: If the filename is empty; contains a path separator, a
            ``..`` sequence, or any character outside ``[A-Za-z0-9_.-]``
            (including a trailing newline); is ``.``; or resolves (for example
            through a symlink) to a location that is not strictly inside
            ``base_dir``.
    """
    if not filename:
        raise ValueError("Filename cannot be empty")

    # Reject separators and traversal sequences up front, before any path
    # handling can interpret them.
    if "/" in filename or "\\" in filename or ".." in filename:
        raise ValueError(
            "Invalid filename: path separators and directory traversal sequences are not allowed"
        )

    # fullmatch rather than match with "$": "$" also matches just before a
    # trailing newline, which would let e.g. "report.csv\n" through.
    if not re.fullmatch(r"[a-zA-Z0-9_\-\.]+", filename):
        raise ValueError(
            "Invalid filename: only alphanumeric characters, underscore, hyphen, and dot are allowed"
        )

    # "." passes the character check but names the directory itself.
    if filename == ".":
        raise ValueError("Invalid filename")

    # Resolve symlinks and normalize both paths before comparing them.
    filepath_real = os.path.realpath(os.path.join(base_dir, filename))
    basedir_real = os.path.realpath(base_dir)

    # The resolved file must sit strictly below the resolved base directory.
    # commonpath also works when base_dir is the filesystem root, unlike a
    # startswith(base + sep) test.
    try:
        inside = os.path.commonpath([basedir_real, filepath_real]) == basedir_real
    except ValueError:
        # e.g. paths on different drives on Windows
        inside = False
    if not inside or filepath_real == basedir_real:
        raise ValueError(
            "Path traversal detected: file must be within export directory"
        )

    return filepath_real
| |
| |
if config.sql_dialect == "tree":
    # Tree-model dialect: build a SessionPool shared by the tree-model tools
    # registered below (metadata_query, select_query, export_query).
    pool_config = PoolConfig(
        node_urls=[str(config.host) + ":" + str(config.port)],
        user_name=config.user,
        password=config.password,
        fetch_size=1024,  # Fetch size for queries
        time_zone="UTC+8",  # NOTE(review): hard-coded, not read from Config — confirm intended
        max_retry=3,  # Connection retry attempts
    )
    # Passed to SessionPool together with max_pool_size; presumably how long
    # get_session() waits for a free session, in milliseconds — verify against
    # the iotdb SessionPool API.
    wait_timeout_in_ms = 5000
    session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
| |
| @mcp.tool() |
| async def metadata_query(query_sql: str) -> list[TextContent]: |
| """Execute metadata queries on IoTDB to explore database structure and statistics. |
| |
| Args: |
| query_sql: The metadata query to execute. Supported queries: |
| - SHOW DATABASES [path]: List all databases or databases under a specific path |
| - SHOW TIMESERIES [path]: List all time series or time series under a specific path |
| - SHOW CHILD PATHS [path]: List child paths under a specific path |
| - SHOW CHILD NODES [path]: List child nodes under a specific path |
| - SHOW DEVICES [path]: List all devices or devices under a specific path |
| - COUNT TIMESERIES [path]: Count time series under a specific path |
| - COUNT NODES [path]: Count nodes under a specific path |
| - COUNT DEVICES [path]: Count devices under a specific path |
| - if path is not provided, the query will be applied to root.** |
| |
| Examples: |
| SHOW DATABASES root.** |
| SHOW TIMESERIES root.ln.** |
| SHOW CHILD PATHS root.ln |
| SHOW CHILD PATHS root.ln.*.* |
| SHOW CHILD NODES root.ln |
| SHOW DEVICES root.ln.** |
| COUNT TIMESERIES root.ln.** |
| COUNT NODES root.ln |
| COUNT DEVICES root.ln |
| """ |
| session = None |
| try: |
| session = session_pool.get_session() |
| stmt = query_sql.strip().upper() |
| |
| # Process SHOW DATABASES |
| if ( |
| stmt.startswith("SHOW DATABASES") |
| or stmt.startswith("SHOW TIMESERIES") |
| or stmt.startswith("SHOW CHILD PATHS") |
| or stmt.startswith("SHOW CHILD NODES") |
| or stmt.startswith("SHOW DEVICES") |
| or stmt.startswith("COUNT TIMESERIES") |
| or stmt.startswith("COUNT NODES") |
| or stmt.startswith("COUNT DEVICES") |
| ): |
| res = session.execute_query_statement(query_sql) |
| return prepare_res(res, session) |
| else: |
| session.close() |
| raise ValueError( |
| "Unsupported metadata query. Please use one of the supported query types." |
| ) |
| except Exception as e: |
| if session: |
| session.close() |
| logger.error(f"Failed to execute metadata query: {str(e)}") |
| raise |
| |
| @mcp.tool() |
| async def select_query(query_sql: str) -> list[TextContent]: |
| """Execute a SELECT query on the IoTDB tree SQL dialect. |
| |
| Args: |
| query_sql: The SQL query to execute (using TREE dialect, time using ISO 8601 format, e.g. 2017-11-01T00:08:00.000). |
| |
| SQL Syntax: |
| SELECT [LAST] selectExpr [, selectExpr] ... |
| [INTO intoItem [, intoItem] ...] |
| FROM prefixPath [, prefixPath] ... |
| [WHERE whereCondition] |
| [GROUP BY { |
| ([startTime, endTime), interval [, slidingStep]) | |
| LEVEL = levelNum [, levelNum] ... | |
| TAGS(tagKey [, tagKey] ... | |
| VARIATION(expression[,delta][,ignoreNull=true/false]) | |
| CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false]) | |
| SESSION(timeInterval) | |
| COUNT(expression, size[,ignoreNull=true/false]) |
| }] |
| [HAVING havingCondition] |
| [ORDER BY sortKey {ASC | DESC}] |
| [FILL ({PREVIOUS | LINEAR | constant}) (, interval=DURATION_LITERAL)?)] |
| [SLIMIT seriesLimit] [SOFFSET seriesOffset] |
| [LIMIT rowLimit] [OFFSET rowOffset] |
| [ALIGN BY {TIME | DEVICE}] |
| |
| Examples: |
| select temperature from root.ln.wf01.wt01 where time < 2017-11-01T00:08:00.000 |
| select status, temperature from root.ln.wf01.wt01 where (time > 2017-11-01T00:05:00.000 and time < 2017-11-01T00:12:00.000) or (time >= 2017-11-01T16:35:00.000 and time <= 2017-11-01T16:37:00.000) |
| select * from root.ln.** where time > 1 order by time desc limit 10; |
| |
| Supported Aggregate Functions: |
| SUM |
| COUNT |
| MAX_VALUE |
| MIN_VALUE |
| AVG |
| VARIANCE |
| MAX_TIME |
| MIN_TIME |
| ... |
| """ |
| session = None |
| try: |
| session = session_pool.get_session() |
| stmt = query_sql.strip().upper() |
| |
| # Regular SELECT queries |
| if stmt.startswith("SELECT"): |
| res = session.execute_query_statement(query_sql) |
| return prepare_res(res, session) |
| else: |
| session.close() |
| raise ValueError("Only SELECT queries are allowed for select_query") |
| except Exception as e: |
| if session: |
| session.close() |
| logger.error(f"Failed to execute select query: {str(e)}") |
| raise |
| |
| @mcp.tool() |
| async def export_query( |
| query_sql: str, format: str = "csv", filename: str = None |
| ) -> list[TextContent]: |
| """Execute a query and export the results to a CSV or Excel file. |
| |
| Args: |
| query_sql: The SQL query to execute (using TREE dialect, time using ISO 8601 format, e.g. 2017-11-01T00:08:00.000) |
| format: Export format, either "csv" or "excel" (default: "csv") |
| filename: Optional filename for the exported file. If not provided, a unique filename will be generated. |
| |
| SQL Syntax: |
| SELECT ⟨select_list⟩ |
| FROM ⟨tables⟩ |
| [WHERE ⟨condition⟩] |
| [GROUP BY ⟨groups⟩] |
| [HAVING ⟨group_filter⟩] |
| [FILL ⟨fill_methods⟩] |
| [ORDER BY ⟨order_expression⟩] |
| [OFFSET ⟨n⟩] |
| [LIMIT ⟨n⟩]; |
| |
| Returns: |
| Information about the exported file and a preview of the data (max 10 rows) |
| """ |
| session = None |
| try: |
| session = session_pool.get_session() |
| stmt = query_sql.strip().upper() |
| |
| if stmt.startswith("SELECT") or stmt.startswith("SHOW"): |
| # Execute the query |
| res = session.execute_query_statement(query_sql) |
| |
| # Create a pandas DataFrame |
| df = res.todf() |
| # Close the session |
| session.close() |
| |
| # Generate unique filename with timestamp |
| timestamp = int(datetime.datetime.now().timestamp()) |
| if filename is None: |
| # Generate a unique filename if not provided |
| filename = f"dump_{uuid.uuid4().hex[:4]}_{timestamp}" |
| |
| if format.lower() == "csv": |
| if filename.lower().endswith(".csv"): |
| filename = filename[:-4] |
| # Sanitize filename to prevent path traversal attacks |
| filepath = sanitize_filename(f"{filename}.csv", config.export_path) |
| df.to_csv(filepath, index=False) |
| elif format.lower() == "excel": |
| if filename.lower().endswith(".xlsx"): |
| filename = filename[:-5] |
| # Sanitize filename to prevent path traversal attacks |
| filepath = sanitize_filename(f"{filename}.xlsx", config.export_path) |
| df.to_excel(filepath, index=False) |
| else: |
| raise ValueError("Format must be either 'csv' or 'excel'") |
| |
| # Generate preview (first 10 rows) |
| preview_rows = min(10, len(df)) |
| preview_data = [] |
| preview_data.append(",".join(df.columns)) # Header |
| |
| for i in range(preview_rows): |
| preview_data.append(",".join(map(str, df.iloc[i]))) |
| |
| # Return information |
| return [ |
| TextContent( |
| type="text", |
| text=f"Query results exported to {filepath}\n\nPreview (first {preview_rows} rows):\n" |
| + "\n".join(preview_data), |
| ) |
| ] |
| else: |
| raise ValueError("Only SELECT or SHOW queries are allowed for export") |
| except Exception as e: |
| if session: |
| session.close() |
| logger.error(f"Failed to export query: {str(e)}") |
| raise |
| |
| def prepare_res(_res: SessionDataSet, _session: Session) -> list[TextContent]: |
| columns = _res.get_column_names() |
| result = [] |
| while _res.has_next(): |
| record = _res.next() |
| if columns[0] == "Time": |
| timestamp = record.get_timestamp() |
| # formatted_time = datetime.datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] |
| row = record.get_fields() |
| result.append(str(timestamp) + "," + ",".join(map(str, row))) |
| else: |
| row = record.get_fields() |
| result.append(",".join(map(str, row))) |
| _session.close() |
| return [ |
| TextContent( |
| type="text", |
| text="\n".join([",".join(columns)] + result), |
| ) |
| ] |
| |
| elif config.sql_dialect == "table": |
| session_pool_config = TableSessionPoolConfig( |
| node_urls=[str(config.host) + ":" + str(config.port)], |
| username=config.user, |
| password=config.password, |
| max_pool_size=max_pool_size, # Increased from 5 for better concurrency |
| database=None if len(config.database) == 0 else config.database, |
| ) |
| session_pool = TableSessionPool(session_pool_config) |
| |
| @mcp.tool() |
| async def read_query(query_sql: str) -> list[TextContent]: |
| """Execute a SELECT query on the IoTDB. Please use table sql_dialect when generating SQL queries. |
| |
| Args: |
| query_sql: The SQL query to execute (using TABLE dialect, time using ISO 8601 format, e.g. 2017-11-01T00:08:00.000) |
| """ |
| table_session = None |
| try: |
| table_session = session_pool.get_session() |
| stmt = query_sql.strip().upper() |
| |
| # Regular SELECT queries |
| if ( |
| stmt.startswith("SELECT") |
| or stmt.startswith("DESCRIBE") |
| or stmt.startswith("SHOW") |
| ): |
| res = table_session.execute_query_statement(query_sql) |
| return prepare_res(res, table_session) |
| else: |
| table_session.close() |
| raise ValueError("Only SELECT queries are allowed for read_query") |
| except Exception as e: |
| if table_session: |
| table_session.close() |
| logger.error(f"Failed to execute query: {str(e)}") |
| raise |
| |
| @mcp.tool() |
| async def list_tables() -> list[TextContent]: |
| """List all tables in the IoTDB database.""" |
| table_session = None |
| try: |
| table_session = session_pool.get_session() |
| res = table_session.execute_query_statement("SHOW TABLES") |
| |
| result = ["Tables_in_" + db_config["database"]] # Header |
| while res.has_next(): |
| result.append(str(res.next().get_fields()[0])) |
| table_session.close() |
| return [TextContent(type="text", text="\n".join(result))] |
| except Exception as e: |
| if table_session: |
| table_session.close() |
| logger.error(f"Failed to list tables: {str(e)}") |
| raise |
| |
| @mcp.tool() |
| async def describe_table(table_name: str) -> list[TextContent]: |
| """Get the schema information for a specific table |
| Args: |
| table_name: name of the table to describe |
| """ |
| table_session = None |
| try: |
| table_session = session_pool.get_session() |
| res = table_session.execute_query_statement( |
| "DESC " + table_name + " details" |
| ) |
| return prepare_res(res, table_session) |
| except Exception as e: |
| if table_session: |
| table_session.close() |
| logger.error(f"Failed to describe table {table_name}: {str(e)}") |
| raise |
| |
| @mcp.tool() |
| async def export_table_query( |
| query_sql: str, format: str = "csv", filename: str = None |
| ) -> list[TextContent]: |
| """Execute a query and export the results to a CSV or Excel file. |
| |
| Args: |
| query_sql: The SQL query to execute (using TABLE dialect, time using ISO 8601 format, e.g. 2017-11-01T00:08:00.000) |
| format: Export format, either "csv" or "excel" (default: "csv") |
| filename: Optional filename for the exported file. If not provided, a unique filename will be generated. |
| |
| SQL Syntax: |
| SELECT ⟨select_list⟩ |
| FROM ⟨tables⟩ |
| [WHERE ⟨condition⟩] |
| [GROUP BY ⟨groups⟩] |
| [HAVING ⟨group_filter⟩] |
| [FILL ⟨fill_methods⟩] |
| [ORDER BY ⟨order_expression⟩] |
| [OFFSET ⟨n⟩] |
| [LIMIT ⟨n⟩]; |
| |
| Returns: |
| Information about the exported file and a preview of the data (max 10 rows) |
| """ |
| table_session = None |
| try: |
| table_session = session_pool.get_session() |
| stmt = query_sql.strip().upper() |
| |
| if ( |
| stmt.startswith("SELECT") |
| or stmt.startswith("SHOW") |
| or stmt.startswith("DESCRIBE") |
| or stmt.startswith("DESC") |
| ): |
| # Execute the query |
| res = table_session.execute_query_statement(query_sql) |
| |
| # Create a pandas DataFrame |
| df = res.todf() |
| |
| # Close the session |
| table_session.close() |
| |
| # Generate unique filename with timestamp |
| timestamp = int(datetime.datetime.now().timestamp()) |
| if filename is None: |
| filename = f"dump_{uuid.uuid4().hex[:4]}_{timestamp}" |
| |
| if format.lower() == "csv": |
| if filename.lower().endswith(".csv"): |
| filename = filename[:-4] |
| # Sanitize filename to prevent path traversal attacks |
| filepath = sanitize_filename(f"{filename}.csv", config.export_path) |
| df.to_csv(filepath, index=False) |
| elif format.lower() == "excel": |
| if filename.lower().endswith(".xlsx"): |
| filename = filename[:-5] |
| # Sanitize filename to prevent path traversal attacks |
| filepath = sanitize_filename(f"{filename}.xlsx", config.export_path) |
| df.to_excel(filepath, index=False) |
| else: |
| raise ValueError("Format must be either 'csv' or 'excel'") |
| |
| # Generate preview (first 10 rows) |
| preview_rows = min(10, len(df)) |
| preview_data = [] |
| preview_data.append(",".join(df.columns)) # Header |
| |
| for i in range(preview_rows): |
| preview_data.append(",".join(map(str, df.iloc[i]))) |
| |
| # Return information |
| return [ |
| TextContent( |
| type="text", |
| text=f"Query results exported to {filepath}\n\nPreview (first {preview_rows} rows):\n" |
| + "\n".join(preview_data), |
| ) |
| ] |
| else: |
| raise ValueError( |
| "Only SELECT, SHOW or DESCRIBE queries are allowed for export" |
| ) |
| except Exception as e: |
| if table_session: |
| table_session.close() |
| logger.error(f"Failed to export table query: {str(e)}") |
| raise |
| |
| def prepare_res( |
| _res: SessionDataSet, _table_session: TableSession |
| ) -> list[TextContent]: |
| columns = _res.get_column_names() |
| result = [] |
| while _res.has_next(): |
| row = _res.next().get_fields() |
| result.append(",".join(map(str, row))) |
| _table_session.close() |
| return [ |
| TextContent( |
| type="text", |
| text="\n".join([",".join(columns)] + result), |
| ) |
| ] |
| |
| |
def main():
    """Entry point: log startup, then run the MCP server on the stdio transport."""
    transport = "stdio"
    logger.info("iotdb_mcp_server running with stdio transport")
    mcp.run(transport=transport)


if __name__ == "__main__":
    main()