blob: 2b6b6d1729e9d38b9baebd9ade7ca94998a066cf [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use iggy_common::IggyError;
use std::io::IoSlice;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::debug;
const STATUS_OK: &[u8] = &[0; 4];
pub(crate) async fn read<T>(stream: &mut T, buffer: &mut [u8]) -> Result<usize, IggyError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
match stream.read_exact(buffer).await {
Ok(0) => Err(IggyError::ConnectionClosed),
Ok(read_bytes) => Ok(read_bytes),
Err(error) => {
if error.kind() == std::io::ErrorKind::UnexpectedEof {
Err(IggyError::ConnectionClosed)
} else {
Err(IggyError::TcpError)
}
}
}
}
pub(crate) async fn send_empty_ok_response<T>(stream: &mut T) -> Result<(), IggyError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
send_ok_response(stream, &[]).await
}
pub(crate) async fn send_ok_response<T>(stream: &mut T, payload: &[u8]) -> Result<(), IggyError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
send_response(stream, STATUS_OK, payload).await
}
pub(crate) async fn send_ok_response_vectored<T>(
stream: &mut T,
length: &[u8],
slices: Vec<IoSlice<'_>>,
) -> Result<(), IggyError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
send_response_vectored(stream, STATUS_OK, length, slices).await
}
pub(crate) async fn send_error_response<T>(
stream: &mut T,
error: IggyError,
) -> Result<(), IggyError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
send_response(stream, &error.as_code().to_le_bytes(), &[]).await
}
pub(crate) async fn send_response<T>(
stream: &mut T,
status: &[u8],
payload: &[u8],
) -> Result<(), IggyError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
debug!(
"Sending response of len: {} with status: {:?}...",
payload.len(),
status
);
let length = (payload.len() as u32).to_le_bytes();
stream
.write_all(&[status, &length, payload].as_slice().concat())
.await
.map_err(|_| IggyError::TcpError)?;
debug!("Sent response with status: {:?}", status);
Ok(())
}
pub(crate) async fn send_response_vectored<T>(
stream: &mut T,
status: &[u8],
length: &[u8],
mut slices: Vec<IoSlice<'_>>,
) -> Result<(), IggyError>
where
T: AsyncReadExt + AsyncWriteExt + Unpin,
{
debug!(
"Sending vectored response of len: {} with status: {:?}...",
slices.len(),
status
);
let prefix = [IoSlice::new(status), IoSlice::new(length)];
slices.splice(0..0, prefix);
let mut slice_refs = slices.as_mut_slice();
while !slice_refs.is_empty() {
let bytes_written = stream
.write_vectored(slice_refs)
.await
.map_err(|_| IggyError::TcpError)?;
IoSlice::advance_slices(&mut slice_refs, bytes_written);
}
debug!("Sent response with status: {:?}", status);
Ok(())
}