| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.odbc.jdbc; |
| |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.binary.BinaryReaderExImpl; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerAbstractConnectionContext; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender; |
| import org.apache.ignite.internal.processors.query.NestedTxMode; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.nio.GridNioSession; |
| import org.apache.ignite.internal.util.typedef.F; |
| |
| import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.nullableBooleanFromByte; |
| import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.JDBC_CLIENT; |
| |
| /** |
| * JDBC Connection Context. |
| */ |
| public class JdbcConnectionContext extends ClientListenerAbstractConnectionContext { |
| /** Version 2.1.0. */ |
| private static final ClientListenerProtocolVersion VER_2_1_0 = ClientListenerProtocolVersion.create(2, 1, 0); |
| |
| /** Version 2.1.5: added "lazy" flag. */ |
| private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5); |
| |
| /** Version 2.3.1: added "multiple statements query" feature. */ |
| static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0); |
| |
| /** Version 2.4.0: adds default values for columns feature. */ |
| static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0); |
| |
| /** Version 2.5.0: adds precision and scale for columns feature. */ |
| static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0); |
| |
| /** Version 2.7.0: adds maximum length for columns feature.*/ |
| static final ClientListenerProtocolVersion VER_2_7_0 = ClientListenerProtocolVersion.create(2, 7, 0); |
| |
| /** Version 2.8.0: adds query id in order to implement cancel feature, partition awareness support: IEP-23.*/ |
| static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0); |
| |
| /** Version 2.9.0: adds user attributes, adds features flags support. */ |
| static final ClientListenerProtocolVersion VER_2_9_0 = ClientListenerProtocolVersion.create(2, 9, 0); |
| |
| /** Current version. */ |
| public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_9_0; |
| |
| /** Supported versions. */ |
| private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>(); |
| |
| /** Shutdown busy lock. */ |
| private final GridSpinBusyLock busyLock; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Maximum allowed cursors. */ |
| private final int maxCursors; |
| |
| /** Message parser. */ |
| private JdbcMessageParser parser; |
| |
| /** Request handler. */ |
| private JdbcRequestHandler handler; |
| |
| /** Current protocol context. */ |
| private JdbcProtocolContext protoCtx; |
| |
| /** Last reported affinity topology version. */ |
| private AtomicReference<AffinityTopologyVersion> lastAffinityTopVer = new AtomicReference<>(); |
| |
| static { |
| SUPPORTED_VERS.add(CURRENT_VER); |
| SUPPORTED_VERS.add(VER_2_9_0); |
| SUPPORTED_VERS.add(VER_2_8_0); |
| SUPPORTED_VERS.add(VER_2_7_0); |
| SUPPORTED_VERS.add(VER_2_5_0); |
| SUPPORTED_VERS.add(VER_2_4_0); |
| SUPPORTED_VERS.add(VER_2_3_0); |
| SUPPORTED_VERS.add(VER_2_1_5); |
| SUPPORTED_VERS.add(VER_2_1_0); |
| } |
| |
| /** |
| * Constructor. |
| * @param ctx Kernal Context. |
| * @param ses Client's NIO session. |
| * @param busyLock Shutdown busy lock. |
| * @param connId Connection ID. |
| * @param maxCursors Maximum allowed cursors. |
| */ |
| public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId, |
| int maxCursors) { |
| super(ctx, ses, connId); |
| |
| this.busyLock = busyLock; |
| this.maxCursors = maxCursors; |
| |
| log = ctx.log(getClass()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte clientType() { |
| return JDBC_CLIENT; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isVersionSupported(ClientListenerProtocolVersion ver) { |
| return SUPPORTED_VERS.contains(ver); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClientListenerProtocolVersion defaultVersion() { |
| return CURRENT_VER; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void initializeFromHandshake(GridNioSession ses, |
| ClientListenerProtocolVersion ver, BinaryReaderExImpl reader) |
| throws IgniteCheckedException { |
| assert SUPPORTED_VERS.contains(ver) : "Unsupported JDBC protocol version."; |
| |
| boolean distributedJoins = reader.readBoolean(); |
| boolean enforceJoinOrder = reader.readBoolean(); |
| boolean collocated = reader.readBoolean(); |
| boolean replicatedOnly = reader.readBoolean(); |
| boolean autoCloseCursors = reader.readBoolean(); |
| |
| boolean lazyExec = false; |
| boolean skipReducerOnUpdate = false; |
| |
| NestedTxMode nestedTxMode = NestedTxMode.DEFAULT; |
| |
| if (ver.compareTo(VER_2_1_5) >= 0) |
| lazyExec = reader.readBoolean(); |
| |
| if (ver.compareTo(VER_2_3_0) >= 0) |
| skipReducerOnUpdate = reader.readBoolean(); |
| |
| if (ver.compareTo(VER_2_7_0) >= 0) { |
| String nestedTxModeName = reader.readString(); |
| |
| if (!F.isEmpty(nestedTxModeName)) { |
| try { |
| nestedTxMode = NestedTxMode.valueOf(nestedTxModeName); |
| } |
| catch (IllegalArgumentException e) { |
| throw new IgniteCheckedException("Invalid nested transactions handling mode: " + nestedTxModeName); |
| } |
| } |
| } |
| |
| Boolean dataPageScanEnabled = null; |
| Integer updateBatchSize = null; |
| EnumSet<JdbcThinFeature> features = EnumSet.noneOf(JdbcThinFeature.class); |
| |
| if (ver.compareTo(VER_2_8_0) >= 0) { |
| dataPageScanEnabled = nullableBooleanFromByte(reader.readByte()); |
| |
| updateBatchSize = JdbcUtils.readNullableInteger(reader); |
| } |
| |
| if (ver.compareTo(VER_2_9_0) >= 0) { |
| userAttrs = reader.readMap(); |
| |
| byte[] cliFeatures = reader.readByteArray(); |
| |
| features = JdbcThinFeature.enumSet(cliFeatures); |
| } |
| |
| if (ver.compareTo(VER_2_5_0) >= 0) { |
| String user = null; |
| String passwd = null; |
| |
| try { |
| if (reader.available() > 0) { |
| user = reader.readString(); |
| passwd = reader.readString(); |
| } |
| } |
| catch (Exception e) { |
| throw new IgniteCheckedException("Handshake error: " + e.getMessage(), e); |
| } |
| |
| authenticate(ses, user, passwd); |
| } |
| |
| protoCtx = new JdbcProtocolContext(ver, features, true); |
| |
| initClientDescriptor("jdbc-thin"); |
| |
| parser = new JdbcMessageParser(ctx, protoCtx); |
| |
| ClientListenerResponseSender sender = new ClientListenerResponseSender() { |
| @Override public void send(ClientListenerResponse resp) { |
| if (resp != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Async response: [resp=" + resp.status() + ']'); |
| |
| ses.send(parser.encode(resp)); |
| } |
| } |
| }; |
| |
| handler = new JdbcRequestHandler(busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder, |
| collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, nestedTxMode, |
| dataPageScanEnabled, updateBatchSize, ver, this); |
| |
| handler.start(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClientListenerRequestHandler handler() { |
| return handler; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClientListenerMessageParser parser() { |
| return parser; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected() { |
| handler.onDisconnect(); |
| |
| super.onDisconnected(); |
| } |
| |
| /** |
| * @return Retrieves current affinity topology version and sets it as a last if it was changed, false otherwise. |
| */ |
| public AffinityTopologyVersion getAffinityTopologyVersionIfChanged() { |
| while (true) { |
| AffinityTopologyVersion oldVer = lastAffinityTopVer.get(); |
| AffinityTopologyVersion newVer = ctx.cache().context().exchange().readyAffinityVersion(); |
| |
| boolean changed = oldVer == null || oldVer.compareTo(newVer) < 0; |
| |
| if (changed) { |
| boolean success = lastAffinityTopVer.compareAndSet(oldVer, newVer); |
| |
| if (!success) |
| continue; |
| } |
| |
| return changed ? newVer : null; |
| } |
| } |
| |
| /** |
| * @return Binary context. |
| */ |
| public JdbcProtocolContext protocolContext() { |
| return protoCtx; |
| } |
| } |