| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.jdbc.thin; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.SocketTimeoutException; |
| import java.sql.Array; |
| import java.sql.BatchUpdateException; |
| import java.sql.Blob; |
| import java.sql.CallableStatement; |
| import java.sql.Clob; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.NClob; |
| import java.sql.PreparedStatement; |
| import java.sql.SQLClientInfoException; |
| import java.sql.SQLException; |
| import java.sql.SQLFeatureNotSupportedException; |
| import java.sql.SQLPermission; |
| import java.sql.SQLTimeoutException; |
| import java.sql.SQLWarning; |
| import java.sql.SQLXML; |
| import java.sql.Savepoint; |
| import java.sql.Statement; |
| import java.sql.Struct; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.IdentityHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cache.query.QueryCancelledException; |
| import org.apache.ignite.internal.jdbc2.JdbcUtils; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheUtils; |
| import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; |
| import org.apache.ignite.internal.processors.odbc.SqlStateCode; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcCachePartitionsRequest; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcCachePartitionsResult; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteResult; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCancelRequest; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultWithIo; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType; |
| import org.apache.ignite.internal.sql.command.SqlCommand; |
| import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; |
| import org.apache.ignite.internal.sql.optimizer.affinity.PartitionClientContext; |
| import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult; |
| import org.apache.ignite.internal.util.HostAndPortRange; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.lang.IgniteProductVersion; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.sql.ResultSet.CLOSE_CURSORS_AT_COMMIT; |
| import static java.sql.ResultSet.CONCUR_READ_ONLY; |
| import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT; |
| import static java.sql.ResultSet.TYPE_FORWARD_ONLY; |
| |
| /** |
| * JDBC connection implementation. |
| * |
| * See documentation of {@link org.apache.ignite.IgniteJdbcThinDriver} for details. |
| */ |
| public class JdbcThinConnection implements Connection { |
| /** Logger. */ |
| private static final Logger LOG = Logger.getLogger(JdbcThinConnection.class.getName()); |
| |
| /** Request timeout period. */ |
| private static final int REQUEST_TIMEOUT_PERIOD = 1_000; |
| |
| /** Reconnection period. */ |
| public static final int RECONNECTION_DELAY = 200; |
| |
| /** Reconnection maximum period. */ |
| private static final int RECONNECTION_MAX_DELAY = 300_000; |
| |
| /** Network timeout permission */ |
| private static final String SET_NETWORK_TIMEOUT_PERM = "setNetworkTimeout"; |
| |
| /** Zero timeout as query timeout means no timeout. */ |
| static final int NO_TIMEOUT = 0; |
| |
| /** Index generator. */ |
| private static final AtomicLong IDX_GEN = new AtomicLong(); |
| |
| /** Affinity awareness enabled flag. */ |
| private final boolean affinityAwareness; |
| |
| /** Statements modification mutex. */ |
| private final Object stmtsMux = new Object(); |
| |
| /** Schema name. */ |
| private String schema; |
| |
| /** Closed flag. */ |
| private volatile boolean closed; |
| |
| /** Current transaction isolation. */ |
| private int txIsolation; |
| |
| /** Auto-commit flag. */ |
| private boolean autoCommit; |
| |
| /** Read-only flag. */ |
| private boolean readOnly; |
| |
| /** Streaming flag. */ |
| private volatile StreamState streamState; |
| |
| /** Current transaction holdability. */ |
| private int holdability; |
| |
| /** Jdbc metadata. Cache the JDBC object on the first access */ |
| private JdbcThinDatabaseMetadata metadata; |
| |
| /** Connection properties. */ |
| private final ConnectionProperties connProps; |
| |
| /** The amount of potentially alive {@code JdbcThinTcpIo} instances - connections to server nodes. */ |
| private final AtomicInteger connCnt = new AtomicInteger(); |
| |
| /** Tracked statements to close on disconnect. */ |
| private final Set<JdbcThinStatement> stmts = Collections.newSetFromMap(new IdentityHashMap<>()); |
| |
| /** Affinity cache. */ |
| private AffinityCache affinityCache; |
| |
| /** Ignite endpoint. */ |
| private volatile JdbcThinTcpIo singleIo; |
| |
| /** Node Ids tp ignite endpoints. */ |
| private final ConcurrentSkipListMap<UUID, JdbcThinTcpIo> ios = new ConcurrentSkipListMap<>(); |
| |
| /** Server index. */ |
| private int srvIdx; |
| |
| /** Ignite server version. */ |
| private Thread ownThread; |
| |
| /** Mutex. */ |
| private final Object mux = new Object(); |
| |
| /** Ignite endpoint to use within transactional context. */ |
| private volatile JdbcThinTcpIo txIo; |
| |
| /** Random generator. */ |
| private static final Random RND = new Random(System.currentTimeMillis()); |
| |
| /** Network timeout. */ |
| private int netTimeout; |
| |
| /** Background periodical maintenance: query timeouts and reconnection handler. */ |
| private final ScheduledExecutorService maintenanceExecutor = Executors.newScheduledThreadPool(2); |
| |
| /** Cancelable future for query timeout task. */ |
| private ScheduledFuture<?> qryTimeoutScheduledFut; |
| |
| /** Cancelable future for connections handler task. */ |
| private ScheduledFuture<?> connectionsHndScheduledFut; |
| |
| /** |
| * Creates new connection. |
| * |
| * @param connProps Connection properties. |
| * @throws SQLException In case Ignite client failed to start. |
| */ |
| public JdbcThinConnection(ConnectionProperties connProps) throws SQLException { |
| this.connProps = connProps; |
| |
| holdability = HOLD_CURSORS_OVER_COMMIT; |
| autoCommit = true; |
| txIsolation = Connection.TRANSACTION_NONE; |
| |
| schema = JdbcUtils.normalizeSchema(connProps.getSchema()); |
| |
| affinityAwareness = connProps.isAffinityAwareness(); |
| |
| ensureConnected(); |
| |
| if (affinityAwareness) |
| connectionsHndScheduledFut = maintenanceExecutor.scheduleWithFixedDelay(new ConnectionHandlerTask(), |
| 0, RECONNECTION_DELAY, TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * @throws SQLException On connection error. |
| */ |
| private void ensureConnected() throws SQLException { |
| if (connCnt.get() > 0) |
| return; |
| |
| assert !closed; |
| |
| assert ios.isEmpty(); |
| |
| if (affinityAwareness) |
| connectInBestEffortAffinityMode(); |
| else |
| connectInCommonMode(); |
| } |
| |
| /** |
| * @return Whether this connection is streamed or not. |
| */ |
| boolean isStream() { |
| return streamState != null; |
| } |
| |
| /** |
| * @param sql Statement. |
| * @param cmd Parsed form of {@code sql}. |
| * @param stmt Jdbc thin statement. |
| * @throws SQLException if failed. |
| */ |
| void executeNative(String sql, SqlCommand cmd, JdbcThinStatement stmt) throws SQLException { |
| if (cmd instanceof SqlSetStreamingCommand) { |
| SqlSetStreamingCommand cmd0 = (SqlSetStreamingCommand)cmd; |
| |
| // If streaming is already on, we have to close it first. |
| if (streamState != null) { |
| streamState.close(); |
| |
| streamState = null; |
| } |
| |
| boolean newVal = ((SqlSetStreamingCommand)cmd).isTurnOn(); |
| |
| ensureConnected(); |
| |
| JdbcThinTcpIo cliIo = cliIo(null); |
| |
| // Actual ON, if needed. |
| if (newVal) { |
| if (!cmd0.isOrdered() && !cliIo.isUnorderedStreamSupported()) { |
| throw new SQLException("Streaming without order doesn't supported by server [remoteNodeVer=" |
| + cliIo.igniteVersion() + ']', SqlStateCode.INTERNAL_ERROR); |
| } |
| |
| streamState = new StreamState((SqlSetStreamingCommand)cmd, cliIo); |
| |
| sendRequest(new JdbcQueryExecuteRequest(JdbcStatementType.ANY_STATEMENT_TYPE, |
| schema, 1, 1, autoCommit, sql, null), stmt, cliIo); |
| |
| streamState.start(); |
| } |
| } |
| else |
| throw IgniteQueryErrorCode.createJdbcSqlException("Unsupported native statement: " + sql, |
| IgniteQueryErrorCode.UNSUPPORTED_OPERATION); |
| } |
| |
| /** |
| * Add another query for batched execution. |
| * |
| * @param sql Query. |
| * @param args Arguments. |
| * @throws SQLException On error. |
| */ |
| void addBatch(String sql, List<Object> args) throws SQLException { |
| assert isStream(); |
| |
| streamState.addBatch(sql, args); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Statement createStatement() throws SQLException { |
| return createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Statement createStatement(int resSetType, int resSetConcurrency) throws SQLException { |
| return createStatement(resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Statement createStatement(int resSetType, int resSetConcurrency, |
| int resSetHoldability) throws SQLException { |
| ensureNotClosed(); |
| |
| checkCursorOptions(resSetType, resSetConcurrency); |
| |
| JdbcThinStatement stmt = new JdbcThinStatement(this, resSetHoldability, schema); |
| |
| synchronized (stmtsMux) { |
| stmts.add(stmt); |
| } |
| |
| return stmt; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PreparedStatement prepareStatement(String sql) throws SQLException { |
| return prepareStatement(sql, TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PreparedStatement prepareStatement(String sql, int resSetType, |
| int resSetConcurrency) throws SQLException { |
| return prepareStatement(sql, resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PreparedStatement prepareStatement(String sql, int resSetType, int resSetConcurrency, |
| int resSetHoldability) throws SQLException { |
| ensureNotClosed(); |
| |
| checkCursorOptions(resSetType, resSetConcurrency); |
| |
| if (sql == null) |
| throw new SQLException("SQL string cannot be null."); |
| |
| JdbcThinPreparedStatement stmt = new JdbcThinPreparedStatement(this, sql, resSetHoldability, schema); |
| |
| synchronized (stmtsMux) { |
| stmts.add(stmt); |
| } |
| |
| return stmt; |
| } |
| |
| /** |
| * @param resSetType Cursor option. |
| * @param resSetConcurrency Cursor option. |
| * @throws SQLException If options unsupported. |
| */ |
| private void checkCursorOptions(int resSetType, int resSetConcurrency) throws SQLException { |
| if (resSetType != TYPE_FORWARD_ONLY) |
| throw new SQLFeatureNotSupportedException("Invalid result set type (only forward is supported)."); |
| |
| if (resSetConcurrency != CONCUR_READ_ONLY) |
| throw new SQLFeatureNotSupportedException("Invalid concurrency (updates are not supported)."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CallableStatement prepareCall(String sql) throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Callable functions are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency) |
| throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Callable functions are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String nativeSQL(String sql) throws SQLException { |
| ensureNotClosed(); |
| |
| if (sql == null) |
| throw new SQLException("SQL string cannot be null."); |
| |
| return sql; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setAutoCommit(boolean autoCommit) throws SQLException { |
| ensureNotClosed(); |
| |
| // Do nothing if resulting value doesn't actually change. |
| if (autoCommit != this.autoCommit) { |
| doCommit(); |
| |
| this.autoCommit = autoCommit; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean getAutoCommit() throws SQLException { |
| ensureNotClosed(); |
| |
| return autoCommit; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void commit() throws SQLException { |
| ensureNotClosed(); |
| |
| if (autoCommit) |
| throw new SQLException("Transaction cannot be committed explicitly in auto-commit mode."); |
| |
| doCommit(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void rollback() throws SQLException { |
| ensureNotClosed(); |
| |
| if (autoCommit) |
| throw new SQLException("Transaction cannot be rolled back explicitly in auto-commit mode."); |
| |
| try (Statement s = createStatement()) { |
| s.execute("ROLLBACK"); |
| } |
| } |
| |
| /** |
| * Send to the server {@code COMMIT} command. |
| * |
| * @throws SQLException if failed. |
| */ |
| private void doCommit() throws SQLException { |
| try (Statement s = createStatement()) { |
| s.execute("COMMIT"); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() throws SQLException { |
| if (isClosed()) |
| return; |
| |
| closed = true; |
| |
| maintenanceExecutor.shutdown(); |
| |
| if (streamState != null) { |
| streamState.close(); |
| |
| streamState = null; |
| } |
| |
| synchronized (stmtsMux) { |
| stmts.clear(); |
| } |
| |
| SQLException err = null; |
| |
| if (affinityAwareness) { |
| for (JdbcThinTcpIo clioIo : ios.values()) |
| clioIo.close(); |
| |
| ios.clear(); |
| } |
| else { |
| if (singleIo != null) |
| singleIo.close(); |
| } |
| |
| if (err != null) |
| throw err; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isClosed() { |
| return closed; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public DatabaseMetaData getMetaData() throws SQLException { |
| ensureNotClosed(); |
| |
| if (metadata == null) |
| metadata = new JdbcThinDatabaseMetadata(this); |
| |
| return metadata; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setReadOnly(boolean readOnly) throws SQLException { |
| ensureNotClosed(); |
| |
| this.readOnly = readOnly; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isReadOnly() throws SQLException { |
| ensureNotClosed(); |
| |
| return readOnly; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setCatalog(String catalog) throws SQLException { |
| ensureNotClosed(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getCatalog() throws SQLException { |
| ensureNotClosed(); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setTransactionIsolation(int level) throws SQLException { |
| ensureNotClosed(); |
| |
| switch (level) { |
| case Connection.TRANSACTION_READ_UNCOMMITTED: |
| case Connection.TRANSACTION_READ_COMMITTED: |
| case Connection.TRANSACTION_REPEATABLE_READ: |
| case Connection.TRANSACTION_SERIALIZABLE: |
| case Connection.TRANSACTION_NONE: |
| break; |
| |
| default: |
| throw new SQLException("Invalid transaction isolation level.", SqlStateCode.INVALID_TRANSACTION_LEVEL); |
| } |
| |
| txIsolation = level; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getTransactionIsolation() throws SQLException { |
| ensureNotClosed(); |
| |
| return txIsolation; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public SQLWarning getWarnings() throws SQLException { |
| ensureNotClosed(); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clearWarnings() throws SQLException { |
| ensureNotClosed(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Map<String, Class<?>> getTypeMap() throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Types mapping is not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Types mapping is not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setHoldability(int holdability) throws SQLException { |
| ensureNotClosed(); |
| |
| if (holdability != HOLD_CURSORS_OVER_COMMIT && holdability != CLOSE_CURSORS_AT_COMMIT) |
| throw new SQLException("Invalid result set holdability value."); |
| |
| this.holdability = holdability; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getHoldability() throws SQLException { |
| ensureNotClosed(); |
| |
| return holdability; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Savepoint setSavepoint() throws SQLException { |
| ensureNotClosed(); |
| |
| if (autoCommit) |
| throw new SQLException("Savepoint cannot be set in auto-commit mode."); |
| |
| throw new SQLFeatureNotSupportedException("Savepoints are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Savepoint setSavepoint(String name) throws SQLException { |
| ensureNotClosed(); |
| |
| if (name == null) |
| throw new SQLException("Savepoint name cannot be null."); |
| |
| if (autoCommit) |
| throw new SQLException("Savepoint cannot be set in auto-commit mode."); |
| |
| throw new SQLFeatureNotSupportedException("Savepoints are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void rollback(Savepoint savepoint) throws SQLException { |
| ensureNotClosed(); |
| |
| if (savepoint == null) |
| throw new SQLException("Invalid savepoint."); |
| |
| if (autoCommit) |
| throw new SQLException("Auto-commit mode."); |
| |
| throw new SQLFeatureNotSupportedException("Savepoints are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { |
| ensureNotClosed(); |
| |
| if (savepoint == null) |
| throw new SQLException("Savepoint cannot be null."); |
| |
| throw new SQLFeatureNotSupportedException("Savepoints are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency, |
| int resSetHoldability) throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Callable functions are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Auto generated keys are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PreparedStatement prepareStatement(String sql, int[] colIndexes) throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Auto generated keys are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PreparedStatement prepareStatement(String sql, String[] colNames) throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("Auto generated keys are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Clob createClob() throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Blob createBlob() throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public NClob createNClob() throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public SQLXML createSQLXML() throws SQLException { |
| ensureNotClosed(); |
| |
| throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isValid(int timeout) throws SQLException { |
| if (timeout < 0) |
| throw new SQLException("Invalid timeout: " + timeout); |
| |
| return !closed; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setClientInfo(String name, String val) throws SQLClientInfoException { |
| if (closed) |
| throw new SQLClientInfoException("Connection is closed.", null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setClientInfo(Properties props) throws SQLClientInfoException { |
| if (closed) |
| throw new SQLClientInfoException("Connection is closed.", null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getClientInfo(String name) throws SQLException { |
| ensureNotClosed(); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Properties getClientInfo() throws SQLException { |
| ensureNotClosed(); |
| |
| return new Properties(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { |
| ensureNotClosed(); |
| |
| if (typeName == null) |
| throw new SQLException("Type name cannot be null."); |
| |
| throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Struct createStruct(String typeName, Object[] attrs) throws SQLException { |
| ensureNotClosed(); |
| |
| if (typeName == null) |
| throw new SQLException("Type name cannot be null."); |
| |
| throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> T unwrap(Class<T> iface) throws SQLException { |
| if (!isWrapperFor(iface)) |
| throw new SQLException("Connection is not a wrapper for " + iface.getName()); |
| |
| return (T)this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { |
| return iface != null && iface.isAssignableFrom(JdbcThinConnection.class); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setSchema(String schema) throws SQLException { |
| ensureNotClosed(); |
| |
| this.schema = JdbcUtils.normalizeSchema(schema); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getSchema() throws SQLException { |
| ensureNotClosed(); |
| |
| return schema; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void abort(Executor executor) throws SQLException { |
| if (executor == null) |
| throw new SQLException("Executor cannot be null."); |
| |
| close(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setNetworkTimeout(Executor executor, int ms) throws SQLException { |
| ensureNotClosed(); |
| |
| if (ms < 0) |
| throw new SQLException("Network timeout cannot be negative."); |
| |
| SecurityManager secMgr = System.getSecurityManager(); |
| |
| if (secMgr != null) |
| secMgr.checkPermission(new SQLPermission(SET_NETWORK_TIMEOUT_PERM)); |
| |
| netTimeout = ms; |
| |
| if (affinityAwareness) { |
| for (JdbcThinTcpIo clioIo : ios.values()) |
| clioIo.timeout(ms); |
| } |
| else |
| singleIo.timeout(ms); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getNetworkTimeout() throws SQLException { |
| ensureNotClosed(); |
| |
| return netTimeout; |
| } |
| |
| /** |
| * Ensures that connection is not closed. |
| * |
| * @throws SQLException If connection is closed. |
| */ |
| public void ensureNotClosed() throws SQLException { |
| if (closed) |
| throw new SQLException("Connection is closed.", SqlStateCode.CONNECTION_CLOSED); |
| } |
| |
| /** |
| * @return Ignite server version. |
| */ |
| IgniteProductVersion igniteVersion() { |
| // TODO: IGNITE-11321: JDBC Thin: implement nodes multi version support. |
| return cliIo(null).igniteVersion(); |
| } |
| |
| /** |
| * @return Auto close server cursors flag. |
| */ |
| boolean autoCloseServerCursor() { |
| return connProps.isAutoCloseServerCursor(); |
| } |
| |
| /** |
| * Send request for execution via corresponding singleIo from {@link #ios}. |
| * |
| * @param req Request. |
| * @return Server response. |
| * @throws SQLException On any error. |
| */ |
| JdbcResultWithIo sendRequest(JdbcRequest req) throws SQLException { |
| return sendRequest(req, null, null); |
| } |
| |
| /** |
| * Send request for execution via corresponding singleIo from {@link #ios} or sticky singleIo. |
| * |
| * @param req Request. |
| * @param stmt Jdbc thin statement. |
| * @param stickyIo Sticky ignite endpoint. |
| * @return Server response. |
| * @throws SQLException On any error. |
| */ |
| JdbcResultWithIo sendRequest(JdbcRequest req, JdbcThinStatement stmt, @Nullable JdbcThinTcpIo stickyIo) |
| throws SQLException { |
| ensureConnected(); |
| |
| RequestTimeoutTask reqTimeoutTask = null; |
| |
| synchronized (mux) { |
| if (ownThread != null) { |
| throw new SQLException("Concurrent access to JDBC connection is not allowed" |
| + " [ownThread=" + ownThread.getName() |
| + ", curThread=" + Thread.currentThread().getName(), SqlStateCode.CONNECTION_FAILURE); |
| } |
| |
| ownThread = Thread.currentThread(); |
| } |
| try { |
| JdbcThinTcpIo cliIo = null; |
| try { |
| cliIo = (stickyIo == null || !stickyIo.connected()) ? cliIo(calculateNodeIds(req)) : stickyIo; |
| |
| if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) { |
| reqTimeoutTask = new RequestTimeoutTask( |
| req instanceof JdbcBulkLoadBatchRequest ? stmt.currentRequestId() : req.requestId(), |
| cliIo, |
| stmt.requestTimeout()); |
| |
| qryTimeoutScheduledFut = maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0, |
| REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS); |
| } |
| |
| JdbcQueryExecuteRequest qryReq = null; |
| |
| if (req instanceof JdbcQueryExecuteRequest) |
| qryReq = (JdbcQueryExecuteRequest)req; |
| |
| JdbcResponse res = cliIo.sendRequest(req, stmt); |
| |
| txIo = res.activeTransaction() ? cliIo : null; |
| |
| if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null && |
| stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null && |
| reqTimeoutTask.expired.get()) { |
| |
| throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED, |
| IgniteQueryErrorCode.QUERY_CANCELED); |
| } |
| else if (res.status() != ClientListenerResponse.STATUS_SUCCESS) |
| throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), |
| res.status()); |
| |
| updateAffinityCache(qryReq, res); |
| |
| return new JdbcResultWithIo(res.response(), cliIo); |
| } |
| catch (SQLException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| onDisconnect(cliIo); |
| |
| if (e instanceof SocketTimeoutException) |
| throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e); |
| else |
| throw new SQLException("Failed to communicate with Ignite cluster.", |
| SqlStateCode.CONNECTION_FAILURE, e); |
| } |
| finally { |
| if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null) |
| qryTimeoutScheduledFut.cancel(false); |
| } |
| } |
| finally { |
| synchronized (mux) { |
| ownThread = null; |
| } |
| } |
| } |
| |
| /** |
| * Calculate node UUIDs. |
| * |
| * @param req Jdbc request for which we'll try to calculate node id. |
| * @return node UUID or null if failed to calculate. |
| * @throws IOException If Exception occurred during the network partition distribution retrieval. |
| * @throws SQLException If Failed to calculate derived partitions. |
| */ |
| @Nullable private List<UUID> calculateNodeIds(JdbcRequest req) throws IOException, SQLException { |
| if (!affinityAwareness || !(req instanceof JdbcQueryExecuteRequest)) |
| return null; |
| |
| JdbcQueryExecuteRequest qry = (JdbcQueryExecuteRequest)req; |
| |
| if (affinityCache == null) { |
| qry.partitionResponseRequest(true); |
| |
| return null; |
| } |
| |
| JdbcThinPartitionResultDescriptor partResDesc = affinityCache.partitionResult( |
| new QualifiedSQLQuery(qry.schemaName(), qry.sqlQuery())); |
| |
| // Value is empty. |
| if (partResDesc == JdbcThinPartitionResultDescriptor.EMPTY_DESCRIPTOR) |
| return null; |
| |
| // Key is missing. |
| if (partResDesc == null) { |
| qry.partitionResponseRequest(true); |
| |
| return null; |
| } |
| |
| Collection<Integer> parts = calculatePartitions(partResDesc, qry.arguments()); |
| |
| if (parts == null || parts.isEmpty()) |
| return null; |
| |
| UUID[] cacheDistr = retrieveCacheDistribution(partResDesc.cacheId(), |
| partResDesc.partitionResult().partitionsCount()); |
| |
| if (parts.size() == 1) |
| return Collections.singletonList(cacheDistr[parts.iterator().next()]); |
| else { |
| List<UUID> affinityAwarenessNodeIds = new ArrayList<>(); |
| |
| for (int part : parts) |
| affinityAwarenessNodeIds.add(cacheDistr[part]); |
| |
| return affinityAwarenessNodeIds; |
| } |
| } |
| |
| /** |
| * Retrieve cache distribution for specified cache Id. |
| * |
| * @param cacheId Cache Id. |
| * @param partCnt Partitions count. |
| * @return Partitions cache distribution. |
| * @throws IOException If Exception occurred during the network partition distribution retrieval. |
| */ |
| private UUID[] retrieveCacheDistribution(int cacheId, int partCnt) throws IOException { |
| UUID[] cacheDistr = affinityCache.cacheDistribution(cacheId); |
| |
| if (cacheDistr != null) |
| return cacheDistr; |
| |
| JdbcResponse res; |
| |
| res = cliIo(null).sendRequest(new JdbcCachePartitionsRequest(Collections.singleton(cacheId)), |
| null); |
| |
| assert res.status() == ClientListenerResponse.STATUS_SUCCESS; |
| |
| AffinityTopologyVersion resAffinityVer = res.affinityVersion(); |
| |
| if (affinityCache.version().compareTo(resAffinityVer) < 0) |
| affinityCache = new AffinityCache(resAffinityVer); |
| else if (affinityCache.version().compareTo(resAffinityVer) > 0) { |
| // Jdbc thin affinity cache is binded to the newer affinity topology version, so we should ignore retrieved |
| // partition distribution. Given situation might occur in case of concurrent race and is not |
| // possible in single-threaded jdbc thin client, so it's a reserve for the future. |
| return null; |
| } |
| |
| List<JdbcThinAffinityAwarenessMappingGroup> mappings = |
| ((JdbcCachePartitionsResult)res.response()).getMappings(); |
| |
| // Despite the fact that, at this moment, we request partition distribution only for one cache, |
| // we might retrieve multiple caches but exactly with same distribution. |
| assert mappings.size() == 1; |
| |
| JdbcThinAffinityAwarenessMappingGroup mappingGrp = mappings.get(0); |
| |
| cacheDistr = mappingGrp.revertMappings(partCnt); |
| |
| for (int mpCacheId : mappingGrp.cacheIds()) |
| affinityCache.addCacheDistribution(mpCacheId, cacheDistr); |
| |
| return cacheDistr; |
| } |
| |
| /** |
| * Calculate partitions for the query. |
| * |
| * @param partResDesc Partition result descriptor. |
| * @param args Arguments. |
| * @return Calculated partitions or {@code null} if failed to calculate and there should be a broadcast. |
| * @throws SQLException If Failed to calculate derived partitions. |
| */ |
| public static Collection<Integer> calculatePartitions(JdbcThinPartitionResultDescriptor partResDesc, Object[] args) |
| throws SQLException { |
| PartitionResult derivedParts = partResDesc.partitionResult(); |
| |
| if (derivedParts != null) { |
| try { |
| return derivedParts.tree().apply(partResDesc.partitionClientContext(), args); |
| } |
| catch (IgniteCheckedException e) { |
| throw new SQLException("Failed to calculate derived partitions for query.", |
| SqlStateCode.INTERNAL_ERROR); |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Send request for execution via corresponding singleIo from {@link #ios} or sticky singleIo. |
| * Response is waited at the separate thread (see {@link StreamState#asyncRespReaderThread}). |
| * |
| * @param req Request. |
| * @throws SQLException On any error. |
| */ |
| void sendQueryCancelRequest(JdbcQueryCancelRequest req, JdbcThinTcpIo cliIo) throws SQLException { |
| if (connCnt.get() == 0) |
| throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE); |
| |
| assert cliIo != null; |
| |
| try { |
| cliIo.sendCancelRequest(req); |
| } |
| catch (Exception e) { |
| throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e); |
| } |
| } |
| |
| /** |
| * Send request for execution via corresponding singleIo from {@link #ios} or sticky singleIo. |
| * Response is waited at the separate thread (see {@link StreamState#asyncRespReaderThread}). |
| * |
| * @param req Request. |
| * @param stickyIO Sticky ignite endpoint. |
| * @throws SQLException On any error. |
| */ |
| private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req, JdbcThinTcpIo stickyIO) |
| throws SQLException { |
| ensureConnected(); |
| |
| synchronized (mux) { |
| if (ownThread != null) { |
| throw new SQLException("Concurrent access to JDBC connection is not allowed" |
| + " [ownThread=" + ownThread.getName() |
| + ", curThread=" + Thread.currentThread().getName(), SqlStateCode.CONNECTION_FAILURE); |
| } |
| |
| ownThread = Thread.currentThread(); |
| } |
| |
| try { |
| stickyIO.sendBatchRequestNoWaitResponse(req); |
| } |
| catch (SQLException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| onDisconnect(stickyIO); |
| |
| if (e instanceof SocketTimeoutException) |
| throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e); |
| else |
| throw new SQLException("Failed to communicate with Ignite cluster.", |
| SqlStateCode.CONNECTION_FAILURE, e); |
| } |
| finally { |
| synchronized (mux) { |
| ownThread = null; |
| } |
| } |
| } |
| |
| /** |
| * @return Connection URL. |
| */ |
| public String url() { |
| return connProps.getUrl(); |
| } |
| |
| /** |
| * Called on IO disconnect: close the client IO and opened statements. |
| */ |
| private void onDisconnect(JdbcThinTcpIo cliIo) { |
| assert connCnt.get() > 0; |
| |
| if (affinityAwareness) { |
| cliIo.close(); |
| |
| ios.remove(cliIo.nodeId()); |
| } |
| else { |
| if (singleIo != null) |
| singleIo.close(); |
| } |
| |
| connCnt.decrementAndGet(); |
| |
| if (streamState != null) { |
| streamState.close0(); |
| |
| streamState = null; |
| } |
| |
| synchronized (stmtsMux) { |
| for (JdbcThinStatement s : stmts) |
| s.closeOnDisconnect(); |
| |
| stmts.clear(); |
| } |
| } |
| |
| /** |
| * @param stmt Statement to close. |
| */ |
| void closeStatement(JdbcThinStatement stmt) { |
| synchronized (stmtsMux) { |
| stmts.remove(stmt); |
| } |
| } |
| |
| /** |
| * Streamer state and |
| */ |
| private class StreamState { |
| /** Maximum requests count that may be sent before any responses. */ |
| private static final int MAX_REQUESTS_BEFORE_RESPONSE = 10; |
| |
| /** Batch size for streaming. */ |
| private int streamBatchSize; |
| |
| /** Batch for streaming. */ |
| private List<JdbcQuery> streamBatch; |
| |
| /** Last added query to recognize batches. */ |
| private String lastStreamQry; |
| |
| /** Keep request order on execution. */ |
| private long order; |
| |
| /** Async response reader thread. */ |
| private Thread asyncRespReaderThread; |
| |
| /** Async response error. */ |
| private volatile Exception err; |
| |
| /** The order of the last batch request at the stream. */ |
| private long lastRespOrder = -1; |
| |
| /** Last response future. */ |
| private final GridFutureAdapter<Void> lastRespFut = new GridFutureAdapter<>(); |
| |
| /** Response semaphore sem. */ |
| private Semaphore respSem = new Semaphore(MAX_REQUESTS_BEFORE_RESPONSE); |
| |
| /** Streaming sticky ignite endpoint. */ |
| private final JdbcThinTcpIo streamingStickyIo; |
| |
| /** |
| * @param cmd Stream cmd. |
| * @param stickyIo Sticky ignite endpoint. |
| */ |
| StreamState(SqlSetStreamingCommand cmd, JdbcThinTcpIo stickyIo) { |
| streamBatchSize = cmd.batchSize(); |
| |
| asyncRespReaderThread = new Thread(this::readResponses); |
| |
| streamingStickyIo = stickyIo; |
| } |
| |
| /** |
| * Start reader. |
| */ |
| void start() { |
| asyncRespReaderThread.start(); |
| } |
| |
| /** |
| * Add another query for batched execution. |
| * |
| * @param sql Query. |
| * @param args Arguments. |
| * @throws SQLException On error. |
| */ |
| void addBatch(String sql, List<Object> args) throws SQLException { |
| checkError(); |
| |
| boolean newQry = (args == null || !F.eq(lastStreamQry, sql)); |
| |
| // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently. |
| JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null); |
| |
| if (streamBatch == null) |
| streamBatch = new ArrayList<>(streamBatchSize); |
| |
| streamBatch.add(q); |
| |
| // Null args means "addBatch(String)" was called on non-prepared Statement, |
| // we don't want to remember its query string. |
| lastStreamQry = (args != null ? sql : null); |
| |
| if (streamBatch.size() == streamBatchSize) |
| executeBatch(false); |
| } |
| |
| /** |
| * @param lastBatch Whether open data streamers must be flushed and closed after this batch. |
| * @throws SQLException if failed. |
| */ |
| private void executeBatch(boolean lastBatch) throws SQLException { |
| checkError(); |
| |
| if (lastBatch) |
| lastRespOrder = order; |
| |
| try { |
| respSem.acquire(); |
| |
| sendRequestNotWaitResponse( |
| new JdbcOrderedBatchExecuteRequest(schema, streamBatch, autoCommit, lastBatch, order), |
| streamingStickyIo); |
| |
| streamBatch = null; |
| |
| lastStreamQry = null; |
| |
| if (lastBatch) { |
| try { |
| lastRespFut.get(); |
| } |
| catch (IgniteCheckedException ignored) { |
| // No-op. |
| // No exceptions are expected here. |
| } |
| |
| checkError(); |
| } |
| else |
| order++; |
| } |
| catch (InterruptedException e) { |
| throw new SQLException("Streaming operation was interrupted", SqlStateCode.INTERNAL_ERROR, e); |
| } |
| } |
| |
| /** |
| * Throws at the user thread exception that was thrown at the {@link #asyncRespReaderThread} thread. |
| * |
| * @throws SQLException Saved exception. |
| */ |
| void checkError() throws SQLException { |
| if (err != null) { |
| Exception err0 = err; |
| |
| err = null; |
| |
| if (err0 instanceof SQLException) |
| throw (SQLException)err0; |
| else { |
| onDisconnect(streamingStickyIo); |
| |
| if (err0 instanceof SocketTimeoutException) |
| throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, err0); |
| throw new SQLException("Failed to communicate with Ignite cluster on JDBC streaming.", |
| SqlStateCode.CONNECTION_FAILURE, err0); |
| } |
| } |
| } |
| |
| /** |
| * @throws SQLException On error. |
| */ |
| void close() throws SQLException { |
| close0(); |
| |
| checkError(); |
| } |
| |
| /** |
| */ |
| void close0() { |
| if (connCnt.get() > 0) { |
| try { |
| executeBatch(true); |
| } |
| catch (SQLException e) { |
| err = e; |
| |
| LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e); |
| } |
| } |
| |
| if (asyncRespReaderThread != null) |
| asyncRespReaderThread.interrupt(); |
| } |
| |
| /** |
| * |
| */ |
| void readResponses() { |
| try { |
| while (true) { |
| JdbcResponse resp = streamingStickyIo.readResponse(); |
| |
| if (resp.response() instanceof JdbcOrderedBatchExecuteResult) { |
| JdbcOrderedBatchExecuteResult res = (JdbcOrderedBatchExecuteResult)resp.response(); |
| |
| respSem.release(); |
| |
| if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) { |
| err = new BatchUpdateException(res.errorMessage(), |
| IgniteQueryErrorCode.codeToSqlState(res.errorCode()), |
| res.errorCode(), res.updateCounts()); |
| } |
| |
| // Receive the response for the last request. |
| if (res.order() == lastRespOrder) { |
| lastRespFut.onDone(); |
| |
| break; |
| } |
| } |
| else if (resp.status() != ClientListenerResponse.STATUS_SUCCESS) |
| err = new SQLException(resp.error(), IgniteQueryErrorCode.codeToSqlState(resp.status())); |
| else |
| assert false : "Invalid response: " + resp; |
| } |
| } |
| catch (Exception e) { |
| err = e; |
| } |
| } |
| } |
| |
| /** |
| * @return True if query cancellation supported, false otherwise. |
| */ |
| boolean isQueryCancellationSupported() { |
| // TODO: IGNITE-11321: JDBC Thin: implement nodes multi version support. |
| return cliIo(null).isQueryCancellationSupported(); |
| } |
| |
| /** |
| * @param nodeIds Set of node's UUIDs. |
| * @return Ignite endpoint to use for request/response transferring. |
| */ |
| private JdbcThinTcpIo cliIo(List<UUID> nodeIds) { |
| if (!affinityAwareness) |
| return singleIo; |
| |
| if (txIo != null) |
| return txIo; |
| |
| if (nodeIds == null || nodeIds.isEmpty()) |
| return randomIo(); |
| |
| JdbcThinTcpIo io = null; |
| |
| if (nodeIds.size() == 1) |
| io = ios.get(nodeIds.get(0)); |
| else { |
| int initNodeId = RND.nextInt(nodeIds.size()); |
| |
| int iterCnt = 0; |
| |
| while (io == null) { |
| io = ios.get(nodeIds.get(initNodeId)); |
| |
| initNodeId = initNodeId == nodeIds.size() ? 0 : initNodeId + 1; |
| |
| iterCnt++; |
| |
| if (iterCnt == nodeIds.size()) |
| break; |
| } |
| } |
| |
| return io != null ? io : randomIo(); |
| } |
| |
| /** |
| * Returns random tcpIo, based on random UUID, generated in a custom way with the help of {@code Random} |
| * instead of {@code SecureRandom}. It's valid, cause cryptographically strong pseudo |
| * random number generator is not required in this particular case. {@code Random} is much faster |
| * than {@code SecureRandom}. |
| * |
| * @return random tcpIo |
| */ |
| private JdbcThinTcpIo randomIo() { |
| byte[] randomBytes = new byte[16]; |
| |
| RND.nextBytes(randomBytes); |
| |
| randomBytes[6] &= 0x0f; /* clear version */ |
| randomBytes[6] |= 0x40; /* set to version 4 */ |
| randomBytes[8] &= 0x3f; /* clear variant */ |
| randomBytes[8] |= 0x80; /* set to IETF variant */ |
| |
| long msb = 0; |
| |
| long lsb = 0; |
| |
| for (int i=0; i<8; i++) |
| msb = (msb << 8) | (randomBytes[i] & 0xff); |
| |
| for (int i=8; i<16; i++) |
| lsb = (lsb << 8) | (randomBytes[i] & 0xff); |
| |
| UUID randomUUID = new UUID(msb, lsb); |
| |
| Map.Entry<UUID, JdbcThinTcpIo> entry = ios.ceilingEntry(randomUUID); |
| |
| return entry != null ? entry.getValue() : ios.floorEntry(randomUUID).getValue(); |
| } |
| |
| /** |
| * @return Current server index. |
| */ |
| public int serverIndex() { |
| return srvIdx; |
| } |
| |
| /** |
| * Get next server index. |
| * |
| * @param len Number of servers. |
| * @return Index of the next server to connect to. |
| */ |
| private static int nextServerIndex(int len) { |
| if (len == 1) |
| return 0; |
| else { |
| long nextIdx = IDX_GEN.getAndIncrement(); |
| |
| return (int)(nextIdx % len); |
| } |
| } |
| |
| /** |
| * Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one. |
| * Stops as soon as any connection is established. |
| * |
| * @throws SQLException If failed to connect to ignite cluster. |
| */ |
| private void connectInCommonMode() throws SQLException { |
| HostAndPortRange[] srvs = connProps.getAddresses(); |
| |
| List<Exception> exceptions = null; |
| |
| for (int i = 0; i < srvs.length; i++) { |
| srvIdx = nextServerIndex(srvs.length); |
| |
| HostAndPortRange srv = srvs[srvIdx]; |
| |
| try { |
| InetAddress[] addrs = InetAddress.getAllByName(srv.host()); |
| |
| for (InetAddress addr : addrs) { |
| for (int port = srv.portFrom(); port <= srv.portTo(); ++port) { |
| try { |
| JdbcThinTcpIo cliIo = new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), |
| 0); |
| |
| cliIo.timeout(netTimeout); |
| |
| singleIo = cliIo; |
| |
| connCnt.incrementAndGet(); |
| |
| return; |
| } |
| catch (Exception exception) { |
| if (exceptions == null) |
| exceptions = new ArrayList<>(); |
| |
| exceptions.add(exception); |
| } |
| |
| } |
| } |
| } |
| catch (Exception exception) { |
| if (exceptions == null) |
| exceptions = new ArrayList<>(); |
| |
| exceptions.add(exception); |
| } |
| } |
| |
| handleConnectExceptions(exceptions); |
| } |
| |
| /** |
| * Prepare and throw general {@code SQLException} with all specified exceptions as suppressed items. |
| * |
| * @param exceptions Exceptions list. |
| * @throws SQLException Umbrella exception. |
| */ |
| private void handleConnectExceptions(List<Exception> exceptions) throws SQLException { |
| if (connCnt.get() == 0 && exceptions != null) { |
| close(); |
| |
| if (exceptions.size() == 1) { |
| Exception ex = exceptions.get(0); |
| |
| if (ex instanceof SQLException) |
| throw (SQLException)ex; |
| else if (ex instanceof IOException) |
| throw new SQLException("Failed to connect to Ignite cluster [url=" + connProps.getUrl() + ']', |
| SqlStateCode.CLIENT_CONNECTION_FAILED, ex); |
| } |
| |
| SQLException e = new SQLException("Failed to connect to server [url=" + connProps.getUrl() + ']', |
| SqlStateCode.CLIENT_CONNECTION_FAILED); |
| |
| for (Exception ex : exceptions) |
| e.addSuppressed(ex); |
| |
| throw e; |
| } |
| } |
| |
| /** |
| * Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one. |
| * Stops as soon as all iosArr are established. |
| * |
| * @throws SQLException If failed to connect to at least one ignite endpoint, |
| * or if endpoints versions are not the same. |
| */ |
| private void connectInBestEffortAffinityMode() throws SQLException { |
| List<Exception> exceptions = null; |
| |
| IgniteProductVersion prevIgniteEndpointVer = null; |
| |
| for (int i = 0; i < connProps.getAddresses().length; i++) { |
| HostAndPortRange srv = connProps.getAddresses()[i]; |
| |
| try { |
| InetAddress[] addrs = InetAddress.getAllByName(srv.host()); |
| |
| for (InetAddress addr : addrs) { |
| for (int port = srv.portFrom(); port <= srv.portTo(); ++port) { |
| try { |
| JdbcThinTcpIo cliIo = |
| new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0); |
| |
| if (!cliIo.isAffinityAwarenessSupported()) { |
| cliIo.close(); |
| |
| throw new SQLException("Failed to connect to Ignite node [url=" + |
| connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + |
| "Node doesn't support affinity awareness mode.", |
| SqlStateCode.INTERNAL_ERROR); |
| } |
| |
| if (prevIgniteEndpointVer != null && !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) { |
| // TODO: 13.02.19 IGNITE-11321 JDBC Thin: implement nodes multi version support. |
| cliIo.close(); |
| |
| throw new SQLException("Failed to connect to Ignite node [url=" + |
| connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + |
| "Different versions of nodes are not supported in affinity awareness mode.", |
| SqlStateCode.INTERNAL_ERROR); |
| } |
| |
| cliIo.timeout(netTimeout); |
| |
| JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo); |
| |
| // This can happen if the same node has several IPs or if connection manager background |
| // timer task runs concurrently. |
| if (ioToSameNode != null) |
| cliIo.close(); |
| else |
| connCnt.incrementAndGet(); |
| |
| prevIgniteEndpointVer = cliIo.igniteVersion(); |
| |
| return; |
| } |
| catch (Exception exception) { |
| if (exceptions == null) |
| exceptions = new ArrayList<>(); |
| |
| exceptions.add(exception); |
| } |
| } |
| } |
| } |
| catch (Exception exception) { |
| if (exceptions == null) |
| exceptions = new ArrayList<>(); |
| |
| exceptions.add(exception); |
| } |
| } |
| |
| handleConnectExceptions(exceptions); |
| } |
| |
| /** |
| * Recreates affinity cache if affinity topology version was changed and adds partition result to sql cache. |
| * |
| * @param qryReq Query request. |
| * @param res Jdbc Response. |
| */ |
| private void updateAffinityCache(JdbcQueryExecuteRequest qryReq, JdbcResponse res) { |
| if (affinityAwareness) { |
| AffinityTopologyVersion resAffVer = res.affinityVersion(); |
| |
| if (resAffVer != null && (affinityCache == null || affinityCache.version().compareTo(resAffVer) < 0)) |
| affinityCache = new AffinityCache(resAffVer); |
| |
| // Partition result was requested. |
| if (res.response() instanceof JdbcQueryExecuteResult && qryReq.partitionResponseRequest()) { |
| PartitionResult partRes = ((JdbcQueryExecuteResult)res.response()).partitionResult(); |
| |
| if (partRes == null || affinityCache.version().equals(partRes.topologyVersion())) { |
| int cacheId = (partRes != null && partRes.tree() != null) ? |
| GridCacheUtils.cacheId(partRes.cacheName()) : |
| -1; |
| |
| PartitionClientContext partClientCtx = partRes != null ? |
| new PartitionClientContext(partRes.partitionsCount()) : |
| null; |
| |
| QualifiedSQLQuery qry = new QualifiedSQLQuery(qryReq.schemaName(), qryReq.sqlQuery()); |
| |
| JdbcThinPartitionResultDescriptor partResDescr = |
| new JdbcThinPartitionResultDescriptor(partRes, cacheId, partClientCtx); |
| |
| affinityCache.addSqlQuery(qry, partResDescr); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Request Timeout Task |
| */ |
| private class RequestTimeoutTask implements Runnable { |
| /** Request id. */ |
| private final long reqId; |
| |
| /** Sticky singleIo. */ |
| private final JdbcThinTcpIo stickyIO; |
| |
| /** Remaining query timeout. */ |
| private int remainingQryTimeout; |
| |
| /** Flag that shows whether TimerTask was expired or not. */ |
| private AtomicBoolean expired; |
| |
| /** |
| * @param reqId Request Id to cancel in case of timeout |
| * @param initReqTimeout Initial request timeout |
| */ |
| RequestTimeoutTask(long reqId, JdbcThinTcpIo stickyIO, int initReqTimeout) { |
| this.reqId = reqId; |
| |
| this.stickyIO = stickyIO; |
| |
| remainingQryTimeout = initReqTimeout; |
| |
| expired = new AtomicBoolean(false); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| try { |
| if (remainingQryTimeout <= 0) { |
| expired.set(true); |
| |
| sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId), stickyIO); |
| |
| qryTimeoutScheduledFut.cancel(false); |
| |
| return; |
| } |
| |
| remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD; |
| } |
| catch (SQLException e) { |
| LOG.log(Level.WARNING, |
| "Request timeout processing failure: unable to cancel request [reqId=" + reqId + ']', e); |
| |
| qryTimeoutScheduledFut.cancel(false); |
| } |
| } |
| } |
| |
| /** |
| * Connection Handler Task |
| */ |
| private class ConnectionHandlerTask implements Runnable { |
| /** Map with reconnection delays. */ |
| private Map<InetSocketAddress, Integer> reconnectionDelays = new HashMap<>(); |
| |
| /** Map with reconnection delays remainder. */ |
| private Map<InetSocketAddress, Integer> reconnectionDelaysRemainder = new HashMap<>(); |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| try { |
| for (Map.Entry<InetSocketAddress, Integer> delayEntry : reconnectionDelaysRemainder.entrySet()) |
| reconnectionDelaysRemainder.put(delayEntry.getKey(), delayEntry.getValue() - RECONNECTION_DELAY); |
| |
| Set<InetSocketAddress> aliveSockAddrs = |
| ios.values().stream().map(JdbcThinTcpIo::socketAddress).collect(Collectors.toSet()); |
| |
| IgniteProductVersion prevIgniteEndpointVer = null; |
| |
| for (int i = 0; i < connProps.getAddresses().length; i++) { |
| HostAndPortRange srv = connProps.getAddresses()[i]; |
| |
| try { |
| InetAddress[] addrs = InetAddress.getAllByName(srv.host()); |
| |
| for (InetAddress addr : addrs) { |
| for (int port = srv.portFrom(); port <= srv.portTo(); ++port) { |
| InetSocketAddress sockAddr = null; |
| |
| try { |
| sockAddr = new InetSocketAddress(addr, port); |
| |
| if (aliveSockAddrs.contains(sockAddr)) { |
| reconnectionDelaysRemainder.remove(sockAddr); |
| reconnectionDelays.remove(sockAddr); |
| |
| continue; |
| } |
| |
| Integer delayRemainder = reconnectionDelaysRemainder.get(sockAddr); |
| |
| if (delayRemainder != null && delayRemainder != 0) |
| continue; |
| |
| if (closed) { |
| maintenanceExecutor.shutdown(); |
| |
| return; |
| } |
| |
| JdbcThinTcpIo cliIo = |
| new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0); |
| |
| if (!cliIo.isAffinityAwarenessSupported()) { |
| processDelay(sockAddr); |
| |
| LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + |
| connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + |
| "Node doesn't support best effort affinity mode."); |
| |
| cliIo.close(); |
| |
| continue; |
| } |
| |
| if (prevIgniteEndpointVer != null && |
| !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) { |
| processDelay(sockAddr); |
| |
| LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + |
| connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + |
| "Different versions of nodes are not supported in best " + |
| "effort affinity mode."); |
| |
| cliIo.close(); |
| |
| continue; |
| } |
| |
| cliIo.timeout(netTimeout); |
| |
| JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo); |
| |
| // This can happen if the same node has several IPs or if ensureConnected() runs |
| // concurrently |
| if (ioToSameNode != null) |
| cliIo.close(); |
| else |
| connCnt.incrementAndGet(); |
| |
| prevIgniteEndpointVer = cliIo.igniteVersion(); |
| |
| if (closed) { |
| maintenanceExecutor.shutdown(); |
| |
| cliIo.close(); |
| |
| ios.remove(cliIo.nodeId()); |
| |
| return; |
| } |
| } |
| catch (Exception exception) { |
| if (sockAddr != null) |
| processDelay(sockAddr); |
| |
| LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + |
| connProps.getUrl() + "]. address = [" + addr + ':' + port + "]."); |
| } |
| } |
| } |
| } |
| catch (Exception exception) { |
| LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + |
| connProps.getUrl() + "]. server = [" + srv + "]."); |
| } |
| } |
| } |
| catch (Exception e) { |
| LOG.log(Level.WARNING, "Connection handler processing failure. Reconnection processes was stopped." |
| , e); |
| |
| connectionsHndScheduledFut.cancel(false); |
| } |
| } |
| |
| /** |
| * Increase reconnection delay if needed and store it to corresponding maps. |
| * |
| * @param sockAddr Socket address. |
| */ |
| private void processDelay(InetSocketAddress sockAddr) { |
| Integer delay = reconnectionDelays.get(sockAddr); |
| |
| delay = delay == null ? RECONNECTION_DELAY : delay * 2; |
| |
| if (delay > RECONNECTION_MAX_DELAY) |
| delay = RECONNECTION_MAX_DELAY; |
| |
| reconnectionDelays.put(sockAddr, delay); |
| |
| reconnectionDelaysRemainder.put(sockAddr, delay); |
| } |
| } |
| } |