| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP; |
| import static org.apache.geode.internal.cache.tier.CommunicationMode.ClientToServerForQueue; |
| import static org.apache.geode.internal.cache.tier.sockets.Handshake.REPLY_REFUSED; |
| |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.io.OutputStream; |
| import java.net.Inet6Address; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.ServerSocket; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.net.SocketTimeoutException; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.AsynchronousCloseException; |
| import java.nio.channels.CancelledKeyException; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.channels.ClosedSelectorException; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Supplier; |
| |
| import io.micrometer.core.instrument.MeterRegistry; |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.StatisticsFactory; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.ToDataException; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.wan.GatewayTransportFilter; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.LonerDistributionManager; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.internal.HeapDataOutputStream; |
| import org.apache.geode.internal.SystemTimer; |
| import org.apache.geode.internal.cache.BucketAdvisor; |
| import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.partitioned.AllBucketProfilesUpdateMessage; |
| import org.apache.geode.internal.cache.tier.Acceptor; |
| import org.apache.geode.internal.cache.tier.CachedRegionHelper; |
| import org.apache.geode.internal.cache.tier.CommunicationMode; |
| import org.apache.geode.internal.cache.tier.OverflowAttributes; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.CacheClientNotifierProvider; |
| import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor.ClientHealthMonitorProvider; |
| import org.apache.geode.internal.cache.wan.GatewayReceiverStats; |
| import org.apache.geode.internal.inet.LocalHostUtil; |
| import org.apache.geode.internal.logging.CoreLoggingExecutors; |
| import org.apache.geode.internal.monitoring.ThreadsMonitoring; |
| import org.apache.geode.internal.net.SocketCreator; |
| import org.apache.geode.internal.security.SecurityService; |
| import org.apache.geode.internal.serialization.KnownVersion; |
| import org.apache.geode.internal.statistics.StatisticsClock; |
| import org.apache.geode.internal.statistics.StatisticsClockFactory; |
| import org.apache.geode.internal.tcp.ConnectionTable; |
| import org.apache.geode.internal.util.ArrayUtils; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.executors.LoggingThreadFactory.CommandWrapper; |
| import org.apache.geode.logging.internal.executors.LoggingThreadFactory.ThreadInitializer; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * Implements the acceptor thread on the cache server. Accepts connections from the edge and starts |
| * up threads to process requests from these. |
| * |
| * @since GemFire 2.0.2 |
| */ |
| @SuppressWarnings("deprecation") |
| public class AcceptorImpl implements Acceptor, Runnable { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit"); |
| private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4; |
| private static final int CLIENT_QUEUE_INITIALIZATION_POOL_SIZE = 16; |
| |
| private final CacheServerStats stats; |
| private final int maxConnections; |
| private final int maxThreads; |
| private final int maximumTimeBetweenPings; |
| |
| private final ExecutorService pool; |
| /** |
| * A pool used to process handshakes. |
| */ |
| private final ExecutorService hsPool; |
| |
| /** |
| * A pool used to process client-queue-initializations. |
| */ |
| private final ExecutorService clientQueueInitPool; |
| |
| /** |
| * The port on which this acceptor listens for client connections |
| */ |
| private final int localPort; |
| |
| /** |
| * The server socket that handles requests for connections |
| */ |
| private ServerSocket serverSock; |
| |
| /** |
| * The GemFire cache served up by this acceptor |
| */ |
| private final InternalCache cache; |
| |
| /** |
| * Caches region information |
| */ |
| private final CachedRegionHelper crHelper; |
| |
| /** |
| * A lock to prevent close from occurring while creating a ServerConnection |
| */ |
| private final Object syncLock = new Object(); |
| |
| /** |
| * THE selector for the cache server; null if no selector. |
| */ |
| private final Selector selector; |
| |
| /** |
| * Used for managing direct byte buffer for client comms; null if no selector. |
| */ |
| private final LinkedBlockingQueue<ByteBuffer> commBufferQueue; |
| /** |
| * Used to timeout accepted sockets that we are waiting for the handshake packet |
| */ |
| private final SystemTimer hsTimer; |
| /** |
| * A queue used to feed register requests to the selector; null if no selector. |
| */ |
| private final LinkedBlockingQueue<ServerConnection> selectorQueue; |
| /** |
| * All the objects currently registered with selector. |
| */ |
| private final Set<ServerConnection> selectorRegistrations; |
| /** |
| * tcpNoDelay setting for outgoing sockets |
| */ |
| private final boolean tcpNoDelay; |
| |
| /** |
| * The name of a system property that sets the hand shake timeout (in milliseconds). This is how |
| * long a client will wait to hear back from a server. |
| */ |
| private static final String HANDSHAKE_TIMEOUT_PROPERTY_NAME = "BridgeServer.handShakeTimeout"; |
| |
| /** |
| * The default value of the {@link #HANDSHAKE_TIMEOUT_PROPERTY_NAME} system property. |
| */ |
| private static final int DEFAULT_HANDSHAKE_TIMEOUT_MS = 59000; |
| |
| /** |
| * Test value for handshake timeout |
| */ |
| private static final int handshakeTimeout = |
| Integer.getInteger(HANDSHAKE_TIMEOUT_PROPERTY_NAME, DEFAULT_HANDSHAKE_TIMEOUT_MS); |
| |
| /** |
| * The name of a system property that sets the accept timeout (in milliseconds). This is how long |
| * a server will wait to get its first byte from a client it has just accepted. |
| */ |
| public static final String ACCEPT_TIMEOUT_PROPERTY_NAME = "BridgeServer.acceptTimeout"; |
| |
| /** |
| * The default value of the {@link #ACCEPT_TIMEOUT_PROPERTY_NAME} system property. |
| */ |
| private static final int DEFAULT_ACCEPT_TIMEOUT_MS = 9900; |
| |
| /** |
| * Test value for accept timeout |
| */ |
| private final int acceptTimeout = |
| Integer.getInteger(ACCEPT_TIMEOUT_PROPERTY_NAME, DEFAULT_ACCEPT_TIMEOUT_MS); |
| |
| /** |
| * The minimum value of max-connections |
| */ |
| static final int MINIMUM_MAX_CONNECTIONS = 16; |
| |
| /** |
| * The buffer size for server-side sockets. |
| */ |
| private final int socketBufferSize; |
| |
| /** |
| * Notifies clients of updates |
| */ |
| private final CacheClientNotifier clientNotifier; |
| |
| /** |
| * The default value of the {@link ServerSocket} {@link #BACKLOG_PROPERTY_NAME}system property |
| */ |
| private static final int DEFAULT_BACKLOG = 1280; |
| |
| /** |
| * The system property name for setting the {@link ServerSocket}backlog |
| */ |
| private static final String BACKLOG_PROPERTY_NAME = "BridgeServer.backlog"; |
| |
| /** |
| * The name of a system property that Defines the time interval (in nano-seconds) with which |
| * checkRegisteredKeys function can be called. |
| */ |
| private static final String CHECK_REGISTERED_KEYS_INTERVAL_NAME = |
| "check-registered-keys-interval-ns"; |
| |
| /** |
| * The default value of {@link #CHECK_REGISTERED_KEYS_INTERVAL_NAME} system property. |
| */ |
| private static final int DEFAULT_CHECK_REGISTERED_KEYS_INTERVAL_NS = 0; |
| |
| /** |
| * Set value of check registered keys interval |
| */ |
| private final long checkRegisteredKeysInterval = Long |
| .getLong(CHECK_REGISTERED_KEYS_INTERVAL_NAME, DEFAULT_CHECK_REGISTERED_KEYS_INTERVAL_NS); |
| |
| /** |
| * Current number of ServerConnection instances that are CLIENT_TO_SERVER cons. |
| */ |
| private final AtomicInteger clientServerCnxCount = new AtomicInteger(); |
| |
| /** |
| * Has this acceptor been shut down |
| */ |
| private volatile boolean shutdownStarted; |
| |
| /** |
| * The thread that runs the acceptor |
| */ |
| private Thread thread; |
| |
| /** |
| * The thread that runs the selector loop if any |
| */ |
| private Thread selectorThread; |
| |
| /** |
| * Controls updates to {@link #allSCs} |
| */ |
| private final Object allSCsLock = new Object(); |
| |
| /** |
| * List of ServerConnection. |
| * |
| * Instances added when constructed; removed when terminated. |
| * |
| * guarded.By {@link #allSCsLock} |
| */ |
| private final Set<ServerConnection> allSCs = new HashSet<>(); |
| |
| /** |
| * List of ServerConnections, for {@link #emergencyClose()} |
| * |
| * guarded.By {@link #allSCsLock} |
| */ |
| private volatile ServerConnection[] allSCList = new ServerConnection[0]; |
| |
| /** |
| * The ip address or host name this acceptor is to bind to; {@code null} or "" indicates it |
| * will listen on all local addresses. |
| * |
| * @since GemFire 5.7 |
| */ |
| private final String bindHostName; |
| |
| /** |
| * A listener for connect/disconnect events |
| */ |
| private final ConnectionListener connectionListener; |
| |
| /** |
| * The client health monitor tracking connections for this acceptor |
| */ |
| private final ClientHealthMonitor healthMonitor; |
| |
| /** |
| * bridge's setting of notifyBySubscription |
| */ |
| private final boolean notifyBySubscription; |
| |
| /** |
| * The AcceptorImpl identifier, used to identify the clients connected to this Acceptor. |
| */ |
| private long acceptorId; |
| |
| @MakeNotStatic |
| private static boolean isAuthenticationRequired; |
| |
| @MakeNotStatic |
| private static boolean isPostAuthzCallbackPresent; |
| |
| private final boolean isGatewayReceiver; |
| |
| private final List<GatewayTransportFilter> gatewayTransportFilters; |
| private final StatisticsClock statisticsClock; |
| |
| private final SocketCreator socketCreator; |
| |
| private final SecurityService securityService; |
| |
| private final ServerConnectionFactory serverConnectionFactory; |
| |
| /** |
| * Constructs an AcceptorImpl for use within a CacheServer. |
| * |
| * <p> |
| * Initializes this acceptor thread to listen for connections on the given port. |
| * |
| * @param port The port on which this acceptor listens for connections. If {@code 0}, a |
| * random port will be chosen. |
| * @param bindHostName The ip address or host name this acceptor listens on for connections. If |
| * {@code null} or "" then all local addresses are used |
| * @param socketBufferSize The buffer size for server-side sockets |
| * @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the |
| * {@code ClientHealthMonitor} to monitor the health of this server's clients. |
| * @param internalCache The GemFire cache whose contents is served to clients |
| * @param maxConnections the maximum number of connections allowed in the server pool |
| * @param maxThreads the maximum number of threads allowed in the server pool |
| * @param securityService the SecurityService to use for authentication and authorization |
| * |
| * @see ClientHealthMonitor |
| * |
| * @since GemFire 5.7 |
| */ |
| AcceptorImpl(final int port, final String bindHostName, final boolean notifyBySubscription, |
| final int socketBufferSize, final int maximumTimeBetweenPings, |
| final InternalCache internalCache, final int maxConnections, final int maxThreads, |
| final int maximumMessageCount, final int messageTimeToLive, |
| final ConnectionListener connectionListener, final OverflowAttributes overflowAttributes, |
| final boolean tcpNoDelay, final ServerConnectionFactory serverConnectionFactory, |
| final long timeLimitMillis, final SecurityService securityService, |
| final Supplier<SocketCreator> socketCreatorSupplier, |
| final CacheClientNotifierProvider cacheClientNotifierProvider, |
| final ClientHealthMonitorProvider clientHealthMonitorProvider) throws IOException { |
| this(port, bindHostName, notifyBySubscription, socketBufferSize, maximumTimeBetweenPings, |
| internalCache, maxConnections, maxThreads, maximumMessageCount, messageTimeToLive, |
| connectionListener, overflowAttributes, tcpNoDelay, serverConnectionFactory, |
| timeLimitMillis, securityService, socketCreatorSupplier, cacheClientNotifierProvider, |
| clientHealthMonitorProvider, false, Collections.emptyList(), |
| StatisticsClockFactory.disabledClock()); |
| } |
| |
| /** |
| * Constructs an AcceptorImpl for use within a GatewayReceiver. |
| * |
| * <p> |
| * Initializes this acceptor thread to listen for connections on the given port. |
| * |
| * @param port The port on which this acceptor listens for connections. If {@code 0}, a |
| * random port will be chosen. |
| * @param notifyBySubscription the bridges setting whether if it to be notified by subscription |
| * @param maximumMessageCount maximum message count setting in the Cache Client Notifier |
| * @param messageTimeToLive message time to live setting in the Cache Client Notifier |
| * @param connectionListener listener to detect if connect or disconnect events |
| * @param overflowAttributes overflow attributes of Cache Client Notifier |
| * @param tcpNoDelay TCP delay for the outgoing sockets |
| * @param serverConnectionFactory server connection factory for the client |
| * @param timeLimitMillis time limit to wait attemping to bind to a server socket |
| * @param socketCreatorSupplier socket creator for the server connection |
| * @param cacheClientNotifierProvider collection of cache client notifiers |
| * @param clientHealthMonitorProvider collection of clinet health monitors |
| * @param isGatewayReceiver flag to determine if member is gateway receiver |
| * @param statisticsClock maintains the JVM's clock |
| * @param bindHostName The ip address or host name this acceptor listens on for connections. If |
| * {@code null} or "" then all local addresses are used |
| * @param socketBufferSize The buffer size for server-side sockets |
| * @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the |
| * {@code ClientHealthMonitor} to monitor the health of this server's clients. |
| * @param internalCache The GemFire cache whose contents is served to clients |
| * @param maxConnections the maximum number of connections allowed in the server pool |
| * @param maxThreads the maximum number of threads allowed in the server pool |
| * @param securityService the SecurityService to use for authentication and authorization |
| * @param gatewayTransportFilters List of GatewayTransportFilters |
| */ |
| AcceptorImpl(final int port, final String bindHostName, final boolean notifyBySubscription, |
| final int socketBufferSize, final int maximumTimeBetweenPings, |
| final InternalCache internalCache, final int maxConnections, final int maxThreads, |
| final int maximumMessageCount, final int messageTimeToLive, |
| final ConnectionListener connectionListener, |
| final OverflowAttributes overflowAttributes, |
| final boolean tcpNoDelay, final ServerConnectionFactory serverConnectionFactory, |
| final long timeLimitMillis, final SecurityService securityService, |
| final Supplier<SocketCreator> socketCreatorSupplier, |
| final CacheClientNotifierProvider cacheClientNotifierProvider, |
| final ClientHealthMonitorProvider clientHealthMonitorProvider, |
| final boolean isGatewayReceiver, |
| final List<GatewayTransportFilter> gatewayTransportFilters, |
| final StatisticsClock statisticsClock) throws IOException { |
| this.securityService = securityService; |
| this.statisticsClock = statisticsClock; |
| |
| this.isGatewayReceiver = isGatewayReceiver; |
| this.gatewayTransportFilters = gatewayTransportFilters; |
| |
| this.bindHostName = calcBindHostName(internalCache, bindHostName); |
| this.connectionListener = |
| connectionListener == null ? new ConnectionListenerAdapter() : connectionListener; |
| this.notifyBySubscription = notifyBySubscription; |
| this.serverConnectionFactory = serverConnectionFactory; |
| |
| { |
| int tmp_maxConnections = maxConnections; |
| if (tmp_maxConnections < MINIMUM_MAX_CONNECTIONS) { |
| tmp_maxConnections = MINIMUM_MAX_CONNECTIONS; |
| } |
| this.maxConnections = tmp_maxConnections; |
| } |
| { |
| int tmp_maxThreads = maxThreads; |
| if (maxThreads == CacheServer.DEFAULT_MAX_THREADS) { |
| // consult system properties for 5.0.2 backwards compatibility |
| if (DEPRECATED_SELECTOR) { |
| tmp_maxThreads = DEPRECATED_SELECTOR_POOL_SIZE; |
| } |
| } |
| if (tmp_maxThreads < 0) { |
| tmp_maxThreads = 0; |
| } else if (tmp_maxThreads > this.maxConnections) { |
| tmp_maxThreads = this.maxConnections; |
| } |
| boolean isWindows = false; |
| String os = System.getProperty("os.name"); |
| if (os != null) { |
| if (os.contains("Windows")) { |
| isWindows = true; |
| } |
| } |
| if (tmp_maxThreads > 0 && isWindows) { |
| // bug #40472 and JDK bug 6230761 - NIO can't be used with IPv6 on Windows |
| if (getBindAddress() instanceof Inet6Address) { |
| logger.warn( |
| "Ignoring max-threads setting and using zero instead due to JRockit NIO bugs. See GemFire bug #40198"); |
| tmp_maxThreads = 0; |
| } |
| // bug #40198 - Selector.wakeup() hangs if VM starts to exit |
| if (isJRockit) { |
| logger.warn( |
| "Ignoring max-threads setting and using zero instead due to Java bug 6230761: NIO does not work with IPv6 on Windows. See GemFire bug #40472"); |
| tmp_maxThreads = 0; |
| } |
| } |
| this.maxThreads = tmp_maxThreads; |
| } |
| { |
| Selector tmp_s = null; |
| LinkedBlockingQueue<ServerConnection> tmp_q = null; |
| LinkedBlockingQueue<ByteBuffer> tmp_commQ = null; |
| Set<ServerConnection> tmp_hs = null; |
| SystemTimer tmp_timer = null; |
| if (isSelector()) { |
| tmp_s = Selector.open(); // no longer catch ex to fix bug 36907 |
| tmp_q = new LinkedBlockingQueue<>(); |
| tmp_commQ = new LinkedBlockingQueue<>(); |
| tmp_hs = new HashSet<>(512); |
| tmp_timer = new SystemTimer(internalCache.getDistributedSystem()); |
| } |
| selector = tmp_s; |
| selectorQueue = tmp_q; |
| commBufferQueue = tmp_commQ; |
| selectorRegistrations = tmp_hs; |
| hsTimer = tmp_timer; |
| this.tcpNoDelay = tcpNoDelay; |
| } |
| |
| { |
| socketCreator = socketCreatorSupplier.get(); |
| |
| final InternalCache gc; |
| if (getCachedRegionHelper() != null) { |
| gc = getCachedRegionHelper().getCache(); |
| } else { |
| gc = null; |
| } |
| final int backLog = Integer.getInteger(BACKLOG_PROPERTY_NAME, DEFAULT_BACKLOG); |
| final long tilt = System.currentTimeMillis() + timeLimitMillis; |
| |
| if (isSelector()) { |
| if (socketCreator.forCluster().useSSL()) { |
| throw new IllegalArgumentException( |
| "Selector thread pooling can not be used with client/server SSL. The selector can be disabled by setting max-threads=0."); |
| } |
| ServerSocketChannel channel = ServerSocketChannel.open(); |
| serverSock = channel.socket(); |
| serverSock.setReuseAddress(true); |
| |
| // Set the receive buffer size before binding the socket so that large |
| // buffers will be allocated on accepted sockets (see |
| // java.net.ServerSocket.setReceiverBufferSize javadocs) |
| serverSock.setReceiveBufferSize(socketBufferSize); |
| |
| // fix for bug 36617. If BindException is thrown, retry after |
| // sleeping. The server may have been stopped and then |
| // immediately restarted, which sometimes results in a bind exception |
| for (;;) { |
| try { |
| serverSock.bind(new InetSocketAddress(getBindAddress(), port), backLog); |
| break; |
| } catch (SocketException b) { |
| if (System.currentTimeMillis() > tilt) { |
| throw b; |
| } |
| } |
| |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (gc != null) { |
| gc.getCancelCriterion().checkCancelInProgress(null); |
| } |
| } // for |
| } // isSelector |
| else { // !isSelector |
| // fix for bug 36617. If BindException is thrown, retry after |
| // sleeping. The server may have been stopped and then |
| // immediately restarted, which sometimes results in a bind exception |
| for (;;) { |
| try { |
| serverSock = socketCreator.createServerSocket(port, backLog, getBindAddress(), |
| this.gatewayTransportFilters, socketBufferSize); |
| break; |
| } catch (SocketException e) { |
| if (System.currentTimeMillis() > tilt) { |
| throw e; |
| } |
| } |
| |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (gc != null) { |
| gc.getCancelCriterion().checkCancelInProgress(null); |
| } |
| } // for |
| } // !isSelector |
| |
| if (port == 0) { |
| localPort = serverSock.getLocalPort(); |
| } else { |
| localPort = port; |
| } |
| { |
| InternalDistributedSystem ds = internalCache.getInternalDistributedSystem(); |
| if (ds != null) { |
| DistributionManager dm = ds.getDistributionManager(); |
| if (dm != null && dm.getDistributionManagerId().getMembershipPort() == 0 |
| && dm instanceof LonerDistributionManager) { |
| // a server with a loner distribution manager - update it's port number |
| ((LonerDistributionManager) dm).updateLonerPort(localPort); |
| } |
| } |
| } |
| String sockName = getServerName(); |
| logger.info("Cache server connection listener bound to address {} with backlog {}.", |
| sockName, backLog); |
| StatisticsFactory statisticsFactory = |
| internalCache.getInternalDistributedSystem().getStatisticsManager(); |
| if (isGatewayReceiver()) { |
| MeterRegistry meterRegistry = internalCache.getMeterRegistry(); |
| stats = GatewayReceiverStats.createGatewayReceiverStats(statisticsFactory, sockName, |
| meterRegistry); |
| } else { |
| stats = new CacheServerStats(statisticsFactory, sockName); |
| } |
| } |
| |
| cache = internalCache; |
| crHelper = new CachedRegionHelper(cache); |
| |
| clientNotifier = |
| cacheClientNotifierProvider.get(internalCache, new ClientRegistrationEventQueueManager(), |
| statisticsClock, stats, maximumMessageCount, |
| messageTimeToLive, this.connectionListener, overflowAttributes, isGatewayReceiver()); |
| |
| this.socketBufferSize = socketBufferSize; |
| |
| // Create the singleton ClientHealthMonitor |
| this.maximumTimeBetweenPings = maximumTimeBetweenPings; |
| healthMonitor = clientHealthMonitorProvider.get(internalCache, maximumTimeBetweenPings, |
| clientNotifier.getStats()); |
| |
| pool = initializeServerConnectionThreadPool(); |
| hsPool = initializeHandshakerThreadPool(); |
| clientQueueInitPool = initializeClientQueueInitializerThreadPool(); |
| |
| isAuthenticationRequired = securityService.isClientSecurityRequired(); |
| |
| String postAuthzFactoryName = |
| cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP); |
| |
| isPostAuthzCallbackPresent = |
| postAuthzFactoryName != null && !postAuthzFactoryName.isEmpty(); |
| } |
| |
| private ExecutorService initializeHandshakerThreadPool() throws IOException { |
| String threadName = |
| "Handshaker " + serverSock.getInetAddress() + ":" + localPort + " Thread "; |
| try { |
| logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE); |
| return CoreLoggingExecutors.newThreadPoolWithSynchronousFeedThatHandlesRejection(1, |
| HANDSHAKE_POOL_SIZE, 60, SECONDS, threadName, |
| thread -> getStats().incAcceptThreadsCreated(), null); |
| } catch (IllegalArgumentException poolInitException) { |
| stats.close(); |
| serverSock.close(); |
| pool.shutdown(); |
| throw poolInitException; |
| } |
| } |
| |
| private ExecutorService initializeClientQueueInitializerThreadPool() { |
| return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed( |
| CLIENT_QUEUE_INITIALIZATION_POOL_SIZE, 60000, MILLISECONDS, |
| "Client Queue Initialization Thread ", |
| command -> { |
| try { |
| command.run(); |
| } catch (CancelException e) { |
| logger.debug("Client Queue Initialization was canceled.", e); |
| } |
| }, |
| getStats().getCnxPoolHelper(), getThreadMonitorObj()); |
| } |
| |
| private ExecutorService initializeServerConnectionThreadPool() throws IOException { |
| ThreadInitializer threadInitializer = thread -> getStats().incConnectionThreadsCreated(); |
| CommandWrapper commandWrapper = command -> { |
| try { |
| command.run(); |
| } catch (CancelException e) { // bug 39463 |
| // ignore |
| } finally { |
| ConnectionTable.releaseThreadsSockets(); |
| } |
| }; |
| try { |
| String threadName = "ServerConnection on port " + localPort + " Thread "; |
| if (isSelector()) { |
| return CoreLoggingExecutors.newThreadPoolWithUnlimitedFeed(maxThreads, Integer.MAX_VALUE, |
| MILLISECONDS, threadName, threadInitializer, commandWrapper, |
| getStats().getCnxPoolHelper(), getThreadMonitorObj()); |
| } |
| return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(MINIMUM_MAX_CONNECTIONS, |
| maxConnections, 0L, SECONDS, threadName, threadInitializer, commandWrapper); |
| } catch (IllegalArgumentException poolInitException) { |
| stats.close(); |
| serverSock.close(); |
| throw poolInitException; |
| } |
| } |
| |
| private ThreadsMonitoring getThreadMonitorObj() { |
| DistributionManager distributionManager = cache.getDistributionManager(); |
| if (distributionManager != null) { |
| return distributionManager.getThreadMonitoring(); |
| } |
| return null; |
| } |
| |
| @Override |
| public long getAcceptorId() { |
| return acceptorId; |
| } |
| |
| @Override |
| public CacheServerStats getStats() { |
| return stats; |
| } |
| |
| /** |
| * Returns true if this acceptor is using a selector to detect client events. |
| */ |
| @Override |
| public boolean isSelector() { |
| return maxThreads > 0; |
| } |
| |
| /** |
| * This system property is only used if max-threads == 0. This is for 5.0.2 backwards |
| * compatibility. |
| * |
| * @deprecated since 5.1 use cache-server max-threads instead |
| */ |
| @Deprecated |
| private static final boolean DEPRECATED_SELECTOR = Boolean.getBoolean("BridgeServer.SELECTOR"); |
| |
| /** |
| * This system property is only used if max-threads == 0. This is for 5.0.2 backwards |
| * compatibility. |
| * |
| * @deprecated since 5.1 use cache-server max-threads instead |
| */ |
| @Deprecated |
| private final int DEPRECATED_SELECTOR_POOL_SIZE = |
| Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16); |
| private final int HANDSHAKE_POOL_SIZE = Integer |
| .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE); |
| |
| @Override |
| public void start() { |
| // This thread should not be a daemon to keep BridgeServers created |
| // in code from exiting immediately. |
| thread = |
| new LoggingThread("Cache Server Acceptor " + serverSock.getInetAddress() + ":" |
| + localPort + " local port: " + serverSock.getLocalPort(), false, this); |
| acceptorId = thread.getId(); |
| thread.start(); |
| |
| if (isSelector()) { |
| selectorThread = |
| new LoggingThread("Cache Server Selector " + serverSock.getInetAddress() + ":" |
| + localPort + " local port: " + serverSock.getLocalPort(), |
| false, |
| this::runSelectorLoop); |
| selectorThread.start(); |
| } |
| Set<PartitionedRegion> prs = cache.getPartitionedRegions(); |
| for (PartitionedRegion pr : prs) { |
| Map<Integer, BucketAdvisor.BucketProfile> profiles = |
| new HashMap<>(); |
| // get all local real bucket advisors |
| Map<Integer, BucketAdvisor> advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); |
| for (Map.Entry<Integer, BucketAdvisor> entry : advisors.entrySet()) { |
| BucketAdvisor advisor = entry.getValue(); |
| // addLocally |
| BucketProfile bp = (BucketProfile) advisor.createProfile(); |
| advisor.updateServerBucketProfile(bp); |
| // advisor.basicAddClientProfile(bp); |
| profiles.put(entry.getKey(), bp); |
| } |
| Set recipients = pr.getRegionAdvisor().adviseAllPRNodes(); |
| // send it to all in one message |
| ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send(recipients, |
| pr.getDistributionManager(), pr.getPRId(), profiles); |
| if (reply != null) { |
| reply.waitForRepliesUninterruptibly(); |
| } |
| } |
| } |
| |
| @Override |
| public void registerServerConnection(ServerConnection serverConnection) { |
| synchronized (syncLock) { |
| if (!isRunning()) { |
| finishCon(serverConnection); |
| return; |
| } |
| } |
| getSelectorQueue().offer(serverConnection); |
| wakeupSelector(); |
| } |
| |
| /** |
| * wake up the selector thread |
| */ |
| private void wakeupSelector() { |
| Selector s = getSelector(); |
| if (s != null && s.isOpen()) { |
| selector.wakeup(); |
| } |
| } |
| |
| @Override |
| public void unregisterServerConnection(ServerConnection serverConnection) { |
| // removed syncLock synchronization to fix bug 37104 |
| synchronized (allSCsLock) { |
| allSCs.remove(serverConnection); |
| Iterator it = allSCs.iterator(); |
| ServerConnection[] again = new ServerConnection[allSCs.size()]; |
| for (int i = 0; i < again.length; i++) { |
| again[i] = (ServerConnection) it.next(); |
| } |
| allSCList = again; |
| } |
| if (!isRunning()) { |
| return; |
| } |
| // just need to wake the selector up so it will notice our socket was closed |
| wakeupSelector(); |
| } |
| |
| private void finishCon(ServerConnection sc) { |
| if (sc != null) { |
| sc.handleTermination(); |
| } |
| } |
| |
| private void drainSelectorQueue() { |
| ServerConnection sc = selectorQueue.poll(); |
| CancelException cce = null; |
| while (sc != null) { |
| try { |
| finishCon(sc); |
| } catch (CancelException e) { |
| if (cce == null) { |
| cce = e; |
| } |
| } |
| sc = selectorQueue.poll(); |
| } |
| for (ServerConnection selectorRegistration : selectorRegistrations) { |
| try { |
| finishCon(selectorRegistration); |
| } catch (CancelException e) { |
| if (cce == null) { |
| cce = e; |
| } |
| } |
| } // while |
| if (cce != null) { |
| throw cce; |
| } |
| } |
| |
| /** |
| * @see SystemFailure#emergencyClose() |
| */ |
| @Override |
| public void emergencyClose() { |
| ServerSocket ss = serverSock; |
| if (ss != null) { |
| try { |
| ss.close(); |
| } catch (IOException e) { |
| // ignore |
| } |
| } |
| // this.selector.close(); might NOT be safe |
| crHelper.setShutdown(true); |
| |
| // TODO I'm worried about a fat lock to acquire this synchronization |
| // synchronized (this.allSCsLock) |
| { |
| ServerConnection[] snap = allSCList; |
| for (ServerConnection serverConnection : snap) { |
| serverConnection.emergencyClose(); // part of cleanup() |
| } |
| } |
| } |
| |
| private boolean isRegisteredObjectClosed(ServerConnection sc) { |
| return sc.isClosed(); |
| } |
| |
| private int checkRegisteredKeys(int count) { |
| int result = count; |
| CancelException cce = null; |
| if (count > 0) { |
| Iterator it = selectorRegistrations.iterator(); |
| while (it.hasNext()) { |
| ServerConnection sc = (ServerConnection) it.next(); |
| if (isRegisteredObjectClosed(sc)) { |
| result--; |
| it.remove(); |
| try { |
| finishCon(sc); |
| } catch (CancelException e) { |
| if (cce == null) { |
| cce = e; |
| } |
| } |
| } |
| } // while |
| } |
| if (cce != null) { |
| throw cce; |
| } |
| return result; |
| } |
| |
| private static final boolean WORKAROUND_SELECTOR_BUG = |
| Boolean.getBoolean("CacheServer.NIO_SELECTOR_WORKAROUND"); |
| |
| private Selector tmpSel; |
| |
| private void checkForStuckKeys() { |
| if (!WORKAROUND_SELECTOR_BUG) { |
| return; |
| } |
| if (tmpSel == null) { |
| try { |
| tmpSel = Selector.open(); |
| } catch (IOException ignore) { |
| logger.warn("Could not check for stuck keys.", ignore); |
| return; |
| } |
| |
| } |
| // logger.info("DEBUG: checking for stuck keys"); |
| for (SelectionKey sk : new ArrayList<>(selector.keys())) { |
| ServerConnection sc = (ServerConnection) sk.attachment(); |
| if (sc == null) { |
| continue; |
| } |
| try { |
| sk.cancel(); |
| selector.selectNow(); // clear the cancelled key |
| SelectionKey tmpsk = sc.getSelectableChannel().register(tmpSel, |
| SelectionKey.OP_WRITE | SelectionKey.OP_READ); |
| try { |
| // it should always be writable |
| int events = tmpSel.selectNow(); |
| if (events == 0) { |
| logger.info("stuck selection key detected on {}", sc); |
| tmpsk.cancel(); |
| tmpSel.selectNow(); // clear canceled key |
| sc.registerWithSelector2(selector); |
| } else { |
| if (tmpsk.isValid() && tmpsk.isReadable()) { |
| try { |
| tmpsk.cancel(); |
| tmpSel.selectNow(); // clear canceled key |
| selectorRegistrations.remove(sc); |
| registeredKeys--; |
| sc.makeBlocking(); |
| // we need to say we are processing a message |
| // so that that client health monitor will not |
| // kill us while we wait for a thread in the thread pool. |
| // This is also be used to determine how long we are |
| // in the thread pool queue and to cancel operations that |
| // have waited too long in the queue. |
| sc.setProcessingMessage(); |
| } catch (ClosedChannelException ignore) { |
| finishCon(sc); |
| continue; |
| } catch (IOException ex) { |
| finishCon(sc); |
| if (isRunning()) { |
| logger.warn("Unexpected Exception:", ex); |
| } |
| continue; |
| } |
| try { |
| stats.incThreadQueueSize(); |
| pool.execute(sc); |
| } catch (RejectedExecutionException rejected) { |
| finishCon(sc); |
| stats.decThreadQueueSize(); |
| if (!isRunning()) { |
| break; |
| } |
| logger.warn("Unexpected Exception:", rejected); |
| } |
| } else if (tmpsk.isValid() && tmpsk.isWritable()) { |
| // this is expected |
| tmpsk.cancel(); |
| tmpSel.selectNow(); // clear canceled key |
| sc.registerWithSelector2(selector); |
| } else if (!tmpsk.isValid()) { |
| tmpsk.cancel(); |
| tmpSel.selectNow(); // clear canceled key |
| sc.registerWithSelector2(selector); |
| } |
| } |
| } catch (IOException ex) { |
| if (isRunning() && selector.isOpen() && tmpSel.isOpen()) { |
| logger.warn("Unexpected Exception:", ex); |
| try { |
| tmpsk.cancel(); |
| tmpSel.selectNow(); // clear canceled key |
| } catch (IOException ex2) { |
| if (isRunning() && selector.isOpen() && tmpSel.isOpen()) { |
| logger.warn("Unexpected Exception:", ex2); |
| } |
| } |
| } |
| } |
| } catch (ClosedChannelException ignore) { // fix for bug 39650 |
| // just ignore this channel and try the next one |
| finishCon(sc); |
| } catch (IOException | NullPointerException ex) { |
| if (isRunning() && selector.isOpen() && tmpSel.isOpen()) { |
| logger.warn("Unexpected Exception:", ex); |
| } |
| } // fix bug 39644 |
| |
| } |
| } |
| |
| private int registeredKeys; |
| |
| private void runSelectorLoop() { |
| // int zeroEventsCount = 0; |
| try { |
| long lastCheckedTime = System.nanoTime(); |
| logger.info("SELECTOR enabled"); |
| while (selector.isOpen() && !Thread.currentThread().isInterrupted()) { |
| { |
| SystemFailure.checkFailure(); |
| if (cache.isClosed()) { // bug 38834 |
| break; // TODO should just ask cache's CancelCriterion |
| } |
| if (cache.getCancelCriterion().isCancelInProgress()) { |
| break; |
| } |
| long delta = System.nanoTime() - lastCheckedTime; |
| if (checkRegisteredKeysInterval == 0 || delta >= checkRegisteredKeysInterval) { |
| registeredKeys = checkRegisteredKeys(registeredKeys); |
| lastCheckedTime = System.nanoTime(); |
| } |
| ServerConnection sc; |
| if (registeredKeys == 0) { |
| // do blocking wait on queue until we get some keys registered |
| // with the selector |
| sc = selectorQueue.take(); |
| } else { |
| // we already have some keys registered so just do a poll on queue |
| sc = selectorQueue.poll(); |
| } |
| while (sc != null) { |
| try { |
| sc.registerWithSelector2(selector); |
| registeredKeys++; |
| selectorRegistrations.add(sc); |
| } catch (ClosedChannelException cce) { |
| // for bug bug 38474 |
| finishCon(sc); |
| } catch (RuntimeException ex) { |
| finishCon(sc); |
| logger.warn("ignoring", ex); |
| } |
| sc = selectorQueue.poll(); |
| } |
| } |
| if (registeredKeys == 0) { |
| continue; |
| } |
| int events = selector.select(); |
| // select() could have returned due to wakeup() during close of cache |
| if (cache.getCancelCriterion().isCancelInProgress()) { |
| break; |
| } |
| if (events == 0) { |
| checkForStuckKeys(); |
| } |
| while (events > 0) { |
| Set sk = selector.selectedKeys(); |
| if (sk == null) { |
| // something really bad has happened I'm not even sure this is possible |
| // but lhughes so an NPE during close one time so perhaps it can happen |
| // during selector close. |
| events = 0; |
| break; |
| } |
| Iterator keysIterator = sk.iterator(); |
| int cancelCount = 0; |
| while (keysIterator.hasNext()) { |
| SelectionKey key = (SelectionKey) keysIterator.next(); |
| // Remove the key from the selector's selectedKeys |
| keysIterator.remove(); |
| final ServerConnection sc = (ServerConnection) key.attachment(); |
| try { |
| if (key.isValid() && key.isReadable()) { |
| // this is the only event we currently register for |
| try { |
| key.cancel(); |
| selectorRegistrations.remove(sc); |
| registeredKeys--; |
| cancelCount++; |
| sc.makeBlocking(); |
| // we need to say we are processing a message |
| // so that that client health monitor will not |
| // kill us while we wait for a thread in the thread pool. |
| // This is also be used to determine how long we are |
| // in the thread pool queue and to cancel operations that |
| // have waited too long in the queue. |
| sc.setProcessingMessage(); |
| } catch (ClosedChannelException ignore) { |
| finishCon(sc); |
| continue; |
| } catch (IOException ex) { |
| finishCon(sc); |
| if (isRunning()) { |
| logger.warn("unexpected", ex); |
| } |
| continue; |
| } |
| try { |
| stats.incThreadQueueSize(); |
| pool.execute(sc); |
| } catch (RejectedExecutionException rejected) { |
| finishCon(sc); |
| stats.decThreadQueueSize(); |
| if (!isRunning()) { |
| break; |
| } |
| logger.warn("unexpected", rejected); |
| } |
| } else { |
| finishCon(sc); |
| if (key.isValid()) { |
| logger.warn("ignoring event on selector key {}", key); |
| } |
| } |
| } catch (CancelledKeyException ex) { // fix for bug 37739 |
| finishCon(sc); |
| } |
| } |
| if (cancelCount > 0 && selector.isOpen()) { |
| // we need to do a select to cause the cancel to be unregisters. |
| events = selector.selectNow(); |
| } else { |
| events = 0; |
| } |
| } |
| } |
| } catch (InterruptedException ex) { |
| // allow this thread to die |
| Thread.currentThread().interrupt(); |
| } catch (ClosedSelectorException ex) { |
| // allow this thread to exit |
| } catch (IOException ex) { |
| logger.warn("unexpected", ex); |
| } finally { |
| try { |
| drainSelectorQueue(); |
| } finally { |
| // note that if this method was called by close then the |
| // following call is a noop since the first thing it does |
| // is call isRunning. |
| close(); // make sure this is called to fix bug 37749 |
| } |
| } |
| } |
| |
| @Override |
| public int getPort() { |
| return localPort; |
| } |
| |
| @Override |
| public String getServerName() { |
| String name = serverSock.getLocalSocketAddress().toString(); |
| try { |
| name = LocalHostUtil.getCanonicalLocalHostName() + "-" + name; |
| } catch (Exception e) { |
| } |
| return name; |
| } |
| |
| @Override |
| public InetAddress getServerInetAddress() { |
| return serverSock.getInetAddress(); |
| } |
| |
| /** |
| * The work loop of this acceptor |
| * |
| * @see #accept |
| */ |
| @Override |
| public void run() { |
| try { |
| accept(); |
| } catch (CancelException e) { // bug 39462 |
| // ignore |
| } finally { |
| try { |
| if (serverSock != null) { |
| serverSock.close(); |
| } |
| } catch (IOException ignore) { |
| } |
| if (stats != null) { |
| stats.close(); |
| } |
| } |
| } |
| |
| private Selector getSelector() { |
| return selector; |
| } |
| |
| private BlockingQueue<ServerConnection> getSelectorQueue() { |
| return selectorQueue; |
| } |
| |
| private boolean loggedAcceptError; |
| |
| private static void closeSocket(Socket s) { |
| if (s != null) { |
| try { |
| s.close(); |
| } catch (IOException ignore) { |
| } |
| } |
| } |
| |
| /** |
| * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new |
| * {@link ServerConnection}to handle messages from that client. |
| */ |
| @Override |
| public void accept() { |
| while (isRunning()) { |
| if (SystemFailure.getFailure() != null) { |
| // Allocate no objects here! |
| ServerSocket s = serverSock; |
| if (s != null) { |
| try { |
| s.close(); |
| } catch (IOException e) { |
| // don't care |
| } |
| } |
| SystemFailure.checkFailure(); // throws |
| } |
| // moved this check out of the try. If we are cancelled then we need |
| // to break out of this while loop. |
| |
| crHelper.checkCancelInProgress(null); // throws |
| |
| Socket socket = null; |
| try { |
| socket = serverSock.accept(); |
| crHelper.checkCancelInProgress(null); // throws |
| |
| // Optionally enable SO_KEEPALIVE in the OS network protocol. |
| socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE); |
| |
| // The synchronization below was added to prevent close from being |
| // called |
| // while a ServerConnection is being instantiated. This should prevent |
| // the |
| // following exception: |
| // [severe 2004/12/15 18:49:17.671 PST gemfire2 Server connection from |
| // balrog.gemstone.com:58478-0x6ce1 nid=0x1334aa] Uncaught exception in |
| // thread Server connection from balrog.gemstone.com:58478 |
| // java.lang.NullPointerException |
| // at |
| // org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:107) |
| |
| synchronized (syncLock) { |
| if (!isRunning()) { |
| closeSocket(socket); |
| break; |
| } |
| } |
| loggedAcceptError = false; |
| |
| handOffNewClientConnection(socket, serverConnectionFactory); |
| } catch (InterruptedIOException e) { // Solaris only |
| closeSocket(socket); |
| if (isRunning()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Aborted due to interrupt: {}", e); |
| } |
| } |
| } catch (IOException e) { |
| closeSocket(socket); |
| if (isRunning()) { |
| if (!loggedAcceptError) { |
| loggedAcceptError = true; |
| logger.error("Cache server: Unexpected IOException from accept", e); |
| } |
| // Why sleep? |
| // try {Thread.sleep(3000);} catch (InterruptedException ie) {} |
| } |
| } catch (CancelException e) { |
| closeSocket(socket); |
| throw e; |
| } catch (Exception e) { |
| closeSocket(socket); |
| if (isRunning()) { |
| logger.fatal("Cache server: Unexpected Exception", e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Hand off a new client connection to the thread pool that processes handshakes. If all the |
| * threads in this pool are busy then the hand off will block until a thread is available. This |
| * blocking is good because it will throttle the rate at which we create new connections. |
| */ |
| private void handOffNewClientConnection(final Socket socket, |
| final ServerConnectionFactory serverConnectionFactory) { |
| try { |
| stats.incAcceptsInProgress(); |
| hsPool.execute(() -> { |
| boolean finished = false; |
| try { |
| handleNewClientConnection(socket, serverConnectionFactory); |
| finished = true; |
| } catch (RegionDestroyedException rde) { |
| // aborted due to disconnect - bug 42273 |
| if (!rde.getMessage().contains("HARegion")) { |
| throw rde; |
| } |
| } catch (CancelException e) { |
| // aborted due to shutdown - bug 37318 |
| } catch (AsynchronousCloseException expected) { |
| // this is expected when our TimerTask times out an accepted socket |
| } catch (IOException | ToDataException ex) { // added ToDataException to fix bug 44659 |
| if (isRunning()) { |
| if (!loggedAcceptError) { |
| loggedAcceptError = true; |
| if (ex instanceof SocketTimeoutException) { |
| logger.warn( |
| "Cache server: failed accepting client connection due to socket timeout."); |
| } else { |
| logger.warn("Cache server: failed accepting client connection " + |
| ex, |
| ex); |
| } |
| } |
| } |
| } finally { |
| if (!finished) { |
| closeSocket(socket); |
| } |
| if (isRunning()) { |
| stats.decAcceptsInProgress(); |
| } |
| } |
| }); |
| } catch (RejectedExecutionException rejected) { |
| closeSocket(socket); |
| if (isRunning()) { |
| stats.decAcceptsInProgress(); |
| logger.warn("unexpected", rejected); |
| } |
| } |
| } |
| |
| private ByteBuffer takeCommBuffer() { |
| ByteBuffer result = commBufferQueue.poll(); |
| if (result == null) { |
| result = ByteBuffer.allocateDirect(socketBufferSize); |
| } |
| return result; |
| } |
| |
| private void releaseCommBuffer(ByteBuffer bb) { |
| if (bb == null) { // fix for bug 37107 |
| return; |
| } |
| if (isRunning()) { |
| commBufferQueue.offer(bb); |
| } |
| } |
| |
| private void incClientServerCnxCount() { |
| clientServerCnxCount.incrementAndGet(); |
| } |
| |
| @Override |
| public void decClientServerConnectionCount() { |
| clientServerCnxCount.decrementAndGet(); |
| } |
| |
| @Override |
| public int getClientServerConnectionCount() { |
| return clientServerCnxCount.get(); |
| } |
| |
| private boolean isNotifyBySubscription() { |
| return notifyBySubscription; |
| } |
| |
| private void handleNewClientConnection(final Socket socket, |
| final ServerConnectionFactory serverConnectionFactory) throws IOException { |
| // Read the first byte. If this socket is being used for 'client to server' |
| // communication, create a ServerConnection. If this socket is being used |
| // for 'server to client' communication, send it to the CacheClientNotifier |
| // for processing. |
| final CommunicationMode communicationMode; |
| try { |
| if (isSelector()) { |
| communicationMode = getCommunicationModeForSelector(socket); |
| } else { |
| communicationMode = getCommunicationModeForNonSelector(socket); |
| } |
| socket.setTcpNoDelay(tcpNoDelay); |
| |
| } catch (IllegalArgumentException e) { |
| // possible if a client uses SSL & the server isn't configured to use SSL, |
| // or if an invalid communication communication mode byte is sent. |
| logger.warn("Error processing client connection", e); |
| throw new EOFException(); |
| } |
| |
| // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue |
| // initialization to be done in another threadPool |
| if (handOffQueueInitialization(socket, communicationMode)) { |
| return; |
| } |
| |
| logger.debug("cache server: Initializing {} communication socket: {}", communicationMode, |
| socket); |
| boolean notForQueue = communicationMode != ClientToServerForQueue; |
| if (notForQueue) { |
| int curCnt = getClientServerConnectionCount(); |
| if (curCnt >= maxConnections) { |
| logger.warn( |
| "Rejected connection from {} because current connection count of {} is greater than or equal to the configured max of {}", |
| new Object[] {socket.getInetAddress(), curCnt, |
| maxConnections}); |
| if (communicationMode.expectsConnectionRefusalMessage()) { |
| try { |
| refuseHandshake(socket.getOutputStream(), |
| String.format("exceeded max-connections %s", |
| maxConnections), |
| REPLY_REFUSED); |
| } catch (Exception ex) { |
| logger.debug("rejection message failed", ex); |
| } |
| } |
| closeSocket(socket); |
| return; |
| } |
| } |
| |
| ServerConnection serverConn = |
| serverConnectionFactory.makeServerConnection(socket, cache, crHelper, stats, |
| handshakeTimeout, socketBufferSize, communicationMode.toString(), |
| communicationMode.getModeNumber(), this, securityService); |
| |
| synchronized (allSCsLock) { |
| allSCs.add(serverConn); |
| ServerConnection[] snap = allSCList; // avoid volatile read |
| allSCList = (ServerConnection[]) ArrayUtils.insert(snap, snap.length, serverConn); |
| } |
| if (notForQueue) { |
| incClientServerCnxCount(); |
| } |
| if (isSelector()) { |
| serverConn.registerWithSelector(); |
| } else { |
| try { |
| pool.execute(serverConn); |
| } catch (RejectedExecutionException rejected) { |
| if (!isRunning()) { |
| return; |
| } |
| logger.warn( |
| "Rejected connection from {} because incoming request was rejected by pool possibly due to thread exhaustion", |
| serverConn); |
| try { |
| refuseHandshake(socket.getOutputStream(), |
| String.format("exceeded max-connections %s", |
| maxConnections), |
| REPLY_REFUSED); |
| |
| } catch (Exception ex) { |
| logger.debug("rejection message failed", ex); |
| } |
| serverConn.cleanup(); |
| } |
| } |
| } |
| |
| @Override |
| public void refuseHandshake(OutputStream out, String message, byte exception) throws IOException { |
| try (HeapDataOutputStream hdos = new HeapDataOutputStream(32, KnownVersion.CURRENT)) { |
| DataOutputStream dos = new DataOutputStream(hdos); |
| // Write refused reply |
| dos.writeByte(exception); |
| |
| // write dummy endpointType |
| dos.writeByte(0); |
| // write dummy queueSize |
| dos.writeInt(0); |
| |
| // Write the server's member |
| DistributedMember member = InternalDistributedSystem.getAnyInstance().getDistributedMember(); |
| HeapDataOutputStream memberDos = new HeapDataOutputStream(KnownVersion.CURRENT); |
| DataSerializer.writeObject(member, memberDos); |
| DataSerializer.writeByteArray(memberDos.toByteArray(), dos); |
| memberDos.close(); |
| |
| // Write the refusal message |
| if (message == null) { |
| message = ""; |
| } |
| dos.writeUTF(message); |
| |
| // Write dummy delta-propagation property value. This will never be read at |
| // receiver because the exception byte above will cause the receiver code |
| // throw an exception before the below byte could be read. |
| dos.writeBoolean(Boolean.TRUE); |
| |
| out.write(hdos.toByteArray()); |
| } |
| out.flush(); |
| } |
| |
| private boolean handOffQueueInitialization(Socket socket, CommunicationMode communicationMode) { |
| if (communicationMode.isSubscriptionFeed()) { |
| boolean isPrimaryServerToClient = |
| communicationMode == CommunicationMode.PrimaryServerToClient; |
| clientQueueInitPool |
| .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this)); |
| return true; |
| } |
| return false; |
| } |
| |
| private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException { |
| socket.setSoTimeout(0); |
| socketCreator.forCluster().handshakeIfSocketIsSSL(socket, acceptTimeout); |
| byte communicationModeByte = (byte) socket.getInputStream().read(); |
| if (communicationModeByte == -1) { |
| throw new EOFException(); |
| } |
| return CommunicationMode.fromModeNumber(communicationModeByte); |
| } |
| |
| private CommunicationMode getCommunicationModeForSelector(Socket socket) throws IOException { |
| ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1); |
| final SocketChannel socketChannel = socket.getChannel(); |
| socketChannel.configureBlocking(false); |
| // try to read the byte first in non-blocking mode |
| int res = socketChannel.read(byteBuffer); |
| socketChannel.configureBlocking(true); |
| if (res < 0) { |
| throw new EOFException(); |
| } |
| if (res == 0) { |
| // now do a blocking read so setup a timer to close the socket if the |
| // the read takes too long |
| SystemTimer.SystemTimerTask timerTask = new SystemTimer.SystemTimerTask() { |
| @Override |
| public void run2() { |
| logger.warn("Cache server: timed out waiting for handshake from {}", |
| socket.getRemoteSocketAddress()); |
| closeSocket(socket); |
| } |
| }; |
| hsTimer.schedule(timerTask, acceptTimeout); |
| res = socketChannel.read(byteBuffer); |
| if (!timerTask.cancel() || res <= 0) { |
| throw new EOFException(); |
| } |
| } |
| return CommunicationMode.fromModeNumber(byteBuffer.get(0)); |
| } |
| |
| @Override |
| public boolean isRunning() { |
| return !shutdownStarted; |
| } |
| |
| @Override |
| public void close() { |
| try { |
| synchronized (syncLock) { |
| if (!isRunning()) { |
| return; |
| } |
| shutdownStarted = true; |
| logger.info("Cache server on port {} is shutting down.", localPort); |
| if (thread != null) { |
| thread.interrupt(); |
| } |
| try { |
| serverSock.close(); |
| } catch (IOException ignore) { |
| } |
| |
| crHelper.setShutdown(true); // set this before shutting down the pool |
| shutdownSelectorIfIsSelector(); |
| ClientHealthMonitor.shutdownInstance(); |
| shutdownSCs(); |
| clientNotifier.shutdown(acceptorId); |
| shutdownPools(); |
| stats.close(); |
| if (!cache.isClosed()) { |
| // the cache isn't closing so we need to inform peers that this CacheServer no longer |
| // exists |
| notifyCacheMembersOfClose(); |
| } |
| } // synchronized |
| } catch (RuntimeException e) {/* ignore and log */ |
| logger.warn("unexpected", e); |
| } |
| } |
| |
| @Override |
| public void notifyCacheMembersOfClose() { |
| if (logger.isDebugEnabled()) { |
| logger.debug("sending messages to all peers for removing this server.."); |
| } |
| for (PartitionedRegion pr : cache.getPartitionedRegions()) { |
| Map<Integer, BucketAdvisor.BucketProfile> profiles = new HashMap<>(); |
| // get all local real bucket advisors |
| Map<Integer, BucketAdvisor> advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); |
| for (Map.Entry<Integer, BucketAdvisor> entry : advisors.entrySet()) { |
| BucketAdvisor advisor = entry.getValue(); |
| BucketProfile bp = (BucketProfile) advisor.createProfile(); |
| advisor.updateServerBucketProfile(bp); |
| profiles.put(entry.getKey(), bp); |
| } |
| |
| Set recipients = pr.getRegionAdvisor().adviseAllPRNodes(); |
| // send it to all in one message |
| ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send(recipients, |
| pr.getDistributionManager(), pr.getPRId(), profiles); |
| if (reply != null) { |
| reply.waitForRepliesUninterruptibly(); |
| } |
| } |
| } |
| |
| private void shutdownSelectorIfIsSelector() { |
| if (isSelector()) { |
| hsTimer.cancel(); |
| if (tmpSel != null) { |
| try { |
| tmpSel.close(); |
| } catch (IOException ignore) { |
| } |
| } |
| try { |
| wakeupSelector(); |
| selector.close(); |
| } catch (IOException ignore) { |
| } |
| if (selectorThread != null) { |
| selectorThread.interrupt(); |
| } |
| commBufferQueue.clear(); |
| } |
| } |
| |
| private void shutdownPools() { |
| pool.shutdown(); |
| try { |
| if (!pool.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, MILLISECONDS)) { |
| logger.warn("Timeout waiting for background tasks to complete."); |
| pool.shutdownNow(); |
| } |
| } catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| pool.shutdownNow(); |
| } |
| clientQueueInitPool.shutdown(); |
| hsPool.shutdown(); |
| } |
| |
| private void shutdownSCs() { |
| // added to fix part 2 of bug 37351. |
| synchronized (allSCsLock) { |
| ServerConnection[] snap = allSCList; |
| for (ServerConnection serverConnection : snap) { |
| serverConnection.cleanup(); |
| } |
| } |
| } |
| |
| boolean isShutdownProperly() { |
| return !isRunning() && !thread.isAlive() |
| && (selectorThread == null || !selectorThread.isAlive()) |
| && (pool == null || pool.isShutdown()) && (hsPool == null || hsPool.isShutdown()) |
| && (clientQueueInitPool == null || clientQueueInitPool.isShutdown()) |
| && (selector == null || !selector.isOpen()) && (tmpSel == null || !tmpSel.isOpen()); |
| } |
| |
| /** |
| * @param bindName the ip address or host name that this acceptor should bind to. If null or "" |
| * then calculate it. |
| * @return the ip address or host name this acceptor will listen on. An "" if all local addresses |
| * will be listened to. |
| * @since GemFire 5.7 |
| */ |
| private static String calcBindHostName(InternalCache cache, String bindName) { |
| if (bindName != null && !bindName.isEmpty()) { |
| return bindName; |
| } |
| |
| InternalDistributedSystem system = cache.getInternalDistributedSystem(); |
| DistributionConfig config = system.getConfig(); |
| String hostName = null; |
| |
| // Get the server-bind-address. If it is not null, use it. |
| // If it is null, get the bind-address. If it is not null, use it. |
| // Otherwise set default. |
| String serverBindAddress = config.getServerBindAddress(); |
| if (serverBindAddress != null && !serverBindAddress.isEmpty()) { |
| hostName = serverBindAddress; |
| } else { |
| String bindAddress = config.getBindAddress(); |
| if (bindAddress != null && !bindAddress.isEmpty()) { |
| hostName = bindAddress; |
| } |
| } |
| return hostName; |
| } |
| |
| private InetAddress getBindAddress() throws IOException { |
| if (bindHostName == null || bindHostName.isEmpty()) { |
| return null; // pick default local address |
| } |
| return InetAddress.getByName(bindHostName); |
| } |
| |
| /** |
| * Gets the address that this cache server can be contacted on from external processes. |
| * |
| * @since GemFire 5.7 |
| */ |
| @Override |
| public String getExternalAddress() { |
| String result = bindHostName; |
| boolean needCanonicalHostName = false; |
| if (result == null || result.isEmpty()) { |
| needCanonicalHostName = true; |
| } else { |
| // check to see if we are listening on all local addresses |
| ServerSocket ss = serverSock; |
| if (ss != null && ss.getLocalSocketAddress() instanceof InetSocketAddress) { |
| InetSocketAddress isa = (InetSocketAddress) ss.getLocalSocketAddress(); |
| InetAddress ssAddr = isa.getAddress(); |
| if (ssAddr != null) { |
| if (ssAddr.isAnyLocalAddress()) { |
| needCanonicalHostName = true; |
| } |
| } |
| } |
| } |
| if (needCanonicalHostName) { |
| try { |
| result = LocalHostUtil.getCanonicalLocalHostName(); |
| } catch (UnknownHostException ex) { |
| throw new IllegalStateException("getLocalHost failed with " + ex); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * This method finds a client notifier and returns it. It is used to propagate interest |
| * registrations to other servers |
| * |
| * @return the instance that provides client notification |
| */ |
| @Override |
| public CacheClientNotifier getCacheClientNotifier() { |
| return clientNotifier; |
| } |
| |
| @Override |
| public CachedRegionHelper getCachedRegionHelper() { |
| return crHelper; |
| } |
| |
| @Override |
| public ClientHealthMonitor getClientHealthMonitor() { |
| return healthMonitor; |
| } |
| |
| @Override |
| public ConnectionListener getConnectionListener() { |
| return connectionListener; |
| } |
| |
| @Override |
| public boolean isGatewayReceiver() { |
| return isGatewayReceiver; |
| } |
| |
| public List<GatewayTransportFilter> getGatewayTransportFilters() { |
| return gatewayTransportFilters; |
| } |
| |
| static boolean isAuthenticationRequired() { |
| return isAuthenticationRequired; |
| } |
| |
| static boolean isPostAuthzCallbackPresent() { |
| return isPostAuthzCallbackPresent; |
| } |
| |
| @Override |
| public Set<ServerConnection> getAllServerConnections() { |
| return Collections.unmodifiableSet(allSCs); |
| } |
| |
| /** |
| * This method returns a thread safe structure which can be iterated over without worrying about |
| * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get |
| * client info. |
| */ |
| @Override |
| public ServerConnection[] getAllServerConnectionList() { |
| return allSCList; |
| } |
| |
| @Override |
| public void setTLCommBuffer() { |
| // The thread local will only be set if maxThreads has been set. |
| if (!isSelector()) { |
| return; |
| } |
| |
| Message.setTLCommBuffer(takeCommBuffer()); |
| } |
| |
| @Override |
| public void releaseTLCommBuffer() { |
| if (!isSelector()) { |
| return; |
| } |
| |
| releaseCommBuffer(Message.setTLCommBuffer(null)); |
| } |
| |
| @Override |
| public int getMaximumTimeBetweenPings() { |
| return maximumTimeBetweenPings; |
| } |
| |
| private static class ClientQueueInitializerTask implements Runnable { |
| private final Socket socket; |
| private final boolean isPrimaryServerToClient; |
| private final AcceptorImpl acceptor; |
| |
| ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient, |
| AcceptorImpl acceptor) { |
| this.socket = socket; |
| this.acceptor = acceptor; |
| this.isPrimaryServerToClient = isPrimaryServerToClient; |
| } |
| |
| @Override |
| public void run() { |
| logger.info(":Cache server: Initializing {} server-to-client communication socket: {}", |
| isPrimaryServerToClient ? "primary" : "secondary", socket); |
| try { |
| ClientRegistrationMetadata clientRegistrationMetadata = |
| new ClientRegistrationMetadata(acceptor.cache, socket); |
| |
| if (clientRegistrationMetadata.initialize()) { |
| acceptor.getCacheClientNotifier().registerClient(clientRegistrationMetadata, socket, |
| isPrimaryServerToClient, |
| acceptor.getAcceptorId(), acceptor.isNotifyBySubscription()); |
| } |
| } catch (final IOException ex) { |
| closeSocket(socket); |
| if (acceptor.isRunning()) { |
| if (!acceptor.loggedAcceptError) { |
| acceptor.loggedAcceptError = true; |
| if (ex instanceof SocketTimeoutException) { |
| logger |
| .warn("Cache server: failed accepting client connection due to socket timeout."); |
| } else { |
| logger.warn("Cache server: failed accepting client connection " + |
| ex, |
| ex); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |