blob: 08f91baf86d410968db7e6892ba5c7f780559a78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::state::command::EntryCommand;
use crate::state::{COMPONENT, StateEntry};
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::utils::file;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use compio::io::AsyncReadExt;
use err_trail::ErrContext;
use iggy_common::BytesSerializable;
use iggy_common::EncryptorKind;
use iggy_common::IggyByteSize;
use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
use iggy_common::SemanticVersion;
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use tracing::{debug, error, info};
pub const BUF_CURSOR_CAPACITY_BYTES: usize = 512 * 1000;
const FILE_STATE_PARSE_ERROR: &str = "STATE - failed to parse file state";
#[derive(Debug)]
pub struct FileState {
current_index: Arc<AtomicU64>,
entries_count: Arc<AtomicU64>,
current_leader: Arc<AtomicU32>,
term: Arc<AtomicU64>,
version: u32,
path: String,
persister: Arc<PersisterKind>,
encryptor: Option<EncryptorKind>,
}
impl FileState {
#[allow(clippy::too_many_arguments)]
pub fn new(
path: &str,
version: &SemanticVersion,
persister: Arc<PersisterKind>,
encryptor: Option<EncryptorKind>,
current_index: Arc<AtomicU64>,
entries_count: Arc<AtomicU64>,
current_leader: Arc<AtomicU32>,
term: Arc<AtomicU64>,
) -> Self {
Self {
current_index,
entries_count,
current_leader,
term,
path: path.into(),
persister,
encryptor,
version: version.get_numeric_version().expect("Invalid version"),
}
}
pub fn current_index(&self) -> u64 {
self.current_index.load(Ordering::SeqCst)
}
pub fn entries_count(&self) -> u64 {
self.entries_count.load(Ordering::SeqCst)
}
pub fn term(&self) -> u64 {
self.term.load(Ordering::SeqCst)
}
pub async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
assert!(Path::new(&self.path).exists());
let entries = self
.load_entries()
.await
.error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to load entries"))?;
let entries_count = entries.len() as u64;
self.entries_count.store(entries_count, Ordering::SeqCst);
if entries_count == 0 {
self.current_index.store(0, Ordering::SeqCst);
} else {
let last_index = entries[entries_count as usize - 1].index;
self.current_index.store(last_index, Ordering::SeqCst);
}
Ok(entries)
}
pub async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> {
if !Path::new(&self.path).exists() {
return Err(IggyError::StateFileNotFound);
}
let file = file::open(&self.path)
.await
.error(|e: &std::io::Error| {
format!(
"{COMPONENT} (error: {e}) - failed to open state file, path: {}",
self.path
)
})
.map_err(|_| IggyError::CannotReadFile)?;
let file_size = file
.metadata()
.await
.error(|e: &std::io::Error| {
format!(
"{COMPONENT} (error: {e}) - failed to load state file metadata, path: {}",
self.path
)
})
.map_err(|_| IggyError::CannotReadFileMetadata)?
.len();
if file_size == 0 {
info!("State file is empty");
return Ok(Vec::new());
}
info!(
"Loading state, file size: {}",
IggyByteSize::from(file_size).as_human_string()
);
let mut entries = Vec::new();
let mut total_size: u64 = 0;
let mut cursor = std::io::Cursor::new(file);
let mut current_index = 0;
let mut entries_count = 0;
loop {
let index = cursor
.read_u64_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} index. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 8;
// Greater than one, because one of the entries after a fresh reboot is the default root user.
if entries_count > 1 && index != current_index + 1 {
error!(
"State file is corrupted, expected index: {}, got: {}",
current_index + 1,
index
);
return Err(IggyError::StateFileCorrupted);
}
current_index = index;
entries_count += 1;
let term = cursor
.read_u64_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} term. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 8;
let leader_id = cursor
.read_u32_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} leader_id. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
let version = cursor
.read_u32_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} version. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
let flags = cursor
.read_u64_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} flags. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 8;
let timestamp = IggyTimestamp::from(
cursor
.read_u64_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} timestamp. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
total_size += 8;
let user_id = cursor
.read_u32_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} user_id. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
let checksum = cursor
.read_u64_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} checksum. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 8;
let context_length = cursor
.read_u32_le()
.await
.error(|e: &std::io::Error| {
format!("{FILE_STATE_PARSE_ERROR} context context_length. {e}")
})
.map_err(|_| IggyError::InvalidNumberEncoding)?
as usize;
total_size += 4;
let mut context = BytesMut::with_capacity(context_length);
context.put_bytes(0, context_length);
let (result, context) = cursor.read_exact(context).await.into();
result
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} code. {e}"))
.map_err(|_| IggyError::CannotReadFile)?;
let context = context.freeze();
total_size += context_length as u64;
let code = cursor
.read_u32_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} code. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
let mut command_length = cursor
.read_u32_le()
.await
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} command_length. {e}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?
as usize;
total_size += 4;
let mut command = BytesMut::with_capacity(command_length);
command.put_bytes(0, command_length);
let (result, command) = cursor.read_exact(command).await.into();
result
.error(|e: &std::io::Error| format!("{FILE_STATE_PARSE_ERROR} command. {e}"))
.map_err(|_| IggyError::CannotReadFile)?;
total_size += command_length as u64;
let command_payload;
if let Some(encryptor) = &self.encryptor {
debug!("Decrypting state entry with index: {index}");
command_payload = Bytes::from(encryptor.decrypt(&command.freeze())?);
command_length = command_payload.len();
} else {
command_payload = command.freeze();
}
let mut entry_command = BytesMut::with_capacity(4 + 4 + command_length);
entry_command.put_u32_le(code);
entry_command.put_u32_le(command_length as u32);
entry_command.extend(command_payload);
let command = entry_command.freeze();
EntryCommand::from_bytes(command.clone()).error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to parse entry command from bytes")
})?;
let calculated_checksum = StateEntry::calculate_checksum(
index, term, leader_id, version, flags, timestamp, user_id, &context, &command,
);
let entry = StateEntry::new(
index,
term,
leader_id,
version,
flags,
timestamp,
user_id,
calculated_checksum,
context,
command,
);
debug!("Read state entry: {entry}");
if entry.checksum != checksum {
return Err(IggyError::InvalidStateEntryChecksum(
entry.checksum,
checksum,
entry.index,
));
}
entries.push(entry);
if total_size == file_size {
break;
}
}
info!("Loaded {entries_count} state entries, current index: {current_index}");
Ok(entries)
}
pub async fn apply(&self, user_id: u32, command: &EntryCommand) -> Result<(), IggyError> {
debug!("Applying state entry with command: {command}, user ID: {user_id}");
let timestamp = IggyTimestamp::now();
let index = if self.entries_count.load(Ordering::SeqCst) == 0 {
0
} else {
self.current_index.fetch_add(1, Ordering::SeqCst) + 1
};
let term = self.term.load(Ordering::SeqCst);
let current_leader = self.current_leader.load(Ordering::SeqCst);
let version = self.version;
let flags = 0;
let context = Bytes::new();
let mut command = command.to_bytes();
let checksum = StateEntry::calculate_checksum(
index,
term,
current_leader,
version,
flags,
timestamp,
user_id,
&context,
&command,
);
if let Some(encryptor) = &self.encryptor {
debug!("Encrypting state entry command with index: {index}");
let command_code = command.slice(0..4).get_u32_le();
let mut command_length = command.slice(4..8).get_u32_le() as usize;
let command_payload = command.slice(8..8 + command_length);
let encrypted_command_payload = encryptor
.encrypt(&command_payload)
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to encrypt state entry command, index: {index}"
)
})?;
command_length = encrypted_command_payload.len();
let mut command_bytes = BytesMut::with_capacity(4 + 4 + command_length);
command_bytes.put_u32_le(command_code);
command_bytes.put_u32_le(command_length as u32);
command_bytes.extend(encrypted_command_payload);
command = command_bytes.freeze();
}
let entry = StateEntry::new(
index,
term,
current_leader,
version,
flags,
timestamp,
user_id,
checksum,
context,
command,
);
let bytes = entry.to_bytes();
let len = bytes.len();
self.entries_count.fetch_add(1, Ordering::SeqCst);
self.persister
.append(&self.path, bytes)
.await
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to append state entry data to file, path: {}, data size: {}",
self.path,
len
)
})?;
debug!("Applied state entry: {entry}");
Ok(())
}
}