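// This file exercises experiment execution across the three supported runtimes
// (SLURM, Kubernetes, and bare metal): basic execution, runtime-specific features,
// concurrent execution, failover, capacity limits, cross-runtime data sharing,
// and resource health monitoring.
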
package integration
import (
"context"
"testing"
"time"
"github.com/apache/airavata/scheduler/core/domain"
"github.com/apache/airavata/scheduler/tests/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMultiRuntime_SlurmKubernetesBareMetal(t *testing.T) {
// Check required services are available before starting
checker := testutil.NewServiceChecker()
services := []struct {
name string
check func() error
}{
{"SLURM", checker.CheckSLURMService},
{"SSH", checker.CheckSSHService},
{"SFTP", checker.CheckSFTPService},
{"MinIO", checker.CheckMinIOService},
}
for _, svc := range services {
if err := svc.check(); err != nil {
t.Fatalf("Required service %s not available: %v", svc.name, err)
}
}
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Services are already verified by service checks above
// Register all three compute types
slurm, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
k8s, err := suite.RegisterKubernetesResource("docker-desktop-k8s")
require.NoError(t, err)
assert.NotNil(t, k8s)
bare, err := suite.RegisterBareMetalResource("baremetal", "localhost:2225")
require.NoError(t, err)
assert.NotNil(t, bare)
// Create experiments on each runtime
experiments := []struct {
name string
resource *domain.ComputeResource
command string
}{
{"slurm-exp", slurm, "squeue && echo 'SLURM test completed'"},
{"k8s-exp", k8s, "kubectl version --client && echo 'K8s test completed'"},
{"bare-exp", bare, "uname -a && echo 'Bare metal test completed'"},
}
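// Each runtime gets its own subtest: create the experiment, wait for the task
// to be assigned, spawn a worker, wait for the full state-transition lifecycle,
// and verify the captured output.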
for _, exp := range experiments {
t.Run(exp.name, func(t *testing.T) {
// Start gRPC server for worker communication
_, grpcAddr := suite.StartGRPCServer(t)
t.Logf("Started gRPC server at %s", grpcAddr)
e, err := suite.CreateExperimentOnResource(exp.name, exp.command, exp.resource.ID)
require.NoError(t, err)
assert.NotNil(t, e)
// Note: CreateExperimentOnResource already submits the experiment
// Get tasks for this experiment
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), e.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
task := tasks[0]
// Wait for task to be assigned to a compute resource
assignedTask, err := suite.WaitForTaskAssignment(task.ID, 30*time.Second)
require.NoError(t, err)
require.NotEmpty(t, assignedTask.ComputeResourceID)
task = assignedTask
// Spawn worker for this experiment
worker, workerCmd, err := suite.SpawnWorkerForExperiment(t, e.ID, task.ComputeResourceID)
require.NoError(t, err)
defer suite.TerminateWorker(workerCmd)
// Wait for worker to register and become idle
err = suite.WaitForWorkerIdle(worker.ID, 20*time.Second)
require.NoError(t, err)
t.Logf("Worker %s is ready", worker.ID)
// Wait for task to progress through all expected state transitions
expectedStates := []domain.TaskStatus{
domain.TaskStatusCreated,
domain.TaskStatusQueued,
domain.TaskStatusDataStaging,
domain.TaskStatusEnvSetup,
domain.TaskStatusRunning,
domain.TaskStatusOutputStaging,
domain.TaskStatusCompleted,
}
observedStates, err := suite.WaitForTaskStateTransitions(task.ID, expectedStates, 3*time.Minute)
require.NoError(t, err, "Task %s should complete with proper state transitions", task.ID)
t.Logf("Task %s completed with state transitions: %v", task.ID, observedStates)
// Retrieve output from task directory
output, err := suite.GetTaskOutputFromWorkDir(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "test completed")
})
}
}
func TestMultiRuntime_ResourceSpecificFeatures(t *testing.T) {
// Check required services are available before starting
checker := testutil.NewServiceChecker()
services := []struct {
name string
check func() error
}{
{"SLURM", checker.CheckSLURMService},
{"SSH", checker.CheckSSHService},
{"SFTP", checker.CheckSFTPService},
{"MinIO", checker.CheckMinIOService},
}
for _, svc := range services {
if err := svc.check(); err != nil {
t.Fatalf("Required service %s not available: %v", svc.name, err)
}
}
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Services are already verified by service checks above
// Test SLURM-specific features
t.Run("SLURM_JobSubmission", func(t *testing.T) {
slurm, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
exp, err := suite.CreateExperimentOnResource("slurm-job", "sbatch --wrap='echo SLURM job submitted'", slurm.ID)
require.NoError(t, err)
task, err := suite.GetFirstTask(exp.ID)
require.NoError(t, err)
err = suite.WaitForTaskCompletion(task.ID, 30*time.Second)
require.NoError(t, err)
output, err := suite.GetTaskOutput(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "SLURM job submitted")
})
// Test Kubernetes-specific features
t.Run("Kubernetes_PodCreation", func(t *testing.T) {
k8s, err := suite.RegisterKubernetesResource("docker-desktop-k8s")
require.NoError(t, err)
exp, err := suite.CreateExperimentOnResource("k8s-pod", "kubectl run test-pod --image=busybox --rm --restart=Never -- echo 'K8s pod created'", k8s.ID)
require.NoError(t, err)
task, err := suite.GetFirstTask(exp.ID)
require.NoError(t, err)
err = suite.WaitForTaskCompletion(task.ID, 30*time.Second)
require.NoError(t, err)
output, err := suite.GetTaskOutput(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "K8s pod created")
})
// Test bare metal-specific features
t.Run("BareMetal_SystemInfo", func(t *testing.T) {
bare, err := suite.RegisterBareMetalResource("baremetal", "localhost:2225")
require.NoError(t, err)
exp, err := suite.CreateExperimentOnResource("baremetal-info", "cat /etc/os-release && free -h", bare.ID)
require.NoError(t, err)
// Experiment is already submitted by CreateExperimentOnResource
// Exercise real task execution, including staging the worker binary
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
task := tasks[0]
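// Unlike the worker-driven flow in TestMultiRuntime_SlurmKubernetesBareMetal, this
// test drives the lifecycle explicitly: create the work directory, stage the worker
// binary, start monitoring, then wait for completion and check the output.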
// 1. Create task directory
workDir, err := suite.CreateTaskDirectory(task.ID, task.ComputeResourceID)
require.NoError(t, err)
t.Logf("Created task directory: %s", workDir)
// 2. Stage worker binary
err = suite.StageWorkerBinary(task.ComputeResourceID, task.ID)
require.NoError(t, err)
t.Logf("Staged worker binary for task %s", task.ID)
// 3. Start task monitoring for real status updates
err = suite.StartTaskMonitoring(task.ID)
require.NoError(t, err)
t.Logf("Started task monitoring for %s", task.ID)
// 4. Wait for actual task completion
err = suite.WaitForTaskState(task.ID, domain.TaskStatusCompleted, 2*time.Minute)
require.NoError(t, err, "Task %s should complete", task.ID)
// 5. Retrieve output from task directory
output, err := suite.GetTaskOutputFromWorkDir(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "Ubuntu")
})
}
func TestMultiRuntime_ConcurrentExecution(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Assumes the required services (SLURM, SSH, SFTP, MinIO) are available; see the explicit checks in the earlier tests
// Register multiple resources
slurm1, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
slurm2, err := suite.RegisterSlurmResource("cluster-2", "localhost:6819")
require.NoError(t, err)
bare, err := suite.RegisterBareMetalResource("baremetal", "localhost:2224")
require.NoError(t, err)
// Create concurrent experiments across different runtimes
experiments := []struct {
name string
resource *domain.ComputeResource
command string
}{
{"concurrent-slurm1", slurm1, "echo 'SLURM cluster 1 concurrent test' && sleep 5"},
{"concurrent-slurm2", slurm2, "echo 'SLURM cluster 2 concurrent test' && sleep 5"},
{"concurrent-bare", bare, "echo 'Bare metal concurrent test' && sleep 5"},
}
// Submit all experiments up front so they execute concurrently on their respective resources
startTime := time.Now()
for _, exp := range experiments {
e, err := suite.CreateExperimentOnResource(exp.name, exp.command, exp.resource.ID)
require.NoError(t, err)
assert.NotNil(t, e)
}
// Wait for all experiments to complete
for _, exp := range experiments {
t.Run(exp.name, func(t *testing.T) {
e, err := suite.GetExperimentByName(exp.name)
require.NoError(t, err)
task, err := suite.GetFirstTask(e.ID)
require.NoError(t, err)
err = suite.WaitForTaskCompletion(task.ID, 30*time.Second)
require.NoError(t, err)
output, err := suite.GetTaskOutput(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "concurrent test")
})
}
// Verify all experiments completed within reasonable time
totalTime := time.Since(startTime)
assert.Less(t, totalTime, 3*time.Minute, "Concurrent execution should complete within 3 minutes")
}
func TestMultiRuntime_ResourceFailover(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Assumes the required services (SLURM, SSH, SFTP, MinIO) are available; see the explicit checks in the earlier tests
// Register two SLURM clusters
slurm1, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
slurm2, err := suite.RegisterSlurmResource("cluster-2", "localhost:6819")
require.NoError(t, err)
// Create an experiment on the first cluster; stopping that cluster below forces a failover
exp, err := suite.CreateExperimentOnResource("failover-test", "echo 'Failover test completed'", slurm1.ID)
require.NoError(t, err)
task, err := suite.GetFirstTask(exp.ID)
require.NoError(t, err)
// Simulate first cluster failure by stopping it
err = suite.StopService("slurm-cluster-1")
require.NoError(t, err)
// Wait for the task to be retried and complete on the second cluster
err = suite.WaitForTaskCompletion(task.ID, 2*time.Minute)
require.NoError(t, err, "Task %s should be retried and complete after cluster-1 is stopped", task.ID)
// Verify task completed on second cluster (slurm2)
completedTask, err := suite.GetTask(task.ID)
require.NoError(t, err)
assert.Equal(t, domain.TaskStatusCompleted, completedTask.Status)
// Verify the task was retried on the second cluster
assert.Equal(t, slurm2.ID, completedTask.ComputeResourceID)
output, err := suite.GetTaskOutput(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "Failover test completed")
}
func TestMultiRuntime_ResourceCapacity(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Assumes the required services (SLURM, SSH, SFTP, MinIO) are available; see the explicit checks in the earlier tests
// Register resources with different capacities
slurm1, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
slurm2, err := suite.RegisterSlurmResource("cluster-2", "localhost:6819")
require.NoError(t, err)
bare, err := suite.RegisterBareMetalResource("baremetal", "localhost:2224")
require.NoError(t, err)
// Test resource capacity limits
capacityTests := []struct {
name string
resource *domain.ComputeResource
command string
timeout time.Duration
}{
{"slurm1-capacity", slurm1, "echo 'SLURM cluster 1 capacity test'", 2 * time.Minute},
{"slurm2-capacity", slurm2, "echo 'SLURM cluster 2 capacity test'", 2 * time.Minute},
{"bare-capacity", bare, "echo 'Bare metal capacity test'", 2 * time.Minute},
}
for _, test := range capacityTests {
t.Run(test.name, func(t *testing.T) {
exp, err := suite.CreateExperimentOnResource(test.name, test.command, test.resource.ID)
require.NoError(t, err)
task, err := suite.GetFirstTask(exp.ID)
require.NoError(t, err)
err = suite.WaitForTaskCompletion(task.ID, test.timeout)
require.NoError(t, err)
output, err := suite.GetTaskOutput(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "capacity test")
})
}
}
func TestMultiRuntime_CrossRuntimeDataSharing(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Assumes the required services (SLURM, SSH, SFTP, MinIO) are available; see the explicit checks in the earlier tests
// Register resources
slurm, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
bare, err := suite.RegisterBareMetalResource("baremetal", "localhost:2224")
require.NoError(t, err)
// Create experiment on SLURM that generates data
exp1, err := suite.CreateExperimentOnResource("data-generator", "echo 'shared data content' > shared-data.txt", slurm.ID)
require.NoError(t, err)
task1, err := suite.GetFirstTask(exp1.ID)
require.NoError(t, err)
err = suite.WaitForTaskCompletion(task1.ID, 30*time.Second)
require.NoError(t, err)
// Create experiment on bare metal that consumes the data
exp2, err := suite.CreateExperimentOnResource("data-consumer", "cat shared-data.txt", bare.ID)
require.NoError(t, err)
task2, err := suite.GetFirstTask(exp2.ID)
require.NoError(t, err)
err = suite.WaitForTaskCompletion(task2.ID, 30*time.Second)
require.NoError(t, err)
// Verify data sharing worked
output, err := suite.GetTaskOutput(task2.ID)
require.NoError(t, err)
assert.Contains(t, output, "shared data content")
}
func TestMultiRuntime_ResourceHealthMonitoring(t *testing.T) {
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Assumes the required services (SLURM, SSH, SFTP, MinIO) are available; see the explicit checks in the earlier tests
// Register resources - use the same SLURM controller for both tests
slurm1, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
slurm2, err := suite.RegisterSlurmResource("cluster-2", "localhost:6817") // Use same controller
require.NoError(t, err)
// Test resource health monitoring
healthTests := []struct {
name string
resource *domain.ComputeResource
command string
}{
{"slurm1-health", slurm1, "scontrol ping && echo 'SLURM cluster 1 healthy'"},
{"slurm2-health", slurm2, "scontrol ping && echo 'SLURM cluster 2 healthy'"},
}
for _, test := range healthTests {
t.Run(test.name, func(t *testing.T) {
exp, err := suite.CreateExperimentOnResource(test.name, test.command, test.resource.ID)
require.NoError(t, err)
task, err := suite.GetFirstTask(exp.ID)
require.NoError(t, err)
// Wait for task to be assigned to a compute resource
assignedTask, err := suite.WaitForTaskAssignment(task.ID, 30*time.Second)
require.NoError(t, err)
require.NotEmpty(t, assignedTask.ComputeResourceID)
task = assignedTask
err = suite.WaitForTaskCompletion(task.ID, 30*time.Second)
require.NoError(t, err)
output, err := suite.GetTaskOutput(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "healthy")
})
}
// Test resource failure detection
t.Run("ResourceFailureDetection", func(t *testing.T) {
// Stop one cluster
err = suite.StopService("slurm-cluster-1")
require.NoError(t, err)
// Create experiment that should fail on stopped cluster
exp, err := suite.CreateExperimentOnResource("failure-test", "echo 'This should fail'", slurm1.ID)
require.NoError(t, err)
task, err := suite.GetFirstTask(exp.ID)
require.NoError(t, err)
// Wait for failure detection and for the task to be retried to completion
err = suite.WaitForTaskCompletion(task.ID, 2*time.Minute)
require.NoError(t, err, "Task %s should complete after failure detection and retry", task.ID)
// Verify task was retried on healthy cluster
completedTask, err := suite.GetTask(task.ID)
require.NoError(t, err)
assert.Equal(t, domain.TaskStatusCompleted, completedTask.Status)
})
}
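
// The service-availability check at the top of TestMultiRuntime_SlurmKubernetesBareMetal
// and TestMultiRuntime_ResourceSpecificFeatures is duplicated verbatim. The sketch below
// is a minimal consolidation of that pattern; the requireServices name is illustrative
// and the helper is not called by the tests above.
func requireServices(t *testing.T) {
	t.Helper()
	checker := testutil.NewServiceChecker()
	services := []struct {
		name  string
		check func() error
	}{
		{"SLURM", checker.CheckSLURMService},
		{"SSH", checker.CheckSSHService},
		{"SFTP", checker.CheckSFTPService},
		{"MinIO", checker.CheckMinIOService},
	}
	for _, svc := range services {
		if err := svc.check(); err != nil {
			t.Fatalf("Required service %s not available: %v", svc.name, err)
		}
	}
}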