blob: 0eb6ac4d480e386c757e599c543fb0a4cee119a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.processors.authentication.IgniteAccessControlException;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
import org.apache.ignite.internal.processors.odbc.odbc.OdbcConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
* Client message listener.
*/
public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte[]> {
/** ODBC driver handshake code. */
public static final byte ODBC_CLIENT = 0;
/** JDBC driver handshake code. */
public static final byte JDBC_CLIENT = 1;
/** Thin client handshake code. */
public static final byte THIN_CLIENT = 2;
/** Connection handshake passed. */
public static final int CONN_CTX_HANDSHAKE_PASSED = GridNioSessionMetaKey.nextUniqueKey();
/** Maximum size of the handshake message. */
public static final int MAX_HANDSHAKE_MSG_SIZE = 128;
/** Connection-related metadata key. */
static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
/** Next connection id. */
private static AtomicInteger nextConnId = new AtomicInteger(1);
/** Busy lock. */
private final GridSpinBusyLock busyLock;
/** Kernal context. */
private final GridKernalContext ctx;
/** Maximum allowed cursors. */
private final int maxCursors;
/** Logger. */
private final IgniteLogger log;
/** Client connection config. */
private ClientConnectorConfiguration cliConnCfg;
/**
* Constructor.
*
* @param ctx Context.
* @param busyLock Shutdown busy lock.
* @param cliConnCfg Client connector configuration.
*/
public ClientListenerNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock,
ClientConnectorConfiguration cliConnCfg) {
assert cliConnCfg != null;
this.ctx = ctx;
this.busyLock = busyLock;
this.maxCursors = cliConnCfg.getMaxOpenCursorsPerConnection();
this.cliConnCfg = cliConnCfg;
log = ctx.log(getClass());
}
/** {@inheritDoc} */
@Override public void onConnected(GridNioSession ses) {
if (log.isDebugEnabled())
log.debug("Client connected: " + ses.remoteAddress());
}
/** {@inheritDoc} */
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
if (connCtx != null)
connCtx.onDisconnected();
if (log.isDebugEnabled()) {
if (e == null)
log.debug("Client disconnected: " + ses.remoteAddress());
else
log.debug("Client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']');
}
}
/** {@inheritDoc} */
@Override public void onMessage(GridNioSession ses, byte[] msg) {
assert msg != null;
ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
if (connCtx == null) {
onHandshake(ses, msg);
return;
}
ClientListenerMessageParser parser = connCtx.parser();
ClientListenerRequestHandler handler = connCtx.handler();
ClientListenerRequest req;
try {
req = parser.decode(msg);
}
catch (Exception e) {
U.error(log, "Failed to parse client request.", e);
ses.close();
return;
}
assert req != null;
try {
long startTime = 0;
if (log.isDebugEnabled()) {
startTime = System.nanoTime();
log.debug("Client request received [reqId=" + req.requestId() + ", addr=" +
ses.remoteAddress() + ", req=" + req + ']');
}
ClientListenerResponse resp = handler.handle(req);
if (resp != null) {
if (log.isDebugEnabled()) {
long dur = (System.nanoTime() - startTime) / 1000;
log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
", resp=" + resp.status() + ']');
}
byte[] outMsg = parser.encode(resp);
ses.send(outMsg);
}
}
catch (Exception e) {
U.error(log, "Failed to process client request [req=" + req + ']', e);
ses.send(parser.encode(handler.handleException(e, req)));
}
}
/** {@inheritDoc} */
@Override public void onSessionIdleTimeout(GridNioSession ses) {
ses.close();
}
/**
* Perform handshake.
*
* @param ses Session.
* @param msg Message bytes.
*/
private void onHandshake(GridNioSession ses, byte[] msg) {
BinaryInputStream stream = new BinaryHeapInputStream(msg);
BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true);
byte cmd = reader.readByte();
if (cmd != ClientListenerRequest.HANDSHAKE) {
U.warn(log, "Unexpected client request (will close session): " + ses.remoteAddress());
ses.close();
return;
}
short verMajor = reader.readShort();
short verMinor = reader.readShort();
short verMaintenance = reader.readShort();
ClientListenerProtocolVersion ver = ClientListenerProtocolVersion.create(verMajor, verMinor, verMaintenance);
BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null);
byte clientType = reader.readByte();
ClientListenerConnectionContext connCtx = null;
try {
connCtx = prepareContext(ses, clientType);
ensureClientPermissions(clientType);
if (connCtx.isVersionSupported(ver)) {
connCtx.initializeFromHandshake(ver, reader);
ses.addMeta(CONN_CTX_META_KEY, connCtx);
}
else
throw new IgniteCheckedException("Unsupported version.");
connCtx.handler().writeHandshake(writer);
ses.addMeta(CONN_CTX_HANDSHAKE_PASSED, true);
}
catch (IgniteAccessControlException authEx) {
writer.writeBoolean(false);
writer.writeShort((short)0);
writer.writeShort((short)0);
writer.writeShort((short)0);
writer.doWriteString(authEx.getMessage());
if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0)
writer.writeInt(ClientStatus.AUTH_FAILED);
}
catch (IgniteCheckedException e) {
U.warn(log, "Error during handshake [rmtAddr=" + ses.remoteAddress() + ", msg=" + e.getMessage() + ']');
ClientListenerProtocolVersion currVer;
if (connCtx == null)
currVer = ClientListenerProtocolVersion.create(0, 0, 0);
else
currVer = connCtx.currentVersion();
writer.writeBoolean(false);
writer.writeShort(currVer.major());
writer.writeShort(currVer.minor());
writer.writeShort(currVer.maintenance());
writer.doWriteString(e.getMessage());
if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0)
writer.writeInt(ClientStatus.FAILED);
}
ses.send(writer.array());
}
/**
* Prepare context.
*
* @param ses Session.
* @param clientType Client type.
* @return Context.
* @throws IgniteCheckedException If failed.
*/
private ClientListenerConnectionContext prepareContext(GridNioSession ses, byte clientType) throws IgniteCheckedException {
long connId = nextConnectionId();
switch (clientType) {
case ODBC_CLIENT:
return new OdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
case JDBC_CLIENT:
return new JdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
case THIN_CLIENT:
return new ClientConnectionContext(ctx, connId, maxCursors);
}
throw new IgniteCheckedException("Unknown client type: " + clientType);
}
/**
* Generate unique connection id.
* @return connection id.
*/
private long nextConnectionId() {
return (ctx.discovery().localNode().order() << 32) + nextConnId.getAndIncrement();
}
/**
* Ensures if the given type of client is enabled by config.
*
* @param clientType Client type.
* @throws IgniteCheckedException If failed.
*/
private void ensureClientPermissions(byte clientType) throws IgniteCheckedException {
switch (clientType) {
case ODBC_CLIENT: {
if (!cliConnCfg.isOdbcEnabled())
throw new IgniteCheckedException("ODBC connection is not allowed, " +
"see ClientConnectorConfiguration.odbcEnabled.");
break;
}
case JDBC_CLIENT: {
if (!cliConnCfg.isJdbcEnabled())
throw new IgniteCheckedException("JDBC connection is not allowed, " +
"see ClientConnectorConfiguration.jdbcEnabled.");
break;
}
case THIN_CLIENT: {
if (!cliConnCfg.isThinClientEnabled())
throw new IgniteCheckedException("Thin client connection is not allowed, " +
"see ClientConnectorConfiguration.thinClientEnabled.");
break;
}
default:
throw new IgniteCheckedException("Unknown client type: " + clientType);
}
}
}