| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.client.impl.connection; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteOrder; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import javax.net.ssl.SSLContext; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.client.GridClientClosedException; |
| import org.apache.ignite.internal.client.GridClientConfiguration; |
| import org.apache.ignite.internal.client.GridClientException; |
| import org.apache.ignite.internal.client.GridClientHandshakeException; |
| import org.apache.ignite.internal.client.GridClientNode; |
| import org.apache.ignite.internal.client.GridClientProtocol; |
| import org.apache.ignite.internal.client.GridServerUnreachableException; |
| import org.apache.ignite.internal.client.impl.GridClientFutureAdapter; |
| import org.apache.ignite.internal.client.impl.GridClientThreadFactory; |
| import org.apache.ignite.internal.client.marshaller.GridClientMarshaller; |
| import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller; |
| import org.apache.ignite.internal.client.util.GridClientStripedLock; |
| import org.apache.ignite.internal.client.util.GridClientUtils; |
| import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse; |
| import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage; |
| import org.apache.ignite.internal.processors.rest.client.message.GridClientNodeStateBeforeStartRequest; |
| import org.apache.ignite.internal.processors.rest.client.message.GridClientPingPacket; |
| import org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestParser; |
| import org.apache.ignite.internal.util.nio.GridNioCodecFilter; |
| import org.apache.ignite.internal.util.nio.GridNioFilter; |
| import org.apache.ignite.internal.util.nio.GridNioServer; |
| import org.apache.ignite.internal.util.nio.GridNioServerListener; |
| import org.apache.ignite.internal.util.nio.GridNioSession; |
| import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.logger.java.JavaLogger; |
| import org.apache.ignite.plugin.security.SecurityCredentials; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.logging.Level.INFO; |
| import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; |
| import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.CLIENT_CLOSED; |
| import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.FAILED; |
| |
| /** |
| * Cached connections manager. |
| */ |
| public abstract class GridClientConnectionManagerAdapter implements GridClientConnectionManager { |
| /** Count of reconnect retries before init considered failed. */ |
| private static final int INIT_RETRY_CNT = 3; |
| |
| /** Initialization retry interval. */ |
| private static final int INIT_RETRY_INTERVAL = 1000; |
| |
| /** Class logger. */ |
| private final Logger log; |
| |
| /** All local enabled MACs. */ |
| private final Collection<String> macs; |
| |
| /** NIO server. */ |
| private GridNioServer srv; |
| |
| /** Active connections. */ |
| private final ConcurrentMap<InetSocketAddress, GridClientConnection> conns = new ConcurrentHashMap<>(); |
| |
| /** Active connections of nodes. */ |
| private final ConcurrentMap<UUID, GridClientConnection> nodeConns = new ConcurrentHashMap<>(); |
| |
| /** SSL context. */ |
| private final SSLContext sslCtx; |
| |
| /** Client configuration. */ |
| protected final GridClientConfiguration cfg; |
| |
| /** Topology. */ |
| private final GridClientTopology top; |
| |
| /** Client id. */ |
| private final UUID clientId; |
| |
| /** Router endpoints to use instead of topology info. */ |
| private final Collection<InetSocketAddress> routers; |
| |
| /** Closed flag. */ |
| private volatile boolean closed; |
| |
| /** Shared executor service. */ |
| private final ExecutorService executor; |
| |
| /** Endpoint striped lock. */ |
| private final GridClientStripedLock endpointStripedLock = new GridClientStripedLock(16); |
| |
| /** Service for ping requests, {@code null} if HTTP protocol is used. */ |
| private final ScheduledExecutorService pingExecutor; |
| |
| /** Marshaller ID. */ |
| private final Byte marshId; |
| |
| /** Connecting to a node before starting it without getting/updating topology. */ |
| private final boolean beforeNodeStart; |
| |
| /** |
| * @param clientId Client ID. |
| * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one. |
| * @param cfg Client configuration. |
| * @param routers Routers or empty collection to use endpoints from topology info. |
| * @param top Topology. |
| * @param marshId Marshaller ID. |
| * @param beforeNodeStart Connecting to a node before starting it without getting/updating topology. |
| * @throws GridClientException In case of error. |
| */ |
| @SuppressWarnings("unchecked") |
| protected GridClientConnectionManagerAdapter(UUID clientId, |
| SSLContext sslCtx, |
| GridClientConfiguration cfg, |
| Collection<InetSocketAddress> routers, |
| GridClientTopology top, |
| @Nullable Byte marshId, |
| boolean routerClient, |
| boolean beforeNodeStart |
| ) throws GridClientException { |
| assert clientId != null : "clientId != null"; |
| assert cfg != null : "cfg != null"; |
| assert routers != null : "routers != null"; |
| assert top != null : "top != null"; |
| |
| this.clientId = clientId; |
| this.sslCtx = sslCtx; |
| this.cfg = cfg; |
| this.routers = new ArrayList<>(routers); |
| this.top = top; |
| this.beforeNodeStart = beforeNodeStart; |
| |
| log = Logger.getLogger(getClass().getName()); |
| |
| executor = cfg.getExecutorService() != null ? cfg.getExecutorService() : |
| Executors.newCachedThreadPool(new GridClientThreadFactory("exec", true)); |
| |
| pingExecutor = cfg.getProtocol() == GridClientProtocol.TCP ? Executors.newScheduledThreadPool( |
| Runtime.getRuntime().availableProcessors(), new GridClientThreadFactory("exec", true)) : null; |
| |
| this.marshId = marshId; |
| |
| if (marshId == null && cfg.getMarshaller() == null) |
| throw new GridClientException("Failed to start client (marshaller is not configured)."); |
| |
| macs = U.allLocalMACs(); |
| |
| if (cfg.getProtocol() == GridClientProtocol.TCP) { |
| try { |
| IgniteLogger gridLog = new JavaLogger(false); |
| |
| GridNioFilter[] filters; |
| |
| GridNioFilter codecFilter = new GridNioCodecFilter(new GridTcpRestParser(routerClient), gridLog, false); |
| |
| if (sslCtx != null) { |
| GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog, null); |
| |
| sslFilter.directMode(false); |
| |
| filters = new GridNioFilter[]{codecFilter, sslFilter}; |
| } |
| else |
| filters = new GridNioFilter[]{codecFilter}; |
| |
| srv = GridNioServer.builder().address(U.getLocalHost()) |
| .port(-1) |
| .listener(new NioListener(log)) |
| .filters(filters) |
| .logger(gridLog) |
| .selectorCount(Runtime.getRuntime().availableProcessors()) |
| .sendQueueLimit(1024) |
| .byteOrder(ByteOrder.nativeOrder()) |
| .tcpNoDelay(cfg.isTcpNoDelay()) |
| .directBuffer(true) |
| .directMode(false) |
| .socketReceiveBufferSize(0) |
| .socketSendBufferSize(0) |
| .idleTimeout(Long.MAX_VALUE) |
| .igniteInstanceName(routerClient ? "routerClient" : "gridClient") |
| .serverName("tcp-client") |
| .daemon(cfg.isDaemon()) |
| .build(); |
| |
| srv.start(); |
| } |
| catch (IOException | IgniteCheckedException e) { |
| throw new GridClientException("Failed to start connection server.", e); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void init(Collection<InetSocketAddress> srvs) throws GridClientException, InterruptedException { |
| init0(); |
| |
| connect(srvs, conn -> { |
| if (beforeNodeStart) { |
| conn.messageBeforeStart(new GridClientNodeStateBeforeStartRequest()) |
| .get(cfg.getConnectTimeout(), MILLISECONDS); |
| } |
| else { |
| conn.topology(cfg.isAutoFetchAttributes(), cfg.isAutoFetchMetrics(), null) |
| .get(cfg.getConnectTimeout(), MILLISECONDS); |
| } |
| }); |
| } |
| |
| /** |
| * Additional initialization. |
| * |
| * @throws GridClientException In case of error. |
| */ |
| protected abstract void init0() throws GridClientException; |
| |
| /** |
| * Gets active communication facade. |
| * |
| * @param node Remote node to which connection should be established. |
| * @throws GridServerUnreachableException If none of the servers can be reached after the exception. |
| * @throws GridClientClosedException If client was closed manually. |
| * @throws InterruptedException If connection was interrupted. |
| */ |
| @Override public GridClientConnection connection(GridClientNode node) |
| throws GridClientClosedException, GridServerUnreachableException, InterruptedException { |
| assert node != null; |
| |
| // Use router's connections if defined. |
| if (!routers.isEmpty()) |
| return connection(null, routers); |
| |
| GridClientConnection conn = nodeConns.get(node.nodeId()); |
| |
| if (conn != null) { |
| // Ignore closed connections. |
| if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) |
| closeIdle(); |
| else |
| return conn; |
| } |
| |
| // Use node's connection, if node is available over rest. |
| Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true); |
| |
| List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size()); |
| |
| for (InetSocketAddress endpoint : endpoints) |
| if (!endpoint.isUnresolved()) |
| resolvedEndpoints.add(endpoint); |
| |
| if (resolvedEndpoints.isEmpty()) { |
| throw new GridServerUnreachableException("No available endpoints to connect " + |
| "(is rest enabled for this node?): " + node); |
| } |
| |
| boolean sameHost = node.attributes().isEmpty() || |
| F.containsAny(macs, node.attribute(ATTR_MACS).toString().split(", ")); |
| |
| Collection<InetSocketAddress> srvs = new LinkedHashSet<>(); |
| |
| if (sameHost) { |
| Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true)); |
| |
| srvs.addAll(resolvedEndpoints); |
| } |
| else { |
| for (InetSocketAddress endpoint : resolvedEndpoints) |
| if (!endpoint.getAddress().isLoopbackAddress()) |
| srvs.add(endpoint); |
| } |
| |
| return connection(node.nodeId(), srvs); |
| } |
| |
| /** |
| * Returns connection to one of the given addresses. |
| * |
| * @param nodeId {@code UUID} of node for mapping with connection. |
| * {@code null} if no need of mapping. |
| * @param srvs Collection of addresses to connect to. |
| * @return Connection to use for operations, targeted for the given node. |
| * @throws GridServerUnreachableException If connection can't be established. |
| * @throws GridClientClosedException If connections manager has been closed already. |
| * @throws InterruptedException If connection was interrupted. |
| */ |
| public GridClientConnection connection(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs) |
| throws GridServerUnreachableException, GridClientClosedException, InterruptedException { |
| if (srvs == null || srvs.isEmpty()) |
| throw new GridServerUnreachableException("Failed to establish connection to the grid" + |
| " (address list is empty)."); |
| |
| checkClosed(); |
| |
| // Search for existent connection. |
| for (InetSocketAddress endPoint : srvs) { |
| assert endPoint != null; |
| |
| GridClientConnection conn = conns.get(endPoint); |
| |
| if (conn == null) |
| continue; |
| |
| // Ignore closed connections. |
| if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) { |
| closeIdle(); |
| |
| continue; |
| } |
| |
| if (nodeId != null) |
| nodeConns.put(nodeId, conn); |
| |
| return conn; |
| } |
| |
| return connect(nodeId, srvs); |
| } |
| |
| /** |
| * Creates a connected facade and returns it. Called either from constructor or inside |
| * a write lock. |
| * |
| * @param nodeId {@code UUID} of node for mapping with connection. |
| * {@code null} if no need of mapping. |
| * @param srvs List of server addresses that this method will try to connect to. |
| * @return Established connection. |
| * @throws GridServerUnreachableException If none of the servers can be reached. |
| * @throws InterruptedException If connection was interrupted. |
| */ |
| protected GridClientConnection connect(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs) |
| throws GridServerUnreachableException, InterruptedException { |
| if (srvs.isEmpty()) |
| throw new GridServerUnreachableException("Failed to establish connection to the grid node (address " + |
| "list is empty)."); |
| |
| Exception cause = null; |
| |
| for (InetSocketAddress srv : srvs) { |
| try { |
| return connect(nodeId, srv); |
| } |
| catch (InterruptedException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| if (cause == null) |
| cause = e; |
| else if (log.isLoggable(INFO)) |
| log.info("Unable to connect to grid node [srvAddr=" + srv + ", msg=" + e.getMessage() + ']'); |
| } |
| } |
| |
| assert cause != null; |
| |
| throw new GridServerUnreachableException("Failed to connect to any of the servers in list: " + srvs, cause); |
| } |
| |
| /** |
| * Create new connection to specified server. |
| * |
| * @param nodeId {@code UUID} of node for mapping with connection. |
| * {@code null} if no need of mapping. |
| * @param addr Remote socket to connect. |
| * @return Established connection. |
| * @throws IOException If connection failed. |
| * @throws GridClientException If protocol error happened. |
| * @throws InterruptedException If thread was interrupted before connection was established. |
| */ |
| protected GridClientConnection connect(@Nullable UUID nodeId, InetSocketAddress addr) |
| throws IOException, GridClientException, InterruptedException { |
| endpointStripedLock.lock(addr); |
| |
| try { |
| GridClientConnection old = conns.get(addr); |
| |
| if (old != null) { |
| if (old.isClosed()) { |
| conns.remove(addr, old); |
| |
| if (nodeId != null) |
| nodeConns.remove(nodeId, old); |
| } |
| else { |
| if (nodeId != null) |
| nodeConns.put(nodeId, old); |
| |
| return old; |
| } |
| } |
| |
| SecurityCredentials cred = null; |
| |
| try { |
| if (cfg.getSecurityCredentialsProvider() != null) |
| cred = cfg.getSecurityCredentialsProvider().credentials(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new GridClientException("Failed to obtain client credentials.", e); |
| } |
| |
| GridClientConnection conn; |
| |
| if (cfg.getProtocol() == GridClientProtocol.TCP) { |
| GridClientMarshaller marsh = cfg.getMarshaller(); |
| |
| try { |
| conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor, |
| cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(), |
| cfg.isTcpNoDelay(), marsh, marshId, top, cred, cfg.getUserAttributes()); |
| } |
| catch (GridClientException e) { |
| if (marsh instanceof GridClientZipOptimizedMarshaller) { |
| log.warning("Failed to connect with GridClientZipOptimizedMarshaller," + |
| " trying to fallback to default marshaller: " + e); |
| |
| conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor, |
| cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(), |
| cfg.isTcpNoDelay(), ((GridClientZipOptimizedMarshaller)marsh).defaultMarshaller(), marshId, |
| top, cred, cfg.getUserAttributes()); |
| } |
| else |
| throw e; |
| } |
| } |
| else |
| throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " + |
| cfg.getProtocol()); |
| |
| old = conns.putIfAbsent(addr, conn); |
| |
| assert old == null; |
| |
| if (nodeId != null) |
| nodeConns.put(nodeId, conn); |
| |
| return conn; |
| } |
| finally { |
| endpointStripedLock.unlock(addr); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void terminateConnection(GridClientConnection conn, GridClientNode node, Throwable e) { |
| if (log.isLoggable(Level.FINE)) |
| log.fine("Connection with remote node was terminated [node=" + node + ", srvAddr=" + |
| conn.serverAddress() + ", errMsg=" + e.getMessage() + ']'); |
| |
| closeIdle(); |
| |
| conn.close(FAILED, false); |
| } |
| |
| /** |
| * Closes all opened connections. |
| * |
| * @param waitCompletion If {@code true} waits for all pending requests to be proceeded. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| @Override public void stop(boolean waitCompletion) { |
| Collection<GridClientConnection> closeConns; |
| |
| if (closed) |
| return; |
| |
| // Mark manager as closed. |
| closed = true; |
| |
| // Remove all connections from cache. |
| closeConns = new ArrayList<>(conns.values()); |
| |
| conns.clear(); |
| |
| nodeConns.clear(); |
| |
| // Close old connection outside the writer lock. |
| for (GridClientConnection conn : closeConns) |
| conn.close(CLIENT_CLOSED, waitCompletion); |
| |
| if (pingExecutor != null) |
| GridClientUtils.shutdownNow(GridClientConnectionManager.class, pingExecutor, log); |
| |
| GridClientUtils.shutdownNow(GridClientConnectionManager.class, executor, log); |
| |
| if (srv != null) |
| srv.stop(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridClientConnection connection( |
| Collection<InetSocketAddress> srvs |
| ) throws GridClientException, InterruptedException { |
| return connect(srvs, null); |
| } |
| |
| /** |
| * Returns connection to node using given server addresses. |
| * |
| * @param srvs Server addresses. |
| * @param clo Client connection closure. |
| * @return Established connection. |
| * @throws GridClientException If failed. |
| * @throws InterruptedException If was interrupted while waiting for connection to be established. |
| */ |
| private GridClientConnection connect( |
| Collection<InetSocketAddress> srvs, |
| @Nullable GridClientConnectionInClosure clo |
| ) throws InterruptedException, GridClientException { |
| GridClientException firstEx = null; |
| |
| for (int i = 0; i < INIT_RETRY_CNT; i++) { |
| Collection<InetSocketAddress> srvsCp = new ArrayList<>(srvs); |
| |
| while (!srvsCp.isEmpty()) { |
| GridClientConnection conn = null; |
| |
| try { |
| conn = connect(null, srvsCp); |
| |
| if (clo != null) |
| clo.apply(conn); |
| |
| return conn; |
| } |
| catch (GridServerUnreachableException e) { |
| // No connection could be opened to any of initial addresses - exit to retry loop. |
| assert conn == null : |
| "GridClientConnectionResetException was thrown from GridClientConnection#topology"; |
| |
| if (firstEx == null) |
| firstEx = e; |
| |
| break; |
| } |
| catch (GridClientConnectionResetException e) { |
| // Connection was established but topology update failed - |
| // trying other initial addresses if any. |
| assert conn != null : "GridClientConnectionResetException was thrown from connect()"; |
| |
| if (firstEx == null) |
| firstEx = e; |
| |
| if (!srvsCp.remove(conn.serverAddress())) |
| // We have misbehaving collection or equals - just exit to avoid infinite loop. |
| break; |
| } |
| } |
| |
| Thread.sleep(INIT_RETRY_INTERVAL); |
| } |
| |
| for (GridClientConnection c : conns.values()) { |
| conns.remove(c.serverAddress(), c); |
| |
| c.close(FAILED, false); |
| } |
| |
| throw firstEx; |
| } |
| |
| /** |
| * Close all connections idling for more then |
| * {@link GridClientConfiguration#getMaxConnectionIdleTime()} milliseconds. |
| */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| private void closeIdle() { |
| for (Iterator<Map.Entry<UUID, GridClientConnection>> it = nodeConns.entrySet().iterator(); it.hasNext(); ) { |
| Map.Entry<UUID, GridClientConnection> entry = it.next(); |
| |
| GridClientConnection conn = entry.getValue(); |
| |
| if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) { |
| conns.remove(conn.serverAddress(), conn); |
| |
| nodeConns.remove(entry.getKey(), conn); |
| } |
| } |
| |
| for (GridClientConnection conn : conns.values()) |
| if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) |
| conns.remove(conn.serverAddress(), conn); |
| } |
| |
| /** |
| * Checks and throws an exception if this client was closed. |
| * |
| * @throws GridClientClosedException If client was closed. |
| */ |
| private void checkClosed() throws GridClientClosedException { |
| if (closed) |
| throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore)."); |
| } |
| |
| /** |
| */ |
| private static class NioListener implements GridNioServerListener { |
| /** */ |
| private final Logger log; |
| |
| /** |
| * @param log Logger. |
| */ |
| private NioListener(Logger log) { |
| this.log = log; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onConnected(GridNioSession ses) { |
| if (log.isLoggable(Level.FINE)) |
| log.fine("Session connected: " + ses); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { |
| if (log.isLoggable(Level.FINE)) |
| log.fine("Session disconnected: " + ses); |
| |
| GridClientFutureAdapter<Boolean> handshakeFut = |
| ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE); |
| |
| if (handshakeFut != null) |
| handshakeFut.onDone( |
| new GridClientConnectionResetException("Failed to perform handshake (connection failed).")); |
| else { |
| GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN); |
| |
| if (conn != null) |
| conn.close(FAILED, false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessageSent(GridNioSession ses, Object msg) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessage(GridNioSession ses, Object msg) { |
| GridClientFutureAdapter<Boolean> handshakeFut = |
| ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE); |
| |
| if (handshakeFut != null) { |
| assert msg instanceof GridClientHandshakeResponse; |
| |
| handleHandshakeResponse(handshakeFut, (GridClientHandshakeResponse)msg); |
| } |
| else { |
| GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN); |
| |
| assert conn != null; |
| |
| if (msg instanceof GridClientPingPacket) |
| conn.handlePingResponse(); |
| else { |
| try { |
| conn.handleResponse((GridClientMessage)msg); |
| } |
| catch (IOException e) { |
| log.log(Level.SEVERE, "Failed to parse response.", e); |
| } |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onFailure(FailureType failureType, Throwable failure) { |
| // No-op. |
| } |
| |
| /** |
| * Handles client handshake response. |
| * |
| * @param handshakeFut Future. |
| * @param msg A handshake response. |
| */ |
| private void handleHandshakeResponse(GridClientFutureAdapter<Boolean> handshakeFut, |
| GridClientHandshakeResponse msg) { |
| byte rc = msg.resultCode(); |
| |
| if (rc != GridClientHandshakeResponse.OK.resultCode()) { |
| handshakeFut.onDone(new GridClientHandshakeException(rc, |
| "Handshake failed due to internal error (see server log for more details).")); |
| } |
| else |
| handshakeFut.onDone(true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionWriteTimeout(GridNioSession ses) { |
| if (log.isLoggable(Level.FINE)) |
| log.fine("Closing NIO session because of write timeout."); |
| |
| ses.close(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionIdleTimeout(GridNioSession ses) { |
| if (log.isLoggable(Level.FINE)) |
| log.fine("Closing NIO session because of idle timeout."); |
| |
| ses.close(); |
| } |
| } |
| |
| /** |
| * Client connection in closure. |
| */ |
| @FunctionalInterface |
| private static interface GridClientConnectionInClosure { |
| /** |
| * Closure body. |
| * |
| * @param conn Client connection. |
| * @throws GridClientException If failed. |
| */ |
| void apply(GridClientConnection conn) throws GridClientException; |
| } |
| } |