blob: 7663a808c8855e77c1c42a727fd8cf32b64b2d34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.jdbc.thin;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.sql.SQLException;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCancelRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcUtils;
import org.apache.ignite.internal.util.ipc.loopback.IpcClientTcpEndpoint;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import static java.lang.Math.abs;
import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.nullableBooleanToByte;
/**
* JDBC IO layer implementation based on blocking IPC streams.
*/
public class JdbcThinTcpIo {
/** Version 2.1.0. */
private static final ClientListenerProtocolVersion VER_2_1_0 = ClientListenerProtocolVersion.create(2, 1, 0);
/** Version 2.1.5: added "lazy" flag. */
private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5);
/** Version 2.3.1. */
private static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0);
/** Version 2.4.0. */
private static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0);
/** Version 2.5.0. */
private static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0);
/** Version 2.7.0. */
private static final ClientListenerProtocolVersion VER_2_7_0 = ClientListenerProtocolVersion.create(2, 7, 0);
/** Version 2.8.0. */
private static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0);
/** Current version. */
private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_8_0;
/** Initial output stream capacity for handshake. */
private static final int HANDSHAKE_MSG_SIZE = 13;
/** Initial output for query message. */
private static final int DYNAMIC_SIZE_MSG_CAP = 256;
/** Maximum batch query count. */
private static final int MAX_BATCH_QRY_CNT = 32;
/** Initial output for query fetch message. */
private static final int QUERY_FETCH_MSG_SIZE = 13;
/** Initial output for query fetch message. */
private static final int QUERY_META_MSG_SIZE = 9;
/** Initial output for query close message. */
private static final int QUERY_CLOSE_MSG_SIZE = 9;
/** Random. */
private static final AtomicLong IDX_GEN = new AtomicLong(new Random(U.currentTimeMillis()).nextLong());
/** Connection properties. */
private final ConnectionProperties connProps;
/** Socket address. */
private final InetSocketAddress sockAddr;
/** Endpoint. */
private final IpcClientTcpEndpoint endpoint;
/** Output stream. */
private final BufferedOutputStream out;
/** Input stream. */
private final BufferedInputStream in;
/** Connected flag. */
private volatile boolean connected;
/** Ignite server version. */
private final IgniteProductVersion igniteVer;
/** Node Id. */
private final UUID nodeId;
/** Connection mutex. */
private final Object connMux = new Object();
/** Current protocol version used to connection to Ignite. */
private final ClientListenerProtocolVersion srvProtoVer;
/**
* Start connection and perform handshake.
*
* @param connProps Connection properties.
* @param sockAddr Socket address.
* @param timeout Socket connection timeout in ms.
*
* @throws SQLException On connection error or reject.
* @throws IOException On IO error in handshake.
*/
public JdbcThinTcpIo(ConnectionProperties connProps, InetSocketAddress sockAddr, int timeout)
throws SQLException, IOException {
this.connProps = connProps;
this.sockAddr = sockAddr;
Socket sock = null;
try {
if (ConnectionProperties.SSL_MODE_REQUIRE.equalsIgnoreCase(connProps.getSslMode()))
sock = JdbcThinSSLUtil.createSSLSocket(sockAddr, connProps);
else if (ConnectionProperties.SSL_MODE_DISABLE.equalsIgnoreCase(connProps.getSslMode())) {
sock = new Socket();
try {
sock.connect(sockAddr, timeout);
}
catch (IOException e) {
throw new SQLException("Failed to connect to server [host=" + sockAddr.getHostName() +
", port=" + sockAddr.getPort() + ']', SqlStateCode.CLIENT_CONNECTION_FAILED, e);
}
}
else {
throw new SQLException("Unknown sslMode. [sslMode=" + connProps.getSslMode() + ']',
SqlStateCode.CLIENT_CONNECTION_FAILED);
}
if (connProps.getSocketSendBuffer() != 0)
sock.setSendBufferSize(connProps.getSocketSendBuffer());
if (connProps.getSocketReceiveBuffer() != 0)
sock.setReceiveBufferSize(connProps.getSocketReceiveBuffer());
sock.setTcpNoDelay(connProps.isTcpNoDelay());
BufferedOutputStream out = null;
BufferedInputStream in = null;
try {
endpoint = new IpcClientTcpEndpoint(sock);
out = new BufferedOutputStream(endpoint.outputStream());
in = new BufferedInputStream(endpoint.inputStream());
connected = true;
this.in = in;
this.out = out;
}
catch (IgniteCheckedException e) {
U.closeQuiet(in);
U.closeQuiet(out);
throw new SQLException("Failed to connect to server [url=" + connProps.getUrl() +
" address=" + sockAddr + ']', SqlStateCode.CLIENT_CONNECTION_FAILED, e);
}
}
catch (Exception e) {
if (sock != null && !sock.isClosed())
U.closeQuiet(sock);
throw e;
}
HandshakeResult handshakeRes = handshake(CURRENT_VER);
igniteVer = handshakeRes.igniteVersion();
nodeId = handshakeRes.nodeId();
srvProtoVer = handshakeRes.serverProtocolVersion();
}
/**
* Used for versions: 2.1.5 and 2.3.0. The protocol version is changed but handshake format isn't changed.
*
* @param ver JDBC client version.
* @throws IOException On IO error.
* @throws SQLException On connection reject.
*/
private HandshakeResult handshake(ClientListenerProtocolVersion ver) throws IOException, SQLException {
BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE),
null, null);
writer.writeByte((byte)ClientListenerRequest.HANDSHAKE);
writer.writeShort(ver.major());
writer.writeShort(ver.minor());
writer.writeShort(ver.maintenance());
writer.writeByte(ClientListenerNioListener.JDBC_CLIENT);
writer.writeBoolean(connProps.isDistributedJoins());
writer.writeBoolean(connProps.isEnforceJoinOrder());
writer.writeBoolean(connProps.isCollocated());
writer.writeBoolean(connProps.isReplicatedOnly());
writer.writeBoolean(connProps.isAutoCloseServerCursor());
writer.writeBoolean(connProps.isLazy());
writer.writeBoolean(connProps.isSkipReducerOnUpdate());
if (ver.compareTo(VER_2_7_0) >= 0)
writer.writeString(connProps.nestedTxMode());
if (ver.compareTo(VER_2_8_0) >= 0) {
writer.writeByte(nullableBooleanToByte(connProps.isDataPageScanEnabled()));
JdbcUtils.writeNullableInteger(writer, connProps.getUpdateBatchSize());
}
if (!F.isEmpty(connProps.getUsername())) {
assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5";
writer.writeString(connProps.getUsername());
writer.writeString(connProps.getPassword());
}
send(writer.array());
BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()),
null, null, false);
boolean accepted = reader.readBoolean();
if (accepted) {
HandshakeResult handshakeRes = new HandshakeResult();
if (reader.available() > 0) {
byte maj = reader.readByte();
byte min = reader.readByte();
byte maintenance = reader.readByte();
String stage = reader.readString();
long ts = reader.readLong();
byte[] hash = reader.readByteArray();
if (ver.compareTo(VER_2_8_0) >= 0)
handshakeRes.nodeId(reader.readUuid());
handshakeRes.igniteVersion(new IgniteProductVersion(maj, min, maintenance, stage, ts, hash));
}
else {
handshakeRes.igniteVersion(
new IgniteProductVersion((byte)2, (byte)0, (byte)0, "Unknown", 0L, null));
}
handshakeRes.serverProtocolVersion(ver);
return handshakeRes;
}
else {
short maj = reader.readShort();
short min = reader.readShort();
short maintenance = reader.readShort();
String err = reader.readString();
ClientListenerProtocolVersion srvProtoVer0 = ClientListenerProtocolVersion.create(maj, min, maintenance);
if (srvProtoVer0.compareTo(VER_2_5_0) < 0 && !F.isEmpty(connProps.getUsername())) {
throw new SQLException("Authentication doesn't support by remote server[driverProtocolVer="
+ CURRENT_VER + ", remoteNodeProtocolVer=" + srvProtoVer0 + ", err=" + err
+ ", url=" + connProps.getUrl() + " address=" + sockAddr + ']', SqlStateCode.CONNECTION_REJECTED);
}
if (VER_2_7_0.equals(srvProtoVer0)
|| VER_2_5_0.equals(srvProtoVer0)
|| VER_2_4_0.equals(srvProtoVer0)
|| VER_2_3_0.equals(srvProtoVer0)
|| VER_2_1_5.equals(srvProtoVer0))
return handshake(srvProtoVer0);
else if (VER_2_1_0.equals(srvProtoVer0))
return handshake_2_1_0();
else {
throw new SQLException("Handshake failed [driverProtocolVer=" + CURRENT_VER +
", remoteNodeProtocolVer=" + srvProtoVer0 + ", err=" + err + ']',
SqlStateCode.CONNECTION_REJECTED);
}
}
}
/**
* Compatibility handshake for server version 2.1.0
*
* @throws IOException On IO error.
* @throws SQLException On connection reject.
*/
private HandshakeResult handshake_2_1_0() throws IOException, SQLException {
BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE),
null, null);
writer.writeByte((byte)ClientListenerRequest.HANDSHAKE);
writer.writeShort(VER_2_1_0.major());
writer.writeShort(VER_2_1_0.minor());
writer.writeShort(VER_2_1_0.maintenance());
writer.writeByte(ClientListenerNioListener.JDBC_CLIENT);
writer.writeBoolean(connProps.isDistributedJoins());
writer.writeBoolean(connProps.isEnforceJoinOrder());
writer.writeBoolean(connProps.isCollocated());
writer.writeBoolean(connProps.isReplicatedOnly());
writer.writeBoolean(connProps.isAutoCloseServerCursor());
send(writer.array());
BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()),
null, null, false);
boolean accepted = reader.readBoolean();
if (accepted) {
HandshakeResult handshakeRes = new HandshakeResult();
handshakeRes.igniteVersion(
new IgniteProductVersion((byte)2, (byte)1, (byte)0, "Unknown", 0L, null));
handshakeRes.serverProtocolVersion(VER_2_1_0);
return handshakeRes;
}
else {
short maj = reader.readShort();
short min = reader.readShort();
short maintenance = reader.readShort();
String err = reader.readString();
ClientListenerProtocolVersion ver = ClientListenerProtocolVersion.create(maj, min, maintenance);
throw new SQLException("Handshake failed [driverProtocolVer=" + CURRENT_VER +
", remoteNodeProtocolVer=" + ver + ", err=" + err + ']', SqlStateCode.CONNECTION_REJECTED);
}
}
/**
* @param req Request.
* @throws IOException In case of IO error.
* @throws SQLException On error.
*/
void sendBatchRequestNoWaitResponse(JdbcOrderedBatchExecuteRequest req) throws IOException, SQLException {
if (!isUnorderedStreamSupported()) {
throw new SQLException("Streaming without response doesn't supported by server [driverProtocolVer="
+ CURRENT_VER + ", remoteNodeVer=" + igniteVer + ']', SqlStateCode.INTERNAL_ERROR);
}
sendRequestRaw(req);
}
/**
* @param req Request.
* @param stmt Statement.
* @return Server response.
* @throws IOException In case of IO error.
*/
JdbcResponse sendRequest(JdbcRequest req, JdbcThinStatement stmt) throws IOException {
if (stmt != null) {
synchronized (stmt.cancellationMutex()) {
if (stmt.isCancelled()) {
if (req instanceof JdbcQueryCloseRequest)
return new JdbcResponse(null);
return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG);
}
sendRequestRaw(req);
if (req instanceof JdbcQueryExecuteRequest || req instanceof JdbcBatchExecuteRequest)
stmt.currentRequestMeta(req.requestId(), this);
}
}
else
sendRequestRaw(req);
JdbcResponse resp = readResponse();
return stmt != null && stmt.isCancelled() ?
new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG) :
resp;
}
/**
* Sends cancel request.
*
* @param cancellationReq contains request id to be cancelled
* @throws IOException In case of IO error.
*/
void sendCancelRequest(JdbcQueryCancelRequest cancellationReq) throws IOException {
sendRequestRaw(cancellationReq);
}
/**
* @return Server response.
* @throws IOException In case of IO error.
*/
JdbcResponse readResponse() throws IOException {
BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null,
null, false);
JdbcResponse res = new JdbcResponse();
res.readBinary(reader, srvProtoVer);
return res;
}
/**
* Try to guess request capacity.
*
* @param req Request.
* @return Expected capacity.
*/
private static int guessCapacity(JdbcRequest req) {
int cap;
if (req instanceof JdbcBatchExecuteRequest) {
List<JdbcQuery> qrys = ((JdbcBatchExecuteRequest)req).queries();
int cnt = !F.isEmpty(qrys) ? Math.min(MAX_BATCH_QRY_CNT, qrys.size()) : 0;
// One additional byte for autocommit and last batch flags.
cap = cnt * DYNAMIC_SIZE_MSG_CAP + 2;
}
else if (req instanceof JdbcQueryCloseRequest)
cap = QUERY_CLOSE_MSG_SIZE;
else if (req instanceof JdbcQueryMetadataRequest)
cap = QUERY_META_MSG_SIZE;
else if (req instanceof JdbcQueryFetchRequest)
cap = QUERY_FETCH_MSG_SIZE;
else
cap = DYNAMIC_SIZE_MSG_CAP;
return cap;
}
/**
* @param req Request.
* @throws IOException In case of IO error.
*/
private void sendRequestRaw(JdbcRequest req) throws IOException {
int cap = guessCapacity(req);
BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap),
null, null);
req.writeBinary(writer, srvProtoVer);
synchronized (connMux) {
send(writer.array());
}
}
/**
* @param req JDBC request bytes.
* @throws IOException On error.
*/
private void send(byte[] req) throws IOException {
int size = req.length;
out.write(size & 0xFF);
out.write((size >> 8) & 0xFF);
out.write((size >> 16) & 0xFF);
out.write((size >> 24) & 0xFF);
out.write(req);
out.flush();
}
/**
* @return Bytes of a response from server.
* @throws IOException On error.
*/
private byte[] read() throws IOException {
byte[] sizeBytes = read(4);
int msgSize = (((0xFF & sizeBytes[3]) << 24) | ((0xFF & sizeBytes[2]) << 16)
| ((0xFF & sizeBytes[1]) << 8) + (0xFF & sizeBytes[0]));
return read(msgSize);
}
/**
* @param size Count of bytes to read from stream.
* @return Read bytes.
* @throws IOException On error.
*/
private byte[] read(int size) throws IOException {
int off = 0;
byte[] data = new byte[size];
while (off != size) {
int res = in.read(data, off, size - off);
if (res == -1)
throw new IOException("Failed to read incoming message (not enough data).");
off += res;
}
return data;
}
/**
* Close the client IO.
*/
public void close() {
if (!connected)
return;
// Clean up resources.
U.closeQuiet(out);
U.closeQuiet(in);
if (endpoint != null)
endpoint.close();
connected = false;
}
/**
* @return Connection properties.
*/
public ConnectionProperties connectionProperties() {
return connProps;
}
/**
* @return Ignite server version.
*/
IgniteProductVersion igniteVersion() {
return igniteVer;
}
/**
* @return {@code true} If the unordered streaming supported.
*/
boolean isUnorderedStreamSupported() {
assert srvProtoVer != null;
return srvProtoVer.compareTo(VER_2_5_0) >= 0;
}
/**
* @return True if query cancellation supported, false otherwise.
*/
boolean isQueryCancellationSupported() {
assert srvProtoVer != null;
return srvProtoVer.compareTo(VER_2_8_0) >= 0;
}
/**
* @return True if affinity awareness supported, false otherwise.
*/
boolean isAffinityAwarenessSupported() {
assert srvProtoVer != null;
return srvProtoVer.compareTo(VER_2_8_0) >= 0;
}
/**
* Get next server index.
*
* @param len Number of servers.
* @return Index of the next server to connect to.
*/
private static int nextServerIndex(int len) {
if (len == 1)
return 0;
else {
long nextIdx = IDX_GEN.getAndIncrement();
return (int)(abs(nextIdx) % len);
}
}
/**
* Enable/disable socket timeout with specified timeout.
*
* @param ms the specified timeout, in milliseconds.
* @throws SQLException if there is an error in the underlying protocol.
*/
public void timeout(int ms) throws SQLException {
endpoint.timeout(ms);
}
/**
* Returns socket timeout.
*
* @throws SQLException if there is an error in the underlying protocol.
*/
public int timeout() throws SQLException {
return endpoint.timeout();
}
/**
* @return Node Id.
*/
public UUID nodeId() {
return nodeId;
}
/**
* @return Socket address.
*/
public InetSocketAddress socketAddress() {
return sockAddr;
}
/**
* @return Connected flag.
*/
public boolean connected() {
return connected;
}
}