blob: c9102c4af85cf1334128ecd90299aee7e4deaa29 [file] [log] [blame]
import {$} from "bun";
import {VolumeManager} from "./volume-manager";
import {LogFormatter} from "./log-formatter";
import {DfRow} from "./models";
interface JobOperator {
/**
* Executes a Kubernetes job to analyze volume usage.
*
* Explanations:
* 1. Loads the job template using VolumeManager
* 2. Retrieves a list of bound PVCs from k8s cluster
* 3. Mounts each PVC as a read-only volume in the job
* 4. Applies the job template to the cluster
* 5. Waits for job completion and extracts raw logs
* 6. Processes the raw logs into structured data using the LogFormatter
*
* TODO: Add support for label/annotation-based PVC selection once supported by openserverless-operator
*
* @param {string} jobName - The name of the Kubernetes job (as defined within the template).
* @return {Promise<DfRow[]>} A promise that resolves with disk usage data for each PVC
*/
runJob(jobName: string): Promise<DfRow[]>
/**
* Cleans up the job to avoid further run conflicts.
*
* @param jobName the job to be deleted (as defined within the template).
*/
cleanup(jobName: string): Promise<void>
}
const kubectl = ['kubectl', '-n', 'nuvolaris'];
class JobOperatorImpl implements JobOperator {
constructor(private volumeManager: VolumeManager, private logProcessor: LogFormatter) {
}
async runJob(jobName: string): Promise<DfRow[]> {
await this.ensureJobNotExists(jobName);
await this.volumeManager.load();
const pvcList = await this.getPvcList();
pvcList.forEach((pvcName, i) => {
this.volumeManager.addVolume(
{
name: `pvc-${pvcName}`,
persistentVolumeClaim: {
claimName: pvcName
}
},
`/mnt/data${i}`
);
});
const rawLogs = await this.volumeManager.save()
.then(tpl => this.applyTemplate(tpl))
.then(() => this.extractJobLogs(jobName));
return this.logProcessor.pretty(pvcList, rawLogs);
}
async cleanup(jobName: string) {
await $`${kubectl} delete job/${jobName}`.quiet();
}
private async getPvcList() {
const {stdout: pvcListRaw} = await $`${kubectl} get pvc -o jsonpath='{range .items[?(@.status.phase=="Bound")]}{.metadata.name}{","}{end}`.quiet();
return pvcListRaw.toString().split(",").filter(v => v.length > 0);
}
private async applyTemplate(target: string) {
await $`${kubectl} apply -f ${target}`.quiet();
}
private async extractJobLogs(jobName: string) {
try {
await $`${kubectl} wait --for=condition=complete job/${jobName} --timeout=1m`.quiet();
} catch (error) {
console.error(`Job ${jobName} timeout or error.`, error);
await $`${kubectl} get job ${jobName}`;
process.exit(1);
}
const {stdout: logs} = await $`${kubectl} logs job/${jobName}`.quiet();
return logs.toString().trim();
}
private async ensureJobNotExists(jobName: string) {
try {
const {exitCode} = await $`${kubectl} get job/${jobName}`.quiet();
if (exitCode === 0) {
await $`${kubectl} delete job/${jobName}`.quiet();
}
} catch (error) {
// Job does not exist, nothing to do
}
}
}
class JobOperatorLogger implements JobOperator {
constructor(private next: JobOperator) {
}
async runJob(jobName: string): Promise<DfRow[]> {
console.log(`[JobOperator] Starting job: ${jobName}`);
const result = await this.next.runJob(jobName);
console.log(`[JobOperator] Job ${jobName} completed successfully`);
console.log(`[JobOperator] Processed ${result.length} PVC entries`);
return result;
}
async cleanup(jobName: string): Promise<void> {
console.log(`[JobOperator] Cleaning up job: ${jobName}`);
await this.next.cleanup(jobName);
console.log(`[JobOperator] Job ${jobName} cleaned up successfully`);
}
}
export const jobOperator = (volumeManager: VolumeManager, logFormatter: LogFormatter, debug: boolean): JobOperator => {
const operator = new JobOperatorImpl(volumeManager, logFormatter);
if (debug) {
return new JobOperatorLogger(operator);
}
return operator;
};