This document describes the WebSocket protocol used by the Airavata Scheduler for real-time communication between the server and client applications.
The Airavata Scheduler WebSocket protocol provides real-time updates for experiments, tasks, and workers, as well as overall system status.
The system provides several WebSocket endpoints for different types of subscriptions:
ws://localhost:8080/ws/experiments/{experimentId}
ws://localhost:8080/ws/tasks/{taskId}
ws://localhost:8080/ws/projects/{projectId}
ws://localhost:8080/ws/user
ws://host:port/ws/{resourceType}/{resourceId}?token={jwt_token}
Parameters:
- `host`: Server hostname or IP address
- `port`: Server port (default: 8080)
- `resourceType`: Type of resource to subscribe to, given as the plural path segment used by the endpoints above (`experiments`, `tasks`, `projects`). The user-wide endpoint `/ws/user` takes no `resourceId`.
- `resourceId`: ID of the specific resource
- `token`: JWT authentication token

// Connect to experiment updates
const experimentWs = new WebSocket('ws://localhost:8080/ws/experiments/exp-123?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...');
// Connect to user-wide updates
const userWs = new WebSocket('ws://localhost:8080/ws/user?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...');
// Connect to project updates
const projectWs = new WebSocket('ws://localhost:8080/ws/projects/proj-456?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...');
All WebSocket connections require a valid JWT token for authentication. The token can be provided in two ways:
Query Parameter (Recommended):
ws://localhost:8080/ws/experiments/exp-123?token=your_jwt_token
Authorization Header (Alternative, non-browser clients only; the browser WebSocket API cannot set request headers):
// Note: the browser WebSocket API does not allow custom request headers (its
// second argument is a list of subprotocols), so this form only works with
// Node.js clients that accept an options object, such as the `ws` package.
// In browsers, pass the token as the `token` query parameter instead.
const ws = new WebSocket('ws://localhost:8080/ws/experiments/exp-123', { headers: { 'Authorization': 'Bearer your_jwt_token' } });
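The server validates the JWT token when the connection is opened. If the token is missing, invalid, or expired, the server sends an error message such as: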
{ "type": "error", "id": "error-123", "timestamp": "2024-01-15T10:30:00Z", "error": "Authentication required" }
All WebSocket messages follow a consistent JSON format:
{ "type": "message_type", "id": "unique_message_id", "timestamp": "2024-01-15T10:30:00Z", "data": { // Message-specific data }, "error": "error_message_if_applicable" }
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Message type identifier |
id | string | Yes | Unique message identifier |
timestamp | string | Yes | ISO 8601 timestamp |
data | object | No | Message payload |
error | string | No | Error message (for error types) |
Message IDs follow a specific format for easy identification:
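- Experiment events: `exp_{experimentId}_{timestamp}`
- Task events: `task_{taskId}_{timestamp}`
- Worker events: `worker_{workerId}_{timestamp}`
- System events: `system_{timestamp}`
- User events: `user_{userId}_{timestamp}`

For example, `exp_exp-123_1642248600` identifies an event for experiment `exp-123`.

{ "type": "ping", "id": "ping-123", "timestamp": "2024-01-15T10:30:00Z" }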
{ "type": "pong", "id": "pong-123", "timestamp": "2024-01-15T10:30:00Z" }
{ "type": "system_status", "id": "system-123", "timestamp": "2024-01-15T10:30:00Z", "data": { "totalConnections": 150, "activeConnections": 120, "totalMessages": 15420, "messagesPerSecond": 25.5, "averageLatency": "0.05s", "uptime": "2h 30m 15s" } }
{ "type": "error", "id": "error-123", "timestamp": "2024-01-15T10:30:00Z", "error": "Invalid message format" }
{ "type": "experiment_created", "id": "exp_exp-123_1642248600", "timestamp": "2024-01-15T10:30:00Z", "data": { "experiment": { "id": "exp-123", "name": "Parameter Sweep", "status": "CREATED", "ownerId": "user-456", "projectId": "proj-789" }, "summary": { "id": "exp-123", "name": "Parameter Sweep", "status": "CREATED", "ownerId": "user-456" } } }
{ "type": "experiment_updated", "id": "exp_exp-123_1642248660", "timestamp": "2024-01-15T10:31:00Z", "data": { "experiment": { "id": "exp-123", "name": "Parameter Sweep", "status": "RUNNING", "ownerId": "user-456", "projectId": "proj-789" }, "summary": { "id": "exp-123", "name": "Parameter Sweep", "status": "RUNNING", "ownerId": "user-456" } } }
{ "type": "experiment_progress", "id": "exp_exp-123_1642248720", "timestamp": "2024-01-15T10:32:00Z", "data": { "experimentId": "exp-123", "totalTasks": 100, "completedTasks": 45, "failedTasks": 5, "runningTasks": 10, "progressPercent": 45.0, "estimatedTimeRemaining": "1h 30m", "lastUpdated": "2024-01-15T10:32:00Z" } }
{ "type": "experiment_completed", "id": "exp_exp-123_1642249200", "timestamp": "2024-01-15T10:40:00Z", "data": { "experiment": { "id": "exp-123", "name": "Parameter Sweep", "status": "COMPLETED", "ownerId": "user-456", "projectId": "proj-789" }, "summary": { "id": "exp-123", "name": "Parameter Sweep", "status": "COMPLETED", "ownerId": "user-456" } } }
{ "type": "experiment_failed", "id": "exp_exp-123_1642249260", "timestamp": "2024-01-15T10:41:00Z", "data": { "experiment": { "id": "exp-123", "name": "Parameter Sweep", "status": "FAILED", "ownerId": "user-456", "projectId": "proj-789" }, "summary": { "id": "exp-123", "name": "Parameter Sweep", "status": "FAILED", "ownerId": "user-456" } } }
{ "type": "task_created", "id": "task_task-456_1642248600", "timestamp": "2024-01-15T10:30:00Z", "data": { "task": { "id": "task-456", "experimentId": "exp-123", "status": "CREATED", "command": "python script.py --param1 value1", "workerId": null }, "summary": { "id": "task-456", "experimentId": "exp-123", "status": "CREATED", "workerId": null } } }
{ "type": "task_updated", "id": "task_task-456_1642248660", "timestamp": "2024-01-15T10:31:00Z", "data": { "task": { "id": "task-456", "experimentId": "exp-123", "status": "RUNNING", "command": "python script.py --param1 value1", "workerId": "worker-789" }, "summary": { "id": "task-456", "experimentId": "exp-123", "status": "RUNNING", "workerId": "worker-789" } } }
{ "type": "task_progress", "id": "task_task-456_1642248720", "timestamp": "2024-01-15T10:32:00Z", "data": { "taskId": "task-456", "experimentId": "exp-123", "status": "RUNNING", "progressPercent": 75.0, "currentStage": "RUNNING", "workerId": "worker-789", "startedAt": "2024-01-15T10:31:00Z", "estimatedCompletion": "2024-01-15T10:35:00Z", "lastUpdated": "2024-01-15T10:32:00Z" } }
{ "type": "task_completed", "id": "task_task-456_1642249200", "timestamp": "2024-01-15T10:40:00Z", "data": { "task": { "id": "task-456", "experimentId": "exp-123", "status": "COMPLETED", "command": "python script.py --param1 value1", "workerId": "worker-789" }, "summary": { "id": "task-456", "experimentId": "exp-123", "status": "COMPLETED", "workerId": "worker-789" } } }
{ "type": "task_failed", "id": "task_task-456_1642249260", "timestamp": "2024-01-15T10:41:00Z", "data": { "task": { "id": "task-456", "experimentId": "exp-123", "status": "FAILED", "command": "python script.py --param1 value1", "workerId": "worker-789", "error": "Script execution failed" }, "summary": { "id": "task-456", "experimentId": "exp-123", "status": "FAILED", "workerId": "worker-789", "error": "Script execution failed" } } }
{ "type": "worker_registered", "id": "worker_worker-789_1642248600", "timestamp": "2024-01-15T10:30:00Z", "data": { "worker": { "id": "worker-789", "computeResourceId": "compute-123", "experimentId": "exp-123", "status": "RUNNING", "currentTaskId": null }, "summary": { "id": "worker-789", "computeResourceId": "compute-123", "experimentId": "exp-123", "status": "RUNNING" } } }
{ "type": "worker_updated", "id": "worker_worker-789_1642248660", "timestamp": "2024-01-15T10:31:00Z", "data": { "worker": { "id": "worker-789", "computeResourceId": "compute-123", "experimentId": "exp-123", "status": "RUNNING", "currentTaskId": "task-456" }, "summary": { "id": "worker-789", "computeResourceId": "compute-123", "experimentId": "exp-123", "status": "RUNNING" } } }
{ "type": "worker_offline", "id": "worker_worker-789_1642249200", "timestamp": "2024-01-15T10:40:00Z", "data": { "worker": { "id": "worker-789", "computeResourceId": "compute-123", "experimentId": "exp-123", "status": "OFFLINE", "currentTaskId": null }, "summary": { "id": "worker-789", "computeResourceId": "compute-123", "experimentId": "exp-123", "status": "OFFLINE" } } }
To subscribe to specific resources or events, send a subscription message:
{ "type": "system_status", "data": { "action": "subscribe", "resourceType": "experiment", "resourceId": "exp-123" } }
To unsubscribe from resources or events:
{ "type": "system_status", "data": { "action": "unsubscribe", "resourceType": "experiment", "resourceId": "exp-123" } }
The server responds to subscription requests:
{ "type": "system_status", "id": "sub-response-123", "timestamp": "2024-01-15T10:30:00Z", "data": { "action": "subscribed", "resourceType": "experiment", "resourceId": "exp-123", "status": "success" } }
| Resource Type | Description | Events |
|---|---|---|
experiment | Experiment-specific events | All experiment and task events |
task | Task-specific events | Task events only |
project | Project-wide events | All experiments in project |
user | User-specific events | All user's experiments |
system | System-wide events | System status and health |
Events are broadcast to clients based on their subscriptions:
- Experiment Event → Experiment Subscribers + Project Subscribers + User Subscribers
- Task Event → Task Subscribers + Experiment Subscribers + Project Subscribers + User Subscribers
- Worker Event → Worker Subscribers + System Subscribers
- System Event → All Subscribers
Events are delivered in the order they occur, with timestamps to ensure proper sequencing:
{ "type": "experiment_updated", "id": "exp_exp-123_1642248600", "timestamp": "2024-01-15T10:30:00Z", "data": { ... } }
{ "type": "error", "id": "error-123", "timestamp": "2024-01-15T10:30:00Z", "error": "Authentication required" }
{ "type": "error", "id": "error-124", "timestamp": "2024-01-15T10:30:00Z", "error": "Invalid or expired token" }
{ "type": "error", "id": "error-125", "timestamp": "2024-01-15T10:30:00Z", "error": "Rate limit exceeded" }
{ "type": "error", "id": "error-126", "timestamp": "2024-01-15T10:30:00Z", "error": "Invalid message format" }
{ "type": "error", "id": "error-127", "timestamp": "2024-01-15T10:30:00Z", "error": "Unknown message type" }
// Example: handle both transport-level failures (onerror) and protocol-level
// error messages sent by the server ({ "type": "error", ... }).
const ws = new WebSocket('ws://localhost:8080/ws/experiments/exp-123?token=your_token');

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
  // Handle connection errors
};

ws.onmessage = (event) => {
  let message;
  try {
    message = JSON.parse(event.data);
  } catch (err) {
    // A malformed frame would otherwise throw an uncaught exception out of
    // this handler; report it and skip the frame.
    console.error('Failed to parse WebSocket message:', err);
    return;
  }

  if (message.type === 'error') {
    console.error('Server error:', message.error);
    // Handle server errors
  } else {
    // Handle normal messages (handleMessage is application-defined)
    handleMessage(message);
  }
};
The server sends periodic ping messages to maintain connection health:
{ "type": "ping", "id": "ping-123", "timestamp": "2024-01-15T10:30:00Z" }
Clients should respond with pong messages:
{ "type": "pong", "id": "pong-123", "timestamp": "2024-01-15T10:30:00Z" }
Clients should implement automatic reconnection:
/**
 * Keeps a single WebSocket connection open and reconnects with exponential
 * backoff (reconnectDelay, 2x, 4x, ...) whenever the current socket closes,
 * until `maxReconnectAttempts` consecutive attempts have been made.
 * Call `disconnect()` to close deliberately without triggering a reconnect.
 *
 * @param {object} [options]
 * @param {number} [options.maxReconnectAttempts=5] - Attempts before giving up.
 * @param {number} [options.reconnectDelay=1000] - Base backoff delay in milliseconds.
 */
class WebSocketManager {
  constructor({ maxReconnectAttempts = 5, reconnectDelay = 1000 } = {}) {
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = maxReconnectAttempts;
    this.reconnectDelay = reconnectDelay; // base delay in ms (1 second by default)
    this.reconnectTimer = null;
    this.ws = null;
  }

  /** Opens a connection to `url` and reconnects automatically when it closes. */
  connect(url) {
    const ws = new WebSocket(url);
    this.ws = ws;
    ws.onopen = () => {
      console.log('Connected');
      this.reconnectAttempts = 0; // a successful open resets the backoff
    };
    ws.onclose = () => {
      console.log('Disconnected');
      // Only reconnect if this socket is still the current one; after
      // disconnect() (or a newer connect()) a late close event is ignored.
      if (this.ws === ws) {
        this.handleReconnect(url);
      }
    };
    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  /** Backoff delay in milliseconds for the given 1-based attempt number. */
  getReconnectDelay(attempt) {
    return this.reconnectDelay * 2 ** (attempt - 1);
  }

  /** Schedules the next reconnect attempt, or gives up once the limit is hit. */
  handleReconnect(url) {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      return;
    }
    this.reconnectAttempts++;
    const delay = this.getReconnectDelay(this.reconnectAttempts);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);
      this.connect(url);
    }, delay);
  }

  /** Closes the connection on purpose and cancels any pending reconnect. */
  disconnect() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    const ws = this.ws;
    this.ws = null; // makes the upcoming close event a no-op for reconnection
    ws?.close();
  }
}
class AiravataWebSocketClient { constructor(baseUrl, token) { this.baseUrl = baseUrl; this.token = token; this.connections = new Map(); this.subscriptions = new Map(); } connectToExperiment(experimentId, onMessage) { const url = `${this.baseUrl}/ws/experiments/${experimentId}?token=${this.token}`; const ws = new WebSocket(url); ws.onopen = () => { console.log(`Connected to experiment ${experimentId}`); this.subscribe(ws, 'experiment', experimentId); }; ws.onmessage = (event) => { const message = JSON.parse(event.data); onMessage(message); }; ws.onclose = () => { console.log(`Disconnected from experiment ${experimentId}`); this.connections.delete(experimentId); }; this.connections.set(experimentId, ws); return ws; } subscribe(ws, resourceType, resourceId) { const message = { type: 'system_status', data: { action: 'subscribe', resourceType: resourceType, resourceId: resourceId } }; ws.send(JSON.stringify(message)); } disconnect(experimentId) { const ws = this.connections.get(experimentId); if (ws) { ws.close(); this.connections.delete(experimentId); } } sendPing(experimentId) { const ws = this.connections.get(experimentId); if (ws && ws.readyState === WebSocket.OPEN) { const message = { type: 'ping', id: `ping-${Date.now()}`, timestamp: new Date().toISOString() }; ws.send(JSON.stringify(message)); } } } // Usage const client = new AiravataWebSocketClient('ws://localhost:8080', 'your_jwt_token'); client.connectToExperiment('exp-123', (message) => { switch (message.type) { case 'experiment_updated': console.log('Experiment updated:', message.data); break; case 'experiment_progress': console.log('Progress update:', message.data); break; case 'task_updated': console.log('Task updated:', message.data); break; case 'pong': console.log('Pong received'); break; } });
import { useState, useEffect, useRef } from 'react'; const useWebSocket = (url, token) => { const [socket, setSocket] = useState(null); const [lastMessage, setLastMessage] = useState(null); const [connectionStatus, setConnectionStatus] = useState('Connecting'); const reconnectTimeoutRef = useRef(null); useEffect(() => { const ws = new WebSocket(`${url}?token=${token}`); ws.onopen = () => { setConnectionStatus('Connected'); setSocket(ws); }; ws.onmessage = (event) => { const message = JSON.parse(event.data); setLastMessage(message); }; ws.onclose = () => { setConnectionStatus('Disconnected'); setSocket(null); // Auto-reconnect after 5 seconds reconnectTimeoutRef.current = setTimeout(() => { setConnectionStatus('Reconnecting'); }, 5000); }; ws.onerror = (error) => { console.error('WebSocket error:', error); setConnectionStatus('Error'); }; return () => { if (reconnectTimeoutRef.current) { clearTimeout(reconnectTimeoutRef.current); } ws.close(); }; }, [url, token]); const sendMessage = (message) => { if (socket && socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify(message)); } }; return { socket, lastMessage, connectionStatus, sendMessage }; }; // Usage in component const ExperimentDashboard = ({ experimentId }) => { const { lastMessage, connectionStatus, sendMessage } = useWebSocket( `ws://localhost:8080/ws/experiments/${experimentId}`, 'your_jwt_token' ); useEffect(() => { if (lastMessage) { switch (lastMessage.type) { case 'experiment_progress': // Update progress bar break; case 'task_updated': // Update task list break; } } }, [lastMessage]); return ( <div> <div>Status: {connectionStatus}</div> {/* Dashboard content */} </div> ); };
This comprehensive WebSocket protocol documentation provides everything needed to implement real-time communication with the Airavata Scheduler system. The protocol is designed to be reliable, scalable, and easy to integrate with any modern web application.