package integration

import (
"context"
"fmt"
"testing"
"time"

"github.com/apache/airavata/scheduler/core/domain"
"github.com/apache/airavata/scheduler/tests/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
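
// TestAdapterE2E_CompleteWorkflow drives a full scheduling run end to end:
// it checks service availability, registers SLURM, bare-metal, S3, and SFTP
// resources, stages input data, executes one task per experiment, and
// verifies the task outputs and the staged data.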
func TestAdapterE2E_CompleteWorkflow(t *testing.T) {
// Check required services are available before starting
checker := testutil.NewServiceChecker()
services := []struct {
name string
check func() error
}{
{"SLURM", checker.CheckSLURMService},
{"SSH", checker.CheckSSHService},
{"SFTP", checker.CheckSFTPService},
{"MinIO", checker.CheckMinIOService},
}
for _, svc := range services {
if err := svc.check(); err != nil {
t.Fatalf("Required service %s not available: %v", svc.name, err)
}
}
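// SetupIntegrationTest brings up the shared suite; it provides the DB handle,
// orchestrator service, test user, and test project used throughout this test.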
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Services are already running; verified both by SetupIntegrationTest and by the checks above.
// 1. Inject SSH keys into all compute containers
err := suite.InjectSSHKeys("airavata-scheduler-slurm-node-01-01-1", "airavata-scheduler-slurm-node-02-01-1", "airavata-scheduler-baremetal-node-1-1")
require.NoError(t, err)
// 2. Register all compute and storage resources
slurmClusters, err := suite.RegisterAllSlurmClusters()
require.NoError(t, err)
require.Len(t, slurmClusters, 2)
baremetal, err := suite.RegisterBaremetalResource("baremetal", "localhost:2225")
require.NoError(t, err)
s3, err := suite.RegisterS3Resource("minio", "localhost:9000")
require.NoError(t, err)
sftp, err := suite.RegisterSFTPResource("sftp", "localhost:2222")
require.NoError(t, err)
// 3. Upload input data to storage
inputData := []byte("Hello from E2E test - input data")
err = suite.UploadFile(s3.ID, "input.txt", inputData)
require.NoError(t, err)
err = suite.UploadFile(sftp.ID, "input.txt", inputData)
require.NoError(t, err)
// 4. Create experiments for different compute resources
experiments := make([]*domain.Experiment, 0)
// SLURM experiment
slurmReq := &domain.CreateExperimentRequest{
Name: "e2e-slurm-test",
Description: "E2E test for SLURM cluster",
ProjectID: suite.TestProject.ID,
CommandTemplate: "(echo 'Processing on SLURM' && cat /tmp/input.txt && echo 'SLURM task completed') > output.txt 2>&1",
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": "slurm-value",
},
},
},
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 1024,
DiskGB: 1,
Walltime: "0:10:00",
},
Metadata: map[string]interface{}{
"input_files": []map[string]interface{}{
{
"path": "/tmp/input.txt",
"size": int64(len(inputData)),
"checksum": "test-checksum-slurm",
},
},
},
}
slurmExp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), slurmReq, suite.TestUser.ID)
require.NoError(t, err)
experiments = append(experiments, slurmExp.Experiment)
// Bare metal experiment
baremetalReq := &domain.CreateExperimentRequest{
Name: "e2e-baremetal-test",
Description: "E2E test for bare metal",
ProjectID: suite.TestProject.ID,
CommandTemplate: "(echo 'Processing on bare metal' && cat /tmp/input.txt && echo 'Bare metal task completed') > output.txt 2>&1",
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": "baremetal-value",
},
},
},
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 512,
DiskGB: 1,
},
Metadata: map[string]interface{}{
"input_files": []map[string]interface{}{
{
"path": "/tmp/input.txt",
"size": int64(len(inputData)),
"checksum": "test-checksum-baremetal",
},
},
},
}
baremetalExp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), baremetalReq, suite.TestUser.ID)
require.NoError(t, err)
experiments = append(experiments, baremetalExp.Experiment)
// 5. Stage input data to compute resources BEFORE submitting tasks
// For SLURM
err = suite.StageInputFileToComputeResource(slurmClusters[0].ID, "/tmp/input.txt", inputData)
require.NoError(t, err)
// For Bare Metal
err = suite.StageInputFileToComputeResource(baremetal.ID, "/tmp/input.txt", inputData)
require.NoError(t, err)
// 6. Real task execution with worker binary staging
for i, exp := range experiments {
t.Logf("Processing experiment %d: %s", i, exp.ID)
// First submit the experiment to generate tasks
err = suite.SubmitExperiment(exp)
require.NoError(t, err)
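// Each experiment expands to a single task; fetch it via the repository
// (the trailing 1, 0 arguments are presumably a limit/offset pair for pagination).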
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
task := tasks[0]
t.Logf("Processing task: %s", task.ID)
// 1. Create task directory FIRST (before submitting to cluster)
var computeResource *domain.ComputeResource
if exp.ID == slurmExp.Experiment.ID {
computeResource = slurmClusters[0]
} else {
computeResource = baremetal
}
workDir, err := suite.CreateTaskDirectory(task.ID, computeResource.ID)
require.NoError(t, err)
t.Logf("Created task directory: %s", workDir)
// 2. Stage worker binary
err = suite.StageWorkerBinary(computeResource.ID, task.ID)
require.NoError(t, err)
t.Logf("Staged worker binary for task %s", task.ID)
// Add delay to avoid SSH connection limits
time.Sleep(3 * time.Second)
// 3. Submit task to cluster (now that work_dir is set)
// Get the updated task from database to include work_dir metadata
updatedTask, err := suite.DB.Repo.GetTaskByID(context.Background(), task.ID)
require.NoError(t, err)
if exp.ID == slurmExp.Experiment.ID {
err = suite.SubmitTaskToCluster(updatedTask, slurmClusters[0])
} else {
err = suite.SubmitTaskToCluster(updatedTask, baremetal)
}
require.NoError(t, err)
// 4. Start task monitoring for real status updates
err = suite.StartTaskMonitoring(task.ID)
require.NoError(t, err)
t.Logf("Started task monitoring for %s", task.ID)
// 5. Wait for actual task completion
err = suite.WaitForTaskState(task.ID, domain.TaskStatusCompleted, 3*time.Minute)
require.NoError(t, err, "Task %s should complete", task.ID)
// 6. Retrieve output from task directory
output, err := suite.GetTaskOutputFromWorkDir(task.ID)
require.NoError(t, err)
assert.Contains(t, output, "completed", "Output should contain completion message")
}
// 7. Verify task outputs
for _, exp := range experiments {
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
output, err := suite.GetTaskOutputFromWorkDir(tasks[0].ID)
require.NoError(t, err)
assert.Contains(t, output, "Processing on", "Task output should contain processing message")
assert.Contains(t, output, "task completed", "Task output should contain completion message")
}
// 8. Download the staged input data back from storage and verify it is intact
downloadedS3, err := suite.DownloadFile(s3.ID, "input.txt")
require.NoError(t, err)
assert.Equal(t, inputData, downloadedS3, "S3 download should match uploaded data")
downloadedSFTP, err := suite.DownloadFile(sftp.ID, "input.txt")
require.NoError(t, err)
assert.Equal(t, inputData, downloadedSFTP, "SFTP download should match uploaded data")
t.Log("E2E workflow test completed successfully")
}
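
// TestAdapterE2E_MultiClusterDistribution submits nine experiments round-robin
// across two SLURM clusters and verifies that the resulting tasks end up
// distributed over both of them.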
func TestAdapterE2E_MultiClusterDistribution(t *testing.T) {
// Check required services are available before starting
checker := testutil.NewServiceChecker()
services := []struct {
name string
check func() error
}{
{"SLURM", checker.CheckSLURMService},
{"SSH", checker.CheckSSHService},
}
for _, svc := range services {
if err := svc.check(); err != nil {
t.Fatalf("Required service %s not available: %v", svc.name, err)
}
}
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Services are already running; verified both by SetupIntegrationTest and by the checks above.
// Inject SSH keys into the SLURM containers
err := suite.InjectSSHKeys("airavata-scheduler-slurm-node-01-01-1", "airavata-scheduler-slurm-node-02-01-1")
require.NoError(t, err)
// Register all SLURM clusters
clusters, err := suite.RegisterAllSlurmClusters()
require.NoError(t, err)
require.Len(t, clusters, 2)
// Create nine experiments so that round-robin placement over two clusters gives each cluster at least four tasks
numExperiments := 9
var experiments []*domain.Experiment
for i := 0; i < numExperiments; i++ {
req := &domain.CreateExperimentRequest{
Name: fmt.Sprintf("multi-cluster-exp-%d", i),
Description: fmt.Sprintf("Multi-cluster experiment %d", i),
ProjectID: suite.TestProject.ID,
CommandTemplate: fmt.Sprintf("echo 'Task %d on cluster' && sleep 2", i),
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": fmt.Sprintf("value%d", i),
},
},
},
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 1024,
DiskGB: 1,
Walltime: "0:05:00",
},
}
exp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), req, suite.TestUser.ID)
require.NoError(t, err)
experiments = append(experiments, exp.Experiment)
}
// Submit experiments to different clusters (round-robin)
for i, exp := range experiments {
cluster := clusters[i%len(clusters)]
err := suite.SubmitToCluster(exp, cluster)
require.NoError(t, err)
}
// Real task execution for all experiments
for _, exp := range experiments {
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
task := tasks[0]
// Create task directory and stage worker binary
computeResource, err := suite.GetComputeResourceFromTask(task)
require.NoError(t, err)
_, err = suite.CreateTaskDirectory(task.ID, computeResource.ID)
require.NoError(t, err)
err = suite.StageWorkerBinary(task.ComputeResourceID, task.ID)
require.NoError(t, err)
// Start task monitoring and wait for completion
err = suite.StartTaskMonitoring(task.ID)
require.NoError(t, err)
err = suite.WaitForTaskState(task.ID, domain.TaskStatusCompleted, 3*time.Minute)
require.NoError(t, err, "Task %s did not complete", task.ID)
}
// Verify distribution across clusters
clusterTaskCounts := make(map[string]int)
for _, exp := range experiments {
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
if tasks[0].ComputeResourceID != "" {
clusterTaskCounts[tasks[0].ComputeResourceID]++
}
}
t.Logf("Task distribution across clusters: %v", clusterTaskCounts)
// Verify tasks are distributed across all clusters
assert.Len(t, clusterTaskCounts, 2, "Tasks should be distributed across both clusters")
// Round-robin placement of 9 tasks over 2 clusters yields 5 on one cluster and 4 on the other
for clusterID, count := range clusterTaskCounts {
assert.GreaterOrEqual(t, count, 4, "Cluster %s should have at least 4 tasks", clusterID)
}
}
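
// TestAdapterE2E_FailureRecovery runs a task whose command exits non-zero,
// confirms it is marked failed, then submits a follow-up experiment to the
// same cluster and confirms it completes successfully.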
func TestAdapterE2E_FailureRecovery(t *testing.T) {
// Check required services are available before starting
checker := testutil.NewServiceChecker()
services := []struct {
name string
check func() error
}{
{"SLURM", checker.CheckSLURMService},
{"SSH", checker.CheckSSHService},
}
for _, svc := range services {
if err := svc.check(); err != nil {
t.Fatalf("Required service %s not available: %v", svc.name, err)
}
}
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Services are already running; verified both by SetupIntegrationTest and by the checks above.
// Inject SSH keys into the SLURM container
err := suite.InjectSSHKeys("airavata-scheduler-slurm-node-01-01-1")
require.NoError(t, err)
// Register SLURM cluster
cluster, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
// Create experiment with a command that will fail
req := &domain.CreateExperimentRequest{
Name: "failure-recovery-test",
Description: "Test failure recovery",
ProjectID: suite.TestProject.ID,
CommandTemplate: "echo 'Starting task' && exit 1", // This will fail
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": "test-value",
},
},
},
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 1024,
DiskGB: 1,
Walltime: "0:05:00",
},
}
exp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), req, suite.TestUser.ID)
require.NoError(t, err)
// Submit to cluster
err = suite.SubmitToCluster(exp.Experiment, cluster)
require.NoError(t, err)
// Real task execution that should fail
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.Experiment.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
task := tasks[0]
// Create task directory and stage worker binary
computeResource, err := suite.GetComputeResourceFromTask(task)
require.NoError(t, err)
_, err = suite.CreateTaskDirectory(task.ID, computeResource.ID)
require.NoError(t, err)
err = suite.StageWorkerBinary(task.ComputeResourceID, task.ID)
require.NoError(t, err)
// Start task monitoring and wait for failure
err = suite.StartTaskMonitoring(task.ID)
require.NoError(t, err)
err = suite.WaitForTaskState(task.ID, domain.TaskStatusFailed, 2*time.Minute)
require.NoError(t, err, "Task should have failed")
// Verify task status
failedTask, err := suite.DB.Repo.GetTaskByID(context.Background(), tasks[0].ID)
require.NoError(t, err)
assert.Equal(t, domain.TaskStatusFailed, failedTask.Status, "Task status should be failed")
// Create a new experiment with a successful command
successReq := &domain.CreateExperimentRequest{
Name: "recovery-success-test",
Description: "Test successful recovery",
ProjectID: suite.TestProject.ID,
CommandTemplate: "echo 'Recovery task succeeded' && sleep 2",
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": "recovery-value",
},
},
},
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 1024,
DiskGB: 1,
Walltime: "0:05:00",
},
}
successExp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), successReq, suite.TestUser.ID)
require.NoError(t, err)
// Submit successful experiment
err = suite.SubmitToCluster(successExp.Experiment, cluster)
require.NoError(t, err)
// Real successful task execution
successTasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), successExp.Experiment.ID, 1, 0)
require.NoError(t, err)
require.Len(t, successTasks, 1)
task = successTasks[0]
// Create task directory and stage worker binary
computeResource, err = suite.GetComputeResourceFromTask(task)
require.NoError(t, err)
_, err = suite.CreateTaskDirectory(task.ID, computeResource.ID)
require.NoError(t, err)
err = suite.StageWorkerBinary(task.ComputeResourceID, task.ID)
require.NoError(t, err)
// Start task monitoring and wait for success
err = suite.StartTaskMonitoring(task.ID)
require.NoError(t, err)
err = suite.WaitForTaskState(task.ID, domain.TaskStatusCompleted, 3*time.Minute)
require.NoError(t, err, "Recovery task should succeed")
// Verify recovery task completed successfully
recoveryTask, err := suite.DB.Repo.GetTaskByID(context.Background(), successTasks[0].ID)
require.NoError(t, err)
assert.Equal(t, domain.TaskStatusCompleted, recoveryTask.Status, "Recovery task should be completed")
t.Log("Failure recovery test completed successfully")
}
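
// TestAdapterE2E_DataPipeline stages input data to S3 and SFTP storage, runs
// a processing task on a SLURM cluster, and verifies both the task output and
// that the staged inputs remain intact.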
func TestAdapterE2E_DataPipeline(t *testing.T) {
// Check required services are available before starting
checker := testutil.NewServiceChecker()
services := []struct {
name string
check func() error
}{
{"SLURM", checker.CheckSLURMService},
{"SSH", checker.CheckSSHService},
{"SFTP", checker.CheckSFTPService},
{"MinIO", checker.CheckMinIOService},
}
for _, svc := range services {
if err := svc.check(); err != nil {
t.Fatalf("Required service %s not available: %v", svc.name, err)
}
}
suite := testutil.SetupIntegrationTest(t)
defer suite.Cleanup()
// Services are already running; verified both by SetupIntegrationTest and by the checks above.
// Inject SSH keys into the SLURM container
err := suite.InjectSSHKeys("airavata-scheduler-slurm-node-01-01-1")
require.NoError(t, err)
// Register resources
cluster, err := suite.RegisterSlurmResource("cluster-1", "localhost:6817")
require.NoError(t, err)
s3, err := suite.RegisterS3Resource("minio", "localhost:9000")
require.NoError(t, err)
sftp, err := suite.RegisterSFTPResource("sftp", "localhost:2222")
require.NoError(t, err)
// Stage input data to both storage systems
inputData := []byte("Input data for pipeline processing")
err = suite.UploadFile(s3.ID, "pipeline-input.txt", inputData)
require.NoError(t, err)
err = suite.UploadFile(sftp.ID, "pipeline-input.txt", inputData)
require.NoError(t, err)
// Create an experiment that simulates pipeline processing (the command writes its own copy of the input rather than reading the staged file)
req := &domain.CreateExperimentRequest{
Name: "data-pipeline-test",
Description: "Test data pipeline processing",
ProjectID: suite.TestProject.ID,
CommandTemplate: "echo 'Processing pipeline data' && echo 'Input data for pipeline processing' > /tmp/pipeline-input.txt && cat /tmp/pipeline-input.txt && echo 'Pipeline processing completed' > /tmp/pipeline-output.txt",
Parameters: []domain.ParameterSet{
{
Values: map[string]string{
"param1": "pipeline-value",
},
},
},
Requirements: &domain.ResourceRequirements{
CPUCores: 1,
MemoryMB: 1024,
DiskGB: 1,
Walltime: "0:10:00",
},
}
exp, err := suite.OrchestratorSvc.CreateExperiment(context.Background(), req, suite.TestUser.ID)
require.NoError(t, err)
// Submit to cluster
err = suite.SubmitToCluster(exp.Experiment, cluster)
require.NoError(t, err)
// Real task execution
tasks, _, err := suite.DB.Repo.ListTasksByExperiment(context.Background(), exp.Experiment.ID, 1, 0)
require.NoError(t, err)
require.Len(t, tasks, 1)
task := tasks[0]
// Create task directory and stage worker binary
computeResource, err := suite.GetComputeResourceFromTask(task)
require.NoError(t, err)
_, err = suite.CreateTaskDirectory(task.ID, computeResource.ID)
require.NoError(t, err)
err = suite.StageWorkerBinary(task.ComputeResourceID, task.ID)
require.NoError(t, err)
// Start task monitoring and wait for completion
err = suite.StartTaskMonitoring(task.ID)
require.NoError(t, err)
err = suite.WaitForTaskState(task.ID, domain.TaskStatusCompleted, 3*time.Minute)
require.NoError(t, err, "Pipeline task should complete")
// Verify task output
output, err := suite.GetTaskOutputFromWorkDir(tasks[0].ID)
require.NoError(t, err)
assert.Contains(t, output, "Processing pipeline data", "Output should contain processing message")
assert.Contains(t, output, "Pipeline processing completed", "Output should contain completion message")
// Verify input data is still accessible
downloadedS3, err := suite.DownloadFile(s3.ID, "pipeline-input.txt")
require.NoError(t, err)
assert.Equal(t, inputData, downloadedS3, "S3 input data should be preserved")
downloadedSFTP, err := suite.DownloadFile(sftp.ID, "pipeline-input.txt")
require.NoError(t, err)
assert.Equal(t, inputData, downloadedSFTP, "SFTP input data should be preserved")
t.Log("Data pipeline test completed successfully")
}