blob: 70e9f12796d07f8ca97f08163a336edb1f33075d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.common.session.helpers;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntUnaryOperator;
import org.apache.sshd.agent.common.AgentForwardSupport;
import org.apache.sshd.agent.common.DefaultAgentForwardSupport;
import org.apache.sshd.client.channel.AbstractClientChannel;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.LocalWindow;
import org.apache.sshd.common.channel.RequestHandler;
import org.apache.sshd.common.channel.exception.SshChannelNotFoundException;
import org.apache.sshd.common.channel.exception.SshChannelOpenException;
import org.apache.sshd.common.forward.Forwarder;
import org.apache.sshd.common.forward.ForwarderFactory;
import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.kex.KexState;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.ReservedSessionMessagesHandler;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.UnknownChannelReferenceHandler;
import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
import org.apache.sshd.common.util.functors.Int2IntFunction;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.x11.DefaultX11ForwardSupport;
import org.apache.sshd.server.x11.X11ForwardSupport;
/**
* Base implementation of ConnectionService.
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public abstract class AbstractConnectionService
extends AbstractInnerCloseable
implements ConnectionService {
/**
* Default growth factor function used to resize response buffers
*/
public static final IntUnaryOperator RESPONSE_BUFFER_GROWTH_FACTOR = Int2IntFunction.add(Byte.SIZE);
/** Used in {@code SSH_MSH_IGNORE} messages for the keep-alive mechanism */
public static final String DEFAULT_SESSION_IGNORE_HEARTBEAT_STRING = "ignore@sshd.apache.org";
/**
* Map of channels keyed by the identifier
*/
protected final Map<Long, Channel> channels = new ConcurrentHashMap<>();
/**
* Next channel identifier - a UINT32 represented as a long
*/
protected final AtomicLong nextChannelId = new AtomicLong(0L);
protected final AtomicLong heartbeatCount = new AtomicLong(0L);
private ScheduledFuture<?> heartBeat;
private final AtomicReference<AgentForwardSupport> agentForwardHolder = new AtomicReference<>();
private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new AtomicReference<>();
private final AtomicReference<Forwarder> forwarderHolder = new AtomicReference<>();
private final AtomicBoolean allowMoreSessions = new AtomicBoolean(true);
private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>();
private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>();
private final Map<String, Object> properties = new ConcurrentHashMap<>();
private final PortForwardingEventListener listenerProxy;
private final AbstractSession sessionInstance;
private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
protected AbstractConnectionService(AbstractSession session) {
sessionInstance = Objects.requireNonNull(session, "No session");
listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, listeners);
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public PortForwardingEventListener getPortForwardingEventListenerProxy() {
return listenerProxy;
}
@Override
public void addPortForwardingEventListener(PortForwardingEventListener listener) {
listeners.add(PortForwardingEventListener.validateListener(listener));
}
@Override
public void removePortForwardingEventListener(PortForwardingEventListener listener) {
if (listener == null) {
return;
}
listeners.remove(PortForwardingEventListener.validateListener(listener));
}
@Override
public UnknownChannelReferenceHandler getUnknownChannelReferenceHandler() {
return unknownChannelReferenceHandler;
}
@Override
public void setUnknownChannelReferenceHandler(UnknownChannelReferenceHandler handler) {
unknownChannelReferenceHandler = handler;
}
@Override
public Collection<PortForwardingEventListenerManager> getRegisteredManagers() {
return managersHolder.isEmpty() ? Collections.emptyList() : new ArrayList<>(managersHolder);
}
@Override
public boolean addPortForwardingEventListenerManager(PortForwardingEventListenerManager manager) {
return managersHolder.add(Objects.requireNonNull(manager, "No manager"));
}
@Override
public boolean removePortForwardingEventListenerManager(PortForwardingEventListenerManager manager) {
if (manager == null) {
return false;
}
return managersHolder.remove(manager);
}
public Collection<Channel> getChannels() {
return channels.values();
}
@Override
public AbstractSession getSession() {
return sessionInstance;
}
@Override
public void start() {
heartBeat = startHeartBeat();
}
protected synchronized ScheduledFuture<?> startHeartBeat() {
stopHeartBeat(); // make sure any existing heartbeat is stopped
HeartbeatType heartbeatType = getSessionHeartbeatType();
Duration interval = getSessionHeartbeatInterval();
Session session = getSession();
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
log.debug("startHeartbeat({}) heartbeat type={}, interval={}", session, heartbeatType, interval);
}
if ((heartbeatType == null) || (heartbeatType == HeartbeatType.NONE) || (GenericUtils.isNegativeOrNull(interval))) {
return null;
}
FactoryManager manager = session.getFactoryManager();
ScheduledExecutorService service = manager.getScheduledExecutorService();
return service.scheduleAtFixedRate(
this::sendHeartBeat, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
}
/**
* Sends a heartbeat message/packet
*
* @return {@code true} if heartbeat successfully sent
*/
protected boolean sendHeartBeat() {
HeartbeatType heartbeatType = getSessionHeartbeatType();
Duration interval = getSessionHeartbeatInterval();
Session session = getSession();
boolean traceEnabled = log.isTraceEnabled();
if (traceEnabled) {
log.trace("sendHeartbeat({}) heartbeat type={}, interval={}",
session, heartbeatType, interval);
}
if ((heartbeatType == null) || (GenericUtils.isNegativeOrNull(interval)) || (heartBeat == null)) {
return false;
}
// SSHD-1059
KexState kexState = session.getKexState();
if ((heartbeatType != HeartbeatType.NONE)
&& (kexState != KexState.DONE)) {
if (traceEnabled) {
log.trace("sendHeartbeat({}) heartbeat type={}, interval={} - skip due to KEX state={}",
session, heartbeatType, interval, kexState);
}
return false;
}
try {
switch (heartbeatType) {
case NONE:
return false;
case IGNORE: {
Buffer buffer = session.createBuffer(
SshConstants.SSH_MSG_IGNORE, DEFAULT_SESSION_IGNORE_HEARTBEAT_STRING.length() + Byte.SIZE);
buffer.putString(DEFAULT_SESSION_IGNORE_HEARTBEAT_STRING);
IoWriteFuture future = session.writePacket(buffer);
future.addListener(this::futureDone);
return true;
}
case RESERVED: {
ReservedSessionMessagesHandler handler = Objects.requireNonNull(
session.getReservedSessionMessagesHandler(),
"No customized heartbeat handler registered");
return handler.sendReservedHeartbeat(this);
}
default:
throw new UnsupportedOperationException("Unsupported heartbeat type: " + heartbeatType);
}
} catch (Throwable e) {
session.exceptionCaught(e);
warn("sendHeartBeat({}) failed ({}) to send heartbeat #{} request={}: {}",
session, e.getClass().getSimpleName(), heartbeatCount, heartbeatType, e.getMessage(), e);
return false;
}
}
protected void futureDone(IoWriteFuture future) {
Throwable t = future.getException();
if (t != null) {
Session session = getSession();
session.exceptionCaught(t);
}
}
protected synchronized void stopHeartBeat() {
boolean debugEnabled = log.isDebugEnabled();
Session session = getSession();
if (heartBeat == null) {
if (debugEnabled) {
log.debug("stopHeartBeat({}) no heartbeat to stop", session);
}
return;
}
if (debugEnabled) {
log.debug("stopHeartBeat({}) stopping", session);
}
try {
heartBeat.cancel(true);
} finally {
heartBeat = null;
}
if (debugEnabled) {
log.debug("stopHeartBeat({}) stopped", session);
}
}
@Override
public Forwarder getForwarder() {
Forwarder forwarder;
Session session = getSession();
synchronized (forwarderHolder) {
forwarder = forwarderHolder.get();
if (forwarder != null) {
return forwarder;
}
forwarder = ValidateUtils.checkNotNull(
createForwardingFilter(session), "No forwarder created for %s", session);
forwarderHolder.set(forwarder);
}
if (log.isDebugEnabled()) {
log.debug("getForwardingFilter({}) created instance", session);
}
return forwarder;
}
@Override
protected void preClose() {
stopHeartBeat();
this.listeners.clear();
this.managersHolder.clear();
super.preClose();
}
protected Forwarder createForwardingFilter(Session session) {
FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
ForwarderFactory factory = Objects.requireNonNull(manager.getForwarderFactory(), "No forwarder factory");
Forwarder forwarder = factory.create(this);
forwarder.addPortForwardingEventListenerManager(this);
return forwarder;
}
@Override
public X11ForwardSupport getX11ForwardSupport() {
X11ForwardSupport x11Support;
Session session = getSession();
synchronized (x11ForwardHolder) {
x11Support = x11ForwardHolder.get();
if (x11Support != null) {
return x11Support;
}
x11Support = ValidateUtils.checkNotNull(
createX11ForwardSupport(session), "No X11 forwarder created for %s", session);
x11ForwardHolder.set(x11Support);
}
if (log.isDebugEnabled()) {
log.debug("getX11ForwardSupport({}) created instance", session);
}
return x11Support;
}
protected X11ForwardSupport createX11ForwardSupport(Session session) {
return new DefaultX11ForwardSupport(this);
}
@Override
public AgentForwardSupport getAgentForwardSupport() {
AgentForwardSupport agentForward;
Session session = getSession();
synchronized (agentForwardHolder) {
agentForward = agentForwardHolder.get();
if (agentForward != null) {
return agentForward;
}
agentForward = ValidateUtils.checkNotNull(
createAgentForwardSupport(session), "No agent forward created for %s", session);
agentForwardHolder.set(agentForward);
}
if (log.isDebugEnabled()) {
log.debug("getAgentForwardSupport({}) created instance", session);
}
return agentForward;
}
protected AgentForwardSupport createAgentForwardSupport(Session session) {
return new DefaultAgentForwardSupport(this);
}
@Override
protected Closeable getInnerCloseable() {
return builder()
.sequential(forwarderHolder.get(), agentForwardHolder.get(), x11ForwardHolder.get())
.parallel(toString(), getChannels())
.build();
}
protected long getNextChannelId() {
return nextChannelId.getAndIncrement();
}
@Override
public long registerChannel(Channel channel) throws IOException {
Session session = getSession();
int maxChannels = CoreModuleProperties.MAX_CONCURRENT_CHANNELS.getRequired(this);
int curSize = channels.size();
if (curSize > maxChannels) {
throw new IllegalStateException("Currently active channels (" + curSize + ") at max.: " + maxChannels);
}
long channelId = getNextChannelId();
channel.init(this, session, channelId);
boolean registered = false;
synchronized (channels) {
if (!isClosing()) {
channels.put(channelId, channel);
registered = true;
}
}
if (log.isDebugEnabled()) {
log.debug("registerChannel({})[id={}, registered={}] {}", this, channelId, registered, channel);
}
channel.handleChannelRegistrationResult(this, session, channelId, registered);
return channelId;
}
/**
* Remove this channel from the list of managed channels
*
* @param channel the channel
*/
@Override
public void unregisterChannel(Channel channel) {
long channelId = channel.getChannelId();
Channel result;
synchronized (channels) {
result = channels.remove(channelId);
}
if (log.isDebugEnabled()) {
log.debug("unregisterChannel({}) result={}", channel, result);
}
if (result != null) {
result.handleChannelUnregistration(this);
}
}
@Override
public void process(int cmd, Buffer buffer) throws Exception {
switch (cmd) {
case SshConstants.SSH_MSG_CHANNEL_OPEN:
channelOpen(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION:
channelOpenConfirmation(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE:
channelOpenFailure(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_REQUEST:
channelRequest(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_DATA:
channelData(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA:
channelExtendedData(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_FAILURE:
channelFailure(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_SUCCESS:
channelSuccess(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST:
channelWindowAdjust(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_EOF:
channelEof(buffer);
break;
case SshConstants.SSH_MSG_CHANNEL_CLOSE:
channelClose(buffer);
break;
case SshConstants.SSH_MSG_GLOBAL_REQUEST:
globalRequest(buffer);
break;
case SshConstants.SSH_MSG_REQUEST_SUCCESS:
requestSuccess(buffer);
break;
case SshConstants.SSH_MSG_REQUEST_FAILURE:
requestFailure(buffer);
break;
default: {
/*
* According to https://tools.ietf.org/html/rfc4253#section-11.4
*
* An implementation MUST respond to all unrecognized messages with an SSH_MSG_UNIMPLEMENTED message in
* the order in which the messages were received.
*/
AbstractSession session = getSession();
if (log.isDebugEnabled()) {
log.debug("process({}) Unsupported command: {}",
session, SshConstants.getCommandMessageName(cmd));
}
session.notImplemented(cmd, buffer);
}
}
}
@Override
public boolean isAllowMoreSessions() {
return allowMoreSessions.get();
}
@Override
public void setAllowMoreSessions(boolean allow) {
if (log.isDebugEnabled()) {
log.debug("setAllowMoreSessions({}): {}", this, allow);
}
allowMoreSessions.set(allow);
}
public void channelOpenConfirmation(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION, buffer);
if (channel == null) {
return; // debug breakpoint
}
int sender = buffer.getInt();
long rwsize = buffer.getUInt();
long rmpsize = buffer.getUInt();
if (log.isDebugEnabled()) {
log.debug("channelOpenConfirmation({}) SSH_MSG_CHANNEL_OPEN_CONFIRMATION sender={}, window-size={}, packet-size={}",
channel, sender, rwsize, rmpsize);
}
/*
* NOTE: the 'sender' of the SSH_MSG_CHANNEL_OPEN_CONFIRMATION is the recipient on the client side - see rfc4254
* section 5.1:
*
* 'sender channel' is the channel number allocated by the other side
*
* in our case, the server
*/
channel.handleOpenSuccess(sender, rwsize, rmpsize, buffer);
}
public void channelOpenFailure(Buffer buffer) throws IOException {
AbstractClientChannel channel = (AbstractClientChannel) getChannel(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE, buffer);
if (channel == null) {
return; // debug breakpoint
}
long id = channel.getChannelId();
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
log.debug("channelOpenFailure({}) Received SSH_MSG_CHANNEL_OPEN_FAILURE", channel);
}
Channel removed;
synchronized (channels) {
removed = channels.remove(id);
}
if (debugEnabled) {
log.debug("channelOpenFailure({}) unregistered {}", channel, removed);
}
channel.handleOpenFailure(buffer);
}
/**
* Process incoming data on a channel
*
* @param buffer the buffer containing the data
* @throws IOException if an error occurs
*/
public void channelData(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_DATA, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleData(buffer);
}
/**
* Process incoming extended data on a channel
*
* @param buffer the buffer containing the data
* @throws IOException if an error occurs
*/
public void channelExtendedData(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleExtendedData(buffer);
}
/**
* Process a window adjust packet on a channel
*
* @param buffer the buffer containing the window adjustment parameters
* @throws IOException if an error occurs
*/
public void channelWindowAdjust(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleWindowAdjust(buffer);
}
/**
* Process end of file on a channel
*
* @param buffer the buffer containing the packet
* @throws IOException if an error occurs
*/
public void channelEof(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_EOF, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleEof();
}
/**
* Close a channel due to a close packet received
*
* @param buffer the buffer containing the packet
* @throws IOException if an error occurs
*/
public void channelClose(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_CLOSE, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleClose();
}
/**
* Service a request on a channel
*
* @param buffer the buffer containing the request
* @throws IOException if an error occurs
*/
public void channelRequest(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_REQUEST, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleRequest(buffer);
}
/**
* Process a failure on a channel
*
* @param buffer the buffer containing the packet
* @throws IOException if an error occurs
*/
public void channelFailure(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_FAILURE, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleFailure();
}
/**
* Process a success on a channel
*
* @param buffer the buffer containing the packet
* @throws IOException if an error occurs
*/
public void channelSuccess(Buffer buffer) throws IOException {
Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_SUCCESS, buffer);
if (channel == null) {
return; // debug breakpoint
}
channel.handleSuccess();
}
/**
* Retrieve the channel designated by the given packet
*
* @param cmd The command being processed for the channel
* @param buffer the incoming packet
* @return the target channel
* @throws IOException if the channel does not exists
*/
protected Channel getChannel(byte cmd, Buffer buffer) throws IOException {
return getChannel(cmd, buffer.getUInt(), buffer);
}
protected Channel getChannel(byte cmd, long recipient, Buffer buffer) throws IOException {
Channel channel = channels.get(recipient);
if (channel != null) {
return channel;
}
UnknownChannelReferenceHandler handler = resolveUnknownChannelReferenceHandler();
if (handler == null) {
// Throw a special exception - SSHD-777
throw new SshChannelNotFoundException(recipient,
"Received " + SshConstants.getCommandMessageName(cmd) + " on unknown channel " + recipient);
}
channel = handler.handleUnknownChannelCommand(this, cmd, recipient, buffer);
return channel;
}
@Override
public UnknownChannelReferenceHandler resolveUnknownChannelReferenceHandler() {
UnknownChannelReferenceHandler handler = getUnknownChannelReferenceHandler();
if (handler != null) {
return handler;
}
Session s = getSession();
return (s == null) ? null : s.resolveUnknownChannelReferenceHandler();
}
protected void channelOpen(Buffer buffer) throws Exception {
String type = buffer.getString();
long sender = buffer.getUInt();
long rwsize = buffer.getUInt();
long rmpsize = buffer.getUInt();
/*
* NOTE: the 'sender' is the identifier assigned by the remote side - the server in this case
*/
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
log.debug("channelOpen({}) SSH_MSG_CHANNEL_OPEN sender={}, type={}, window-size={}, packet-size={}",
this, sender, type, rwsize, rmpsize);
}
if (isClosing()) {
// TODO add language tag configurable control
sendChannelOpenFailure(buffer, sender, SshConstants.SSH_OPEN_CONNECT_FAILED,
"Server is shutting down while attempting to open channel type=" + type, "");
return;
}
if (!isAllowMoreSessions()) {
// TODO add language tag configurable control
sendChannelOpenFailure(buffer, sender, SshConstants.SSH_OPEN_CONNECT_FAILED, "additional sessions disabled", "");
return;
}
Session session = getSession();
FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
Channel channel = ChannelFactory.createChannel(session, manager.getChannelFactories(), type);
if (channel == null) {
// TODO add language tag configurable control
sendChannelOpenFailure(buffer, sender,
SshConstants.SSH_OPEN_UNKNOWN_CHANNEL_TYPE, "Unsupported channel type: " + type, "");
return;
}
long channelId = registerChannel(channel);
channel.addChannelListener(new ChannelListener() {
@Override
public void channelOpenSuccess(Channel channel) {
// Do not rely on the OpenFuture. We must be sure that we get the SSH_MSG_CHANNEL_OPEN_CONFIRMATION out
// before anything else.
try {
LocalWindow window = channel.getLocalWindow();
if (debugEnabled) {
log.debug(
"channelOpenSuccess({}) send SSH_MSG_CHANNEL_OPEN_CONFIRMATION recipient={}, sender={}, window-size={}, packet-size={}",
channel, sender, channelId, window.getSize(), window.getPacketSize());
}
Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION, Integer.SIZE);
buf.putUInt(sender); // remote (server side) identifier
buf.putUInt(channelId); // local (client side) identifier
buf.putUInt(window.getSize());
buf.putUInt(window.getPacketSize());
session.writePacket(buf);
} catch (IOException e) {
warn("channelOpenSuccess({}) {}: {}", AbstractConnectionService.this, e.getClass().getSimpleName(),
e.getMessage(), e);
session.exceptionCaught(e);
}
}
});
OpenFuture openFuture = channel.open(sender, rwsize, rmpsize, buffer);
openFuture.addListener(future -> {
try {
if (!future.isOpened()) {
int reasonCode = 0;
String message = "Generic error while opening channel: " + channelId;
Throwable exception = future.getException();
if (exception != null) {
if (exception instanceof SshChannelOpenException) {
reasonCode = ((SshChannelOpenException) exception).getReasonCode();
} else {
message = exception.getClass().getSimpleName() + " while opening channel: " + message;
}
} else {
log.warn("operationComplete({}) no exception on closed future={}",
AbstractConnectionService.this, future);
}
Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE, message.length() + Long.SIZE);
sendChannelOpenFailure(buf, sender, reasonCode, message, "");
}
} catch (IOException e) {
warn("operationComplete({}) {}: {}",
AbstractConnectionService.this, e.getClass().getSimpleName(), e.getMessage(), e);
session.exceptionCaught(e);
}
});
}
protected IoWriteFuture sendChannelOpenFailure(
Buffer buffer, long sender, int reasonCode, String message, String lang)
throws IOException {
if (log.isDebugEnabled()) {
log.debug("sendChannelOpenFailure({}) sender={}, reason={}, lang={}, message='{}'",
this, sender, SshConstants.getOpenErrorCodeName(reasonCode), lang, message);
}
Session session = getSession();
Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE,
Long.SIZE + GenericUtils.length(message) + GenericUtils.length(lang));
buf.putUInt(sender);
buf.putUInt(reasonCode);
buf.putString(message);
buf.putString(lang);
return session.writePacket(buf);
}
/**
* Process global requests
*
* @param buffer The request {@link Buffer}
* @return An {@link IoWriteFuture} representing the sent packet - <B>Note:</B> if no reply sent then an
* &quot;empty&quot; future is returned - i.e., any added listeners are triggered immediately with
* a synthetic &quot;success&quot;
* @throws Exception If failed to process the request
*/
protected IoWriteFuture globalRequest(Buffer buffer) throws Exception {
String req = buffer.getString();
boolean wantReply = buffer.getBoolean();
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
log.debug("globalRequest({}) received SSH_MSG_GLOBAL_REQUEST {} want-reply={}", this, req, wantReply);
}
Session session = getSession();
FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
Collection<RequestHandler<ConnectionService>> handlers = manager.getGlobalRequestHandlers();
if (GenericUtils.size(handlers) > 0) {
boolean traceEnabled = log.isTraceEnabled();
for (RequestHandler<ConnectionService> handler : handlers) {
RequestHandler.Result result;
try {
result = handler.process(this, req, wantReply, buffer);
} catch (Throwable e) {
warn("globalRequest({})[{}, want-reply={}] failed ({}) to process: {}",
this, req, wantReply, e.getClass().getSimpleName(), e.getMessage(), e);
result = RequestHandler.Result.ReplyFailure;
}
// if Unsupported then check the next handler in line
if (RequestHandler.Result.Unsupported.equals(result)) {
if (traceEnabled) {
log.trace("globalRequest({}) {}#process({})[want-reply={}] : {}",
this, handler.getClass().getSimpleName(), req, wantReply, result);
}
} else {
return sendGlobalResponse(buffer, req, result, wantReply);
}
}
}
return handleUnknownRequest(buffer, req, wantReply);
}
protected IoWriteFuture handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException {
log.warn("handleUnknownRequest({}) unknown global request: {}", this, req);
return sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply);
}
protected IoWriteFuture sendGlobalResponse(
Buffer buffer, String req, RequestHandler.Result result, boolean wantReply)
throws IOException {
if (log.isDebugEnabled()) {
log.debug("sendGlobalResponse({})[{}] result={}, want-reply={}", this, req, result, wantReply);
}
if (RequestHandler.Result.Replied.equals(result) || (!wantReply)) {
return AbstractIoWriteFuture.fulfilled(req, Boolean.TRUE);
}
byte cmd = RequestHandler.Result.ReplySuccess.equals(result)
? SshConstants.SSH_MSG_REQUEST_SUCCESS
: SshConstants.SSH_MSG_REQUEST_FAILURE;
Session session = getSession();
Buffer rsp = session.createBuffer(cmd, 2);
return session.writePacket(rsp);
}
protected void requestSuccess(Buffer buffer) throws Exception {
AbstractSession s = getSession();
s.requestSuccess(buffer);
}
protected void requestFailure(Buffer buffer) throws Exception {
AbstractSession s = getSession();
s.requestFailure(buffer);
}
@Override
public String toString() {
return getClass().getSimpleName() + "[" + getSession() + "]";
}
}