| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| |
| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.net.BindException; |
| import java.net.Inet6Address; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.ServerSocket; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.net.SocketTimeoutException; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.CancelledKeyException; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.channels.ClosedSelectorException; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.RejectedExecutionHandler; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.net.ssl.SSLException; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.ToDataException; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl; |
| import com.gemstone.gemfire.cache.util.BridgeServer; |
| import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; |
| import com.gemstone.gemfire.distributed.internal.DM; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.LonerDistributionManager; |
| import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats; |
| import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; |
| import com.gemstone.gemfire.internal.SocketCreator; |
| import com.gemstone.gemfire.internal.SocketUtils; |
| import com.gemstone.gemfire.internal.SystemTimer; |
| import com.gemstone.gemfire.internal.cache.BucketAdvisor; |
| import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.InternalCache; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.partitioned.AllBucketProfilesUpdateMessage; |
| import com.gemstone.gemfire.internal.cache.tier.Acceptor; |
| import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; |
| import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverStats; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.tcp.ConnectionTable; |
| import com.gemstone.gemfire.internal.util.ArrayUtils; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
| /** |
| * Implements the acceptor thread on the bridge server. Accepts connections from |
| * the edge and starts up threads to process requests from these. |
| * |
| * @see com.gemstone.gemfire.cache.util.BridgeServer |
| * |
| * @author Sudhir Menon |
| * @since 2.0.2 |
| */ |
| @SuppressWarnings("deprecation") |
| public class AcceptorImpl extends Acceptor implements Runnable |
| { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit"); |
| |
| protected final CacheServerStats stats; |
| private final int maxConnections; |
| private final int maxThreads; |
| |
| private final ThreadPoolExecutor pool; |
| /** |
| * A pool used to process handshakes. |
| */ |
| private final ThreadPoolExecutor hsPool; |
| |
| /** The port on which this acceptor listens for client connections */ |
| private final int localPort; |
| |
| /** The server socket that handles requests for connections */ |
| private ServerSocket serverSock = null; |
| |
| /** The GemFire cache served up by this acceptor */ |
| protected final InternalCache cache; |
| |
| /** Caches region information */ |
| private final CachedRegionHelper crHelper; |
| |
| /** A lock to prevent close from occurring while creating a ServerConnection */ |
| private final Object syncLock = new Object(); |
| |
| /** |
| * THE selector for the bridge server; null if no selector. |
| */ |
| private final Selector selector; |
| //private final Selector tmpSel; |
| /** |
| * Used for managing direct byte buffer for client comms; null if no selector. |
| */ |
| private final LinkedBlockingQueue commBufferQueue; |
| /** |
| * Used to timeout accepted sockets that we are waiting for the handshake packet |
| */ |
| private final SystemTimer hsTimer; |
| /** |
| * A queue used to feed register requests to the selector; null if no selector. |
| */ |
| private final LinkedBlockingQueue selectorQueue; |
| /** |
| * All the objects currently registered with selector. |
| */ |
| private final HashSet selectorRegistrations; |
| /** |
| * tcpNoDelay setting for outgoing sockets |
| */ |
| private final boolean tcpNoDelay; |
| |
| /** |
| * The name of a system property that sets the hand shake timeout (in |
| * milliseconds). This is how long a client will wait to hear back from |
| * a server. |
| */ |
| public static final String HANDSHAKE_TIMEOUT_PROPERTY_NAME = "BridgeServer.handShakeTimeout"; |
| |
| /** |
| * The default value of the {@link #HANDSHAKE_TIMEOUT_PROPERTY_NAME} system |
| * property. |
| */ |
| public static final int DEFAULT_HANDSHAKE_TIMEOUT_MS = 59000; |
| |
| /** Test value for handshake timeout */ |
| protected static final int handShakeTimeout = Integer.getInteger(HANDSHAKE_TIMEOUT_PROPERTY_NAME, DEFAULT_HANDSHAKE_TIMEOUT_MS).intValue(); |
| |
| /** |
| * The name of a system property that sets the accept timeout (in |
| * milliseconds). This is how long a server will wait to get its first |
| * byte from a client it has just accepted. |
| */ |
| public static final String ACCEPT_TIMEOUT_PROPERTY_NAME = "BridgeServer.acceptTimeout"; |
| |
| /** |
| * The default value of the {@link #ACCEPT_TIMEOUT_PROPERTY_NAME} system |
| * property. |
| */ |
| public static final int DEFAULT_ACCEPT_TIMEOUT_MS = 9900; |
| |
| /** Test value for accept timeout */ |
| private final int acceptTimeout = Integer.getInteger(ACCEPT_TIMEOUT_PROPERTY_NAME, DEFAULT_ACCEPT_TIMEOUT_MS).intValue(); |
| |
| /** The mininum value of max-connections */ |
| public static final int MINIMUM_MAX_CONNECTIONS = 16; |
| |
| /** The buffer size for server-side sockets. */ |
| private final int socketBufferSize; |
| |
| /** Notifies clients of updates */ |
| private final CacheClientNotifier clientNotifier; |
| |
| /** |
| * The default value of the {@link ServerSocket} |
| * {@link #BACKLOG_PROPERTY_NAME}system property |
| */ |
| private static final int DEFAULT_BACKLOG = 1000; |
| |
| /** The system property name for setting the {@link ServerSocket}backlog */ |
| public static final String BACKLOG_PROPERTY_NAME = "BridgeServer.backlog"; |
| |
| /** |
| * Current number of ServerConnection instances that are CLIENT_TO_SERVER cons. |
| */ |
| public final AtomicInteger clientServerCnxCount = new AtomicInteger(); |
| |
| /** Has this acceptor been shut down */ |
| private volatile boolean shutdown = false; |
| |
| /** The thread that runs the acceptor */ |
| private Thread thread = null; |
| |
| /** The thread that runs the selector loop if any */ |
| private Thread selectorThread = null; |
| |
| /** |
| * Controls updates to {@link #allSCs} |
| */ |
| private final Object allSCsLock = new Object(); |
| |
| /** |
| * List of ServerConnection. |
| * |
| * Instances added when constructed; removed when terminated. |
| * |
| * @guarded.By {@link #allSCsLock} |
| */ |
| private final HashSet allSCs = new HashSet(); |
| |
| /** List of ServerConnections, for {@link #emergencyClose()} |
| * |
| * @guarded.By {@link #allSCsLock} |
| */ |
| private volatile ServerConnection allSCList[] = new ServerConnection[0]; |
| |
| /** |
| * The ip address or host name this acceptor is to bind to; |
| * <code>null</code> or "" indicates |
| * it will listen on all local addresses. |
| * @since 5.7 |
| */ |
| private final String bindHostName; |
| |
| /** |
| * A listener for connect/disconnect events |
| */ |
| private final ConnectionListener connectionListener; |
| |
| /** The client health monitor tracking connections for this acceptor */ |
| private ClientHealthMonitor healthMonitor; |
| |
| //Identifies if this Acceptor was started by SqlFabric Procedure |
| private final boolean isSqlFabricHub ; |
| |
| /** bridge's setting of notifyBySubscription */ |
| private final boolean notifyBySubscription; |
| |
| /** |
| * The AcceptorImpl identifier, used to identify the clients connected to |
| * this Acceptor. |
| */ |
| private long acceptorId; |
| |
| private static boolean isAuthenticationRequired; |
| |
| private static boolean isPostAuthzCallbackPresent; |
| |
| private boolean isGatewayReceiver; |
| private List<GatewayTransportFilter> gatewayTransportFilters; |
| /** |
| * Initializes this acceptor thread to listen for connections on the given |
| * port. |
| * |
| * @param port |
| * The port on which this acceptor listens for connections. If |
| * <code>0</code>, a random port will be chosen. |
| * @param bindHostName The ip address or host name this acceptor listens on for |
| * connections. |
| * If <code>null</code> or "" then all local addresses are used |
| * @param socketBufferSize |
| * The buffer size for server-side sockets |
| * @param maximumTimeBetweenPings |
| * The maximum time between client pings. This value is used by the |
| * <code>ClientHealthMonitor</code> to monitor the health of this |
| * server's clients. |
| * @param c |
| * The GemFire cache whose contents is served to clients |
| * @param maxConnections |
| * the maximum number of connections allowed in the server pool |
| * @param maxThreads |
| * the maximum number of threads allowed in the server pool |
| * @param isSqlfStarted |
| * true if the Accpetor is started via SqlFabric procedure |
| * |
| * @see SocketCreator#createServerSocket(int, int, InetAddress) |
| * @see ClientHealthMonitor |
| * @since 5.7 |
| */ |
| public AcceptorImpl(int port, |
| String bindHostName, boolean notifyBySubscription, |
| int socketBufferSize, int maximumTimeBetweenPings, |
| InternalCache c, int maxConnections, int maxThreads, |
| int maximumMessageCount, int messageTimeToLive, |
| int transactionTimeToLive, |
| ConnectionListener listener,List overflowAttributesList, |
| boolean isSqlfStarted, |
| boolean isGatewayReceiver, List<GatewayTransportFilter> transportFilter, |
| boolean tcpNoDelay) |
| throws IOException |
| { |
| this.bindHostName = calcBindHostName(c, bindHostName); |
| this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener; |
| this.notifyBySubscription = notifyBySubscription; |
| this.isSqlFabricHub = isSqlfStarted; |
| this.isGatewayReceiver = isGatewayReceiver; |
| this.gatewayTransportFilters = transportFilter; |
| { |
| int tmp_maxConnections = maxConnections; |
| if (tmp_maxConnections < MINIMUM_MAX_CONNECTIONS) { |
| tmp_maxConnections = MINIMUM_MAX_CONNECTIONS; |
| } |
| this.maxConnections = tmp_maxConnections; |
| } |
| { |
| int tmp_maxThreads = maxThreads; |
| if (maxThreads == BridgeServer.DEFAULT_MAX_THREADS) { |
| // consult system properties for 5.0.2 backwards compatibility |
| if (DEPRECATED_SELECTOR) { |
| tmp_maxThreads = DEPRECATED_SELECTOR_POOL_SIZE; |
| } |
| } |
| if (tmp_maxThreads < 0) { |
| tmp_maxThreads = 0; |
| } |
| else if (tmp_maxThreads > this.maxConnections) { |
| tmp_maxThreads = this.maxConnections; |
| } |
| boolean isWindows = false; |
| String os = System.getProperty("os.name"); |
| if (os != null) { |
| if (os.indexOf("Windows") != -1) { |
| isWindows = true; |
| } |
| } |
| if (tmp_maxThreads > 0 && isWindows) { |
| // bug #40472 and JDK bug 6230761 - NIO can't be used with IPv6 on Windows |
| if (getBindAddress() instanceof Inet6Address) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_IGNORING_MAX_THREADS_DUE_TO_JROCKIT_NIO_BUG)); |
| tmp_maxThreads = 0; |
| } |
| // bug #40198 - Selector.wakeup() hangs if VM starts to exit |
| if (isJRockit) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_IGNORING_MAX_THREADS_DUE_TO_WINDOWS_IPV6_BUG)); |
| tmp_maxThreads = 0; |
| } |
| } |
| this.maxThreads = tmp_maxThreads; |
| } |
| { |
| Selector tmp_s = null; |
| //Selector tmp2_s = null; |
| LinkedBlockingQueue tmp_q = null; |
| LinkedBlockingQueue tmp_commQ = null; |
| HashSet tmp_hs = null; |
| SystemTimer tmp_timer = null; |
| if (isSelector()) { |
| tmp_s = Selector.open(); // no longer catch ex to fix bug 36907 |
| //tmp2_s = Selector.open(); // workaround for bug 39624 |
| tmp_q = new LinkedBlockingQueue(); |
| tmp_commQ = new LinkedBlockingQueue(); |
| tmp_hs = new HashSet(512); |
| tmp_timer = new SystemTimer( |
| c.getDistributedSystem(), true); |
| } |
| this.selector = tmp_s; |
| //this.tmpSel = tmp2_s; |
| this.selectorQueue = tmp_q; |
| this.commBufferQueue = tmp_commQ; |
| this.selectorRegistrations = tmp_hs; |
| this.hsTimer = tmp_timer; |
| this.tcpNoDelay = tcpNoDelay; |
| } |
| |
| { |
| final int backLog = Integer.getInteger(BACKLOG_PROPERTY_NAME, DEFAULT_BACKLOG).intValue(); |
| SocketCreator sc = null; |
| DistributionConfig config = ((InternalDistributedSystem)c.getDistributedSystem()).getConfig(); |
| if(!isGatewayReceiver) { |
| //If configured use SSL properties for cache-server |
| sc = SocketCreator.createNonDefaultInstance(config.getServerSSLEnabled(), |
| config.getServerSSLRequireAuthentication(), |
| config.getServerSSLProtocols(), |
| config.getServerSSLCiphers(), |
| config.getServerSSLProperties()); |
| if(config.getServerSSLEnabled()) { |
| StringWriter sw = new StringWriter(); |
| PrintWriter writer = new PrintWriter(sw); |
| config.getServerSSLProperties().list(writer); |
| logger.info( |
| "Starting CacheServer with SSL config : Authentication Required {} Ciphers {} Protocols {} Other Properties {} ", |
| config.getServerSSLRequireAuthentication(), |
| config.getServerSSLCiphers(), |
| config.getServerSSLProtocols(), |
| sw.toString()); |
| } |
| } else { |
| sc = SocketCreator.createNonDefaultInstance(config.getGatewaySSLEnabled(), |
| config.getGatewaySSLRequireAuthentication(), |
| config.getGatewaySSLProtocols(), |
| config.getGatewaySSLCiphers(), |
| config.getGatewaySSLProperties()); |
| if(config.getGatewaySSLEnabled()) { |
| StringWriter sw = new StringWriter(); |
| PrintWriter writer = new PrintWriter(sw); |
| config.getGatewaySSLProperties().list(writer); |
| logger.info( |
| "Starting Gateway with SSL config : Authentication Required {} Ciphers {} Protocols {} Other Properties {} ", |
| config.getGatewaySSLRequireAuthentication(), |
| config.getGatewaySSLCiphers(), |
| config.getGatewaySSLProtocols(), |
| sw.toString()); |
| } |
| } |
| |
| final GemFireCacheImpl gc; |
| if (getCachedRegionHelper() != null) { |
| gc = (GemFireCacheImpl)getCachedRegionHelper().getCache(); |
| } |
| else { |
| gc = null; |
| } |
| final long tilt = System.currentTimeMillis() + 120 * 1000; |
| |
| if (isSelector()) { |
| if (sc.useSSL()) { |
| throw new IllegalArgumentException(LocalizedStrings.AcceptorImpl_SELECTOR_THREAD_POOLING_CAN_NOT_BE_USED_WITH_CLIENTSERVER_SSL_THE_SELECTOR_CAN_BE_DISABLED_BY_SETTING_MAXTHREADS0.toLocalizedString()); |
| } |
| ServerSocketChannel channel = ServerSocketChannel.open(); |
| this.serverSock = channel.socket(); |
| this.serverSock.setReuseAddress(true); |
| |
| // Set the receive buffer size before binding the socket so that large |
| // buffers will be allocated on accepted sockets (see |
| // java.net.ServerSocket.setReceiverBufferSize javadocs) |
| this.serverSock.setReceiveBufferSize(socketBufferSize); |
| |
| // fix for bug 36617. If BindException is thrown, retry after |
| // sleeping. The server may have been stopped and then |
| // immediately restarted, which sometimes results in a bind exception |
| for (;;) { |
| try { |
| this.serverSock.bind(new InetSocketAddress(getBindAddress(), port), |
| backLog); |
| break; |
| } |
| catch (SocketException b) { |
| if (! treatAsBindException(b) || |
| System.currentTimeMillis() > tilt) { |
| throw b; |
| } |
| } |
| |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Thread.sleep(1000); |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (gc != null) { |
| gc.getCancelCriterion().checkCancelInProgress(null); |
| } |
| } // for |
| } // isSelector |
| else { // !isSelector |
| // fix for bug 36617. If BindException is thrown, retry after |
| // sleeping. The server may have been stopped and then |
| // immediately restarted, which sometimes results in a bind exception |
| for (;;) { |
| try { |
| this.serverSock = sc.createServerSocket(port, backLog, |
| getBindAddress(), this.gatewayTransportFilters, |
| socketBufferSize); |
| break; |
| } |
| catch (SocketException e) { |
| if (! treatAsBindException(e) || |
| System.currentTimeMillis() > tilt) { |
| throw e; |
| } |
| } |
| |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Thread.sleep(1000); |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (gc != null) { |
| gc.getCancelCriterion().checkCancelInProgress(null); |
| } |
| } // for |
| } // !isSelector |
| |
| if (port == 0) { |
| port = this.serverSock.getLocalPort(); |
| } |
| { |
| InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance(); |
| if (ds != null) { |
| DM dm = ds.getDistributionManager(); |
| if (dm != null && dm.getDistributionManagerId().getPort() == 0 && (dm instanceof LonerDistributionManager)) { |
| // a server with a loner distribution manager - update it's port number |
| ((LonerDistributionManager)dm).updateLonerPort(port); |
| } |
| } |
| } |
| this.localPort = port; |
| String sockName = this.serverSock.getLocalSocketAddress().toString(); |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.AcceptorImpl_BRIDGE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1, |
| new Object[] {sockName, Integer.valueOf(backLog)})); |
| if(isGatewayReceiver){ |
| this.stats = GatewayReceiverStats.createGatewayReceiverStats(sockName); |
| }else{ |
| this.stats = new CacheServerStats(sockName); |
| } |
| |
| } |
| |
| this.cache = c; |
| this.crHelper = new CachedRegionHelper(this.cache); |
| |
| this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, |
| maximumMessageCount,messageTimeToLive, transactionTimeToLive, |
| connectionListener,overflowAttributesList, isGatewayReceiver); |
| this.socketBufferSize = socketBufferSize; |
| |
| // Create the singleton ClientHealthMonitor |
| this.healthMonitor = ClientHealthMonitor.getInstance(c, maximumTimeBetweenPings, |
| this.clientNotifier.getStats()); |
| |
| { |
| ThreadPoolExecutor tmp_pool = null; |
| String gName = "ServerConnection " |
| //+ serverSock.getInetAddress() |
| + "on port " + this.localPort; |
| final ThreadGroup socketThreadGroup |
| = LoggingThreadGroup.createThreadGroup(gName, logger); |
| |
| ThreadFactory socketThreadFactory = new ThreadFactory() { |
| int connNum = -1; |
| |
| public Thread newThread(final Runnable command) { |
| int tnum; |
| synchronized (this) { |
| tnum = ++connNum; |
| } |
| String tName = socketThreadGroup.getName() + " Thread " + tnum; |
| getStats().incConnectionThreadsCreated(); |
| Runnable r = new Runnable() { |
| public void run() { |
| try { |
| command.run(); |
| } |
| catch (CancelException e) { // bug 39463 |
| // ignore |
| } finally { |
| ConnectionTable.releaseThreadsSockets(); |
| } |
| } |
| }; |
| return new Thread(socketThreadGroup, r, tName); |
| } |
| }; |
| try { |
| if (isSelector()) { |
| tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), |
| this.maxThreads, |
| getStats().getCnxPoolHelper(), |
| socketThreadFactory, |
| Integer.MAX_VALUE); |
| } else { |
| tmp_pool |
| = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, |
| this.maxConnections, |
| 0L, |
| TimeUnit.MILLISECONDS, |
| new SynchronousQueue(), |
| socketThreadFactory); |
| } |
| } |
| catch (IllegalArgumentException poolInitException) { |
| this.stats.close(); |
| this.serverSock.close(); |
| throw poolInitException; |
| } |
| this.pool = tmp_pool; |
| } |
| { |
| ThreadPoolExecutor tmp_hsPool = null; |
| String gName = "Handshaker " |
| + serverSock.getInetAddress() |
| + ":" + this.localPort; |
| final ThreadGroup socketThreadGroup |
| = LoggingThreadGroup.createThreadGroup(gName, logger); |
| |
| ThreadFactory socketThreadFactory = new ThreadFactory() { |
| int connNum = -1; |
| |
| public Thread newThread(Runnable command) { |
| int tnum; |
| synchronized (this) { |
| tnum = ++connNum; |
| } |
| String tName = socketThreadGroup.getName() + " Thread " + tnum; |
| getStats().incAcceptThreadsCreated(); |
| return new Thread(socketThreadGroup, command, tName); |
| } |
| }; |
| try { |
| final BlockingQueue bq = new SynchronousQueue(); |
| final RejectedExecutionHandler reh = new RejectedExecutionHandler() { |
| public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) { |
| try { |
| bq.put(r); |
| } |
| catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); // preserve the state |
| throw new RejectedExecutionException(LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex); |
| } |
| } |
| }; |
| tmp_hsPool = new ThreadPoolExecutor(1, |
| HANDSHAKE_POOL_SIZE, |
| 60, TimeUnit.SECONDS, |
| bq, |
| socketThreadFactory, |
| reh); |
| } |
| catch (IllegalArgumentException poolInitException) { |
| this.stats.close(); |
| this.serverSock.close(); |
| this.pool.shutdownNow(); |
| throw poolInitException; |
| } |
| this.hsPool = tmp_hsPool; |
| } |
| |
| String authenticator = this.cache.getDistributedSystem().getProperties() |
| .getProperty(DistributionConfig.SECURITY_CLIENT_AUTHENTICATOR_NAME); |
| isAuthenticationRequired = (authenticator != null && authenticator.length() > 0) ? true |
| : false; |
| |
| String postAuthzFactoryName = this.cache.getDistributedSystem() |
| .getProperties().getProperty( |
| DistributionConfig.SECURITY_CLIENT_ACCESSOR_PP_NAME); |
| |
| isPostAuthzCallbackPresent = (postAuthzFactoryName != null && postAuthzFactoryName |
| .length() > 0) ? true : false; |
| } |
| |
| public long getAcceptorId(){ |
| return this.acceptorId; |
| } |
| |
| public CacheServerStats getStats() { |
| return this.stats; |
| } |
| |
| /** |
| * Returns true if this acceptor is using a selector to detect client events. |
| */ |
| public boolean isSelector() { |
| return this.maxThreads > 0; |
| } |
| /** |
| * This system property is only used if max-threads == 0. |
| * This is for 5.0.2 backwards compatibility. |
| * @deprecated since 5.1 use cache-server max-threads instead |
| */ |
| @Deprecated |
| private static final boolean DEPRECATED_SELECTOR = Boolean.getBoolean("BridgeServer.SELECTOR"); |
| |
| /** |
| * This system property is only used if max-threads == 0. |
| * This is for 5.0.2 backwards compatibility. |
| * @deprecated since 5.1 use cache-server max-threads instead |
| */ |
| @Deprecated |
| private final static int DEPRECATED_SELECTOR_POOL_SIZE = Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue(); |
| private final static int HANDSHAKE_POOL_SIZE = Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue(); |
| |
| @Override |
| public void start() throws IOException |
| { |
| ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Acceptor " |
| + this.serverSock.getInetAddress() + ":" + this.localPort, logger); |
| thread = new Thread(tg, this, "Cache Server Acceptor " |
| + this.serverSock.getInetAddress() + ":" + this.localPort |
| + " local port: " + this.serverSock.getLocalPort()); |
| |
| this.acceptorId = thread.getId(); |
| |
| // This thread should not be a daemon to keep BridgeServers created |
| // in code from exiting immediately. |
| thread.start(); |
| |
| if (isSelector()) { |
| Runnable r = new Runnable() { |
| public void run() { |
| AcceptorImpl.this.runSelectorLoop(); |
| } |
| }; |
| this.selectorThread = new Thread(tg, r, "Cache Server Selector " |
| + this.serverSock.getInetAddress() |
| + ":" + this.localPort |
| + " local port: " |
| + this.serverSock.getLocalPort()); |
| this.selectorThread.start(); |
| } |
| GemFireCacheImpl myCache = (GemFireCacheImpl)cache; |
| Set<PartitionedRegion> prs = myCache.getPartitionedRegions(); |
| for(PartitionedRegion pr : prs){ |
| Map<Integer, BucketAdvisor.BucketProfile> profiles = new HashMap<Integer,BucketAdvisor.BucketProfile>(); |
| // get all local real bucket advisors |
| Map<Integer,BucketAdvisor> advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); |
| for(Map.Entry<Integer,BucketAdvisor> entry : advisors.entrySet()){ |
| BucketAdvisor advisor = entry.getValue(); |
| // addLocally |
| BucketProfile bp = (BucketProfile) advisor.createProfile(); |
| advisor.updateServerBucketProfile(bp); |
| //advisor.basicAddClientProfile(bp); |
| profiles.put(entry.getKey(), bp); |
| } |
| Set receipients = new HashSet(); |
| receipients = pr.getRegionAdvisor().adviseAllPRNodes(); |
| // send it to all in one messgae |
| ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send(receipients, pr.getDistributionManager(), |
| pr.getPRId(), profiles, true); |
| if(reply != null) { |
| reply.waitForRepliesUninterruptibly(); |
| } |
| } |
| } |
| |
| public void registerSC(ServerConnection sc) { |
| synchronized (this.syncLock) { |
| if (!isRunning()) { |
| finishCon(sc); |
| return; |
| } |
| } |
| getSelectorQueue().offer(sc); |
| wakeupSelector(); |
| } |
| |
| /** wake up the selector thread */ |
| private void wakeupSelector() { |
| Selector s = getSelector(); |
| if (s != null && s.isOpen()) { |
| this.selector.wakeup(); |
| } |
| } |
| |
| public void unregisterSC(ServerConnection sc) { |
| // removed syncLock synchronization to fix bug 37104 |
| synchronized (this.allSCsLock) { |
| this.allSCs.remove(sc); |
| Iterator it = this.allSCs.iterator(); |
| |
| // TODO: if we know *where* in the list this connection was, we |
| // could use ArrayUtils here. |
| ServerConnection again[] = new ServerConnection[this.allSCs.size()]; |
| for (int i = 0; i < again.length; i ++) { |
| again[i] = (ServerConnection)it.next(); |
| } |
| this.allSCList = again; |
| } |
| if (!isRunning()) { |
| return; |
| } |
| // just need to wake the selector up so it will notice our socket was closed |
| wakeupSelector(); |
| } |
| |
| private void finishCon(ServerConnection sc) { |
| if (sc != null) { |
| sc.handleTermination(); |
| } |
| } |
| |
| private void drainSelectorQueue() { |
| ServerConnection sc = (ServerConnection)this.selectorQueue.poll(); |
| CancelException cce = null; |
| while (sc != null) { |
| try { |
| finishCon(sc); |
| } |
| catch (CancelException e) { |
| if (cce == null) { |
| cce = e; |
| } |
| } |
| sc = (ServerConnection)this.selectorQueue.poll(); |
| } |
| Iterator it = selectorRegistrations.iterator(); |
| while (it.hasNext()) { |
| try { |
| finishCon((ServerConnection)it.next()); |
| } |
| catch (CancelException e) { |
| if (cce == null) { |
| cce = e; |
| } |
| } |
| } // while |
| if (cce != null) { |
| throw cce; |
| } |
| } |
| |
| /** |
| * break any potential circularity in {@link #loadEmergencyClasses()} |
| */ |
| private static volatile boolean emergencyClassesLoaded = false; |
| |
| /** |
| * Ensure that the CachedRegionHelper and ServerConnection classes |
| * get loaded. |
| * |
| * @see SystemFailure#loadEmergencyClasses() |
| */ |
| public static void loadEmergencyClasses() { |
| if (emergencyClassesLoaded) return; |
| emergencyClassesLoaded = true; |
| CachedRegionHelper.loadEmergencyClasses(); |
| ServerConnection.loadEmergencyClasses(); |
| } |
| |
| /** |
| * @see SystemFailure#emergencyClose() |
| */ |
| public void emergencyClose() { |
| ServerSocket ss = this.serverSock; |
| if (ss != null) { |
| try { |
| ss.close(); |
| } |
| catch (IOException e) { |
| // ignore |
| } |
| } |
| // this.selector.close(); might NOT be safe |
| this.crHelper.setShutdown(true); |
| |
| // TODO I'm worried about a fat lock to acquire this synchronization |
| // synchronized (this.allSCsLock) |
| { |
| ServerConnection snap[] = this.allSCList; |
| for (int i = 0; i < snap.length; i ++) { |
| snap[i].emergencyClose(); // part of cleanup() |
| } |
| } |
| } |
| |
| private boolean isRegisteredObjectClosed(ServerConnection sc) { |
| return sc.isClosed(); |
| } |
| |
| private int checkRegisteredKeys(int count) { |
| int result = count; |
| CancelException cce = null; |
| if (count > 0) { |
| Iterator it = this.selectorRegistrations.iterator(); |
| while (it.hasNext()) { |
| ServerConnection sc = (ServerConnection)it.next(); |
| if (isRegisteredObjectClosed(sc)) { |
| result--; |
| it.remove(); |
| try { |
| finishCon(sc); |
| } |
| catch (CancelException e) { |
| if (cce == null) { |
| cce = e; |
| } |
| } |
| } |
| } // while |
| } |
| if (cce != null) { |
| throw cce; |
| } |
| return result; |
| } |
| |
| private static final boolean WORKAROUND_SELECTOR_BUG = Boolean.getBoolean("CacheServer.NIO_SELECTOR_WORKAROUND"); |
| |
| private Selector tmpSel; |
| |
| private void checkForStuckKeys() { |
| if (!WORKAROUND_SELECTOR_BUG) return; |
| if (tmpSel == null) { |
| try { |
| tmpSel = Selector.open(); |
| } catch (IOException ignore) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_COULD_NOT_CHECK_FOR_STUCK_KEYS, ignore)); |
| return; |
| } |
| |
| } |
| //logger.info("DEBUG: checking for stuck keys"); |
| Iterator it = (new ArrayList(this.selector.keys())).iterator(); |
| while (it.hasNext()) { |
| SelectionKey sk = (SelectionKey)it.next(); |
| ServerConnection sc = (ServerConnection)sk.attachment(); |
| if (sc == null) continue; |
| try { |
| sk.cancel(); |
| this.selector.selectNow(); // clear the cancelled key |
| SelectionKey tmpsk = sc.getSelectableChannel().register(this.tmpSel, SelectionKey.OP_WRITE|SelectionKey.OP_READ); |
| try { |
| // it should always be writable |
| int events = this.tmpSel.selectNow(); |
| if (events == 0) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_STUCK_SELECTION_KEY_DETECTED_ON_0, sc)); |
| tmpsk.cancel(); |
| tmpSel.selectNow(); // clear canceled key |
| sc.registerWithSelector2(this.selector); |
| } else { |
| if (tmpsk.isValid() && tmpsk.isReadable()) { |
| // logger.info("DEBUG detected read event on " + sc); |
| try { |
| tmpsk.cancel(); |
| this.tmpSel.selectNow(); // clear canceled key |
| this.selectorRegistrations.remove(sc); |
| registeredKeys--; |
| sc.makeBlocking(); |
| // we need to say we are processing a message |
| // so that that client health monitor will not |
| // kill us while we wait for a thread in the thread pool. |
| // This is also be used to determine how long we are |
| // in the thread pool queue and to cancel operations that |
| // have waited too long in the queue. |
| sc.setProcessingMessage(); |
| } catch (ClosedChannelException ignore) { |
| finishCon(sc); |
| continue; |
| } catch (IOException ex) { |
| finishCon(sc); |
| if(isRunning()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED_EXCEPTION, ex)); |
| } |
| continue; |
| } |
| try { |
| AcceptorImpl.this.stats.incThreadQueueSize(); |
| AcceptorImpl.this.pool.execute(sc); |
| } catch (RejectedExecutionException rejected) { |
| finishCon(sc); |
| AcceptorImpl.this.stats.decThreadQueueSize(); |
| if (!isRunning()) { |
| break; |
| } |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED_EXCEPTION, rejected)); |
| } |
| } else if (tmpsk.isValid() && tmpsk.isWritable()) { |
| // this is expected |
| tmpsk.cancel(); |
| this.tmpSel.selectNow(); // clear canceled key |
| sc.registerWithSelector2(this.selector); |
| } else if (!tmpsk.isValid()) { |
| tmpsk.cancel(); |
| this.tmpSel.selectNow(); // clear canceled key |
| sc.registerWithSelector2(this.selector); |
| } |
| } |
| } catch (IOException ex) { |
| if (isRunning() && this.selector.isOpen() && this.tmpSel.isOpen()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED_EXCEPTION, ex)); |
| try { |
| tmpsk.cancel(); |
| tmpSel.selectNow(); // clear canceled key |
| } catch (IOException ex2) { |
| if (isRunning() && this.selector.isOpen() && this.tmpSel.isOpen()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED_EXCEPTION, ex2)); |
| } |
| } |
| } |
| } |
| } catch (ClosedChannelException ignore) { // fix for bug 39650 |
| // just ignore this channel and try the next one |
| finishCon(sc); |
| continue; |
| } catch (IOException ex) { |
| if (isRunning() && this.selector.isOpen() && this.tmpSel.isOpen()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED_EXCEPTION, ex)); |
| } |
| } catch (NullPointerException npe) { // fix bug 39644 |
| if (isRunning() && this.selector.isOpen() && this.tmpSel.isOpen()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED_EXCEPTION, npe)); |
| } |
| } |
| } |
| } |
| |
| private int registeredKeys = 0; |
| |
| public void runSelectorLoop() { |
| //int zeroEventsCount = 0; |
| try { |
| logger.info(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_SELECTOR_ENABLED)); |
| while (this.selector.isOpen() && !Thread.currentThread().isInterrupted()) { |
| { |
| SystemFailure.checkFailure(); |
| // this.cache.getDistributedSystem().getCancelCriterion().checkCancelInProgress(null); |
| if (((GemFireCacheImpl)this.cache).isClosed()) { // bug 38834 |
| break; // TODO should just ask cache's CancelCriterion |
| } |
| if (this.cache.getCancelCriterion().cancelInProgress() != null) { |
| break; |
| } |
| ServerConnection sc; |
| registeredKeys = checkRegisteredKeys(registeredKeys); |
| if (registeredKeys == 0) { |
| // do blocking wait on queue until we get some guys registered |
| // with the selector |
| sc = (ServerConnection)this.selectorQueue.take(); |
| } else { |
| // we already have some guys registered so just do a poll on queue |
| sc = (ServerConnection)this.selectorQueue.poll(); |
| } |
| while (sc != null) { |
| try { |
| sc.registerWithSelector2(this.selector); |
| registeredKeys++; |
| this.selectorRegistrations.add(sc); |
| } catch (ClosedChannelException cce) { |
| // for bug bug 38474 |
| finishCon(sc); |
| } catch (IOException ex) { |
| |
| finishCon(sc); |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_IGNORING, ex)); |
| } catch (RuntimeException ex) { |
| finishCon(sc); |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_IGNORING, ex)); |
| } |
| sc = (ServerConnection)this.selectorQueue.poll(); |
| } |
| } |
| if (registeredKeys == 0) { |
| continue; |
| } |
| int events = this.selector.select(); |
| // select() could have returned due to wakeup() during close of cache |
| if (this.cache.getCancelCriterion().cancelInProgress() != null) { |
| break; |
| } |
| if (events == 0) { |
| // zeroEventsCount++; |
| // if (zeroEventsCount > 0) { |
| // zeroEventsCount = 0; |
| checkForStuckKeys(); |
| |
| // try { |
| // this.selector.close(); // this selector is sick! |
| // } catch (IOException ignore) { |
| // } |
| // this.selector = Selector.open(); |
| // { |
| // Iterator it = selectorRegistrations.iterator(); |
| // while (it.hasNext()) { |
| // ServerConnection sc = (ServerConnection)it.next(); |
| // sc.registerWithSelector2(this.selector); |
| // } |
| // } |
| // } |
| // ArrayList al = new ArrayList(); |
| // Iterator keysIt = this.selector.keys().iterator(); |
| // while (keysIt.hasNext()) { |
| // SelectionKey sk = (SelectionKey)keysIt.next(); |
| // al.add(sk.attachment()); |
| // sk.cancel(); |
| // } |
| // events = this.selector.selectNow(); |
| // Iterator alIt = al.iterator(); |
| // while (alIt.hasNext()) { |
| // ServerConnection sc = (ServerConnection)alIt.next(); |
| // sc.registerWithSelector2(this.selector); |
| // } |
| // events = this.selector.select(); |
| // } else { |
| // zeroEventsCount = 0; |
| } |
| while (events > 0) { |
| int cancelCount = 0; |
| Set sk = this.selector.selectedKeys(); |
| if (sk == null) { |
| // something really bad has happened I'm not even sure this is possible |
| // but lhughes so an NPE during close one time so perhaps it can happen |
| // during selector close. |
| events = 0; |
| break; |
| } |
| Iterator keysIterator = sk.iterator(); |
| while (keysIterator.hasNext()) { |
| SelectionKey key = (SelectionKey) keysIterator.next(); |
| // Remove the key from the selector's selectedKeys |
| keysIterator.remove(); |
| final ServerConnection sc = (ServerConnection)key.attachment(); |
| try { |
| if (key.isValid() && key.isReadable()) { |
| // this is the only event we currently register for |
| try { |
| key.cancel(); |
| this.selectorRegistrations.remove(sc); |
| registeredKeys--; |
| cancelCount++; |
| sc.makeBlocking(); |
| // we need to say we are processing a message |
| // so that that client health monitor will not |
| // kill us while we wait for a thread in the thread pool. |
| // This is also be used to determine how long we are |
| // in the thread pool queue and to cancel operations that |
| // have waited too long in the queue. |
| sc.setProcessingMessage(); |
| } catch (ClosedChannelException ignore) { |
| finishCon(sc); |
| continue; |
| } catch (IOException ex) { |
| finishCon(sc); |
| if(isRunning()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, ex)); |
| } |
| continue; |
| } |
| try { |
| AcceptorImpl.this.stats.incThreadQueueSize(); |
| AcceptorImpl.this.pool.execute(sc); |
| } catch (RejectedExecutionException rejected) { |
| finishCon(sc); |
| AcceptorImpl.this.stats.decThreadQueueSize(); |
| if (!isRunning()) { |
| break; |
| } |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected)); |
| } |
| // } else if (key.isValid() && key.isConnectable()) { |
| // logger.info("DEBUG isConnectable and isValid key=" + key); |
| // finishCon(sc); |
| } else { |
| finishCon(sc); |
| if (key.isValid()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key)); |
| // } else { |
| // logger.info("DEBUG !isValid key=" + key); |
| } |
| } |
| } catch (CancelledKeyException ex) { // fix for bug 37739 |
| finishCon(sc); |
| } |
| } |
| if (cancelCount > 0 && this.selector.isOpen()) { |
| // we need to do a select to cause the cancel to be unregisters. |
| events = this.selector.selectNow(); |
| } else { |
| events = 0; |
| } |
| } |
| } |
| } catch (InterruptedException ex) { |
| // allow this thread to die |
| Thread.currentThread().interrupt(); |
| } catch (ClosedSelectorException ex) { |
| // allow this thread to exit |
| } catch (IOException ex) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, ex)); |
| } finally { |
| try { |
| drainSelectorQueue(); |
| } |
| finally { |
| // note that if this method was called by close then the |
| // following call is a noop since the first thing it does |
| // is call isRunning. |
| close(); // make sure this is called to fix bug 37749 |
| } |
| } |
| } |
| |
| @Override |
| public int getPort() |
| { |
| return localPort; |
| } |
| |
| public InetAddress getServerInetAddr() |
| { |
| return this.serverSock.getInetAddress(); |
| } |
| |
| /** |
| * The work loop of this acceptor |
| * |
| * @see #accept |
| */ |
| public void run() |
| { |
| try { |
| accept(); |
| } |
| catch (CancelException e) { // bug 39462 |
| // ignore |
| } |
| finally { |
| try { |
| if (this.serverSock != null) { |
| this.serverSock.close(); |
| } |
| } |
| catch (IOException ignore) { |
| } |
| if (this.stats != null) { |
| this.stats.close(); |
| } |
| } |
| } |
| |
| public Selector getSelector() { |
| return this.selector; |
| } |
| public BlockingQueue getSelectorQueue() { |
| return this.selectorQueue; |
| } |
| |
| protected boolean loggedAcceptError = false; |
| |
| protected static void closeSocket(Socket s) { |
| if (s != null) { |
| try { |
| s.close(); |
| } |
| catch (IOException ignore) { |
| } |
| } |
| } |
| /** |
| * {@linkplain ServerSocket#accept Listens}for a client to connect and then |
| * creates a new {@link ServerConnection}to handle messages from that client. |
| */ |
| @Override |
| public void accept() |
| { |
| while (isRunning()) { |
| if (SystemFailure.getFailure() != null) { |
| // Allocate no objects here! |
| ServerSocket s = serverSock; |
| if (s != null) { |
| try { |
| s.close(); |
| } |
| catch (IOException e) { |
| // don't care |
| } |
| } |
| SystemFailure.checkFailure(); // throws |
| } |
| // moved this check out of the try. If we are cancelled then we need |
| // to break out of this while loop. |
| |
| crHelper.checkCancelInProgress(null); // throws |
| |
| Socket s = null; |
| try { |
| s = serverSock.accept(); |
| crHelper.checkCancelInProgress(null); // throws |
| |
| // Optionally enable SO_KEEPALIVE in the OS network protocol. |
| s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE); |
| |
| // The synchronization below was added to prevent close from being |
| // called |
| // while a ServerConnection is being instantiated. This should prevent |
| // the |
| // following exception: |
| // [severe 2004/12/15 18:49:17.671 PST gemfire2 Server connection from |
| // balrog.gemstone.com:58478-0x6ce1 nid=0x1334aa] Uncaught exception in |
| // thread Server connection from balrog.gemstone.com:58478 |
| // java.lang.NullPointerException |
| // at |
| // com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:107) |
| |
| if (this.crHelper.emulateSlowServer() > 0) { |
| try { |
| Thread.sleep(this.crHelper.emulateSlowServer()); |
| } |
| catch (InterruptedException ugh) { |
| // This had better be due to shutdown; don't reenable the bit, |
| // it would just cause a hot-loop. |
| // Thread.currentThread().interrupt(); |
| }; |
| } |
| |
| synchronized (this.syncLock) { |
| if (!isRunning()) { |
| closeSocket(s); |
| break; |
| } |
| } |
| SocketCreator.getDefaultInstance().configureServerSSLSocket(s); |
| this.loggedAcceptError = false; |
| |
| handOffNewClientConnection(s); |
| } |
| catch (InterruptedIOException e) { // Solaris only |
| closeSocket(s); |
| if (isRunning()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Aborted due to interrupt: {}", e); |
| } |
| } |
| } |
| catch (IOException e) { |
| if (isRunning()) { |
| if (e instanceof SSLException) { |
| try { |
| // Try to send a proper rejection message |
| ServerHandShakeProcessor.refuse(s.getOutputStream(), e.toString(), |
| HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED); |
| } |
| catch (IOException ex) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bridge server: Unable to write SSL error"); |
| } |
| } |
| } |
| } |
| closeSocket(s); |
| if (isRunning()) { |
| if (!this.loggedAcceptError) { |
| this.loggedAcceptError = true; |
| logger.error(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_BRIDGE_SERVER_UNEXPECTED_IOEXCEPTION_FROM_ACCEPT, e)); |
| } |
| // Why sleep? |
| // try {Thread.sleep(3000);} catch (InterruptedException ie) {} |
| } |
| } |
| catch (CancelException e) { |
| closeSocket(s); |
| throw e; |
| } |
| catch (Exception e) { |
| closeSocket(s); |
| if (isRunning()) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_BRIDGE_SERVER_UNEXPECTED_EXCEPTION, e)); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * Hand off a new client connection to the thread pool that processes handshakes. |
| * If all the threads in this pool are busy then the hand off will block until |
| * a thread is available. |
| * This blocking is good because it will throttle the rate at which we create |
| * new connections. |
| */ |
| private void handOffNewClientConnection(final Socket s) { |
| try { |
| this.stats.incAcceptsInProgress(); |
| this.hsPool.execute(new Runnable() { |
| public void run() { |
| boolean finished = false; |
| try { |
| handleNewClientConnection(s); |
| finished = true; |
| } |
| catch (RegionDestroyedException rde) { |
| // aborted due to disconnect - bug 42273 |
| if (rde.getMessage().indexOf("HARegion") == -1) { |
| throw rde; |
| } |
| } |
| catch (CancelException e) { |
| // aborted due to shutdown - bug 37318 |
| } |
| catch (java.nio.channels.AsynchronousCloseException expected) { |
| // this is expected when our TimerTask times out an accepted socket |
| } catch (IOException | ToDataException ex) { // added ToDataException to fix bug 44659 |
| if (isRunning()) { |
| if (!AcceptorImpl.this.loggedAcceptError) { |
| AcceptorImpl.this.loggedAcceptError = true; |
| if (ex instanceof SocketTimeoutException) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_BRIDGE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT)); |
| } |
| else { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_BRIDGE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0, ex), ex); |
| } |
| } |
| } |
| } |
| finally { |
| if (!finished) { |
| closeSocket(s); |
| } |
| if (isRunning()) { |
| AcceptorImpl.this.stats.decAcceptsInProgress(); |
| } |
| } |
| } |
| }); |
| } catch (RejectedExecutionException rejected) { |
| closeSocket(s); |
| if (isRunning()) { |
| this.stats.decAcceptsInProgress(); |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected)); |
| } |
| } |
| } |
| |
| public ByteBuffer takeCommBuffer() { |
| ByteBuffer result = (ByteBuffer)this.commBufferQueue.poll(); |
| if (result == null) { |
| result = ByteBuffer.allocateDirect(this.socketBufferSize); |
| } |
| return result; |
| } |
| public void releaseCommBuffer(ByteBuffer bb) { |
| if (bb == null) { // fix for bug 37107 |
| return; |
| } |
| if (isRunning()) { |
| this.commBufferQueue.offer(bb); |
| } |
| } |
| |
| public void incClientServerCnxCount() { |
| this.clientServerCnxCount.incrementAndGet(); |
| } |
| |
| public void decClientServerCnxCount() { |
| this.clientServerCnxCount.decrementAndGet(); |
| } |
| public int getClientServerCnxCount() { |
| return this.clientServerCnxCount.get(); |
| } |
| |
| protected void handleNewClientConnection(final Socket s) throws IOException |
| { |
| // Read the first byte. If this socket is being used for 'client to server' |
| // communication, create a ServerConnection. If this socket is being used |
| // for 'server to client' communication, send it to the CacheClientNotifier |
| // for processing. |
| byte communicationMode; |
| if (isSelector()) { |
| ByteBuffer bb = ByteBuffer.allocateDirect(1); |
| final SocketChannel sc = s.getChannel(); |
| sc.configureBlocking(false); |
| // try to read the byte first in non-blocking mode |
| int res = sc.read(bb); |
| sc.configureBlocking(true); |
| if (res < 0) { |
| throw new EOFException(); |
| } |
| else if (res == 0) { |
| // now do a blocking read so setup a timer to close the socket if the |
| // the read takes too long |
| SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() { |
| @Override |
| public void run2() { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_BRIDGE_SERVER_TIMED_OUT_WAITING_FOR_HANDSHAKE_FROM__0, s.getRemoteSocketAddress())); |
| closeSocket(s); |
| } |
| }; |
| this.hsTimer.schedule(st, this.acceptTimeout); |
| res = sc.read(bb); |
| if ((!st.cancel()) || res <= 0) { |
| throw new EOFException(); |
| } |
| } |
| communicationMode = bb.get(0); |
| if (logger.isTraceEnabled()) { |
| logger.trace("read communications mode(1) ", communicationMode); |
| } |
| } else { |
| s.setSoTimeout(this.acceptTimeout); |
| communicationMode = (byte)SocketUtils.getInputStream(s).read();//getInputStream().read(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("read communications mode(2) ", communicationMode); |
| } |
| if (communicationMode == -1) { |
| throw new EOFException(); |
| } |
| s.setSoTimeout(0); |
| } |
| |
| s.setTcpNoDelay(this.tcpNoDelay); |
| |
| if (communicationMode == CLIENT_TO_SERVER |
| || communicationMode == GATEWAY_TO_GATEWAY |
| || communicationMode == MONITOR_TO_SERVER |
| || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE) { |
| String communicationModeStr = ""; |
| switch (communicationMode) { |
| case CLIENT_TO_SERVER: |
| communicationModeStr = "client"; |
| break; |
| case GATEWAY_TO_GATEWAY: |
| communicationModeStr = "gateway"; |
| break; |
| case MONITOR_TO_SERVER: |
| communicationModeStr = "monitor"; |
| break; |
| case CLIENT_TO_SERVER_FOR_QUEUE: |
| communicationModeStr = "clientToServerForQueue"; |
| break; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bridge server: Initializing {} communication socket: {}", communicationModeStr, s); |
| } |
| if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) |
| { |
| int curCnt = this.getClientServerCnxCount(); |
| if (curCnt >= this.maxConnections) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2, |
| new Object[] {s.getInetAddress(), Integer.valueOf(curCnt), Integer.valueOf(this.maxConnections)})); |
| // if (s != null) (cannot be null) |
| { |
| try { |
| ServerHandShakeProcessor.refuse(s.getOutputStream(), |
| LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0 |
| .toLocalizedString(Integer.valueOf(this.maxConnections))); |
| } |
| catch (Exception ex) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("rejection message failed", ex); |
| } |
| } |
| closeSocket(s); |
| } |
| return; |
| } |
| } |
| ServerConnection serverConn = new ServerConnection(s, this.cache, |
| this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, |
| this.socketBufferSize, communicationModeStr, communicationMode, this); |
| synchronized (this.allSCsLock) { |
| this.allSCs.add(serverConn); |
| ServerConnection snap[] = this.allSCList; // avoid volatile read |
| this.allSCList = (ServerConnection[]) |
| ArrayUtils.insert(snap, snap.length, serverConn); |
| } |
| if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) { |
| incClientServerCnxCount(); |
| } |
| if (isSelector()) { |
| serverConn.registerWithSelector(); |
| } else { |
| try { |
| pool.execute(serverConn); |
| } catch (RejectedExecutionException rejected) { |
| if (!isRunning()) { |
| return; |
| } |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL, |
| new Object[] {serverConn})); |
| try { |
| ServerHandShakeProcessor.refuse(s.getOutputStream(), |
| LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0 |
| .toLocalizedString(Integer.valueOf(this.maxConnections))); |
| |
| } |
| catch (Exception ex) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("rejection message failed", ex); |
| } |
| } |
| serverConn.cleanup(); |
| } |
| } |
| } |
| else if (communicationMode == PRIMARY_SERVER_TO_CLIENT) { |
| if (logger.isDebugEnabled()) { |
| logger.debug(":Bridge server: Initializing primary server-to-client communication socket: {}", s); |
| } |
| // try { |
| AcceptorImpl.this.clientNotifier.registerClient(s, true, this.acceptorId, |
| this.notifyBySubscription); |
| } |
| else if (communicationMode == SECONDARY_SERVER_TO_CLIENT) { |
| if (logger.isDebugEnabled()) { |
| logger.debug(":Bridge server: Initializing secondary server-to-client communication socket: {}", s); |
| } |
| AcceptorImpl.this.clientNotifier.registerClient(s, false, this.acceptorId, |
| this.notifyBySubscription); |
| } else { |
| throw new IOException("Acceptor received unknown communication mode: " |
| + communicationMode); |
| } |
| } |
| |
| @Override |
| public boolean isRunning() |
| { |
| return !this.shutdown; |
| } |
| |
| @Override |
| @SuppressFBWarnings(value="REC_CATCH_EXCEPTION", justification="Allow this thread to die") |
| public void close() { |
| if (!isRunning()) { |
| return; |
| } |
| |
| try { |
| synchronized (syncLock) { |
| this.shutdown = true; |
| logger.info(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_BRIDGE_SERVER_ON_PORT_0_IS_SHUTTING_DOWN, this.localPort)); |
| if (this.thread != null) { |
| this.thread.interrupt(); |
| } |
| this.serverSock.close(); |
| crHelper.setShutdown(true); // set this before shutting down the pool |
| if (isSelector()) { |
| this.hsTimer.cancel(); |
| if (this.tmpSel != null) { |
| try { |
| this.tmpSel.close(); |
| } catch (IOException ignore) { |
| } |
| } |
| try { |
| wakeupSelector(); |
| this.selector.close(); |
| } catch (IOException ignore) { |
| } |
| if (this.selectorThread != null) { |
| this.selectorThread.interrupt(); |
| } |
| this.commBufferQueue.clear(); |
| } |
| ClientHealthMonitor.shutdownInstance(); |
| shutdownSCs(); |
| this.clientNotifier.shutdown(this.acceptorId); |
| this.pool.shutdown(); |
| if (!this.pool.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.PoolImpl_TIMEOUT_WAITING_FOR_BACKGROUND_TASKS_TO_COMPLETE)); |
| this.pool.shutdownNow(); |
| } |
| this.hsPool.shutdownNow(); |
| this.stats.close(); |
| GemFireCacheImpl myCache = (GemFireCacheImpl)cache; |
| if (!myCache.forcedDisconnect()) { |
| Set<PartitionedRegion> prs = myCache.getPartitionedRegions(); |
| for (PartitionedRegion pr : prs) { |
| Map<Integer, BucketAdvisor.BucketProfile> profiles = new HashMap<Integer, BucketAdvisor.BucketProfile>(); |
| // get all local real bucket advisors |
| Map<Integer, BucketAdvisor> advisors = pr.getRegionAdvisor() |
| .getAllBucketAdvisors(); |
| for (Map.Entry<Integer, BucketAdvisor> entry : advisors.entrySet()) { |
| BucketAdvisor advisor = entry.getValue(); |
| BucketProfile bp = (BucketProfile)advisor.createProfile(); |
| advisor.updateServerBucketProfile(bp); |
| profiles.put(entry.getKey(), bp); |
| } |
| Set receipients = new HashSet(); |
| receipients = pr.getRegionAdvisor().adviseAllPRNodes(); |
| // send it to all in one messgae |
| ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send( |
| receipients, pr.getDistributionManager(), pr.getPRId(), profiles, |
| true); |
| if (reply != null) { |
| reply.waitForRepliesUninterruptibly(); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("sending messages to all peers for removing this server.."); |
| } |
| } |
| } |
| |
| } // synchronized |
| } |
| catch (Exception e) {/* ignore */ |
| logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED), e); |
| } |
| finally{ |
| } |
| } |
| |
| private void shutdownSCs() { |
| // added to fix part 2 of bug 37351. |
| synchronized (this.allSCsLock) { |
| ServerConnection snap[] = this.allSCList; |
| for (int i = 0; i < snap.length; i ++) { |
| snap[i].cleanup(); |
| } |
| } |
| } |
| |
| |
| // protected InetAddress getBindAddress() { |
| // return this.bindAddress; |
| // } |
| |
| // /** |
| // * Calculates the bind address based on gemfire.properties. |
| // * Returns null if no bind address is configured. |
| // * @since 5.7 |
| // */ |
| // public static InetAddress calcBindAddress(Cache cache) throws IOException { |
| // InternalDistributedSystem system = (InternalDistributedSystem)cache |
| // .getDistributedSystem(); |
| // DistributionConfig config = system.getConfig(); |
| // InetAddress address = null; |
| |
| // // Get the server-bind-address. If it is not null, use it. |
| // // If it is null, get the bind-address. If it is not null, use it. |
| // // Otherwise set default. |
| // String serverBindAddress = config.getServerBindAddress(); |
| // if (serverBindAddress != null && serverBindAddress.length() > 0) { |
| // address = InetAddress.getByName(serverBindAddress); |
| // } else { |
| // String bindAddress = config.getBindAddress(); |
| // if (bindAddress != null && bindAddress.length() > 0) { |
| // address = InetAddress.getByName(bindAddress); |
| // } |
| // } |
| // return address; |
| // } |
| /** |
| * @param bindName the ip address or host name that this acceptor should |
| * bind to. If null or "" then calculate it. |
| * @return the ip address or host name this acceptor will listen on. |
| * An "" if all local addresses will be listened to. |
| |
| * @since 5.7 |
| */ |
| private static String calcBindHostName(Cache cache, String bindName) { |
| if (bindName != null && !bindName.equals("")) { |
| return bindName; |
| } |
| |
| InternalDistributedSystem system = (InternalDistributedSystem)cache |
| .getDistributedSystem(); |
| DistributionConfig config = system.getConfig(); |
| String hostName = null; |
| |
| // Get the server-bind-address. If it is not null, use it. |
| // If it is null, get the bind-address. If it is not null, use it. |
| // Otherwise set default. |
| String serverBindAddress = config.getServerBindAddress(); |
| if (serverBindAddress != null && serverBindAddress.length() > 0) { |
| hostName = serverBindAddress; |
| } else { |
| String bindAddress = config.getBindAddress(); |
| if (bindAddress != null && bindAddress.length() > 0) { |
| hostName = bindAddress; |
| } |
| } |
| return hostName; |
| } |
| |
| private InetAddress getBindAddress() throws IOException { |
| if (this.bindHostName == null || "".equals(this.bindHostName)) { |
| return null; // pick default local address |
| } else { |
| return InetAddress.getByName(this.bindHostName); |
| } |
| } |
| |
| /** |
| * Gets the address that this bridge server can be contacted on from external |
| * processes. |
| * @since 5.7 |
| */ |
| public String getExternalAddress() { |
| String result = this.bindHostName; |
| boolean needCanonicalHostName = false; |
| if (result == null || "".equals(result)) { |
| needCanonicalHostName = true; |
| } else { |
| // check to see if we are listening on all local addresses |
| ServerSocket ss = this.serverSock; |
| if (ss != null && ss.getLocalSocketAddress() instanceof InetSocketAddress) { |
| InetSocketAddress isa = (InetSocketAddress)ss.getLocalSocketAddress(); |
| InetAddress ssAddr = isa.getAddress(); |
| if (ssAddr != null) { |
| if (ssAddr.isAnyLocalAddress()) { |
| needCanonicalHostName = true; |
| } |
| } |
| } |
| } |
| if (needCanonicalHostName) { |
| try { |
| result = SocketCreator.getLocalHost().getCanonicalHostName(); |
| } catch (UnknownHostException ex) { |
| throw new IllegalStateException("getLocalHost failed with " + ex); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * This method finds a client notifier and returns it. It is used to |
| * propagate interest registrations to other servers |
| * |
| * @return the instance that provides client notification |
| */ |
| public CacheClientNotifier getCacheClientNotifier() |
| { |
| return this.clientNotifier; |
| } |
| |
| public CachedRegionHelper getCachedRegionHelper() { |
| return this.crHelper; |
| } |
| |
| public ClientHealthMonitor getClientHealthMonitor() { |
| return healthMonitor; |
| } |
| |
| public ConnectionListener getConnectionListener() { |
| return connectionListener; |
| } |
| |
| public boolean isSqlFabricSystem() { |
| return this.isSqlFabricHub; |
| } |
| |
| public boolean isGatewayReceiver() { |
| return this.isGatewayReceiver; |
| } |
| |
| public List<GatewayTransportFilter> getGatewayTransportFilters(){ |
| return this.gatewayTransportFilters; |
| } |
| |
| //IBM J9 sometimes reports "listen failed" instead of BindException |
| //see bug #40589 |
| public static boolean treatAsBindException(SocketException se) { |
| if(se instanceof BindException) { |
| return true; |
| } |
| final String msg = se.getMessage(); |
| return (msg != null && msg.contains("Invalid argument: listen failed")); |
| } |
| |
| public static boolean isAuthenticationRequired() { |
| return isAuthenticationRequired; |
| } |
| |
| public static boolean isPostAuthzCallbackPresent() { |
| return isPostAuthzCallbackPresent; |
| } |
| |
| public Set<ServerConnection> getAllServerConnections(){ |
| return Collections.unmodifiableSet(allSCs); |
| } |
| |
| /** |
| * This method returns a thread safe structure which can be iterated over without worrying about ConcurrentModificationException. |
| * JMX MBeans/Commands need to iterate over this list to get client info. |
| * |
| */ |
| public ServerConnection[] getAllServerConnectionList(){ |
| return this.allSCList; |
| } |
| } |