SFTP: Refactor to reduce code duplication

Introduce an SftpResponse abstraction to centralize basic reply buffer
parsing and validation.
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java
index 4d2c126..eca4156 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java
@@ -37,6 +37,7 @@
 import org.apache.sshd.sftp.client.SftpClient;
 import org.apache.sshd.sftp.client.SftpClient.Handle;
 import org.apache.sshd.sftp.client.extensions.SftpClientExtension;
+import org.apache.sshd.sftp.client.impl.SftpResponse;
 import org.apache.sshd.sftp.client.impl.SftpStatus;
 import org.apache.sshd.sftp.common.SftpConstants;
 import org.apache.sshd.sftp.common.SftpException;
@@ -186,37 +187,26 @@
      *                     {@link SftpConstants#SSH_FXP_EXTENDED_REPLY} buffer
      */
     protected Buffer checkExtendedReplyBuffer(Buffer buffer) throws IOException {
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
-        validateIncomingResponse(SftpConstants.SSH_FXP_EXTENDED, id, type, length, buffer);
+        SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_EXTENDED, buffer);
 
-        if (type == SftpConstants.SSH_FXP_STATUS) {
-            SftpStatus status = SftpStatus.parse(buffer);
-            if (log.isDebugEnabled()) {
-                log.debug("checkExtendedReplyBuffer({})[id={}] - status: {}", getName(), id, status);
-            }
+        switch (response.getType()) {
+            case SftpConstants.SSH_FXP_EXTENDED_REPLY:
+                return response.getBuffer();
+            case SftpConstants.SSH_FXP_STATUS:
+                SftpStatus status = SftpStatus.parse(response);
+                if (log.isDebugEnabled()) {
+                    log.debug("checkExtendedReplyBuffer({})[id={}] - status: {}", getName(), response.getId(), status);
+                }
 
-            if (!status.isOk()) {
-                throwStatusException(id, status);
-            }
+                if (!status.isOk()) {
+                    throwStatusException(response.getId(), status);
+                }
 
-            return null;
-        } else if (type == SftpConstants.SSH_FXP_EXTENDED_REPLY) {
-            return buffer;
-        } else {
-            throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
-        }
-    }
-
-    protected void validateIncomingResponse(
-            int cmd, int id, int type, int length, Buffer buffer)
-            throws IOException {
-        int remaining = buffer.available();
-        if ((length < 0) || (length > (remaining + 5 /* type + id */))) {
-            throw new SshException("Bad length (" + length + ") for remaining data (" + remaining + ")"
-                                   + " in response to " + SftpConstants.getCommandMessageName(cmd)
-                                   + ": type=" + SftpConstants.getCommandMessageName(type) + ", id=" + id);
+                return null;
+            default:
+                throw new SshException(
+                        "Unexpected SFTP packet received: type=" + SftpConstants.getCommandMessageName(response.getType())
+                                       + ", id=" + response.getId() + ", length=" + response.getLength());
         }
     }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java
index cdea1ec..8a8bcd9 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java
@@ -149,6 +149,69 @@
     }
 
     /**
+     * Perform an SFTP request and wait until the response has been received.
+     *
+     * @param  cmd         the SFTP command code
+     * @param  request     a {@link Buffer} containing the request data
+     * @return             the {@link SftpResponse} for the request
+     * @throws IOException if an error occurs
+     */
+    protected SftpResponse rpc(int cmd, Buffer request) throws IOException {
+        int reqId = send(cmd, request);
+        return response(cmd, reqId);
+    }
+
+    /**
+     * Receives a response buffer, validates and returns it as a {@link SftpResponse}.
+     *
+     * @param  cmd         the command of the request the response is for
+     * @param  requestId   the id of the request
+     * @return             the {@link SftpResponse}
+     * @throws IOException if the received buffer is malformed
+     */
+    protected SftpResponse response(int cmd, int requestId) throws IOException {
+        SftpResponse result = SftpResponse.parse(cmd, receive(requestId));
+        if (log.isDebugEnabled()) {
+            switch (result.getType()) {
+                case SftpConstants.SSH_FXP_STATUS:
+                    Buffer buffer = result.getBuffer();
+                    if (buffer.available() >= 4) {
+                        int rpos = buffer.rpos();
+                        int status = buffer.getInt();
+                        buffer.rpos(rpos);
+                        if (status == SftpConstants.SSH_FX_OK && cmd == SftpConstants.SSH_FXP_WRITE) {
+                            // Only trace logging for data write if the status is OK
+                            if (log.isTraceEnabled()) {
+                                log.trace("response({}): received {}({}) for command {} (id={})", getClientChannel(),
+                                        SftpConstants.getCommandMessageName(result.getType()),
+                                        SftpConstants.getStatusName(status), SftpConstants.getCommandMessageName(cmd),
+                                        result.getId());
+                            }
+                        } else {
+                            log.debug("response({}): received {}({}) for command {} (id={})", getClientChannel(),
+                                    SftpConstants.getCommandMessageName(result.getType()), SftpConstants.getStatusName(status),
+                                    SftpConstants.getCommandMessageName(cmd), result.getId());
+                        }
+                    }
+                    break;
+                case SftpConstants.SSH_FXP_DATA:
+                    if (log.isTraceEnabled()) {
+                        log.debug("response({}): received {} for command {} (id={})", getClientChannel(),
+                                SftpConstants.getCommandMessageName(result.getType()), SftpConstants.getCommandMessageName(cmd),
+                                result.getId());
+                    }
+                    break;
+                default:
+                    log.debug("response({}): received {} for command {} (id={})", getClientChannel(),
+                            SftpConstants.getCommandMessageName(result.getType()), SftpConstants.getCommandMessageName(cmd),
+                            result.getId());
+                    break;
+            }
+        }
+        return result;
+    }
+
+    /**
      * Sends the specified command, waits for the response and then invokes {@link #checkResponseStatus(int, Buffer)}
      *
      * @param  cmd         The command to send
@@ -159,30 +222,22 @@
      * @see                #checkResponseStatus(int, Buffer)
      */
     protected void checkCommandStatus(int cmd, Buffer request) throws IOException {
-        int reqId = send(cmd, request);
-        Buffer response = receive(reqId);
-        checkResponseStatus(cmd, response);
+        checkResponseStatus(rpc(cmd, request));
     }
 
     /**
      * Checks if the incoming response is an {@code SSH_FXP_STATUS} one, and if so whether the substatus is
      * {@code SSH_FX_OK}.
      *
-     * @param  cmd         The sent command opcode
-     * @param  buffer      The received response {@link Buffer}
+     * @param  response    The received {@link SftpResponse}
      * @throws IOException If response does not carry a status or carries a bad status code
      * @see                #checkResponseStatus(int, int, int, String, String)
      */
-    protected void checkResponseStatus(int cmd, Buffer buffer) throws IOException {
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
-        validateIncomingResponse(cmd, id, type, length, buffer);
-
-        if (type == SftpConstants.SSH_FXP_STATUS) {
-            checkResponseStatus(cmd, id, SftpStatus.parse(buffer));
+    protected void checkResponseStatus(SftpResponse response) throws IOException {
+        if (response.getType() == SftpConstants.SSH_FXP_STATUS) {
+            checkResponseStatus(response.getCmd(), response.getId(), SftpStatus.parse(response));
         } else {
-            IOException err = handleUnexpectedPacket(cmd, SftpConstants.SSH_FXP_STATUS, id, type, length, buffer);
+            IOException err = handleUnexpectedPacket(SftpConstants.SSH_FXP_STATUS, response);
             if (err != null) {
                 throw err;
             }
@@ -224,37 +279,29 @@
      * @see                #checkHandleResponse(int, Buffer)
      */
     protected byte[] checkHandle(int cmd, Buffer request) throws IOException {
-        int reqId = send(cmd, request);
-        Buffer response = receive(reqId);
-        return checkHandleResponse(cmd, response);
+        return checkHandleResponse(rpc(cmd, request));
     }
 
-    protected byte[] checkHandleResponse(int cmd, Buffer buffer) throws IOException {
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
-        validateIncomingResponse(cmd, id, type, length, buffer);
-
-        if (type == SftpConstants.SSH_FXP_HANDLE) {
-            return ValidateUtils.checkNotNullAndNotEmpty(buffer.getBytes(), "Null/empty handle in buffer",
-                    GenericUtils.EMPTY_OBJECT_ARRAY);
+    protected byte[] checkHandleResponse(SftpResponse response) throws IOException {
+        switch (response.getType()) {
+            case SftpConstants.SSH_FXP_HANDLE:
+                return ValidateUtils.checkNotNullAndNotEmpty(response.getBuffer().getBytes(), "Null/empty handle in buffer",
+                        GenericUtils.EMPTY_OBJECT_ARRAY);
+            case SftpConstants.SSH_FXP_STATUS:
+                throwStatusException(response.getCmd(), response.getId(), SftpStatus.parse(response));
+                return null;
+            default:
+                return handleUnexpectedHandlePacket(response);
         }
-
-        if (type == SftpConstants.SSH_FXP_STATUS) {
-            throwStatusException(cmd, id, SftpStatus.parse(buffer));
-        }
-
-        return handleUnexpectedHandlePacket(cmd, id, type, length, buffer);
     }
 
-    protected byte[] handleUnexpectedHandlePacket(int cmd, int id, int type, int length, Buffer buffer) throws IOException {
-        IOException err = handleUnexpectedPacket(cmd, SftpConstants.SSH_FXP_HANDLE, id, type, length, buffer);
+    protected byte[] handleUnexpectedHandlePacket(SftpResponse response) throws IOException {
+        IOException err = handleUnexpectedPacket(SftpConstants.SSH_FXP_HANDLE, response);
         if (err != null) {
             throw err;
         }
-
-        throw new SshException("No handling for unexpected handle packet id=" + id
-                               + ", type=" + SftpConstants.getCommandMessageName(type) + ", length=" + length);
+        throw new SshException("No handling for unexpected handle packet id=" + response.getId() + ", type="
+                               + SftpConstants.getCommandMessageName(response.getType()) + ", length=" + response.getLength());
     }
 
     /**
@@ -267,35 +314,27 @@
      * @see                #checkAttributesResponse(int, Buffer)
      */
     protected Attributes checkAttributes(int cmd, Buffer request) throws IOException {
-        int reqId = send(cmd, request);
-        Buffer response = receive(reqId);
-        return checkAttributesResponse(cmd, response);
+        return checkAttributesResponse(rpc(cmd, request));
     }
 
-    protected Attributes checkAttributesResponse(int cmd, Buffer buffer) throws IOException {
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
-        validateIncomingResponse(cmd, id, type, length, buffer);
-
-        if (type == SftpConstants.SSH_FXP_ATTRS) {
-            return readAttributes(cmd, buffer, new AtomicInteger(0));
+    protected Attributes checkAttributesResponse(SftpResponse response) throws IOException {
+        switch (response.getType()) {
+            case SftpConstants.SSH_FXP_ATTRS:
+                return readAttributes(response.getCmd(), response.getBuffer(), new AtomicInteger(0));
+            case SftpConstants.SSH_FXP_STATUS:
+                throwStatusException(response.getCmd(), response.getId(), SftpStatus.parse(response));
+                return null;
+            default:
+                return handleUnexpectedAttributesPacket(response);
         }
-
-        if (type == SftpConstants.SSH_FXP_STATUS) {
-            throwStatusException(cmd, id, SftpStatus.parse(buffer));
-        }
-
-        return handleUnexpectedAttributesPacket(cmd, id, type, length, buffer);
     }
 
-    protected Attributes handleUnexpectedAttributesPacket(int cmd, int id, int type, int length, Buffer buffer)
+    protected Attributes handleUnexpectedAttributesPacket(SftpResponse response)
             throws IOException {
-        IOException err = handleUnexpectedPacket(cmd, SftpConstants.SSH_FXP_ATTRS, id, type, length, buffer);
+        IOException err = handleUnexpectedPacket(SftpConstants.SSH_FXP_ATTRS, response);
         if (err != null) {
             throw err;
         }
-
         return null;
     }
 
@@ -309,52 +348,46 @@
      * @see                #checkOneNameResponse(int, Buffer)
      */
     protected String checkOneName(int cmd, Buffer request) throws IOException {
-        int reqId = send(cmd, request);
-        Buffer response = receive(reqId);
-        return checkOneNameResponse(cmd, response);
+        return checkOneNameResponse(rpc(cmd, request));
     }
 
-    protected String checkOneNameResponse(int cmd, Buffer buffer) throws IOException {
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
-        validateIncomingResponse(cmd, id, type, length, buffer);
+    protected String checkOneNameResponse(SftpResponse response) throws IOException {
+        switch (response.getType()) {
+            case SftpConstants.SSH_FXP_NAME:
+                Buffer buffer = response.getBuffer();
+                int cmd = response.getCmd();
+                int len = buffer.getInt();
+                if (len != 1) {
+                    throw new SshException("SFTP error: received " + len + " names instead of 1");
+                }
 
-        if (type == SftpConstants.SSH_FXP_NAME) {
-            int len = buffer.getInt();
-            if (len != 1) {
-                throw new SshException("SFTP error: received " + len + " names instead of 1");
-            }
+                AtomicInteger nameIndex = new AtomicInteger(0);
+                String name = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
 
-            AtomicInteger nameIndex = new AtomicInteger(0);
-            String name = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
+                String longName = null;
+                int version = getVersion();
+                if (version == SftpConstants.SFTP_V3) {
+                    longName = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
+                }
 
-            String longName = null;
-            int version = getVersion();
-            if (version == SftpConstants.SFTP_V3) {
-                longName = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
-            }
-
-            Attributes attrs = readAttributes(cmd, buffer, nameIndex);
-            Boolean indicator = SftpHelper.getEndOfListIndicatorValue(buffer, version);
-            // TODO decide what to do if not-null and not TRUE
-            if (log.isTraceEnabled()) {
-                log.trace("checkOneNameResponse({})[id={}] {} ({})[{}] eol={}: {}",
-                        getClientChannel(), id, SftpConstants.getCommandMessageName(cmd),
-                        name, longName, indicator, attrs);
-            }
-            return name;
+                Attributes attrs = readAttributes(cmd, buffer, nameIndex);
+                Boolean indicator = SftpHelper.getEndOfListIndicatorValue(buffer, version);
+                // TODO decide what to do if not-null and not TRUE
+                if (log.isTraceEnabled()) {
+                    log.trace("checkOneNameResponse({})[id={}] {} ({})[{}] eol={}: {}", getClientChannel(), response.getId(),
+                            SftpConstants.getCommandMessageName(cmd), name, longName, indicator, attrs);
+                }
+                return name;
+            case SftpConstants.SSH_FXP_STATUS:
+                throwStatusException(response.getCmd(), response.getId(), SftpStatus.parse(response));
+                return null;
+            default:
+                return handleUnknownOneNamePacket(response);
         }
-
-        if (type == SftpConstants.SSH_FXP_STATUS) {
-            throwStatusException(cmd, id, SftpStatus.parse(buffer));
-        }
-
-        return handleUnknownOneNamePacket(cmd, id, type, length, buffer);
     }
 
-    protected String handleUnknownOneNamePacket(int cmd, int id, int type, int length, Buffer buffer) throws IOException {
-        IOException err = handleUnexpectedPacket(cmd, SftpConstants.SSH_FXP_NAME, id, type, length, buffer);
+    protected String handleUnknownOneNamePacket(SftpResponse response) throws IOException {
+        IOException err = handleUnexpectedPacket(SftpConstants.SSH_FXP_NAME, response);
         if (err != null) {
             throw err;
         }
@@ -452,8 +485,8 @@
                 }
                 if ((flags & SftpConstants.SSH_FILEXFER_ATTR_UNTRANSLATED_NAME) != 0) {
                     @SuppressWarnings("unused")
-                    String untranslated = getReferencedName(cmd, buffer, nameIndex.getAndIncrement()); // TODO: handle
-                                                                                                      // untranslated-name
+                    String untranslated = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
+                    // TODO: handle untranslated name
                 }
             }
         } else {
@@ -543,7 +576,7 @@
         buffer = writeAttributes(SftpConstants.SSH_FXP_OPEN, buffer, fileOpenAttributes);
 
         if (log.isDebugEnabled()) {
-            log.debug("open({}): send SSH_FXP_OPEN {}", getClientChannel(), path);
+            log.debug("open({}): send SSH_FXP_OPEN {} mode={}", getClientChannel(), path, String.format("0x%04x", mode));
         }
         CloseableHandle handle = new DefaultCloseableHandle(this, path, checkHandle(SftpConstants.SSH_FXP_OPEN, buffer));
         if (log.isTraceEnabled()) {
@@ -643,55 +676,47 @@
     protected int checkData(
             int cmd, Buffer request, int dstOffset, byte[] dst, AtomicReference<Boolean> eofSignalled)
             throws IOException {
-        int reqId = send(cmd, request);
-        Buffer response = receive(reqId);
-        return checkDataResponse(cmd, response, dstOffset, dst, eofSignalled);
+        return checkDataResponse(rpc(cmd, request), dstOffset, dst, eofSignalled);
     }
 
-    protected int checkDataResponse(
-            int cmd, Buffer buffer, int dstoff, byte[] dst, AtomicReference<Boolean> eofSignalled)
+    protected int checkDataResponse(SftpResponse response, int dstoff, byte[] dst, AtomicReference<Boolean> eofSignalled)
             throws IOException {
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
-        validateIncomingResponse(cmd, id, type, length, buffer);
-
-        if (type == SftpConstants.SSH_FXP_DATA) {
-            int len = buffer.getInt();
-            ValidateUtils.checkTrue(len >= 0, "Invalid response data len: %d", len);
-            buffer.getRawBytes(dst, dstoff, len);
-            Boolean indicator = SftpHelper.getEndOfFileIndicatorValue(buffer, getVersion());
-            if (log.isTraceEnabled()) {
-                log.trace("checkDataResponse({}][id={}] {} offset={}, len={}, EOF={}",
-                        getClientChannel(), SftpConstants.getCommandMessageName(cmd),
-                        id, dstoff, len, indicator);
-            }
-            if (eofSignalled != null) {
-                eofSignalled.set(indicator);
-            }
-
-            return len;
-        }
-
-        if (type == SftpConstants.SSH_FXP_STATUS) {
-            SftpStatus status = SftpStatus.parse(buffer);
-
-            if (status.getStatusCode() == SftpConstants.SSH_FX_EOF) {
+        switch (response.getType()) {
+            case SftpConstants.SSH_FXP_DATA:
+                Buffer buffer = response.getBuffer();
+                int len = buffer.getInt();
+                ValidateUtils.checkTrue(len >= 0, "Invalid response data len: %d", len);
+                buffer.getRawBytes(dst, dstoff, len);
+                Boolean indicator = SftpHelper.getEndOfFileIndicatorValue(buffer, getVersion());
                 if (log.isTraceEnabled()) {
-                    log.trace("checkDataResponse({})[id={}] {} status: {}", getClientChannel(), id,
-                            SftpConstants.getCommandMessageName(cmd), status);
+                    log.trace("checkDataResponse({}][id={}] {} offset={}, len={}, EOF={}", getClientChannel(),
+                            SftpConstants.getCommandMessageName(response.getCmd()), response.getId(), dstoff, len, indicator);
                 }
-                return -1;
-            }
+                if (eofSignalled != null) {
+                    eofSignalled.set(indicator);
+                }
 
-            throwStatusException(cmd, id, status);
+                return len;
+            case SftpConstants.SSH_FXP_STATUS:
+                SftpStatus status = SftpStatus.parse(response);
+
+                if (status.getStatusCode() == SftpConstants.SSH_FX_EOF) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("checkDataResponse({})[id={}] {} status: {}", getClientChannel(), response.getId(),
+                                SftpConstants.getCommandMessageName(response.getCmd()), status);
+                    }
+                    return -1;
+                }
+
+                throwStatusException(response.getCmd(), response.getId(), status);
+                return 0;
+            default:
+                return handleUnknownDataPacket(response);
         }
-
-        return handleUnknownDataPacket(cmd, id, type, length, buffer);
     }
 
-    protected int handleUnknownDataPacket(int cmd, int id, int type, int length, Buffer buffer) throws IOException {
-        IOException err = handleUnexpectedPacket(cmd, SftpConstants.SSH_FXP_DATA, id, type, length, buffer);
+    protected int handleUnknownDataPacket(SftpResponse response) throws IOException {
+        IOException err = handleUnexpectedPacket(SftpConstants.SSH_FXP_DATA, response);
         if (err != null) {
             throw err;
         }
@@ -828,122 +853,79 @@
         if (log.isDebugEnabled()) {
             log.debug("readDir({})[{}]: send SSH_FXP_READDIR", getClientChannel(), handle);
         }
-        int cmdId = send(SftpConstants.SSH_FXP_READDIR, buffer);
-        Buffer response = receive(cmdId);
-        return checkDirResponse(SftpConstants.SSH_FXP_READDIR, response, eolIndicator);
+        return checkDirResponse(rpc(SftpConstants.SSH_FXP_READDIR, buffer), eolIndicator);
     }
 
-    protected List<DirEntry> checkDirResponse(int cmd, Buffer buffer, AtomicReference<Boolean> eolIndicator)
+    protected List<DirEntry> checkDirResponse(SftpResponse response, AtomicReference<Boolean> eolIndicator)
             throws IOException {
         if (eolIndicator != null) {
             eolIndicator.set(null); // assume unknown
         }
 
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
-        validateIncomingResponse(cmd, id, type, length, buffer);
-
         boolean traceEnabled = log.isTraceEnabled();
-        if (type == SftpConstants.SSH_FXP_NAME) {
-            ClientChannel channel = getClientChannel();
-            int count = buffer.getInt();
-            int version = getVersion();
-            // Protect against malicious or corrupted packets
-            if ((count < 0) || (count > SshConstants.SSH_REQUIRED_PAYLOAD_PACKET_LENGTH_SUPPORT)) {
-                log.error("checkDirResponse({})[id={}] illogical dir entries count: {}", channel, id, count);
-                throw new SshException("Illogical dir entries count: " + count);
-            }
-
-            boolean debugEnabled = log.isDebugEnabled();
-            if (debugEnabled) {
-                log.debug("checkDirResponse({})[id={}] reading {} entries", channel, id, count);
-            }
-
-            List<DirEntry> entries = new ArrayList<>(count);
-            AtomicInteger nameIndex = new AtomicInteger(0);
-            for (int index = 1; index <= count; index++) {
-                String name = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
-                String longName = null;
-                if (version == SftpConstants.SFTP_V3) {
-                    longName = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
+        switch (response.getType()) {
+            case SftpConstants.SSH_FXP_NAME:
+                ClientChannel channel = getClientChannel();
+                Buffer buffer = response.getBuffer();
+                int cmd = response.getCmd();
+                int count = buffer.getInt();
+                int version = getVersion();
+                // Protect against malicious or corrupted packets
+                if ((count < 0) || (count > SshConstants.SSH_REQUIRED_PAYLOAD_PACKET_LENGTH_SUPPORT)) {
+                    log.error("checkDirResponse({})[id={}] illogical dir entries count: {}", channel, response.getId(), count);
+                    throw new SshException("Illogical dir entries count: " + count);
                 }
 
-                Attributes attrs = readAttributes(cmd, buffer, nameIndex);
-                if (traceEnabled) {
-                    log.trace("checkDirResponse({})[id={}][{}/{}] ({})[{}]: {}",
-                            channel, id, index, count, name, longName, attrs);
+                boolean debugEnabled = log.isDebugEnabled();
+                if (debugEnabled) {
+                    log.debug("checkDirResponse({})[id={}] reading {} entries", channel, response.getId(), count);
                 }
 
-                entries.add(new DirEntry(name, longName, attrs));
-            }
+                List<DirEntry> entries = new ArrayList<>(count);
+                AtomicInteger nameIndex = new AtomicInteger(0);
+                for (int index = 1; index <= count; index++) {
+                    String name = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
+                    String longName = null;
+                    if (version == SftpConstants.SFTP_V3) {
+                        longName = getReferencedName(cmd, buffer, nameIndex.getAndIncrement());
+                    }
 
-            Boolean indicator = SftpHelper.getEndOfListIndicatorValue(buffer, version);
-            if (eolIndicator != null) {
-                eolIndicator.set(indicator);
-            }
+                    Attributes attrs = readAttributes(cmd, buffer, nameIndex);
+                    if (traceEnabled) {
+                        log.trace("checkDirResponse({})[id={}][{}/{}] ({})[{}]: {}", channel, response.getId(), index, count,
+                                name, longName, attrs);
+                    }
 
-            if (debugEnabled) {
-                log.debug("checkDirResponse({})[id={}] read count={}, eol={}",
-                        channel, id, entries.size(), indicator);
-            }
-            return entries;
-        }
+                    entries.add(new DirEntry(name, longName, attrs));
+                }
 
-        if (type == SftpConstants.SSH_FXP_STATUS) {
-            SftpStatus status = SftpStatus.parse(buffer);
+                Boolean indicator = SftpHelper.getEndOfListIndicatorValue(buffer, version);
+                if (eolIndicator != null) {
+                    eolIndicator.set(indicator);
+                }
 
-            if (status.getStatusCode() == SftpConstants.SSH_FX_EOF) {
-                if (traceEnabled) {
-                    log.trace("checkDirResponse({})[id={}] - status: {}", getClientChannel(), id, status);
+                if (debugEnabled) {
+                    log.debug("checkDirResponse({})[id={}] read count={}, eol={}", channel, response.getId(), entries.size(),
+                            indicator);
+                }
+                return entries;
+            case SftpConstants.SSH_FXP_STATUS:
+                SftpStatus status = SftpStatus.parse(response);
+
+                if (status.getStatusCode() != SftpConstants.SSH_FX_EOF) {
+                    throwStatusException(response.getCmd(), response.getId(), status);
+                } else if (traceEnabled) {
+                    log.trace("checkDirResponse({})[id={}] - status: {}", getClientChannel(), response.getId(), status);
                 }
                 return null;
-            }
-
-            throwStatusException(cmd, id, status);
-        }
-
-        return handleUnknownDirListingPacket(cmd, id, type, length, buffer);
-    }
-
-    protected void validateIncomingResponse(
-            int cmd, int id, int type, int length, Buffer buffer)
-            throws IOException {
-        int remaining = buffer.available();
-        if ((length < 0) || (length > (remaining + 5 /* type + id */))) {
-            throw new SshException("Bad length (" + length + ") for remaining data (" + remaining + ")"
-                                   + " in response to " + SftpConstants.getCommandMessageName(cmd)
-                                   + ": type=" + SftpConstants.getCommandMessageName(type) + ", id=" + id);
-        }
-        if (log.isDebugEnabled()) {
-            if (type == SftpConstants.SSH_FXP_STATUS && remaining >= 4) {
-                int rpos = buffer.rpos();
-                int status = buffer.getInt();
-                buffer.rpos(rpos);
-                if (status == SftpConstants.SSH_FX_OK
-                        && (cmd == SftpConstants.SSH_FXP_WRITE || cmd == SftpConstants.SSH_FXP_READ)) {
-                    // Only trace logging for data read/write if the status is OK
-                    if (log.isTraceEnabled()) {
-                        log.trace("validateIncomingResponse({}): received {}({}) for command {} (id={})", getClientChannel(),
-                                SftpConstants.getCommandMessageName(type), SftpConstants.getStatusName(status),
-                                SftpConstants.getCommandMessageName(cmd), id);
-                    }
-                } else {
-                    log.debug("validateIncomingResponse({}): received {}({}) for command {} (id={})", getClientChannel(),
-                            SftpConstants.getCommandMessageName(type), SftpConstants.getStatusName(status),
-                            SftpConstants.getCommandMessageName(cmd), id);
-                }
-            } else {
-                log.debug("validateIncomingResponse({}): received {} for command {} (id={})", getClientChannel(),
-                        SftpConstants.getCommandMessageName(type), SftpConstants.getCommandMessageName(cmd), id);
-            }
+            default:
+                return handleUnknownDirListingPacket(response);
         }
     }
 
-    protected List<DirEntry> handleUnknownDirListingPacket(
-            int cmd, int id, int type, int length, Buffer buffer)
+    protected List<DirEntry> handleUnknownDirListingPacket(SftpResponse response)
             throws IOException {
-        IOException err = handleUnexpectedPacket(cmd, SftpConstants.SSH_FXP_NAME, id, type, length, buffer);
+        IOException err = handleUnexpectedPacket(SftpConstants.SSH_FXP_NAME, response);
         if (err != null) {
             throw err;
         }
@@ -951,23 +933,19 @@
     }
 
     /**
-     * @param  cmd         The initial command sent
      * @param  expected    The expected packet type
-     * @param  id          The reported identifier
-     * @param  type        The reported SFTP response type
-     * @param  length      The packet length
-     * @param  buffer      The {@link Buffer} after reading from it whatever data led to this call
+     * @param  response    The actual {@link SftpResponse} received
      * @return             The exception to throw - if {@code null} then implementor assumed to handle the exception
      *                     internal. Otherwise, the exception is re-thrown
      * @throws IOException If failed to handle the exception internally
      */
-    protected IOException handleUnexpectedPacket(
-            int cmd, int expected, int id, int type, int length, Buffer buffer)
+    protected IOException handleUnexpectedPacket(int expected, SftpResponse response)
             throws IOException {
         return new SshException(
                 "Unexpected SFTP packet received while awaiting " + SftpConstants.getCommandMessageName(expected)
-                                + " response to " + SftpConstants.getCommandMessageName(cmd)
-                                + ": type=" + SftpConstants.getCommandMessageName(type) + ", id=" + id + ", length=" + length);
+                                + " response to " + SftpConstants.getCommandMessageName(response.getCmd()) + ": type="
+                                + SftpConstants.getCommandMessageName(response.getType()) + ", id=" + response.getId()
+                                + ", length=" + response.getLength());
     }
 
     @Override
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
index aec1ca8..f486f23 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
@@ -388,42 +388,45 @@
 
     protected void handleInitResponse(Buffer buffer) throws IOException {
         boolean traceEnabled = log.isTraceEnabled();
+        SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_INIT, buffer);
         ClientChannel clientChannel = getClientChannel();
-        int length = buffer.getInt();
-        int type = buffer.getUByte();
-        int id = buffer.getInt();
+        int length = response.getLength();
+        int type = response.getType();
+        int id = response.getId();
         if (traceEnabled) {
             log.trace("handleInitResponse({}) id={} type={} len={}",
                     clientChannel, id, SftpConstants.getCommandMessageName(type), length);
         }
 
-        if (type == SftpConstants.SSH_FXP_VERSION) {
-            if ((id < SftpConstants.SFTP_V3) || (id > SftpConstants.SFTP_V6)) {
-                throw new SshException("Unsupported sftp version " + id);
-            }
-            versionHolder.set(id);
-
-            if (traceEnabled) {
-                log.trace("handleInitResponse({}) version={}", clientChannel, versionHolder);
-            }
-
-            while (buffer.available() > 0) {
-                String name = buffer.getString();
-                byte[] data = buffer.getBytes();
-                if (traceEnabled) {
-                    log.trace("handleInitResponse({}) added extension={}", clientChannel, name);
+        switch (type) {
+            case SftpConstants.SSH_FXP_VERSION:
+                if ((id < SftpConstants.SFTP_V3) || (id > SftpConstants.SFTP_V6)) {
+                    throw new SshException("Unsupported sftp version " + id);
                 }
-                extensions.put(name, data);
-            }
-        } else if (type == SftpConstants.SSH_FXP_STATUS) {
-            throwStatusException(SftpConstants.SSH_FXP_INIT, id, SftpStatus.parse(buffer));
-        } else {
-            IOException err = handleUnexpectedPacket(
-                    SftpConstants.SSH_FXP_INIT, SftpConstants.SSH_FXP_VERSION, id, type, length, buffer);
-            if (err != null) {
-                throw err;
-            }
+                versionHolder.set(id);
 
+                if (traceEnabled) {
+                    log.trace("handleInitResponse({}) version={}", clientChannel, versionHolder);
+                }
+
+                while (buffer.available() > 0) {
+                    String name = buffer.getString();
+                    byte[] data = buffer.getBytes();
+                    if (traceEnabled) {
+                        log.trace("handleInitResponse({}) added extension={}", clientChannel, name);
+                    }
+                    extensions.put(name, data);
+                }
+                break;
+            case SftpConstants.SSH_FXP_STATUS:
+                throwStatusException(SftpConstants.SSH_FXP_INIT, id, SftpStatus.parse(response));
+                break;
+            default:
+                IOException err = handleUnexpectedPacket(SftpConstants.SSH_FXP_VERSION, response);
+                if (err != null) {
+                    throw err;
+                }
+                break;
         }
     }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java
index acab5b2..7208521 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java
@@ -312,39 +312,39 @@
         }
 
         AbstractSftpClient client = getClient();
-        Buffer buf = client.receive(ack.id);
-        int length = buf.getInt();
-        int type = buf.getUByte();
-        int id = buf.getInt();
+        SftpResponse response = client.response(SftpConstants.SSH_FXP_READ, ack.id);
         if (traceEnabled) {
-            log.trace("pollBuffer({}) response={} for ack={} - len={}", this, type, ack, length);
+            log.trace("pollBuffer({}) response={} for ack={} - len={}", this, response.getType(), ack, response.getLength());
         }
-        client.validateIncomingResponse(SshConstants.SSH_MSG_CHANNEL_DATA, id, type, length, buf);
 
-        if (type == SftpConstants.SSH_FXP_DATA) {
-            int dlen = buf.getInt();
-            int rpos = buf.rpos();
-            buf.rpos(rpos + dlen);
-            Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion());
-            if ((b != null) && b.booleanValue()) {
-                eofIndicator = true;
-            }
-            buf.rpos(rpos);
-            buf.wpos(rpos + dlen);
-            this.buffer = buf;
-        } else if (type == SftpConstants.SSH_FXP_STATUS) {
-            SftpStatus status = SftpStatus.parse(buf);
-            if (status.getStatusCode() == SftpConstants.SSH_FX_EOF) {
-                eofIndicator = true;
-            } else {
-                client.checkResponseStatus(SshConstants.SSH_MSG_CHANNEL_DATA, id, status);
-            }
-        } else {
-            IOException err = client.handleUnexpectedPacket(SshConstants.SSH_MSG_CHANNEL_DATA,
-                    SftpConstants.SSH_FXP_STATUS, id, type, length, buf);
-            if (err != null) {
-                throw err;
-            }
+        switch (response.getType()) {
+            case SftpConstants.SSH_FXP_DATA:
+                Buffer buf = response.getBuffer();
+                int dlen = buf.getInt();
+                int rpos = buf.rpos();
+                buf.rpos(rpos + dlen);
+                Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion());
+                if ((b != null) && b.booleanValue()) {
+                    eofIndicator = true;
+                }
+                buf.rpos(rpos);
+                buf.wpos(rpos + dlen);
+                this.buffer = buf;
+                break;
+            case SftpConstants.SSH_FXP_STATUS:
+                SftpStatus status = SftpStatus.parse(response);
+                if (status.getStatusCode() == SftpConstants.SSH_FX_EOF) {
+                    eofIndicator = true;
+                } else {
+                    client.checkResponseStatus(SftpConstants.SSH_FXP_READ, response.getId(), status);
+                }
+                break;
+            default:
+                IOException err = client.handleUnexpectedPacket(SftpConstants.SSH_FXP_DATA, response);
+                if (err != null) {
+                    throw err;
+                }
+                break;
         }
     }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
index 623f4a3..0f3184c 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
@@ -150,7 +150,7 @@
 
         boolean debugEnabled = log.isDebugEnabled();
         AbstractSftpClient client = getClient();
-        for (int ackIndex = 0;;) {
+        for (int ackIndex = 1;; ackIndex++) {
             SftpAckData ack = pendingWrites.peek();
             if (ack == null) {
                 if (debugEnabled) {
@@ -159,13 +159,12 @@
                 break;
             }
 
-            ackIndex++;
             if (debugEnabled) {
                 log.debug("flush({}) waiting for ack #{}: {}", this, ackIndex, ack);
             }
 
-            Buffer response = client.receive(ack.id, 0L);
-            if (response == null) {
+            Buffer buf = client.receive(ack.id, 0L);
+            if (buf == null) {
                 if (debugEnabled) {
                     log.debug("flush({}) no response for ack #{}: {}", this, ackIndex, ack);
                 }
@@ -177,7 +176,8 @@
             }
 
             pendingWrites.removeFirst();
-            client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+            SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf);
+            client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, ack.id, SftpStatus.parse(response));
         }
 
         if (buffer == null) {
@@ -242,11 +242,11 @@
                         log.debug("close({}) processing ack #{}: {}", this, ackIndex, ack);
                     }
 
-                    Buffer response = client.receive(ack.id);
+                    SftpResponse response = client.response(SftpConstants.SSH_FXP_WRITE, ack.id);
                     if (debugEnabled) {
                         log.debug("close({}) processing ack #{} response for {}", this, ackIndex, ack);
                     }
-                    client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+                    client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response.getId(), SftpStatus.parse(response));
                 }
             } finally {
                 if (debugEnabled) {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java
new file mode 100644
index 0000000..fd8787d
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpResponse.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.sftp.client.impl;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.sftp.common.SftpConstants;
+
+/**
+ * A representation of an SFTP response to a request.
+ */
+public final class SftpResponse {
+
+    private final int cmd;
+
+    private final int id;
+
+    private final int type;
+
+    private final int length;
+
+    private final Buffer buffer;
+
+    private SftpResponse(int cmd, int id, int type, int length, Buffer buffer) {
+        this.cmd = cmd;
+        this.id = id;
+        this.type = type;
+        this.length = length;
+        this.buffer = buffer;
+    }
+
+    public int getCmd() {
+        return cmd;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public int getType() {
+        return type;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    public Buffer getBuffer() {
+        return buffer;
+    }
+
+    public static SftpResponse parse(int cmd, Buffer buffer) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getUByte();
+        int id = buffer.getInt();
+        validateIncomingResponse(cmd, id, type, length, buffer);
+        return new SftpResponse(cmd, id, type, length, buffer);
+    }
+
+    public static void validateIncomingResponse(int cmd, int id, int type, int length, Buffer buffer) throws IOException {
+        int remaining = buffer.available();
+        if ((length < 0) || (length > (remaining + 5 /* type + id */))) {
+            throw new SshException("Bad length (" + length + ") for remaining data (" + remaining + ")" + " in response to "
+                                   + SftpConstants.getCommandMessageName(cmd) + ": type="
+                                   + SftpConstants.getCommandMessageName(type) + ", id=" + id);
+        }
+    }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java
index 57baebd..5d1511f 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpStatus.java
@@ -56,11 +56,11 @@
 
     @Override
     public String toString() {
-        return "SSH_FXP_STATUS[" + SftpConstants.getStatusName(statusCode) + " ,language=" + language + " ,message=" + message
+        return "SSH_FXP_STATUS[" + SftpConstants.getStatusName(statusCode) + ", language=" + language + ", message=" + message
                + ']';
     }
 
-    public static SftpStatus parse(Buffer buffer) {
+    static SftpStatus parse(Buffer buffer) {
         int code = buffer.getInt();
         // Treat the message and language tag as optional. These fields did not exist in SFTP v0-2, and there are
         // apparently SFTP v3 servers that sometimes send SSH_FXP_STATUS without them.
@@ -68,4 +68,8 @@
         String language = buffer.available() > 0 ? buffer.getString() : null;
         return new SftpStatus(code, message, language);
     }
+
+    public static SftpStatus parse(SftpResponse response) {
+        return parse(response.getBuffer());
+    }
 }