blob: e50cbf7e4b722f1995571d1370e3b5e1e7deb863 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.ThinClientConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.processors.authentication.IgniteAccessControlException;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
import org.apache.ignite.internal.processors.odbc.odbc.OdbcConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
* Client message listener.
*/
public class ClientListenerNioListener extends GridNioServerListenerAdapter<ClientMessage> {
/** ODBC driver handshake code. */
public static final byte ODBC_CLIENT = 0;
/** JDBC driver handshake code. */
public static final byte JDBC_CLIENT = 1;
/** Thin client handshake code. */
public static final byte THIN_CLIENT = 2;
/** Connection handshake timeout task. */
public static final int CONN_CTX_HANDSHAKE_TIMEOUT_TASK = GridNioSessionMetaKey.nextUniqueKey();
/** Connection-related metadata key. */
public static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
/** Next connection id. */
private static AtomicInteger nextConnId = new AtomicInteger(1);
/** Busy lock. */
private final GridSpinBusyLock busyLock;
/** Kernal context. */
private final GridKernalContext ctx;
/** Maximum allowed cursors. */
private final int maxCursors;
/** Logger. */
private final IgniteLogger log;
/** Client connection config. */
private final ClientConnectorConfiguration cliConnCfg;
/** Thin client configuration. */
private final ThinClientConfiguration thinCfg;
/** Metrics. */
private final ClientListenerMetrics metrics;
/**
* Constructor.
*
* @param ctx Context.
* @param busyLock Shutdown busy lock.
* @param cliConnCfg Client connector configuration.
*/
public ClientListenerNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock,
ClientConnectorConfiguration cliConnCfg) {
assert cliConnCfg != null;
this.ctx = ctx;
this.busyLock = busyLock;
this.cliConnCfg = cliConnCfg;
maxCursors = cliConnCfg.getMaxOpenCursorsPerConnection();
log = ctx.log(getClass());
thinCfg = cliConnCfg.getThinClientConfiguration() == null ? new ThinClientConfiguration()
: new ThinClientConfiguration(cliConnCfg.getThinClientConfiguration());
metrics = new ClientListenerMetrics(ctx);
}
/** {@inheritDoc} */
@Override public void onConnected(GridNioSession ses) {
if (log.isDebugEnabled())
log.debug("Client connected: " + ses.remoteAddress());
long handshakeTimeout = cliConnCfg.getHandshakeTimeout();
if (handshakeTimeout > 0)
scheduleHandshakeTimeout(ses, handshakeTimeout);
}
/** {@inheritDoc} */
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
if (connCtx != null) {
connCtx.onDisconnected();
metrics.onDisconnect(connCtx.clientType());
}
if (log.isDebugEnabled()) {
if (e == null)
log.debug("Client disconnected: " + ses.remoteAddress());
else
log.debug("Client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']');
}
}
/** {@inheritDoc} */
@Override public void onMessage(GridNioSession ses, ClientMessage msg) {
assert msg != null;
ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
if (connCtx == null) {
try {
onHandshake(ses, msg);
}
catch (Exception e) {
U.error(log, "Failed to handle handshake request " +
"(probably, connection has already been closed).", e);
}
return;
}
ClientListenerMessageParser parser = connCtx.parser();
ClientListenerRequestHandler handler = connCtx.handler();
ClientListenerRequest req;
try {
req = parser.decode(msg);
}
catch (Exception e) {
try {
handler.unregisterRequest(parser.decodeRequestId(msg));
}
catch (Exception e1) {
U.error(log, "Failed to unregister request.", e1);
}
U.error(log, "Failed to parse client request.", e);
ses.close();
return;
}
assert req != null;
try {
long startTime = 0;
if (log.isDebugEnabled()) {
startTime = System.nanoTime();
log.debug("Client request received [reqId=" + req.requestId() + ", addr=" +
ses.remoteAddress() + ", req=" + req + ']');
}
ClientListenerResponse resp;
try (OperationSecurityContext s = ctx.security().withContext(connCtx.securityContext())) {
resp = handler.handle(req);
}
if (resp != null) {
if (log.isDebugEnabled()) {
long dur = (System.nanoTime() - startTime) / 1000;
log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
", resp=" + resp.status() + ']');
}
GridNioFuture<?> fut = ses.send(parser.encode(resp));
fut.listen(f -> {
if (f.error() == null)
resp.onSent();
});
}
}
catch (Throwable e) {
handler.unregisterRequest(req.requestId());
U.error(log, "Failed to process client request [req=" + req + ']', e);
ses.send(parser.encode(handler.handleException(e, req)));
if (e instanceof Error)
throw (Error)e;
}
}
/** {@inheritDoc} */
@Override public void onSessionIdleTimeout(GridNioSession ses) {
ses.close();
}
/** {@inheritDoc} */
@Override public void onFailure(FailureType failureType, Throwable failure) {
if (failure instanceof OutOfMemoryError)
ctx.failure().process(new FailureContext(failureType, failure));
}
/**
* Schedule handshake timeout.
* @param ses Connection session.
* @param handshakeTimeout Handshake timeout.
*/
private void scheduleHandshakeTimeout(GridNioSession ses, long handshakeTimeout) {
assert handshakeTimeout > 0;
Closeable timeoutTask = ctx.timeout().schedule(new Runnable() {
@Override public void run() {
ses.close();
metrics.onHandshakeTimeout();
U.warn(log, "Unable to perform handshake within timeout " +
"[timeout=" + handshakeTimeout + ", remoteAddr=" + ses.remoteAddress() + ']');
}
}, handshakeTimeout, -1);
ses.addMeta(CONN_CTX_HANDSHAKE_TIMEOUT_TASK, timeoutTask);
}
/**
* Cancel handshake timeout task execution.
* @param ses Connection session.
*/
private void cancelHandshakeTimeout(GridNioSession ses) {
Closeable timeoutTask = ses.removeMeta(CONN_CTX_HANDSHAKE_TIMEOUT_TASK);
try {
if (timeoutTask != null)
timeoutTask.close();
} catch (Exception e) {
U.warn(log, "Failed to cancel handshake timeout task " +
"[remoteAddr=" + ses.remoteAddress() + ", err=" + e + ']');
}
}
/**
* Perform handshake.
*
* @param ses Session.
* @param msg Message bytes.
*/
private void onHandshake(GridNioSession ses, ClientMessage msg) {
BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null);
BinaryMarshaller marsh = new BinaryMarshaller();
marsh.setContext(new MarshallerContextImpl(null, null));
ctx.configure(marsh);
BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg.payload()), null, true);
byte cmd = reader.readByte();
if (cmd != ClientListenerRequest.HANDSHAKE) {
U.warn(log, "Unexpected client request (will close session): " + ses.remoteAddress());
ses.close();
return;
}
short verMajor = reader.readShort();
short verMinor = reader.readShort();
short verMaintenance = reader.readShort();
ClientListenerProtocolVersion ver = ClientListenerProtocolVersion.create(verMajor, verMinor, verMaintenance);
BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null);
byte clientType = reader.readByte();
ClientListenerConnectionContext connCtx = null;
try {
connCtx = prepareContext(clientType, ses);
ensureClientPermissions(clientType);
if (connCtx.isVersionSupported(ver)) {
connCtx.initializeFromHandshake(ses, ver, reader);
ses.addMeta(CONN_CTX_META_KEY, connCtx);
}
else
throw new IgniteCheckedException("Unsupported version: " + ver.asString());
cancelHandshakeTimeout(ses);
connCtx.handler().writeHandshake(writer);
metrics.onHandshakeAccept(clientType);
}
catch (IgniteAccessControlException authEx) {
metrics.onFailedAuth();
writer.writeBoolean(false);
writer.writeShort((short)0);
writer.writeShort((short)0);
writer.writeShort((short)0);
writer.doWriteString(authEx.getMessage());
if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0)
writer.writeInt(ClientStatus.AUTH_FAILED);
}
catch (IgniteCheckedException e) {
U.warn(log, "Error during handshake [rmtAddr=" + ses.remoteAddress() + ", msg=" + e.getMessage() + ']');
metrics.onGeneralReject();
ClientListenerProtocolVersion currVer;
if (connCtx == null)
currVer = ClientListenerProtocolVersion.create(0, 0, 0);
else
currVer = connCtx.defaultVersion();
writer.writeBoolean(false);
writer.writeShort(currVer.major());
writer.writeShort(currVer.minor());
writer.writeShort(currVer.maintenance());
writer.doWriteString(e.getMessage());
if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0)
writer.writeInt(ClientStatus.FAILED);
}
ses.send(new ClientMessage(writer.array()));
}
/**
* Prepare context.
*
* @param ses Client's NIO session.
* @param clientType Client type.
* @return Context.
* @throws IgniteCheckedException If failed.
*/
private ClientListenerConnectionContext prepareContext(byte clientType, GridNioSession ses)
throws IgniteCheckedException {
long connId = nextConnectionId();
switch (clientType) {
case ODBC_CLIENT:
return new OdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
case JDBC_CLIENT:
return new JdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
case THIN_CLIENT:
return new ClientConnectionContext(ctx, ses, connId, maxCursors, thinCfg);
}
throw new IgniteCheckedException("Unknown client type: " + clientType);
}
/**
* Generate unique connection id.
* @return connection id.
*/
private long nextConnectionId() {
return (ctx.discovery().localNode().order() << 32) + nextConnId.getAndIncrement();
}
/**
* Ensures if the given type of client is enabled by config.
*
* @param clientType Client type.
* @throws IgniteCheckedException If failed.
*/
private void ensureClientPermissions(byte clientType) throws IgniteCheckedException {
switch (clientType) {
case ODBC_CLIENT: {
if (!cliConnCfg.isOdbcEnabled())
throw new IgniteCheckedException("ODBC connection is not allowed, " +
"see ClientConnectorConfiguration.odbcEnabled.");
break;
}
case JDBC_CLIENT: {
if (!cliConnCfg.isJdbcEnabled())
throw new IgniteCheckedException("JDBC connection is not allowed, " +
"see ClientConnectorConfiguration.jdbcEnabled.");
break;
}
case THIN_CLIENT: {
if (!cliConnCfg.isThinClientEnabled())
throw new IgniteCheckedException("Thin client connection is not allowed, " +
"see ClientConnectorConfiguration.thinClientEnabled.");
break;
}
default:
throw new IgniteCheckedException("Unknown client type: " + clientType);
}
}
}