blob: a550cec11a083691f4b012f2b8e6df74cf94c088 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.submarine.server.submitter.k8s;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.generic.options.CreateOptions;
import io.kubernetes.client.util.generic.options.DeleteOptions;
import io.kubernetes.client.util.generic.options.ListOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.submarine.commons.utils.SubmarineConfVars;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.serve.pytorch.SeldonPytorchServing;
import org.apache.submarine.serve.seldon.SeldonDeployment;
import org.apache.submarine.serve.tensorflow.SeldonTFServing;
import org.apache.submarine.server.k8s.utils.K8sUtils;
import org.apache.submarine.server.api.Submitter;
import org.apache.submarine.server.api.common.CustomResourceType;
import org.apache.submarine.server.api.exception.InvalidSpecException;
import org.apache.submarine.server.api.experiment.Experiment;
import org.apache.submarine.server.api.experiment.ExperimentLog;
import org.apache.submarine.server.api.experiment.MlflowInfo;
import org.apache.submarine.server.api.experiment.TensorboardInfo;
import org.apache.submarine.server.api.model.ServeSpec;
import org.apache.submarine.server.api.notebook.Notebook;
import org.apache.submarine.server.api.spec.ExperimentMeta;
import org.apache.submarine.server.api.spec.ExperimentSpec;
import org.apache.submarine.server.api.spec.NotebookSpec;
import org.apache.submarine.server.submitter.k8s.client.K8sClient;
import org.apache.submarine.server.submitter.k8s.client.K8sDefaultClient;
import org.apache.submarine.server.submitter.k8s.model.AgentPod;
import org.apache.submarine.server.submitter.k8s.model.K8sResource;
import org.apache.submarine.server.submitter.k8s.model.common.Configmap;
import org.apache.submarine.server.submitter.k8s.model.istio.IstioVirtualService;
import org.apache.submarine.server.submitter.k8s.model.common.NullResource;
import org.apache.submarine.server.submitter.k8s.model.common.PersistentVolumeClaim;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobFactory;
import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCR;
import org.apache.submarine.server.submitter.k8s.util.NotebookUtils;
import org.apache.submarine.server.submitter.k8s.util.OwnerReferenceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* JobSubmitter for Kubernetes Cluster.
*/
public class K8sSubmitter implements Submitter {
// Class-wide SLF4J logger
private static final Logger LOG = LoggerFactory.getLogger(K8sSubmitter.class);
// Label selector prefixes (label name followed by '='); getJobLabelSelector appends
// the experiment id to the prefix that matches the experiment's framework.
private static final String TF_JOB_SELECTOR_KEY = "tf-job-name=";
private static final String PYTORCH_JOB_SELECTOR_KEY = "pytorch-job-name=";
private static final String XGBoost_JOB_SELECTOR_KEY = "xgboost-job-name=";
/**
 * Handler for {@link ApiException}s, intended for operations (such as delete) where the
 * target resource may not exist.
 * A 404 is tolerated and mapped to {@code null}; any other code is logged and rethrown
 * as a {@link SubmarineRuntimeException} carrying the same code and message.
 */
public static final Function<ApiException, Object> API_EXCEPTION_404_CONSUMER = apiException -> {
  if (apiException.getCode() == 404) {
    // the resource is already gone, nothing to report
    return null;
  }
  LOG.error("When submit resource to k8s get ApiException with code " + apiException.getCode(),
      apiException);
  throw new SubmarineRuntimeException(apiException.getCode(), apiException.getMessage());
};
private static final String OVERWRITE_JSON;
static {
final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
OVERWRITE_JSON = conf.getString(
SubmarineConfVars.ConfVars.SUBMARINE_NOTEBOOK_DEFAULT_OVERWRITE_JSON);
}
// K8s API client for CRD
private K8sClient k8sClient;

/**
 * Creates a submitter without a k8s client; the client stays {@code null}
 * until one is assigned.
 */
public K8sSubmitter() {
  this(null);
}

/**
 * Creates a submitter that uses the given k8s client.
 * @param k8sClient client used for all k8s operations
 */
public K8sSubmitter(K8sClient k8sClient) {
  this.k8sClient = k8sClient;
}
/**
 * Makes sure a k8s client is available: when none has been set yet,
 * a {@link K8sDefaultClient} is created. The {@code conf} argument is not read here.
 * @param conf submarine configuration
 */
@Override
public void initialize(SubmarineConfiguration conf) {
  // move k8s clients init to org.apache.submarine.server.submitter.k8s.K8sClient
  // For the compatibility of the current codes, several variables about api client are exposed temporarily
  if (k8sClient != null) {
    return;
  }
  k8sClient = new K8sDefaultClient();
}
/**
 * Commit resources with transaction.
 * Resources are created one after another in the given order. A {@code null} argument is
 * recorded as a {@link NullResource} placeholder with a {@code null} result. When any step
 * throws, the resources recorded so far are deleted in reverse order (delete failures are
 * only logged) and the original exception is rethrown.
 * @param resources resources to create, {@code null} entries are allowed
 * @return committed return objects
 */
public List<Object> resourceTransaction(K8sResource... resources) {
  Map<K8sResource, Object> committed = new LinkedHashMap<>();
  try {
    for (K8sResource resource : resources) {
      if (resource == null) {
        committed.put(new NullResource(), null);
      } else {
        committed.put(resource, resource.create(k8sClient));
      }
    }
    return new ArrayList<>(committed.values());
  } catch (Exception e) {
    // Rollback is performed in the reverse order of commits
    List<K8sResource> created = new ArrayList<>(committed.keySet());
    int index = created.size();
    while (--index >= 0) {
      K8sResource rollback = created.get(index);
      if (rollback instanceof NullResource) {
        // placeholder only, nothing was created for it
        continue;
      }
      LOG.info("Rollback resources {}/{}", rollback.getKind(), rollback.getMetadata().getName());
      try {
        rollback.delete(k8sClient);
      } catch (Exception deleteErr) {
        LOG.error("Failed to delete resource. You may need to delete it manually!", deleteErr);
      }
    }
    throw e;
  }
}
/**
 * Delete resources with transaction.
 * This is an experimental API. Our main consideration is that
 * k8s transactional deletion cannot roll back well, so only the deletion of the
 * primary resource is guaranteed for the time being. A failure to delete a dependent
 * resource is tolerated (logged as a warning) to maximize the availability of the deletion API.
 * @param primary primary resource, failure of this resource will cause API exceptions
 * @param dependentResources dependent resources
 * @return the object returned by deleting the primary resource
 */
public <T> T deleteResourcesTransaction(K8sResource<T> primary, K8sResource... dependentResources) {
  // the primary deletion is not guarded: its failure propagates to the caller
  final T deleted = primary.delete(k8sClient);
  for (K8sResource dependent : dependentResources) {
    try {
      dependent.delete(k8sClient);
    } catch (Exception e) {
      final String message = String.format("Delete %s/%s failed. %s", dependent.getKind(),
          dependent.getMetadata().getName(), e.getMessage());
      LOG.warn(message, e);
      // TODO(cdmikechen): Record the error information into audit service for later tracking
    }
  }
  return deleted;
}
/**
 * Builds object metadata with the given namespace and name, and with the owner
 * references supplied by {@link OwnerReferenceUtils#getOwnerReference()}.
 * @param namespace resource namespace
 * @param name resource name
 * @return new metadata object
 */
public static V1ObjectMeta createMeta(String namespace, String name) {
  return new V1ObjectMeta()
      .namespace(namespace)
      .name(name)
      .ownerReferences(OwnerReferenceUtils.getOwnerReference());
}
/**
 * Returns the namespace reported by {@link K8sUtils#getNamespace()}; the methods of this
 * class use it as the namespace for experiments, notebooks and deployment status lookups.
 */
private String getServerNamespace() {
return K8sUtils.getNamespace();
}
/**
 * Creates delete options whose api version is set to the given value.
 * @param apiVersion api version of the resource to delete
 * @return new delete options
 */
public static DeleteOptions getDeleteOptions(String apiVersion) {
  final DeleteOptions options = new DeleteOptions();
  options.setApiVersion(apiVersion);
  return options;
}
/**
 * Creates the MLJob for the given spec together with its agent pod, in one
 * resource transaction.
 * @param spec experiment spec
 * @return the result of creating the MLJob (first element of the transaction result)
 * @throws SubmarineRuntimeException with code 500 when the spec cannot be parsed
 */
@Override
public Experiment createExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
  try {
    // MLJob K8s resource object
    final MLJob job = MLJobFactory.getMLJob(spec);
    job.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());

    // Agent pod K8s resource object
    final ExperimentMeta meta = spec.getMeta();
    final AgentPod agent = new AgentPod(getServerNamespace(), meta.getName(),
        job.getResourceType(), meta.getExperimentId());

    // commit resources/CRD with transaction; the MLJob result sits at index 0
    return (Experiment) resourceTransaction(job, agent).get(0);
  } catch (InvalidSpecException e) {
    LOG.error(String.format("K8s submitter: parse %s object failed by %s",
        spec.getMeta().getFramework(), e.getMessage()), e);
    throw new SubmarineRuntimeException(500, e.getMessage());
  }
}
/**
 * Reads the experiment that corresponds to the given spec from k8s.
 * @param spec experiment spec
 * @return the experiment read through the MLJob resource
 * @throws SubmarineRuntimeException with code 400 when the spec is invalid
 */
@Override
public Experiment findExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
  try {
    // build the MLJob K8s resource object and read the experiment through it
    return MLJobFactory.getMLJob(spec).read(k8sClient);
  } catch (InvalidSpecException e) {
    throw new SubmarineRuntimeException(400, e.getMessage());
  }
}
/**
 * Replaces the k8s resource of the experiment with the one built from the given spec.
 * @param spec experiment spec
 * @return the experiment returned by the replace call
 * @throws SubmarineRuntimeException with code 409 when the spec is invalid,
 *         or code 500 when an {@link Error} is raised
 */
@Override
public Experiment patchExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
  try {
    // build the MLJob K8s resource object and patch the experiment through it
    return MLJobFactory.getMLJob(spec).replace(k8sClient);
  } catch (InvalidSpecException e) {
    throw new SubmarineRuntimeException(409, e.getMessage());
  } catch (Error e) {
    final String message = String.format("Unhandled error: %s", e.getMessage());
    throw new SubmarineRuntimeException(500, message);
  }
}
/**
 * Deletes the MLJob of the given spec and, as a tolerated dependent resource, its agent pod.
 * @param spec experiment spec
 * @return the object returned by deleting the MLJob
 * @throws SubmarineRuntimeException with code 400 when the spec is invalid
 */
@Override
public Experiment deleteExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
  try {
    // MLJob K8s resource object
    MLJob mlJob = MLJobFactory.getMLJob(spec);
    // Agent pod K8s resource object
    AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(),
        mlJob.getResourceType(), spec.getMeta().getExperimentId());
    // Delete with transaction
    return deleteResourcesTransaction(mlJob, agentPod);
  } catch (InvalidSpecException e) {
    // An invalid spec is a client error: report 400 (as findExperiment does),
    // not the success code 200.
    throw new SubmarineRuntimeException(400, e.getMessage());
  }
}
/**
 * Collects the names of the pods that belong to an experiment.
 * Pods are listed in the server namespace with the job label selector of the spec; each
 * pod name is added to the result with a {@code null} log. When listing fails the error
 * is logged and the (possibly empty) result collected so far is returned.
 * @param spec experiment spec
 * @param id experiment id stored in the returned object
 * @return experiment log holding the pod names
 */
@Override
public ExperimentLog getExperimentLogName(ExperimentSpec spec, String id) {
  ExperimentLog experimentLog = new ExperimentLog();
  experimentLog.setExperimentId(id);
  try {
    ListOptions listOptions = new ListOptions();
    listOptions.setLabelSelector(getJobLabelSelector(spec));
    final V1PodList podList = k8sClient.getPodClient().list(getServerNamespace(), listOptions)
        .throwsApiException().getObject();
    for (V1Pod pod : podList.getItems()) {
      String podName = pod.getMetadata().getName();
      experimentLog.addPodLog(podName, null);
    }
  } catch (final ApiException e) {
    // Pass the exception itself as the last argument so SLF4J logs it with its stack trace;
    // a plain string argument without a placeholder would be silently dropped.
    LOG.error("Error when listing pod for experiment:{}", spec.getMeta().getName(), e);
  }
  return experimentLog;
}
/**
 * Collects the logs of the pods that belong to an experiment.
 * Pods are listed in the server namespace with the job label selector of the spec and the
 * log of each pod is read and added to the result under the pod name. When a k8s call
 * fails the error is logged and the logs collected so far are returned.
 * @param spec experiment spec
 * @param id experiment id stored in the returned object
 * @return experiment log holding pod names and their logs
 */
@Override
public ExperimentLog getExperimentLog(ExperimentSpec spec, String id) {
  ExperimentLog experimentLog = new ExperimentLog();
  experimentLog.setExperimentId(id);
  try {
    // the namespace does not change between pods, resolve it once
    final String namespace = getServerNamespace();
    ListOptions listOptions = new ListOptions();
    listOptions.setLabelSelector(getJobLabelSelector(spec));
    final V1PodList podList = k8sClient.getPodClient().list(namespace, listOptions)
        .throwsApiException().getObject();
    for (V1Pod pod : podList.getItems()) {
      String podName = pod.getMetadata().getName();
      String podLog = k8sClient.getCoreApi().readNamespacedPodLog(
          podName, namespace, null, Boolean.FALSE, null,
          Integer.MAX_VALUE, null, Boolean.FALSE,
          Integer.MAX_VALUE, null, Boolean.FALSE);
      experimentLog.addPodLog(podName, podLog);
    }
  } catch (final ApiException e) {
    // Pass the exception itself as the last argument so SLF4J logs it with its stack trace;
    // a plain string argument without a placeholder would be silently dropped.
    LOG.error("Error when listing pod for experiment:{}", spec.getMeta().getName(), e);
  }
  return experimentLog;
}
/**
 * Reports whether the "submarine-tensorboard" deployment is available.
 * @return tensorboard info built from the availability flag
 * @throws SubmarineRuntimeException carrying the code and message of a failed k8s call
 */
@Override
public TensorboardInfo getTensorboardInfo() throws SubmarineRuntimeException {
  try {
    final boolean available = isDeploymentAvailable("submarine-tensorboard");
    return new TensorboardInfo(available);
  } catch (ApiException e) {
    throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
  }
}
/**
 * Reports whether the "submarine-mlflow" deployment is available.
 * @return mlflow info built from the availability flag
 * @throws SubmarineRuntimeException carrying the code and message of a failed k8s call
 */
@Override
public MlflowInfo getMlflowInfo() throws SubmarineRuntimeException {
  try {
    final boolean available = isDeploymentAvailable("submarine-mlflow");
    return new MlflowInfo(available);
  } catch (ApiException e) {
    throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
  }
}
/**
 * Checks whether the named deployment in the server namespace has at least one
 * available replica.
 * A missing deployment object, a missing status or a missing replica count all
 * count as "not available" instead of raising a NullPointerException.
 * @param name deployment name
 * @return {@code true} when the available replica count is greater than zero
 * @throws ApiException when reading the deployment status fails
 */
private boolean isDeploymentAvailable(String name) throws ApiException {
  V1Deployment deploy = k8sClient.getAppsV1Api()
      .readNamespacedDeploymentStatus(name, getServerNamespace(), "true");
  return Optional.ofNullable(deploy)
      .map(d -> d.getStatus())
      .map(status -> status.getAvailableReplicas())
      .map(ar -> ar > 0) // at least one replica is running
      .orElse(false);
}
/**
 * Creates a notebook together with its companion resources in one resource transaction.
 * The transaction order is: workspace PVC, user setting PVC, overwrite.json configmap
 * ({@code null} when no overwrite json is configured), notebook custom resource,
 * agent pod, VirtualService.
 * @param spec notebook spec
 * @param notebookId notebook id
 * @return the result of creating the notebook custom resource (transaction index 3)
 */
@Override
public Notebook createNotebook(NotebookSpec spec, String notebookId) throws SubmarineRuntimeException {
  // index-3: notebook custom resource, built first because name and namespace come from it
  final NotebookCR notebookCR = new NotebookCR(spec, notebookId, getServerNamespace());
  final String name = notebookCR.getMetadata().getName();
  final String namespace = notebookCR.getMetadata().getNamespace();

  // index-0: workspace pvc
  final String workspaceName = String.format("%s-%s", NotebookUtils.PVC_PREFIX, name);
  final PersistentVolumeClaim workspace =
      new PersistentVolumeClaim(namespace, workspaceName, NotebookUtils.STORAGE);

  // index-1: user setting pvc
  final String usersetName = String.format("%s-user-%s", NotebookUtils.PVC_PREFIX, name);
  final PersistentVolumeClaim userset =
      new PersistentVolumeClaim(namespace, usersetName, NotebookUtils.DEFAULT_USER_STORAGE);

  // index-2: overwrite.json configmap, stays null when nothing is configured
  final Configmap overwrite = StringUtils.isNotBlank(OVERWRITE_JSON)
      ? new Configmap(namespace, String.format("%s-%s", NotebookUtils.OVERWRITE_PREFIX, name),
          NotebookUtils.DEFAULT_OVERWRITE_FILE_NAME, OVERWRITE_JSON)
      : null;

  // index-4: agent
  final AgentPod agent = new AgentPod(namespace, spec.getMeta().getName(),
      CustomResourceType.Notebook, notebookId);

  // index-5: notebook VirtualService custom resource
  final IstioVirtualService virtualService = new IstioVirtualService(createMeta(namespace, name));

  // commit resources/CRD with transaction
  final List<Object> committed = resourceTransaction(workspace, userset, overwrite, notebookCR,
      agent, virtualService);
  return (Notebook) committed.get(3);
}
/**
 * Reads a notebook from k8s through its custom resource.
 * When the notebook read back carries no spec, the given spec is set on it.
 * @param spec notebook spec
 * @param notebookId notebook id
 * @return the notebook that was read
 */
@Override
public Notebook findNotebook(NotebookSpec spec, String notebookId) throws SubmarineRuntimeException {
  final Notebook found = new NotebookCR(spec, notebookId, getServerNamespace()).read(k8sClient);
  if (found.getSpec() == null) {
    found.setSpec(spec);
  }
  return found;
}
/**
 * Deletes a notebook and the resources created alongside it.
 * The notebook custom resource is deleted first, then the VirtualService, the workspace
 * and user setting PVCs, the overwrite configmap (only when an overwrite json is
 * configured) and finally the agent pod.
 * @param spec notebook spec
 * @param notebookId notebook id
 * @return the object returned by deleting the notebook custom resource
 */
@Override
public Notebook deleteNotebook(NotebookSpec spec, String notebookId) throws SubmarineRuntimeException {
  NotebookCR notebookCR = new NotebookCR(spec, notebookId, getServerNamespace());
  final String name = notebookCR.getMetadata().getName();
  final String namespace = notebookCR.getMetadata().getNamespace();
  // delete crd
  Notebook notebook = notebookCR.delete(k8sClient);
  // delete VirtualService
  new IstioVirtualService(createMeta(namespace, name)).delete(k8sClient);
  // delete pvc
  // workspace pvc
  new PersistentVolumeClaim(namespace, String.format("%s-%s", NotebookUtils.PVC_PREFIX, name),
      NotebookUtils.STORAGE).delete(k8sClient);
  // user set pvc
  new PersistentVolumeClaim(namespace, String.format("%s-user-%s", NotebookUtils.PVC_PREFIX, name),
      NotebookUtils.DEFAULT_USER_STORAGE).delete(k8sClient);
  // configmap, same condition as the one used when the notebook was created
  if (StringUtils.isNotBlank(OVERWRITE_JSON)) {
    new Configmap(namespace, String.format("%s-%s", NotebookUtils.OVERWRITE_PREFIX, name))
        .delete(k8sClient);
  }
  // delete agent: build the pod object once and use it for both the log line and the delete
  AgentPod agentPod = new AgentPod(namespace, spec.getMeta().getName(),
      CustomResourceType.Notebook, notebookId);
  LOG.info("Notebook:{} had been deleted, start to delete agent pod:{}",
      spec.getMeta().getName(), agentPod.getMetadata().getName());
  agentPod.delete(k8sClient);
  return notebook;
}
/**
 * Lists the notebooks in the server namespace whose owner label equals the given id.
 * @param id value matched against the notebook owner label
 * @return notebooks parsed from the list response
 * @throws SubmarineRuntimeException carrying the code and message of a failed k8s call
 */
@Override
public List<Notebook> listNotebook(String id) throws SubmarineRuntimeException {
  final String namespace = getServerNamespace();
  try {
    final ListOptions ownerFilter = new ListOptions();
    ownerFilter.setLabelSelector(NotebookCR.NOTEBOOK_OWNER_SELECTOR_KEY + "=" + id);
    final Object response = k8sClient.getNotebookCRClient().list(namespace, ownerFilter)
        .throwsApiException().getObject();
    return NotebookUtils.parseObjectForList(response);
  } catch (ApiException e) {
    throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
  }
}
/**
 * Creates the Seldon deployment and the Istio VirtualService for a model, both in the
 * "default" namespace. When creating the VirtualService fails, the Seldon deployment
 * created just before is deleted again (a failure of that cleanup is only logged).
 * @param spec serve spec
 * @throws SubmarineRuntimeException carrying the code and message of the failed create call
 */
@Override
public void createServe(ServeSpec spec)
    throws SubmarineRuntimeException {
  final SeldonDeployment deployment = parseServeSpec(spec);
  final IstioVirtualService virtualService = new IstioVirtualService(spec.getModelName(),
      spec.getModelVersion());

  // step 1: seldon deployment
  try {
    k8sClient.getSeldonDeploymentClient()
        .create("default", deployment, new CreateOptions())
        .throwsApiException();
  } catch (ApiException createErr) {
    LOG.error(createErr.getMessage(), createErr);
    throw new SubmarineRuntimeException(createErr.getCode(), createErr.getMessage());
  }

  // step 2: virtual service, undoing step 1 on failure
  try {
    k8sClient.getIstioVirtualServiceClient()
        .create("default", virtualService, new CreateOptions())
        .throwsApiException();
  } catch (ApiException createErr) {
    LOG.error(createErr.getMessage(), createErr);
    try {
      k8sClient.getSeldonDeploymentClient()
          .delete("default", deployment.getMetadata().getName(),
              getDeleteOptions(deployment.getApiVersion()))
          .throwsApiException();
    } catch (ApiException rollbackErr) {
      LOG.error(rollbackErr.getMessage(), rollbackErr);
    }
    throw new SubmarineRuntimeException(createErr.getCode(), createErr.getMessage());
  }
}
/**
 * Deletes the Seldon deployment and then the Istio VirtualService of a model, both in the
 * "default" namespace. The first failing call stops the sequence.
 * @param spec serve spec
 * @throws SubmarineRuntimeException carrying the code and message of the failed delete call
 */
@Override
public void deleteServe(ServeSpec spec)
    throws SubmarineRuntimeException {
  final SeldonDeployment deployment = parseServeSpec(spec);
  final IstioVirtualService virtualService = new IstioVirtualService(spec.getModelName(),
      spec.getModelVersion());
  try {
    k8sClient.getSeldonDeploymentClient()
        .delete("default", deployment.getMetadata().getName(),
            getDeleteOptions(deployment.getApiVersion()))
        .throwsApiException();
    k8sClient.getIstioVirtualServiceClient()
        .delete("default", virtualService.getMetadata().getName(),
            getDeleteOptions(virtualService.getApiVersion()))
        .throwsApiException();
  } catch (ApiException deleteErr) {
    LOG.error(deleteErr.getMessage(), deleteErr);
    throw new SubmarineRuntimeException(deleteErr.getCode(), deleteErr.getMessage());
  }
}
/**
 * Builds the pod label selector of an experiment: a framework specific prefix followed
 * by the experiment id. The framework name is compared case-insensitively; every
 * framework other than TensorFlow and XGBoost gets the PyTorch prefix.
 * @param experimentSpec experiment spec
 * @return label selector string
 */
private String getJobLabelSelector(ExperimentSpec experimentSpec) {
  final ExperimentMeta meta = experimentSpec.getMeta();
  final String framework = meta.getFramework();
  final String prefix;
  if (framework.equalsIgnoreCase(ExperimentMeta.SupportedMLFramework.TENSORFLOW.getName())) {
    prefix = TF_JOB_SELECTOR_KEY;
  } else if (framework.equalsIgnoreCase(ExperimentMeta.SupportedMLFramework.XGBOOST.getName())) {
    prefix = XGBoost_JOB_SELECTOR_KEY;
  } else {
    prefix = PYTORCH_JOB_SELECTOR_KEY;
  }
  return prefix + meta.getExperimentId();
}
/**
 * Builds the Seldon deployment that matches the model type of the spec.
 * Supported model types are "tensorflow" and "pytorch" (exact match). Any other value,
 * including a missing ({@code null}) model type, is rejected with a
 * {@link SubmarineRuntimeException} instead of failing with a NullPointerException.
 * @param spec serve spec
 * @return Seldon deployment for the model
 * @throws SubmarineRuntimeException when the model type is not supported
 */
private SeldonDeployment parseServeSpec(ServeSpec spec) throws SubmarineRuntimeException {
  String modelName = spec.getModelName();
  String modelType = spec.getModelType();
  String modelURI = spec.getModelURI();
  // constant-first comparison keeps a null model type on the "not supported" path
  if ("tensorflow".equals(modelType)) {
    return new SeldonTFServing(modelName, modelURI);
  }
  if ("pytorch".equals(modelType)) {
    return new SeldonPytorchServing(modelName, modelURI);
  }
  throw new SubmarineRuntimeException("Given serve type: " + modelType + " is not supported.");
}
}