blob: 680c17b5c739b8aabfd8470ddec0463e3ec2d5b0 [file] [log] [blame]
package performance
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/apache/airavata/scheduler/core/domain"
"github.com/apache/airavata/scheduler/tests/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestConcurrentExperimentSubmissions tests the system's ability to handle concurrent experiment submissions
func TestConcurrentExperimentSubmissions(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Start all SLURM clusters
err := suite.Compose.StartServices(t, "slurm-controller", "slurm-node-1", "slurm-node-2", "slurm-node-3")
require.NoError(t, err)
// Wait for clusters to be ready
err = suite.Compose.WaitForServices(t, 3*time.Minute)
require.NoError(t, err)
// Inject SSH keys into all containers
err = suite.InjectSSHKeys("slurm-controller", "slurm-node-1", "slurm-node-2", "slurm-node-3")
require.NoError(t, err)
// Register all SLURM clusters
clusters, err := suite.RegisterAllSlurmClusters()
require.NoError(t, err)
require.Len(t, clusters, 3)
// Test concurrent experiment submissions
numExperiments := 20
var wg sync.WaitGroup
results := make(chan error, numExperiments)
startTime := time.Now()
for i := 0; i < numExperiments; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
// Create experiment
req := &domain.CreateExperimentRequest{
Name: fmt.Sprintf("concurrent-exp-%d", index),
Description: fmt.Sprintf("Concurrent experiment %d", index),
ProjectID: suite.TestProject.ID,
CommandTemplate: "echo 'Hello from concurrent experiment' && sleep 2",
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": fmt.Sprintf("value%d", index),
},
},
},
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 1024,
DiskGB: 1,
Walltime: "0:05:00",
},
}
exp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), req, suite.TestUser.ID)
if err != nil {
results <- err
return
}
// Submit to a random cluster
cluster := clusters[index%len(clusters)]
err = suite.SubmitToCluster(exp.Experiment, cluster)
results <- err
}(i)
}
wg.Wait()
close(results)
// Check results
var errors []error
for err := range results {
if err != nil {
errors = append(errors, err)
}
}
duration := time.Since(startTime)
t.Logf("Submitted %d experiments in %v", numExperiments, duration)
t.Logf("Throughput: %.2f experiments/second", float64(numExperiments)/duration.Seconds())
// Allow some failures but not too many
assert.Less(t, len(errors), numExperiments/4, "Too many submission failures: %d", len(errors))
}
// TestConcurrentExperimentQueries tests concurrent experiment queries
func TestConcurrentExperimentQueries(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Create multiple experiments first
numExperiments := 50
for i := 0; i < numExperiments; i++ {
req := &domain.CreateExperimentRequest{
Name: fmt.Sprintf("query-exp-%d", i),
Description: fmt.Sprintf("Query experiment %d", i),
ProjectID: suite.TestProject.ID,
CommandTemplate: "echo 'Hello World'",
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": fmt.Sprintf("value%d", i),
},
},
},
}
_, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), req, suite.TestUser.ID)
require.NoError(t, err)
}
// Test concurrent queries
numQueries := 100
var wg sync.WaitGroup
results := make(chan error, numQueries)
startTime := time.Now()
for i := 0; i < numQueries; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
// List experiments
req := &domain.ListExperimentsRequest{
ProjectID: suite.TestProject.ID,
OwnerID: suite.TestUser.ID,
Limit: 10,
Offset: index % 5, // Vary offset to test different queries
}
_, err := suite.OrchestratorSvc.ListExperiments(context.Background(), req)
results <- err
}(i)
}
wg.Wait()
close(results)
// Check results
var errors []error
for err := range results {
if err != nil {
errors = append(errors, err)
}
}
duration := time.Since(startTime)
t.Logf("Executed %d queries in %v", numQueries, duration)
t.Logf("Query throughput: %.2f queries/second", float64(numQueries)/duration.Seconds())
// All queries should succeed
assert.Empty(t, errors, "Query failures: %v", errors)
}
func TestDatabaseConnectionPooling(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Test concurrent database operations
numOperations := 100
var wg sync.WaitGroup
results := make(chan error, numOperations)
startTime := time.Now()
for i := 0; i < numOperations; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
// Create a user (database operation)
user := &domain.User{
ID: fmt.Sprintf("test-user-%d", index),
Username: fmt.Sprintf("user%d", index),
Email: fmt.Sprintf("user%d@example.com", index),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
err := suite.DB.Repo.CreateUser(context.Background(), user)
results <- err
}(i)
}
wg.Wait()
close(results)
// Check results
var errors []error
for err := range results {
if err != nil {
errors = append(errors, err)
}
}
duration := time.Since(startTime)
t.Logf("Executed %d database operations in %v", numOperations, duration)
t.Logf("Database throughput: %.2f operations/second", float64(numOperations)/duration.Seconds())
// All operations should succeed
assert.Empty(t, errors, "Database operation failures: %v", errors)
}
func TestHighLoadTaskScheduling(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Start all SLURM clusters
err := suite.Compose.StartServices(t, "slurm-controller", "slurm-node-1", "slurm-node-2", "slurm-node-3")
require.NoError(t, err)
// Wait for clusters to be ready
err = suite.Compose.WaitForServices(t, 3*time.Minute)
require.NoError(t, err)
// Inject SSH keys into all containers
err = suite.InjectSSHKeys("slurm-controller", "slurm-node-1", "slurm-node-2", "slurm-node-3")
require.NoError(t, err)
// Register all SLURM clusters
clusters, err := suite.RegisterAllSlurmClusters()
require.NoError(t, err)
require.Len(t, clusters, 3)
// Create many experiments with multiple tasks each
numExperiments := 10
tasksPerExperiment := 5
totalTasks := numExperiments * tasksPerExperiment
var experiments []*domain.Experiment
for i := 0; i < numExperiments; i++ {
// Create experiment with multiple parameter sets (tasks)
var parameters []domain.ParameterSet
for j := 0; j < tasksPerExperiment; j++ {
parameters = append(parameters, domain.ParameterSet{
Values: map[string]string{
"param1": fmt.Sprintf("value%d-%d", i, j),
},
})
}
req := &domain.CreateExperimentRequest{
Name: fmt.Sprintf("load-exp-%d", i),
Description: fmt.Sprintf("Load test experiment %d", i),
ProjectID: suite.TestProject.ID,
CommandTemplate: "echo 'Hello from load test' && sleep 1",
Parameters: parameters,
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 1024,
DiskGB: 1,
Walltime: "0:05:00",
},
}
exp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), req, suite.TestUser.ID)
require.NoError(t, err)
experiments = append(experiments, exp.Experiment)
}
// Submit all experiments concurrently
var wg sync.WaitGroup
results := make(chan error, numExperiments)
startTime := time.Now()
for i, exp := range experiments {
wg.Add(1)
go func(index int, experiment *domain.Experiment) {
defer wg.Done()
// Submit to a random cluster
cluster := clusters[index%len(clusters)]
err := suite.SubmitToCluster(experiment, cluster)
results <- err
}(i, exp)
}
wg.Wait()
close(results)
// Check results
var errors []error
for err := range results {
if err != nil {
errors = append(errors, err)
}
}
duration := time.Since(startTime)
t.Logf("Scheduled %d experiments (%d total tasks) in %v", numExperiments, totalTasks, duration)
t.Logf("Scheduling throughput: %.2f experiments/second", float64(numExperiments)/duration.Seconds())
t.Logf("Task throughput: %.2f tasks/second", float64(totalTasks)/duration.Seconds())
// Allow some failures but not too many
assert.Less(t, len(errors), numExperiments/4, "Too many scheduling failures: %d", len(errors))
// Wait for some tasks to complete
time.Sleep(10 * time.Second)
// Check task distribution across clusters
clusterTaskCounts := make(map[string]int)
for _, exp := range experiments {
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.ID, 100, 0)
require.NoError(t, err)
for _, task := range tasks {
if task.ComputeResourceID != "" {
clusterTaskCounts[task.ComputeResourceID]++
}
}
}
t.Logf("Task distribution across clusters: %v", clusterTaskCounts)
// Verify tasks are distributed across multiple clusters
assert.Greater(t, len(clusterTaskCounts), 1, "Tasks should be distributed across multiple clusters")
}