// blob: cc2074844dba464bbfe5006ddac500d94d3ee338 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName;
import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC;
import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME;
/**
 * Session implementation bound to selector API and socket API.
 * <p>
 * Note that this implementation requires non-null values for local and remote
 * socket addresses.
 */
public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment {
    /** Pending write requests. */
    private final FastSizeDeque<SessionWriteRequest> queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());

    /** Selection key associated with this session. */
    @GridToStringExclude
    private SelectionKey key;

    /** Current worker thread, {@code null} while the session is being moved between workers. */
    private volatile GridNioWorker worker;

    /**
     * Back pressure semaphore acquired by {@link #offerFuture(SessionWriteRequest)} and released by
     * {@link #pollFuture()}. {@code null} if the send queue limit passed to the constructor is {@code 0}.
     */
    @GridToStringExclude
    @Nullable private final Semaphore sem;

    /** Write buffer, {@code null} if none was passed to the constructor. */
    private final ByteBuffer writeBuf;

    /** Read buffer, {@code null} if none was passed to the constructor. */
    private final ByteBuffer readBuf;

    /** Incoming recovery data. */
    private GridNioRecoveryDescriptor inRecovery;

    /** Outgoing recovery data. */
    private GridNioRecoveryDescriptor outRecovery;

    /** Logger. */
    private final IgniteLogger log;

    /**
     * State change requests collected while the session has no worker (see {@link #startMoveSession(GridNioWorker)}
     * and {@link #offerStateChange(GridNioServer.SessionChangeRequest)}). They are handed over to the new worker
     * in {@link #finishMoveSession(GridNioWorker)}. Guarded by {@code this}.
     */
    private List<GridNioServer.SessionChangeRequest> pendingStateChanges;

    /**
     * Flag that is not used inside this class.
     * NOTE(review): presumably marks that write processing is scheduled for the session - confirm against users.
     */
    final AtomicBoolean procWrite = new AtomicBoolean();

    /** Pending system message, set by {@link #systemMessage(Object)} and taken by {@link #systemMessage()}. */
    private Object sysMsg;

    /** Close channel on session #close() called. */
    private volatile boolean closeSocket = true;

    /** Outbound messages queue size metric. */
    @Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric;

    /**
     * Creates session instance.
     *
     * @param log Logger.
     * @param worker NIO worker thread.
     * @param filterChain Filter chain that will handle requests.
     * @param locAddr Local address.
     * @param rmtAddr Remote address.
     * @param accepted Accepted flag.
     * @param sndQueueLimit Send queue limit, {@code 0} disables back pressure semaphore.
     * @param mreg Metrics registry used to obtain the outbound queue size metric, {@code null} to disable the metric.
     * @param writeBuf Write buffer.
     * @param readBuf Read buffer.
     */
    GridSelectorNioSessionImpl(
        IgniteLogger log,
        GridNioWorker worker,
        GridNioFilterChain filterChain,
        InetSocketAddress locAddr,
        InetSocketAddress rmtAddr,
        boolean accepted,
        int sndQueueLimit,
        @Nullable MetricRegistry mreg,
        @Nullable ByteBuffer writeBuf,
        @Nullable ByteBuffer readBuf
    ) {
        super(filterChain, locAddr, rmtAddr, accepted);

        assert worker != null;
        assert sndQueueLimit >= 0;
        assert locAddr != null : "GridSelectorNioSessionImpl should have local socket address.";
        assert rmtAddr != null : "GridSelectorNioSessionImpl should have remote socket address.";
        assert log != null;

        this.log = log;
        this.worker = worker;

        sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null;

        // Buffers handed to the session are reset before use.
        if (writeBuf != null)
            writeBuf.clear();

        this.writeBuf = writeBuf;

        if (readBuf != null)
            readBuf.clear();

        this.readBuf = readBuf;

        outboundMessagesQueueSizeMetric = mreg == null ? null : mreg.longAdderMetric(
            OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME,
            OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
        );
    }

    /** {@inheritDoc} */
    @Override public boolean hasSession() {
        return true;
    }

    /** {@inheritDoc} */
    @Nullable @Override public GridSelectorNioSessionImpl session() {
        return this;
    }

    /**
     * @return Worker, {@code null} while the session is being moved.
     */
    GridNioWorker worker() {
        return worker;
    }

    /**
     * Sets selection key for this session.
     *
     * @param key Selection key.
     */
    void key(SelectionKey key) {
        assert key != null;

        this.key = key;
    }

    /**
     * @return Write buffer.
     */
    public ByteBuffer writeBuffer() {
        return writeBuf;
    }

    /**
     * @return Read buffer.
     */
    public ByteBuffer readBuffer() {
        return readBuf;
    }

    /**
     * @return Registered selection key for this session.
     */
    public SelectionKey key() {
        return key;
    }

    /**
     * @return {@code True} if SocketChannel should be closed when the current session is closed.
     */
    public boolean closeSocketOnSessionClose() {
        return closeSocket;
    }

    /**
     * @param closeSocket {@code False} to keep SocketChannel open on session close.
     */
    public void closeSocketOnSessionClose(boolean closeSocket) {
        this.closeSocket = closeSocket;
    }

    /**
     * Offers a move request to the current worker, but only if the session is still owned by {@code from}.
     *
     * @param from Current session worker.
     * @param fut Move future.
     * @return {@code True} if session move was scheduled.
     */
    boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest fut) {
        synchronized (this) {
            if (log.isDebugEnabled())
                log.debug("Offered move [ses=" + this + ", fut=" + fut + ']');

            GridNioWorker worker0 = worker;

            if (worker0 != from)
                return false;

            // Use the snapshot that was just compared instead of re-reading the volatile field.
            worker0.offer(fut);
        }

        return true;
    }

    /**
     * Schedules a session state change. If the session has no worker at the moment, the request is
     * stored in the pending list and passed to the new worker in {@link #finishMoveSession(GridNioWorker)};
     * otherwise it is offered to the current worker.
     *
     * @param fut Future.
     */
    void offerStateChange(GridNioServer.SessionChangeRequest fut) {
        synchronized (this) {
            if (log.isDebugEnabled())
                log.debug("Offered state change [ses=" + this + ", fut=" + fut + ']');

            GridNioWorker worker0 = worker;

            if (worker0 == null) {
                if (pendingStateChanges == null)
                    pendingStateChanges = new ArrayList<>();

                pendingStateChanges.add(fut);
            }
            else
                worker0.offer(fut);
        }
    }

    /**
     * Detaches the session from its current worker: requests the worker still holds for this session
     * are moved to the pending list and the worker reference is cleared.
     *
     * @param moveFrom Current session worker.
     */
    void startMoveSession(GridNioWorker moveFrom) {
        synchronized (this) {
            assert this.worker == moveFrom;

            if (log.isDebugEnabled())
                log.debug("Started moving [ses=" + this + ", from=" + moveFrom + ']');

            List<GridNioServer.SessionChangeRequest> sesReqs = moveFrom.clearSessionRequests(this);

            worker = null;

            if (sesReqs != null) {
                if (pendingStateChanges == null)
                    pendingStateChanges = new ArrayList<>();

                pendingStateChanges.addAll(sesReqs);
            }
        }
    }

    /**
     * Attaches the session to the new worker and hands over all requests collected while the session
     * had no worker.
     *
     * @param moveTo New session worker.
     */
    void finishMoveSession(GridNioWorker moveTo) {
        synchronized (this) {
            assert worker == null;

            if (log.isDebugEnabled())
                log.debug("Finishing moving [ses=" + this + ", to=" + moveTo + ']');

            worker = moveTo;

            if (pendingStateChanges != null) {
                moveTo.offer(pendingStateChanges);

                pendingStateChanges = null;
            }
        }
    }

    /**
     * Adds write future at the front of the queue without acquiring back pressure semaphore.
     *
     * @param writeFut Write request.
     * @return Updated size of the queue.
     */
    int offerSystemFuture(SessionWriteRequest writeFut) {
        // Marked as added by a message thread, so pollFuture() will not release a permit that was never acquired.
        writeFut.messageThread(true);

        boolean res = queue.offerFirst(writeFut);

        MTC.span().addLog("Added to system queue - " + traceName(writeFut.message()));

        assert res : "Future was not added to queue";

        if (outboundMessagesQueueSizeMetric != null)
            outboundMessagesQueueSizeMetric.increment();

        return queue.sizex();
    }

    /**
     * Adds write future to the pending list and returns the size of the queue.
     * <p>
     * Note that separate counter for the queue size is needed because in case of concurrent
     * calls this method should return different values (when queue size is 0 and 2 concurrent calls
     * occur exactly one call will return 1)
     * <p>
     * If the back pressure semaphore is configured and the calling thread is not processing a message,
     * this method blocks uninterruptibly until a permit is available.
     *
     * @param writeFut Write request to add.
     * @return Updated size of the queue.
     */
    int offerFuture(SessionWriteRequest writeFut) {
        boolean msgThread = GridNioBackPressureControl.threadProcessingMessage();

        if (sem != null && !msgThread)
            sem.acquireUninterruptibly();

        // Remember whether a permit was taken, pollFuture() relies on this flag to release it.
        writeFut.messageThread(msgThread);

        boolean res = queue.offer(writeFut);

        MTC.span().addLog("Added to queue - " + traceName(writeFut.message()));

        assert res : "Future was not added to queue";

        if (outboundMessagesQueueSizeMetric != null)
            outboundMessagesQueueSizeMetric.increment();

        return queue.sizex();
    }

    /**
     * Puts the given requests to the write queue. The queue is expected to be empty.
     *
     * @param futs Futures to resend.
     */
    void resend(Collection<SessionWriteRequest> futs) {
        assert queue.isEmpty() : queue.size();

        boolean add = queue.addAll(futs);

        assert add;

        if (outboundMessagesQueueSizeMetric != null)
            outboundMessagesQueueSizeMetric.add(futs.size());
    }

    /**
     * Removes the head of the write queue. For a removed request the queue size metric is decremented,
     * the back pressure permit is released (if one was acquired for it) and the request is registered
     * in the outgoing recovery descriptor, if any. If the descriptor rejects the request the session is closed.
     *
     * @return Message that is in the head of the queue, {@code null} if queue is empty.
     */
    @Nullable SessionWriteRequest pollFuture() {
        SessionWriteRequest last = queue.poll();

        if (last != null) {
            if (outboundMessagesQueueSizeMetric != null)
                outboundMessagesQueueSizeMetric.decrement();

            // A permit was acquired only for requests that were not added by a message thread.
            if (sem != null && !last.messageThread())
                sem.release();

            if (outRecovery != null) {
                if (!outRecovery.add(last)) {
                    LT.warn(log, "Unacknowledged messages queue size overflow, will attempt to reconnect " +
                        "[remoteAddr=" + remoteAddress() +
                        ", queueLimit=" + outRecovery.queueLimit() + ']');

                    if (log.isDebugEnabled())
                        log.debug("Unacknowledged messages queue size overflow, will attempt to reconnect " +
                            "[remoteAddr=" + remoteAddress() +
                            ", queueSize=" + outRecovery.messagesRequests().size() +
                            ", queueLimit=" + outRecovery.queueLimit() + ']');

                    close();
                }
            }
        }

        return last;
    }

    /**
     * Removes the last occurrence of the given request from the write queue. The session must be closed.
     *
     * @param fut Future.
     * @return {@code True} if future was removed from queue.
     */
    boolean removeFuture(SessionWriteRequest fut) {
        assert closed();

        boolean rmv = queue.removeLastOccurrence(fut);

        if (rmv && outboundMessagesQueueSizeMetric != null)
            outboundMessagesQueueSizeMetric.decrement();

        return rmv;
    }

    /**
     * Gets number of write requests in a queue that have not been processed yet.
     *
     * @return Number of write requests.
     */
    int writeQueueSize() {
        return queue.sizex();
    }

    /**
     * @return Write requests. This is the live queue, not a copy.
     */
    Collection<SessionWriteRequest> writeQueue() {
        return queue;
    }

    /** {@inheritDoc} */
    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
        assert recoveryDesc != null;

        outRecovery = recoveryDesc;

        outRecovery.session(this);
    }

    /** {@inheritDoc} */
    @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
        return outRecovery;
    }

    /** {@inheritDoc} */
    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
        assert recoveryDesc != null;

        inRecovery = recoveryDesc;
    }

    /** {@inheritDoc} */
    @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
        return inRecovery;
    }

    /**
     * Server stop callback, performs the same actions as {@link #onClosed()}.
     */
    void onServerStopped() {
        onClosed();
    }

    /**
     * Session close callback. Releases a large number of back pressure permits, so that threads
     * waiting for a permit in {@link #offerFuture(SessionWriteRequest)} can proceed.
     */
    void onClosed() {
        if (sem != null)
            sem.release(1_000_000);
    }

    /** {@inheritDoc} */
    @Override public void systemMessage(Object sysMsg) {
        this.sysMsg = sysMsg;
    }

    /**
     * @return {@code True} if have pending system message to send.
     */
    boolean hasSystemMessage() {
        return sysMsg != null;
    }

    /**
     * Gets and clears pending system message.
     *
     * @return Pending system message.
     */
    Object systemMessage() {
        Object ret = sysMsg;

        sysMsg = null;

        return ret;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Additionally logs an error if the close future is completed with an error.
     */
    @Override public GridNioFuture<Boolean> close() {
        GridNioFuture<Boolean> fut = super.close();

        if (!fut.isDone()) {
            // Close is still in progress, report a failure once the future completes.
            fut.listen(fut0 -> {
                try {
                    fut0.get();
                }
                catch (IgniteCheckedException e) {
                    log.error("Failed to close session [ses=" + this + ']', e);
                }
            });
        }
        else if (fut.error() != null)
            log.error("Failed to close session [ses=" + this + ']', fut.error());

        return fut;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
    }
}