package adapters
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"text/template"
"time"
"github.com/apache/airavata/scheduler/core/domain"
ports "github.com/apache/airavata/scheduler/core/port"
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"
)
// KubernetesAdapter implements the ComputeAdapter interface for Kubernetes clusters
type KubernetesAdapter struct {
resource domain.ComputeResource
vault domain.CredentialVault
clientset *kubernetes.Clientset
metricsClient metricsclientset.Interface
namespace string
config *ScriptConfig
}
// Compile-time interface verification
var _ ports.ComputePort = (*KubernetesAdapter)(nil)
// NewKubernetesAdapter creates a new Kubernetes adapter
func NewKubernetesAdapter(resource domain.ComputeResource, vault domain.CredentialVault) *KubernetesAdapter {
return NewKubernetesAdapterWithConfig(resource, vault, nil)
}
// NewKubernetesAdapterWithConfig creates a new Kubernetes adapter with custom script configuration
func NewKubernetesAdapterWithConfig(resource domain.ComputeResource, vault domain.CredentialVault, config *ScriptConfig) *KubernetesAdapter {
if config == nil {
config = &ScriptConfig{
WorkerBinaryURL: "https://server/api/worker-binary",
ServerGRPCAddress: "scheduler", // Use service name for container-to-container communication
ServerGRPCPort: 50051,
DefaultWorkingDir: "/tmp/worker",
}
}
return &KubernetesAdapter{
resource: resource,
vault: vault,
namespace: "default", // Default namespace
config: config,
}
}
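
// Construction sketch (illustrative only; resource and vault come from the
// application's wiring, and the ScriptConfig values are placeholders):
//
//	adapter := NewKubernetesAdapterWithConfig(resource, vault, &ScriptConfig{
//		WorkerBinaryURL:   "https://scheduler.example.com/api/worker-binary",
//		ServerGRPCAddress: "scheduler",
//		ServerGRPCPort:    50051,
//		DefaultWorkingDir: "/tmp/worker",
//	})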
// kubernetesWorkerSpawnTemplate defines the Kubernetes worker spawn pod template
const kubernetesWorkerSpawnTemplate = `apiVersion: v1
kind: Pod
metadata:
name: worker-{{.WorkerID}}
labels:
app: airavata-worker
experiment-id: "{{.ExperimentID}}"
compute-resource-id: "{{.ComputeResourceID}}"
spec:
restartPolicy: Never
activeDeadlineSeconds: {{.WalltimeSeconds}}
containers:
- name: worker
image: ubuntu:20.04
command: ["/bin/bash"]
args:
- -c
- |
set -euo pipefail
# Install curl
apt-get update && apt-get install -y curl
# Set environment variables
export WORKER_ID="{{.WorkerID}}"
export EXPERIMENT_ID="{{.ExperimentID}}"
export COMPUTE_RESOURCE_ID="{{.ComputeResourceID}}"
export SERVER_URL="grpc://{{.ServerAddress}}:{{.ServerPort}}"
# Create working directory
WORK_DIR="{{.WorkingDir}}/{{.WorkerID}}"
mkdir -p "$WORK_DIR"
cd "$WORK_DIR"
# Download worker binary
echo "Downloading worker binary..."
curl -L -o worker "{{.WorkerBinaryURL}}"
chmod +x worker
# Start worker
echo "Starting worker: $WORKER_ID"
exec ./worker \
--server-url="$SERVER_URL" \
--worker-id="$WORKER_ID" \
--experiment-id="$EXPERIMENT_ID" \
--compute-resource-id="$COMPUTE_RESOURCE_ID" \
--working-dir="$WORK_DIR"
    resources:
      requests:
        cpu: "{{.CPUCores}}"
        memory: "{{.MemoryMB}}Mi"
      limits:
        cpu: "{{.CPUCores}}"
        memory: "{{.MemoryMB}}Mi"
{{- if .GPUs}}
        nvidia.com/gpu: {{.GPUs}}
{{- end}}
env:
- name: WORKER_ID
value: "{{.WorkerID}}"
- name: EXPERIMENT_ID
value: "{{.ExperimentID}}"
- name: COMPUTE_RESOURCE_ID
value: "{{.ComputeResourceID}}"
- name: SERVER_URL
value: "grpc://{{.ServerAddress}}:{{.ServerPort}}"
`
// connect lazily establishes the connection to the Kubernetes cluster. It
// honors the optional resource metadata keys "namespace", "kubeconfig"
// (path to a kubeconfig file), and "context" (kubeconfig context name).
func (k *KubernetesAdapter) connect() error {
if k.clientset != nil {
return nil // Already connected
}
	// Extract resource metadata as strings
resourceMetadata := make(map[string]string)
if k.resource.Metadata != nil {
for key, value := range k.resource.Metadata {
resourceMetadata[key] = fmt.Sprintf("%v", value)
}
}
// Get namespace from resource metadata
if ns, ok := resourceMetadata["namespace"]; ok {
k.namespace = ns
}
// Create Kubernetes client
var config *rest.Config
var err error
// Check if we're running inside a cluster
if k.resource.Endpoint == "" || k.resource.Endpoint == "in-cluster" {
// Use in-cluster config
config, err = rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to create in-cluster config: %w", err)
}
} else {
// Use external cluster config
// Get kubeconfig path from resource metadata
kubeconfigPath := ""
if kubeconfig, ok := resourceMetadata["kubeconfig"]; ok {
kubeconfigPath = kubeconfig
} else {
// Use default kubeconfig location
kubeconfigPath = filepath.Join(homeDir(), ".kube", "config")
}
// Build config from kubeconfig file
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.ExplicitPath = kubeconfigPath
		// Get the kubeconfig context from metadata, falling back to the current
		// context; the local name avoids shadowing the imported context package
		kubeContext := ""
		if ctx, ok := resourceMetadata["context"]; ok {
			kubeContext = ctx
		}
		configOverrides := &clientcmd.ConfigOverrides{}
		if kubeContext != "" {
			configOverrides.CurrentContext = kubeContext
		}
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
loadingRules,
configOverrides,
)
config, err = kubeConfig.ClientConfig()
if err != nil {
return fmt.Errorf("failed to build config from kubeconfig %s: %w", kubeconfigPath, err)
}
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to create Kubernetes clientset: %w", err)
}
// Create metrics client
metricsClient, err := metricsclientset.NewForConfig(config)
if err != nil {
// Metrics client is optional - if metrics-server is not available, we'll handle gracefully
// Don't fail the connection, just log the warning
fmt.Printf("Warning: Failed to create metrics client (metrics-server may not be available): %v\n", err)
metricsClient = nil
}
k.clientset = clientset
k.metricsClient = metricsClient
return nil
}
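
// Metadata sketch for connect (illustrative values; assumes the resource's
// Metadata field is a map with string keys):
//
//	resource.Metadata = map[string]interface{}{
//		"namespace":  "airavata",
//		"kubeconfig": "/home/user/.kube/config",
//		"context":    "prod-cluster",
//	}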
// GenerateScript generates a Kubernetes Job manifest for the task
func (k *KubernetesAdapter) GenerateScript(task domain.Task, outputDir string) (string, error) {
err := k.connect()
if err != nil {
return "", err
}
// Create job manifest
job := &v1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("task-%s", task.ID),
Namespace: k.namespace,
Labels: map[string]string{
"app": "airavata-scheduler",
"task-id": task.ID,
"experiment": task.ExperimentID,
},
},
Spec: v1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "airavata-scheduler",
"task-id": task.ID,
"experiment": task.ExperimentID,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "task-executor",
Image: k.getContainerImage(),
Command: []string{"/bin/bash", "-c"},
Args: []string{task.Command},
Env: k.getEnvironmentVariables(task),
VolumeMounts: []corev1.VolumeMount{
{
Name: "output-volume",
MountPath: "/output",
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
Volumes: []corev1.Volume{
{
Name: "output-volume",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
},
},
BackoffLimit: int32Ptr(3),
},
}
// Save job manifest to file
manifestPath := filepath.Join(outputDir, fmt.Sprintf("%s-job.yaml", task.ID))
err = k.saveJobManifest(job, manifestPath)
if err != nil {
return "", fmt.Errorf("failed to save job manifest: %w", err)
}
return manifestPath, nil
}
// SubmitTask submits the task to Kubernetes using kubectl apply
func (k *KubernetesAdapter) SubmitTask(ctx context.Context, scriptPath string) (string, error) {
err := k.connect()
if err != nil {
return "", err
}
// Apply the job manifest
job, err := k.applyJobManifest(ctx, scriptPath)
if err != nil {
return "", fmt.Errorf("failed to apply job manifest: %w", err)
}
return job.Name, nil
}
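
// Typical submission flow (sketch; the output directory is illustrative):
//
//	manifestPath, err := adapter.GenerateScript(task, "/tmp/manifests")
//	if err != nil {
//		return err
//	}
//	jobName, err := adapter.SubmitTask(ctx, manifestPath)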
// GetJobStatus gets the status of a Kubernetes job (interface method)
func (k *KubernetesAdapter) GetJobStatus(ctx context.Context, jobID string) (*ports.JobStatus, error) {
status, err := k.getJobStatus(ctx, jobID)
if err != nil {
return nil, err
}
jobStatus := ports.JobStatus(status)
return &jobStatus, nil
}
// GetNodeInfo gets information about a specific node
func (k *KubernetesAdapter) GetNodeInfo(ctx context.Context, nodeID string) (*ports.NodeInfo, error) {
err := k.connect()
if err != nil {
return nil, err
}
// Get node from Kubernetes
node, err := k.clientset.CoreV1().Nodes().Get(ctx, nodeID, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get node: %w", err)
}
// Extract node information
info := &ports.NodeInfo{
ID: nodeID,
Name: node.Name,
Status: ports.NodeStatusUp,
CPUCores: 0,
MemoryGB: 0,
}
// Parse resource capacity
if cpu, exists := node.Status.Capacity["cpu"]; exists {
if cores, ok := cpu.AsInt64(); ok {
info.CPUCores = int(cores)
}
}
if memory, exists := node.Status.Capacity["memory"]; exists {
if mem, ok := memory.AsInt64(); ok {
info.MemoryGB = int(mem / (1024 * 1024 * 1024)) // Convert to GB
}
}
return info, nil
}
// GetQueueInfo gets information about a specific queue
func (k *KubernetesAdapter) GetQueueInfo(ctx context.Context, queueName string) (*ports.QueueInfo, error) {
	// Kubernetes has no native queue abstraction, so return static defaults
info := &ports.QueueInfo{
Name: queueName,
Status: ports.QueueStatusActive,
MaxWalltime: time.Hour * 24,
MaxCPUCores: 8,
MaxMemoryMB: 16384,
MaxDiskGB: 100,
MaxGPUs: 0,
MaxJobs: 100,
MaxJobsPerUser: 10,
Priority: 1,
}
// In practice, you'd query the Kubernetes cluster for actual queue info
return info, nil
}
// GetResourceInfo gets information about the compute resource
func (k *KubernetesAdapter) GetResourceInfo(ctx context.Context) (*ports.ResourceInfo, error) {
	// Return placeholder cluster information; counts default to zero
info := &ports.ResourceInfo{
Name: k.resource.Name,
Type: k.resource.Type,
Version: "1.0",
TotalNodes: 0,
ActiveNodes: 0,
TotalCPUCores: 0,
AvailableCPUCores: 0,
TotalMemoryGB: 0,
AvailableMemoryGB: 0,
TotalDiskGB: 0,
AvailableDiskGB: 0,
TotalGPUs: 0,
AvailableGPUs: 0,
Queues: []*ports.QueueInfo{},
Metadata: make(map[string]interface{}),
}
// In practice, you'd query the Kubernetes cluster for actual resource info
return info, nil
}
// GetStats gets statistics about the compute resource
func (k *KubernetesAdapter) GetStats(ctx context.Context) (*ports.ComputeStats, error) {
	// Return placeholder statistics until real cluster queries are wired in
stats := &ports.ComputeStats{
TotalJobs: 0,
ActiveJobs: 0,
CompletedJobs: 0,
FailedJobs: 0,
CancelledJobs: 0,
AverageJobTime: time.Minute * 10,
TotalCPUTime: time.Hour,
TotalWalltime: time.Hour * 2,
UtilizationRate: 0.0,
ErrorRate: 0.0,
Uptime: time.Hour * 24,
LastActivity: time.Now(),
}
// In practice, you'd query the Kubernetes cluster for actual stats
return stats, nil
}
// GetWorkerStatus gets the status of a worker
func (k *KubernetesAdapter) GetWorkerStatus(ctx context.Context, workerID string) (*ports.WorkerStatus, error) {
// For Kubernetes, workers are pods
status, err := k.GetJobStatus(ctx, workerID)
if err != nil {
return nil, err
}
// Convert job status to worker status
workerStatus := &ports.WorkerStatus{
WorkerID: workerID,
Status: domain.WorkerStatusBusy,
CPULoad: 0.0,
MemoryUsage: 0.0,
DiskUsage: 0.0,
WalltimeRemaining: time.Hour,
LastHeartbeat: time.Now(),
TasksCompleted: 0,
TasksFailed: 0,
AverageTaskDuration: time.Minute * 10,
}
// Map job status to worker status
switch *status {
case ports.JobStatusRunning:
workerStatus.Status = domain.WorkerStatusBusy
case ports.JobStatusCompleted:
workerStatus.Status = domain.WorkerStatusIdle
case ports.JobStatusFailed:
workerStatus.Status = domain.WorkerStatusIdle
default:
workerStatus.Status = domain.WorkerStatusIdle
}
return workerStatus, nil
}
// IsConnected checks if the adapter is connected
func (k *KubernetesAdapter) IsConnected() bool {
// For Kubernetes, we can check if the client is initialized
return k.clientset != nil
}
// ListJobs lists all jobs on the compute resource
func (k *KubernetesAdapter) ListJobs(ctx context.Context, filters *ports.JobFilters) ([]*ports.Job, error) {
err := k.connect()
if err != nil {
return nil, err
}
// List jobs from Kubernetes
jobs, err := k.clientset.BatchV1().Jobs(k.namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list jobs: %w", err)
}
var jobList []*ports.Job
for _, job := range jobs.Items {
// Map Kubernetes job status to our job status
var status ports.JobStatus
if job.Status.Succeeded > 0 {
status = ports.JobStatusCompleted
} else if job.Status.Failed > 0 {
status = ports.JobStatusFailed
} else if job.Status.Active > 0 {
status = ports.JobStatusRunning
} else {
status = ports.JobStatusPending
}
jobInfo := &ports.Job{
ID: job.Name,
Name: job.Name,
Status: status,
NodeID: "", // Kubernetes doesn't directly map to nodes in job info
}
		// Apply filters if provided; note jobInfo.Metadata is not populated
		// from the Kubernetes job, so a userID filter currently excludes all jobs
if filters != nil {
if filters.UserID != nil && *filters.UserID != "" && jobInfo.Metadata["userID"] != *filters.UserID {
continue
}
if filters.Status != nil && string(jobInfo.Status) != string(*filters.Status) {
continue
}
}
jobList = append(jobList, jobInfo)
}
return jobList, nil
}
// ListNodes lists all nodes in the compute resource
func (k *KubernetesAdapter) ListNodes(ctx context.Context) ([]*ports.NodeInfo, error) {
err := k.connect()
if err != nil {
return nil, err
}
// List nodes from Kubernetes
nodes, err := k.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list nodes: %w", err)
}
var nodeList []*ports.NodeInfo
for _, node := range nodes.Items {
nodeInfo := &ports.NodeInfo{
ID: node.Name,
Name: node.Name,
Status: ports.NodeStatusUp, // Default to up
CPUCores: 0,
MemoryGB: 0,
}
// Parse resource capacity
if cpu, exists := node.Status.Capacity["cpu"]; exists {
if cores, ok := cpu.AsInt64(); ok {
nodeInfo.CPUCores = int(cores)
}
}
if memory, exists := node.Status.Capacity["memory"]; exists {
if mem, ok := memory.AsInt64(); ok {
nodeInfo.MemoryGB = int(mem / (1024 * 1024 * 1024)) // Convert to GB
}
}
nodeList = append(nodeList, nodeInfo)
}
return nodeList, nil
}
// ListQueues lists all queues in the compute resource
func (k *KubernetesAdapter) ListQueues(ctx context.Context) ([]*ports.QueueInfo, error) {
	// Kubernetes has no traditional queues; return an empty list
return []*ports.QueueInfo{}, nil
}
// ListWorkers lists all workers in the compute resource
func (k *KubernetesAdapter) ListWorkers(ctx context.Context) ([]*ports.Worker, error) {
	// Worker pods are not enumerated here yet; return an empty list
return []*ports.Worker{}, nil
}
// Ping checks if the compute resource is reachable
func (k *KubernetesAdapter) Ping(ctx context.Context) error {
err := k.connect()
if err != nil {
return err
}
// Try to list namespaces to check connectivity
_, err = k.clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{Limit: 1})
if err != nil {
return fmt.Errorf("failed to ping Kubernetes: %w", err)
}
return nil
}
// getJobStatus gets the status of a Kubernetes job (internal method)
func (k *KubernetesAdapter) getJobStatus(ctx context.Context, jobID string) (string, error) {
err := k.connect()
if err != nil {
return "", err
}
// Get job from Kubernetes
job, err := k.clientset.BatchV1().Jobs(k.namespace).Get(ctx, jobID, metav1.GetOptions{})
if err != nil {
return "UNKNOWN", fmt.Errorf("failed to get job: %w", err)
}
// Check job conditions
for _, condition := range job.Status.Conditions {
if condition.Type == v1.JobComplete && condition.Status == corev1.ConditionTrue {
return "COMPLETED", nil
}
if condition.Type == v1.JobFailed && condition.Status == corev1.ConditionTrue {
return "FAILED", nil
}
}
// Check if job is running
if job.Status.Active > 0 {
return "RUNNING", nil
}
// Check if job is pending
if job.Status.Succeeded == 0 && job.Status.Failed == 0 {
return "PENDING", nil
}
return "UNKNOWN", nil
}
// CancelJob cancels a Kubernetes job
func (k *KubernetesAdapter) CancelJob(ctx context.Context, jobID string) error {
err := k.connect()
if err != nil {
return err
}
// Delete the job
err = k.clientset.BatchV1().Jobs(k.namespace).Delete(ctx, jobID, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete job: %w", err)
}
return nil
}
// GetType returns the compute resource type
func (k *KubernetesAdapter) GetType() string {
return "kubernetes"
}
// Connect establishes connection to the compute resource
func (k *KubernetesAdapter) Connect(ctx context.Context) error {
return k.connect()
}
// Disconnect closes the connection to the compute resource
func (k *KubernetesAdapter) Disconnect(ctx context.Context) error {
// No persistent connections to close for Kubernetes
return nil
}
// GetConfig returns the compute resource configuration
func (k *KubernetesAdapter) GetConfig() *ports.ComputeConfig {
return &ports.ComputeConfig{
Type: "kubernetes",
Endpoint: k.resource.Endpoint,
Metadata: k.resource.Metadata,
}
}
// getContainerImage returns the container image to use
func (k *KubernetesAdapter) getContainerImage() string {
// Extract resource metadata
resourceMetadata := make(map[string]string)
if k.resource.Metadata != nil {
for key, value := range k.resource.Metadata {
resourceMetadata[key] = fmt.Sprintf("%v", value)
}
}
if image, ok := resourceMetadata["container_image"]; ok {
return image
}
return "airavata/scheduler-worker:latest" // Default worker image
}
// getEnvironmentVariables returns environment variables for the task
func (k *KubernetesAdapter) getEnvironmentVariables(task domain.Task) []corev1.EnvVar {
envVars := []corev1.EnvVar{
{
Name: "TASK_ID",
Value: task.ID,
},
{
Name: "EXPERIMENT_ID",
Value: task.ExperimentID,
},
{
Name: "OUTPUT_DIR",
Value: "/output",
},
}
// Add task-specific environment variables from metadata
if task.Metadata != nil {
for key, value := range task.Metadata {
envVars = append(envVars, corev1.EnvVar{
Name: key,
Value: fmt.Sprintf("%v", value),
})
}
}
return envVars
}
// saveJobManifest saves a job manifest to a file
func (k *KubernetesAdapter) saveJobManifest(job *v1.Job, path string) error {
// This would typically use a YAML marshaler
// For now, we'll create a simple YAML representation
yamlContent := fmt.Sprintf(`apiVersion: batch/v1
kind: Job
metadata:
name: %s
namespace: %s
labels:
app: airavata-scheduler
task-id: %s
experiment: %s
spec:
template:
metadata:
labels:
app: airavata-scheduler
task-id: %s
experiment: %s
spec:
containers:
- name: task-executor
image: %s
command: ["/bin/bash", "-c"]
args: ["%s"]
env:
- name: TASK_ID
value: "%s"
- name: EXPERIMENT_ID
value: "%s"
- name: OUTPUT_DIR
value: "/output"
volumeMounts:
- name: output-volume
mountPath: /output
restartPolicy: Never
volumes:
- name: output-volume
emptyDir: {}
backoffLimit: 3
`,
job.Name,
job.Namespace,
job.Labels["task-id"],
job.Labels["experiment"],
job.Labels["task-id"],
job.Labels["experiment"],
k.getContainerImage(),
strings.ReplaceAll(job.Spec.Template.Spec.Containers[0].Args[0], `"`, `\"`),
job.Labels["task-id"],
job.Labels["experiment"],
)
// Write to file
err := k.writeToFile(path, yamlContent)
if err != nil {
return fmt.Errorf("failed to write job manifest: %w", err)
}
return nil
}
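
// A production variant would marshal the typed Job object rather than
// templating strings by hand; a sketch assuming the sigs.k8s.io/yaml
// module is available as a dependency:
//
//	import "sigs.k8s.io/yaml"
//
//	func (k *KubernetesAdapter) saveJobManifestYAML(job *v1.Job, path string) error {
//		data, err := yaml.Marshal(job)
//		if err != nil {
//			return fmt.Errorf("failed to marshal job: %w", err)
//		}
//		return k.writeToFile(path, string(data))
//	}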
// applyJobManifest applies a job manifest to the Kubernetes cluster
func (k *KubernetesAdapter) applyJobManifest(ctx context.Context, manifestPath string) (*v1.Job, error) {
// Read the manifest file
content, err := k.readFromFile(manifestPath)
if err != nil {
return nil, fmt.Errorf("failed to read manifest file: %w", err)
}
// Parse the YAML (simplified - in production, use proper YAML parser)
job, err := k.parseJobManifest(content)
if err != nil {
return nil, fmt.Errorf("failed to parse job manifest: %w", err)
}
// Create the job in Kubernetes
createdJob, err := k.clientset.BatchV1().Jobs(k.namespace).Create(ctx, job, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create job: %w", err)
}
return createdJob, nil
}
// parseJobManifest parses a job manifest from YAML content
func (k *KubernetesAdapter) parseJobManifest(content string) (*v1.Job, error) {
// This is a simplified parser - in production, use proper YAML parsing
// For now, we'll create a basic job structure
lines := strings.Split(content, "\n")
var jobName, namespace, command string
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "name:") {
jobName = strings.TrimSpace(strings.TrimPrefix(line, "name:"))
} else if strings.HasPrefix(line, "namespace:") {
namespace = strings.TrimSpace(strings.TrimPrefix(line, "namespace:"))
} else if strings.HasPrefix(line, "args: [\"") {
command = strings.TrimSpace(strings.TrimPrefix(line, "args: [\""))
command = strings.TrimSuffix(command, "\"]")
}
}
if jobName == "" {
return nil, fmt.Errorf("job name not found in manifest")
}
// Create job object
job := &v1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: namespace,
},
Spec: v1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "task-executor",
Image: k.getContainerImage(),
Command: []string{"/bin/bash", "-c"},
Args: []string{command},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
BackoffLimit: int32Ptr(3),
},
}
return job, nil
}
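
// A robust replacement for the line scanner above would decode through the
// client-go scheme codecs; a sketch assuming an extra import of
// "k8s.io/client-go/kubernetes/scheme":
//
//	obj, _, err := scheme.Codecs.UniversalDeserializer().Decode([]byte(content), nil, nil)
//	if err != nil {
//		return nil, fmt.Errorf("failed to decode manifest: %w", err)
//	}
//	job, ok := obj.(*v1.Job)
//	if !ok {
//		return nil, fmt.Errorf("manifest is not a batch/v1 Job")
//	}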
// writeToFile writes content to a file
func (k *KubernetesAdapter) writeToFile(path, content string) error {
// Create directory if it doesn't exist
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dir, err)
}
// Write content to file with proper permissions
err := os.WriteFile(path, []byte(content), 0644)
if err != nil {
return fmt.Errorf("failed to write file %s: %w", path, err)
}
return nil
}
// readFromFile reads content from a file
func (k *KubernetesAdapter) readFromFile(path string) (string, error) {
// Check if file exists
if _, err := os.Stat(path); os.IsNotExist(err) {
return "", fmt.Errorf("file does not exist: %s", path)
}
// Read file content
content, err := os.ReadFile(path)
if err != nil {
return "", fmt.Errorf("failed to read file %s: %w", path, err)
}
return string(content), nil
}
// homeDir returns the home directory
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}
// int32Ptr returns a pointer to an int32
func int32Ptr(i int32) *int32 { return &i }
// Close closes the Kubernetes adapter connections
func (k *KubernetesAdapter) Close() error {
// No persistent connections to close
return nil
}
// SpawnWorker spawns a worker on the Kubernetes cluster
func (k *KubernetesAdapter) SpawnWorker(ctx context.Context, req *ports.SpawnWorkerRequest) (*ports.Worker, error) {
// Create worker record
worker := &ports.Worker{
ID: req.WorkerID,
JobID: "", // Will be set when job is submitted
Status: domain.WorkerStatusIdle,
CPUCores: req.CPUCores,
MemoryMB: req.MemoryMB,
DiskGB: req.DiskGB,
GPUs: req.GPUs,
Walltime: req.Walltime,
WalltimeRemaining: req.Walltime,
NodeID: "", // Will be set when worker is assigned to a node
Queue: req.Queue,
Priority: req.Priority,
CreatedAt: time.Now(),
Metadata: req.Metadata,
}
// Generate worker spawn script using local implementation
experiment := &domain.Experiment{
ID: req.ExperimentID,
}
	spawnScript, err := k.GenerateWorkerSpawnScript(ctx, experiment, req.Walltime)
if err != nil {
return nil, fmt.Errorf("failed to generate worker spawn script: %w", err)
}
	// Write script to a temporary file; clean it up on every exit path
	scriptPath := fmt.Sprintf("/tmp/worker_spawn_%s.yaml", req.WorkerID)
	if err := os.WriteFile(scriptPath, []byte(spawnScript), 0644); err != nil {
		return nil, fmt.Errorf("failed to write spawn script: %w", err)
	}
	defer os.Remove(scriptPath)
	// Apply the Kubernetes pod specification
	cmd := exec.CommandContext(ctx, "kubectl", "apply", "-f", scriptPath)
	if out, err := cmd.CombinedOutput(); err != nil {
		return nil, fmt.Errorf("failed to apply worker pod: %w (output: %s)", err, out)
	}
	// Record a placeholder job ID; the actual pod name is set by the spawn script
	worker.JobID = fmt.Sprintf("pod_%s", req.WorkerID)
	worker.Status = domain.WorkerStatusIdle
return worker, nil
}
// SubmitJob submits a job to the compute resource
func (k *KubernetesAdapter) SubmitJob(ctx context.Context, req *ports.SubmitJobRequest) (*ports.Job, error) {
// Generate a unique job ID
jobID := fmt.Sprintf("job_%s_%d", k.resource.ID, time.Now().UnixNano())
// Create job record
job := &ports.Job{
ID: jobID,
Name: req.Name,
Status: ports.JobStatusPending,
CPUCores: req.CPUCores,
MemoryMB: req.MemoryMB,
DiskGB: req.DiskGB,
GPUs: req.GPUs,
Walltime: req.Walltime,
NodeID: "", // Will be set when job is assigned to a node
Queue: req.Queue,
Priority: req.Priority,
CreatedAt: time.Now(),
Metadata: req.Metadata,
}
// In a real implementation, this would:
// 1. Create a Kubernetes Job resource
// 2. Submit the job to the cluster
// 3. Return the job record
return job, nil
}
// SubmitTaskWithWorker submits a task using the worker context
func (k *KubernetesAdapter) SubmitTaskWithWorker(ctx context.Context, task *domain.Task, worker *domain.Worker) (string, error) {
// Generate script with worker context
outputDir := fmt.Sprintf("/tmp/worker_%s", worker.ID)
scriptPath, err := k.GenerateScript(*task, outputDir)
if err != nil {
return "", fmt.Errorf("failed to generate script: %w", err)
}
// Submit task
jobID, err := k.SubmitTask(ctx, scriptPath)
if err != nil {
return "", fmt.Errorf("failed to submit task: %w", err)
}
return jobID, nil
}
// GetWorkerMetrics retrieves worker performance metrics from the Kubernetes cluster
func (k *KubernetesAdapter) GetWorkerMetrics(ctx context.Context, worker *domain.Worker) (*domain.WorkerMetrics, error) {
// Query Kubernetes metrics API for real metrics
// This would require the metrics-server to be installed in the cluster
metrics := &domain.WorkerMetrics{
WorkerID: worker.ID,
CPUUsagePercent: k.getCPUUsageFromK8s(ctx, worker.ID),
MemoryUsagePercent: k.getMemoryUsageFromK8s(ctx, worker.ID),
TasksCompleted: 0,
TasksFailed: 0,
AverageTaskDuration: 0,
LastTaskDuration: 0,
Uptime: time.Since(worker.CreatedAt),
CustomMetrics: make(map[string]string),
Timestamp: time.Now(),
}
return metrics, nil
}
// getCPUUsageFromK8s queries Kubernetes metrics API for CPU usage
func (k *KubernetesAdapter) getCPUUsageFromK8s(ctx context.Context, workerID string) float64 {
if k.metricsClient == nil {
// Metrics client not available (metrics-server not installed)
return 0.0
}
// Find the pod for this worker
podName := fmt.Sprintf("worker-%s", workerID)
// Get pod metrics
podMetrics, err := k.metricsClient.MetricsV1beta1().PodMetricses(k.namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
// Pod not found or metrics not available
return 0.0
}
// Calculate total CPU usage across all containers
var totalCPUUsage int64
for _, container := range podMetrics.Containers {
if container.Usage.Cpu() != nil {
totalCPUUsage += container.Usage.Cpu().MilliValue()
}
}
// Get pod resource requests/limits to calculate percentage
pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
// Can't get pod specs, return raw usage
return float64(totalCPUUsage) / 1000.0 // Convert millicores to cores
}
// Calculate total CPU requests/limits
var totalCPULimit int64
for _, container := range pod.Spec.Containers {
if container.Resources.Limits != nil {
if cpu := container.Resources.Limits.Cpu(); cpu != nil {
totalCPULimit += cpu.MilliValue()
}
} else if container.Resources.Requests != nil {
if cpu := container.Resources.Requests.Cpu(); cpu != nil {
totalCPULimit += cpu.MilliValue()
}
}
}
if totalCPULimit == 0 {
// No limits set, return raw usage
return float64(totalCPUUsage) / 1000.0
}
// Calculate percentage
usagePercent := float64(totalCPUUsage) / float64(totalCPULimit) * 100.0
if usagePercent > 100.0 {
usagePercent = 100.0
}
return usagePercent
}
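
// Worked example: 250m of measured usage against a 500m aggregate limit
// yields 250/500*100 = 50.0; with no limits or requests set, the raw core
// value (0.25) is returned instead.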
// getMemoryUsageFromK8s queries Kubernetes metrics API for memory usage
func (k *KubernetesAdapter) getMemoryUsageFromK8s(ctx context.Context, workerID string) float64 {
if k.metricsClient == nil {
// Metrics client not available (metrics-server not installed)
return 0.0
}
// Find the pod for this worker
podName := fmt.Sprintf("worker-%s", workerID)
// Get pod metrics
podMetrics, err := k.metricsClient.MetricsV1beta1().PodMetricses(k.namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
// Pod not found or metrics not available
return 0.0
}
// Calculate total memory usage across all containers
var totalMemoryUsage int64
for _, container := range podMetrics.Containers {
if container.Usage.Memory() != nil {
totalMemoryUsage += container.Usage.Memory().Value()
}
}
// Get pod resource requests/limits to calculate percentage
pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
// Can't get pod specs, return raw usage in MB
return float64(totalMemoryUsage) / (1024 * 1024) // Convert bytes to MB
}
// Calculate total memory requests/limits
var totalMemoryLimit int64
for _, container := range pod.Spec.Containers {
if container.Resources.Limits != nil {
if memory := container.Resources.Limits.Memory(); memory != nil {
totalMemoryLimit += memory.Value()
}
} else if container.Resources.Requests != nil {
if memory := container.Resources.Requests.Memory(); memory != nil {
totalMemoryLimit += memory.Value()
}
}
}
if totalMemoryLimit == 0 {
// No limits set, return raw usage in MB
return float64(totalMemoryUsage) / (1024 * 1024)
}
// Calculate percentage
usagePercent := float64(totalMemoryUsage) / float64(totalMemoryLimit) * 100.0
if usagePercent > 100.0 {
usagePercent = 100.0
}
return usagePercent
}
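
// Worked example: 536870912 bytes (512Mi) of usage against a 1073741824-byte
// (1Gi) aggregate limit yields 50.0; with no limits or requests set, the raw
// usage in MB (512.0) is returned instead.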
// TerminateWorker terminates a worker by deleting its pod from the cluster
func (k *KubernetesAdapter) TerminateWorker(ctx context.Context, workerID string) error {
	if err := k.connect(); err != nil {
		return err
	}
	// Worker pods are named "worker-<workerID>" by the spawn template
	podName := fmt.Sprintf("worker-%s", workerID)
	if err := k.clientset.CoreV1().Pods(k.namespace).Delete(ctx, podName, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("failed to delete worker pod %s: %w", podName, err)
	}
	return nil
}
// GenerateWorkerSpawnScript generates a Kubernetes-specific script to spawn a worker
func (k *KubernetesAdapter) GenerateWorkerSpawnScript(ctx context.Context, experiment *domain.Experiment, walltime time.Duration) (string, error) {
capabilities := k.resource.Capabilities
data := struct {
WorkerID string
ExperimentID string
ComputeResourceID string
WalltimeSeconds int64
CPUCores int
MemoryMB int
GPUs int
WorkingDir string
WorkerBinaryURL string
ServerAddress string
ServerPort int
}{
		// Hyphens keep the derived pod name DNS-1123 compliant; underscores are rejected by the API
		WorkerID:          fmt.Sprintf("worker-%s-%d", k.resource.ID, time.Now().UnixNano()),
ExperimentID: experiment.ID,
ComputeResourceID: k.resource.ID,
WalltimeSeconds: int64(walltime.Seconds()),
CPUCores: getIntFromCapabilities(capabilities, "cpu_cores", 1),
MemoryMB: getIntFromCapabilities(capabilities, "memory_mb", 1024),
GPUs: getIntFromCapabilities(capabilities, "gpus", 0),
WorkingDir: k.config.DefaultWorkingDir,
WorkerBinaryURL: k.config.WorkerBinaryURL,
ServerAddress: k.config.ServerGRPCAddress,
ServerPort: k.config.ServerGRPCPort,
}
t, err := template.New("kubernetes_spawn").Parse(kubernetesWorkerSpawnTemplate)
if err != nil {
return "", fmt.Errorf("failed to parse Kubernetes spawn template: %w", err)
}
var buf strings.Builder
if err := t.Execute(&buf, data); err != nil {
return "", fmt.Errorf("failed to execute Kubernetes spawn template: %w", err)
}
return buf.String(), nil
}
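
// End-to-end spawn sketch (identifiers illustrative; error handling elided):
//
//	adapter := NewKubernetesAdapter(resource, vault)
//	worker, _ := adapter.SpawnWorker(ctx, &ports.SpawnWorkerRequest{
//		WorkerID:     "w-001",
//		ExperimentID: "exp-42",
//		CPUCores:     2,
//		MemoryMB:     2048,
//		Walltime:     2 * time.Hour,
//	})
//	// ... later, reclaim the pod:
//	_ = adapter.TerminateWorker(ctx, worker.ID)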