// blob: 1e151218833a7a9f9e34b3a127673ad451584021
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.tcpserver;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.ObjectDeserializer;
import org.apache.geode.internal.serialization.ObjectSerializer;
import org.apache.geode.internal.serialization.VersionedDataInputStream;
import org.apache.geode.internal.serialization.VersionedDataOutputStream;
import org.apache.geode.internal.serialization.Versioning;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * Client for the TcpServer component of the Locator.
 *
 * <p>
 * Every request is preceded by a lookup of the server's version ordinal. The ordinal is obtained
 * with a {@link VersionRequest} on a separate connection the first time an address is contacted
 * and is cached per address for the lifetime of this client.
 *
 * @see TcpServer#TcpServer(int, InetAddress, TcpHandler, String, ProtocolChecker,
 *      LongSupplier, Supplier, TcpSocketCreator, ObjectSerializer, ObjectDeserializer, String,
 *      String)
 */
public class TcpClient {

  private static final Logger logger = LogService.getLogger();

  /** Timeout in milliseconds (two minutes) used by {@link #stop} and {@link #getInfo}. */
  private static final int DEFAULT_REQUEST_TIMEOUT = 60 * 2 * 1000;

  /** Version ordinals already learned, keyed by server address. Guarded by its own monitor. */
  private final Map<HostAndPort, Short> serverVersions = new HashMap<>();

  private final TcpSocketCreator socketCreator;
  private final ObjectSerializer objectSerializer;
  private final ObjectDeserializer objectDeserializer;
  private final TcpSocketFactory socketFactory;

  /**
   * Constructs a new TcpClient
   *
   * @param socketCreator the SocketCreator to use in communicating with the Locator
   * @param objectSerializer serializer for messages sent to the TcpServer
   * @param objectDeserializer deserializer for responses from the TcpServer
   * @param socketFactory factory handed to the socket creator whenever a connection is opened
   */
  public TcpClient(TcpSocketCreator socketCreator, final ObjectSerializer objectSerializer,
      final ObjectDeserializer objectDeserializer, TcpSocketFactory socketFactory) {
    this.socketCreator = socketCreator;
    this.objectSerializer = objectSerializer;
    this.objectDeserializer = objectDeserializer;
    this.socketFactory = socketFactory;
  }

  /**
   * Stops the TcpServer running on a given host and port
   *
   * @param addr address of the TcpServer to stop
   * @throws java.net.ConnectException if no connection could be made; the server is then most
   *         likely not running and callers can usually ignore the exception
   */
  public void stop(HostAndPort addr) throws java.net.ConnectException {
    try {
      ShutdownRequest request = new ShutdownRequest();
      requestToServer(addr, request, DEFAULT_REQUEST_TIMEOUT);
    } catch (java.net.ConnectException ce) {
      // must not be running, rethrow so the caller can handle.
      // In most cases this Exception should be ignored.
      throw ce;
    } catch (Exception ex) {
      // Any other failure is logged and otherwise ignored.
      logger.error(
          "TcpClient.stop(): exception connecting to locator " + addr + ": " + ex);
    }
  }

  /**
   * Contacts the Locator running on the given host, and port and gets information about it. Two
   * <code>String</code>s are returned: the first string is the working directory of the locator
   * and the second string is the product directory of the locator.
   *
   * @param addr address of the Locator to query
   * @return the information strings, or null if the Locator could not be contacted, did not
   *         answer in time, or the request failed
   * @deprecated this was created for the deprecated Admin API
   */
  @Deprecated
  public String[] getInfo(HostAndPort addr) {
    try {
      InfoRequest request = new InfoRequest();
      InfoResponse response =
          (InfoResponse) requestToServer(addr, request, DEFAULT_REQUEST_TIMEOUT);
      if (response == null) {
        // requestToServer returns null when the timeout elapses before the request can be sent.
        logger.error("TcpClient.getInfo(): no response received from locator " + addr);
        return null;
      }
      return response.getInfo();
    } catch (java.net.ConnectException ignore) {
      // The locator is not reachable; callers treat null as "no information".
      return null;
    } catch (Exception ex) {
      logger.error(
          "TcpClient.getInfo(): exception connecting to locator " + addr + ": " + ex);
      return null;
    }
  }

  /**
   * Send a request to a Locator and expect a reply
   *
   * @param addr The locator's address
   * @param request The request message
   * @param timeout Timeout for sending the message and receiving a reply
   * @return the reply. This may return a null
   *         if we're unable to form a connection to the TcpServer before the given timeout elapses
   * @throws ClassNotFoundException if the deserializer throws this exception
   * @throws IOException if there is a problem interacting with the server
   */
  public Object requestToServer(HostAndPort addr, Object request, int timeout)
      throws IOException, ClassNotFoundException {
    return requestToServer(addr, request, timeout, true);
  }

  /**
   * Send a request to a Locator
   *
   * @param addr The locator's address
   * @param request The request message
   * @param timeout Timeout for sending the message and receiving a reply
   * @param replyExpected Whether to wait for a reply
   * @return The reply, or null if no reply is expected. This may also return a null
   *         if we're unable to form a connection to the TcpServer before the given timeout elapses
   * @throws ClassNotFoundException if the deserializer throws this exception
   * @throws IOException if there is a problem interacting with the server
   */
  public Object requestToServer(HostAndPort addr, Object request, int timeout,
      boolean replyExpected) throws IOException, ClassNotFoundException {
    long giveupTime = System.currentTimeMillis() + timeout;

    // Get the GemFire version of the TcpServer first, before sending any other request.
    final short serverVersionShort = getServerVersion(addr, timeout);
    KnownVersion serverVersion =
        Versioning.getKnownVersionOrDefault(
            Versioning.getVersion(serverVersionShort),
            null);

    final String debugVersionMessage;
    if (serverVersion == null) {
      // The ordinal is unknown locally: talk to the server using the current version.
      serverVersion = KnownVersion.CURRENT;
      debugVersionMessage =
          "Remote TcpServer version: " + serverVersionShort + " is higher than local version: "
              + KnownVersion.CURRENT_ORDINAL + ". This is never expected as remoteVersion";
    } else {
      debugVersionMessage = null;
    }

    // establish the old GossipVersion for the server
    int gossipVersion = TcpServer.getCurrentGossipVersion();
    if (serverVersion.isNotNewerThan(KnownVersion.GFE_71)) {
      gossipVersion = TcpServer.getOldGossipVersion();
    }

    // The version lookup may have consumed part (or all) of the time budget.
    long newTimeout = giveupTime - System.currentTimeMillis();
    if (newTimeout <= 0) {
      return null;
    }

    logger.debug("TcpClient sending {} to {}", request, addr);

    Socket sock =
        socketCreator.forCluster().connect(addr, (int) newTimeout, null, socketFactory);
    DataOutputStream out = null;
    try {
      // Configured inside the try so that a failure here still closes the socket below.
      sock.setSoTimeout((int) newTimeout);

      out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
      if (serverVersion.isOlderThan(KnownVersion.CURRENT)) {
        out = new VersionedDataOutputStream(out, serverVersion);
      }

      out.writeInt(gossipVersion);
      if (gossipVersion > TcpServer.getOldGossipVersion()) {
        // Only the newer protocol carries the version ordinal in the request header.
        out.writeShort(serverVersionShort);
      }
      objectSerializer.writeObject(request, out);
      out.flush();

      if (replyExpected) {
        try (DataInputStream dataInputStream = new DataInputStream(sock.getInputStream());
            VersionedDataInputStream versionedDataInputStream =
                new VersionedDataInputStream(dataInputStream, serverVersion)) {
          if (debugVersionMessage != null && logger.isDebugEnabled()) {
            logger.debug(debugVersionMessage);
          }
          try {
            Object response = objectDeserializer.readObject(versionedDataInputStream);
            logger.debug("received response: {}", response);
            return response;
          } catch (EOFException ex) {
            logger.debug("requestToServer EOFException ", ex);
            EOFException eof = new EOFException("Locator at " + addr
                + " did not respond. This is normal if the locator was shutdown. If it wasn't check its log for exceptions.");
            eof.initCause(ex);
            throw eof;
          }
        }
      } else {
        return null;
      }
    } finally {
      try {
        if (replyExpected) {
          // Since we've read a response we know that the Locator is finished
          // with the socket and is closing it. Aborting the connection by
          // setting SO_LINGER to zero will clean up the TIME_WAIT socket on
          // the locator's machine.
          // NOTE(review): closing a socket's input stream also closes the socket, so after the
          // try-with-resources above the socket may already be closed and this branch skipped.
          // Confirm whether that is intended.
          if (!sock.isClosed() && !socketCreator.forCluster().useSSL()) {
            sock.setSoLinger(true, 0);
          }
        }
        sock.close();
      } catch (Exception e) {
        logger.error("Error closing socket ", e);
      }
      if (out != null) {
        // The socket is already closed, so flushing leftover buffered bytes can fail. Such a
        // failure must not replace the exception (or result) of the request itself.
        try {
          out.close();
        } catch (IOException e) {
          logger.debug("Error closing output stream ", e);
        }
      }
    }
  }

  /**
   * Returns the version ordinal of the TcpServer at the given address, asking the server with a
   * {@link VersionRequest} on a dedicated connection if the ordinal is not cached yet.
   *
   * <p>
   * If the server closes the connection without answering (EOF), the ordinal of
   * {@code KnownVersion.GFE_57} is cached and returned.
   *
   * @param addr address of the TcpServer
   * @param timeout connect and read timeout in milliseconds
   * @return the cached or freshly retrieved version ordinal
   * @throws IOException if the connection fails or the remote host terminates the SSL handshake
   * @throws ClassNotFoundException if the deserializer throws this exception
   * @throws IllegalStateException if an SSL connection cannot be formed
   */
  private Short getServerVersion(HostAndPort addr, int timeout)
      throws IOException, ClassNotFoundException {
    Short serverVersion;

    // Get GemFire version of TcpServer first, before sending any other request.
    synchronized (serverVersions) {
      serverVersion = serverVersions.get(addr);
    }
    if (serverVersion != null) {
      return serverVersion;
    }

    // The version request is always sent with the old gossip version.
    final int gossipVersion = TcpServer.getOldGossipVersion();

    final Socket sock;
    try {
      sock = socketCreator.forCluster().connect(addr, timeout, null, socketFactory);
    } catch (SSLHandshakeException e) {
      final Throwable cause = e.getCause();
      // The cause's message may be null; guard against it instead of failing with an NPE.
      final String causeMessage = cause == null ? null : cause.getMessage();
      if (cause instanceof EOFException && causeMessage != null
          && causeMessage.contains("SSL peer shut down incorrectly")) {
        throw new IOException("Remote host terminated the handshake", e);
      }
      throw new IllegalStateException("Unable to form SSL connection", e);
    } catch (SSLException e) {
      throw new IllegalStateException("Unable to form SSL connection", e);
    }

    try {
      // Configured under the socket-closing finally so that a failure here cannot leak the socket.
      sock.setSoTimeout(timeout);

      try (OutputStream outputStream = new BufferedOutputStream(sock.getOutputStream());
          DataOutputStream out =
              new VersionedDataOutputStream(new DataOutputStream(outputStream),
                  KnownVersion.GFE_57);
          InputStream inputStream = sock.getInputStream();
          DataInputStream in = new DataInputStream(inputStream);
          VersionedDataInputStream versionedIn =
              new VersionedDataInputStream(in, KnownVersion.GFE_57)) {

        out.writeInt(gossipVersion);
        VersionRequest verRequest = new VersionRequest();
        objectSerializer.writeObject(verRequest, out);
        out.flush();

        try {
          Object readObject = objectDeserializer.readObject(versionedIn);
          if (!(readObject instanceof VersionResponse)) {
            // NOTE(review): unusual exception type for a protocol error; kept unchanged so that
            // existing callers observe the same exception.
            throw new IllegalThreadStateException(
                "Server version response invalid: "
                    + "This could be the result of trying to connect a non-SSL-enabled client to an SSL-enabled locator.");
          }
          VersionResponse response = (VersionResponse) readObject;
          serverVersion = response.getVersionOrdinal();
          synchronized (serverVersions) {
            serverVersions.put(addr, serverVersion);
          }
          return serverVersion;
        } catch (EOFException ex) {
          // old locators will not recognize the version request and will close the connection
        }
      }
    } finally {
      if (!sock.isClosed()) {
        try {
          sock.setSoLinger(true, 0); // initiate an abort on close to shut down the server's socket
        } catch (Exception e) {
          logger.error("Error aborting socket ", e);
        }
        try {
          sock.close();
        } catch (Exception e) {
          logger.error("Error closing socket ", e);
        }
      }
    }

    // Reached only after an EOF: remember the fallback version for this address.
    synchronized (serverVersions) {
      serverVersions.put(addr, KnownVersion.GFE_57.ordinal());
    }
    return KnownVersion.GFE_57.ordinal();
  }
}