This guide explains the development workflow, patterns, and best practices for the Airavata Scheduler. The system follows a clean hexagonal architecture that promotes maintainability, testability, and extensibility.
For practical setup instructions, see the Testing Guide.
The Airavata Scheduler follows a hexagonal architecture (ports-and-adapters pattern):
- **Domain** (`core/domain/`): Pure business logic with no external dependencies
- **Services** (`core/service/`): Implementation of domain interfaces
- **Ports** (`core/port/`): Infrastructure interfaces that services depend on
- **Adapters** (`adapters/`): Concrete implementations of infrastructure ports
- **Application** (`core/app/`): Dependency injection and application wiring

**Domain Layer (`core/domain/`)**

Contains pure business logic with no external dependencies:
```
core/domain/
├── interface.go   # 6 core domain interfaces
├── model.go       # Domain entities (Experiment, Task, Worker, etc.)
├── enum.go        # Status enums and types (TaskStatus, WorkerStatus, etc.)
├── value.go       # Value objects
├── error.go       # Domain-specific error types
└── event.go       # Domain events for event-driven architecture
```
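For orientation, the sketch below shows roughly what a domain entity and its status enum look like in this layer; the fields and constants are illustrative, not the actual definitions in `model.go` and `enum.go`.

```go
// Illustrative sketch only; see core/domain/model.go and enum.go for the real types.
package domain

import "time"

// TaskStatus is a status enum (the concrete values here are assumptions).
type TaskStatus string

const (
    TaskStatusPending   TaskStatus = "PENDING"
    TaskStatusRunning   TaskStatus = "RUNNING"
    TaskStatusCompleted TaskStatus = "COMPLETED"
    TaskStatusFailed    TaskStatus = "FAILED"
)

// Task is a pure domain entity: plain data with no infrastructure imports.
type Task struct {
    ID           string
    ExperimentID string
    Status       TaskStatus
    CreatedAt    time.Time
}
```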
Key Files:
- `interface.go`: Defines the 6 core domain interfaces
- `model.go`: Contains domain entities like Experiment, Task, Worker
- `enum.go`: Contains status enums like TaskStatus, WorkerStatus
- `value.go`: Contains value objects
- `error.go`: Domain-specific error types and constants
- `event.go`: Domain events for event-driven architecture

**Services Layer (`services/`)**

Implements the domain interfaces with business logic:
```
services/
├── registry/        # ResourceRegistry implementation
│   ├── service.go   # Core resource management logic
│   └── factory.go   # Service factory
├── vault/           # CredentialVault implementation
│   ├── service.go   # Credential management logic
│   └── factory.go   # Service factory
├── orchestrator/    # ExperimentOrchestrator implementation
│   ├── service.go   # Experiment lifecycle management
│   └── factory.go   # Service factory
├── scheduler/       # TaskScheduler implementation
│   ├── service.go   # Cost-based scheduling logic
│   └── factory.go   # Service factory
├── datamover/       # DataMover implementation
│   ├── service.go   # Data staging and caching logic
│   └── factory.go   # Service factory
└── worker/          # WorkerLifecycle implementation
    ├── service.go   # Worker management logic
    └── factory.go   # Service factory
```
Key Patterns:
**Ports Layer (`ports/`)**

Defines infrastructure interfaces that services depend on:
```
ports/
├── database.go   # Database operations interface
├── cache.go      # Caching operations interface
├── events.go     # Event publishing interface
├── security.go   # Authentication/authorization interface
├── storage.go    # File storage interface
├── compute.go    # Compute resource interaction interface
└── metrics.go    # Metrics collection interface
```
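To make the idea concrete, a port is just a small Go interface that services depend on and adapters implement. The sketch below is an assumption about the general shape, not the actual contents of `cache.go`:

```go
// Illustrative sketch; see ports/cache.go for the real interface.
package ports

import (
    "context"
    "time"
)

// CachePort abstracts the caching backend so services never import a
// concrete cache client directly.
type CachePort interface {
    Get(ctx context.Context, key string) ([]byte, error)
    Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
    Delete(ctx context.Context, key string) error
}
```

Because services only see this interface, the cache implementation can be swapped out (for example, an in-memory fake in unit tests) without touching business logic.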
Key Patterns:
**Adapters Layer (`adapters/`)**

Provides concrete implementations of the ports:
```
adapters/
├── primary/                  # Inbound adapters (driving the system)
│   └── http/                 # HTTP API handlers
│       └── handlers.go
├── secondary/                # Outbound adapters (driven by the system)
│   └── database/             # PostgreSQL implementation
│       ├── adapter.go
│       └── repositories.go
└── external/                 # External system adapters
    ├── compute/              # SLURM, Kubernetes, Bare Metal
    │   ├── slurm.go
    │   ├── kubernetes.go
    │   └── baremetal.go
    └── storage/              # S3, NFS, SFTP
        ├── s3.go
        ├── nfs.go
        └── sftp.go
```
Key Patterns:
**Application Layer (`app/`)**

Handles dependency injection and application wiring:
```
app/
└── bootstrap.go   # Application bootstrap and dependency injection
```
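Conceptually, `bootstrap.go` constructs the outbound adapters first, injects them into the service factories, and returns an application object that the inbound adapters use. The self-contained sketch below shows that shape with placeholder types; it is not the real `Bootstrap` signature.

```go
// Illustrative wiring sketch; all types here are placeholders, not the
// real definitions in ports/, services/, or adapters/.
package app

// RepositoryPort and CachePort stand in for interfaces defined in ports/.
type RepositoryPort interface{} // stand-in for ports.RepositoryPort
type CachePort interface{}      // stand-in for ports.CachePort

// ExperimentService stands in for a domain service built by its factory.
type ExperimentService struct {
    repo  RepositoryPort
    cache CachePort
}

// App exposes the wired services to inbound adapters (HTTP handlers, gRPC).
type App struct {
    ExperimentService *ExperimentService
}

// Bootstrap builds adapters first, then injects them into service factories.
func Bootstrap(repo RepositoryPort, cache CachePort) *App {
    return &App{
        ExperimentService: &ExperimentService{repo: repo, cache: cache},
    }
}
```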
Key Patterns:
1. **Start with Domain**: Begin by understanding the core business logic:
   - `domain/interfaces.go`: The 6 core interfaces
   - `domain/models.go`: Domain entities and their relationships
   - `domain/value_objects.go`: Value objects and enums
   - `domain/errors.go`: Domain-specific error handling
2. **Study Services**: Understand the business logic implementations:
   - `services/*/service.go`: Core business logic
   - `services/*/factory.go`: Service creation and dependency injection
3. **Examine Ports**: Understand the infrastructure interfaces:
   - `ports/*.go`: Infrastructure interfaces that services depend on
4. **Review Adapters**: See how external systems are integrated:
   - `adapters/primary/`: HTTP API handlers
   - `adapters/secondary/`: Database and cache implementations
   - `adapters/external/`: Third-party system integrations

To add a new service, first define its interface in `domain/interfaces.go`:

```go
type NewService interface {
    DoSomething(ctx context.Context, req *DoSomethingRequest) (*DoSomethingResponse, error)
}
```
Then implement the service in `services/newservice/`:

```go
// services/newservice/service.go
type Service struct {
    repo  ports.RepositoryPort
    cache ports.CachePort
}

func (s *Service) DoSomething(ctx context.Context, req *domain.DoSomethingRequest) (*domain.DoSomethingResponse, error) {
    // Business logic implementation
}
```
Create a factory in `services/newservice/factory.go`:

```go
func NewFactory(repo ports.RepositoryPort, cache ports.CachePort) domain.NewService {
    return &Service{
        repo:  repo,
        cache: cache,
    }
}
```
Finally, wire it into `app/bootstrap.go`:

```go
newService := newservice.NewFactory(repo, cache)
```
To add a new adapter, implement the relevant port interface:

```go
// adapters/secondary/newsystem/adapter.go
type Adapter struct {
    client *NewSystemClient
}

func (a *Adapter) DoSomething(ctx context.Context, req *ports.DoSomethingRequest) (*ports.DoSomethingResponse, error) {
    // External system integration
}
```
Then register it in `app/bootstrap.go`:

```go
newSystemAdapter := newsystem.NewAdapter(config)
```
Test services in isolation using mocks:
```go
func TestExperimentService_CreateExperiment(t *testing.T) {
    // Arrange
    mockRepo := &MockRepository{}
    mockCache := &MockCache{}
    service := orchestrator.NewFactory(mockRepo, mockCache)

    // Act
    result, err := service.CreateExperiment(ctx, req)

    // Assert
    assert.NoError(t, err)
    assert.NotNil(t, result)
}
```
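The mocks are plain structs that satisfy the port interfaces and live alongside the tests. A minimal hand-written sketch might look like the following; the method set is an assumption about `ports.RepositoryPort`, not its real definition:

```go
// MockRepository is an illustrative hand-written mock kept in the test package.
type MockRepository struct {
    experiments map[string]*domain.Experiment
    saveErr     error // set to force an error path in tests
}

func (m *MockRepository) SaveExperiment(ctx context.Context, e *domain.Experiment) error {
    if m.saveErr != nil {
        return m.saveErr
    }
    if m.experiments == nil {
        m.experiments = make(map[string]*domain.Experiment)
    }
    m.experiments[e.ID] = e
    return nil
}

func (m *MockRepository) GetExperimentByID(ctx context.Context, id string) (*domain.Experiment, error) {
    e, ok := m.experiments[id]
    if !ok {
        return nil, domain.ErrExperimentNotFound
    }
    return e, nil
}
```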
Test with real infrastructure:
```go
func TestExperimentService_Integration(t *testing.T) {
    // Setup test database
    db := setupTestDatabase(t)
    defer cleanupTestDatabase(t, db)

    // Create real services
    app := app.Bootstrap(testConfig)

    // Test with real infrastructure
    _, err := app.ExperimentService.CreateExperiment(ctx, req)
    assert.NoError(t, err)
}
```
Test adapters with real external systems:
```go
func TestSlurmAdapter_Integration(t *testing.T) {
    if !*integration {
        t.Skip("Integration tests disabled")
    }

    adapter := slurm.NewAdapter(slurmConfig)

    // Test with real SLURM cluster
    _, err := adapter.SpawnWorker(ctx, 1*time.Hour)
    assert.NoError(t, err)
}
```
File naming conventions:

- Services: `service.go` for implementation, `factory.go` for creation
- Adapters: `adapter.go` for main implementation, `repositories.go` for data access
- Tests: `*_test.go` with descriptive names
- Configuration: `config.go` or embedded in bootstrap

Group imports into standard library, third-party, and internal packages:

```go
import (
    // Standard library
    "context"
    "fmt"

    // Third-party packages
    "github.com/gorilla/mux"
    "gorm.io/gorm"

    // Internal packages
    "github.com/apache/airavata/scheduler/domain"
    "github.com/apache/airavata/scheduler/ports"
    "github.com/apache/airavata/scheduler/services/orchestrator"
)
```
Use domain-specific errors:
```go
// In domain/errors.go
var (
    ErrExperimentNotFound = errors.New("experiment not found")
    ErrInvalidParameter   = errors.New("invalid parameter")
)

// In service
func (s *Service) GetExperiment(ctx context.Context, id string) (*domain.Experiment, error) {
    experiment, err := s.repo.GetExperimentByID(ctx, id)
    if err != nil {
        if errors.Is(err, gorm.ErrRecordNotFound) {
            return nil, domain.ErrExperimentNotFound
        }
        return nil, fmt.Errorf("failed to get experiment: %w", err)
    }
    return experiment, nil
}
```
A typical service method validates input, checks the cache, runs business logic, caches the result, and publishes an event:

```go
type Service struct {
    repo   ports.RepositoryPort
    cache  ports.CachePort
    events ports.EventPort
}

func (s *Service) DoSomething(ctx context.Context, req *domain.Request) (*domain.Response, error) {
    // 1. Validate input
    if err := s.validateRequest(req); err != nil {
        return nil, err
    }

    // 2. Check cache
    if cached, err := s.cache.Get(ctx, req.ID); err == nil {
        return cached, nil
    }

    // 3. Business logic
    result, err := s.repo.DoSomething(ctx, req)
    if err != nil {
        return nil, err
    }

    // 4. Cache result
    s.cache.Set(ctx, req.ID, result, time.Hour)

    // 5. Publish event
    s.events.Publish(ctx, &domain.Event{Type: "SomethingDone", Data: result})

    return result, nil
}
```
An adapter translates between port-level types and the external system's API:

```go
type Adapter struct {
    client *ExternalClient
    config *Config
}

func (a *Adapter) DoSomething(ctx context.Context, req *ports.Request) (*ports.Response, error) {
    // 1. Transform request
    externalReq := a.transformRequest(req)

    // 2. Call external system
    externalResp, err := a.client.DoSomething(ctx, externalReq)
    if err != nil {
        return nil, fmt.Errorf("external system error: %w", err)
    }

    // 3. Transform response
    response := a.transformResponse(externalResp)
    return response, nil
}
```
A factory receives port implementations and returns the wired service:

```go
func NewFactory(repo ports.RepositoryPort, cache ports.CachePort, events ports.EventPort) domain.Service {
    return &Service{
        repo:   repo,
        cache:  cache,
        events: events,
    }
}
```
The Airavata Scheduler includes a comprehensive command-line interface built with Cobra.
The CLI follows a modular structure with separate command groups:
```
cmd/cli/
├── main.go        # Root command and experiment management
├── auth.go        # Authentication commands
├── user.go        # User profile and account management
├── resources.go   # Resource management (compute, storage, credentials)
├── data.go        # Data upload/download commands
├── project.go     # Project management commands
└── config.go      # Configuration management
```
To add a new command group:
```go
func createNewCommands() *cobra.Command {
    newCmd := &cobra.Command{
        Use:   "new",
        Short: "New command group",
        Long:  "Description of new command group",
    }

    // Add subcommands
    newCmd.AddCommand(createSubCommand())

    return newCmd
}
```
Then register it in `main.go`:

```go
rootCmd.AddCommand(createNewCommands())
```
Command handlers follow the Cobra `RunE` signature:

```go
func executeNewCommand(cmd *cobra.Command, args []string) error {
    // Command implementation
    return nil
}
```
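For example, a subcommand can bind this handler and a flag as shown below; `createSubCommand` matches the snippet above, while the `--output` flag is purely illustrative:

```go
func createSubCommand() *cobra.Command {
    cmd := &cobra.Command{
        Use:   "run",
        Short: "Run the new operation",
        RunE:  executeNewCommand,
    }

    // Illustrative flag; read it in the handler with cmd.Flags().GetString("output").
    cmd.Flags().StringP("output", "o", "table", "Output format (table|json)")

    return cmd
}
```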
All commands should check authentication:
```go
configManager := NewConfigManager()
if !configManager.IsAuthenticated() {
    return fmt.Errorf("not authenticated - run 'airavata auth login' first")
}
```
Use consistent HTTP client patterns:
```go
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
    return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token)
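```

The snippet above stops at building the request. A hedged continuation that executes it and decodes a JSON body might look like the following (assumes `encoding/json` is imported; the response struct is illustrative):

```go
resp, err := http.DefaultClient.Do(req)
if err != nil {
    return fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()

// Decode into an illustrative response type; the real payload shapes are
// defined by the scheduler's REST API.
var result struct {
    ID     string `json:"id"`
    Status string `json:"status"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
    return fmt.Errorf("failed to decode response: %w", err)
}
```

Non-200 responses should be turned into actionable errors, as shown in the next snippet.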
Provide clear, actionable error messages:
```go
if resp.StatusCode != http.StatusOK {
    body, _ := io.ReadAll(resp.Body) // read the body so the error is actionable
    return fmt.Errorf("failed to %s: %s", operation, string(body))
}
```
Show progress for long-running operations:
```go
fmt.Printf("📤 Uploading %s...\n", filename)
// ... operation ...
fmt.Printf("✅ Upload completed successfully!\n")
```
Test CLI commands with:
```bash
# Test command help
./bin/airavata --help
./bin/airavata experiment --help

# Test command execution
./bin/airavata auth status
./bin/airavata resource compute list
```
Update documentation when adding new commands:
- `docs/reference/cli.md`

The system uses Protocol Buffers for gRPC communication. Generate proto code before building:
```bash
# Install protoc and Go plugins
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate proto code
make proto

# Or manually
protoc --go_out=core/dto --go-grpc_out=core/dto \
  --go_opt=paths=source_relative \
  --go-grpc_opt=paths=source_relative \
  --proto_path=proto \
  proto/*.proto
```
For integration testing with SLURM clusters, the system uses a deterministic munge key to ensure reproducible authentication across all SLURM nodes:
```bash
# Generate deterministic munge key for SLURM clusters
./scripts/generate-slurm-munge-key.sh

# This creates tests/docker/slurm/shared-munge.key with:
# - Deterministic content based on fixed seed "airavata-munge-test-seed-v1"
# - 1024-byte binary key generated from SHA256 hashes
# - Same key used across all SLURM containers for consistent authentication
```
Key Features:
Verification:
```bash
# Verify all containers share the same munge key
docker exec airavata-scheduler-slurm-cluster-01-1 sha256sum /etc/munge/munge.key
docker exec airavata-scheduler-slurm-cluster-02-1 sha256sum /etc/munge/munge.key
docker exec airavata-scheduler-slurm-node-01-01-1 sha256sum /etc/munge/munge.key
docker exec airavata-scheduler-slurm-node-02-01-1 sha256sum /etc/munge/munge.key

# All should output identical SHA256 hash
```
```bash
# Build both scheduler and worker
make build

# Or build individually
make build-server   # Builds build/scheduler
make build-worker   # Builds build/worker

# Verify binaries
./build/scheduler --help
./build/worker --help
```
```bash
# 1. Generate proto code
make proto

# 2. Build binaries
make build

# 3. Run scheduler
./build/scheduler --mode=server

# 4. Run worker (in separate terminal)
./build/worker --server-address=localhost:50051
```
```bash
# Test worker gRPC client
go test ./cmd/worker -v

# Test scheduler gRPC server
go test ./adapters -v -run TestWorkerService
```
```bash
# Start test services
docker compose --profile test up -d

# Test worker integration
go test ./tests/integration -v -run TestWorkerIntegration

# Clean up
docker compose --profile test down
```
```bash
# Test gRPC connectivity
grpcurl -plaintext localhost:50051 list
grpcurl -plaintext localhost:50051 worker.WorkerService/ListWorkers

# Test worker registration
./build/worker --server-address=localhost:50051 --worker-id=test-worker-1
```
```bash
# Start all infrastructure services (PostgreSQL, SpiceDB, OpenBao, MinIO, etc.)
make docker-up

# Wait for services to be healthy
make wait-services

# Upload SpiceDB authorization schema
make spicedb-schema

# Verify all services are running
docker compose ps
```
```bash
# Check PostgreSQL
psql postgres://user:password@localhost:5432/airavata -c "SELECT 1;"

# Check SpiceDB
grpcurl -plaintext -d '{"resource": {"object_type": "credential", "object_id": "test"}}' \
  localhost:50051 authzed.api.v1.PermissionsService/CheckPermission

# Check OpenBao
export VAULT_ADDR='http://localhost:8200'
export VAULT_TOKEN='dev-token'
vault status

# Check MinIO
curl http://localhost:9000/minio/health/live
```
```bash
# Terminal 1: Start scheduler
./build/scheduler --mode=server --log-level=debug

# Terminal 2: Start worker
./build/worker --server-address=localhost:50051 --log-level=debug

# Terminal 3: Test API
curl http://localhost:8080/health
curl http://localhost:8080/api/v1/credentials   # Test credential management
```
Create a .env file for local development:
```bash
# .env

# PostgreSQL
DATABASE_URL=postgres://user:password@localhost:5432/airavata?sslmode=disable

# SpiceDB
SPICEDB_ENDPOINT=localhost:50051
SPICEDB_TOKEN=somerandomkeyhere
SPICEDB_INSECURE=true

# OpenBao
VAULT_ADDR=http://localhost:8200
VAULT_TOKEN=dev-token
VAULT_MOUNT_PATH=secret

# MinIO
S3_ENDPOINT=localhost:9000
S3_ACCESS_KEY=minioadmin
S3_SECRET_KEY=minioadmin
S3_USE_SSL=false

# Server
SERVER_PORT=8080
LOG_LEVEL=debug
```
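As a sketch of how these variables might be consumed at startup (the `Config` struct, field set, and defaults here are illustrative, not the scheduler's actual configuration loader):

```go
package config

import "os"

// Config collects the environment settings used in local development.
// Field names mirror the .env keys above; the struct itself is illustrative.
type Config struct {
    DatabaseURL     string
    SpiceDBEndpoint string
    VaultAddr       string
    S3Endpoint      string
    ServerPort      string
}

// getenv returns the value of key, or fallback when the variable is unset.
func getenv(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

func Load() Config {
    return Config{
        DatabaseURL:     os.Getenv("DATABASE_URL"),
        SpiceDBEndpoint: getenv("SPICEDB_ENDPOINT", "localhost:50051"),
        VaultAddr:       getenv("VAULT_ADDR", "http://localhost:8200"),
        S3Endpoint:      getenv("S3_ENDPOINT", "localhost:9000"),
        ServerPort:      getenv("SERVER_PORT", "8080"),
    }
}
```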
```bash
# Install air for hot reloading
go install github.com/cosmtrek/air@latest

# Run scheduler with hot reload
air -c .air.toml

# Or use go run
go run ./core/cmd --mode=server
go run ./cmd/worker --server-address=localhost:50051
```
```bash
# Build with debug symbols
go build -gcflags="all=-N -l" -o build/scheduler ./core/cmd
go build -gcflags="all=-N -l" -o build/worker ./cmd/worker

# Run with debugger
dlv exec ./build/scheduler -- --mode=server
dlv exec ./build/worker -- --server-address=localhost:50051
```
```bash
# Install zed CLI
brew install authzed/tap/zed
# or
go install github.com/authzed/zed@latest

# Validate schema
make spicedb-validate

# Read current schema
zed schema read \
  --endpoint localhost:50051 \
  --token "somerandomkeyhere" \
  --insecure

# Write relationships manually (for testing)
zed relationship create \
  --endpoint localhost:50051 \
  --token "somerandomkeyhere" \
  --insecure \
  credential:test-cred owner user:alice
```
```bash
# List all relationships for a credential
zed relationship read \
  --endpoint localhost:50051 \
  --token "somerandomkeyhere" \
  --insecure \
  --filter 'credential:test-cred'

# Check if user has permission
zed permission check \
  --endpoint localhost:50051 \
  --token "somerandomkeyhere" \
  --insecure \
  credential:test-cred read user:alice
```
```bash
# Set environment
export VAULT_ADDR='http://localhost:8200'
export VAULT_TOKEN='dev-token'

# Enable KV v2 engine (if not already enabled)
vault secrets enable -version=2 -path=secret kv

# Store a test credential
vault kv put secret/credentials/test-key \
  type=ssh_key \
  data="$(cat ~/.ssh/id_rsa)"

# Retrieve credential
vault kv get secret/credentials/test-key

# List all credentials
vault kv list secret/credentials/

# Delete credential
vault kv delete secret/credentials/test-key
```
```bash
# Create a test policy
cat > test-policy.hcl <<EOF
path "secret/data/credentials/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}
EOF

vault policy write test-policy test-policy.hcl

# Create token with policy
vault token create -policy=test-policy

# Test token
VAULT_TOKEN=<new-token> vault kv get secret/credentials/test-key
```
```go
// tests/integration/credential_test.go
func TestCredentialLifecycle(t *testing.T) {
    suite := testutil.SetupIntegrationTest(t)
    defer suite.Cleanup()

    // Start SpiceDB and OpenBao
    err := suite.StartServices(t, "postgres", "spicedb", "openbao")
    require.NoError(t, err)

    // Create user
    user, err := suite.CreateUser("test-user", 1001, 1001)
    require.NoError(t, err)

    // Create credential (stored in OpenBao)
    cred, err := suite.CreateCredential("test-ssh-key", user.ID)
    require.NoError(t, err)

    // Verify ownership in SpiceDB
    owner, err := suite.SpiceDBAdapter.GetCredentialOwner(context.Background(), cred.ID)
    require.NoError(t, err)
    assert.Equal(t, user.ID, owner)

    // Share with another user
    user2, err := suite.CreateUser("user2", 1002, 1002)
    require.NoError(t, err)
    err = suite.AddCredentialACL(cred.ID, "USER", user2.ID, "read")
    require.NoError(t, err)

    // Verify access
    hasAccess := suite.CheckCredentialAccess(cred.ID, user2.ID, "read")
    assert.True(t, hasAccess)

    // Retrieve credential data (from OpenBao)
    data, _, err := suite.VaultService.RetrieveCredential(context.Background(), cred.ID, user2.ID)
    require.NoError(t, err)
    assert.NotNil(t, data)
}
```
```bash
# Via API
curl -X POST http://localhost:8080/api/v1/groups \
  -H "Authorization: Bearer $TOKEN" \
  -d '{"name": "engineering"}'

curl -X POST http://localhost:8080/api/v1/groups/engineering/members \
  -H "Authorization: Bearer $TOKEN" \
  -d '{"user_id": "alice", "member_type": "user"}'

curl -X POST http://localhost:8080/api/v1/credentials/cred-123/share \
  -H "Authorization: Bearer $TOKEN" \
  -d '{"principal_type": "group", "principal_id": "engineering", "permission": "read"}'
```
```bash
# Check if SpiceDB is running
docker compose ps spicedb

# Check logs
docker compose logs spicedb

# Test connectivity
grpcurl -plaintext localhost:50051 list

# Verify preshared key
grpcurl -plaintext \
  -H "authorization: Bearer somerandomkeyhere" \
  localhost:50051 authzed.api.v1.SchemaService/ReadSchema
```
```bash
# Check if OpenBao is running
docker compose ps openbao

# Check logs
docker compose logs openbao

# Test connectivity
vault status

# Check mount points
vault secrets list
```
```bash
# Debug SpiceDB relationships
zed relationship read \
  --endpoint localhost:50051 \
  --token "somerandomkeyhere" \
  --insecure \
  --filter 'credential:YOUR_CRED_ID'

# Check if schema is loaded
make spicedb-schema

# Verify user membership
zed relationship read \
  --endpoint localhost:50051 \
  --token "somerandomkeyhere" \
  --insecure \
  --filter 'group:YOUR_GROUP_ID'
```
```bash
# List all secrets
vault kv list secret/credentials/

# Check secret metadata
vault kv metadata get secret/credentials/YOUR_CRED_ID

# Verify token has correct policy
vault token lookup
```
```bash
# Install k6
brew install k6

# Run load test
k6 run tests/performance/credential_load_test.js

# Test concurrent permission checks
k6 run tests/performance/permission_check_load_test.js
```
```bash
# Enable pprof in scheduler
./build/scheduler --mode=server --pprof=true

# Capture CPU profile
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30

# Capture memory profile
go tool pprof http://localhost:6060/debug/pprof/heap

# Analyze with web interface
go tool pprof -http=:8081 cpu.prof
```
This development guide provides the foundation for working effectively with the Airavata Scheduler's hexagonal architecture. Follow these patterns and principles to maintain code quality and system reliability.
For more detailed information, see: