| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.distributed.internal.tcpserver; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import javax.net.ssl.SSLException; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.internal.admin.SSLConfig; |
| import org.apache.geode.internal.net.SSLConfigurationFactory; |
| import org.apache.geode.internal.net.SocketCreator; |
| import org.apache.geode.internal.net.SocketCreatorFactory; |
| import org.apache.geode.internal.security.SecurableCommunicationChannel; |
| import org.apache.geode.internal.serialization.UnsupportedSerializationVersionException; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.serialization.VersionedDataInputStream; |
| import org.apache.geode.internal.serialization.VersionedDataOutputStream; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * <p> |
| * Client for the TcpServer component of the Locator. |
| * </p> |
| * |
| * @since GemFire 5.7 |
| */ |
| public class TcpClient { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final int DEFAULT_REQUEST_TIMEOUT = 60 * 2 * 1000; |
| |
| @MakeNotStatic |
| private static final Map<InetSocketAddress, Short> serverVersions = |
| new HashMap<>(); |
| |
| private final SocketCreator socketCreator; |
| |
| public TcpClient(DistributionConfig distributionConfig) { |
| this(SocketCreatorFactory.setDistributionConfig(distributionConfig) |
| .getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR)); |
| } |
| |
| /** |
| * Constructs a new TcpClient using the default (Locator) SocketCreator. SocketCreatorFactory |
| * should be initialized before invoking this method. |
| */ |
| public TcpClient() { |
| this(SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR)); |
| } |
| |
| public TcpClient(Properties properties) { |
| SSLConfig sslConfig = SSLConfigurationFactory.getSSLConfigForComponent(properties, |
| SecurableCommunicationChannel.LOCATOR); |
| this.socketCreator = new SocketCreator(sslConfig); |
| } |
| |
| /** |
| * Constructs a new TcpClient |
| * |
| * @param socketCreator the SocketCreator to use in communicating with the Locator |
| */ |
| public TcpClient(SocketCreator socketCreator) { |
| this.socketCreator = socketCreator; |
| } |
| |
| /** |
| * Stops the Locator running on a given host and port |
| */ |
| public void stop(InetAddress addr, int port) throws java.net.ConnectException { |
| try { |
| ShutdownRequest request = new ShutdownRequest(); |
| requestToServer(addr, port, request, DEFAULT_REQUEST_TIMEOUT); |
| } catch (java.net.ConnectException ce) { |
| // must not be running, rethrow so the caller can handle. |
| // In most cases this Exception should be ignored. |
| throw ce; |
| } catch (Exception ex) { |
| logger.error( |
| "TcpClient.stop(): exception connecting to locator " + addr + ":" + port + ": " + ex); |
| } |
| } |
| |
| /** |
| * Contacts the Locator running on the given host, and port and gets information about it. Two |
| * <code>String</code>s are returned: the first string is the working directory of the locator |
| * and the second string is the product directory of the locator. |
| */ |
| public String[] getInfo(InetAddress addr, int port) { |
| try { |
| InfoRequest request = new InfoRequest(); |
| InfoResponse response = |
| (InfoResponse) requestToServer(addr, port, request, DEFAULT_REQUEST_TIMEOUT); |
| return response.getInfo(); |
| } catch (java.net.ConnectException ignore) { |
| return null; |
| } catch (Exception ex) { |
| logger.error( |
| "TcpClient.getInfo(): exception connecting to locator " + addr + ":" + port + ": " + ex); |
| return null; |
| } |
| |
| } |
| |
| /** |
| * Send a request to a Locator and expect a reply |
| * |
| * @param addr The locator's address |
| * @param port The locator's tcp/ip port |
| * @param request The request message |
| * @param timeout Timeout for sending the message and receiving a reply |
| * @return the reply |
| */ |
| public Object requestToServer(InetAddress addr, int port, Object request, int timeout) |
| throws IOException, ClassNotFoundException { |
| |
| return requestToServer(addr, port, request, timeout, true); |
| } |
| |
| /** |
| * Send a request to a Locator |
| * |
| * @param addr The locator's address |
| * @param port The locator's tcp/ip port |
| * @param request The request message |
| * @param timeout Timeout for sending the message and receiving a reply |
| * @param replyExpected Whether to wait for a reply |
| * @return the reply |
| */ |
| public Object requestToServer(InetAddress addr, int port, Object request, int timeout, |
| boolean replyExpected) throws IOException, ClassNotFoundException { |
| InetSocketAddress ipAddr; |
| if (addr == null) { |
| ipAddr = new InetSocketAddress(port); |
| } else { |
| ipAddr = new InetSocketAddress(addr, port); // fix for bug 30810 |
| } |
| return requestToServer(ipAddr, request, timeout, replyExpected); |
| } |
| |
| /** |
| * Send a request to a Locator |
| * |
| * @param ipAddr The locator's inet socket address |
| * @param request The request message |
| * @param timeout Timeout for sending the message and receiving a reply |
| * @param replyExpected Whether to wait for a reply |
| * @return The reply, or null if no reply is expected |
| */ |
| public Object requestToServer(InetSocketAddress ipAddr, Object request, int timeout, |
| boolean replyExpected) throws IOException, ClassNotFoundException { |
| long giveupTime = System.currentTimeMillis() + timeout; |
| |
| // Get the GemFire version of the TcpServer first, before sending any other request. |
| short serverVersion = getServerVersion(ipAddr, timeout); |
| |
| if (serverVersion > Version.CURRENT_ORDINAL) { |
| serverVersion = Version.CURRENT_ORDINAL; |
| } |
| |
| // establish the old GossipVersion for the server |
| int gossipVersion = TcpServer.getCurrentGossipVersion(); |
| |
| if (Version.GFE_71.compareTo(serverVersion) > 0) { |
| gossipVersion = TcpServer.getOldGossipVersion(); |
| } |
| |
| long newTimeout = giveupTime - System.currentTimeMillis(); |
| if (newTimeout <= 0) { |
| return null; |
| } |
| |
| logger.debug("TcpClient sending {} to {}", request, ipAddr); |
| |
| Socket sock = |
| socketCreator.connect(ipAddr.getAddress(), ipAddr.getPort(), (int) newTimeout, null, false); |
| sock.setSoTimeout((int) newTimeout); |
| DataOutputStream out = null; |
| try { |
| |
| out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream())); |
| |
| if (serverVersion < Version.CURRENT_ORDINAL) { |
| out = new VersionedDataOutputStream(out, Version.fromOrdinalNoThrow(serverVersion, false)); |
| } |
| |
| out.writeInt(gossipVersion); |
| if (gossipVersion > TcpServer.getOldGossipVersion()) { |
| out.writeShort(serverVersion); |
| } |
| DataSerializer.writeObject(request, out); |
| out.flush(); |
| |
| if (replyExpected) { |
| DataInputStream in = new DataInputStream(sock.getInputStream()); |
| in = new VersionedDataInputStream(in, Version.fromOrdinal(serverVersion)); |
| try { |
| Object response = DataSerializer.readObject(in); |
| logger.debug("received response: {}", response); |
| return response; |
| } catch (EOFException ex) { |
| logger.debug("requestToServer EOFException ", ex); |
| EOFException eof = new EOFException("Locator at " + ipAddr |
| + " did not respond. This is normal if the locator was shutdown. If it wasn't check its log for exceptions."); |
| eof.initCause(ex); |
| throw eof; |
| } |
| } else { |
| return null; |
| } |
| } catch (UnsupportedSerializationVersionException ex) { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("Remote TcpServer version: " + serverVersion + " is higher than local version: " |
| + Version.CURRENT_ORDINAL + ". This is never expected as remoteVersion"); |
| } |
| return null; |
| } finally { |
| try { |
| if (replyExpected) { |
| // Since we've read a response we know that the Locator is finished |
| // with the socket and is closing it. Aborting the connection by |
| // setting SO_LINGER to zero will clean up the TIME_WAIT socket on |
| // the locator's machine. |
| if (!sock.isClosed() && !socketCreator.useSSL()) { |
| sock.setSoLinger(true, 0); |
| } |
| } |
| sock.close(); |
| } catch (Exception e) { |
| logger.error("Error closing socket ", e); |
| } |
| if (out != null) { |
| out.close(); |
| } |
| } |
| } |
| |
| private Short getServerVersion(InetSocketAddress ipAddr, int timeout) |
| throws IOException, ClassNotFoundException { |
| |
| int gossipVersion; |
| Short serverVersion; |
| Socket sock; |
| |
| final String locatorCancelExceptionString = |
| "This could be the result of trying to connect a non-SSL-enabled client to an SSL-enabled locator."; |
| |
| // Get GemFire version of TcpServer first, before sending any other request. |
| synchronized (serverVersions) { |
| serverVersion = serverVersions.get(ipAddr); |
| } |
| |
| if (serverVersion != null) { |
| return serverVersion; |
| } |
| |
| gossipVersion = TcpServer.getOldGossipVersion(); |
| |
| try { |
| sock = socketCreator.connect(ipAddr.getAddress(), ipAddr.getPort(), timeout, null, false); |
| sock.setSoTimeout(timeout); |
| } catch (SSLException e) { |
| throw new LocatorCancelException("Unable to form SSL connection", e); |
| } |
| |
| try { |
| OutputStream outputStream = new BufferedOutputStream(sock.getOutputStream()); |
| DataOutputStream out = |
| new VersionedDataOutputStream(new DataOutputStream(outputStream), Version.GFE_57); |
| |
| out.writeInt(gossipVersion); |
| |
| VersionRequest verRequest = new VersionRequest(); |
| DataSerializer.writeObject(verRequest, out); |
| out.flush(); |
| |
| InputStream inputStream = sock.getInputStream(); |
| DataInputStream in = new DataInputStream(inputStream); |
| in = new VersionedDataInputStream(in, Version.GFE_57); |
| try { |
| Object readObject = DataSerializer.readObject(in); |
| if (!(readObject instanceof VersionResponse)) { |
| throw new LocatorCancelException( |
| "Server version response invalid: " + locatorCancelExceptionString); |
| } |
| |
| VersionResponse response = (VersionResponse) readObject; |
| serverVersion = response.getVersionOrdinal(); |
| synchronized (serverVersions) { |
| serverVersions.put(ipAddr, serverVersion); |
| } |
| |
| return serverVersion; |
| |
| } catch (EOFException ex) { |
| // old locators will not recognize the version request and will close the connection |
| } |
| } finally { |
| try { |
| sock.setSoLinger(true, 0); // initiate an abort on close to shut down the server's socket |
| } catch (Exception e) { |
| logger.error("Error aborting socket ", e); |
| } |
| try { |
| sock.close(); |
| } catch (Exception e) { |
| logger.error("Error closing socket ", e); |
| } |
| |
| } |
| synchronized (serverVersions) { |
| serverVersions.put(ipAddr, Version.GFE_57.ordinal()); |
| } |
| return Short.valueOf(Version.GFE_57.ordinal()); |
| } |
| |
| |
| /** |
| * Clear static class information concerning Locators. This is used in unit tests. It will force |
| * TcpClient to send version-request messages to locators to reestablish knowledge of their |
| * communication protocols. |
| */ |
| public static void clearStaticData() { |
| synchronized (serverVersions) { |
| serverVersions.clear(); |
| } |
| } |
| } |