blob: b3453b33df2d473beba0ae1a0ff4839df19ac5d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.tcpserver;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.net.ssl.SSLException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.UnsupportedVersionException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.VersionedDataInputStream;
import org.apache.geode.internal.VersionedDataOutputStream;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.net.SSLConfigurationFactory;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
/**
* <p>
* Client for the TcpServer component of the Locator.
* </p>
*
* @since GemFire 5.7
*/
public class TcpClient {
private static final Logger logger = LogService.getLogger();
private static final int DEFAULT_REQUEST_TIMEOUT = 60 * 2 * 1000;
@MakeNotStatic
private static final Map<InetSocketAddress, Short> serverVersions =
new HashMap<>();
private final SocketCreator socketCreator;
public TcpClient(DistributionConfig distributionConfig) {
this(SocketCreatorFactory.setDistributionConfig(distributionConfig)
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR));
}
/**
* Constructs a new TcpClient using the default (Locator) SocketCreator. SocketCreatorFactory
* should be initialized before invoking this method.
*/
public TcpClient() {
this(SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR));
}
public TcpClient(Properties properties) {
SSLConfig sslConfig = SSLConfigurationFactory.getSSLConfigForComponent(properties,
SecurableCommunicationChannel.LOCATOR);
this.socketCreator = new SocketCreator(sslConfig);
}
/**
* Constructs a new TcpClient
*
* @param socketCreator the SocketCreator to use in communicating with the Locator
*/
public TcpClient(SocketCreator socketCreator) {
this.socketCreator = socketCreator;
}
/**
* Stops the Locator running on a given host and port
*/
public void stop(InetAddress addr, int port) throws java.net.ConnectException {
try {
ShutdownRequest request = new ShutdownRequest();
requestToServer(addr, port, request, DEFAULT_REQUEST_TIMEOUT);
} catch (java.net.ConnectException ce) {
// must not be running, rethrow so the caller can handle.
// In most cases this Exception should be ignored.
throw ce;
} catch (Exception ex) {
logger.error(
"TcpClient.stop(): exception connecting to locator " + addr + ":" + port + ": " + ex);
}
}
/**
* Contacts the Locator running on the given host, and port and gets information about it. Two
* <code>String</code>s are returned: the first string is the working directory of the locator
* and the second string is the product directory of the locator.
*/
public String[] getInfo(InetAddress addr, int port) {
try {
InfoRequest request = new InfoRequest();
InfoResponse response =
(InfoResponse) requestToServer(addr, port, request, DEFAULT_REQUEST_TIMEOUT);
return response.getInfo();
} catch (java.net.ConnectException ignore) {
return null;
} catch (Exception ex) {
logger.error(
"TcpClient.getInfo(): exception connecting to locator " + addr + ":" + port + ": " + ex);
return null;
}
}
/**
* Send a request to a Locator and expect a reply
*
* @param addr The locator's address
* @param port The locator's tcp/ip port
* @param request The request message
* @param timeout Timeout for sending the message and receiving a reply
* @return the reply
*/
public Object requestToServer(InetAddress addr, int port, Object request, int timeout)
throws IOException, ClassNotFoundException {
return requestToServer(addr, port, request, timeout, true);
}
/**
* Send a request to a Locator
*
* @param addr The locator's address
* @param port The locator's tcp/ip port
* @param request The request message
* @param timeout Timeout for sending the message and receiving a reply
* @param replyExpected Whether to wait for a reply
* @return the reply
*/
public Object requestToServer(InetAddress addr, int port, Object request, int timeout,
boolean replyExpected) throws IOException, ClassNotFoundException {
InetSocketAddress ipAddr;
if (addr == null) {
ipAddr = new InetSocketAddress(port);
} else {
ipAddr = new InetSocketAddress(addr, port); // fix for bug 30810
}
return requestToServer(ipAddr, request, timeout, replyExpected);
}
/**
* Send a request to a Locator
*
* @param ipAddr The locator's inet socket address
* @param request The request message
* @param timeout Timeout for sending the message and receiving a reply
* @param replyExpected Whether to wait for a reply
* @return The reply, or null if no reply is expected
*/
public Object requestToServer(InetSocketAddress ipAddr, Object request, int timeout,
boolean replyExpected) throws IOException, ClassNotFoundException {
long giveupTime = System.currentTimeMillis() + timeout;
// Get the GemFire version of the TcpServer first, before sending any other request.
short serverVersion = getServerVersion(ipAddr, timeout);
if (serverVersion > Version.CURRENT_ORDINAL) {
serverVersion = Version.CURRENT_ORDINAL;
}
// establish the old GossipVersion for the server
int gossipVersion = TcpServer.getCurrentGossipVersion();
if (Version.GFE_71.compareTo(serverVersion) > 0) {
gossipVersion = TcpServer.getOldGossipVersion();
}
long newTimeout = giveupTime - System.currentTimeMillis();
if (newTimeout <= 0) {
return null;
}
logger.debug("TcpClient sending {} to {}", request, ipAddr);
Socket sock =
socketCreator.connect(ipAddr.getAddress(), ipAddr.getPort(), (int) newTimeout, null, false);
sock.setSoTimeout((int) newTimeout);
DataOutputStream out = null;
try {
out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
if (serverVersion < Version.CURRENT_ORDINAL) {
out = new VersionedDataOutputStream(out, Version.fromOrdinalNoThrow(serverVersion, false));
}
out.writeInt(gossipVersion);
if (gossipVersion > TcpServer.getOldGossipVersion()) {
out.writeShort(serverVersion);
}
DataSerializer.writeObject(request, out);
out.flush();
if (replyExpected) {
DataInputStream in = new DataInputStream(sock.getInputStream());
in = new VersionedDataInputStream(in, Version.fromOrdinal(serverVersion, false));
try {
Object response = DataSerializer.readObject(in);
logger.debug("received response: {}", response);
return response;
} catch (EOFException ex) {
logger.debug("requestToServer EOFException ", ex);
EOFException eof = new EOFException("Locator at " + ipAddr
+ " did not respond. This is normal if the locator was shutdown. If it wasn't check its log for exceptions.");
eof.initCause(ex);
throw eof;
}
} else {
return null;
}
} catch (UnsupportedVersionException ex) {
if (logger.isDebugEnabled()) {
logger
.debug("Remote TcpServer version: " + serverVersion + " is higher than local version: "
+ Version.CURRENT_ORDINAL + ". This is never expected as remoteVersion");
}
return null;
} finally {
try {
if (replyExpected) {
// Since we've read a response we know that the Locator is finished
// with the socket and is closing it. Aborting the connection by
// setting SO_LINGER to zero will clean up the TIME_WAIT socket on
// the locator's machine.
if (!sock.isClosed() && !socketCreator.useSSL()) {
sock.setSoLinger(true, 0);
}
}
sock.close();
} catch (Exception e) {
logger.error("Error closing socket ", e);
}
if (out != null) {
out.close();
}
}
}
private Short getServerVersion(InetSocketAddress ipAddr, int timeout)
throws IOException, ClassNotFoundException {
int gossipVersion;
Short serverVersion;
Socket sock;
final String locatorCancelExceptionString =
"This could be the result of trying to connect a non-SSL-enabled client to an SSL-enabled locator.";
// Get GemFire version of TcpServer first, before sending any other request.
synchronized (serverVersions) {
serverVersion = serverVersions.get(ipAddr);
}
if (serverVersion != null) {
return serverVersion;
}
gossipVersion = TcpServer.getOldGossipVersion();
try {
sock = socketCreator.connect(ipAddr.getAddress(), ipAddr.getPort(), timeout, null, false);
sock.setSoTimeout(timeout);
} catch (SSLException e) {
throw new LocatorCancelException("Unable to form SSL connection", e);
}
try {
OutputStream outputStream = new BufferedOutputStream(sock.getOutputStream());
DataOutputStream out =
new VersionedDataOutputStream(new DataOutputStream(outputStream), Version.GFE_57);
out.writeInt(gossipVersion);
VersionRequest verRequest = new VersionRequest();
DataSerializer.writeObject(verRequest, out);
out.flush();
InputStream inputStream = sock.getInputStream();
DataInputStream in = new DataInputStream(inputStream);
in = new VersionedDataInputStream(in, Version.GFE_57);
try {
Object readObject = DataSerializer.readObject(in);
if (!(readObject instanceof VersionResponse)) {
throw new LocatorCancelException(
"Server version response invalid: " + locatorCancelExceptionString);
}
VersionResponse response = (VersionResponse) readObject;
serverVersion = response.getVersionOrdinal();
synchronized (serverVersions) {
serverVersions.put(ipAddr, serverVersion);
}
return serverVersion;
} catch (EOFException ex) {
// old locators will not recognize the version request and will close the connection
}
} finally {
try {
sock.setSoLinger(true, 0); // initiate an abort on close to shut down the server's socket
} catch (Exception e) {
logger.error("Error aborting socket ", e);
}
try {
sock.close();
} catch (Exception e) {
logger.error("Error closing socket ", e);
}
}
synchronized (serverVersions) {
serverVersions.put(ipAddr, Version.GFE_57.ordinal());
}
return Short.valueOf(Version.GFE_57.ordinal());
}
/**
* Clear static class information concerning Locators. This is used in unit tests. It will force
* TcpClient to send version-request messages to locators to reestablish knowledge of their
* communication protocols.
*/
public static void clearStaticData() {
synchronized (serverVersions) {
serverVersions.clear();
}
}
}