// blob: 9da6bc9d375e8b0cab96f20b7df70b94ed410a1f [file] [log] [blame]
use crate::binary::{command, sender::SenderKind};
use crate::command::ServerCommand;
use crate::server_error::ConnectionError;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use bytes::{BufMut, BytesMut};
use iggy::bytes_serializable::BytesSerializable;
use iggy::error::IggyError;
use iggy::validatable::Validatable;
use std::io::ErrorKind;
use std::sync::Arc;
use tracing::{debug, error, info};
/// Size, in bytes, of the prefix read at the start of every request.
///
/// The prefix is decoded as a little-endian `u32` that gives the length of the
/// command payload that follows.
const INITIAL_BYTES_LENGTH: usize = 4;
/// Serves one client connection by reading and dispatching commands in a loop.
///
/// Every request consists of an `INITIAL_BYTES_LENGTH`-byte little-endian length
/// prefix followed by that many bytes of command payload. The payload is parsed
/// into a [`ServerCommand`], validated, and handed to `command::handle`.
///
/// Recoverable problems (a short length prefix, an unparsable command, a failed
/// validation, or a read error other than "connection closed") are reported to
/// the client through `send_error_response`, after which the loop waits for the
/// next request.
///
/// # Errors
///
/// The loop has no normal exit, so this function only returns with an error:
/// - a read error whose code equals `IggyError::ConnectionClosed`, converted
///   into a [`ConnectionError`];
/// - any error propagated from sending an error response, reading the payload,
///   or handling the command.
pub(crate) async fn handle_connection(
    session: Arc<Session>,
    sender: &mut SenderKind,
    system: SharedSystem,
) -> Result<(), ConnectionError> {
    let mut initial_buffer = [0u8; INITIAL_BYTES_LENGTH];
    loop {
        // Step 1: read the fixed-size length prefix.
        let read_length = match sender.read(&mut initial_buffer).await {
            Ok(read_length) => read_length,
            Err(error) => {
                // A closed connection ends the session; anything else is reported
                // back to the client and the loop keeps serving.
                if error.as_code() == IggyError::ConnectionClosed.as_code() {
                    return Err(ConnectionError::from(error));
                } else {
                    sender.send_error_response(error).await?;
                    continue;
                }
            }
        };
        if read_length != INITIAL_BYTES_LENGTH {
            sender.send_error_response(IggyError::CommandLengthError(format!(
                "Unable to read the TCP request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {read_length} bytes."
            ))).await?;
            continue;
        }

        // Step 2: read the payload announced by the prefix.
        let length = u32::from_le_bytes(initial_buffer);
        debug!("Received a TCP request, length: {length}");
        // NOTE(review): `length` comes straight from the client and is used as the
        // allocation size without an upper bound — consider enforcing a maximum.
        let payload_length = length as usize;
        let mut command_buffer = BytesMut::with_capacity(payload_length);
        // Zero-fill so the buffer's length (not just its capacity) matches the payload.
        command_buffer.put_bytes(0, payload_length);
        sender.read(&mut command_buffer).await?;

        // Step 3: parse and validate; failures are reported and the loop continues.
        let command = match ServerCommand::from_bytes(command_buffer.freeze()) {
            Ok(command) => command,
            Err(_) => {
                error!("Received an invalid TCP command.");
                sender
                    .send_error_response(IggyError::InvalidCommand)
                    .await?;
                continue;
            }
        };
        if let Err(error) = command.validate() {
            error!("Command validation failed: {error}");
            sender.send_error_response(error).await?;
            continue;
        }

        // Step 4: dispatch.
        debug!("Received a TCP command: {command}, payload size: {length}");
        command::handle(command, sender, &session, system.clone()).await?;
    }
}
pub(crate) fn handle_error(error: ConnectionError) {
match error {
ConnectionError::IoError(error) => match error.kind() {
ErrorKind::UnexpectedEof => {
info!("Connection has been closed.");
}
ErrorKind::ConnectionAborted => {
info!("Connection has been aborted.");
}
ErrorKind::ConnectionRefused => {
info!("Connection has been refused.");
}
ErrorKind::ConnectionReset => {
info!("Connection has been reset.");
}
_ => {
error!("Connection has failed: {error}");
}
},
ConnectionError::SdkError(sdk_error) => match sdk_error {
IggyError::ConnectionClosed => {
debug!("Client closed connection.");
}
_ => {
error!("Failure in internal SDK call: {sdk_error}");
}
},
_ => {
error!("Connection has failed: {error}");
}
}
}