| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.util.nio; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketAddress; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.channels.ClosedByInterruptException; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.channels.ClosedSelectorException; |
| import java.nio.channels.ReadableByteChannel; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.nio.channels.WritableByteChannel; |
| import java.nio.channels.spi.SelectorProvider; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Deque; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.configuration.ConnectorConfiguration; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoMessage; |
| import org.apache.ignite.internal.processors.metric.MetricRegistry; |
| import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; |
| import org.apache.ignite.internal.processors.tracing.MTC; |
| import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; |
| import org.apache.ignite.internal.processors.tracing.NoopSpan; |
| import org.apache.ignite.internal.processors.tracing.NoopTracing; |
| import org.apache.ignite.internal.processors.tracing.Span; |
| import org.apache.ignite.internal.processors.tracing.SpanTags; |
| import org.apache.ignite.internal.processors.tracing.SpanType; |
| import org.apache.ignite.internal.processors.tracing.Tracing; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.GridUnsafe; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.internal.util.worker.GridWorkerListener; |
| import org.apache.ignite.lang.IgniteBiInClosure; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteReducer; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.plugin.extensions.communication.MessageReader; |
| import org.apache.ignite.plugin.extensions.communication.MessageWriter; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; |
| import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; |
| import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName; |
| import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER; |
| import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPERATION; |
| import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_SOCKET_WRITE; |
| |
| /** |
| * TCP NIO server. Due to asynchronous nature of connections processing |
| * network events such as client connection, disconnection and message receiving are passed to |
| * the server listener. Once client connected, an associated {@link GridNioSession} object is |
| * created and can be used in communication. |
| * <p> |
| * This implementation supports several selectors and several reading threads. |
| * |
| * @param <T> Message type. |
| * |
| */ |
| public class GridNioServer<T> { |
| /** */ |
| public static final String IGNITE_IO_BALANCE_RANDOM_BALANCE = "IGNITE_IO_BALANCE_RANDOM_BALANCER"; |
| |
| /** Default session write timeout. */ |
| public static final int DFLT_SES_WRITE_TIMEOUT = 5000; |
| |
| /** Default send queue limit. */ |
| public static final int DFLT_SEND_QUEUE_LIMIT = 0; |
| |
| /** Time, which server will wait before retry operation. */ |
| private static final long ERR_WAIT_TIME = 2000; |
| |
| /** Buffer metadata key. */ |
| private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** SSL system data buffer metadata key. */ |
| private static final int BUF_SSL_SYSTEM_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** SSL write buf limit. */ |
| private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** Session future meta key. */ |
| public static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** Selection key meta key. */ |
| private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** Meta key for pending requests to be written. */ |
| private static final int REQUESTS_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** */ |
| private static final boolean DISABLE_KEYSET_OPTIMIZATION = |
| IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS); |
| |
| /** */ |
| public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME = "outboundMessagesQueueSize"; |
| |
| /** */ |
| public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Number of messages waiting to be sent"; |
| |
| /** */ |
| public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes"; |
| |
| /** */ |
| public static final String RECEIVED_BYTES_METRIC_DESC = "Total number of bytes received by current node"; |
| |
| /** */ |
| public static final String SENT_BYTES_METRIC_NAME = "sentBytes"; |
| |
| /** */ |
| public static final String SENT_BYTES_METRIC_DESC = "Total number of bytes sent by current node"; |
| |
| /** Defines how many times selector should do {@code selectNow()} before doing {@code select(long)}. */ |
| private long selectorSpins; |
| |
| /** Accept worker. */ |
| @GridToStringExclude |
| private final GridNioAcceptWorker acceptWorker; |
| |
| /** Read worker threads. */ |
| private final IgniteThread[] clientThreads; |
| |
| /** Read workers. */ |
| private final List<AbstractNioClientWorker> clientWorkers; |
| |
| /** Filter chain to use. */ |
| private final GridNioFilterChain<T> filterChain; |
| |
| /** Server listener. */ |
| private final GridNioServerListener<T> lsnr; |
| |
| /** Logger. */ |
| @GridToStringExclude |
| private final IgniteLogger log; |
| |
| /** Closed flag. */ |
| private volatile boolean closed; |
| |
| /** Flag indicating if this server should use direct buffers. */ |
| private final boolean directBuf; |
| |
| /** Index to select which thread will serve next incoming socket channel. Using round-robin balancing. */ |
| @GridToStringExclude |
| private int readBalanceIdx; |
| |
| /** Index to select which thread will serve next out socket channel. Using round-robin balancing. */ |
| @GridToStringExclude |
| private int writeBalanceIdx = 1; |
| |
| /** Tcp no delay flag. */ |
| private final boolean tcpNoDelay; |
| |
| /** Socket send buffer. */ |
| private final int sockSndBuf; |
| |
| /** Socket receive buffer. */ |
| private final int sockRcvBuf; |
| |
| /** Write timeout */ |
| private volatile long writeTimeout = DFLT_SES_WRITE_TIMEOUT; |
| |
| /** Idle timeout. */ |
| private volatile long idleTimeout = ConnectorConfiguration.DFLT_IDLE_TIMEOUT; |
| |
| /** For test purposes only. */ |
| private boolean skipWrite; |
| |
| /** For test purposes only. */ |
| private boolean skipRead; |
| |
| /** Local address. */ |
| private final InetSocketAddress locAddr; |
| |
| /** Order. */ |
| private final ByteOrder order; |
| |
| /** Send queue limit. */ |
| private final int sndQueueLimit; |
| |
| /** Whether direct mode is used. */ |
| private final boolean directMode; |
| |
| /** */ |
| @Nullable private final MetricRegistry mreg; |
| |
| /** Received bytes count metric. */ |
| @Nullable private final LongAdderMetric rcvdBytesCntMetric; |
| |
| /** Sent bytes count metric. */ |
| @Nullable private final LongAdderMetric sentBytesCntMetric; |
| |
| /** Outbound messages queue size. */ |
| @Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric; |
| |
| /** Sessions. */ |
| private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = new GridConcurrentHashSet<>(); |
| |
| /** */ |
| private GridNioSslFilter sslFilter; |
| |
| /** */ |
| @GridToStringExclude |
| private GridNioMessageWriterFactory writerFactory; |
| |
| /** */ |
| @GridToStringExclude |
| private IgnitePredicate<Message> skipRecoveryPred; |
| |
| /** Optional listener to monitor outbound message queue size. */ |
| private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr; |
| |
| /** */ |
| private final AtomicLong readerMoveCnt = new AtomicLong(); |
| |
| /** */ |
| private final AtomicLong writerMoveCnt = new AtomicLong(); |
| |
| /** */ |
| private final IgniteRunnable balancer; |
| |
| /** |
| * Interval in milliseconds between consequtive {@link GridWorkerListener#onIdle(GridWorker)} calls |
| * in server workers. |
| */ |
| private final boolean readWriteSelectorsAssign; |
| |
| /** Tracing processor. */ |
| private Tracing tracing; |
| |
| /** |
| * @param addr Address. |
| * @param port Port. |
| * @param log Log. |
| * @param selectorCnt Count of selectors and selecting threads. |
| * @param igniteInstanceName Ignite instance name. |
| * @param srvName Logical server name for threads identification. |
| * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before |
| * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}. |
| * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. |
| * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets. |
| * @param directBuf Direct buffer flag. |
| * @param order Byte order. |
| * @param lsnr Listener. |
| * @param sockSndBuf Socket send buffer. |
| * @param sockRcvBuf Socket receive buffer. |
| * @param sndQueueLimit Send queue limit. |
| * @param directMode Whether direct mode is used. |
| * @param daemon Daemon flag to create threads. |
| * @param writerFactory Writer factory. |
| * @param skipRecoveryPred Skip recovery predicate. |
| * @param msgQueueLsnr Message queue size listener. |
| * @param readWriteSelectorsAssign If {@code true} then in/out connections are assigned to even/odd workers. |
| * @param workerLsnr Worker lifecycle listener. |
| * @param mreg Metrics registry. |
| * @param filters Filters for this server. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private GridNioServer( |
| InetAddress addr, |
| int port, |
| IgniteLogger log, |
| int selectorCnt, |
| @Nullable String igniteInstanceName, |
| @Nullable String srvName, |
| long selectorSpins, |
| boolean tcpNoDelay, |
| boolean directBuf, |
| ByteOrder order, |
| GridNioServerListener<T> lsnr, |
| int sockSndBuf, |
| int sockRcvBuf, |
| int sndQueueLimit, |
| boolean directMode, |
| boolean daemon, |
| GridNioMessageWriterFactory writerFactory, |
| IgnitePredicate<Message> skipRecoveryPred, |
| IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr, |
| boolean readWriteSelectorsAssign, |
| @Nullable GridWorkerListener workerLsnr, |
| @Nullable MetricRegistry mreg, |
| Tracing tracing, |
| GridNioFilter... filters |
| ) throws IgniteCheckedException { |
| if (port != -1) |
| A.notNull(addr, "addr"); |
| |
| A.notNull(lsnr, "lsnr"); |
| A.notNull(log, "log"); |
| A.notNull(order, "order"); |
| |
| A.ensure(port == -1 || (port > 0 && port < 0xffff), "port"); |
| A.ensure(selectorCnt > 0, "selectorCnt"); |
| A.ensure(sockRcvBuf >= 0, "sockRcvBuf"); |
| A.ensure(sockSndBuf >= 0, "sockSndBuf"); |
| A.ensure(sndQueueLimit >= 0, "sndQueueLimit"); |
| |
| this.log = log; |
| this.directBuf = directBuf; |
| this.order = order; |
| this.tcpNoDelay = tcpNoDelay; |
| this.sockRcvBuf = sockRcvBuf; |
| this.sockSndBuf = sockSndBuf; |
| this.sndQueueLimit = sndQueueLimit; |
| this.msgQueueLsnr = msgQueueLsnr; |
| this.selectorSpins = selectorSpins; |
| this.readWriteSelectorsAssign = readWriteSelectorsAssign; |
| this.lsnr = lsnr; |
| this.tracing = tracing == null ? new NoopTracing() : tracing; |
| |
| filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters); |
| |
| if (directMode) { |
| for (GridNioFilter filter : filters) { |
| if (filter instanceof GridNioSslFilter) { |
| sslFilter = (GridNioSslFilter)filter; |
| |
| assert sslFilter.directMode(); |
| } |
| } |
| } |
| |
| if (port != -1) { |
| // Once bind, we will not change the port in future. |
| locAddr = new InetSocketAddress(addr, port); |
| |
| // This method will throw exception if address already in use. |
| Selector acceptSelector = createSelector(locAddr); |
| |
| String threadName; |
| |
| if (srvName == null) |
| threadName = "nio-acceptor"; |
| else |
| threadName = "nio-acceptor-" + srvName; |
| |
| acceptWorker = new GridNioAcceptWorker(igniteInstanceName, threadName, log, acceptSelector, workerLsnr); |
| } |
| else { |
| locAddr = null; |
| acceptWorker = null; |
| } |
| |
| clientWorkers = new ArrayList<>(selectorCnt); |
| clientThreads = new IgniteThread[selectorCnt]; |
| |
| for (int i = 0; i < selectorCnt; i++) { |
| String threadName; |
| |
| if (srvName == null) |
| threadName = "grid-nio-worker-" + i; |
| else |
| threadName = "grid-nio-worker-" + srvName + "-" + i; |
| |
| AbstractNioClientWorker worker = directMode ? |
| new DirectNioClientWorker(i, igniteInstanceName, threadName, log, workerLsnr) : |
| new ByteBufferNioClientWorker(i, igniteInstanceName, threadName, log, workerLsnr); |
| |
| clientWorkers.add(worker); |
| |
| clientThreads[i] = new IgniteThread(worker); |
| |
| clientThreads[i].setDaemon(daemon); |
| } |
| |
| this.directMode = directMode; |
| this.writerFactory = writerFactory; |
| |
| this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse(); |
| |
| long balancePeriod = IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, 5000); |
| |
| IgniteRunnable balancer0 = null; |
| |
| if (balancePeriod > 0) { |
| boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false); |
| |
| if (rndBalance) |
| balancer0 = new RandomBalancer(); |
| else { |
| balancer0 = readWriteSelectorsAssign ? |
| new ReadWriteSizeBasedBalancer(balancePeriod) : |
| new SizeBasedBalancer(balancePeriod); |
| } |
| } |
| |
| this.balancer = balancer0; |
| |
| this.mreg = mreg; |
| |
| rcvdBytesCntMetric = mreg == null ? |
| null : mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC); |
| |
| sentBytesCntMetric = mreg == null ? |
| null : mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC); |
| |
| outboundMessagesQueueSizeMetric = mreg == null ? null : mreg.longAdderMetric( |
| OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME, |
| OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC |
| ); |
| } |
| |
| /** |
| * @return Number of reader sessions move. |
| */ |
| public long readerMoveCount() { |
| return readerMoveCnt.get(); |
| } |
| |
| /** |
| * @return Number of reader writer move. |
| */ |
| public long writerMoveCount() { |
| return writerMoveCnt.get(); |
| } |
| |
| /** |
| * @return Configured port. |
| */ |
| public int port() { |
| return locAddr != null ? locAddr.getPort() : -1; |
| } |
| |
| /** |
| * Creates and returns a builder for a new instance of this class. |
| * |
| * @return Builder for new instance. |
| */ |
| public static <T> Builder<T> builder() { |
| return new Builder<>(); |
| } |
| |
| /** |
| * Starts all associated threads to perform accept and read activities. |
| */ |
| public void start() { |
| filterChain.start(); |
| |
| if (acceptWorker != null) |
| new IgniteThread(acceptWorker).start(); |
| |
| for (IgniteThread thread : clientThreads) |
| thread.start(); |
| } |
| |
| /** |
| * Stops all threads and releases all resources. |
| */ |
| public void stop() { |
| if (!closed) { |
| closed = true; |
| |
| // Make sure to entirely stop acceptor if any. |
| U.cancel(acceptWorker); |
| U.join(acceptWorker, log); |
| |
| U.cancel(clientWorkers); |
| U.join(clientWorkers, log); |
| |
| filterChain.stop(); |
| |
| for (GridSelectorNioSessionImpl ses : sessions) |
| ses.onServerStopped(); |
| } |
| } |
| |
| /** |
| * Gets the address server is bound to. |
| * |
| * @return Address server is bound to. |
| */ |
| public InetSocketAddress localAddress() { |
| return locAddr; |
| } |
| |
| /** |
| * @return Selector spins. |
| */ |
| public long selectorSpins() { |
| return selectorSpins; |
| } |
| |
| /** |
| * @param ses Session to close. |
| * @return Future for operation. |
| */ |
| public GridNioFuture<Boolean> close(GridNioSession ses) { |
| assert ses instanceof GridSelectorNioSessionImpl : ses; |
| |
| GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; |
| |
| if (impl.closed()) |
| return new GridNioFinishedFuture<>(false); |
| |
| NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); |
| |
| impl.offerStateChange(fut); |
| |
| return fut; |
| } |
| |
| /** |
| * @param ses Session. |
| */ |
| public void closeFromWorkerThread(GridNioSession ses) { |
| assert ses instanceof GridSelectorNioSessionImpl : ses; |
| |
| GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; |
| |
| ((AbstractNioClientWorker)ses0.worker()).close((GridSelectorNioSessionImpl)ses, null); |
| } |
| |
| /** |
| * @param ses Session. |
| * @param msg Message. |
| * @param createFut {@code True} if future should be created. |
| * @param ackC Closure invoked when message ACK is received. |
| * @return Future for operation. |
| */ |
| GridNioFuture<?> send(GridNioSession ses, |
| ByteBuffer msg, |
| boolean createFut, |
| IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert ses instanceof GridSelectorNioSessionImpl : ses; |
| |
| GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; |
| |
| if (createFut) { |
| NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, ackC); |
| |
| send0(impl, fut, false); |
| |
| return fut; |
| } |
| else { |
| SessionWriteRequest req = new WriteRequestImpl(ses, msg, true, ackC); |
| |
| send0(impl, req, false); |
| |
| return null; |
| } |
| } |
| |
| /** |
| * @param ses Session. |
| * @param msg Message. |
| * @param createFut {@code True} if future should be created. |
| * @param ackC Closure invoked when message ACK is received. |
| * @return Future for operation. |
| */ |
| GridNioFuture<?> send(GridNioSession ses, |
| Message msg, |
| boolean createFut, |
| IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert ses instanceof GridSelectorNioSessionImpl; |
| |
| GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; |
| |
| if (createFut) { |
| NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, |
| skipRecoveryPred.apply(msg), ackC); |
| |
| send0(impl, fut, false); |
| |
| return fut; |
| } |
| else { |
| SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC); |
| |
| send0(impl, req, false); |
| |
| return null; |
| } |
| } |
| |
| /** |
| * @param ses Session. |
| * @param req Request. |
| * @param sys System message flag. |
| * @throws IgniteCheckedException If session was closed. |
| */ |
| private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest req, boolean sys) throws IgniteCheckedException { |
| assert ses != null; |
| assert req != null; |
| |
| int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req); |
| |
| if (ses.closed()) { |
| if (ses.removeFuture(req)) { |
| IOException err = new IOException("Failed to send message (connection was closed): " + ses); |
| |
| req.onError(err); |
| |
| if (!(req instanceof GridNioFuture)) |
| throw new IgniteCheckedException(err); |
| } |
| } |
| else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) { |
| AbstractNioClientWorker worker = (AbstractNioClientWorker)ses.worker(); |
| |
| if (worker != null) |
| worker.offer((SessionChangeRequest)req); |
| } |
| |
| if (msgQueueLsnr != null) |
| msgQueueLsnr.apply(ses, msgCnt); |
| } |
| |
| /** |
| * Adds message at the front of the queue without acquiring back pressure semaphore. |
| * |
| * @param ses Session. |
| * @param msg Message. |
| * @throws IgniteCheckedException If session was closed. |
| */ |
| public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException { |
| sendSystem(ses, msg, null); |
| } |
| |
| /** |
| * Adds message at the front of the queue without acquiring back pressure semaphore. |
| * |
| * @param ses Session. |
| * @param msg Message. |
| * @param lsnr Future listener notified from the session thread. |
| * @throws IgniteCheckedException If session was closed. |
| */ |
| public void sendSystem(GridNioSession ses, |
| Message msg, |
| @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws IgniteCheckedException { |
| assert ses instanceof GridSelectorNioSessionImpl; |
| |
| GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; |
| |
| if (lsnr != null) { |
| NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, |
| NioOperation.REQUIRE_WRITE, |
| msg, |
| skipRecoveryPred.apply(msg), |
| null); |
| |
| fut.listen(lsnr); |
| |
| assert !fut.isDone(); |
| |
| send0(impl, fut, true); |
| } |
| else { |
| SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg); |
| |
| send0(impl, req, true); |
| } |
| } |
| |
| /** |
| * @param ses Session. |
| */ |
| public void resend(GridNioSession ses) { |
| assert ses instanceof GridSelectorNioSessionImpl; |
| |
| GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor(); |
| |
| if (recoveryDesc != null && !recoveryDesc.messagesRequests().isEmpty()) { |
| Deque<SessionWriteRequest> futs = recoveryDesc.messagesRequests(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']'); |
| |
| GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; |
| |
| SessionWriteRequest fut0 = futs.iterator().next(); |
| |
| for (SessionWriteRequest fut : futs) { |
| fut.messageThread(true); |
| |
| fut.resetSession(ses0); |
| } |
| |
| ses0.resend(futs); |
| |
| // Wake up worker. |
| ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0); |
| } |
| } |
| |
| /** |
| * @return Sessions. |
| */ |
| public Collection<? extends GridNioSession> sessions() { |
| return sessions; |
| } |
| |
| /** |
| * @return Workers. |
| */ |
| public List<AbstractNioClientWorker> workers() { |
| return clientWorkers; |
| } |
| |
| /** |
| * @param ses Session. |
| * @param from Move from index. |
| * @param to Move to index. |
| */ |
| private void moveSession(GridNioSession ses, int from, int to) { |
| assert from >= 0 && from < clientWorkers.size() : from; |
| assert to >= 0 && to < clientWorkers.size() : to; |
| assert from != to; |
| |
| GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; |
| |
| SessionMoveFuture fut = new SessionMoveFuture(ses0, to); |
| |
| if (!ses0.offerMove(clientWorkers.get(from), fut)) |
| fut.onDone(false); |
| } |
| |
| /** |
| * @param ses Session. |
| * @param op Operation. |
| * @return Future for operation. |
| */ |
| private GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) { |
| assert ses instanceof GridSelectorNioSessionImpl; |
| assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ; |
| |
| GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; |
| |
| if (impl.closed()) |
| return new GridNioFinishedFuture(new IOException("Failed to pause/resume reads " + |
| "(connection was closed): " + ses)); |
| |
| NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op); |
| |
| impl.offerStateChange(fut); |
| |
| return fut; |
| } |
| |
| /** |
| * @return Future. |
| */ |
| public IgniteInternalFuture<String> dumpStats() { |
| String msg = "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() + |
| ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'; |
| |
| return dumpStats(msg, null); |
| } |
| |
| /** |
| * @param msg Message to add. |
| * @param p Session predicate. |
| * @return Future. |
| */ |
| public IgniteInternalFuture<String> dumpStats(final String msg, IgnitePredicate<GridNioSession> p) { |
| GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new IgniteReducer<String, String>() { |
| private final StringBuilder sb = new StringBuilder(msg); |
| |
| @Override public boolean collect(@Nullable String msg) { |
| if (!F.isEmpty(msg)) { |
| synchronized (sb) { |
| if (sb.length() > 0) |
| sb.append(U.nl()); |
| |
| sb.append(msg); |
| } |
| } |
| |
| return true; |
| } |
| |
| @Override public String reduce() { |
| synchronized (sb) { |
| return sb.toString(); |
| } |
| } |
| }); |
| |
| for (int i = 0; i < clientWorkers.size(); i++) { |
| NioOperationFuture<String> opFut = new NioOperationFuture<>(null, NioOperation.DUMP_STATS); |
| |
| opFut.msg = p; |
| |
| clientWorkers.get(i).offer(opFut); |
| |
| fut.add(opFut); |
| } |
| |
| fut.markInitialized(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param msg Message to add. |
| * @param p Session predicate. |
| * @return Future. |
| */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| public IgniteInternalFuture<String> dumpNodeStats(final String msg, IgnitePredicate<GridNioSession> p) { |
| GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new IgniteReducer<String, String>() { |
| private final StringBuilder sb = new StringBuilder(msg); |
| |
| @Override public boolean collect(@Nullable String msg) { |
| if (!F.isEmpty(msg)) { |
| synchronized (sb) { |
| if (sb.length() > 0) |
| sb.append(U.nl()); |
| |
| sb.append(msg); |
| } |
| } |
| |
| return true; |
| } |
| |
| @Override public String reduce() { |
| synchronized (sb) { |
| return sb.toString(); |
| } |
| } |
| }); |
| |
| for (int i = 0; i < clientWorkers.size(); i++) { |
| NioOperationFuture<String> opFut = new NioOperationFuture<>(null, NioOperation.DUMP_STATS); |
| |
| opFut.msg = p; |
| |
| clientWorkers.get(i).offer(opFut); |
| |
| fut.add(opFut); |
| } |
| |
| fut.markInitialized(); |
| |
| return fut; |
| } |
| |
| /** |
| * Establishes a session. |
| * |
| * @param ch Channel to register within the server and create session for. |
| * @param meta Optional meta for new session. |
| * @param async Async connection. |
| * @param lsnr Listener that should be invoked in NIO thread. |
| * @return Future to get session. |
| */ |
| public GridNioFuture<GridNioSession> createSession( |
| final SocketChannel ch, |
| @Nullable Map<Integer, Object> meta, |
| boolean async, |
| @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr |
| ) { |
| try { |
| if (!closed) { |
| ch.configureBlocking(false); |
| |
| NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta); |
| |
| if (async) { |
| assert meta != null; |
| |
| req.op = NioOperation.CONNECT; |
| } |
| |
| if (lsnr != null) |
| req.listen(lsnr); |
| |
| offerBalanced(req, meta); |
| |
| return req; |
| } |
| else |
| return new GridNioFinishedFuture<>( |
| new IgniteCheckedException("Failed to create session, server is stopped.")); |
| } |
| catch (IOException e) { |
| return new GridNioFinishedFuture<>(e); |
| } |
| } |
| |
| /** |
| * @param ch Channel. |
| * @param meta Session meta. |
| */ |
| public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) { |
| if (!closed) { |
| NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta); |
| |
| req.op = NioOperation.CANCEL_CONNECT; |
| |
| Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY); |
| |
| assert idx != null : meta; |
| |
| clientWorkers.get(idx).offer(req); |
| |
| return req; |
| } |
| else |
| return new GridNioFinishedFuture<>( |
| new IgniteCheckedException("Failed to cancel connection, server is stopped.")); |
| } |
| |
| /** |
| * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}. |
| * |
| * @return Write timeout in milliseconds. |
| */ |
| public long writeTimeout() { |
| return writeTimeout; |
| } |
| |
| /** |
| * Sets configurable write timeout for session. |
| * |
| * @param writeTimeout Write timeout in milliseconds. |
| */ |
| public void writeTimeout(long writeTimeout) { |
| this.writeTimeout = writeTimeout; |
| } |
| |
| /** |
| * Gets configurable idle timeout for this session. If not set, default value is |
| * {@link ConnectorConfiguration#DFLT_IDLE_TIMEOUT}. |
| * |
| * @return Idle timeout in milliseconds. |
| */ |
| public long idleTimeout() { |
| return idleTimeout; |
| } |
| |
| /** |
| * Sets configurable idle timeout for session. |
| * |
| * @param idleTimeout Idle timeout in milliseconds. |
| */ |
| public void idleTimeout(long idleTimeout) { |
| this.idleTimeout = idleTimeout; |
| } |
| |
| /** |
| * Creates selector and binds server socket to a given address and port. If address is null |
| * then will not bind any address and just creates a selector. |
| * |
| * @param addr Local address to listen on. |
| * @return Created selector. |
| * @throws IgniteCheckedException If selector could not be created or port is already in use. |
| */ |
| private Selector createSelector(@Nullable SocketAddress addr) throws IgniteCheckedException { |
| Selector selector = null; |
| |
| ServerSocketChannel srvrCh = null; |
| |
| try { |
| // Create a new selector |
| selector = SelectorProvider.provider().openSelector(); |
| |
| if (addr != null) { |
| // Create a new non-blocking server socket channel |
| srvrCh = ServerSocketChannel.open(); |
| |
| srvrCh.configureBlocking(false); |
| |
| if (sockRcvBuf > 0) |
| srvrCh.socket().setReceiveBufferSize(sockRcvBuf); |
| |
| // Bind the server socket to the specified address and port |
| srvrCh.socket().bind(addr); |
| |
| // Register the server socket channel, indicating an interest in |
| // accepting new connections |
| srvrCh.register(selector, SelectionKey.OP_ACCEPT); |
| } |
| |
| return selector; |
| } |
| catch (Throwable e) { |
| U.close(srvrCh, log); |
| U.close(selector, log); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| |
| throw new IgniteCheckedException("Failed to initialize NIO selector.", e); |
| } |
| } |
| |
| /** |
| * @param req Request to balance. |
| * @param meta Session metadata. |
| */ |
| private synchronized void offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) { |
| assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT : req; |
| assert req.socketChannel() != null : req; |
| |
| int workers = clientWorkers.size(); |
| |
| int balanceIdx; |
| |
| if (workers > 1) { |
| if (readWriteSelectorsAssign) { |
| if (req.accepted()) { |
| balanceIdx = readBalanceIdx; |
| |
| readBalanceIdx += 2; |
| |
| if (readBalanceIdx >= workers) |
| readBalanceIdx = 0; |
| } |
| else { |
| balanceIdx = writeBalanceIdx; |
| |
| writeBalanceIdx += 2; |
| |
| if (writeBalanceIdx >= workers) |
| writeBalanceIdx = 1; |
| } |
| } |
| else { |
| balanceIdx = readBalanceIdx; |
| |
| readBalanceIdx++; |
| |
| if (readBalanceIdx >= workers) |
| readBalanceIdx = 0; |
| } |
| } |
| else |
| balanceIdx = 0; |
| |
| if (meta != null) |
| meta.put(WORKER_IDX_META_KEY, balanceIdx); |
| |
| clientWorkers.get(balanceIdx).offer(req); |
| } |
| |
| /** |
| * Stop polling for write availability if write queue is empty. |
| */ |
| private void stopPollingForWrite(SelectionKey key, GridSelectorNioSessionImpl ses) { |
| if (ses.procWrite.get()) { |
| ses.procWrite.set(false); |
| |
| if (ses.writeQueue().isEmpty()) { |
| if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) |
| key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); |
| } |
| else |
| ses.procWrite.set(true); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridNioServer.class, this); |
| } |
| |
| /** |
| * Client worker for byte buffer mode. |
| */ |
| private class ByteBufferNioClientWorker extends AbstractNioClientWorker { |
| /** Read buffer. */ |
| private final ByteBuffer readBuf; |
| |
| /** |
| * @param idx Index of this worker in server's array. |
| * @param igniteInstanceName Ignite instance name. |
| * @param name Worker name. |
| * @param log Logger. |
| * @param workerLsnr Worker lifecycle listener. |
| * @throws IgniteCheckedException If selector could not be created. |
| */ |
| protected ByteBufferNioClientWorker( |
| int idx, |
| @Nullable String igniteInstanceName, |
| String name, |
| IgniteLogger log, |
| @Nullable GridWorkerListener workerLsnr |
| ) throws IgniteCheckedException { |
| super(idx, igniteInstanceName, name, log, workerLsnr); |
| |
| readBuf = directBuf ? ByteBuffer.allocateDirect(8 << 10) : ByteBuffer.allocate(8 << 10); |
| |
| readBuf.order(order); |
| } |
| |
| /** |
| * Processes read-available event on the key. |
| * |
| * @param key Key that is ready to be read. |
| * @throws IOException If key read failed. |
| */ |
| @Override protected void processRead(SelectionKey key) throws IOException { |
| if (skipRead) { |
| try { |
| U.sleep(50); |
| } |
| catch (IgniteInterruptedCheckedException ignored) { |
| U.warn(log, "Sleep has been interrupted."); |
| } |
| |
| return; |
| } |
| |
| ReadableByteChannel sockCh = (ReadableByteChannel)key.channel(); |
| |
| final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); |
| |
| // Reset buffer to read bytes up to its capacity. |
| readBuf.clear(); |
| |
| // Attempt to read off the channel |
| int cnt = sockCh.read(readBuf); |
| |
| if (cnt == -1) { |
| if (log.isDebugEnabled()) |
| log.debug("Remote client closed connection: " + ses); |
| |
| close(ses, null); |
| |
| return; |
| } |
| else if (cnt == 0) |
| return; |
| |
| if (log.isTraceEnabled()) |
| log.trace("Bytes received [sockCh=" + sockCh + ", cnt=" + cnt + ']'); |
| |
| if (rcvdBytesCntMetric != null) |
| rcvdBytesCntMetric.add(cnt); |
| |
| ses.bytesReceived(cnt); |
| |
| // Sets limit to current position and |
| // resets position to 0. |
| readBuf.flip(); |
| |
| try { |
| assert readBuf.hasRemaining(); |
| |
| filterChain.onMessageReceived(ses, readBuf); |
| |
| if (readBuf.remaining() > 0) { |
| LT.warn(log, "Read buffer contains data after filter chain processing (will discard " + |
| "remaining bytes) [ses=" + ses + ", remainingCnt=" + readBuf.remaining() + ']'); |
| |
| readBuf.clear(); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| close(ses, e); |
| } |
| } |
| |
| /** |
| * Processes write-ready event on the key. |
| * |
| * @param key Key that is ready to be written. |
| * @throws IOException If write failed. |
| */ |
| @Override protected void processWrite(SelectionKey key) throws IOException { |
| WritableByteChannel sockCh = (WritableByteChannel)key.channel(); |
| |
| final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); |
| |
| while (true) { |
| ByteBuffer buf = ses.removeMeta(BUF_META_KEY); |
| SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); |
| |
| // Check if there were any pending data from previous writes. |
| if (buf == null) { |
| assert req == null; |
| |
| req = ses.pollFuture(); |
| |
| if (req == null) { |
| stopPollingForWrite(key, ses); |
| |
| break; |
| } |
| |
| buf = (ByteBuffer)req.message(); |
| } |
| |
| if (!skipWrite) { |
| Span span = tracing.create(COMMUNICATION_SOCKET_WRITE, req.span()); |
| |
| try (TraceSurroundings ignore = span.equals(NoopSpan.INSTANCE) ? null : MTC.support(span)) { |
| int cnt = sockCh.write(buf); |
| |
| if (log.isTraceEnabled()) |
| log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); |
| |
| if (sentBytesCntMetric != null) |
| sentBytesCntMetric.add(cnt); |
| |
| ses.bytesSent(cnt); |
| } |
| } |
| else { |
| // For test purposes only (skipWrite is set to true in tests only). |
| try { |
| U.sleep(50); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new IOException("Thread has been interrupted.", e); |
| } |
| } |
| |
| if (buf.remaining() > 0) { |
| // Not all data was written. |
| ses.addMeta(BUF_META_KEY, buf); |
| ses.addMeta(NIO_OPERATION.ordinal(), req); |
| |
| break; |
| } |
| else { |
| // Message was successfully written. |
| assert req != null; |
| |
| req.onMessageWritten(); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(ByteBufferNioClientWorker.class, this, super.toString()); |
| } |
| } |
| |
| /** |
| * Client worker for direct mode. |
| */ |
| private class DirectNioClientWorker extends AbstractNioClientWorker { |
| /** |
| * @param idx Index of this worker in server's array. |
| * @param igniteInstanceName Ignite instance name. |
| * @param name Worker name. |
| * @param log Logger. |
| * @param workerLsnr Worker lifecycle listener. |
| * @throws IgniteCheckedException If selector could not be created. |
| */ |
| protected DirectNioClientWorker( |
| int idx, |
| @Nullable String igniteInstanceName, |
| String name, |
| IgniteLogger log, |
| @Nullable GridWorkerListener workerLsnr |
| ) throws IgniteCheckedException { |
| super(idx, igniteInstanceName, name, log, workerLsnr); |
| } |
| |
| /** |
| * Processes read-available event on the key. |
| * |
| * @param key Key that is ready to be read. |
| * @throws IOException If key read failed. |
| */ |
| @Override protected void processRead(SelectionKey key) throws IOException { |
| if (skipRead) { |
| try { |
| U.sleep(50); |
| } |
| catch (IgniteInterruptedCheckedException ignored) { |
| U.warn(log, "Sleep has been interrupted."); |
| } |
| |
| return; |
| } |
| |
| ReadableByteChannel sockCh = (ReadableByteChannel)key.channel(); |
| |
| final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); |
| |
| ByteBuffer readBuf = ses.readBuffer(); |
| |
| // Attempt to read off the channel. |
| int cnt = sockCh.read(readBuf); |
| |
| if (cnt == -1) { |
| if (log.isDebugEnabled()) |
| log.debug("Remote client closed connection: " + ses); |
| |
| close(ses, null); |
| |
| return; |
| } |
| |
| if (log.isTraceEnabled()) |
| log.trace("Bytes received [sockCh=" + sockCh + ", cnt=" + cnt + ']'); |
| |
| if (cnt == 0) |
| return; |
| |
| if (rcvdBytesCntMetric != null) |
| rcvdBytesCntMetric.add(cnt); |
| |
| ses.bytesReceived(cnt); |
| onRead(cnt); |
| |
| readBuf.flip(); |
| |
| assert readBuf.hasRemaining(); |
| |
| try { |
| filterChain.onMessageReceived(ses, readBuf); |
| |
| if (readBuf.hasRemaining()) |
| readBuf.compact(); |
| else |
| readBuf.clear(); |
| |
| if (ses.hasSystemMessage() && !ses.procWrite.get()) { |
| ses.procWrite.set(true); |
| |
| registerWrite(ses); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| close(ses, e); |
| } |
| } |
| |
| /** |
| * Processes write-ready event on the key. |
| * |
| * @param key Key that is ready to be written. |
| * @throws IOException If write failed. |
| */ |
| @Override protected void processWrite(SelectionKey key) throws IOException { |
| if (sslFilter != null) |
| processWriteSsl(key); |
| else |
| processWrite0(key); |
| } |
| |
| /** |
| * Processes write-ready event on the key. |
| * |
| * @param key Key that is ready to be written. |
| * @throws IOException If write failed. |
| */ |
| private void processWriteSsl(SelectionKey key) throws IOException { |
| WritableByteChannel sockCh = (WritableByteChannel)key.channel(); |
| |
| GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); |
| |
| MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); |
| |
| if (writer == null) { |
| try { |
| ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses)); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException("Failed to create message writer.", e); |
| } |
| } |
| |
| boolean handshakeFinished = sslFilter.lock(ses); |
| |
| try { |
| boolean writeFinished = writeSslSystem(ses, sockCh); |
| |
| if (!handshakeFinished) { |
| if (writeFinished) |
| stopPollingForWrite(key, ses); |
| |
| return; |
| } |
| |
| ByteBuffer sslNetBuf = ses.removeMeta(BUF_META_KEY); |
| |
| if (sslNetBuf != null) { |
| int cnt = sockCh.write(sslNetBuf); |
| |
| if (sentBytesCntMetric != null) |
| sentBytesCntMetric.add(cnt); |
| |
| ses.bytesSent(cnt); |
| |
| if (sslNetBuf.hasRemaining()) { |
| ses.addMeta(BUF_META_KEY, sslNetBuf); |
| |
| return; |
| } |
| else { |
| List<SessionWriteRequest> requests = ses.removeMeta(REQUESTS_META_KEY); |
| |
| if (requests != null) |
| onRequestsWritten(ses, requests); |
| } |
| } |
| |
| ByteBuffer buf = ses.writeBuffer(); |
| |
| if (ses.meta(WRITE_BUF_LIMIT) != null) |
| buf.limit(ses.meta(WRITE_BUF_LIMIT)); |
| |
| SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); |
| |
| while (true) { |
| if (req == null) { |
| req = systemMessage(ses); |
| |
| if (req == null) { |
| req = ses.pollFuture(); |
| |
| if (req == null && buf.position() == 0) { |
| stopPollingForWrite(key, ses); |
| |
| break; |
| } |
| } |
| } |
| |
| Message msg; |
| boolean finished = false; |
| |
| List<SessionWriteRequest> pendingRequests = new ArrayList<>(2); |
| |
| if (req != null) |
| finished = writeToBuffer(writer, buf, req, pendingRequests); |
| |
| // Fill up as many messages as possible to write buffer. |
| while (finished) { |
| req = systemMessage(ses); |
| |
| if (req == null) |
| req = ses.pollFuture(); |
| |
| if (req == null) |
| break; |
| |
| finished = writeToBuffer(writer, buf, req, pendingRequests); |
| } |
| |
| int sesBufLimit = buf.limit(); |
| int sesCap = buf.capacity(); |
| |
| buf.flip(); |
| |
| buf = sslFilter.encrypt(ses, buf); |
| |
| ByteBuffer sesBuf = ses.writeBuffer(); |
| |
| sesBuf.clear(); |
| |
| if (sesCap - buf.limit() < 0) { |
| int limit = sesBufLimit + (sesCap - buf.limit()) - 100; |
| |
| ses.addMeta(WRITE_BUF_LIMIT, limit); |
| |
| sesBuf.limit(limit); |
| } |
| |
| assert buf.hasRemaining(); |
| |
| if (!skipWrite) { |
| int cnt = sockCh.write(buf); |
| |
| if (log.isTraceEnabled()) |
| log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); |
| |
| if (sentBytesCntMetric != null) |
| sentBytesCntMetric.add(cnt); |
| |
| ses.bytesSent(cnt); |
| } |
| else { |
| // For test purposes only (skipWrite is set to true in tests only). |
| try { |
| U.sleep(50); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new IOException("Thread has been interrupted.", e); |
| } |
| } |
| |
| ses.addMeta(NIO_OPERATION.ordinal(), req); |
| |
| if (buf.hasRemaining()) { |
| ses.addMeta(BUF_META_KEY, buf); |
| |
| ses.addMeta(REQUESTS_META_KEY, pendingRequests); |
| |
| break; |
| } |
| else { |
| onRequestsWritten(ses, pendingRequests); |
| |
| buf = ses.writeBuffer(); |
| |
| if (ses.meta(WRITE_BUF_LIMIT) != null) |
| buf.limit(ses.meta(WRITE_BUF_LIMIT)); |
| } |
| } |
| } |
| finally { |
| sslFilter.unlock(ses); |
| } |
| } |
| |
| /** |
| * @param writer Customizer of writing. |
| * @param buf Buffer to write. |
| * @param req Source of data. |
| * @param pendingRequests List of requests which was successfully written. |
| * @return {@code true} if message successfully written to buffer and {@code false} otherwise. |
| */ |
| private boolean writeToBuffer( |
| MessageWriter writer, |
| ByteBuffer buf, |
| SessionWriteRequest req, |
| List<SessionWriteRequest> pendingRequests |
| ) { |
| Message msg; |
| boolean finished; |
| msg = (Message)req.message(); |
| |
| Span span = tracing.create(SpanType.COMMUNICATION_SOCKET_WRITE, req.span()); |
| |
| try (TraceSurroundings ignore = span.equals(NoopSpan.INSTANCE) ? null : MTC.support(span)) { |
| MTC.span().addTag(SpanTags.MESSAGE, traceName(msg)); |
| |
| assert msg != null; |
| |
| if (writer != null) |
| writer.setCurrentWriteClass(msg.getClass()); |
| |
| finished = msg.writeTo(buf, writer); |
| |
| if (finished) { |
| pendingRequests.add(req); |
| |
| if (writer != null) |
| writer.reset(); |
| } |
| |
| return finished; |
| } |
| } |
| |
| /** |
| * @param ses NIO session. |
| * @param sockCh Socket channel. |
| * @throws IOException If failed. |
| * |
| * @return {@code True} if there's nothing else to write (last buffer is written and queue is empty). |
| */ |
| private boolean writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChannel sockCh) |
| throws IOException { |
| ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY); |
| |
| assert queue != null; |
| |
| ByteBuffer buf; |
| |
| while ((buf = queue.peek()) != null) { |
| int cnt = sockCh.write(buf); |
| |
| if (sentBytesCntMetric != null) |
| sentBytesCntMetric.add(cnt); |
| |
| ses.bytesSent(cnt); |
| |
| if (!buf.hasRemaining()) |
| queue.poll(); |
| else |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * @param ses Session. |
| * @return System message request. |
| */ |
| private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) { |
| if (ses.hasSystemMessage()) { |
| Object msg = ses.systemMessage(); |
| |
| SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg); |
| |
| assert !ses.hasSystemMessage(); |
| |
| return req; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Processes write-ready event on the key. |
| * |
| * @param key Key that is ready to be written. |
| * @throws IOException If write failed. |
| */ |
| private void processWrite0(SelectionKey key) throws IOException { |
| WritableByteChannel sockCh = (WritableByteChannel)key.channel(); |
| |
| GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); |
| ByteBuffer buf = ses.writeBuffer(); |
| SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); |
| |
| MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); |
| |
| if (writer == null) { |
| try { |
| ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses)); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException("Failed to create message writer.", e); |
| } |
| } |
| |
| if (req == null) { |
| req = systemMessage(ses); |
| |
| if (req == null) { |
| req = ses.pollFuture(); |
| |
| if (req == null && buf.position() == 0) { |
| stopPollingForWrite(key, ses); |
| |
| return; |
| } |
| } |
| } |
| |
| boolean finished = false; |
| |
| if (req != null) |
| finished = writeToBuffer(ses, buf, req, writer); |
| |
| // Fill up as many messages as possible to write buffer. |
| while (finished) { |
| req.onMessageWritten(); |
| |
| req = systemMessage(ses); |
| |
| if (req == null) |
| req = ses.pollFuture(); |
| |
| if (req == null) |
| break; |
| |
| finished = writeToBuffer(ses, buf, req, writer); |
| } |
| |
| buf.flip(); |
| |
| assert buf.hasRemaining(); |
| |
| if (!skipWrite) { |
| int cnt = sockCh.write(buf); |
| |
| if (log.isTraceEnabled()) |
| log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); |
| |
| if (sentBytesCntMetric != null) |
| sentBytesCntMetric.add(cnt); |
| |
| ses.bytesSent(cnt); |
| onWrite(cnt); |
| } |
| else { |
| // For test purposes only (skipWrite is set to true in tests only). |
| try { |
| U.sleep(50); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new IOException("Thread has been interrupted.", e); |
| } |
| } |
| |
| if (buf.hasRemaining() || !finished) { |
| buf.compact(); |
| |
| ses.addMeta(NIO_OPERATION.ordinal(), req); |
| } |
| else |
| buf.clear(); |
| } |
| |
| /** |
| * @param writer Customizer of writing. |
| * @param buf Buffer to write. |
| * @param req Source of data. |
| * @param ses Session for notification about writting. |
| * @return {@code true} if message successfully written to buffer and {@code false} otherwise. |
| */ |
| private boolean writeToBuffer(GridSelectorNioSessionImpl ses, ByteBuffer buf, SessionWriteRequest req, |
| MessageWriter writer) { |
| Message msg; |
| boolean finished; |
| msg = (Message)req.message(); |
| |
| assert msg != null : req; |
| |
| Span span = tracing.create(SpanType.COMMUNICATION_SOCKET_WRITE, req.span()); |
| |
| try (TraceSurroundings ignore = span.equals(NoopSpan.INSTANCE) ? null : MTC.support(span)) { |
| MTC.span().addTag(SpanTags.MESSAGE, traceName(msg)); |
| |
| if (writer != null) |
| writer.setCurrentWriteClass(msg.getClass()); |
| |
| finished = msg.writeTo(buf, writer); |
| |
| if (finished) { |
| onMessageWritten(ses, msg); |
| |
| if (writer != null) |
| writer.reset(); |
| } |
| |
| return finished; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DirectNioClientWorker.class, this, super.toString()); |
| } |
| } |
| |
| /** |
| * Notifies SessionWriteRequests and it's messages when requests were actually written. |
| * |
| * @param ses GridNioSession. |
| * @param requests SessionWriteRequests. |
| */ |
| private void onRequestsWritten(GridSelectorNioSessionImpl ses, List<SessionWriteRequest> requests) { |
| for (SessionWriteRequest request : requests) { |
| request.onMessageWritten(); |
| |
| onMessageWritten(ses, (Message)request.message()); |
| } |
| } |
| |
| /** |
| * Handle message written event. |
| * |
| * @param ses Session. |
| * @param msg Message. |
| */ |
| private void onMessageWritten(GridSelectorNioSessionImpl ses, Message msg) { |
| if (lsnr != null) |
| lsnr.onMessageSent(ses, (T)msg); |
| } |
| |
| /** |
| * Thread performing only read operations from the channel. |
| */ |
| private abstract class AbstractNioClientWorker extends GridWorker implements GridNioWorker { |
| /** Queue of change requests on this selector. */ |
| @GridToStringExclude |
| private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>(); |
| |
| /** Selector to select read events. */ |
| @GridToStringExclude |
| private Selector selector; |
| |
| /** Selected keys. */ |
| @GridToStringExclude |
| private SelectedSelectionKeySet selectedKeys; |
| |
| /** Worker index. */ |
| private final int idx; |
| |
| /** */ |
| private long bytesRcvd; |
| |
| /** */ |
| private long bytesSent; |
| |
| /** */ |
| private volatile long bytesRcvd0; |
| |
| /** */ |
| private volatile long bytesSent0; |
| |
| /** Sessions assigned to this worker. */ |
| @GridToStringExclude |
| private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions = |
| new GridConcurrentHashSet<>(); |
| |
| /** {@code True} if worker has called or is about to call {@code Selector.select()}. */ |
| private volatile boolean select; |
| |
| /** |
| * @param idx Index of this worker in server's array. |
| * @param igniteInstanceName Ignite instance name. |
| * @param name Worker name. |
| * @param log Logger. |
| * @param workerLsnr Worker lifecycle listener. |
| * @throws IgniteCheckedException If selector could not be created. |
| */ |
| AbstractNioClientWorker( |
| int idx, |
| @Nullable String igniteInstanceName, |
| String name, |
| IgniteLogger log, |
| @Nullable GridWorkerListener workerLsnr |
| ) throws IgniteCheckedException { |
| super(igniteInstanceName, name, log, workerLsnr); |
| |
| createSelector(); |
| |
| this.idx = idx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { |
| Throwable err = null; |
| |
| try { |
| boolean reset = false; |
| |
| while (!closed) { |
| updateHeartbeat(); |
| |
| try { |
| if (reset) |
| createSelector(); |
| |
| bodyInternal(); |
| |
| onIdle(); |
| } |
| catch (IgniteCheckedException e) { |
| if (!Thread.currentThread().isInterrupted()) { |
| U.error(log, "Failed to read data from remote connection (will wait for " + |
| ERR_WAIT_TIME + "ms).", e); |
| |
| U.sleep(ERR_WAIT_TIME); |
| |
| reset = true; |
| } |
| } |
| } |
| } |
| catch (Throwable e) { |
| U.error(log, "Caught unhandled exception in NIO worker thread (restart the node).", e); |
| |
| err = e; |
| |
| if (e instanceof Error) |
| throw e; |
| } |
| finally { |
| if (err instanceof OutOfMemoryError) |
| lsnr.onFailure(CRITICAL_ERROR, err); |
| else if (!closed) { |
| if (err == null) |
| lsnr.onFailure(SYSTEM_WORKER_TERMINATION, |
| new IllegalStateException("Thread " + name() + " is terminated unexpectedly")); |
| else |
| lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); |
| } |
| else if (err != null) |
| lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); |
| else |
| // In case of closed == true, prevent general-case termination handling. |
| cancel(); |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void createSelector() throws IgniteCheckedException { |
| selectedKeys = null; |
| |
| selector = GridNioServer.this.createSelector(null); |
| |
| if (DISABLE_KEYSET_OPTIMIZATION) |
| return; |
| |
| try { |
| SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); |
| |
| Class<?> selectorImplCls = |
| Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader()); |
| |
| // Ensure the current selector implementation is what we can instrument. |
| if (!selectorImplCls.isAssignableFrom(selector.getClass())) |
| return; |
| |
| Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys"); |
| Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys"); |
| |
| selectedKeysField.setAccessible(true); |
| publicSelectedKeysField.setAccessible(true); |
| |
| selectedKeysField.set(selector, selectedKeySet); |
| publicSelectedKeysField.set(selector, selectedKeySet); |
| |
| selectedKeys = selectedKeySet; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Instrumented an optimized java.util.Set into: " + selector); |
| } |
| catch (Exception e) { |
| selectedKeys = null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to instrument an optimized java.util.Set into selector [selector=" + selector |
| + ", err=" + e + ']'); |
| } |
| } |
| |
| /** |
| * Adds socket channel to the registration queue and wakes up reading thread. |
| * |
| * @param req Change request. |
| */ |
| @Override public void offer(SessionChangeRequest req) { |
| changeReqs.offer(req); |
| |
| if (select) |
| selector.wakeup(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void offer(Collection<SessionChangeRequest> reqs) { |
| for (SessionChangeRequest req : reqs) |
| changeReqs.offer(req); |
| |
| selector.wakeup(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<SessionChangeRequest> clearSessionRequests(GridNioSession ses) { |
| List<SessionChangeRequest> sesReqs = null; |
| |
| for (SessionChangeRequest changeReq : changeReqs) { |
| if (changeReq.session() == ses && !(changeReq instanceof SessionMoveFuture)) { |
| boolean rmv = changeReqs.remove(changeReq); |
| |
| assert rmv : changeReq; |
| |
| if (sesReqs == null) |
| sesReqs = new ArrayList<>(); |
| |
| sesReqs.add(changeReq); |
| } |
| } |
| |
| return sesReqs; |
| } |
| |
| /** |
| * Processes read and write events and registration requests. |
| * |
| * @throws IgniteCheckedException If IOException occurred or thread was unable to add worker to workers pool. |
| */ |
| @SuppressWarnings("unchecked") |
| private void bodyInternal() throws IgniteCheckedException, InterruptedException { |
| try { |
| long lastIdleCheck = U.currentTimeMillis(); |
| |
| mainLoop: |
| while (!closed && selector.isOpen()) { |
| SessionChangeRequest req0; |
| |
| updateHeartbeat(); |
| |
| while ((req0 = changeReqs.poll()) != null) { |
| updateHeartbeat(); |
| |
| switch (req0.operation()) { |
| case CONNECT: { |
| NioOperationFuture fut = (NioOperationFuture)req0; |
| |
| SocketChannel ch = fut.socketChannel(); |
| |
| try { |
| ch.register(selector, SelectionKey.OP_CONNECT, fut); |
| } |
| catch (IOException e) { |
| fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e)); |
| } |
| |
| break; |
| } |
| |
| case CANCEL_CONNECT: { |
| NioOperationFuture req = (NioOperationFuture)req0; |
| |
| SocketChannel ch = req.socketChannel(); |
| |
| SelectionKey key = ch.keyFor(selector); |
| |
| if (key != null) |
| key.cancel(); |
| |
| U.closeQuiet(ch); |
| |
| req.onDone(); |
| |
| break; |
| } |
| |
| case REGISTER: { |
| register((NioOperationFuture)req0); |
| |
| break; |
| } |
| |
| case MOVE: { |
| SessionMoveFuture f = (SessionMoveFuture)req0; |
| |
| GridSelectorNioSessionImpl ses = f.session(); |
| |
| if (idx == f.toIdx) { |
| assert f.movedSocketChannel() != null : f; |
| |
| boolean add = workerSessions.add(ses); |
| |
| assert add; |
| |
| ses.finishMoveSession(this); |
| |
| if (idx % 2 == 0) |
| readerMoveCnt.incrementAndGet(); |
| else |
| writerMoveCnt.incrementAndGet(); |
| |
| SelectionKey key = f.movedSocketChannel().register(selector, |
| SelectionKey.OP_READ | SelectionKey.OP_WRITE, |
| ses); |
| |
| ses.key(key); |
| |
| ses.procWrite.set(true); |
| |
| f.onDone(true); |
| } |
| else { |
| assert f.movedSocketChannel() == null : f; |
| |
| if (workerSessions.remove(ses)) { |
| ses.startMoveSession(this); |
| |
| SelectionKey key = ses.key(); |
| |
| assert key.channel() != null : key; |
| |
| f.movedSocketChannel((SocketChannel)key.channel()); |
| |
| key.cancel(); |
| |
| clientWorkers.get(f.toIndex()).offer(f); |
| } |
| else |
| f.onDone(false); |
| } |
| |
| break; |
| } |
| |
| case REQUIRE_WRITE: { |
| SessionWriteRequest req = (SessionWriteRequest)req0; |
| |
| registerWrite((GridSelectorNioSessionImpl)req.session()); |
| |
| break; |
| } |
| |
| case CLOSE: { |
| NioOperationFuture req = (NioOperationFuture)req0; |
| |
| if (close(req.session(), null)) |
| req.onDone(true); |
| else |
| req.onDone(false); |
| |
| break; |
| } |
| |
| case PAUSE_READ: { |
| NioOperationFuture req = (NioOperationFuture)req0; |
| |
| SelectionKey key = req.session().key(); |
| |
| if (key.isValid()) { |
| key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); |
| |
| GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); |
| |
| ses.readsPaused(true); |
| |
| req.onDone(true); |
| } |
| else |
| req.onDone(false); |
| |
| break; |
| } |
| |
| case RESUME_READ: { |
| NioOperationFuture req = (NioOperationFuture)req0; |
| |
| SelectionKey key = req.session().key(); |
| |
| if (key.isValid()) { |
| key.interestOps(key.interestOps() | SelectionKey.OP_READ); |
| |
| GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); |
| |
| ses.readsPaused(false); |
| |
| req.onDone(true); |
| } |
| else |
| req.onDone(false); |
| |
| break; |
| } |
| |
| case DUMP_STATS: { |
| NioOperationFuture req = (NioOperationFuture)req0; |
| |
| IgnitePredicate<GridNioSession> p = |
| req.msg instanceof IgnitePredicate ? (IgnitePredicate<GridNioSession>)req.msg : null; |
| |
| StringBuilder sb = new StringBuilder(); |
| |
| try { |
| dumpStats(sb, p, p != null); |
| } |
| finally { |
| req.onDone(sb.toString()); |
| } |
| } |
| } |
| } |
| |
| int res = 0; |
| |
| for (long i = 0; i < selectorSpins && res == 0; i++) { |
| res = selector.selectNow(); |
| |
| if (res > 0) { |
| // Walk through the ready keys collection and process network events. |
| updateHeartbeat(); |
| |
| if (selectedKeys == null) |
| processSelectedKeys(selector.selectedKeys()); |
| else |
| processSelectedKeysOptimized(selectedKeys.flip()); |
| } |
| |
| if (!changeReqs.isEmpty()) |
| continue mainLoop; |
| |
| // Just in case we do busy selects. |
| long now = U.currentTimeMillis(); |
| |
| if (now - lastIdleCheck > 2000) { |
| lastIdleCheck = now; |
| |
| checkIdle(selector.keys()); |
| } |
| |
| if (isCancelled()) |
| return; |
| } |
| |
| // Falling to blocking select. |
| select = true; |
| |
| try { |
| if (!changeReqs.isEmpty()) |
| continue; |
| |
| updateHeartbeat(); |
| |
| // Wake up every 2 seconds to check if closed. |
| if (selector.select(2000) > 0) { |
| // Walk through the ready keys collection and process network events. |
| if (selectedKeys == null) |
| processSelectedKeys(selector.selectedKeys()); |
| else |
| processSelectedKeysOptimized(selectedKeys.flip()); |
| } |
| |
| // select() call above doesn't throw on interruption; checking it here to propagate timely. |
| if (!closed && !isCancelled && Thread.interrupted()) |
| throw new InterruptedException(); |
| } |
| finally { |
| select = false; |
| } |
| |
| long now = U.currentTimeMillis(); |
| |
| if (now - lastIdleCheck > 2000) { |
| lastIdleCheck = now; |
| |
| checkIdle(selector.keys()); |
| } |
| } |
| } |
| // Ignore this exception as thread interruption is equal to 'close' call. |
| catch (ClosedByInterruptException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Closing selector due to thread interruption: " + e.getMessage()); |
| } |
| catch (ClosedSelectorException e) { |
| throw new IgniteCheckedException("Selector got closed while active.", e); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to select events on selector.", e); |
| } |
| finally { |
| if (selector.isOpen()) { |
| if (log.isDebugEnabled()) |
| log.debug("Closing all connected client sockets."); |
| |
| // Close all channels registered with selector. |
| for (SelectionKey key : selector.keys()) { |
| GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); |
| |
| if (attach != null && attach.hasSession()) |
| close(attach.session(), null); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closing NIO selector."); |
| |
| U.close(selector, log); |
| } |
| } |
| } |
| |
| /** |
| * @param ses Session. |
| */ |
| @Override public final void registerWrite(GridSelectorNioSessionImpl ses) { |
| SelectionKey key = ses.key(); |
| |
| if (key.isValid()) { |
| if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) |
| key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); |
| |
| // Update timestamp to protected against false write timeout. |
| ses.bytesSent(0); |
| } |
| } |
| |
| /** |
| * @param sb Message builder. |
| * @param keys Keys. |
| */ |
| private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) { |
| sb.append(">> Selector info [id=").append(idx) |
| .append(", keysCnt=").append(keys.size()) |
| .append(", bytesRcvd=").append(bytesRcvd) |
| .append(", bytesRcvd0=").append(bytesRcvd0) |
| .append(", bytesSent=").append(bytesSent) |
| .append(", bytesSent0=").append(bytesSent0) |
| .append("]").append(U.nl()); |
| } |
| |
| /** |
| * @param sb Message builder. |
| * @param p Optional session predicate. |
| * @param shortInfo Short info flag. |
| */ |
| private void dumpStats(StringBuilder sb, |
| @Nullable IgnitePredicate<GridNioSession> p, |
| boolean shortInfo) { |
| Set<SelectionKey> keys = selector.keys(); |
| |
| boolean selInfo = p == null; |
| |
| if (selInfo) |
| dumpSelectorInfo(sb, keys); |
| |
| for (SelectionKey key : keys) { |
| GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); |
| |
| if (!attach.hasSession()) |
| continue; |
| |
| GridSelectorNioSessionImpl ses = attach.session(); |
| |
| boolean sesInfo = p == null || p.apply(ses); |
| |
| if (sesInfo) { |
| if (!selInfo) { |
| dumpSelectorInfo(sb, keys); |
| |
| selInfo = true; |
| } |
| |
| sb.append(" Connection info [") |
| .append("in=").append(ses.accepted()) |
| .append(", rmtAddr=").append(ses.remoteAddress()) |
| .append(", locAddr=").append(ses.localAddress()); |
| |
| GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); |
| |
| if (outDesc != null) { |
| sb.append(", msgsSent=").append(outDesc.sent()) |
| .append(", msgsAckedByRmt=").append(outDesc.acked()) |
| .append(", descIdHash=").append(System.identityHashCode(outDesc)); |
| |
| if (!outDesc.messagesRequests().isEmpty()) { |
| int cnt = 0; |
| |
| sb.append(", unackedMsgs=["); |
| |
| for (SessionWriteRequest req : outDesc.messagesRequests()) { |
| if (cnt != 0) |
| sb.append(", "); |
| |
| Object msg = req.message(); |
| |
| if (shortInfo && msg instanceof GridIoMessage) |
| msg = ((GridIoMessage)msg).message().getClass().getSimpleName(); |
| |
| sb.append(msg); |
| |
| if (++cnt == 5) |
| break; |
| } |
| |
| sb.append(']'); |
| } |
| } |
| else |
| sb.append(", outRecoveryDesc=null"); |
| |
| GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); |
| |
| if (inDesc != null) { |
| sb.append(", msgsRcvd=").append(inDesc.received()) |
| .append(", lastAcked=").append(inDesc.lastAcknowledged()) |
| .append(", descIdHash=").append(System.identityHashCode(inDesc)); |
| } |
| else |
| sb.append(", inRecoveryDesc=null"); |
| |
| sb.append(", bytesRcvd=").append(ses.bytesReceived()) |
| .append(", bytesRcvd0=").append(ses.bytesReceived0()) |
| .append(", bytesSent=").append(ses.bytesSent()) |
| .append(", bytesSent0=").append(ses.bytesSent0()) |
| .append(", opQueueSize=").append(ses.writeQueueSize()); |
| |
| if (!shortInfo) { |
| MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); |
| MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); |
| |
| sb.append(", msgWriter=").append(writer != null ? writer.toString() : "null") |
| .append(", msgReader=").append(reader != null ? reader.toString() : "null"); |
| } |
| |
| int cnt = 0; |
| |
| for (SessionWriteRequest req : ses.writeQueue()) { |
| Object msg = req.message(); |
| |
| if (shortInfo && msg instanceof GridIoMessage) |
| msg = ((GridIoMessage)msg).message().getClass().getSimpleName(); |
| |
| if (cnt == 0) |
| sb.append(",\n opQueue=[").append(msg); |
| else |
| sb.append(',').append(msg); |
| |
| if (++cnt == 5) { |
| sb.append(']'); |
| |
| break; |
| } |
| } |
| |
| sb.append("]"); |
| } |
| } |
| } |
| |
| /** |
| * Processes keys selected by a selector. |
| * |
| * @param keys Selected keys. |
| * @throws ClosedByInterruptException If this thread was interrupted while reading data. |
| */ |
| private void processSelectedKeysOptimized(SelectionKey[] keys) throws ClosedByInterruptException { |
| for (int i = 0; ; i++) { |
| final SelectionKey key = keys[i]; |
| |
| if (key == null) |
| break; |
| |
| // null out entry in the array to allow to have it GC'ed once the Channel close |
| // See https://github.com/netty/netty/issues/2363 |
| keys[i] = null; |
| |
| // Was key closed? |
| if (!key.isValid()) |
| continue; |
| |
| GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); |
| |
| assert attach != null; |
| |
| try { |
| if (!attach.hasSession() && key.isConnectable()) { |
| processConnect(key); |
| |
| continue; |
| } |
| |
| if (key.isReadable()) |
| processRead(key); |
| |
| if (key.isValid() && key.isWritable()) |
| processWrite(key); |
| } |
| catch (ClosedByInterruptException e) { |
| // This exception will be handled in bodyInternal() method. |
| throw e; |
| } |
| catch (Exception | Error e) { // TODO IGNITE-2659. |
| try { |
| U.sleep(1000); |
| } |
| catch (IgniteInterruptedCheckedException ignore) { |
| // No-op. |
| } |
| |
| GridSelectorNioSessionImpl ses = attach.session(); |
| |
| if (!closed) |
| U.error(log, "Failed to process selector key [ses=" + ses + ']', e); |
| else if (log.isDebugEnabled()) |
| log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']'); |
| |
| // Can be null if async connect failed. |
| if (ses != null) |
| close(ses, new GridNioException(e)); |
| else |
| closeKey(key); |
| } |
| } |
| } |
| |
| /** |
| * Processes keys selected by a selector. |
| * |
| * @param keys Selected keys. |
| * @throws ClosedByInterruptException If this thread was interrupted while reading data. |
| */ |
| private void processSelectedKeys(Set<SelectionKey> keys) throws ClosedByInterruptException { |
| if (log.isTraceEnabled()) |
| log.trace("Processing keys in client worker: " + keys.size()); |
| |
| if (keys.isEmpty()) |
| return; |
| |
| for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) { |
| SelectionKey key = iter.next(); |
| |
| iter.remove(); |
| |
| // Was key closed? |
| if (!key.isValid()) |
| continue; |
| |
| GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); |
| |
| assert attach != null; |
| |
| try { |
| if (!attach.hasSession() && key.isConnectable()) { |
| processConnect(key); |
| |
| continue; |
| } |
| |
| if (key.isReadable()) |
| processRead(key); |
| |
| if (key.isValid() && key.isWritable()) |
| processWrite(key); |
| } |
| catch (ClosedByInterruptException e) { |
| // This exception will be handled in bodyInternal() method. |
| throw e; |
| } |
| catch (Exception | Error e) { // TODO IGNITE-2659. |
| try { |
| U.sleep(1000); |
| } |
| catch (IgniteInterruptedCheckedException ignore) { |
| // No-op. |
| } |
| |
| GridSelectorNioSessionImpl ses = attach.session(); |
| |
| if (!closed) |
| U.error(log, "Failed to process selector key [ses=" + ses + ']', e); |
| else if (log.isDebugEnabled()) |
| log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']'); |
| } |
| } |
| } |
| |
| /** |
| * Checks sessions assigned to a selector for timeouts. |
| * |
| * @param keys Keys registered to selector. |
| */ |
| private void checkIdle(Iterable<SelectionKey> keys) { |
| long now = U.currentTimeMillis(); |
| |
| for (SelectionKey key : keys) { |
| GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); |
| |
| if (attach == null || !attach.hasSession()) |
| continue; |
| |
| GridSelectorNioSessionImpl ses = attach.session(); |
| |
| try { |
| long writeTimeout0 = writeTimeout; |
| |
| boolean opWrite = key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0; |
| |
| // If we are writing and timeout passed. |
| if (opWrite && now - ses.lastSendTime() > writeTimeout0) { |
| filterChain.onSessionWriteTimeout(ses); |
| |
| // Update timestamp to avoid multiple notifications within one timeout interval. |
| ses.bytesSent(0); |
| |
| continue; |
| } |
| |
| long idleTimeout0 = idleTimeout; |
| |
| if (!opWrite && |
| now - ses.lastReceiveTime() > idleTimeout0 && |
| now - ses.lastSendScheduleTime() > idleTimeout0) { |
| filterChain.onSessionIdleTimeout(ses); |
| |
| // Update timestamp to avoid multiple notifications within one timeout interval. |
| ses.resetSendScheduleTime(); |
| ses.bytesReceived(0); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| close(ses, e); |
| } |
| } |
| } |
| |
| /** |
| * Registers given socket channel to the selector, creates a session and notifies the listener. |
| * |
| * @param fut Registration future. |
| */ |
| private void register(NioOperationFuture<GridNioSession> fut) { |
| assert fut != null; |
| |
| SocketChannel sockCh = fut.socketChannel(); |
| |
| assert sockCh != null; |
| |
| Socket sock = sockCh.socket(); |
| |
| try { |
| ByteBuffer writeBuf = null; |
| ByteBuffer readBuf = null; |
| |
| if (directMode) { |
| writeBuf = directBuf ? ByteBuffer.allocateDirect(sock.getSendBufferSize()) : |
| ByteBuffer.allocate(sock.getSendBufferSize()); |
| readBuf = directBuf ? ByteBuffer.allocateDirect(sock.getReceiveBufferSize()) : |
| ByteBuffer.allocate(sock.getReceiveBufferSize()); |
| |
| writeBuf.order(order); |
| readBuf.order(order); |
| } |
| |
| final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl( |
| log, |
| this, |
| filterChain, |
| (InetSocketAddress)sockCh.getLocalAddress(), |
| (InetSocketAddress)sockCh.getRemoteAddress(), |
| fut.accepted(), |
| sndQueueLimit, |
| mreg, |
| writeBuf, |
| readBuf); |
| |
| Map<Integer, ?> meta = fut.meta(); |
| |
| if (meta != null) { |
| for (Entry<Integer, ?> e : meta.entrySet()) |
| ses.addMeta(e.getKey(), e.getValue()); |
| |
| if (!ses.accepted()) { |
| GridNioRecoveryDescriptor desc = |
| (GridNioRecoveryDescriptor)meta.get(RECOVERY_DESC_META_KEY); |
| |
| if (desc != null) { |
| ses.outRecoveryDescriptor(desc); |
| |
| if (!desc.pairedConnections()) |
| ses.inRecoveryDescriptor(desc); |
| } |
| } |
| } |
| |
| SelectionKey key; |
| |
| if (!sockCh.isRegistered()) { |
| assert fut.op == NioOperation.REGISTER : fut.op; |
| |
| key = sockCh.register(selector, SelectionKey.OP_READ, ses); |
| |
| ses.key(key); |
| |
| resend(ses); |
| } |
| else { |
| assert fut.op == NioOperation.CONNECT : fut.op; |
| |
| key = sockCh.keyFor(selector); |
| |
| key.attach(ses); |
| |
| key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT)); |
| key.interestOps(key.interestOps() | SelectionKey.OP_READ); |
| |
| ses.key(key); |
| } |
| |
| sessions.add(ses); |
| workerSessions.add(ses); |
| |
| try { |
| filterChain.onSessionOpened(ses); |
| |
| fut.onDone(ses); |
| } |
| catch (IgniteCheckedException e) { |
| close(ses, e); |
| |
| fut.onDone(e); |
| } |
| |
| if (closed) |
| ses.onServerStopped(); |
| } |
| catch (ClosedChannelException e) { |
| U.warn(log, "Failed to register accepted socket channel to selector (channel was closed): " |
| + sock.getRemoteSocketAddress(), e); |
| } |
| catch (IOException e) { |
| U.error(log, "Failed to get socket addresses.", e); |
| } |
| } |
| |
| /** |
| * @param key Key. |
| */ |
| private void closeKey(SelectionKey key) { |
| // Shutdown input and output so that remote client will see correct socket close. |
| Socket sock = ((SocketChannel)key.channel()).socket(); |
| |
| try { |
| try { |
| sock.shutdownInput(); |
| } |
| catch (IOException ignored) { |
| // No-op. |
| } |
| |
| try { |
| sock.shutdownOutput(); |
| } |
| catch (IOException ignored) { |
| // No-op. |
| } |
| } |
| finally { |
| U.close(key, log); |
| U.close(sock, log); |
| } |
| } |
| |
| /** |
| * @param ses Session to be closed. |
| * @param e Exception to be passed to the listener, if any. |
| * @return {@code True} if this call closed the ses. |
| */ |
| protected boolean close(final GridSelectorNioSessionImpl ses, @Nullable final IgniteCheckedException e) { |
| return close(ses, e, ses.closeSocketOnSessionClose()); |
| } |
| |
| /** |
| * Closes the session and all associated resources, then notifies the listener. |
| * |
| * @param ses Session to be closed. |
| * @param e Exception to be passed to the listener, if any. |
| * @param closeSock If {@code True} the channel will be closed. |
| * @return {@code True} if this call closed the ses. |
| */ |
| protected boolean close( |
| final GridSelectorNioSessionImpl ses, |
| @Nullable final IgniteCheckedException e, |
| boolean closeSock |
| ) { |
| if (e != null) { |
| // Print stack trace only if has runtime exception in it's cause. |
| if (e.hasCause(IOException.class)) |
| U.warn(log, "Client disconnected abruptly due to network connection loss or because " + |
| "the connection was left open on application shutdown. [cls=" + e.getClass() + |
| ", msg=" + e.getMessage() + ']'); |
| else |
| U.error(log, "Closing NIO session because of unhandled exception.", e); |
| } |
| |
| sessions.remove(ses); |
| workerSessions.remove(ses); |
| |
| if (ses.setClosed()) { |
| ses.onClosed(); |
| |
| if (directBuf) { |
| if (ses.writeBuffer() != null) |
| GridUnsafe.cleanDirectBuffer(ses.writeBuffer()); |
| |
| if (ses.readBuffer() != null) |
| GridUnsafe.cleanDirectBuffer(ses.readBuffer()); |
| } |
| |
| if (closeSock) |
| closeKey(ses.key()); |
| else |
| ses.key().cancel(); // Unbind socket to the current SelectionKey. |
| |
| if (e != null) |
| filterChain.onExceptionCaught(ses, e); |
| |
| ses.removeMeta(BUF_META_KEY); |
| |
| // Since ses is in closed state, no write requests will be added. |
| SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); |
| |
| GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor(); |
| GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor(); |
| |
| IOException err = new IOException("Failed to send message (connection was closed): " + ses); |
| |
| if (outRecovery != null || inRecovery != null) { |
| try { |
| // Poll will update recovery data. |
| while ((req = ses.pollFuture()) != null) { |
| if (req.skipRecovery()) |
| req.onError(err); |
| } |
| } |
| finally { |
| if (outRecovery != null) |
| outRecovery.release(); |
| |
| if (inRecovery != null && inRecovery != outRecovery) |
| inRecovery.release(); |
| } |
| } |
| else { |
| if (req != null) |
| req.onError(err); |
| |
| while ((req = ses.pollFuture()) != null) |
| req.onError(err); |
| } |
| |
| try { |
| filterChain.onSessionClosed(ses); |
| } |
| catch (IgniteCheckedException e1) { |
| filterChain.onExceptionCaught(ses, e1); |
| } |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param key Key. |
| * @throws IOException If failed. |
| */ |
| private void processConnect(SelectionKey key) throws IOException { |
| SocketChannel ch = (SocketChannel)key.channel(); |
| |
| NioOperationFuture<GridNioSession> sesFut = (NioOperationFuture<GridNioSession>)key.attachment(); |
| |
| assert sesFut != null; |
| |
| try { |
| if (ch.finishConnect()) |
| register(sesFut); |
| } |
| catch (IOException e) { |
| U.closeQuiet(ch); |
| |
| sesFut.onDone(new GridNioException("Failed to connect to node", e)); |
| |
| throw e; |
| } |
| } |
| |
| /** |
| * Processes read-available event on the key. |
| * |
| * @param key Key that is ready to be read. |
| * @throws IOException If key read failed. |
| */ |
| protected abstract void processRead(SelectionKey key) throws IOException; |
| |
| /** |
| * Processes write-ready event on the key. |
| * |
| * @param key Key that is ready to be written. |
| * @throws IOException If write failed. |
| */ |
| protected abstract void processWrite(SelectionKey key) throws IOException; |
| |
| /** |
| * @param cnt |
| */ |
| final void onRead(int cnt) { |
| bytesRcvd += cnt; |
| bytesRcvd0 += cnt; |
| } |
| |
| /** |
| * @param cnt |
| */ |
| final void onWrite(int cnt) { |
| bytesSent += cnt; |
| bytesSent0 += cnt; |
| } |
| |
| /** |
| * |
| */ |
| final void reset0() { |
| bytesSent0 = 0; |
| bytesRcvd0 = 0; |
| |
| for (GridSelectorNioSessionImpl ses : workerSessions) |
| ses.reset0(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(AbstractNioClientWorker.class, this, super.toString()); |
| } |
| } |
| |
| /** |
| * Gets outbound messages queue size. |
| * |
| * @return Write queue size. |
| */ |
| public int outboundMessagesQueueSize() { |
| if (outboundMessagesQueueSizeMetric == null) |
| return -1; |
| |
| return (int) outboundMessagesQueueSizeMetric.value(); |
| } |
| |
| /** |
| * A separate thread that will accept incoming connections and schedule read to some worker. |
| */ |
| private class GridNioAcceptWorker extends GridWorker { |
| /** Selector for this thread. */ |
| private Selector selector; |
| |
| /** |
| * @param igniteInstanceName Ignite instance name. |
| * @param name Thread name. |
| * @param log Log. |
| * @param selector Which will accept incoming connections. |
| * @param workerLsnr Worker lifecycle listener. |
| */ |
| protected GridNioAcceptWorker( |
| @Nullable String igniteInstanceName, |
| String name, |
| IgniteLogger log, |
| Selector selector, |
| @Nullable GridWorkerListener workerLsnr |
| ) { |
| super(igniteInstanceName, name, log, workerLsnr); |
| |
| this.selector = selector; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cancel() { |
| super.cancel(); |
| |
| // If accept worker never was started then explicitly close selector, otherwise selector will be closed |
| // in finally block when workers thread will be stopped. |
| if (runner() == null) |
| closeSelector(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { |
| Throwable err = null; |
| |
| try { |
| boolean reset = false; |
| |
| while (!closed && !isCancelled()) { |
| try { |
| if (reset) |
| selector = createSelector(locAddr); |
| |
| accept(); |
| } |
| catch (IgniteCheckedException e) { |
| if (!Thread.currentThread().isInterrupted()) { |
| U.error(log, "Failed to accept remote connection (will wait for " + ERR_WAIT_TIME + "ms).", |
| e); |
| |
| U.sleep(ERR_WAIT_TIME); |
| |
| reset = true; |
| } |
| } |
| } |
| } |
| catch (Throwable t) { |
| if (!(t instanceof IgniteInterruptedCheckedException)) |
| err = t; |
| |
| throw t; |
| } |
| finally { |
| try { |
| closeSelector(); // Safety. |
| } |
| catch (RuntimeException ignore) { |
| // No-op. |
| } |
| |
| if (err == null && !closed) |
| err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly"); |
| |
| if (err instanceof OutOfMemoryError) |
| lsnr.onFailure(CRITICAL_ERROR, err); |
| else if (err != null) |
| lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); |
| else |
| // In case of closed == true, prevent general-case termination handling. |
| cancel(); |
| } |
| } |
| |
| /** |
| * Accepts connections and schedules them for processing by one of read workers. |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void accept() throws IgniteCheckedException { |
| try { |
| while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) { |
| updateHeartbeat(); |
| |
| // Wake up every 2 seconds to check if closed. |
| if (selector.select(2000) > 0) |
| // Walk through the ready keys collection and process date requests. |
| processSelectedKeys(selector.selectedKeys()); |
| else |
| updateHeartbeat(); |
| |
| if (balancer != null) |
| balancer.run(); |
| |
| onIdle(); |
| } |
| } |
| // Ignore this exception as thread interruption is equal to 'close' call. |
| catch (ClosedByInterruptException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Closing selector due to thread interruption [srvr=" + this + |
| ", err=" + e.getMessage() + ']'); |
| } |
| catch (ClosedSelectorException e) { |
| throw new IgniteCheckedException("Selector got closed while active: " + this, e); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to accept connection: " + this, e); |
| } |
| finally { |
| closeSelector(); |
| } |
| } |
| |
| /** |
| * Close selector if needed. |
| */ |
| private void closeSelector() { |
| if (selector.isOpen()) { |
| if (log.isDebugEnabled()) |
| log.debug("Closing all listening sockets."); |
| |
| // Close all channels registered with selector. |
| for (SelectionKey key : selector.keys()) |
| U.close(key.channel(), log); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closing NIO selector."); |
| |
| U.close(selector, log); |
| } |
| } |
| |
| /** |
| * Processes selected accept requests for server socket. |
| * |
| * @param keys Selected keys from acceptor. |
| * @throws IOException If accept failed or IOException occurred while configuring channel. |
| */ |
| private void processSelectedKeys(Set<SelectionKey> keys) throws IOException { |
| if (log.isDebugEnabled()) |
| log.debug("Processing keys in accept worker: " + keys.size()); |
| |
| for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) { |
| SelectionKey key = iter.next(); |
| |
| iter.remove(); |
| |
| // Was key closed? |
| if (!key.isValid()) |
| continue; |
| |
| if (key.isAcceptable()) { |
| // The key indexes into the selector so we |
| // can retrieve the socket that's ready for I/O |
| ServerSocketChannel srvrCh = (ServerSocketChannel)key.channel(); |
| |
| SocketChannel sockCh = srvrCh.accept(); |
| |
| sockCh.configureBlocking(false); |
| sockCh.socket().setTcpNoDelay(tcpNoDelay); |
| sockCh.socket().setKeepAlive(true); |
| |
| if (sockSndBuf > 0) |
| sockCh.socket().setSendBufferSize(sockSndBuf); |
| |
| if (sockRcvBuf > 0) |
| sockCh.socket().setReceiveBufferSize(sockRcvBuf); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Accepted new client connection: " + sockCh.socket().getRemoteSocketAddress()); |
| |
| addRegistrationRequest(sockCh); |
| } |
| } |
| } |
| |
| /** |
| * Adds registration request for a given socket channel to the next selector. Next selector |
| * is selected according to a round-robin algorithm. |
| * |
| * @param sockCh Socket channel to be registered on one of the selectors. |
| */ |
| private void addRegistrationRequest(SocketChannel sockCh) { |
| offerBalanced(new NioOperationFuture<>(sockCh, true, null), null); |
| } |
| } |
| |
| /** |
| * Asynchronous operation that may be requested on selector. |
| */ |
| private enum NioOperation { |
| /** Register connect key selection. */ |
| CONNECT, |
| |
| /** Cancel connect. */ |
| CANCEL_CONNECT, |
| |
| /** Register read key selection. */ |
| REGISTER, |
| |
| /** Move session between workers. */ |
| MOVE, |
| |
| /** Register write key selection. */ |
| REQUIRE_WRITE, |
| |
| /** Close key. */ |
| CLOSE, |
| |
| /** Pause read. */ |
| PAUSE_READ, |
| |
| /** Resume read. */ |
| RESUME_READ, |
| |
| /** Dump statistics. */ |
| DUMP_STATS |
| } |
| |
| /** |
| * |
| */ |
| private static final class WriteRequestSystemImpl implements SessionWriteRequest, SessionChangeRequest { |
| /** */ |
| private final Object msg; |
| |
| /** */ |
| private final GridNioSession ses; |
| |
| /** */ |
| private Span span; |
| |
| /** |
| * @param ses Session. |
| * @param msg Message. |
| */ |
| WriteRequestSystemImpl(GridNioSession ses, Object msg) { |
| this.ses = ses; |
| this.msg = msg; |
| this.span = MTC.span(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void messageThread(boolean msgThread) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean messageThread() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean skipRecovery() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onAckReceived() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInClosure<IgniteException> ackClosure() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onError(Exception e) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object message() { |
| return msg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessageWritten() { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetSession(GridNioSession ses) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNioSession session() { |
| return ses; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public NioOperation operation() { |
| return NioOperation.REQUIRE_WRITE; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Span span() { |
| return span; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(WriteRequestSystemImpl.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest { |
| /** */ |
| private GridNioSession ses; |
| |
| /** */ |
| private final Object msg; |
| |
| /** */ |
| private boolean msgThread; |
| |
| /** */ |
| private final boolean skipRecovery; |
| |
| /** */ |
| private final IgniteInClosure<IgniteException> ackC; |
| |
| /** Span for tracing. */ |
| private Span span; |
| |
| /** |
| * @param ses Session. |
| * @param msg Message. |
| * @param skipRecovery Skip recovery flag. |
| * @param ackC Closure invoked when message ACK is received. |
| */ |
| WriteRequestImpl(GridNioSession ses, |
| Object msg, |
| boolean skipRecovery, |
| IgniteInClosure<IgniteException> ackC) { |
| this.ses = ses; |
| this.msg = msg; |
| this.skipRecovery = skipRecovery; |
| this.ackC = ackC; |
| this.span = MTC.span(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void messageThread(boolean msgThread) { |
| this.msgThread = msgThread; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean messageThread() { |
| return msgThread; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean skipRecovery() { |
| return skipRecovery; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onAckReceived() { |
| assert msg instanceof Message; |
| |
| ((Message)msg).onAckReceived(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInClosure<IgniteException> ackClosure() { |
| return ackC; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onError(Exception e) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object message() { |
| return msg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessageWritten() { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetSession(GridNioSession ses) { |
| this.ses = ses; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNioSession session() { |
| return ses; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public NioOperation operation() { |
| return NioOperation.REQUIRE_WRITE; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Span span() { |
| return span; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(WriteRequestImpl.class, this); |
| } |
| } |
| |
| /** |
| * Class for requesting write and session close operations. |
| */ |
| private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest, |
| SessionChangeRequest, GridNioKeyAttachment { |
| /** Socket channel in register request. */ |
| @GridToStringExclude |
| private SocketChannel sockCh; |
| |
| /** Session to perform operation on. */ |
| @GridToStringExclude |
| private GridSelectorNioSessionImpl ses; |
| |
| /** Is it a close request or a write request. */ |
| private NioOperation op; |
| |
| /** Message. */ |
| private Object msg; |
| |
| /** */ |
| @GridToStringExclude |
| private boolean accepted; |
| |
| /** */ |
| @GridToStringExclude |
| private Map<Integer, ?> meta; |
| |
| /** */ |
| @GridToStringExclude |
| private boolean skipRecovery; |
| |
| /** */ |
| private Span span; |
| |
| /** |
| * @param sockCh Socket channel. |
| * @param accepted {@code True} if socket has been accepted. |
| * @param meta Optional meta. |
| */ |
| NioOperationFuture( |
| SocketChannel sockCh, |
| boolean accepted, |
| @Nullable Map<Integer, ?> meta |
| ) { |
| super(null); |
| |
| op = NioOperation.REGISTER; |
| |
| this.sockCh = sockCh; |
| this.accepted = accepted; |
| this.meta = meta; |
| this.span = MTC.span(); |
| } |
| |
| /** |
| * Creates change request. |
| * |
| * @param ses Session to change. |
| * @param op Requested operation. |
| */ |
| NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op) { |
| super(null); |
| |
| assert ses != null || op == NioOperation.DUMP_STATS : "Invalid params [ses=" + ses + ", op=" + op + ']'; |
| assert op != null; |
| assert op != NioOperation.REGISTER; |
| |
| this.ses = ses; |
| this.op = op; |
| this.span = MTC.span(); |
| } |
| |
| /** |
| * Creates change request. |
| * |
| * @param ses Session to change. |
| * @param op Requested operation. |
| * @param msg Message. |
| * @param ackC Closure invoked when message ACK is received. |
| */ |
| NioOperationFuture(GridSelectorNioSessionImpl ses, |
| NioOperation op, |
| Object msg, |
| IgniteInClosure<IgniteException> ackC) { |
| super(ackC); |
| |
| assert ses != null; |
| assert op != null; |
| assert op != NioOperation.REGISTER; |
| assert msg != null; |
| |
| this.ses = ses; |
| this.op = op; |
| this.msg = msg; |
| this.span = MTC.span(); |
| } |
| |
| /** |
| * Creates change request. |
| * |
| * @param ses Session to change. |
| * @param op Requested operation. |
| * @param commMsg Direct message. |
| * @param skipRecovery Skip recovery flag. |
| * @param ackC Closure invoked when message ACK is received. |
| */ |
| NioOperationFuture(GridSelectorNioSessionImpl ses, |
| NioOperation op, |
| Message commMsg, |
| boolean skipRecovery, |
| IgniteInClosure<IgniteException> ackC) { |
| super(ackC); |
| |
| assert ses != null; |
| assert op != null; |
| assert op != NioOperation.REGISTER; |
| assert commMsg != null; |
| |
| this.ses = ses; |
| this.op = op; |
| this.msg = commMsg; |
| this.skipRecovery = skipRecovery; |
| this.span = MTC.span(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasSession() { |
| return ses != null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public NioOperation operation() { |
| return op; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Span span() { |
| return span; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object message() { |
| return msg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetSession(GridNioSession ses) { |
| assert msg instanceof Message : msg; |
| |
| this.ses = (GridSelectorNioSessionImpl)ses; |
| } |
| |
| /** |
| * @return Socket channel for register request. |
| */ |
| SocketChannel socketChannel() { |
| return sockCh; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridSelectorNioSessionImpl session() { |
| return ses; |
| } |
| |
| /** |
| * @return {@code True} if connection has been accepted. |
| */ |
| boolean accepted() { |
| return accepted; |
| } |
| |
| /** |
| * @return Meta. |
| */ |
| public Map<Integer, ?> meta() { |
| return meta; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onError(Exception e) { |
| onDone(e); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onAckReceived() { |
| assert msg instanceof Message : msg; |
| |
| ((Message)msg).onAckReceived(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessageWritten() { |
| onDone(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean skipRecovery() { |
| return skipRecovery; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(NioOperationFuture.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class SessionMoveFuture extends NioOperationFuture<Boolean> { |
| /** */ |
| private final int toIdx; |
| |
| /** */ |
| @GridToStringExclude |
| private SocketChannel movedSockCh; |
| |
| /** |
| * @param ses Session. |
| * @param toIdx Target worker index. |
| */ |
| SessionMoveFuture( |
| GridSelectorNioSessionImpl ses, |
| int toIdx |
| ) { |
| super(ses, NioOperation.MOVE); |
| |
| this.toIdx = toIdx; |
| } |
| |
| /** |
| * @return Target worker index. |
| */ |
| int toIndex() { |
| return toIdx; |
| } |
| |
| /** |
| * @return Moved session socket channel. |
| */ |
| SocketChannel movedSocketChannel() { |
| return movedSockCh; |
| } |
| |
| /** |
| * @param movedSockCh Moved session socket channel. |
| */ |
| void movedSocketChannel(SocketChannel movedSockCh) { |
| assert movedSockCh != null; |
| |
| this.movedSockCh = movedSockCh; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(SessionMoveFuture.class, this, super.toString()); |
| } |
| } |
| |
| /** |
| * Filter forwarding messages from chain's head to this server. |
| */ |
| private class HeadFilter extends GridNioFilterAdapter { |
| /** |
| * Assigns filter name. |
| */ |
| protected HeadFilter() { |
| super("HeadFilter"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException { |
| if (directMode && sslFilter != null) |
| ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new ConcurrentLinkedQueue<>()); |
| |
| proceedSessionOpened(ses); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException { |
| proceedSessionClosed(ses); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onExceptionCaught(GridNioSession ses, |
| IgniteCheckedException ex) throws IgniteCheckedException { |
| proceedExceptionCaught(ses, ex); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, |
| Object msg, |
| boolean fut, |
| IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| if (directMode) { |
| boolean sslSys = sslFilter != null && msg instanceof ByteBuffer; |
| |
| if (sslSys) { |
| ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY); |
| |
| assert queue != null; |
| |
| queue.offer((ByteBuffer)msg); |
| |
| GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; |
| |
| if (!ses0.procWrite.get() && ses0.procWrite.compareAndSet(false, true)) { |
| GridNioWorker worker = ses0.worker(); |
| |
| if (worker != null) |
| worker.registerWrite(ses0); |
| } |
| |
| return null; |
| } |
| else |
| return send(ses, (Message)msg, fut, ackC); |
| } |
| else |
| return send(ses, (ByteBuffer)msg, fut, ackC); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException { |
| proceedMessageReceived(ses, msg); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) { |
| return close(ses); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException { |
| proceedSessionIdleTimeout(ses); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException { |
| proceedSessionWriteTimeout(ses); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException { |
| return pauseResumeReads(ses, NioOperation.PAUSE_READ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException { |
| return pauseResumeReads(ses, NioOperation.RESUME_READ); |
| } |
| } |
| |
| /** |
| * Constructs a new instance of {@link GridNioServer}. |
| */ |
| @SuppressWarnings("PublicInnerClass") |
| public static class Builder<T> { |
| /** Empty filters. */ |
| private static final GridNioFilter[] EMPTY_FILTERS = new GridNioFilter[0]; |
| |
| /** Local address. */ |
| private InetAddress addr; |
| |
| /** Local port. */ |
| private int port; |
| |
| /** Logger. */ |
| private IgniteLogger log; |
| |
| /** Selector count. */ |
| private int selectorCnt; |
| |
| /** Ignite instance name. */ |
| private String igniteInstanceName; |
| |
| /** TCP_NO_DELAY flag. */ |
| private boolean tcpNoDelay; |
| |
| /** Direct buffer flag. */ |
| private boolean directBuf; |
| |
| /** Byte order. */ |
| private ByteOrder byteOrder = ByteOrder.LITTLE_ENDIAN; |
| |
| /** NIO server listener. */ |
| private GridNioServerListener<T> lsnr; |
| |
| /** Send buffer size. */ |
| private int sockSndBufSize; |
| |
| /** Receive buffer size. */ |
| private int sockRcvBufSize; |
| |
| /** Send queue limit. */ |
| private int sndQueueLimit = DFLT_SEND_QUEUE_LIMIT; |
| |
| /** Whether direct mode is used. */ |
| private boolean directMode; |
| |
| /** NIO filters. */ |
| private GridNioFilter[] filters; |
| |
| /** Idle timeout. */ |
| private long idleTimeout = -1; |
| |
| /** Write timeout. */ |
| private long writeTimeout = -1; |
| |
| /** Daemon flag. */ |
| private boolean daemon; |
| |
| /** Writer factory. */ |
| private GridNioMessageWriterFactory writerFactory; |
| |
| /** Skip recovery predicate. */ |
| private IgnitePredicate<Message> skipRecoveryPred; |
| |
| /** Message queue size listener. */ |
| private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr; |
| |
| /** Name for threads identification. */ |
| private String srvName; |
| |
| /** */ |
| private long selectorSpins; |
| |
| /** */ |
| private boolean readWriteSelectorsAssign; |
| |
| /** Worker lifecycle listener to be used by server's worker threads. */ |
| private GridWorkerListener workerLsnr; |
| |
| /** Metrics registry. */ |
| private MetricRegistry mreg; |
| |
| /** Tracing processor */ |
| private Tracing tracing; |
| |
| /** |
| * Finishes building the instance. |
| * |
| * @return Final instance of {@link GridNioServer}. |
| * @throws IgniteCheckedException If NIO client worker creation failed or address is already in use. |
| */ |
| public GridNioServer<T> build() throws IgniteCheckedException { |
| GridNioServer<T> ret = new GridNioServer<>( |
| addr, |
| port, |
| log, |
| selectorCnt, |
| igniteInstanceName, |
| srvName, |
| selectorSpins, |
| tcpNoDelay, |
| directBuf, |
| byteOrder, |
| lsnr, |
| sockSndBufSize, |
| sockRcvBufSize, |
| sndQueueLimit, |
| directMode, |
| daemon, |
| writerFactory, |
| skipRecoveryPred, |
| msgQueueLsnr, |
| readWriteSelectorsAssign, |
| workerLsnr, |
| mreg, |
| tracing, |
| filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS |
| ); |
| |
| if (idleTimeout >= 0) |
| ret.idleTimeout(idleTimeout); |
| |
| if (writeTimeout >= 0) |
| ret.writeTimeout(writeTimeout); |
| |
| return ret; |
| } |
| |
| /** |
| * @param readWriteSelectorsAssign {@code True} to assign in/out connections even/odd workers. |
| * @return This for chaining. |
| */ |
| public Builder<T> readWriteSelectorsAssign(boolean readWriteSelectorsAssign) { |
| this.readWriteSelectorsAssign = readWriteSelectorsAssign; |
| |
| return this; |
| } |
| |
| /** |
| * @param tracing Tracing processor. |
| * @return This for chaining. |
| */ |
| public Builder<T> tracing(Tracing tracing) { |
| this.tracing = tracing; |
| |
| return this; |
| } |
| |
| /** |
| * @param addr Local address. |
| * @return This for chaining. |
| */ |
| public Builder<T> address(InetAddress addr) { |
| this.addr = addr; |
| |
| return this; |
| } |
| |
| /** |
| * @param port Local port. If {@code -1} passed then server will not be |
| * accepting connections and only outgoing connections will be possible. |
| * @return This for chaining. |
| */ |
| public Builder<T> port(int port) { |
| this.port = port; |
| |
| return this; |
| } |
| |
| /** |
| * @param log Logger. |
| * @return This for chaining. |
| */ |
| public Builder<T> logger(IgniteLogger log) { |
| this.log = log; |
| |
| return this; |
| } |
| |
| /** |
| * @param selectorCnt Selector count. |
| * @return This for chaining. |
| */ |
| public Builder<T> selectorCount(int selectorCnt) { |
| this.selectorCnt = selectorCnt; |
| |
| return this; |
| } |
| |
| /** |
| * @param igniteInstanceName Ignite instance name. |
| * @return This for chaining. |
| */ |
| public Builder<T> igniteInstanceName(@Nullable String igniteInstanceName) { |
| this.igniteInstanceName = igniteInstanceName; |
| |
| return this; |
| } |
| |
| /** |
| * @param srvName Logical server name for threads identification. |
| * @return This for chaining. |
| */ |
| public Builder<T> serverName(@Nullable String srvName) { |
| this.srvName = srvName; |
| |
| return this; |
| } |
| |
| /** |
| * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before |
| * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}. |
| * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. |
| * @return This for chaining. |
| */ |
| public Builder<T> selectorSpins(long selectorSpins) { |
| this.selectorSpins = selectorSpins; |
| |
| return this; |
| } |
| |
| /** |
| * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets. |
| * @return This for chaining. |
| */ |
| public Builder<T> tcpNoDelay(boolean tcpNoDelay) { |
| this.tcpNoDelay = tcpNoDelay; |
| |
| return this; |
| } |
| |
| /** |
| * @param directBuf Whether to use direct buffer. |
| * @return This for chaining. |
| */ |
| public Builder<T> directBuffer(boolean directBuf) { |
| this.directBuf = directBuf; |
| |
| return this; |
| } |
| |
| /** |
| * @param byteOrder Byte order to use. |
| * @return This for chaining. |
| */ |
| public Builder<T> byteOrder(ByteOrder byteOrder) { |
| this.byteOrder = byteOrder; |
| |
| return this; |
| } |
| |
| /** |
| * @param lsnr NIO server listener. |
| * @return This for chaining. |
| */ |
| public Builder<T> listener(GridNioServerListener<T> lsnr) { |
| this.lsnr = lsnr; |
| |
| return this; |
| } |
| |
| /** |
| * @param sockSndBufSize Socket send buffer size. |
| * @return This for chaining. |
| */ |
| public Builder<T> socketSendBufferSize(int sockSndBufSize) { |
| this.sockSndBufSize = sockSndBufSize; |
| |
| return this; |
| } |
| |
| /** |
| * @param sockRcvBufSize Socket receive buffer size. |
| * @return This for chaining. |
| */ |
| public Builder<T> socketReceiveBufferSize(int sockRcvBufSize) { |
| this.sockRcvBufSize = sockRcvBufSize; |
| |
| return this; |
| } |
| |
| /** |
| * @param sndQueueLimit Send queue limit. |
| * @return This for chaining. |
| */ |
| public Builder<T> sendQueueLimit(int sndQueueLimit) { |
| this.sndQueueLimit = sndQueueLimit; |
| |
| return this; |
| } |
| |
| /** |
| * @param directMode Whether direct mode is used. |
| * @return This for chaining. |
| */ |
| public Builder<T> directMode(boolean directMode) { |
| this.directMode = directMode; |
| |
| return this; |
| } |
| |
| /** |
| * @param filters NIO filters. |
| * @return This for chaining. |
| */ |
| public Builder<T> filters(GridNioFilter... filters) { |
| this.filters = filters; |
| |
| return this; |
| } |
| |
| /** |
| * @param idleTimeout Idle timeout. |
| * @return This for chaining. |
| */ |
| public Builder<T> idleTimeout(long idleTimeout) { |
| this.idleTimeout = idleTimeout; |
| |
| return this; |
| } |
| |
| /** |
| * @param writeTimeout Write timeout. |
| * @return This for chaining. |
| */ |
| public Builder<T> writeTimeout(long writeTimeout) { |
| this.writeTimeout = writeTimeout; |
| |
| return this; |
| } |
| |
| /** |
| * @param daemon Daemon flag to create threads. |
| * @return This for chaining. |
| */ |
| public Builder<T> daemon(boolean daemon) { |
| this.daemon = daemon; |
| |
| return this; |
| } |
| |
| /** |
| * @param writerFactory Writer factory. |
| * @return This for chaining. |
| */ |
| public Builder<T> writerFactory(GridNioMessageWriterFactory writerFactory) { |
| this.writerFactory = writerFactory; |
| |
| return this; |
| } |
| |
| /** |
| * @param skipRecoveryPred Skip recovery predicate. |
| * @return This for chaining. |
| */ |
| public Builder<T> skipRecoveryPredicate(IgnitePredicate<Message> skipRecoveryPred) { |
| this.skipRecoveryPred = skipRecoveryPred; |
| |
| return this; |
| } |
| |
| /** |
| * @param msgQueueLsnr Message queue size listener. |
| * @return Instance of this builder for chaining. |
| */ |
| public Builder<T> messageQueueSizeListener(IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr) { |
| this.msgQueueLsnr = msgQueueLsnr; |
| |
| return this; |
| } |
| |
| /** |
| * @param workerLsnr Worker lifecycle listener. |
| * @return This for chaining. |
| */ |
| public Builder<T> workerListener(GridWorkerListener workerLsnr) { |
| this.workerLsnr = workerLsnr; |
| |
| return this; |
| } |
| |
| /** |
| * @param mreg Metrics registry. |
| * @return This for chaining. |
| */ |
| public Builder<T> metricRegistry(MetricRegistry mreg) { |
| this.mreg = mreg; |
| |
| return this; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class ReadWriteSizeBasedBalancer implements IgniteRunnable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private long lastBalance; |
| |
| /** */ |
| private final long balancePeriod; |
| |
| /** |
| * @param balancePeriod Period. |
| */ |
| ReadWriteSizeBasedBalancer(long balancePeriod) { |
| this.balancePeriod = balancePeriod; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| long now = U.currentTimeMillis(); |
| |
| if (lastBalance + balancePeriod < now) { |
| lastBalance = now; |
| |
| long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1; |
| int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1; |
| |
| for (int i = 0; i < clientWorkers.size(); i++) { |
| GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); |
| |
| int sesCnt = worker.workerSessions.size(); |
| |
| if (i % 2 == 0) { |
| // Reader. |
| long bytesRcvd0 = worker.bytesRcvd0; |
| |
| if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 && sesCnt > 1) { |
| maxRcvd0 = bytesRcvd0; |
| maxRcvdIdx = i; |
| } |
| |
| if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) { |
| minRcvd0 = bytesRcvd0; |
| minRcvdIdx = i; |
| } |
| } |
| else { |
| // Writer. |
| long bytesSent0 = worker.bytesSent0; |
| |
| if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 && sesCnt > 1) { |
| maxSent0 = bytesSent0; |
| maxSentIdx = i; |
| } |
| |
| if (minSent0 == -1 || bytesSent0 < minSent0) { |
| minSent0 = bytesSent0; |
| minSentIdx = i; |
| } |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx + |
| ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx + |
| ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx + |
| ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']'); |
| |
| if (maxSent0 != -1 && minSent0 != -1) { |
| GridSelectorNioSessionImpl ses = null; |
| |
| long sentDiff = maxSent0 - minSent0; |
| long delta = sentDiff; |
| double threshold = sentDiff * 0.9; |
| |
| GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = |
| clientWorkers.get(maxSentIdx).workerSessions; |
| |
| for (GridSelectorNioSessionImpl ses0 : sessions) { |
| long bytesSent0 = ses0.bytesSent0(); |
| |
| if (bytesSent0 < threshold && |
| (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) { |
| ses = ses0; |
| delta = U.safeAbs(bytesSent0 - sentDiff / 2); |
| } |
| } |
| |
| if (ses != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Will move session to less loaded writer [ses=" + ses + |
| ", from=" + maxSentIdx + ", to=" + minSentIdx + ']'); |
| |
| moveSession(ses, maxSentIdx, minSentIdx); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Unable to find session to move for writers."); |
| } |
| } |
| |
| if (maxRcvd0 != -1 && minRcvd0 != -1) { |
| GridSelectorNioSessionImpl ses = null; |
| |
| long rcvdDiff = maxRcvd0 - minRcvd0; |
| long delta = rcvdDiff; |
| double threshold = rcvdDiff * 0.9; |
| |
| GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = |
| clientWorkers.get(maxRcvdIdx).workerSessions; |
| |
| for (GridSelectorNioSessionImpl ses0 : sessions) { |
| long bytesRcvd0 = ses0.bytesReceived0(); |
| |
| if (bytesRcvd0 < threshold && |
| (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) { |
| ses = ses0; |
| delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2); |
| } |
| } |
| |
| if (ses != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Will move session to less loaded reader [ses=" + ses + |
| ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']'); |
| |
| moveSession(ses, maxRcvdIdx, minRcvdIdx); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Unable to find session to move for readers."); |
| } |
| } |
| |
| for (int i = 0; i < clientWorkers.size(); i++) { |
| GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); |
| |
| worker.reset0(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class SizeBasedBalancer implements IgniteRunnable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private long lastBalance; |
| |
| /** */ |
| private final long balancePeriod; |
| |
| /** |
| * @param balancePeriod Period. |
| */ |
| SizeBasedBalancer(long balancePeriod) { |
| this.balancePeriod = balancePeriod; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| long now = U.currentTimeMillis(); |
| |
| if (lastBalance + balancePeriod < now) { |
| lastBalance = now; |
| |
| long maxBytes0 = -1, minBytes0 = -1; |
| int maxBytesIdx = -1, minBytesIdx = -1; |
| |
| for (int i = 0; i < clientWorkers.size(); i++) { |
| GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); |
| |
| int sesCnt = worker.workerSessions.size(); |
| |
| long bytes0 = worker.bytesRcvd0 + worker.bytesSent0; |
| |
| if ((maxBytes0 == -1 || bytes0 > maxBytes0) && bytes0 > 0 && sesCnt > 1) { |
| maxBytes0 = bytes0; |
| maxBytesIdx = i; |
| } |
| |
| if (minBytes0 == -1 || bytes0 < minBytes0) { |
| minBytes0 = bytes0; |
| minBytesIdx = i; |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Balancing data [min0=" + minBytes0 + ", minIdx=" + minBytesIdx + |
| ", max0=" + maxBytes0 + ", maxIdx=" + maxBytesIdx + ']'); |
| |
| if (maxBytes0 != -1 && minBytes0 != -1) { |
| GridSelectorNioSessionImpl ses = null; |
| |
| long bytesDiff = maxBytes0 - minBytes0; |
| long delta = bytesDiff; |
| double threshold = bytesDiff * 0.9; |
| |
| GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = |
| clientWorkers.get(maxBytesIdx).workerSessions; |
| |
| for (GridSelectorNioSessionImpl ses0 : sessions) { |
| long bytesSent0 = ses0.bytesSent0(); |
| |
| if (bytesSent0 < threshold && |
| (ses == null || delta > U.safeAbs(bytesSent0 - bytesDiff / 2))) { |
| ses = ses0; |
| delta = U.safeAbs(bytesSent0 - bytesDiff / 2); |
| } |
| } |
| |
| if (ses != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Will move session to less loaded worker [ses=" + ses + |
| ", from=" + maxBytesIdx + ", to=" + minBytesIdx + ']'); |
| |
| moveSession(ses, maxBytesIdx, minBytesIdx); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Unable to find session to move."); |
| } |
| } |
| |
| for (int i = 0; i < clientWorkers.size(); i++) { |
| GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); |
| |
| worker.reset0(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * For tests only. |
| */ |
| @SuppressWarnings("unchecked") |
| private class RandomBalancer implements IgniteRunnable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| int w1 = rnd.nextInt(clientWorkers.size()); |
| |
| if (clientWorkers.get(w1).workerSessions.isEmpty()) |
| return; |
| |
| int w2 = rnd.nextInt(clientWorkers.size()); |
| |
| while (w2 == w1) |
| w2 = rnd.nextInt(clientWorkers.size()); |
| |
| GridNioSession ses = randomSession(clientWorkers.get(w1)); |
| |
| if (ses != null) { |
| if (log.isInfoEnabled()) |
| log.info("Move session [from=" + w1 + |
| ", to=" + w2 + |
| ", ses=" + ses + ']'); |
| |
| moveSession(ses, w1, w2); |
| } |
| } |
| |
| /** |
| * @param worker Worker. |
| * @return NIO session. |
| */ |
| private GridNioSession randomSession(GridNioServer.AbstractNioClientWorker worker) { |
| Collection<GridNioSession> sessions = worker.workerSessions; |
| |
| int size = sessions.size(); |
| |
| if (size == 0) |
| return null; |
| |
| int idx = ThreadLocalRandom.current().nextInt(size); |
| |
| Iterator<GridNioSession> it = sessions.iterator(); |
| |
| int cnt = 0; |
| |
| while (it.hasNext()) { |
| GridNioSession ses = it.next(); |
| |
| if (cnt == idx) |
| return ses; |
| } |
| |
| return null; |
| } |
| |
| } |
| |
| /** |
| * |
| */ |
| interface SessionChangeRequest { |
| /** |
| * @return Session. |
| */ |
| GridNioSession session(); |
| |
| /** |
| * @return Requested change operation. |
| */ |
| NioOperation operation(); |
| } |
| } |