blob: 28330e1a5a030021641a5a7e9268153c36e09777 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.thin;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.cache.configuration.Factory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.client.ClientAuthenticationException;
import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.SslMode;
import org.apache.ignite.client.SslProtocol;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryPrimitives;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.platform.client.ClientFlag;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.USER_ATTRIBUTES;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.LATEST_VER;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_0_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_1_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_2_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_3_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_4_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_5_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_6_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_7_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.AUTHORIZATION;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.BITMAP_FEATURES;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.PARTITION_AWARENESS;
/**
* Implements {@link ClientChannel} over TCP.
*/
class TcpClientChannel implements ClientChannel {
/** Protocol version proposed to the server on the first handshake attempt. */
private static final ProtocolVersion DEFAULT_VERSION = LATEST_VER;

/** Prefix of the receiver thread name; the remote host and port are appended to it. */
static final String RECEIVER_THREAD_PREFIX = "thin-client-channel#";

/** Protocol versions this client can talk, listed from newest to oldest. */
private static final Collection<ProtocolVersion> supportedVers = Arrays.asList(
    V1_7_0,
    V1_6_0,
    V1_5_0,
    V1_4_0,
    V1_3_0,
    V1_2_0,
    V1_1_0,
    V1_0_0
);

/** Protocol context, assigned once the handshake succeeds. */
private ProtocolContext protocolCtx;

/** Server node ID, read from the handshake response when partition awareness is supported. */
private UUID srvNodeId;

/**
 * Server topology version. It is updated by the receiver thread while processing responses and is
 * returned by {@link #serverTopologyVersion()}, which may be called from any thread, hence {@code volatile}.
 */
private volatile AffinityTopologyVersion srvTopVer;

/** Socket connected to the server. */
private final Socket sock;

/** Socket output stream. */
private final OutputStream out;

/** Socket input wrapper that counts consumed bytes. */
private final ByteCountingDataInput dataInput;

/** Generator of request IDs. */
private final AtomicLong reqId = new AtomicLong(1);

/** Lock serializing writes to the socket; also guards creation of pending requests and of the receiver thread. */
private final Lock sndLock = new ReentrantLock();

/** Requests that were sent and whose result has not been consumed yet, keyed by request ID. */
private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();

/** Topology change listeners. */
private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new CopyOnWriteArrayList<>();

/** Notification listeners. */
private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();

/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();

/** Executor that runs continuations of asynchronous operations. */
private final Executor asyncContinuationExecutor;

/** Receiver thread (processes incoming messages); created lazily under {@link #sndLock}. */
private Thread receiverThread;
/**
 * Connects to the server address given in the configuration and performs the protocol handshake.
 * <p>
 * If anything fails after the socket has been opened, the socket is closed before the exception is
 * propagated: a caller never receives a reference to a half-constructed channel and therefore could not
 * release the connection itself.
 *
 * @param cfg Channel configuration.
 * @throws ClientConnectionException If the socket or its streams cannot be opened, or the handshake I/O fails.
 * @throws ClientAuthenticationException If the server reports an authentication failure during the handshake.
 * @throws ClientProtocolError If the handshake response is malformed or no common protocol version is found.
 */
TcpClientChannel(ClientChannelConfiguration cfg)
    throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
    validateConfiguration(cfg);

    Executor cfgExec = cfg.getAsyncContinuationExecutor();

    asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool();

    // Tracked in a local so the catch block can close it even though the final field may be unassigned.
    Socket newSock = null;

    try {
        newSock = createSocket(cfg);

        sock = newSock;
        out = newSock.getOutputStream();
        dataInput = new ByteCountingDataInput(newSock.getInputStream());
    }
    catch (IOException e) {
        // Do not leak a connected socket whose streams could not be obtained.
        U.closeQuiet(newSock);

        throw handleIOError("addr=" + cfg.getAddress(), e);
    }

    try {
        handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
    }
    catch (Throwable e) {
        // Handshake failed (authentication, protocol mismatch, I/O): release the connection.
        close(e);

        throw e;
    }
}
/** {@inheritDoc} */
@Override public void close() {
    close(null);
}

/**
 * Closes the channel once; subsequent calls are no-ops.
 * <p>
 * The streams and the socket are closed first, then every pending request is completed with a
 * {@link ClientConnectionException} and the receiver thread (if started) is interrupted.
 *
 * @param cause Failure that triggered the close, or {@code null} for a regular close.
 */
private void close(Throwable cause) {
    if (!closed.compareAndSet(false, true))
        return; // Somebody else has already closed the channel.

    U.closeQuiet(dataInput);
    U.closeQuiet(out);
    U.closeQuiet(sock);

    sndLock.lock(); // Lock here to prevent creation of new pending requests.

    try {
        pendingReqs.values().forEach(
            fut -> fut.onDone(new ClientConnectionException("Channel is closed", cause)));

        Thread receiver = receiverThread;

        if (receiver != null)
            receiver.interrupt();
    }
    finally {
        sndLock.unlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Sends the request and then blocks the calling thread until the matching response is available.
 */
@Override public <T> T service(
    ClientOperation op,
    Consumer<PayloadOutputChannel> payloadWriter,
    Function<PayloadInputChannel, T> payloadReader
) throws ClientConnectionException, ClientAuthorizationException, ClientServerError, ClientException {
    return receive(send(op, payloadWriter), payloadReader);
}
/**
 * {@inheritDoc}
 * <p>
 * A failure to send the request is not thrown to the caller; it is reported through the returned future.
 */
@Override public <T> CompletableFuture<T> serviceAsync(
    ClientOperation op,
    Consumer<PayloadOutputChannel> payloadWriter,
    Function<PayloadInputChannel, T> payloadReader
) {
    try {
        return receiveAsync(send(op, payloadWriter), payloadReader);
    }
    catch (Throwable err) {
        CompletableFuture<T> failed = new CompletableFuture<>();

        failed.completeExceptionally(err);

        return failed;
    }
}
/**
* Serializes a request and writes it to the socket.
* <p>
* The request is laid out as: a 4-byte size (not counting the size field itself), a 2-byte operation code,
* an 8-byte request ID and then whatever {@code payloadWriter} appends. A future for the response is
* registered in {@code pendingReqs} before the bytes are written and is removed again if anything fails.
*
* @param op Operation.
* @param payloadWriter Payload writer to stream or {@code null} if request has no payload.
* @return Request ID.
* @throws ClientConnectionException If the channel is already closed or the write fails.
*/
private long send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
throws ClientException, ClientConnectionException {
long id = reqId.getAndIncrement();
// Only one thread at a time can have access to write to the channel.
sndLock.lock();
try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
// Checked under the send lock: close() takes the same lock before failing the pending requests.
if (closed())
throw new ClientConnectionException("Channel is closed");
initReceiverThread(); // Start the receiver thread with the first request.
// Register the future before writing so the receiver thread can always find it.
pendingReqs.put(id, new ClientRequestFuture());
BinaryOutputStream req = payloadCh.out();
req.writeInt(0); // Reserve an integer for the request size.
req.writeShort(op.code());
req.writeLong(id);
if (payloadWriter != null)
payloadWriter.accept(payloadCh);
req.writeInt(0, req.position() - 4); // Actual size.
write(req.array(), req.position());
}
catch (Throwable t) {
// Nothing will be received for a request that was not sent completely.
pendingReqs.remove(id);
throw t;
}
finally {
sndLock.unlock();
}
return id;
}
/**
 * Blocks until the response for the given request arrives and decodes it.
 * <p>
 * The pending request entry is removed whether the call succeeds or fails.
 *
 * @param reqId ID of the request to receive the response for.
 * @param payloadReader Payload reader from stream.
 * @return Received operation payload or {@code null} if response has no payload.
 */
private <T> T receive(long reqId, Function<PayloadInputChannel, T> payloadReader)
    throws ClientServerError, ClientException, ClientConnectionException, ClientAuthorizationException {
    ClientRequestFuture fut = pendingReqs.get(reqId);

    assert fut != null : "Pending request future not found for request " + reqId;

    try {
        byte[] payload = fut.get();

        boolean hasResult = payload != null && payloadReader != null;

        return hasResult ? payloadReader.apply(new PayloadInputChannel(this, payload)) : null;
    }
    catch (IgniteCheckedException e) {
        // Same cause-based translation as the asynchronous path.
        throw convertException(e);
    }
    finally {
        pendingReqs.remove(reqId);
    }
}
/**
 * Receives the response asynchronously.
 * <p>
 * Once the response (or a failure) for the request is available, decoding is handed over to
 * {@code asyncContinuationExecutor}. If the executor refuses the task, the returned future is completed
 * exceptionally and the pending request is dropped, so the caller is never left with a future that
 * cannot complete.
 *
 * @param reqId ID of the request to receive the response for.
 * @param payloadReader Payload reader from stream.
 * @return Future for the operation.
 */
private <T> CompletableFuture<T> receiveAsync(long reqId, Function<PayloadInputChannel, T> payloadReader) {
    ClientRequestFuture pendingReq = pendingReqs.get(reqId);

    assert pendingReq != null : "Pending request future not found for request " + reqId;

    CompletableFuture<T> fut = new CompletableFuture<>();

    pendingReq.listen(payloadFut -> {
        try {
            asyncContinuationExecutor.execute(() -> {
                try {
                    byte[] payload = payloadFut.get();

                    if (payload == null || payloadReader == null)
                        fut.complete(null);
                    else
                        fut.complete(payloadReader.apply(new PayloadInputChannel(this, payload)));
                }
                catch (Throwable t) {
                    fut.completeExceptionally(convertException(t));
                }
                finally {
                    pendingReqs.remove(reqId);
                }
            });
        }
        catch (Throwable t) {
            // The executor did not accept the task (e.g. it was shut down): fail the operation here,
            // otherwise nobody would ever complete the future or remove the pending request.
            pendingReqs.remove(reqId);

            fut.completeExceptionally(convertException(t));
        }
    });

    return fut;
}
/**
 * Translates a failure of a pending request into the exception reported to client code.
 * <p>
 * The choice depends on the cause of {@code e}: a {@link ClientError} or {@link ClientException} cause is
 * wrapped into a new {@link ClientException}, a {@link ClientConnectionException} cause into a new
 * {@link ClientConnectionException}. With any other cause (or none) {@code e} itself is wrapped into a
 * {@link ClientException}. The message of {@code e} is reused in every case.
 *
 * @param e Exception to convert.
 * @return Resulting exception.
 */
private RuntimeException convertException(Throwable e) {
    Throwable cause = e.getCause();
    String msg = e.getMessage();

    if (cause instanceof ClientError)
        return new ClientException(msg, cause);

    if (cause instanceof ClientConnectionException)
        return new ClientConnectionException(msg, cause);

    if (cause instanceof ClientException)
        return new ClientException(msg, cause);

    return new ClientException(msg, e);
}
/**
 * Creates and starts the daemon receiver thread unless it already exists.
 * <p>
 * The thread keeps processing incoming messages until the channel is closed; any failure in that loop
 * closes the channel with the failure as the cause.
 * <p>
 * Note: Method should be called only under external synchronization.
 */
private void initReceiverThread() {
    if (receiverThread != null)
        return;

    Socket sock = this.sock;

    String sockInfo = null;

    if (sock != null)
        sockInfo = sock.getInetAddress().getHostName() + ":" + sock.getPort();

    Runnable receiveLoop = () -> {
        try {
            while (!closed())
                processNextMessage();
        }
        catch (Throwable e) {
            close(e);
        }
    };

    Thread receiver = new Thread(receiveLoop, RECEIVER_THREAD_PREFIX + sockInfo);

    receiverThread = receiver;

    receiver.setDaemon(true);
    receiver.start();
}
/**
 * Reads the next message from the input stream and dispatches it: a response completes the matching
 * pending request, a notification is passed to the notification listeners.
 * <p>
 * Every message starts with a 4-byte size followed by that many bytes. Whatever the status of the
 * message is, all of its bytes are consumed here; leaving a part of a message unread would make the
 * next call interpret the leftover bytes as the header of the following message.
 *
 * @throws ClientProtocolError If the message is malformed or refers to an unknown request.
 * @throws ClientConnectionException If reading from the socket fails.
 */
private void processNextMessage() throws ClientProtocolError, ClientConnectionException {
    // Spin a bounded number of times on the size field and then block, so an idle channel does not busy-loop.
    int msgSize = dataInput.readInt(2048);

    if (msgSize <= 0)
        throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize));

    long bytesReadOnStartMsg = dataInput.totalBytesRead();

    long resId = dataInput.spinReadLong();

    int status = 0;

    ClientOperation notificationOp = null;

    if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
        short flags = dataInput.spinReadShort();

        if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
            long topVer = dataInput.spinReadLong();
            int minorTopVer = dataInput.spinReadInt();

            srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);

            for (Consumer<ClientChannel> lsnr : topChangeLsnrs)
                lsnr.accept(this);
        }

        if ((flags & ClientFlag.NOTIFICATION) != 0) {
            short notificationCode = dataInput.spinReadShort();

            notificationOp = ClientOperation.fromCode(notificationCode);

            if (notificationOp == null || !notificationOp.isNotification())
                throw new ClientProtocolError(String.format("Unexpected notification code [%d]", notificationCode));
        }

        // The status field is present only when the error flag is set.
        if ((flags & ClientFlag.ERROR) != 0)
            status = dataInput.spinReadInt();
    }
    else
        status = dataInput.spinReadInt();

    int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);

    // The header fields read above are part of the message, so the declared size cannot be smaller.
    if (msgSize < hdrSize) {
        throw new ClientProtocolError(
            String.format("Invalid message size: %s (header size: %s)", msgSize, hdrSize));
    }

    // Always consume the rest of the message, including for statuses whose body is not interpreted.
    byte[] body = msgSize > hdrSize ? dataInput.spinRead(msgSize - hdrSize) : null;

    byte[] res = null;

    Exception err = null;

    if (status == 0)
        res = body;
    else if (status == ClientStatus.SECURITY_VIOLATION)
        err = new ClientAuthorizationException();
    else {
        String errMsg = body == null ? null :
            ClientUtils.createBinaryReader(null, new BinaryHeapInputStream(body)).readString();

        err = new ClientServerError(errMsg, status, resId);
    }

    if (notificationOp == null) { // Response received.
        ClientRequestFuture pendingReq = pendingReqs.get(resId);

        if (pendingReq == null)
            throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId));

        pendingReq.onDone(res, err);
    }
    else { // Notification received.
        for (NotificationListener lsnr : notificationLsnrs)
            lsnr.acceptNotification(this, notificationOp, resId, res, err);
    }
}
/**
* {@inheritDoc}
* <p>
* The context is assigned when the handshake succeeds.
*/
@Override public ProtocolContext protocolCtx() {
return protocolCtx;
}
/**
* {@inheritDoc}
* <p>
* The ID is read from the handshake response only when the negotiated protocol supports partition
* awareness; otherwise the field is never assigned and this method returns {@code null}.
*/
@Override public UUID serverNodeId() {
return srvNodeId;
}
/**
* {@inheritDoc}
* <p>
* The value is updated while processing a server message flagged with a topology change and stays
* {@code null} until the first such message.
*/
@Override public AffinityTopologyVersion serverTopologyVersion() {
return srvTopVer;
}
/**
* {@inheritDoc}
* <p>
* Listeners are called with this channel from the code that processes incoming messages.
*/
@Override public void addTopologyChangeListener(Consumer<ClientChannel> lsnr) {
topChangeLsnrs.add(lsnr);
}
/**
* {@inheritDoc}
* <p>
* Listeners are called for every incoming message flagged as a notification.
*/
@Override public void addNotificationListener(NotificationListener lsnr) {
notificationLsnrs.add(lsnr);
}
/**
* {@inheritDoc}
* <p>
* Returns the flag that is set by the first call to {@code close}.
*/
@Override public boolean closed() {
return closed.get();
}
/**
 * Validates the server address of a {@link ClientChannelConfiguration}.
 *
 * @param cfg Channel configuration.
 * @throws IllegalArgumentException If the address is missing or its port is outside {@code 1024...49151}.
 */
private static void validateConfiguration(ClientChannelConfiguration cfg) {
    InetSocketAddress addr = cfg.getAddress();

    if (addr == null) {
        throw new IllegalArgumentException(
            "At least one Ignite server node must be specified in the Ignite client configuration");
    }

    boolean portInRange = addr.getPort() >= 1024 && addr.getPort() <= 49151;

    if (!portInRange) {
        throw new IllegalArgumentException(
            String.format("Ignite client port %s is out of valid ports range 1024...49151", addr.getPort()));
    }
}
/**
 * Opens a socket (an SSL one when SSL mode is {@link SslMode#REQUIRED}) to the configured address and
 * applies the socket options from the configuration. Timeout and buffer sizes are applied only when
 * they are positive.
 * <p>
 * If applying an option fails, the already connected socket is closed before the failure is propagated.
 *
 * @param cfg Channel configuration.
 * @return Connected socket.
 * @throws IOException If the socket cannot be opened or configured.
 */
private static Socket createSocket(ClientChannelConfiguration cfg) throws IOException {
    Socket sock = cfg.getSslMode() == SslMode.REQUIRED ?
        new ClientSslSocketFactory(cfg).create() :
        new Socket(cfg.getAddress().getHostName(), cfg.getAddress().getPort());

    try {
        sock.setTcpNoDelay(cfg.isTcpNoDelay());

        if (cfg.getTimeout() > 0)
            sock.setSoTimeout(cfg.getTimeout());

        if (cfg.getSendBufferSize() > 0)
            sock.setSendBufferSize(cfg.getSendBufferSize());

        if (cfg.getReceiveBufferSize() > 0)
            sock.setReceiveBufferSize(cfg.getReceiveBufferSize());

        return sock;
    }
    catch (IOException | RuntimeException e) {
        // The caller never sees this socket, so it has to be released here.
        U.closeQuiet(sock);

        throw e;
    }
}
/**
* Client handshake: sends the handshake request for the given protocol version and then processes the
* server response.
*
* @param ver Protocol version to propose.
* @param user User name; may be {@code null} or empty.
* @param pwd User password.
* @param userAttrs User attributes.
*/
private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
handshakeReq(ver, user, pwd, userAttrs);
handshakeRes(ver, user, pwd, userAttrs);
}
/**
 * Builds the handshake request for the proposed protocol version and writes it to the socket.
 * <p>
 * The feature bitmask, the user attributes and the credentials are written only when the proposed
 * version supports the corresponding feature; credentials additionally require a non-empty user name.
 *
 * @param proposedVer Protocol version to propose.
 * @param user User name; may be {@code null} or empty.
 * @param pwd User password.
 * @param userAttrs User attributes.
 * @throws ClientConnectionException If writing to the socket fails.
 */
private void handshakeReq(ProtocolVersion proposedVer, String user, String pwd,
    Map<String, String> userAttrs) throws ClientConnectionException {
    BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null);

    try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, new BinaryHeapOutputStream(32), null, null)) {
        // Context of the version being proposed; the channel's own context is set only after a successful handshake.
        ProtocolContext proposedCtx = protocolContextFromVersion(proposedVer);

        writer.writeInt(0); // reserve an integer for the request size

        writer.writeByte((byte) ClientListenerRequest.HANDSHAKE);

        writer.writeShort(proposedVer.major());
        writer.writeShort(proposedVer.minor());
        writer.writeShort(proposedVer.patch());

        writer.writeByte(ClientListenerNioListener.THIN_CLIENT);

        if (proposedCtx.isFeatureSupported(BITMAP_FEATURES))
            writer.writeByteArray(ProtocolBitmaskFeature.featuresAsBytes(proposedCtx.features()));

        if (proposedCtx.isFeatureSupported(USER_ATTRIBUTES))
            writer.writeMap(userAttrs);

        if (proposedCtx.isFeatureSupported(AUTHORIZATION) && !(user == null || user.isEmpty())) {
            writer.writeString(user);
            writer.writeString(pwd);
        }

        int reqSize = writer.out().position() - 4; // actual size, without the size field itself

        writer.out().writeInt(0, reqSize);

        write(writer.array(), writer.out().position());
    }
}
/**
 * Creates the protocol context to propose for a version: all known bitmask features when the version
 * supports feature bitmasks, no feature set ({@code null}) otherwise.
 *
 * @param ver Protocol version.
 * @return Protocol context for a version.
 */
private ProtocolContext protocolContextFromVersion(ProtocolVersion ver) {
    EnumSet<ProtocolBitmaskFeature> features = ProtocolContext.isFeatureSupported(ver, BITMAP_FEATURES)
        ? ProtocolBitmaskFeature.allFeaturesAsEnumSet()
        : null;

    return new ProtocolContext(ver, features);
}
/**
 * Reads the handshake response and acts on it.
 * <p>
 * On success the protocol context (and, with partition awareness, the server node ID) is stored. On
 * failure an authentication error or a protocol error is raised, unless the server answered with a
 * different version this client supports, in which case the handshake is retried with that version.
 *
 * @param proposedVer Protocol version that was proposed in the request.
 * @param user User name; may be {@code null} or empty.
 * @param pwd User password.
 * @param userAttrs User attributes.
 */
private void handshakeRes(ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs)
    throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
    int resSize = dataInput.readInt();

    if (resSize <= 0)
        throw new ClientProtocolError(String.format("Invalid handshake response size: %s", resSize));

    BinaryInputStream res = new BinaryHeapInputStream(dataInput.read(resSize));

    try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) {
        if (res.readBoolean()) {
            byte[] features = ProtocolContext.isFeatureSupported(proposedVer, BITMAP_FEATURES)
                ? reader.readByteArray()
                : new byte[0];

            protocolCtx = new ProtocolContext(proposedVer, ProtocolBitmaskFeature.enumSet(features));

            // Reading server UUID
            if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS))
                srvNodeId = reader.readUuid();

            return;
        }

        // Handshake rejected: the server reports its own version, a message and optionally an error code.
        ProtocolVersion srvVer = new ProtocolVersion(res.readShort(), res.readShort(), res.readShort());

        String err = reader.readString();

        int errCode = res.remaining() > 0 ? reader.readInt() : ClientStatus.FAILED;

        if (errCode == ClientStatus.AUTH_FAILED)
            throw new ClientAuthenticationException(err);

        if (proposedVer.equals(srvVer))
            throw new ClientProtocolError(err);

        // Server version is not supported by this client OR server version is less than 1.1.0 supporting
        // authentication and authentication is required.
        if (!supportedVers.contains(srvVer) ||
            (!ProtocolContext.isFeatureSupported(srvVer, AUTHORIZATION) && !F.isEmpty(user))) {
            throw new ClientProtocolError(String.format(
                "Protocol version mismatch: client %s / server %s. Server details: %s",
                proposedVer,
                srvVer,
                err
            ));
        }

        // Retry with server version.
        handshake(srvVer, user, pwd, userAttrs);
    }
    catch (IOException e) {
        throw handleIOError(e);
    }
}
/**
* Writes the first {@code len} bytes of the array to the socket output stream and flushes it.
*
* @param bytes Buffer holding the data.
* @param len Number of bytes to write, counted from the start of the buffer.
* @throws ClientConnectionException If the write or the flush fails.
*/
private void write(byte[] bytes, int len) throws ClientConnectionException {
try {
out.write(bytes, 0, len);
out.flush();
}
catch (IOException e) {
throw handleIOError(e);
}
}
/**
* Creates (does not throw) a connection exception that describes this channel's socket.
*
* @param ex IO exception (cause); may be {@code null}.
* @return Exception for the caller to throw.
*/
private ClientException handleIOError(@Nullable IOException ex) {
return handleIOError("sock=" + sock, ex);
}
/**
* Creates (does not throw) a connection exception with the given channel description.
*
* @param chInfo Additional channel info
* @param ex IO exception (cause); may be {@code null}.
* @return Exception for the caller to throw.
*/
private ClientException handleIOError(String chInfo, @Nullable IOException ex) {
return new ClientConnectionException("Ignite cluster is unavailable [" + chInfo + ']', ex);
}
/**
* Auxiliary class to read byte buffers and numeric values, counting total bytes read.
* Numeric values are read in the little-endian byte order.
* <p>
* Each read comes in two flavours: a blocking one and a "spin" one that polls
* {@link InputStream#available()} before it falls back to a blocking read.
* <p>
* The class is an inner (non-static) class because it uses the enclosing channel's
* {@code handleIOError} to build exceptions. All numeric reads share one scratch buffer and nothing here
* is synchronized, so presumably a single thread is expected to read at a time.
*/
private class ByteCountingDataInput implements AutoCloseable {
/** Input stream. */
private final InputStream in;
/** Total bytes read from the input stream. */
private long totalBytesRead;
/** Temporary buffer to read long, int and short values. */
private final byte[] tmpBuf = new byte[Long.BYTES];
/**
* @param in Input stream.
*/
public ByteCountingDataInput(InputStream in) {
this.in = in;
}
/**
* Reads exactly {@code len} bytes using blocking reads only.
*
* @param len Number of bytes to read.
* @return New array holding the bytes read.
*/
public byte[] read(int len) throws ClientConnectionException {
byte[] bytes = new byte[len];
read(bytes, len, 0);
return bytes;
}
/**
* Reads exactly {@code len} bytes, polling for available data (up to {@link Integer#MAX_VALUE}
* empty polls) before blocking.
*
* @param len Number of bytes to read.
* @return New array holding the bytes read.
*/
public byte[] spinRead(int len) {
byte[] bytes = new byte[len];
read(bytes, len, Integer.MAX_VALUE);
return bytes;
}
/**
* Read bytes from the input stream to the buffer.
* <p>
* While {@code tryReadCnt} is positive, the method reads only what the stream reports as available;
* every poll that finds nothing decrements the counter. When the counter is zero the remaining bytes
* are requested with a blocking read.
*
* @param bytes Bytes buffer.
* @param len Length.
* @param tryReadCnt Number of reads before falling into blocking read.
* @throws ClientConnectionException If the stream ends before {@code len} bytes are read or an I/O
* error occurs.
*/
public void read(byte[] bytes, int len, int tryReadCnt) throws ClientConnectionException {
int offset = 0;
try {
while (offset < len) {
int toRead;
if (tryReadCnt == 0)
toRead = len - offset;
else if ((toRead = Math.min(in.available(), len - offset)) == 0) {
// Nothing available yet: use up one attempt and poll again without blocking.
tryReadCnt--;
continue;
}
int read = in.read(bytes, offset, toRead);
// End of stream before the requested number of bytes was read.
if (read < 0)
throw handleIOError(null);
offset += read;
totalBytesRead += read;
}
}
catch (IOException e) {
throw handleIOError(e);
}
}
/**
* Read long value from the input stream using blocking reads only.
*/
public long readLong() throws ClientConnectionException {
return readLong(0);
}
/**
* Read long value from the input stream, polling for available data before blocking.
*/
public long spinReadLong() throws ClientConnectionException {
return readLong(Integer.MAX_VALUE);
}
/**
* Read long value from the input stream.
*
* @param tryReadCnt Number of reads before falling into blocking read.
*/
private long readLong(int tryReadCnt) throws ClientConnectionException {
read(tmpBuf, Long.BYTES, tryReadCnt);
return BinaryPrimitives.readLong(tmpBuf, 0);
}
/**
* Read int value from the input stream using blocking reads only.
*/
public int readInt() throws ClientConnectionException {
return readInt(0);
}
/**
* Read int value from the input stream, polling for available data before blocking.
*/
public int spinReadInt() throws ClientConnectionException {
return readInt(Integer.MAX_VALUE);
}
/**
* Read int value from the input stream.
*
* @param tryReadCnt Number of reads before falling into blocking read.
*/
private int readInt(int tryReadCnt) throws ClientConnectionException {
read(tmpBuf, Integer.BYTES, tryReadCnt);
return BinaryPrimitives.readInt(tmpBuf, 0);
}
/**
* Read short value from the input stream using blocking reads only.
*/
public short readShort() throws ClientConnectionException {
return readShort(0);
}
/**
* Read short value from the input stream, polling for available data before blocking.
*/
public short spinReadShort() throws ClientConnectionException {
return readShort(Integer.MAX_VALUE);
}
/**
* Read short value from the input stream.
*
* @param tryReadCnt Number of reads before falling into blocking read.
*/
public short readShort(int tryReadCnt) throws ClientConnectionException {
read(tmpBuf, Short.BYTES, tryReadCnt);
return BinaryPrimitives.readShort(tmpBuf, 0);
}
/**
* Gets total bytes read from the input stream.
*/
public long totalBytesRead() {
return totalBytesRead;
}
/**
* Close input stream.
*/
@Override public void close() throws IOException {
in.close();
}
}
/**
* Future of a single request sent over the channel. It is completed with the raw response payload or
* with an error once the response is processed or the channel is closed.
*/
private static class ClientRequestFuture extends GridFutureAdapter<byte[]> {
}
/** SSL Socket Factory. */
private static class ClientSslSocketFactory {
/**
 * Trust manager ignoring all certificate checks: both check methods accept any certificate chain.
 * Used when the configuration asks to trust all certificates.
 */
private static final TrustManager ignoreErrorsTrustMgr = new X509TrustManager() {
    /** {@inheritDoc} */
    @Override public X509Certificate[] getAcceptedIssuers() {
        // The X509TrustManager contract requires a non-null (possibly empty) array.
        return new X509Certificate[0];
    }

    /** {@inheritDoc} */
    @Override public void checkServerTrusted(X509Certificate[] arg0, String arg1) {
        // No-op: every server certificate is accepted.
    }

    /** {@inheritDoc} */
    @Override public void checkClientTrusted(X509Certificate[] arg0, String arg1) {
        // No-op: every client certificate is accepted.
    }
};

/** Config. */
private final ClientChannelConfiguration cfg;

/**
 * @param cfg Channel configuration that supplies the address and the SSL settings.
 */
ClientSslSocketFactory(ClientChannelConfiguration cfg) {
    this.cfg = cfg;
}
/**
 * Creates an SSL socket connected to the configured address and performs the TLS handshake in client mode.
 * <p>
 * If the handshake fails, the socket is closed before the failure is propagated, because the caller
 * never gets a reference to it.
 *
 * @return Connected socket with the TLS handshake completed.
 * @throws IOException If the connection or the TLS handshake fails.
 */
SSLSocket create() throws IOException {
    InetSocketAddress addr = cfg.getAddress();

    SSLSocket sock = (SSLSocket)getSslSocketFactory(cfg).createSocket(addr.getHostName(), addr.getPort());

    try {
        sock.setUseClientMode(true);

        sock.startHandshake();

        return sock;
    }
    catch (IOException | RuntimeException e) {
        U.closeQuiet(sock);

        throw e;
    }
}
/**
 * Creates the SSL socket factory for a channel configuration.
 * <p>
 * A user-supplied SSL context factory wins when present. Otherwise the key and trust store settings are
 * taken from the configuration, falling back to the standard {@code javax.net.ssl.*} system properties,
 * and an {@link SSLContext} is built from them.
 *
 * @param cfg Channel configuration.
 * @return SSL socket factory.
 */
private static SSLSocketFactory getSslSocketFactory(ClientChannelConfiguration cfg) {
    Factory<SSLContext> userCtxFactory = cfg.getSslContextFactory();

    if (userCtxFactory != null) {
        try {
            return userCtxFactory.create().getSocketFactory();
        }
        catch (Exception e) {
            throw new ClientError("SSL Context Factory failed", e);
        }
    }

    // Picks the first argument unless it is null or empty.
    BiFunction<String, String, String> orElse = (val, dflt) -> val == null || val.isEmpty() ? dflt : val;

    String ksPath = orElse.apply(
        cfg.getSslClientCertificateKeyStorePath(),
        System.getProperty("javax.net.ssl.keyStore")
    );

    String ksType = orElse.apply(
        cfg.getSslClientCertificateKeyStoreType(),
        orElse.apply(System.getProperty("javax.net.ssl.keyStoreType"), "JKS")
    );

    String ksPwd = orElse.apply(
        cfg.getSslClientCertificateKeyStorePassword(),
        System.getProperty("javax.net.ssl.keyStorePassword")
    );

    String tsPath = orElse.apply(
        cfg.getSslTrustCertificateKeyStorePath(),
        System.getProperty("javax.net.ssl.trustStore")
    );

    String tsType = orElse.apply(
        cfg.getSslTrustCertificateKeyStoreType(),
        orElse.apply(System.getProperty("javax.net.ssl.trustStoreType"), "JKS")
    );

    String tsPwd = orElse.apply(
        cfg.getSslTrustCertificateKeyStorePassword(),
        System.getProperty("javax.net.ssl.trustStorePassword")
    );

    String algorithm = orElse.apply(cfg.getSslKeyAlgorithm(), "SunX509");

    String proto = toString(cfg.getSslProtocol());

    // NOTE(review): both store types fall back to "JKS" above, so this condition looks like it can
    // never hold and the default SSL context is never used - confirm whether that is intended.
    boolean nothingConfigured = Stream.of(ksPath, ksPwd, ksType, tsPath, tsPwd, tsType)
        .noneMatch(s -> s != null && !s.isEmpty());

    if (nothingConfigured) {
        try {
            return SSLContext.getDefault().getSocketFactory();
        }
        catch (NoSuchAlgorithmException e) {
            throw new ClientError("Default SSL context cryptographic algorithm is not available", e);
        }
    }

    KeyManager[] keyManagers = getKeyManagers(algorithm, ksPath, ksType, ksPwd);

    TrustManager[] trustManagers;

    if (cfg.isSslTrustAll())
        trustManagers = new TrustManager[] {ignoreErrorsTrustMgr};
    else
        trustManagers = getTrustManagers(algorithm, tsPath, tsType, tsPwd);

    try {
        SSLContext sslCtx = SSLContext.getInstance(proto);

        sslCtx.init(keyManagers, trustManagers, null);

        return sslCtx.getSocketFactory();
    }
    catch (NoSuchAlgorithmException e) {
        throw new ClientError("SSL context cryptographic algorithm is not available", e);
    }
    catch (KeyManagementException e) {
        throw new ClientError("Failed to create SSL Context", e);
    }
}
/**
 * Maps the two TLS constants whose names differ from the protocol name to their dotted form; any other
 * value is returned as its own {@code toString()}.
 *
 * @param proto SSL protocol.
 * @return String representation of {@link SslProtocol} as required by {@link SSLContext}.
 */
private static String toString(SslProtocol proto) {
    if (proto == SslProtocol.TLSv1_1)
        return "TLSv1.1";

    if (proto == SslProtocol.TLSv1_2)
        return "TLSv1.2";

    return proto.toString();
}
/**
 * Creates key managers for the client certificate.
 * <p>
 * When a key store path and type are given, the store is loaded and used as the key material. When
 * they are not, the factory is initialized with a {@code null} key store, which the
 * {@link KeyManagerFactory#init(KeyStore, char[])} contract allows. The factory is therefore always
 * initialized before {@link KeyManagerFactory#getKeyManagers()} is called, which would otherwise throw
 * {@link IllegalStateException}.
 *
 * @param algorithm Key manager algorithm.
 * @param keyStore Key store path; may be {@code null} or empty.
 * @param keyStoreType Key store type; may be {@code null} or empty.
 * @param keyStorePwd Key store password; {@code null} is treated as an empty password.
 * @return Key managers.
 */
private static KeyManager[] getKeyManagers(
    String algorithm,
    String keyStore,
    String keyStoreType,
    String keyStorePwd
) {
    KeyManagerFactory keyMgrFactory;

    try {
        keyMgrFactory = KeyManagerFactory.getInstance(algorithm);
    }
    catch (NoSuchAlgorithmException e) {
        throw new ClientError("Key manager cryptographic algorithm is not available", e);
    }

    Predicate<String> empty = s -> s == null || s.isEmpty();

    boolean hasStore = !empty.test(keyStore) && !empty.test(keyStoreType);

    char[] pwd = (keyStorePwd == null) ? new char[0] : keyStorePwd.toCharArray();

    // No configured store means no client certificate: initialize the factory without key material.
    KeyStore store = hasStore ? loadKeyStore("Client", keyStore, keyStoreType, pwd) : null;

    try {
        keyMgrFactory.init(store, pwd);
    }
    catch (UnrecoverableKeyException e) {
        throw new ClientError("Could not recover key store key", e);
    }
    catch (KeyStoreException e) {
        throw new ClientError(
            String.format("Client key store provider of type [%s] is not available", keyStoreType),
            e
        );
    }
    catch (NoSuchAlgorithmException e) {
        throw new ClientError("Client key store integrity check algorithm is not available", e);
    }

    return keyMgrFactory.getKeyManagers();
}
/**
 * Creates trust managers used to verify the server certificate.
 * <p>
 * When a trust store path and type are given, the store is loaded and used as the trust material. When
 * they are not, the factory is initialized with a {@code null} key store, which the
 * {@link TrustManagerFactory#init(KeyStore)} contract allows and which leaves the choice of trust
 * material to the provider. The factory is therefore always initialized before
 * {@link TrustManagerFactory#getTrustManagers()} is called, which would otherwise throw
 * {@link IllegalStateException}.
 *
 * @param algorithm Trust manager algorithm.
 * @param trustStore Trust store path; may be {@code null} or empty.
 * @param trustStoreType Trust store type; may be {@code null} or empty.
 * @param trustStorePwd Trust store password; {@code null} is treated as an empty password.
 * @return Trust managers.
 */
private static TrustManager[] getTrustManagers(
    String algorithm,
    String trustStore,
    String trustStoreType,
    String trustStorePwd
) {
    TrustManagerFactory trustMgrFactory;

    try {
        trustMgrFactory = TrustManagerFactory.getInstance(algorithm);
    }
    catch (NoSuchAlgorithmException e) {
        throw new ClientError("Trust manager cryptographic algorithm is not available", e);
    }

    Predicate<String> empty = s -> s == null || s.isEmpty();

    KeyStore store = null;

    if (!empty.test(trustStore) && !empty.test(trustStoreType)) {
        char[] pwd = (trustStorePwd == null) ? new char[0] : trustStorePwd.toCharArray();

        store = loadKeyStore("Trust", trustStore, trustStoreType, pwd);
    }

    try {
        // A null store is allowed here and leaves the choice of trust material to the provider.
        trustMgrFactory.init(store);
    }
    catch (KeyStoreException e) {
        throw new ClientError(
            String.format("Trust key store provider of type [%s] is not available", trustStoreType),
            e
        );
    }

    return trustMgrFactory.getTrustManagers();
}
/**
 * Loads a key store from a file, translating every failure into a {@link ClientError} whose message
 * starts with the given label.
 *
 * @param lb Label used in error messages (e.g. "Client" or "Trust").
 * @param path Path to the key store file.
 * @param type Key store type.
 * @param pwd Key store password.
 * @return Loaded key store.
 */
private static KeyStore loadKeyStore(String lb, String path, String type, char[] pwd) {
    KeyStore ks;

    try {
        ks = KeyStore.getInstance(type);
    }
    catch (KeyStoreException e) {
        String msg = String.format("%s key store provider of type [%s] is not available", lb, type);

        throw new ClientError(msg, e);
    }

    try (InputStream storeIn = new FileInputStream(path)) {
        ks.load(storeIn, pwd);
    }
    catch (FileNotFoundException e) {
        throw new ClientError(String.format("%s key store file [%s] does not exist", lb, path), e);
    }
    catch (NoSuchAlgorithmException e) {
        String msg = String.format("%s key store integrity check algorithm is not available", lb);

        throw new ClientError(msg, e);
    }
    catch (CertificateException e) {
        throw new ClientError(String.format("Could not load certificate from %s key store", lb), e);
    }
    catch (IOException e) {
        throw new ClientError(String.format("Could not read %s key store", lb), e);
    }

    return ks;
}
}
}