The Airavata Scheduler uses a distributed worker architecture where standalone worker binaries communicate with the scheduler via gRPC. This design enables scalable, fault-tolerant task execution across multiple compute resources.
```
┌────────────────────────────────────────────────────────────┐
│ Scheduler Server                                           │
├────────────────────────────────────────────────────────────┤
│ gRPC Server (Port 50051)                                   │
│ ├── WorkerService (generated from proto/worker.proto)      │
│ ├── Task Assignment                                        │
│ ├── Status Monitoring                                      │
│ └── Heartbeat Management                                   │
└────────────────────────────────────────────────────────────┘
                              │
                              │ gRPC
                              ▼
┌────────────────────────────────────────────────────────────┐
│ Worker Binary                                              │
├────────────────────────────────────────────────────────────┤
│ gRPC Client                                                │
│ ├── Task Polling                                           │
│ ├── Status Reporting                                       │
│ └── Heartbeat Sending                                      │
├────────────────────────────────────────────────────────────┤
│ Task Execution Engine                                      │
│ ├── Script Generation                                      │
│ ├── Data Staging                                           │
│ ├── Command Execution                                      │
│ └── Result Collection                                      │
└────────────────────────────────────────────────────────────┘
```
Workers are deployed to compute resources using runtime-specific scripts:
```bash
# SLURM deployment
sbatch worker_spawn_script.sh

# Kubernetes deployment
kubectl apply -f worker_job.yaml

# Bare metal deployment
ssh compute-node 'bash -s' < worker_script.sh
```
Upon startup, workers connect to the scheduler gRPC server:
```go
// Worker registration
conn, err := grpc.Dial("scheduler:50051", grpc.WithInsecure())
if err != nil {
    log.Fatalf("failed to connect to scheduler: %v", err)
}
client := workerpb.NewWorkerServiceClient(conn)

// Register with scheduler
resp, err := client.RegisterWorker(ctx, &workerpb.RegisterWorkerRequest{
    WorkerId:     workerID,
    Capabilities: capabilities,
    Status:       workerpb.WorkerStatus_AVAILABLE,
})
```
Workers continuously poll for available tasks:
```go
// Poll for tasks
for {
    resp, err := client.PollForTask(ctx, &workerpb.PollForTaskRequest{
        WorkerId:     workerID,
        Capabilities: capabilities,
    })
    if err != nil {
        log.Printf("poll failed: %v", err)
    } else if resp.Task != nil {
        // Execute task
        executeTask(resp.Task)
    }
    time.Sleep(pollInterval)
}
```
Workers execute assigned tasks with proper isolation:
```go
func executeTask(task *workerpb.Task) error {
    // Update status to running
    client.UpdateTaskStatus(ctx, &workerpb.UpdateTaskStatusRequest{
        TaskId: task.Id,
        Status: workerpb.TaskStatus_RUNNING,
    })

    // Stage input files
    for _, input := range task.InputFiles {
        stageFile(input)
    }

    // Execute command
    cmd := exec.Command("bash", "-c", task.Command)
    output, err := cmd.CombinedOutput()

    // Update status
    status := workerpb.TaskStatus_COMPLETED
    if err != nil {
        status = workerpb.TaskStatus_FAILED
    }
    client.UpdateTaskStatus(ctx, &workerpb.UpdateTaskStatusRequest{
        TaskId: task.Id,
        Status: status,
        Output: string(output),
    })

    return err
}
```
Workers report progress and completion status:
```go
// Send heartbeat
client.SendHeartbeat(ctx, &workerpb.HeartbeatRequest{
    WorkerId: workerID,
    Status:   workerpb.WorkerStatus_AVAILABLE,
    Metrics: &workerpb.WorkerMetrics{
        CpuUsage:    cpuUsage,
        MemoryUsage: memoryUsage,
        ActiveTasks: activeTaskCount,
    },
})
```
Workers clean up resources and report final status:
```go
// Cleanup on shutdown
client.UpdateWorkerStatus(ctx, &workerpb.UpdateWorkerStatusRequest{
    WorkerId: workerID,
    Status:   workerpb.WorkerStatus_TERMINATED,
})
conn.Close()
```
The system generates runtime-specific scripts for deploying workers to different compute resources.
```bash
#!/bin/bash
#SBATCH --job-name=worker_${WORKER_ID}
#SBATCH --time=${WALLTIME}
#SBATCH --cpus-per-task=${CPU_CORES}
#SBATCH --mem=${MEMORY_MB}
#SBATCH --partition=${QUEUE}
#SBATCH --account=${ACCOUNT}

# Set up environment
export WORKER_ID="${WORKER_ID}"
export EXPERIMENT_ID="${EXPERIMENT_ID}"
export COMPUTE_RESOURCE_ID="${COMPUTE_RESOURCE_ID}"
export WORKING_DIR="${WORKING_DIR}"
export WORKER_BINARY_URL="${WORKER_BINARY_URL}"
export SERVER_ADDRESS="${SERVER_ADDRESS}"
export SERVER_PORT="${SERVER_PORT}"

# Create working directory
mkdir -p "${WORKING_DIR}"
cd "${WORKING_DIR}"

# Download worker binary
echo "Downloading worker binary from ${WORKER_BINARY_URL}"
curl -L "${WORKER_BINARY_URL}" -o worker
chmod +x worker

# Start worker
echo "Starting worker with ID: ${WORKER_ID}"
./worker \
    --server-address="${SERVER_ADDRESS}:${SERVER_PORT}" \
    --worker-id="${WORKER_ID}" \
    --working-dir="${WORKING_DIR}" \
    --experiment-id="${EXPERIMENT_ID}" \
    --compute-resource-id="${COMPUTE_RESOURCE_ID}"

echo "Worker ${WORKER_ID} completed"
```
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: worker-${WORKER_ID}
  namespace: airavata
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: worker
          image: worker-binary:latest
          command: ["./worker"]
          args:
            - "--server-address=${SERVER_ADDRESS}:${SERVER_PORT}"
            - "--worker-id=${WORKER_ID}"
            - "--working-dir=${WORKING_DIR}"
            - "--experiment-id=${EXPERIMENT_ID}"
            - "--compute-resource-id=${COMPUTE_RESOURCE_ID}"
          env:
            - name: WORKER_ID
              value: "${WORKER_ID}"
            - name: EXPERIMENT_ID
              value: "${EXPERIMENT_ID}"
            - name: COMPUTE_RESOURCE_ID
              value: "${COMPUTE_RESOURCE_ID}"
            - name: WORKING_DIR
              value: "${WORKING_DIR}"
          resources:
            requests:
              cpu: "${CPU_CORES}"
              memory: "${MEMORY_MB}Mi"
            limits:
              cpu: "${CPU_CORES}"
              memory: "${MEMORY_MB}Mi"
          volumeMounts:
            - name: worker-storage
              mountPath: "${WORKING_DIR}"
      volumes:
        - name: worker-storage
          emptyDir: {}
```
```bash
#!/bin/bash
set -euo pipefail

# Configuration
WORKER_ID="${WORKER_ID}"
EXPERIMENT_ID="${EXPERIMENT_ID}"
COMPUTE_RESOURCE_ID="${COMPUTE_RESOURCE_ID}"
WORKING_DIR="${WORKING_DIR}"
WORKER_BINARY_URL="${WORKER_BINARY_URL}"
SERVER_ADDRESS="${SERVER_ADDRESS}"
SERVER_PORT="${SERVER_PORT}"
WALLTIME_SECONDS="${WALLTIME_SECONDS}"

# Create working directory
mkdir -p "${WORKING_DIR}"
cd "${WORKING_DIR}"

# Download worker binary
echo "Downloading worker binary from ${WORKER_BINARY_URL}"
curl -L "${WORKER_BINARY_URL}" -o worker
chmod +x worker

# Set up signal handling for cleanup
cleanup() {
    echo "Cleaning up worker ${WORKER_ID}"
    # Kill any running processes
    pkill -f "worker.*${WORKER_ID}" || true
    # Clean up working directory
    rm -rf "${WORKING_DIR}" || true
}
trap cleanup EXIT INT TERM

# Start worker with timeout
echo "Starting worker with ID: ${WORKER_ID}"
timeout "${WALLTIME_SECONDS}" ./worker \
    --server-address="${SERVER_ADDRESS}:${SERVER_PORT}" \
    --worker-id="${WORKER_ID}" \
    --working-dir="${WORKING_DIR}" \
    --experiment-id="${EXPERIMENT_ID}" \
    --compute-resource-id="${COMPUTE_RESOURCE_ID}"

echo "Worker ${WORKER_ID} completed"
```
Workers are configured through environment variables and command-line flags:
```bash
# Required configuration
--server-address=localhost:50051   # Scheduler gRPC server address
--worker-id=worker_12345           # Unique worker identifier
--working-dir=/tmp/worker          # Working directory for tasks

# Optional configuration
--heartbeat-interval=30s           # Heartbeat frequency
--task-timeout=1h                  # Maximum task execution time
--log-level=info                   # Logging level
--experiment-id=exp_123            # Associated experiment ID
--compute-resource-id=slurm_01     # Compute resource ID
```
```bash
# Worker configuration
export WORKER_ID="worker_$(date +%s)_$$"
export SERVER_ADDRESS="localhost:50051"
export WORKING_DIR="/tmp/worker"
export HEARTBEAT_INTERVAL="30s"
export TASK_TIMEOUT="1h"
export LOG_LEVEL="info"

# Experiment context
export EXPERIMENT_ID="exp_12345"
export COMPUTE_RESOURCE_ID="slurm_cluster_01"

# Network configuration
export GRPC_KEEPALIVE_TIME="30s"
export GRPC_KEEPALIVE_TIMEOUT="5s"
export GRPC_KEEPALIVE_PERMIT_WITHOUT_STREAMS=true
```
Tasks are assigned to workers based on the `Capabilities` reported in `RegisterWorkerRequest` and in each `PollForTaskRequest`; a sketch of this matching follows.
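The snippet below is a minimal, hypothetical sketch of how the scheduler could match queued tasks against a polling worker's capabilities. The `RequiredCapabilities` field on the task and the plain-string capability values are assumptions for illustration, not fields defined in proto/worker.proto.

```go
// Hypothetical sketch of capability-based matching on the scheduler side.
// Task.RequiredCapabilities and string-valued capabilities are assumptions.
func findTaskForWorker(queued []*workerpb.Task, capabilities []string) *workerpb.Task {
    have := make(map[string]bool, len(capabilities))
    for _, c := range capabilities {
        have[c] = true
    }
    for _, task := range queued {
        satisfied := true
        for _, req := range task.RequiredCapabilities {
            if !have[req] {
                satisfied = false
                break
            }
        }
        if satisfied {
            return task // first queued task this worker can run
        }
    }
    return nil // nothing suitable; the worker keeps polling
}
```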
Input files are staged to workers before task execution:
```go
func stageInputFiles(task *workerpb.Task) error {
    for _, input := range task.InputFiles {
        // Download from storage
        err := downloadFile(input.Source, input.Destination)
        if err != nil {
            return fmt.Errorf("failed to stage file %s: %w", input.Source, err)
        }
    }
    return nil
}
```
Tasks are executed in isolated environments:
```go
func executeCommand(command string, workingDir string) (*exec.Cmd, error) {
    cmd := exec.Command("bash", "-c", command)
    cmd.Dir = workingDir

    // Set up environment
    cmd.Env = append(os.Environ(),
        "WORKING_DIR="+workingDir,
        "TASK_ID="+taskID,
    )

    // Run the task in its own process group so it can be signalled as a unit
    cmd.SysProcAttr = &syscall.SysProcAttr{
        Setpgid: true,
    }

    return cmd, nil
}
```
Output files are collected after task completion:
```go
func collectOutputFiles(task *workerpb.Task) error {
    for _, output := range task.OutputFiles {
        // Upload to storage
        err := uploadFile(output.Source, output.Destination)
        if err != nil {
            return fmt.Errorf("failed to collect file %s: %w", output.Source, err)
        }
    }
    return nil
}
```
Workers send periodic heartbeats to the scheduler:
```go
func sendHeartbeat(client workerpb.WorkerServiceClient, workerID string) {
    ticker := time.NewTicker(heartbeatInterval)
    defer ticker.Stop()

    for range ticker.C {
        _, err := client.SendHeartbeat(ctx, &workerpb.HeartbeatRequest{
            WorkerId: workerID,
            Status:   workerpb.WorkerStatus_AVAILABLE,
            Metrics: &workerpb.WorkerMetrics{
                CpuUsage:    getCPUUsage(),
                MemoryUsage: getMemoryUsage(),
                ActiveTasks: getActiveTaskCount(),
                Timestamp:   time.Now().Unix(),
            },
        })
        if err != nil {
            log.Printf("Failed to send heartbeat: %v", err)
        }
    }
}
```
The scheduler monitors worker health and handles failures:
```go
func monitorWorkerHealth(workerID string) {
    ticker := time.NewTicker(healthCheckInterval)
    defer ticker.Stop()

    for range ticker.C {
        if !isWorkerHealthy(workerID) {
            // Mark worker as unhealthy
            markWorkerUnhealthy(workerID)

            // Reassign tasks to other workers
            reassignWorkerTasks(workerID)
        }
    }
}
```
When a worker fails, the scheduler marks it as unhealthy and reassigns its tasks to other workers, as shown in `monitorWorkerHealth` above and sketched below.
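A hedged sketch of what `reassignWorkerTasks` could do is shown below; the `taskStore` helper, the task fields, and the `TaskStatusQueued` constant are placeholders rather than the scheduler's actual types.

```go
// Illustrative sketch only: taskStore, the Task fields, and TaskStatusQueued
// are placeholders, not the scheduler's real data model.
func reassignWorkerTasks(workerID string) {
    for _, task := range taskStore.TasksAssignedTo(workerID) {
        // Clear the assignment and requeue so another worker can pick it up
        task.WorkerID = ""
        task.Status = TaskStatusQueued
        taskStore.Save(task)
        log.Printf("requeued task %s after failure of worker %s", task.ID, workerID)
    }
}
```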
Task failures are handled gracefully:
```go
func handleTaskFailure(taskID string, err error) {
    // Update task status
    updateTaskStatus(taskID, TaskStatusFailed, err.Error())

    // Log failure
    log.Printf("Task %s failed: %v", taskID, err)

    // Retry if appropriate
    if shouldRetry(taskID) {
        scheduleTaskRetry(taskID)
    }
}
```
Network connectivity issues are handled with gRPC keepalives, configured through the `GRPC_KEEPALIVE_*` settings shown earlier; a sketch of wiring them into the client connection follows.
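This is a minimal sketch, assuming the worker maps the `GRPC_KEEPALIVE_*` environment variables onto gRPC's client keepalive options; the exact option set used by the real worker binary may differ.

```go
import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

// dialScheduler applies keepalive settings corresponding to the
// GRPC_KEEPALIVE_* configuration shown above (an assumed mapping).
func dialScheduler(address string) (*grpc.ClientConn, error) {
    return grpc.Dial(address,
        grpc.WithInsecure(),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                30 * time.Second, // GRPC_KEEPALIVE_TIME
            Timeout:             5 * time.Second,  // GRPC_KEEPALIVE_TIMEOUT
            PermitWithoutStream: true,             // GRPC_KEEPALIVE_PERMIT_WITHOUT_STREAMS
        }),
    )
}
```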
Workers authenticate with the scheduler when the gRPC connection is established. The examples in this guide dial with `grpc.WithInsecure()` for brevity; a TLS-secured variant is sketched below.
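The following sketch shows one way transport security could be enabled on the worker's connection. The CA certificate path and the use of TLS credentials here are assumptions for illustration, not the scheduler's documented configuration.

```go
import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// dialSchedulerTLS is an assumed TLS variant of the worker connection;
// the scheduler-ca.pem path is a placeholder, not a documented file.
func dialSchedulerTLS(address string) (*grpc.ClientConn, error) {
    creds, err := credentials.NewClientTLSFromFile("/etc/worker/scheduler-ca.pem", "")
    if err != nil {
        return nil, err
    }
    return grpc.Dial(address, grpc.WithTransportCredentials(creds))
}
```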
Task execution is isolated through per-task working directories and per-task process groups (see `executeCommand` above).
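Because `executeCommand` starts each task in its own process group via `Setpgid`, the worker can terminate everything a task spawned by signalling the group. The timeout handling below is an illustrative sketch, not the worker's actual code.

```go
// Illustrative sketch: kill the whole process group created by Setpgid if a
// task exceeds its timeout. The surrounding timeout logic is assumed.
func runWithTimeout(cmd *exec.Cmd, timeout time.Duration) error {
    if err := cmd.Start(); err != nil {
        return err
    }
    done := make(chan error, 1)
    go func() { done <- cmd.Wait() }()

    select {
    case err := <-done:
        return err
    case <-time.After(timeout):
        // A negative PID signals the entire process group
        syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
        <-done
        return fmt.Errorf("task timed out after %s", timeout)
    }
}
```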
gRPC connections are pooled for efficiency:
```go
type ConnectionPool struct {
    connections map[string]*grpc.ClientConn
    mutex       sync.RWMutex
}

func (p *ConnectionPool) GetConnection(address string) (*grpc.ClientConn, error) {
    p.mutex.RLock()
    conn, exists := p.connections[address]
    p.mutex.RUnlock()

    if exists {
        return conn, nil
    }

    // Create new connection
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        return nil, err
    }

    p.mutex.Lock()
    p.connections[address] = conn
    p.mutex.Unlock()

    return conn, nil
}
```
Multiple operations are batched for efficiency:
```go
func batchUpdateTaskStatus(updates []TaskStatusUpdate) error {
    req := &workerpb.BatchUpdateTaskStatusRequest{
        Updates: make([]*workerpb.TaskStatusUpdate, len(updates)),
    }
    for i, update := range updates {
        req.Updates[i] = &workerpb.TaskStatusUpdate{
            TaskId: update.TaskID,
            Status: update.Status,
            Output: update.Output,
        }
    }
    _, err := client.BatchUpdateTaskStatus(ctx, req)
    return err
}
```
- Symptoms: Worker fails to connect to the scheduler
- Causes: Network issues, incorrect server address, firewall blocking
- Solutions: Verify that the scheduler port is reachable, e.g. `telnet scheduler-host 50051`

- Symptoms: Tasks fail to execute or complete
- Causes: Resource limits, permission issues, command errors

- Symptoms: Workers consuming excessive memory
- Causes: Memory leaks, large input files, inefficient processing
- Solutions: Monitor worker memory usage with `htop` or `ps`

To get more detail from a misbehaving worker, enable debug logging:

```bash
# Set debug log level
export LOG_LEVEL=debug

# Start worker with verbose output
./worker --log-level=debug --server-address=localhost:50051
```
```bash
# Check worker processes
ps aux | grep worker

# Monitor network connections
netstat -an | grep 50051

# Check worker logs
tail -f /var/log/worker.log
```
```bash
# Test gRPC server connectivity
grpcurl -plaintext localhost:50051 list

# Test specific service
grpcurl -plaintext localhost:50051 worker.WorkerService/ListWorkers
```
For more information, see the Architecture Guide and Development Guide.