| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sshd.common.session.helpers; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.IntUnaryOperator; |
| |
| import org.apache.sshd.agent.common.AgentForwardSupport; |
| import org.apache.sshd.agent.common.DefaultAgentForwardSupport; |
| import org.apache.sshd.client.channel.AbstractClientChannel; |
| import org.apache.sshd.client.future.OpenFuture; |
| import org.apache.sshd.common.Closeable; |
| import org.apache.sshd.common.FactoryManager; |
| import org.apache.sshd.common.SshConstants; |
| import org.apache.sshd.common.channel.Channel; |
| import org.apache.sshd.common.channel.ChannelFactory; |
| import org.apache.sshd.common.channel.ChannelListener; |
| import org.apache.sshd.common.channel.LocalWindow; |
| import org.apache.sshd.common.channel.RequestHandler; |
| import org.apache.sshd.common.channel.exception.SshChannelNotFoundException; |
| import org.apache.sshd.common.channel.exception.SshChannelOpenException; |
| import org.apache.sshd.common.forward.Forwarder; |
| import org.apache.sshd.common.forward.ForwarderFactory; |
| import org.apache.sshd.common.forward.PortForwardingEventListener; |
| import org.apache.sshd.common.forward.PortForwardingEventListenerManager; |
| import org.apache.sshd.common.io.AbstractIoWriteFuture; |
| import org.apache.sshd.common.io.IoWriteFuture; |
| import org.apache.sshd.common.kex.KexState; |
| import org.apache.sshd.common.session.ConnectionService; |
| import org.apache.sshd.common.session.ReservedSessionMessagesHandler; |
| import org.apache.sshd.common.session.Session; |
| import org.apache.sshd.common.session.UnknownChannelReferenceHandler; |
| import org.apache.sshd.common.util.EventListenerUtils; |
| import org.apache.sshd.common.util.GenericUtils; |
| import org.apache.sshd.common.util.ValidateUtils; |
| import org.apache.sshd.common.util.buffer.Buffer; |
| import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; |
| import org.apache.sshd.common.util.functors.Int2IntFunction; |
| import org.apache.sshd.core.CoreModuleProperties; |
| import org.apache.sshd.server.x11.DefaultX11ForwardSupport; |
| import org.apache.sshd.server.x11.X11ForwardSupport; |
| |
| /** |
| * Base implementation of ConnectionService. |
| * |
| * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a> |
| */ |
| public abstract class AbstractConnectionService |
| extends AbstractInnerCloseable |
| implements ConnectionService { |
| |
/**
 * Default growth factor function used to resize response buffers
 */
public static final IntUnaryOperator RESPONSE_BUFFER_GROWTH_FACTOR = Int2IntFunction.add(Byte.SIZE);

/** Used in {@code SSH_MSG_IGNORE} messages for the keep-alive mechanism */
public static final String DEFAULT_SESSION_IGNORE_HEARTBEAT_STRING = "ignore@sshd.apache.org";

/**
 * Map of channels keyed by the identifier
 */
protected final Map<Long, Channel> channels = new ConcurrentHashMap<>();
/**
 * Next channel identifier - a UINT32 represented as a long
 */
protected final AtomicLong nextChannelId = new AtomicLong(0L);
/** Heartbeat counter - within this class it is only reported in the heartbeat failure log message */
protected final AtomicLong heartbeatCount = new AtomicLong(0L);
/** Handle of the scheduled heartbeat task - {@code null} when no heartbeat is scheduled */
private ScheduledFuture<?> heartBeat;

/** Holds the agent forwarding support instance once it has been created on first request */
private final AtomicReference<AgentForwardSupport> agentForwardHolder = new AtomicReference<>();
/** Holds the X11 forwarding support instance once it has been created on first request */
private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new AtomicReference<>();
/** Holds the forwarder instance once it has been created on first request */
private final AtomicReference<Forwarder> forwarderHolder = new AtomicReference<>();
/** Consulted when a channel open request arrives - {@code false} causes the request to be rejected */
private final AtomicBoolean allowMoreSessions = new AtomicBoolean(true);
/** Registered port forwarding listeners - the collection handed to the listener proxy */
private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>();
/** Registered port forwarding listener managers */
private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>();
/** Properties exposed via {@link #getProperties()} */
private final Map<String, Object> properties = new ConcurrentHashMap<>();
/** Proxy created in the constructor over the {@link #listeners} collection */
private final PortForwardingEventListener listenerProxy;
/** The session this service belongs to - never {@code null} */
private final AbstractSession sessionInstance;
/** Service-level handler for messages that reference unknown channels - may be {@code null} */
private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
| |
/**
 * Binds the service to its session and builds the listener proxy over the registered port forwarding
 * listeners collection.
 *
 * @param session The owning {@link AbstractSession} - must not be {@code null}
 */
protected AbstractConnectionService(AbstractSession session) {
    this.sessionInstance = Objects.requireNonNull(session, "No session");
    this.listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, this.listeners);
}
| |
| @Override |
| public Map<String, Object> getProperties() { |
| return properties; |
| } |
| |
| @Override |
| public PortForwardingEventListener getPortForwardingEventListenerProxy() { |
| return listenerProxy; |
| } |
| |
| @Override |
| public void addPortForwardingEventListener(PortForwardingEventListener listener) { |
| listeners.add(PortForwardingEventListener.validateListener(listener)); |
| } |
| |
| @Override |
| public void removePortForwardingEventListener(PortForwardingEventListener listener) { |
| if (listener == null) { |
| return; |
| } |
| |
| listeners.remove(PortForwardingEventListener.validateListener(listener)); |
| } |
| |
| @Override |
| public UnknownChannelReferenceHandler getUnknownChannelReferenceHandler() { |
| return unknownChannelReferenceHandler; |
| } |
| |
| @Override |
| public void setUnknownChannelReferenceHandler(UnknownChannelReferenceHandler handler) { |
| unknownChannelReferenceHandler = handler; |
| } |
| |
| @Override |
| public Collection<PortForwardingEventListenerManager> getRegisteredManagers() { |
| return managersHolder.isEmpty() ? Collections.emptyList() : new ArrayList<>(managersHolder); |
| } |
| |
| @Override |
| public boolean addPortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { |
| return managersHolder.add(Objects.requireNonNull(manager, "No manager")); |
| } |
| |
| @Override |
| public boolean removePortForwardingEventListenerManager(PortForwardingEventListenerManager manager) { |
| if (manager == null) { |
| return false; |
| } |
| |
| return managersHolder.remove(manager); |
| } |
| |
| public Collection<Channel> getChannels() { |
| return channels.values(); |
| } |
| |
| @Override |
| public AbstractSession getSession() { |
| return sessionInstance; |
| } |
| |
| @Override |
| public void start() { |
| heartBeat = startHeartBeat(); |
| } |
| |
| protected synchronized ScheduledFuture<?> startHeartBeat() { |
| stopHeartBeat(); // make sure any existing heartbeat is stopped |
| |
| HeartbeatType heartbeatType = getSessionHeartbeatType(); |
| Duration interval = getSessionHeartbeatInterval(); |
| Session session = getSession(); |
| boolean debugEnabled = log.isDebugEnabled(); |
| if (debugEnabled) { |
| log.debug("startHeartbeat({}) heartbeat type={}, interval={}", session, heartbeatType, interval); |
| } |
| |
| if ((heartbeatType == null) || (heartbeatType == HeartbeatType.NONE) || (GenericUtils.isNegativeOrNull(interval))) { |
| return null; |
| } |
| |
| FactoryManager manager = session.getFactoryManager(); |
| ScheduledExecutorService service = manager.getScheduledExecutorService(); |
| return service.scheduleAtFixedRate( |
| this::sendHeartBeat, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * Sends a heartbeat message/packet |
| * |
| * @return {@code true} if heartbeat successfully sent |
| */ |
| protected boolean sendHeartBeat() { |
| HeartbeatType heartbeatType = getSessionHeartbeatType(); |
| Duration interval = getSessionHeartbeatInterval(); |
| Session session = getSession(); |
| boolean traceEnabled = log.isTraceEnabled(); |
| if (traceEnabled) { |
| log.trace("sendHeartbeat({}) heartbeat type={}, interval={}", |
| session, heartbeatType, interval); |
| } |
| |
| if ((heartbeatType == null) || (GenericUtils.isNegativeOrNull(interval)) || (heartBeat == null)) { |
| return false; |
| } |
| |
| // SSHD-1059 |
| KexState kexState = session.getKexState(); |
| if ((heartbeatType != HeartbeatType.NONE) |
| && (kexState != KexState.DONE)) { |
| if (traceEnabled) { |
| log.trace("sendHeartbeat({}) heartbeat type={}, interval={} - skip due to KEX state={}", |
| session, heartbeatType, interval, kexState); |
| } |
| return false; |
| } |
| |
| try { |
| switch (heartbeatType) { |
| case NONE: |
| return false; |
| case IGNORE: { |
| Buffer buffer = session.createBuffer( |
| SshConstants.SSH_MSG_IGNORE, DEFAULT_SESSION_IGNORE_HEARTBEAT_STRING.length() + Byte.SIZE); |
| buffer.putString(DEFAULT_SESSION_IGNORE_HEARTBEAT_STRING); |
| |
| IoWriteFuture future = session.writePacket(buffer); |
| future.addListener(this::futureDone); |
| return true; |
| } |
| case RESERVED: { |
| ReservedSessionMessagesHandler handler = Objects.requireNonNull( |
| session.getReservedSessionMessagesHandler(), |
| "No customized heartbeat handler registered"); |
| return handler.sendReservedHeartbeat(this); |
| } |
| default: |
| throw new UnsupportedOperationException("Unsupported heartbeat type: " + heartbeatType); |
| } |
| |
| } catch (Throwable e) { |
| session.exceptionCaught(e); |
| warn("sendHeartBeat({}) failed ({}) to send heartbeat #{} request={}: {}", |
| session, e.getClass().getSimpleName(), heartbeatCount, heartbeatType, e.getMessage(), e); |
| return false; |
| } |
| } |
| |
| protected void futureDone(IoWriteFuture future) { |
| Throwable t = future.getException(); |
| if (t != null) { |
| Session session = getSession(); |
| session.exceptionCaught(t); |
| } |
| } |
| |
| protected synchronized void stopHeartBeat() { |
| boolean debugEnabled = log.isDebugEnabled(); |
| Session session = getSession(); |
| if (heartBeat == null) { |
| if (debugEnabled) { |
| log.debug("stopHeartBeat({}) no heartbeat to stop", session); |
| } |
| return; |
| } |
| |
| if (debugEnabled) { |
| log.debug("stopHeartBeat({}) stopping", session); |
| } |
| |
| try { |
| heartBeat.cancel(true); |
| } finally { |
| heartBeat = null; |
| } |
| |
| if (debugEnabled) { |
| log.debug("stopHeartBeat({}) stopped", session); |
| } |
| } |
| |
| @Override |
| public Forwarder getForwarder() { |
| Forwarder forwarder; |
| Session session = getSession(); |
| synchronized (forwarderHolder) { |
| forwarder = forwarderHolder.get(); |
| if (forwarder != null) { |
| return forwarder; |
| } |
| |
| forwarder = ValidateUtils.checkNotNull( |
| createForwardingFilter(session), "No forwarder created for %s", session); |
| forwarderHolder.set(forwarder); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("getForwardingFilter({}) created instance", session); |
| } |
| return forwarder; |
| } |
| |
| @Override |
| protected void preClose() { |
| stopHeartBeat(); |
| this.listeners.clear(); |
| this.managersHolder.clear(); |
| super.preClose(); |
| } |
| |
| protected Forwarder createForwardingFilter(Session session) { |
| FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); |
| ForwarderFactory factory = Objects.requireNonNull(manager.getForwarderFactory(), "No forwarder factory"); |
| Forwarder forwarder = factory.create(this); |
| forwarder.addPortForwardingEventListenerManager(this); |
| return forwarder; |
| } |
| |
| @Override |
| public X11ForwardSupport getX11ForwardSupport() { |
| X11ForwardSupport x11Support; |
| Session session = getSession(); |
| synchronized (x11ForwardHolder) { |
| x11Support = x11ForwardHolder.get(); |
| if (x11Support != null) { |
| return x11Support; |
| } |
| |
| x11Support = ValidateUtils.checkNotNull( |
| createX11ForwardSupport(session), "No X11 forwarder created for %s", session); |
| x11ForwardHolder.set(x11Support); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("getX11ForwardSupport({}) created instance", session); |
| } |
| return x11Support; |
| } |
| |
| protected X11ForwardSupport createX11ForwardSupport(Session session) { |
| return new DefaultX11ForwardSupport(this); |
| } |
| |
| @Override |
| public AgentForwardSupport getAgentForwardSupport() { |
| AgentForwardSupport agentForward; |
| Session session = getSession(); |
| synchronized (agentForwardHolder) { |
| agentForward = agentForwardHolder.get(); |
| if (agentForward != null) { |
| return agentForward; |
| } |
| |
| agentForward = ValidateUtils.checkNotNull( |
| createAgentForwardSupport(session), "No agent forward created for %s", session); |
| agentForwardHolder.set(agentForward); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("getAgentForwardSupport({}) created instance", session); |
| } |
| |
| return agentForward; |
| } |
| |
| protected AgentForwardSupport createAgentForwardSupport(Session session) { |
| return new DefaultAgentForwardSupport(this); |
| } |
| |
| @Override |
| protected Closeable getInnerCloseable() { |
| return builder() |
| .sequential(forwarderHolder.get(), agentForwardHolder.get(), x11ForwardHolder.get()) |
| .parallel(toString(), getChannels()) |
| .build(); |
| } |
| |
| protected long getNextChannelId() { |
| return nextChannelId.getAndIncrement(); |
| } |
| |
| @Override |
| public long registerChannel(Channel channel) throws IOException { |
| Session session = getSession(); |
| int maxChannels = CoreModuleProperties.MAX_CONCURRENT_CHANNELS.getRequired(this); |
| int curSize = channels.size(); |
| if (curSize > maxChannels) { |
| throw new IllegalStateException("Currently active channels (" + curSize + ") at max.: " + maxChannels); |
| } |
| |
| long channelId = getNextChannelId(); |
| channel.init(this, session, channelId); |
| |
| boolean registered = false; |
| synchronized (channels) { |
| if (!isClosing()) { |
| channels.put(channelId, channel); |
| registered = true; |
| } |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("registerChannel({})[id={}, registered={}] {}", this, channelId, registered, channel); |
| } |
| |
| channel.handleChannelRegistrationResult(this, session, channelId, registered); |
| return channelId; |
| } |
| |
| /** |
| * Remove this channel from the list of managed channels |
| * |
| * @param channel the channel |
| */ |
| @Override |
| public void unregisterChannel(Channel channel) { |
| long channelId = channel.getChannelId(); |
| Channel result; |
| synchronized (channels) { |
| result = channels.remove(channelId); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("unregisterChannel({}) result={}", channel, result); |
| } |
| |
| if (result != null) { |
| result.handleChannelUnregistration(this); |
| } |
| } |
| |
| @Override |
| public void process(int cmd, Buffer buffer) throws Exception { |
| switch (cmd) { |
| case SshConstants.SSH_MSG_CHANNEL_OPEN: |
| channelOpen(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION: |
| channelOpenConfirmation(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE: |
| channelOpenFailure(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_REQUEST: |
| channelRequest(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_DATA: |
| channelData(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA: |
| channelExtendedData(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_FAILURE: |
| channelFailure(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_SUCCESS: |
| channelSuccess(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST: |
| channelWindowAdjust(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_EOF: |
| channelEof(buffer); |
| break; |
| case SshConstants.SSH_MSG_CHANNEL_CLOSE: |
| channelClose(buffer); |
| break; |
| case SshConstants.SSH_MSG_GLOBAL_REQUEST: |
| globalRequest(buffer); |
| break; |
| case SshConstants.SSH_MSG_REQUEST_SUCCESS: |
| requestSuccess(buffer); |
| break; |
| case SshConstants.SSH_MSG_REQUEST_FAILURE: |
| requestFailure(buffer); |
| break; |
| default: { |
| /* |
| * According to https://tools.ietf.org/html/rfc4253#section-11.4 |
| * |
| * An implementation MUST respond to all unrecognized messages with an SSH_MSG_UNIMPLEMENTED message in |
| * the order in which the messages were received. |
| */ |
| AbstractSession session = getSession(); |
| if (log.isDebugEnabled()) { |
| log.debug("process({}) Unsupported command: {}", |
| session, SshConstants.getCommandMessageName(cmd)); |
| } |
| session.notImplemented(cmd, buffer); |
| } |
| } |
| } |
| |
| @Override |
| public boolean isAllowMoreSessions() { |
| return allowMoreSessions.get(); |
| } |
| |
| @Override |
| public void setAllowMoreSessions(boolean allow) { |
| if (log.isDebugEnabled()) { |
| log.debug("setAllowMoreSessions({}): {}", this, allow); |
| } |
| allowMoreSessions.set(allow); |
| } |
| |
| public void channelOpenConfirmation(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| int sender = buffer.getInt(); |
| long rwsize = buffer.getUInt(); |
| long rmpsize = buffer.getUInt(); |
| if (log.isDebugEnabled()) { |
| log.debug("channelOpenConfirmation({}) SSH_MSG_CHANNEL_OPEN_CONFIRMATION sender={}, window-size={}, packet-size={}", |
| channel, sender, rwsize, rmpsize); |
| } |
| /* |
| * NOTE: the 'sender' of the SSH_MSG_CHANNEL_OPEN_CONFIRMATION is the recipient on the client side - see rfc4254 |
| * section 5.1: |
| * |
| * 'sender channel' is the channel number allocated by the other side |
| * |
| * in our case, the server |
| */ |
| channel.handleOpenSuccess(sender, rwsize, rmpsize, buffer); |
| } |
| |
| public void channelOpenFailure(Buffer buffer) throws IOException { |
| AbstractClientChannel channel = (AbstractClientChannel) getChannel(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| long id = channel.getChannelId(); |
| boolean debugEnabled = log.isDebugEnabled(); |
| if (debugEnabled) { |
| log.debug("channelOpenFailure({}) Received SSH_MSG_CHANNEL_OPEN_FAILURE", channel); |
| } |
| |
| Channel removed; |
| synchronized (channels) { |
| removed = channels.remove(id); |
| } |
| |
| if (debugEnabled) { |
| log.debug("channelOpenFailure({}) unregistered {}", channel, removed); |
| } |
| |
| channel.handleOpenFailure(buffer); |
| } |
| |
| /** |
| * Process incoming data on a channel |
| * |
| * @param buffer the buffer containing the data |
| * @throws IOException if an error occurs |
| */ |
| public void channelData(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_DATA, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleData(buffer); |
| } |
| |
| /** |
| * Process incoming extended data on a channel |
| * |
| * @param buffer the buffer containing the data |
| * @throws IOException if an error occurs |
| */ |
| public void channelExtendedData(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleExtendedData(buffer); |
| } |
| |
| /** |
| * Process a window adjust packet on a channel |
| * |
| * @param buffer the buffer containing the window adjustment parameters |
| * @throws IOException if an error occurs |
| */ |
| public void channelWindowAdjust(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleWindowAdjust(buffer); |
| } |
| |
| /** |
| * Process end of file on a channel |
| * |
| * @param buffer the buffer containing the packet |
| * @throws IOException if an error occurs |
| */ |
| public void channelEof(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_EOF, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleEof(); |
| } |
| |
| /** |
| * Close a channel due to a close packet received |
| * |
| * @param buffer the buffer containing the packet |
| * @throws IOException if an error occurs |
| */ |
| public void channelClose(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_CLOSE, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleClose(); |
| } |
| |
| /** |
| * Service a request on a channel |
| * |
| * @param buffer the buffer containing the request |
| * @throws IOException if an error occurs |
| */ |
| public void channelRequest(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_REQUEST, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleRequest(buffer); |
| } |
| |
| /** |
| * Process a failure on a channel |
| * |
| * @param buffer the buffer containing the packet |
| * @throws IOException if an error occurs |
| */ |
| public void channelFailure(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_FAILURE, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleFailure(); |
| } |
| |
| /** |
| * Process a success on a channel |
| * |
| * @param buffer the buffer containing the packet |
| * @throws IOException if an error occurs |
| */ |
| public void channelSuccess(Buffer buffer) throws IOException { |
| Channel channel = getChannel(SshConstants.SSH_MSG_CHANNEL_SUCCESS, buffer); |
| if (channel == null) { |
| return; // debug breakpoint |
| } |
| |
| channel.handleSuccess(); |
| } |
| |
| /** |
| * Retrieve the channel designated by the given packet |
| * |
| * @param cmd The command being processed for the channel |
| * @param buffer the incoming packet |
| * @return the target channel |
| * @throws IOException if the channel does not exists |
| */ |
| protected Channel getChannel(byte cmd, Buffer buffer) throws IOException { |
| return getChannel(cmd, buffer.getUInt(), buffer); |
| } |
| |
| protected Channel getChannel(byte cmd, long recipient, Buffer buffer) throws IOException { |
| Channel channel = channels.get(recipient); |
| if (channel != null) { |
| return channel; |
| } |
| |
| UnknownChannelReferenceHandler handler = resolveUnknownChannelReferenceHandler(); |
| if (handler == null) { |
| // Throw a special exception - SSHD-777 |
| throw new SshChannelNotFoundException(recipient, |
| "Received " + SshConstants.getCommandMessageName(cmd) + " on unknown channel " + recipient); |
| } |
| |
| channel = handler.handleUnknownChannelCommand(this, cmd, recipient, buffer); |
| return channel; |
| } |
| |
| @Override |
| public UnknownChannelReferenceHandler resolveUnknownChannelReferenceHandler() { |
| UnknownChannelReferenceHandler handler = getUnknownChannelReferenceHandler(); |
| if (handler != null) { |
| return handler; |
| } |
| |
| Session s = getSession(); |
| return (s == null) ? null : s.resolveUnknownChannelReferenceHandler(); |
| } |
| |
| protected void channelOpen(Buffer buffer) throws Exception { |
| String type = buffer.getString(); |
| long sender = buffer.getUInt(); |
| long rwsize = buffer.getUInt(); |
| long rmpsize = buffer.getUInt(); |
| /* |
| * NOTE: the 'sender' is the identifier assigned by the remote side - the server in this case |
| */ |
| boolean debugEnabled = log.isDebugEnabled(); |
| if (debugEnabled) { |
| log.debug("channelOpen({}) SSH_MSG_CHANNEL_OPEN sender={}, type={}, window-size={}, packet-size={}", |
| this, sender, type, rwsize, rmpsize); |
| } |
| |
| if (isClosing()) { |
| // TODO add language tag configurable control |
| sendChannelOpenFailure(buffer, sender, SshConstants.SSH_OPEN_CONNECT_FAILED, |
| "Server is shutting down while attempting to open channel type=" + type, ""); |
| return; |
| } |
| |
| if (!isAllowMoreSessions()) { |
| // TODO add language tag configurable control |
| sendChannelOpenFailure(buffer, sender, SshConstants.SSH_OPEN_CONNECT_FAILED, "additional sessions disabled", ""); |
| return; |
| } |
| |
| Session session = getSession(); |
| FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); |
| Channel channel = ChannelFactory.createChannel(session, manager.getChannelFactories(), type); |
| if (channel == null) { |
| // TODO add language tag configurable control |
| sendChannelOpenFailure(buffer, sender, |
| SshConstants.SSH_OPEN_UNKNOWN_CHANNEL_TYPE, "Unsupported channel type: " + type, ""); |
| return; |
| } |
| |
| long channelId = registerChannel(channel); |
| channel.addChannelListener(new ChannelListener() { |
| |
| @Override |
| public void channelOpenSuccess(Channel channel) { |
| // Do not rely on the OpenFuture. We must be sure that we get the SSH_MSG_CHANNEL_OPEN_CONFIRMATION out |
| // before anything else. |
| try { |
| LocalWindow window = channel.getLocalWindow(); |
| if (debugEnabled) { |
| log.debug( |
| "channelOpenSuccess({}) send SSH_MSG_CHANNEL_OPEN_CONFIRMATION recipient={}, sender={}, window-size={}, packet-size={}", |
| channel, sender, channelId, window.getSize(), window.getPacketSize()); |
| } |
| Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION, Integer.SIZE); |
| buf.putUInt(sender); // remote (server side) identifier |
| buf.putUInt(channelId); // local (client side) identifier |
| buf.putUInt(window.getSize()); |
| buf.putUInt(window.getPacketSize()); |
| session.writePacket(buf); |
| } catch (IOException e) { |
| warn("channelOpenSuccess({}) {}: {}", AbstractConnectionService.this, e.getClass().getSimpleName(), |
| e.getMessage(), e); |
| session.exceptionCaught(e); |
| } |
| } |
| }); |
| OpenFuture openFuture = channel.open(sender, rwsize, rmpsize, buffer); |
| openFuture.addListener(future -> { |
| try { |
| if (!future.isOpened()) { |
| int reasonCode = 0; |
| String message = "Generic error while opening channel: " + channelId; |
| Throwable exception = future.getException(); |
| if (exception != null) { |
| if (exception instanceof SshChannelOpenException) { |
| reasonCode = ((SshChannelOpenException) exception).getReasonCode(); |
| } else { |
| message = exception.getClass().getSimpleName() + " while opening channel: " + message; |
| } |
| } else { |
| log.warn("operationComplete({}) no exception on closed future={}", |
| AbstractConnectionService.this, future); |
| } |
| |
| Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE, message.length() + Long.SIZE); |
| sendChannelOpenFailure(buf, sender, reasonCode, message, ""); |
| } |
| } catch (IOException e) { |
| warn("operationComplete({}) {}: {}", |
| AbstractConnectionService.this, e.getClass().getSimpleName(), e.getMessage(), e); |
| session.exceptionCaught(e); |
| } |
| }); |
| } |
| |
| protected IoWriteFuture sendChannelOpenFailure( |
| Buffer buffer, long sender, int reasonCode, String message, String lang) |
| throws IOException { |
| if (log.isDebugEnabled()) { |
| log.debug("sendChannelOpenFailure({}) sender={}, reason={}, lang={}, message='{}'", |
| this, sender, SshConstants.getOpenErrorCodeName(reasonCode), lang, message); |
| } |
| |
| Session session = getSession(); |
| Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE, |
| Long.SIZE + GenericUtils.length(message) + GenericUtils.length(lang)); |
| buf.putUInt(sender); |
| buf.putUInt(reasonCode); |
| buf.putString(message); |
| buf.putString(lang); |
| return session.writePacket(buf); |
| } |
| |
| /** |
| * Process global requests |
| * |
| * @param buffer The request {@link Buffer} |
| * @return An {@link IoWriteFuture} representing the sent packet - <B>Note:</B> if no reply sent then an |
| * "empty" future is returned - i.e., any added listeners are triggered immediately with |
| * a synthetic "success" |
| * @throws Exception If failed to process the request |
| */ |
| protected IoWriteFuture globalRequest(Buffer buffer) throws Exception { |
| String req = buffer.getString(); |
| boolean wantReply = buffer.getBoolean(); |
| boolean debugEnabled = log.isDebugEnabled(); |
| if (debugEnabled) { |
| log.debug("globalRequest({}) received SSH_MSG_GLOBAL_REQUEST {} want-reply={}", this, req, wantReply); |
| } |
| |
| Session session = getSession(); |
| FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); |
| Collection<RequestHandler<ConnectionService>> handlers = manager.getGlobalRequestHandlers(); |
| if (GenericUtils.size(handlers) > 0) { |
| boolean traceEnabled = log.isTraceEnabled(); |
| for (RequestHandler<ConnectionService> handler : handlers) { |
| RequestHandler.Result result; |
| try { |
| result = handler.process(this, req, wantReply, buffer); |
| } catch (Throwable e) { |
| warn("globalRequest({})[{}, want-reply={}] failed ({}) to process: {}", |
| this, req, wantReply, e.getClass().getSimpleName(), e.getMessage(), e); |
| result = RequestHandler.Result.ReplyFailure; |
| } |
| |
| // if Unsupported then check the next handler in line |
| if (RequestHandler.Result.Unsupported.equals(result)) { |
| if (traceEnabled) { |
| log.trace("globalRequest({}) {}#process({})[want-reply={}] : {}", |
| this, handler.getClass().getSimpleName(), req, wantReply, result); |
| } |
| } else { |
| return sendGlobalResponse(buffer, req, result, wantReply); |
| } |
| } |
| } |
| |
| return handleUnknownRequest(buffer, req, wantReply); |
| } |
| |
| protected IoWriteFuture handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException { |
| log.warn("handleUnknownRequest({}) unknown global request: {}", this, req); |
| return sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply); |
| } |
| |
| protected IoWriteFuture sendGlobalResponse( |
| Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) |
| throws IOException { |
| if (log.isDebugEnabled()) { |
| log.debug("sendGlobalResponse({})[{}] result={}, want-reply={}", this, req, result, wantReply); |
| } |
| |
| if (RequestHandler.Result.Replied.equals(result) || (!wantReply)) { |
| return AbstractIoWriteFuture.fulfilled(req, Boolean.TRUE); |
| } |
| |
| byte cmd = RequestHandler.Result.ReplySuccess.equals(result) |
| ? SshConstants.SSH_MSG_REQUEST_SUCCESS |
| : SshConstants.SSH_MSG_REQUEST_FAILURE; |
| Session session = getSession(); |
| Buffer rsp = session.createBuffer(cmd, 2); |
| return session.writePacket(rsp); |
| } |
| |
| protected void requestSuccess(Buffer buffer) throws Exception { |
| AbstractSession s = getSession(); |
| s.requestSuccess(buffer); |
| } |
| |
| protected void requestFailure(Buffer buffer) throws Exception { |
| AbstractSession s = getSession(); |
| s.requestFailure(buffer); |
| } |
| |
| @Override |
| public String toString() { |
| return getClass().getSimpleName() + "[" + getSession() + "]"; |
| } |
| } |