| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.odbc; |
| |
| import java.io.Closeable; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.configuration.ClientConnectorConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.ThinClientConfiguration; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.MarshallerContextImpl; |
| import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler; |
| import org.apache.ignite.internal.binary.BinaryContext; |
| import org.apache.ignite.internal.binary.BinaryMarshaller; |
| import org.apache.ignite.internal.binary.BinaryReaderExImpl; |
| import org.apache.ignite.internal.binary.BinaryWriterExImpl; |
| import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; |
| import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; |
| import org.apache.ignite.internal.processors.authentication.IgniteAccessControlException; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext; |
| import org.apache.ignite.internal.processors.odbc.odbc.OdbcConnectionContext; |
| import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; |
| import org.apache.ignite.internal.processors.platform.client.ClientStatus; |
| import org.apache.ignite.internal.processors.security.OperationSecurityContext; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.nio.GridNioFuture; |
| import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; |
| import org.apache.ignite.internal.util.nio.GridNioSession; |
| import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Client message listener. |
| */ |
| public class ClientListenerNioListener extends GridNioServerListenerAdapter<ClientMessage> { |
| /** ODBC driver handshake code. */ |
| public static final byte ODBC_CLIENT = 0; |
| |
| /** JDBC driver handshake code. */ |
| public static final byte JDBC_CLIENT = 1; |
| |
| /** Thin client handshake code. */ |
| public static final byte THIN_CLIENT = 2; |
| |
| /** Connection handshake timeout task. */ |
| public static final int CONN_CTX_HANDSHAKE_TIMEOUT_TASK = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** Connection-related metadata key. */ |
| public static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** Next connection id. */ |
| private static AtomicInteger nextConnId = new AtomicInteger(1); |
| |
| /** Busy lock. */ |
| private final GridSpinBusyLock busyLock; |
| |
| /** Kernal context. */ |
| private final GridKernalContext ctx; |
| |
| /** Maximum allowed cursors. */ |
| private final int maxCursors; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Client connection config. */ |
| private final ClientConnectorConfiguration cliConnCfg; |
| |
| /** Thin client configuration. */ |
| private final ThinClientConfiguration thinCfg; |
| |
| /** Metrics. */ |
| private final ClientListenerMetrics metrics; |
| |
| /** |
| * Constructor. |
| * |
| * @param ctx Context. |
| * @param busyLock Shutdown busy lock. |
| * @param cliConnCfg Client connector configuration. |
| */ |
| public ClientListenerNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, |
| ClientConnectorConfiguration cliConnCfg) { |
| assert cliConnCfg != null; |
| |
| this.ctx = ctx; |
| this.busyLock = busyLock; |
| this.cliConnCfg = cliConnCfg; |
| |
| maxCursors = cliConnCfg.getMaxOpenCursorsPerConnection(); |
| log = ctx.log(getClass()); |
| |
| thinCfg = cliConnCfg.getThinClientConfiguration() == null ? new ThinClientConfiguration() |
| : new ThinClientConfiguration(cliConnCfg.getThinClientConfiguration()); |
| |
| metrics = new ClientListenerMetrics(ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onConnected(GridNioSession ses) { |
| if (log.isDebugEnabled()) |
| log.debug("Client connected: " + ses.remoteAddress()); |
| |
| long handshakeTimeout = cliConnCfg.getHandshakeTimeout(); |
| |
| if (handshakeTimeout > 0) |
| scheduleHandshakeTimeout(ses, handshakeTimeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { |
| ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY); |
| |
| if (connCtx != null) { |
| connCtx.onDisconnected(); |
| |
| metrics.onDisconnect(connCtx.clientType()); |
| } |
| |
| if (log.isDebugEnabled()) { |
| if (e == null) |
| log.debug("Client disconnected: " + ses.remoteAddress()); |
| else |
| log.debug("Client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']'); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessage(GridNioSession ses, ClientMessage msg) { |
| assert msg != null; |
| |
| ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY); |
| |
| if (connCtx == null) { |
| try { |
| onHandshake(ses, msg); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to handle handshake request " + |
| "(probably, connection has already been closed).", e); |
| } |
| |
| return; |
| } |
| |
| ClientListenerMessageParser parser = connCtx.parser(); |
| ClientListenerRequestHandler handler = connCtx.handler(); |
| |
| ClientListenerRequest req; |
| |
| try { |
| req = parser.decode(msg); |
| } |
| catch (Exception e) { |
| try { |
| handler.unregisterRequest(parser.decodeRequestId(msg)); |
| } |
| catch (Exception e1) { |
| U.error(log, "Failed to unregister request.", e1); |
| } |
| |
| U.error(log, "Failed to parse client request.", e); |
| |
| ses.close(); |
| |
| return; |
| } |
| |
| assert req != null; |
| |
| try { |
| long startTime = 0; |
| |
| if (log.isDebugEnabled()) { |
| startTime = System.nanoTime(); |
| |
| log.debug("Client request received [reqId=" + req.requestId() + ", addr=" + |
| ses.remoteAddress() + ", req=" + req + ']'); |
| } |
| |
| ClientListenerResponse resp; |
| |
| try (OperationSecurityContext s = ctx.security().withContext(connCtx.securityContext())) { |
| resp = handler.handle(req); |
| } |
| |
| if (resp != null) { |
| if (log.isDebugEnabled()) { |
| long dur = (System.nanoTime() - startTime) / 1000; |
| |
| log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur + |
| ", resp=" + resp.status() + ']'); |
| } |
| |
| GridNioFuture<?> fut = ses.send(parser.encode(resp)); |
| |
| fut.listen(f -> { |
| if (f.error() == null) |
| resp.onSent(); |
| }); |
| } |
| } |
| catch (Throwable e) { |
| handler.unregisterRequest(req.requestId()); |
| |
| U.error(log, "Failed to process client request [req=" + req + ']', e); |
| |
| ses.send(parser.encode(handler.handleException(e, req))); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionIdleTimeout(GridNioSession ses) { |
| ses.close(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onFailure(FailureType failureType, Throwable failure) { |
| if (failure instanceof OutOfMemoryError) |
| ctx.failure().process(new FailureContext(failureType, failure)); |
| } |
| |
| /** |
| * Schedule handshake timeout. |
| * @param ses Connection session. |
| * @param handshakeTimeout Handshake timeout. |
| */ |
| private void scheduleHandshakeTimeout(GridNioSession ses, long handshakeTimeout) { |
| assert handshakeTimeout > 0; |
| |
| Closeable timeoutTask = ctx.timeout().schedule(new Runnable() { |
| @Override public void run() { |
| ses.close(); |
| |
| metrics.onHandshakeTimeout(); |
| |
| U.warn(log, "Unable to perform handshake within timeout " + |
| "[timeout=" + handshakeTimeout + ", remoteAddr=" + ses.remoteAddress() + ']'); |
| } |
| }, handshakeTimeout, -1); |
| |
| ses.addMeta(CONN_CTX_HANDSHAKE_TIMEOUT_TASK, timeoutTask); |
| } |
| |
| /** |
| * Cancel handshake timeout task execution. |
| * @param ses Connection session. |
| */ |
| private void cancelHandshakeTimeout(GridNioSession ses) { |
| Closeable timeoutTask = ses.removeMeta(CONN_CTX_HANDSHAKE_TIMEOUT_TASK); |
| |
| try { |
| if (timeoutTask != null) |
| timeoutTask.close(); |
| } catch (Exception e) { |
| U.warn(log, "Failed to cancel handshake timeout task " + |
| "[remoteAddr=" + ses.remoteAddress() + ", err=" + e + ']'); |
| } |
| } |
| |
| /** |
| * Perform handshake. |
| * |
| * @param ses Session. |
| * @param msg Message bytes. |
| */ |
| private void onHandshake(GridNioSession ses, ClientMessage msg) { |
| BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null); |
| |
| BinaryMarshaller marsh = new BinaryMarshaller(); |
| |
| marsh.setContext(new MarshallerContextImpl(null, null)); |
| |
| ctx.configure(marsh); |
| |
| BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg.payload()), null, true); |
| |
| byte cmd = reader.readByte(); |
| |
| if (cmd != ClientListenerRequest.HANDSHAKE) { |
| U.warn(log, "Unexpected client request (will close session): " + ses.remoteAddress()); |
| |
| ses.close(); |
| |
| return; |
| } |
| |
| short verMajor = reader.readShort(); |
| short verMinor = reader.readShort(); |
| short verMaintenance = reader.readShort(); |
| |
| ClientListenerProtocolVersion ver = ClientListenerProtocolVersion.create(verMajor, verMinor, verMaintenance); |
| |
| BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null); |
| |
| byte clientType = reader.readByte(); |
| |
| ClientListenerConnectionContext connCtx = null; |
| |
| try { |
| connCtx = prepareContext(clientType, ses); |
| |
| ensureClientPermissions(clientType); |
| |
| if (connCtx.isVersionSupported(ver)) { |
| connCtx.initializeFromHandshake(ses, ver, reader); |
| |
| ses.addMeta(CONN_CTX_META_KEY, connCtx); |
| } |
| else |
| throw new IgniteCheckedException("Unsupported version: " + ver.asString()); |
| |
| cancelHandshakeTimeout(ses); |
| |
| connCtx.handler().writeHandshake(writer); |
| |
| metrics.onHandshakeAccept(clientType); |
| } |
| catch (IgniteAccessControlException authEx) { |
| metrics.onFailedAuth(); |
| |
| writer.writeBoolean(false); |
| |
| writer.writeShort((short)0); |
| writer.writeShort((short)0); |
| writer.writeShort((short)0); |
| |
| writer.doWriteString(authEx.getMessage()); |
| |
| if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0) |
| writer.writeInt(ClientStatus.AUTH_FAILED); |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Error during handshake [rmtAddr=" + ses.remoteAddress() + ", msg=" + e.getMessage() + ']'); |
| |
| metrics.onGeneralReject(); |
| |
| ClientListenerProtocolVersion currVer; |
| |
| if (connCtx == null) |
| currVer = ClientListenerProtocolVersion.create(0, 0, 0); |
| else |
| currVer = connCtx.defaultVersion(); |
| |
| writer.writeBoolean(false); |
| |
| writer.writeShort(currVer.major()); |
| writer.writeShort(currVer.minor()); |
| writer.writeShort(currVer.maintenance()); |
| |
| writer.doWriteString(e.getMessage()); |
| |
| if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0) |
| writer.writeInt(ClientStatus.FAILED); |
| } |
| |
| ses.send(new ClientMessage(writer.array())); |
| } |
| |
| /** |
| * Prepare context. |
| * |
| * @param ses Client's NIO session. |
| * @param clientType Client type. |
| * @return Context. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private ClientListenerConnectionContext prepareContext(byte clientType, GridNioSession ses) |
| throws IgniteCheckedException { |
| long connId = nextConnectionId(); |
| |
| switch (clientType) { |
| case ODBC_CLIENT: |
| return new OdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors); |
| |
| case JDBC_CLIENT: |
| return new JdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors); |
| |
| case THIN_CLIENT: |
| return new ClientConnectionContext(ctx, ses, connId, maxCursors, thinCfg); |
| } |
| |
| throw new IgniteCheckedException("Unknown client type: " + clientType); |
| } |
| |
| /** |
| * Generate unique connection id. |
| * @return connection id. |
| */ |
| private long nextConnectionId() { |
| return (ctx.discovery().localNode().order() << 32) + nextConnId.getAndIncrement(); |
| } |
| |
| /** |
| * Ensures if the given type of client is enabled by config. |
| * |
| * @param clientType Client type. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void ensureClientPermissions(byte clientType) throws IgniteCheckedException { |
| switch (clientType) { |
| case ODBC_CLIENT: { |
| if (!cliConnCfg.isOdbcEnabled()) |
| throw new IgniteCheckedException("ODBC connection is not allowed, " + |
| "see ClientConnectorConfiguration.odbcEnabled."); |
| break; |
| } |
| |
| case JDBC_CLIENT: { |
| if (!cliConnCfg.isJdbcEnabled()) |
| throw new IgniteCheckedException("JDBC connection is not allowed, " + |
| "see ClientConnectorConfiguration.jdbcEnabled."); |
| |
| break; |
| } |
| |
| case THIN_CLIENT: { |
| if (!cliConnCfg.isThinClientEnabled()) |
| throw new IgniteCheckedException("Thin client connection is not allowed, " + |
| "see ClientConnectorConfiguration.thinClientEnabled."); |
| |
| break; |
| } |
| |
| default: |
| throw new IgniteCheckedException("Unknown client type: " + clientType); |
| } |
| } |
| } |