blob: 924eaef49ab1c86d2aaf572852b23ffd2333accc [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
package org.apache.geode.cache.client.internal;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.GatewayConfigurationException;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.client.internal.ServerDenyList.FailureTracker;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.tier.sockets.CacheClientUpdater;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.logging.internal.log4j.api.LogService;
* Creates connections, using a connection source to determine which server to connect to.
* @since GemFire 5.7
public class ConnectionFactoryImpl implements ConnectionFactory {
private static final Logger logger = LogService.getLogger();
// TODO - GEODE-1746, the handshake holds state. It seems like the code depends
// on all of the handshake operations happening in a single thread. I don't think we
// want that, need to refactor.
private final ServerDenyList denyList;
private ConnectionSource source;
private PoolImpl pool;
private final CancelCriterion cancelCriterion;
private final ConnectionConnector connectionConnector;
* Test hook for client version support
* @since GemFire 5.7
public static boolean testFailedConnectionToServer = false;
ConnectionFactoryImpl(ConnectionSource source, EndpointManager endpointManager,
InternalDistributedSystem sys, int socketBufferSize, int handshakeTimeout, int readTimeout,
ClientProxyMembershipID proxyId, CancelCriterion cancelCriterion, boolean usedByGateway,
GatewaySender sender, long pingInterval, boolean multiuserSecureMode, PoolImpl pool) {
new ConnectionConnector(endpointManager, sys, socketBufferSize, handshakeTimeout,
readTimeout, usedByGateway, sender,
(usedByGateway || sender != null) ? SocketCreatorFactory
: SocketCreatorFactory
new ClientSideHandshakeImpl(proxyId, sys, sys.getSecurityService(),
source, pingInterval, pool, cancelCriterion);
public ConnectionFactoryImpl(ConnectionConnector connectionConnector, ConnectionSource source,
long pingInterval, PoolImpl pool, CancelCriterion cancelCriterion) {
this.connectionConnector = connectionConnector;
this.source = source;
this.pool = pool;
this.cancelCriterion = cancelCriterion;
denyList = new ServerDenyList(pingInterval);
public void start(ScheduledExecutorService background) {
public ServerDenyList getDenyList() {
return denyList;
public Connection createClientToServerConnection(ServerLocation location, boolean forQueue)
throws GemFireSecurityException {
FailureTracker failureTracker = denyList.getFailureTracker(location);
Connection connection = null;
try {
connection = connectionConnector.connectClientToServer(location, forQueue);
} catch (GemFireConfigException | CancelException | GemFireSecurityException
| GatewayConfigurationException e) {
throw e;
} catch (ServerRefusedConnectionException src) {
// propagate this up, don't retry
logger.warn("Could not create a new connection to server: {}",
testFailedConnectionToServer = true;
throw src;
} catch (Exception e) {
if (e.getMessage() != null && (e.getMessage().equals("Connection refused")
|| e.getMessage().equals("Connection reset"))) {
// this is the most common case, so don't print an exception
if (logger.isDebugEnabled()) {
logger.debug("Unable to connect to {}: connection refused", location);
} else {
logger.warn("Could not connect to: " + location, e);
testFailedConnectionToServer = true;
return connection;
private void authenticateIfRequired(Connection conn) {
if (!pool.isUsedByGateway() && !pool.getMultiuserAuthentication()) {
ServerLocation server = conn.getServer();
if (server.getRequiresCredentials()) {
if (server.getUserId() == -1) {
Long uniqueID = (Long) AuthenticateUserOp.executeOn(conn, pool);
if (logger.isDebugEnabled()) {
logger.debug("CFI.authenticateIfRequired() Completed authentication on {}", conn);
public ServerLocation findBestServer(ServerLocation currentServer,
Set<ServerLocation> excludedServers) {
if (currentServer != null && source.isBalanced()) {
return currentServer;
final Set<ServerLocation> origExcludedServers = excludedServers;
excludedServers = new HashSet<>(excludedServers);
Set<ServerLocation> denyListedServers = denyList.getBadServers();
ServerLocation server = source.findReplacementServer(currentServer, excludedServers);
if (server == null) {
// Nothing worked! Let's try without the denylist.
if (excludedServers.size() > origExcludedServers.size()) {
// We had some servers denylisted so lets give this another whirl.
server = source.findReplacementServer(currentServer, origExcludedServers);
if (server == null && logger.isDebugEnabled()) {
logger.debug("Source was unable to findForReplacement any servers");
return server;
public Connection createClientToServerConnection(Set<ServerLocation> excludedServers)
throws GemFireSecurityException {
final Set<ServerLocation> origExcludedServers = excludedServers;
excludedServers = new HashSet<>(excludedServers);
Set<ServerLocation> denyListedServers = denyList.getBadServers();
Connection conn = null;
RuntimeException fatalException = null;
boolean tryDenyList = true;
do {
ServerLocation server = source.findServer(excludedServers);
if (server == null) {
if (tryDenyList) {
// Nothing worked! Let's try without the denylist.
tryDenyList = false;
int size = excludedServers.size();
// make sure we didn't remove any of the ones that the caller set not to use
if (excludedServers.size() < size) {
// We are able to remove some exclusions, so lets give this another whirl.
if (logger.isDebugEnabled()) {
logger.debug("Source was unable to locate any servers");
if (fatalException != null) {
throw fatalException;
return null;
try {
conn = createClientToServerConnection(server, false);
} catch (CancelException | GemFireSecurityException | GatewayConfigurationException e) {
throw e;
} catch (ServerRefusedConnectionException srce) {
fatalException = srce;
if (logger.isDebugEnabled()) {
logger.debug("ServerRefusedConnectionException attempting to connect to {}", server,
} catch (Exception e) {
logger.warn(String.format("Could not connect to: %s", server), e);
} while (conn == null);
return conn;
public ClientUpdater createServerToClientConnection(Endpoint endpoint, QueueManager qManager,
boolean isPrimary, ClientUpdater failedUpdater) {
String clientUpdateName = CacheClientUpdater.CLIENT_UPDATER_THREAD_NAME + " on "
+ endpoint.getMemberId() + " port " + endpoint.getLocation().getPort();
if (logger.isDebugEnabled()) {
logger.debug("Establishing: {}", clientUpdateName);
return connectionConnector.connectServerToClient(endpoint, qManager, isPrimary, failedUpdater,