blob: d1574b67946c98779f587254b5fe78b23d3624e9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
import java.security.cert.Certificate;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MAX_KEYS_CNT;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
/**
*
*/
public class GridNioSessionImpl implements GridNioSession {
/** Metadata "map". */
private final Object[] meta = new Object[MAX_KEYS_CNT];
/** Local connection address. */
private final InetSocketAddress locAddr;
/** Remote connection address */
private final InetSocketAddress rmtAddr;
/** Session create timestamp. */
private long createTime;
/** Session close timestamp. */
private final AtomicLong closeTime = new AtomicLong();
/** Sent bytes counter. */
private volatile long bytesSent;
/** Received bytes counter. */
private volatile long bytesRcvd;
/** Sent bytes since last NIO sessions balancing. */
private volatile long bytesSent0;
/** Received bytes since last NIO sessions balancing. */
private volatile long bytesRcvd0;
/** Last send schedule timestamp. */
private volatile long sndSchedTime;
/** Last send activity timestamp. */
private volatile long lastSndTime;
/** Last read activity timestamp. */
private volatile long lastRcvTime;
/** Reads paused flag. */
private volatile boolean readsPaused;
/** Filter chain that will handle write and close requests. */
private GridNioFilterChain filterChain;
/** Accepted flag. */
private final boolean accepted;
/** For debug purposes. */
private volatile boolean markedForClose;
/**
* @param filterChain Chain.
* @param locAddr Local address.
* @param rmtAddr Remote address.
* @param accepted {@code True} if this session was initiated from remote host.
*/
public GridNioSessionImpl(
GridNioFilterChain filterChain,
@Nullable InetSocketAddress locAddr,
@Nullable InetSocketAddress rmtAddr,
boolean accepted
) {
this.filterChain = filterChain;
this.locAddr = locAddr;
this.rmtAddr = rmtAddr;
this.accepted = accepted;
long now = U.currentTimeMillis();
sndSchedTime = now;
createTime = now;
lastSndTime = now;
lastRcvTime = now;
}
/** {@inheritDoc} */
@Override public GridNioFuture<?> send(Object msg) {
try {
resetSendScheduleTime();
return chain().onSessionWrite(this, msg, true, null);
}
catch (IgniteCheckedException e) {
close();
return new GridNioFinishedFuture<Object>(e);
}
}
/** {@inheritDoc} */
@Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC)
throws IgniteCheckedException {
try {
chain().onSessionWrite(this, msg, false, ackC);
}
catch (IgniteCheckedException e) {
close();
throw e;
}
}
/** {@inheritDoc} */
@Override public GridNioFuture<?> resumeReads() {
try {
return chain().onResumeReads(this);
}
catch (IgniteCheckedException e) {
close();
return new GridNioFinishedFuture<Object>(e);
}
}
/** {@inheritDoc} */
@Override public GridNioFuture<?> pauseReads() {
try {
return chain().onPauseReads(this);
}
catch (IgniteCheckedException e) {
close();
return new GridNioFinishedFuture<Object>(e);
}
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public GridNioFuture<Boolean> close() {
markedForClose = true;
try {
return filterChain.onSessionClose(this);
}
catch (IgniteCheckedException e) {
return new GridNioFinishedFuture<>(e);
}
}
/** {@inheritDoc} */
@Override @Nullable public InetSocketAddress localAddress() {
return locAddr;
}
/** {@inheritDoc} */
@Override @Nullable public InetSocketAddress remoteAddress() {
return rmtAddr;
}
/** {@inheritDoc} */
@Override public long bytesSent() {
return bytesSent;
}
/** {@inheritDoc} */
@Override public long bytesReceived() {
return bytesRcvd;
}
/**
* @return Sent bytes since last NIO sessions balancing.
*/
public long bytesSent0() {
return bytesSent0;
}
/**
* @return Received bytes since last NIO sessions balancing.
*/
public long bytesReceived0() {
return bytesRcvd0;
}
/**
*
*/
public void reset0() {
bytesSent0 = 0;
bytesRcvd0 = 0;
}
/** {@inheritDoc} */
@Override public long createTime() {
return createTime;
}
/** {@inheritDoc} */
@Override public long closeTime() {
return closeTime.get();
}
/** {@inheritDoc} */
@Override public long lastReceiveTime() {
return lastRcvTime;
}
/** {@inheritDoc} */
@Override public long lastSendTime() {
return lastSndTime;
}
/** {@inheritDoc} */
@Override public long lastSendScheduleTime() {
return sndSchedTime;
}
/** {@inheritDoc} */
@Override public <T> T meta(int key) {
assert key < meta.length;
return (T)meta[key];
}
/** {@inheritDoc} */
@Override public <T> T addMeta(int key, @Nullable T val) {
assert key < meta.length;
Object prev = meta[key];
meta[key] = val;
return (T)prev;
}
/** {@inheritDoc} */
@Override public <T> T removeMeta(int key) {
assert key < meta.length;
Object prev = meta[key];
meta[key] = null;
return (T)prev;
}
/** {@inheritDoc} */
@Override public boolean accepted() {
return accepted;
}
/** */
@Override public Certificate[] certificates() {
GridSslMeta meta = meta(SSL_META.ordinal());
if (meta != null) {
try {
return meta.sslEngine().getSession().getPeerCertificates();
}
catch (SSLPeerUnverifiedException e) {
// Nothing to do.
}
}
return null;
}
/**
* @param <T> Chain type.
* @return Filter chain.
*/
@SuppressWarnings("unchecked")
protected <T> GridNioFilterChain<T> chain() {
return filterChain;
}
/**
* Adds given amount of bytes to the sent bytes counter.
* <p>
* Note that this method is designed to be called in one thread only.
*
* @param cnt Number of bytes sent.
*/
public void bytesSent(int cnt) {
bytesSent += cnt;
bytesSent0 += cnt;
lastSndTime = U.currentTimeMillis();
}
/**
* Adds given amount ob bytes to the received bytes counter.
* <p>
* Note that this method is designed to be called in one thread only.
*
* @param cnt Number of bytes received.
*/
public void bytesReceived(int cnt) {
bytesRcvd += cnt;
bytesRcvd0 += cnt;
lastRcvTime = U.currentTimeMillis();
}
/**
* Resets send schedule time to avoid multiple idle notifications.
*/
public void resetSendScheduleTime() {
sndSchedTime = U.currentTimeMillis();
}
/**
* Atomically moves this session into a closed state.
*
* @return {@code True} if session was moved to a closed state,
* {@code false} if session was already closed.
*/
public boolean setClosed() {
return closeTime.compareAndSet(0, U.currentTimeMillis());
}
/**
* @return {@code True} if this session was closed.
*/
public boolean closed() {
return closeTime.get() != 0;
}
/**
* @param readsPaused New reads paused flag.
*/
public void readsPaused(boolean readsPaused) {
this.readsPaused = readsPaused;
}
/**
* @return Reads paused flag.
*/
@Override public boolean readsPaused() {
return readsPaused;
}
/** {@inheritDoc} */
@Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
return null;
}
/** {@inheritDoc} */
@Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
return null;
}
/** {@inheritDoc} */
@Override public void systemMessage(Object msg) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNioSessionImpl.class, this);
}
}