| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.jdbc; |
| |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.StatementExecutionException; |
| import org.apache.iotdb.service.rpc.thrift.IClientRPCService; |
| import org.apache.iotdb.service.rpc.thrift.ServerProperties; |
| import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; |
| import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; |
| import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; |
| import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; |
| import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; |
| |
| import org.apache.thrift.TException; |
| import org.apache.thrift.protocol.TBinaryProtocol; |
| import org.apache.thrift.protocol.TCompactProtocol; |
| import org.apache.thrift.transport.TTransport; |
| import org.apache.thrift.transport.TTransportException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.sql.Array; |
| import java.sql.Blob; |
| import java.sql.CallableStatement; |
| import java.sql.ClientInfoStatus; |
| import java.sql.Clob; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.NClob; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLClientInfoException; |
| import java.sql.SQLException; |
| import java.sql.SQLWarning; |
| import java.sql.SQLXML; |
| import java.sql.Savepoint; |
| import java.sql.Statement; |
| import java.sql.Struct; |
| import java.time.ZoneId; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.Executor; |
| |
| public class IoTDBConnection implements Connection { |
| |
| private static final Logger logger = LoggerFactory.getLogger(IoTDBConnection.class); |
| private static final TSProtocolVersion protocolVersion = |
| TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; |
| private static final String NOT_SUPPORT_PREPARE_CALL = "Does not support prepareCall"; |
| private static final String NOT_SUPPORT_PREPARE_STATEMENT = "Does not support prepareStatement"; |
| private IClientRPCService.Iface client = null; |
| private long sessionId = -1; |
| private IoTDBConnectionParams params; |
| private boolean isClosed = true; |
| private SQLWarning warningChain = null; |
| private TTransport transport; |
| |
| /** |
| * Timeout of query can be set by users. Unit: s If not set, default value 0 will be used, which |
| * will use server configuration. |
| */ |
| private int queryTimeout = 0; |
| |
| /** |
| * ConnectionTimeout and SocketTimeout. Unit: ms. If not set, default value 0 will be used, which |
| * means that there's no timeout in the client side. |
| */ |
| private int networkTimeout = Config.DEFAULT_CONNECTION_TIMEOUT_MS; |
| |
| private ZoneId zoneId; |
| private boolean autoCommit; |
| private String url; |
| |
| public String getUserName() { |
| return userName; |
| } |
| |
| private String userName; |
| |
| public IoTDBConnection() { |
| // allowed to create an instance without parameter input. |
| } |
| |
| public IoTDBConnection(String url, Properties info) throws SQLException, TTransportException { |
| if (url == null) { |
| throw new IoTDBURLException("Input url cannot be null"); |
| } |
| params = Utils.parseUrl(url, info); |
| this.url = url; |
| this.userName = info.get("user").toString(); |
| this.networkTimeout = params.getNetworkTimeout(); |
| this.zoneId = ZoneId.of(params.getTimeZone()); |
| openTransport(); |
| if (Config.rpcThriftCompressionEnable) { |
| setClient(new IClientRPCService.Client(new TCompactProtocol(transport))); |
| } else { |
| setClient(new IClientRPCService.Client(new TBinaryProtocol(transport))); |
| } |
| // open client session |
| openSession(); |
| // Wrap the client with a thread-safe proxy to serialize the RPC calls |
| setClient(RpcUtils.newSynchronizedClient(getClient())); |
| autoCommit = false; |
| } |
| |
| public String getUrl() { |
| return url; |
| } |
| |
| @Override |
| public boolean isWrapperFor(Class<?> arg0) throws SQLException { |
| throw new SQLException("Does not support isWrapperFor"); |
| } |
| |
| @Override |
| public <T> T unwrap(Class<T> arg0) throws SQLException { |
| throw new SQLException("Does not support unwrap"); |
| } |
| |
| @Override |
| public void abort(Executor arg0) throws SQLException { |
| throw new SQLException("Does not support abort"); |
| } |
| |
| @Override |
| public void clearWarnings() { |
| warningChain = null; |
| } |
| |
| @Override |
| public void close() throws SQLException { |
| if (isClosed) { |
| return; |
| } |
| TSCloseSessionReq req = new TSCloseSessionReq(sessionId); |
| try { |
| getClient().closeSession(req); |
| } catch (TException e) { |
| throw new SQLException( |
| "Error occurs when closing session at server. Maybe server is down.", e); |
| } finally { |
| isClosed = true; |
| if (transport != null) { |
| transport.close(); |
| } |
| } |
| } |
| |
| @Override |
| public void commit() throws SQLException {} |
| |
| @Override |
| public Array createArrayOf(String arg0, Object[] arg1) throws SQLException { |
| throw new SQLException("Does not support createArrayOf"); |
| } |
| |
| @Override |
| public Blob createBlob() throws SQLException { |
| throw new SQLException("Does not support createBlob"); |
| } |
| |
| @Override |
| public Clob createClob() throws SQLException { |
| throw new SQLException("Does not support createClob"); |
| } |
| |
| @Override |
| public NClob createNClob() throws SQLException { |
| throw new SQLException("Does not suppport createNClob"); |
| } |
| |
| @Override |
| public SQLXML createSQLXML() throws SQLException { |
| throw new SQLException("Does not support createSQLXML"); |
| } |
| |
| @Override |
| public Statement createStatement() throws SQLException { |
| if (isClosed) { |
| throw new SQLException("Cannot create statement because connection is closed"); |
| } |
| return new IoTDBStatement(this, getClient(), sessionId, zoneId, queryTimeout); |
| } |
| |
| @Override |
| public Statement createStatement(int resultSetType, int resultSetConcurrency) |
| throws SQLException { |
| if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { |
| throw new SQLException( |
| String.format( |
| "Statements with result set concurrency %d are not supported", resultSetConcurrency)); |
| } |
| if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE) { |
| throw new SQLException( |
| String.format("Statements with ResultSet type %d are not supported", resultSetType)); |
| } |
| return new IoTDBStatement(this, getClient(), sessionId, zoneId, queryTimeout); |
| } |
| |
| @Override |
| public Statement createStatement(int arg0, int arg1, int arg2) throws SQLException { |
| throw new SQLException("Does not support createStatement"); |
| } |
| |
| @Override |
| public Struct createStruct(String arg0, Object[] arg1) throws SQLException { |
| throw new SQLException("Does not support createStruct"); |
| } |
| |
| @Override |
| public boolean getAutoCommit() { |
| return autoCommit; |
| } |
| |
| @Override |
| public void setAutoCommit(boolean arg0) { |
| autoCommit = arg0; |
| } |
| |
| @Override |
| public String getCatalog() { |
| return "Apache IoTDB"; |
| } |
| |
| @Override |
| public void setCatalog(String arg0) throws SQLException { |
| throw new SQLException("Does not support setCatalog"); |
| } |
| |
| @Override |
| public Properties getClientInfo() throws SQLException { |
| throw new SQLException("Does not support getClientInfo"); |
| } |
| |
| @Override |
| public void setClientInfo(Properties arg0) throws SQLClientInfoException { |
| throw new SQLClientInfoException("Does not support setClientInfo", null); |
| } |
| |
| @Override |
| public String getClientInfo(String arg0) throws SQLException { |
| throw new SQLException("Does not support getClientInfo"); |
| } |
| |
| @Override |
| public int getHoldability() { |
| return 0; |
| } |
| |
| @Override |
| public void setHoldability(int arg0) throws SQLException { |
| throw new SQLException("Does not support setHoldability"); |
| } |
| |
| @Override |
| public DatabaseMetaData getMetaData() throws SQLException { |
| if (isClosed) { |
| throw new SQLException("Cannot create statement because connection is closed"); |
| } |
| return new IoTDBDatabaseMetadata(this, getClient(), sessionId); |
| } |
| |
| @Override |
| public int getNetworkTimeout() { |
| return networkTimeout; |
| } |
| |
| @Override |
| public String getSchema() throws SQLException { |
| throw new SQLException("Does not support getSchema"); |
| } |
| |
| @Override |
| public void setSchema(String arg0) throws SQLException { |
| throw new SQLException("Does not support setSchema"); |
| } |
| |
| @Override |
| public int getTransactionIsolation() { |
| return Connection.TRANSACTION_NONE; |
| } |
| |
| @Override |
| public void setTransactionIsolation(int arg0) throws SQLException { |
| throw new SQLException("Does not support setTransactionIsolation"); |
| } |
| |
| @Override |
| public Map<String, Class<?>> getTypeMap() throws SQLException { |
| throw new SQLException("Does not support getTypeMap"); |
| } |
| |
| @Override |
| public void setTypeMap(Map<String, Class<?>> arg0) throws SQLException { |
| throw new SQLException("Does not support setTypeMap"); |
| } |
| |
| @Override |
| public SQLWarning getWarnings() { |
| return warningChain; |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return isClosed; |
| } |
| |
| @Override |
| public boolean isReadOnly() { |
| return false; |
| } |
| |
| @Override |
| public void setReadOnly(boolean readonly) throws SQLException { |
| if (readonly) { |
| throw new SQLException("Does not support readOnly"); |
| } |
| } |
| |
| @Override |
| public boolean isValid(int arg0) { |
| return !isClosed; |
| } |
| |
| @Override |
| public String nativeSQL(String arg0) throws SQLException { |
| throw new SQLException("Does not support nativeSQL"); |
| } |
| |
| @Override |
| public CallableStatement prepareCall(String arg0) throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_CALL); |
| } |
| |
| @Override |
| public CallableStatement prepareCall(String arg0, int arg1, int arg2) throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_CALL); |
| } |
| |
| @Override |
| public CallableStatement prepareCall(String arg0, int arg1, int arg2, int arg3) |
| throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_CALL); |
| } |
| |
| @Override |
| public PreparedStatement prepareStatement(String sql) throws SQLException { |
| return new IoTDBPreparedStatement(this, getClient(), sessionId, sql, zoneId); |
| } |
| |
| @Override |
| public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_STATEMENT); |
| } |
| |
| @Override |
| public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_STATEMENT); |
| } |
| |
| @Override |
| public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_STATEMENT); |
| } |
| |
| @Override |
| public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) |
| throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_STATEMENT); |
| } |
| |
| @Override |
| public PreparedStatement prepareStatement( |
| String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) |
| throws SQLException { |
| throw new SQLException(NOT_SUPPORT_PREPARE_STATEMENT); |
| } |
| |
| @Override |
| public void releaseSavepoint(Savepoint arg0) throws SQLException { |
| throw new SQLException("Does not support releaseSavepoint"); |
| } |
| |
| @Override |
| public void rollback() { |
| // do nothing in rollback |
| } |
| |
| @Override |
| public void rollback(Savepoint arg0) { |
| // do nothing in rollback |
| } |
| |
| @Override |
| public void setClientInfo(String name, String value) throws SQLClientInfoException { |
| if (name.equalsIgnoreCase("time_zone")) { |
| try { |
| setTimeZone(value); |
| } catch (TException | IoTDBSQLException e) { |
| throw new SQLClientInfoException("Set time_zone error: ", null, e); |
| } |
| } else { |
| HashMap<String, ClientInfoStatus> hashMap = new HashMap<>(); |
| hashMap.put(name, ClientInfoStatus.REASON_UNKNOWN_PROPERTY); |
| throw new SQLClientInfoException("Does not support this type of client info: ", hashMap); |
| } |
| } |
| |
| @Override |
| public void setNetworkTimeout(Executor arg0, int arg1) throws SQLException { |
| throw new SQLException("Does not support setNetworkTimeout"); |
| } |
| |
| public int getQueryTimeout() { |
| return this.queryTimeout; |
| } |
| |
| public void setQueryTimeout(int seconds) throws SQLException { |
| if (seconds < 0) { |
| throw new SQLException(String.format("queryTimeout %d must be >= 0!", seconds)); |
| } |
| this.queryTimeout = seconds; |
| } |
| |
| @Override |
| public Savepoint setSavepoint() throws SQLException { |
| throw new SQLException("Does not support setSavepoint"); |
| } |
| |
| @Override |
| public Savepoint setSavepoint(String arg0) throws SQLException { |
| throw new SQLException("Does not support setSavepoint"); |
| } |
| |
| public IClientRPCService.Iface getClient() { |
| return client; |
| } |
| |
| public long getSessionId() { |
| return sessionId; |
| } |
| |
| public void setClient(IClientRPCService.Iface client) { |
| this.client = client; |
| } |
| |
| private void openTransport() throws TTransportException { |
| DeepCopyRpcTransportFactory.setDefaultBufferCapacity(params.getThriftDefaultBufferSize()); |
| DeepCopyRpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize()); |
| |
| if (params.isUseSSL()) { |
| transport = |
| DeepCopyRpcTransportFactory.INSTANCE.getTransport( |
| params.getHost(), |
| params.getPort(), |
| getNetworkTimeout(), |
| params.getTrustStore(), |
| params.getTrustStorePwd()); |
| } else { |
| transport = |
| DeepCopyRpcTransportFactory.INSTANCE.getTransport( |
| params.getHost(), params.getPort(), getNetworkTimeout()); |
| } |
| if (!transport.isOpen()) { |
| transport.open(); |
| } |
| } |
| |
| private void openSession() throws SQLException { |
| TSOpenSessionReq openReq = new TSOpenSessionReq(); |
| |
| openReq.setUsername(params.getUsername()); |
| openReq.setPassword(params.getPassword()); |
| openReq.setZoneId(getTimeZone()); |
| openReq.putToConfiguration("version", params.getVersion().toString()); |
| |
| TSOpenSessionResp openResp = null; |
| try { |
| openResp = client.openSession(openReq); |
| sessionId = openResp.getSessionId(); |
| // validate connection |
| RpcUtils.verifySuccess(openResp.getStatus()); |
| |
| if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) { |
| logger.warn( |
| "Protocol differ, Client version is {}}, but Server version is {}", |
| protocolVersion.getValue(), |
| openResp.getServerProtocolVersion().getValue()); |
| if (openResp.getServerProtocolVersion().getValue() == 0) { // less than 0.10 |
| throw new TException( |
| String.format( |
| "Protocol not supported, Client version is %s, but Server version is %s", |
| protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue())); |
| } |
| } |
| |
| } catch (TException e) { |
| transport.close(); |
| if (e.getMessage().contains("Required field 'client_protocol' was not present!")) { |
| // the server is an old version (less than 0.10) |
| throw new SQLException( |
| String.format( |
| "Can not establish connection with %s : You may try to connect an old version IoTDB instance using a client with new version: %s. ", |
| params.getJdbcUriString(), e.getMessage()), |
| e); |
| } |
| throw new SQLException( |
| String.format( |
| "Can not establish connection with %s : %s. ", |
| params.getJdbcUriString(), e.getMessage()), |
| e); |
| } catch (StatementExecutionException e) { |
| // failed to connect, disconnect from the server |
| transport.close(); |
| throw new IoTDBSQLException(e.getMessage(), openResp.getStatus()); |
| } |
| isClosed = false; |
| } |
| |
| boolean reconnect() { |
| boolean flag = false; |
| for (int i = 1; i <= Config.RETRY_NUM; i++) { |
| try { |
| if (transport != null) { |
| transport.close(); |
| openTransport(); |
| if (Config.rpcThriftCompressionEnable) { |
| setClient(new IClientRPCService.Client(new TCompactProtocol(transport))); |
| } else { |
| setClient(new IClientRPCService.Client(new TBinaryProtocol(transport))); |
| } |
| openSession(); |
| setClient(RpcUtils.newSynchronizedClient(getClient())); |
| flag = true; |
| break; |
| } |
| } catch (Exception e) { |
| try { |
| Thread.sleep(Config.RETRY_INTERVAL_MS); |
| } catch (InterruptedException e1) { |
| logger.error("reconnect is interrupted.", e1); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| return flag; |
| } |
| |
| public String getTimeZone() { |
| if (zoneId == null) { |
| zoneId = ZoneId.systemDefault(); |
| } |
| return zoneId.toString(); |
| } |
| |
| public void setTimeZone(String timeZone) throws TException, IoTDBSQLException { |
| TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, timeZone); |
| TSStatus resp = getClient().setTimeZone(req); |
| try { |
| RpcUtils.verifySuccess(resp); |
| } catch (StatementExecutionException e) { |
| throw new IoTDBSQLException(e.getMessage(), resp); |
| } |
| this.zoneId = ZoneId.of(timeZone); |
| } |
| |
| public ServerProperties getServerProperties() throws TException { |
| return getClient().getProperties(); |
| } |
| } |