blob: 13858b62c4291072b465e0f257313353baad572d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
pub mod bus;
pub mod client;
pub mod deps;
pub mod replica;
use bus::MemBus;
use consensus::{Plane, PlaneIdentity};
use iggy_common::header::{GenericHeader, ReplyHeader};
use iggy_common::message::{Message, MessageBag};
use message_bus::MessageBus;
use replica::Replica;
use std::sync::Arc;
pub struct Simulator {
pub replicas: Vec<Replica>,
pub message_bus: Arc<MemBus>,
}
impl Simulator {
/// Initialize a partition with its own consensus group on all replicas.
pub fn init_partition(&mut self, namespace: iggy_common::sharding::IggyNamespace) {
for replica in &mut self.replicas {
replica.init_partition(namespace);
}
}
pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> Self {
let mut message_bus = MemBus::new();
for client in clients {
message_bus.add_client(client, ());
}
for i in 0..replica_count as u8 {
message_bus.add_replica(i);
}
let message_bus = Arc::new(message_bus);
let replicas = (0..replica_count)
.map(|i| {
Replica::new(
i as u8,
format!("replica-{}", i),
Arc::clone(&message_bus),
replica_count as u8,
)
})
.collect();
Self {
replicas,
message_bus,
}
}
pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> Self {
for i in 0..replica_count as u8 {
message_bus.add_replica(i);
}
let message_bus = Arc::new(message_bus);
let replicas = (0..replica_count)
.map(|i| {
Replica::new(
i as u8,
format!("replica-{}", i),
Arc::clone(&message_bus),
replica_count as u8,
)
})
.collect();
Self {
replicas,
message_bus,
}
}
}
impl Simulator {
pub async fn step(&self) -> Option<Message<ReplyHeader>> {
if let Some(envelope) = self.message_bus.receive() {
if let Some(_client_id) = envelope.to_client {
let reply: Message<ReplyHeader> = envelope
.message
.try_into_typed()
.expect("invalid message, wrong command type for an client response");
return Some(reply);
}
if let Some(replica_id) = envelope.to_replica
&& let Some(replica) = self.replicas.get(replica_id as usize)
{
self.dispatch_to_replica(replica, envelope.message).await;
}
}
None
}
async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) {
let planes = replica.plane.inner();
match MessageBag::from(message) {
MessageBag::Request(request) => {
if planes.0.is_applicable(&request) {
planes.0.on_request(request).await;
} else {
planes.1.0.on_request(request).await;
}
}
MessageBag::Prepare(prepare) => {
if planes.0.is_applicable(&prepare) {
planes.0.on_replicate(prepare).await;
} else {
planes.1.0.on_replicate(prepare).await;
}
}
MessageBag::PrepareOk(prepare_ok) => {
if planes.0.is_applicable(&prepare_ok) {
planes.0.on_ack(prepare_ok).await;
} else {
planes.1.0.on_ack(prepare_ok).await;
}
}
}
}
}
// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
// Setup: 3-replica simulator, two partitions (ns_a, ns_b).
// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering acks.
// 2. Send a request to ns_b, step until ns_b reply arrives.
// 3. Assert ns_b committed while ns_a pipeline is still full.
// Requires namespace-aware stepping (filter bus by namespace) or two-phase delivery.