| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.cache.client.internal; |
| |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.ServerLocation; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.security.GemFireSecurityException; |
| |
| /** |
| * A connection source where the list of endpoints is specified explicitly. |
| * |
| * @since GemFire 5.7 |
| * |
| * TODO - the UnusedServerMonitor basically will force the pool to have at least one |
| * connection to each server. Maybe we need to have it create connections that are outside |
| * the pool? |
| * |
| */ |
| public class ExplicitConnectionSourceImpl implements ConnectionSource { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| private List<ServerLocation> serverList; |
| private int nextServerIndex = 0; |
| private int nextQueueIndex = 0; |
| private InternalPool pool; |
| |
| /** |
| * A debug flag, which can be toggled by tests to disable/enable shuffling of the endpoints list |
| */ |
| private boolean DISABLE_SHUFFLING = |
| Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints"); |
| |
| ExplicitConnectionSourceImpl(List<InetSocketAddress> contacts) { |
| ArrayList<ServerLocation> serverList = new ArrayList<>(contacts.size()); |
| for (InetSocketAddress addr : contacts) { |
| serverList.add(new ServerLocation(addr.getHostName(), addr.getPort())); |
| } |
| shuffle(serverList); |
| this.serverList = Collections.unmodifiableList(serverList); |
| } |
| |
| @Override |
| public synchronized void start(InternalPool pool) { |
| this.pool = pool; |
| pool.getStats().setInitialContacts(serverList.size()); |
| } |
| |
| @Override |
| public void stop() { |
| // do nothing |
| } |
| |
| @Override |
| public ServerLocation findReplacementServer(ServerLocation currentServer, |
| Set<ServerLocation> excludedServers) { |
| // at this time we always try to find a server other than currentServer |
| // and if we do return it. Otherwise return null; |
| // so that clients would attempt to keep the same number of connections |
| // to each server but it would be a bit of work. |
| // Plus we need to make sure it would work ok for hardware load balancers. |
| HashSet<ServerLocation> excludedPlusCurrent = new HashSet<>(excludedServers); |
| excludedPlusCurrent.add(currentServer); |
| return findServer(excludedPlusCurrent); |
| } |
| |
| @Override |
| public synchronized ServerLocation findServer(Set excludedServers) { |
| if (PoolImpl.TEST_DURABLE_IS_NET_DOWN) { |
| return null; |
| } |
| ServerLocation nextServer; |
| int startIndex = nextServerIndex; |
| do { |
| nextServer = serverList.get(nextServerIndex); |
| if (++nextServerIndex >= serverList.size()) { |
| nextServerIndex = 0; |
| } |
| if (!excludedServers.contains(nextServer)) { |
| return nextServer; |
| } |
| } while (nextServerIndex != startIndex); |
| |
| return null; |
| } |
| |
| /** |
| * TODO - this algorithm could be cleaned up. Right now we have to connect to every server in the |
| * system to find where our durable queue lives. |
| */ |
| @Override |
| public synchronized List<ServerLocation> findServersForQueue(Set<ServerLocation> excludedServers, |
| int numServers, |
| ClientProxyMembershipID proxyId, boolean findDurableQueue) { |
| if (PoolImpl.TEST_DURABLE_IS_NET_DOWN) { |
| return new ArrayList<>(); |
| } |
| if (numServers == -1) { |
| numServers = Integer.MAX_VALUE; |
| } |
| if (findDurableQueue && proxyId.isDurable()) { |
| return findDurableQueues(excludedServers, numServers); |
| } else { |
| return pickQueueServers(excludedServers, numServers); |
| } |
| } |
| |
| @Override |
| public boolean isBalanced() { |
| return false; |
| } |
| |
| private List<ServerLocation> pickQueueServers(Set<ServerLocation> excludedServers, |
| int numServers) { |
| |
| ArrayList<ServerLocation> result = new ArrayList<>(); |
| ServerLocation nextQueue; |
| int startIndex = nextQueueIndex; |
| do { |
| nextQueue = serverList.get(nextQueueIndex); |
| if (++nextQueueIndex >= serverList.size()) { |
| nextQueueIndex = 0; |
| } |
| if (!excludedServers.contains(nextQueue)) { |
| result.add(nextQueue); |
| } |
| } while (nextQueueIndex != startIndex && result.size() < numServers); |
| |
| return result; |
| } |
| |
| /** |
| * a "fake" operation which just extracts the queue status from the connection |
| */ |
| private static class HasQueueOp implements Op { |
| @Immutable |
| static final HasQueueOp SINGLETON = new HasQueueOp(); |
| |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| ServerQueueStatus status = cnx.getQueueStatus(); |
| return status.isNonRedundant() ? Boolean.FALSE : Boolean.TRUE; |
| } |
| } |
| |
| private List<ServerLocation> findDurableQueues(Set<ServerLocation> excludedServers, |
| int numServers) { |
| ArrayList<ServerLocation> durableServers = new ArrayList<>(); |
| ArrayList<ServerLocation> otherServers = new ArrayList<>(); |
| |
| logger.debug("ExplicitConnectionSource - looking for durable queue"); |
| |
| for (ServerLocation server : serverList) { |
| if (excludedServers.contains(server)) { |
| continue; |
| } |
| |
| // the pool will automatically create a connection to this server |
| // and store it for future use. |
| Boolean hasQueue; |
| try { |
| hasQueue = (Boolean) pool.executeOn(server, HasQueueOp.SINGLETON); |
| } catch (GemFireSecurityException e) { |
| throw e; |
| } catch (Exception e) { |
| if (e.getCause() instanceof GemFireSecurityException) { |
| throw (GemFireSecurityException) e.getCause(); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Unabled to check for durable queue on server {}: {}", server, e); |
| } |
| continue; |
| } |
| if (hasQueue != null) { |
| if (hasQueue) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Durable queue found on {}", server); |
| } |
| durableServers.add(server); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Durable queue was not found on {}", server); |
| } |
| otherServers.add(server); |
| } |
| } |
| } |
| |
| int remainingServers = numServers - durableServers.size(); |
| if (remainingServers > otherServers.size()) { |
| remainingServers = otherServers.size(); |
| } |
| // note, we're always prefering the servers in the beginning of the list |
| // but that's ok because we already shuffled the list in our constructor. |
| if (remainingServers > 0) { |
| durableServers.addAll(otherServers.subList(0, remainingServers)); |
| nextQueueIndex = remainingServers % serverList.size(); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("found {} servers out of {}", durableServers.size(), numServers); |
| } |
| |
| return durableServers; |
| } |
| |
| private void shuffle(List endpoints) { |
| // this check was copied from ConnectionProxyImpl |
| if (endpoints.size() < 2 || DISABLE_SHUFFLING) { |
| /* |
| * It is not safe to shuffle an ArrayList of size 1 java.lang.IndexOutOfBoundsException: |
| * Index: 1, Size: 1 at java.util.ArrayList.RangeCheck(Unknown Source) at |
| * java.util.ArrayList.get(Unknown Source) at java.util.Collections.swap(Unknown Source) at |
| * java.util.Collections.shuffle(Unknown Source) |
| */ |
| return; |
| } |
| Collections.shuffle(endpoints); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("EndPoints["); |
| synchronized (this) { |
| Iterator it = serverList.iterator(); |
| while (it.hasNext()) { |
| ServerLocation loc = (ServerLocation) it.next(); |
| sb.append(loc.getHostName()).append(":").append(loc.getPort()); |
| if (it.hasNext()) { |
| sb.append(","); |
| } |
| } |
| } |
| sb.append("]"); |
| return sb.toString(); |
| } |
| |
| @Override |
| public ArrayList<ServerLocation> getAllServers() { |
| return new ArrayList<>(serverList); |
| } |
| |
| @Override |
| public List<InetSocketAddress> getOnlineLocators() { |
| return Collections.emptyList(); |
| } |
| } |