blob: cfe4263438f16c2516592406738b9093c1a30b6f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc.jdbc;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerAbstractConnectionContext;
import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender;
import org.apache.ignite.internal.processors.query.NestedTxMode;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.nullableBooleanFromByte;
import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.JDBC_CLIENT;
/**
* JDBC Connection Context.
*/
public class JdbcConnectionContext extends ClientListenerAbstractConnectionContext {
    /** Version 2.1.0. */
    private static final ClientListenerProtocolVersion VER_2_1_0 = ClientListenerProtocolVersion.create(2, 1, 0);

    /** Version 2.1.5: added "lazy" flag. */
    private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5);

    /** Version 2.3.0: added "multiple statements query" feature. */
    static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0);

    /** Version 2.4.0: adds default values for columns feature. */
    static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0);

    /** Version 2.5.0: adds precision and scale for columns feature. */
    static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0);

    /** Version 2.7.0: adds maximum length for columns feature. */
    static final ClientListenerProtocolVersion VER_2_7_0 = ClientListenerProtocolVersion.create(2, 7, 0);

    /** Version 2.8.0: adds query id in order to implement cancel feature, partition awareness support: IEP-23. */
    static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0);

    /** Version 2.9.0: adds user attributes, adds features flags support. */
    static final ClientListenerProtocolVersion VER_2_9_0 = ClientListenerProtocolVersion.create(2, 9, 0);

    /** Current version. */
    public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_9_0;

    /** Supported versions. Populated once in the static initializer below and only read afterwards. */
    private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();

    /** Shutdown busy lock. */
    private final GridSpinBusyLock busyLock;

    /** Logger. */
    private final IgniteLogger log;

    /** Maximum allowed cursors. */
    private final int maxCursors;

    /** Message parser. Created during the handshake, {@code null} before it. */
    private JdbcMessageParser parser;

    /** Request handler. Created during the handshake, {@code null} before it. */
    private JdbcRequestHandler handler;

    /** Current protocol context. Created during the handshake, {@code null} before it. */
    private JdbcProtocolContext protoCtx;

    /** Last reported affinity topology version. */
    private final AtomicReference<AffinityTopologyVersion> lastAffinityTopVer = new AtomicReference<>();

    static {
        // CURRENT_VER is added explicitly so that it stays supported even if it is re-pointed
        // to a new constant that is not listed below yet.
        SUPPORTED_VERS.add(CURRENT_VER);
        SUPPORTED_VERS.add(VER_2_9_0);
        SUPPORTED_VERS.add(VER_2_8_0);
        SUPPORTED_VERS.add(VER_2_7_0);
        SUPPORTED_VERS.add(VER_2_5_0);
        SUPPORTED_VERS.add(VER_2_4_0);
        SUPPORTED_VERS.add(VER_2_3_0);
        SUPPORTED_VERS.add(VER_2_1_5);
        SUPPORTED_VERS.add(VER_2_1_0);
    }

    /**
     * Constructor.
     *
     * @param ctx Kernal Context.
     * @param ses Client's NIO session.
     * @param busyLock Shutdown busy lock.
     * @param connId Connection ID.
     * @param maxCursors Maximum allowed cursors.
     */
    public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId,
        int maxCursors) {
        super(ctx, ses, connId);

        this.busyLock = busyLock;
        this.maxCursors = maxCursors;

        log = ctx.log(getClass());
    }

    /** {@inheritDoc} */
    @Override public byte clientType() {
        return JDBC_CLIENT;
    }

    /** {@inheritDoc} */
    @Override public boolean isVersionSupported(ClientListenerProtocolVersion ver) {
        return SUPPORTED_VERS.contains(ver);
    }

    /** {@inheritDoc} */
    @Override public ClientListenerProtocolVersion defaultVersion() {
        return CURRENT_VER;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the connection properties from the handshake payload in the exact order they are laid out on the wire
     * (fields introduced by later protocol versions are read only when {@code ver} is at least that version),
     * authenticates the client, and then creates the message parser and the request handler for this connection.
     *
     * @throws IgniteCheckedException If the nested transactions mode name is not a valid {@link NestedTxMode}
     *      constant or if reading the credentials from the handshake fails.
     */
    @Override public void initializeFromHandshake(GridNioSession ses,
        ClientListenerProtocolVersion ver, BinaryReaderExImpl reader)
        throws IgniteCheckedException {
        assert SUPPORTED_VERS.contains(ver) : "Unsupported JDBC protocol version.";

        // Fields present in every supported protocol version. The order of reads must not be changed.
        boolean distributedJoins = reader.readBoolean();
        boolean enforceJoinOrder = reader.readBoolean();
        boolean collocated = reader.readBoolean();
        boolean replicatedOnly = reader.readBoolean();
        boolean autoCloseCursors = reader.readBoolean();

        // Defaults for fields that older clients do not send.
        boolean lazyExec = false;
        boolean skipReducerOnUpdate = false;
        NestedTxMode nestedTxMode = NestedTxMode.DEFAULT;

        if (ver.compareTo(VER_2_1_5) >= 0)
            lazyExec = reader.readBoolean();

        if (ver.compareTo(VER_2_3_0) >= 0)
            skipReducerOnUpdate = reader.readBoolean();

        if (ver.compareTo(VER_2_7_0) >= 0) {
            String nestedTxModeName = reader.readString();

            // An empty name keeps the default mode.
            if (!F.isEmpty(nestedTxModeName)) {
                try {
                    nestedTxMode = NestedTxMode.valueOf(nestedTxModeName);
                }
                catch (IllegalArgumentException e) {
                    // Keep the original exception as the cause so the failure can be traced.
                    throw new IgniteCheckedException(
                        "Invalid nested transactions handling mode: " + nestedTxModeName, e);
                }
            }
        }

        Boolean dataPageScanEnabled = null;
        Integer updateBatchSize = null;
        EnumSet<JdbcThinFeature> features = EnumSet.noneOf(JdbcThinFeature.class);

        if (ver.compareTo(VER_2_8_0) >= 0) {
            dataPageScanEnabled = nullableBooleanFromByte(reader.readByte());
            updateBatchSize = JdbcUtils.readNullableInteger(reader);
        }

        if (ver.compareTo(VER_2_9_0) >= 0) {
            userAttrs = reader.readMap();

            byte[] cliFeatures = reader.readByteArray();

            features = JdbcThinFeature.enumSet(cliFeatures);
        }

        // Credentials are checked for versions starting from 2.5.0, but they are located at the very end
        // of the payload, hence this block comes after the newer-version fields above.
        if (ver.compareTo(VER_2_5_0) >= 0) {
            String user = null;
            String passwd = null;

            try {
                // Credentials are optional: they are read only if there is unread data left.
                if (reader.available() > 0) {
                    user = reader.readString();
                    passwd = reader.readString();
                }
            }
            catch (Exception e) {
                throw new IgniteCheckedException("Handshake error: " + e.getMessage(), e);
            }

            authenticate(ses, user, passwd);
        }

        protoCtx = new JdbcProtocolContext(ver, features, true);

        initClientDescriptor("jdbc-thin");

        parser = new JdbcMessageParser(ctx, protoCtx);

        // Sender used by the handler to push responses to the client through the NIO session.
        ClientListenerResponseSender sender = new ClientListenerResponseSender() {
            @Override public void send(ClientListenerResponse resp) {
                if (resp == null)
                    return;

                if (log.isDebugEnabled())
                    log.debug("Async response: [resp=" + resp.status() + ']');

                ses.send(parser.encode(resp));
            }
        };

        handler = new JdbcRequestHandler(busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
            collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, nestedTxMode,
            dataPageScanEnabled, updateBatchSize, ver, this);

        handler.start();
    }

    /** {@inheritDoc} */
    @Override public ClientListenerRequestHandler handler() {
        return handler;
    }

    /** {@inheritDoc} */
    @Override public ClientListenerMessageParser parser() {
        return parser;
    }

    /** {@inheritDoc} */
    @Override public void onDisconnected() {
        // The handler is assigned only at the very end of initializeFromHandshake(), so it may still be null
        // if the handshake has not completed. Guard against that so super.onDisconnected() always runs.
        if (handler != null)
            handler.onDisconnect();

        super.onDisconnected();
    }

    /**
     * Retrieves the current ready affinity topology version and, if it is newer than the last remembered one
     * (or none has been remembered yet), atomically stores it as the last one.
     *
     * @return Current affinity topology version if it was changed since the previous call,
     *      {@code null} otherwise.
     */
    public AffinityTopologyVersion getAffinityTopologyVersionIfChanged() {
        while (true) {
            AffinityTopologyVersion oldVer = lastAffinityTopVer.get();
            AffinityTopologyVersion newVer = ctx.cache().context().exchange().readyAffinityVersion();

            // Nothing newer than what has already been remembered.
            if (oldVer != null && oldVer.compareTo(newVer) >= 0)
                return null;

            // Another thread may have updated the reference concurrently; in that case re-read and retry.
            if (lastAffinityTopVer.compareAndSet(oldVer, newVer))
                return newVer;
        }
    }

    /**
     * @return Current protocol context, or {@code null} if the handshake has not been processed yet.
     */
    public JdbcProtocolContext protocolContext() {
        return protoCtx;
    }
}