#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import random
import threading
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError
from unittest.mock import patch
from apache_beam.utils import multi_process_shared
try:
from apache_beam.ml.inference.model_manager import GPUMonitor
from apache_beam.ml.inference.model_manager import ModelManager
from apache_beam.ml.inference.model_manager import ResourceEstimator
except ImportError as e:
  raise unittest.SkipTest(
      "Model Manager dependencies are not installed") from e
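# Typical flow exercised by these tests (a sketch; `loader_fn` stands for
# any zero-argument callable that loads and returns a model instance):
#
#   manager = ModelManager(monitor=GPUMonitor())
#   instance = manager.acquire_model("my_model", loader_fn)
#   try:
#     ...  # run inference with `instance`
#   finally:
#     manager.release_model("my_model", instance)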
class MockGPUMonitor:
"""
Simulates GPU hardware with cumulative memory tracking.
Allows simulating specific allocation spikes and baseline usage.
"""
def __init__(self, total_memory=12000.0, peak_window: int = 5):
self._current = 0.0
self._peak = 0.0
self._total = total_memory
self._lock = threading.Lock()
self.running = False
self.history = []
self.peak_window = peak_window
def start(self):
self.running = True
def stop(self):
self.running = False
def get_stats(self):
with self._lock:
return self._current, self._peak, self._total
def reset_peak(self):
with self._lock:
self._peak = self._current
self.history = [self._current]
def set_usage(self, current_mb):
"""Sets absolute usage (legacy helper)."""
with self._lock:
self._current = current_mb
self._peak = max(self._peak, current_mb)
def allocate(self, amount_mb):
"""Simulates memory allocation (e.g., tensors loaded to VRAM)."""
with self._lock:
self._current += amount_mb
self.history.append(self._current)
if len(self.history) > self.peak_window:
self.history.pop(0)
self._peak = max(self.history)
def free(self, amount_mb):
"""Simulates memory freeing (not used often if pooling is active)."""
with self._lock:
self._current = max(0.0, self._current - amount_mb)
self.history.append(self._current)
if len(self.history) > self.peak_window:
self.history.pop(0)
self._peak = max(self.history)
def refresh(self):
"""Simulates a refresh of the monitor stats (no-op for mock)."""
pass
class MockModel:
def __init__(self, name, size, monitor):
self.name = name
self.size = size
self.monitor = monitor
self.deleted = False
self.monitor.allocate(size)
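  # Presumed counterpart of the hard-delete hook the manager invokes on
  # eviction; the eviction tests assert that `deleted` flips and that the
  # memory is returned to the mock monitor.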
def mock_model_unsafe_hard_delete(self):
if not self.deleted:
self.monitor.free(self.size)
self.deleted = True
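# Minimal shared counter used to observe the MultiProcessShared lifecycle in
# test_model_manager_deletes_multiprocessshared_instances below.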
class Counter(object):
def __init__(self, start=0):
self.running = start
self.lock = threading.Lock()
def get(self):
return self.running
def increment(self, value=1):
with self.lock:
self.running += value
return self.running
class TestModelManager(unittest.TestCase):
def setUp(self):
"""Force reset the Singleton ModelManager before every test."""
self.mock_monitor = MockGPUMonitor()
self.manager = ModelManager(monitor=self.mock_monitor)
def tearDown(self):
self.manager.shutdown()
def test_model_manager_deletes_multiprocessshared_instances(self):
"""Test that MultiProcessShared instances are deleted properly."""
model_name = "test_model_shared"
tag = f"model_manager_test_{model_name}"
def loader():
multi_process_shared.MultiProcessShared(
lambda: Counter, tag=tag, always_proxy=True)
return tag
instance = self.manager.acquire_model(model_name, loader)
instance_before = multi_process_shared.MultiProcessShared(
Counter, tag=tag, always_proxy=True).acquire()
instance_before.increment()
self.assertEqual(instance_before.get(), 1)
self.manager.release_model(model_name, instance)
# Force delete all models
self.manager._force_reset()
# Verify that the MultiProcessShared instance is deleted
    # and the counter is reset
with self.assertRaises(Exception):
instance_before.get()
instance_after = multi_process_shared.MultiProcessShared(
Counter, tag=tag, always_proxy=True).acquire()
self.assertEqual(instance_after.get(), 0)
def test_model_manager_timeout_on_acquire(self):
"""Test that acquiring a model times out properly."""
model_name = "timeout_model"
self.manager = ModelManager(
monitor=self.mock_monitor,
wait_timeout_seconds=1.0,
lock_timeout_seconds=1.0)
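    # The loader consumes the entire simulated GPU, so the second acquire
    # must wait for capacity and should hit the 1-second timeout set above.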
def loader():
self.mock_monitor.allocate(self.mock_monitor._total)
return model_name
# Acquire the model in one thread to block others
_ = self.manager.acquire_model(model_name, loader)
def acquire_model_with_timeout():
return self.manager.acquire_model(model_name, loader)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(acquire_model_with_timeout)
with self.assertRaises(RuntimeError) as context:
future.result(timeout=5.0)
self.assertIn("Timeout waiting to acquire model", str(context.exception))
def test_model_manager_capacity_check(self):
"""
    Test that the manager blocks when spawning another model would exceed
    the limit, and unblocks once resources become available (via reuse).
"""
model_name = "known_model"
model_cost = 3000.0
self.manager._estimator.set_initial_estimate(model_name, model_cost)
acquired_refs = []
def loader():
self.mock_monitor.allocate(model_cost)
return model_name
# 1. Saturate GPU with 3 models (9000 MB usage)
for _ in range(3):
inst = self.manager.acquire_model(model_name, loader)
acquired_refs.append(inst)
    # 2. Spawn a fourth copy (should block: 9000 + 3000 > capacity limit)
def run_inference():
return self.manager.acquire_model(model_name, loader)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(run_inference)
try:
future.result(timeout=0.5)
self.fail("Should have blocked due to capacity")
except TimeoutError:
pass
# 3. Release resources to unblock
item_to_release = acquired_refs.pop()
self.manager.release_model(model_name, item_to_release)
result = future.result(timeout=2.0)
self.assertIsNotNone(result)
self.assertEqual(result, item_to_release)
def test_model_manager_unknown_model_runs_isolated(self):
"""Test that a model with no history runs in isolation."""
model_name = "unknown_model_v1"
self.assertTrue(self.manager._estimator.is_unknown(model_name))
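    # With no usage history, the load is expected to run in isolation so the
    # manager can measure the model's footprint before allowing concurrency.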
def dummy_loader():
time.sleep(0.05)
return "model_instance"
instance = self.manager.acquire_model(model_name, dummy_loader)
self.assertTrue(self.manager._isolation_mode)
self.assertEqual(self.manager._total_active_jobs, 1)
self.manager.release_model(model_name, instance)
self.assertFalse(self.manager._isolation_mode)
self.assertFalse(self.manager._estimator.is_unknown(model_name))
def test_model_manager_concurrent_execution(self):
"""Test that multiple small known models can run together."""
model_a = "small_model_a"
model_b = "small_model_b"
self.manager._estimator.set_initial_estimate(model_a, 1000.0)
self.manager._estimator.set_initial_estimate(model_b, 1000.0)
self.mock_monitor.set_usage(1000.0)
inst_a = self.manager.acquire_model(model_a, lambda: "A")
inst_b = self.manager.acquire_model(model_b, lambda: "B")
self.assertEqual(self.manager._total_active_jobs, 2)
self.manager.release_model(model_a, inst_a)
self.manager.release_model(model_b, inst_b)
self.assertEqual(self.manager._total_active_jobs, 0)
def test_model_manager_concurrent_mixed_workload_convergence(self):
"""
Simulates a production environment with multiple model types running
concurrently. Verifies that the estimator converges.
"""
TRUE_COSTS = {"model_small": 1500.0, "model_medium": 3000.0}
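    # The estimator only ever observes aggregate GPU usage while jobs
    # interleave, so convergence implies it can attribute per-model costs
    # from mixed concurrent observations.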
def run_job(model_name):
cost = TRUE_COSTS[model_name]
def loader():
model = MockModel(model_name, cost, self.mock_monitor)
return model
instance = self.manager.acquire_model(model_name, loader)
time.sleep(random.uniform(0.01, 0.05))
self.manager.release_model(model_name, instance)
# Create a workload stream
workload = ["model_small"] * 15 + ["model_medium"] * 15
random.shuffle(workload)
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(run_job, name) for name in workload]
for f in futures:
f.result()
est_small = self.manager._estimator.get_estimate("model_small")
est_med = self.manager._estimator.get_estimate("model_medium")
self.assertAlmostEqual(est_small, TRUE_COSTS["model_small"], delta=100.0)
self.assertAlmostEqual(est_med, TRUE_COSTS["model_medium"], delta=100.0)
def test_model_manager_oom_recovery(self):
"""Test that the manager recovers state if a loader crashes."""
model_name = "crasher_model"
self.manager._estimator.set_initial_estimate(model_name, 1000.0)
def crashing_loader():
raise RuntimeError("CUDA OOM or similar")
with self.assertRaises(RuntimeError):
self.manager.acquire_model(model_name, crashing_loader)
self.assertEqual(self.manager._total_active_jobs, 0)
self.assertEqual(self.manager._pending_reservations, 0.0)
self.assertFalse(self.manager._cv._is_owned())
def test_model_manager_force_reset_on_exception(self):
"""Test that force_reset clears all models from the manager."""
model_name = "test_model"
def dummy_loader():
self.mock_monitor.allocate(1000.0)
raise RuntimeError("Simulated loader exception")
try:
instance = self.manager.acquire_model(
model_name, lambda: "model_instance")
self.manager.release_model(model_name, instance)
instance = self.manager.acquire_model(model_name, dummy_loader)
except RuntimeError:
self.manager._force_reset()
      self.assertEqual(len(self.manager._models[model_name]), 0)
self.assertEqual(self.manager._total_active_jobs, 0)
self.assertEqual(self.manager._pending_reservations, 0.0)
self.assertFalse(self.manager._isolation_mode)
instance = self.manager.acquire_model(model_name, lambda: "model_instance")
self.manager.release_model(model_name, instance)
def test_single_model_convergence_with_fluctuations(self):
"""
    Tests that the estimator converges to the true usage despite fluctuations.
"""
model_name = "fluctuating_model"
model_cost = 3000.0
# Fix random seed for reproducibility
random.seed(42)
def loader():
self.mock_monitor.allocate(model_cost)
return model_name
model = self.manager.acquire_model(model_name, loader)
self.manager.release_model(model_name, model)
initial_est = self.manager._estimator.get_estimate(model_name)
self.assertEqual(initial_est, model_cost)
def run_inference():
model = self.manager.acquire_model(model_name, loader)
noise = random.uniform(-300.0, 300.0)
self.mock_monitor.allocate(noise)
time.sleep(0.1)
self.mock_monitor.free(noise)
self.manager.release_model(model_name, model)
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(run_inference) for _ in range(100)]
for f in futures:
f.result()
est_cost = self.manager._estimator.get_estimate(model_name)
self.assertAlmostEqual(est_cost, model_cost, delta=100.0)
class TestModelManagerEviction(unittest.TestCase):
def setUp(self):
self.mock_monitor = MockGPUMonitor(total_memory=12000.0)
self.manager = ModelManager(
monitor=self.mock_monitor,
slack_percentage=0.0,
min_data_points=1,
eviction_cooldown_seconds=10.0,
min_model_copies=1)
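    # Zero slack and a single data point make the capacity math
    # deterministic; the 10s cooldown is defeated below by backdating idle
    # timestamps.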
def tearDown(self):
self.manager.shutdown()
def create_loader(self, name, size):
return lambda: MockModel(name, size, self.mock_monitor)
def test_basic_lru_eviction(self):
self.manager._estimator.set_initial_estimate("A", 4000)
self.manager._estimator.set_initial_estimate("B", 4000)
self.manager._estimator.set_initial_estimate("C", 5000)
model_a = self.manager.acquire_model("A", self.create_loader("A", 4000))
self.manager.release_model("A", model_a)
model_b = self.manager.acquire_model("B", self.create_loader("B", 4000))
self.manager.release_model("B", model_b)
key_a = list(self.manager._idle_lru.keys())[0]
self.manager._idle_lru[key_a] = ("A", model_a, time.time() - 20.0)
key_b = list(self.manager._idle_lru.keys())[1]
self.manager._idle_lru[key_b] = ("B", model_b, time.time() - 20.0)
model_a_again = self.manager.acquire_model(
"A", self.create_loader("A", 4000))
self.manager.release_model("A", model_a_again)
self.manager.acquire_model("C", self.create_loader("C", 5000))
self.assertEqual(len(self.manager.all_models("B")), 0)
self.assertEqual(len(self.manager.all_models("A")), 1)
def test_chained_eviction(self):
self.manager._estimator.set_initial_estimate("big_guy", 8000)
models = []
for i in range(4):
name = f"small_{i}"
m = self.manager.acquire_model(name, self.create_loader(name, 3000))
self.manager.release_model(name, m)
models.append(m)
self.manager.acquire_model("big_guy", self.create_loader("big_guy", 8000))
self.assertTrue(models[0].deleted)
self.assertTrue(models[1].deleted)
self.assertTrue(models[2].deleted)
self.assertFalse(models[3].deleted)
def test_active_models_are_protected(self):
self.manager._estimator.set_initial_estimate("A", 6000)
self.manager._estimator.set_initial_estimate("B", 4000)
self.manager._estimator.set_initial_estimate("C", 4000)
model_a = self.manager.acquire_model("A", self.create_loader("A", 6000))
model_b = self.manager.acquire_model("B", self.create_loader("B", 4000))
self.manager.release_model("B", model_b)
key_b = list(self.manager._idle_lru.keys())[0]
self.manager._idle_lru[key_b] = ("B", model_b, time.time() - 20.0)
def acquire_c():
return self.manager.acquire_model("C", self.create_loader("C", 4000))
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(acquire_c)
model_c = future.result(timeout=2.0)
self.assertTrue(model_b.deleted)
self.assertFalse(model_a.deleted)
self.manager.release_model("A", model_a)
self.manager.release_model("C", model_c)
def test_unknown_model_clears_memory(self):
self.manager._estimator.set_initial_estimate("A", 2000)
model_a = self.manager.acquire_model("A", self.create_loader("A", 2000))
self.manager.release_model("A", model_a)
self.assertFalse(model_a.deleted)
self.assertTrue(self.manager._estimator.is_unknown("X"))
model_x = self.manager.acquire_model("X", self.create_loader("X", 10000))
self.assertTrue(model_a.deleted, "Model A should be deleted for isolation")
self.assertEqual(len(self.manager.all_models("A")), 0)
self.assertTrue(self.manager._isolation_mode)
self.manager.release_model("X", model_x)
def test_concurrent_eviction_pressure(self):
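    # Hammer the manager from eight threads; afterwards the mock GPU usage
    # must exactly match the copies the manager still tracks (no leaks).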
def worker(idx):
name = f"model_{idx % 5}"
try:
m = self.manager.acquire_model(name, self.create_loader(name, 4000))
time.sleep(0.001)
self.manager.release_model(name, m)
except Exception:
pass
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(worker, i) for i in range(50)]
for f in futures:
f.result()
curr, _, _ = self.mock_monitor.get_stats()
expected_usage = 0
for _, instances in self.manager._models.items():
expected_usage += len(instances) * 4000
self.assertAlmostEqual(curr, expected_usage)
def test_starvation_prevention_overrides_demand(self):
self.manager._estimator.set_initial_estimate("A", 12000)
m_a = self.manager.acquire_model("A", self.create_loader("A", 12000))
self.manager.release_model("A", m_a)
def cycle_a():
try:
m = self.manager.acquire_model("A", self.create_loader("A", 12000))
time.sleep(0.3)
self.manager.release_model("A", m)
except Exception:
pass
executor = ThreadPoolExecutor(max_workers=5)
for _ in range(5):
executor.submit(cycle_a)
def acquire_b():
return self.manager.acquire_model("B", self.create_loader("B", 4000))
b_future = executor.submit(acquire_b)
model_b = b_future.result()
self.assertTrue(m_a.deleted)
self.manager.release_model("B", model_b)
executor.shutdown(wait=True)
class TestGPUMonitor(unittest.TestCase):
def setUp(self):
self.subprocess_patcher = patch('subprocess.check_output')
self.mock_subprocess = self.subprocess_patcher.start()
def tearDown(self):
self.subprocess_patcher.stop()
def test_init_hardware_detected(self):
"""Test that init correctly reads total memory when nvidia-smi exists."""
self.mock_subprocess.return_value = "24576"
monitor = GPUMonitor()
monitor.start()
self.assertTrue(monitor._gpu_available)
self.assertEqual(monitor._total_memory, 24576.0)
def test_init_hardware_missing(self):
"""Test fallback behavior when nvidia-smi is missing."""
self.mock_subprocess.side_effect = FileNotFoundError()
monitor = GPUMonitor(fallback_memory_mb=12000.0)
monitor.start()
self.assertFalse(monitor._gpu_available)
self.assertEqual(monitor._total_memory, 12000.0)
@patch('time.sleep')
def test_polling_updates_stats(self, mock_sleep):
"""Test that the polling loop updates current and peak usage."""
    def subprocess_side_effect(*args, **kwargs):
      cmd = args[0]
      if isinstance(cmd, list) and any(
          "memory.total" in part for part in cmd):
        return "16000"
      if isinstance(cmd, list) and any(
          "memory.free" in part for part in cmd):
        return "12000"
      raise Exception("Unexpected command")
self.mock_subprocess.side_effect = subprocess_side_effect
monitor = GPUMonitor()
monitor.start()
time.sleep(0.1)
curr, peak, total = monitor.get_stats()
monitor.stop()
self.assertEqual(curr, 4000.0)
self.assertEqual(peak, 4000.0)
self.assertEqual(total, 16000.0)
def test_reset_peak(self):
"""Test that resetting peak usage works."""
monitor = GPUMonitor()
monitor._gpu_available = True
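    # Inject usage history directly rather than running the polling thread.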
with monitor._lock:
monitor._current_usage = 2000.0
monitor._peak_usage = 8000.0
monitor._memory_history.append((time.time(), 8000.0))
monitor._memory_history.append((time.time(), 2000.0))
monitor.reset_peak()
_, peak, _ = monitor.get_stats()
self.assertEqual(peak, 2000.0)
class TestResourceEstimatorSolver(unittest.TestCase):
def setUp(self):
self.estimator = ResourceEstimator()
@patch('apache_beam.ml.inference.model_manager.nnls')
def test_solver_respects_min_data_points(self, mock_nnls):
mock_nnls.return_value = ([100.0, 50.0], 0.0)
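    # The solver should stay dormant until the observation count and model
    # coverage thresholds are met; the calls below walk up to, then across,
    # that boundary.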
self.estimator.add_observation({'model_A': 1}, 500)
self.estimator.add_observation({'model_B': 1}, 500)
self.assertFalse(mock_nnls.called)
self.estimator.add_observation({'model_A': 1, 'model_B': 1}, 1000)
self.assertFalse(mock_nnls.called)
self.estimator.add_observation({'model_A': 1}, 500)
self.assertFalse(mock_nnls.called)
self.estimator.add_observation({'model_B': 1}, 500)
self.assertTrue(mock_nnls.called)
@patch('apache_beam.ml.inference.model_manager.nnls')
def test_solver_respects_unique_model_constraint(self, mock_nnls):
mock_nnls.return_value = ([100.0, 100.0, 50.0], 0.0)
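    # A and B always co-occur in the first batch, so their individual costs
    # are not separable; the solver should wait for observations that
    # disambiguate them.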
for _ in range(5):
self.estimator.add_observation({'model_A': 1, 'model_B': 1}, 800)
for _ in range(5):
self.estimator.add_observation({'model_C': 1}, 400)
self.assertFalse(mock_nnls.called)
self.estimator.add_observation({'model_A': 1}, 300)
self.estimator.add_observation({'model_B': 1}, 300)
self.assertTrue(mock_nnls.called)
if __name__ == "__main__":
unittest.main()