SUBMARINE-1287. Experiment codes refactoring
### What is this PR for?
Following the notebook code refactoring, the experiment code also needs to be refactored.
### What type of PR is it?
Refactoring
### Todos
* [x] - Experiment codes refactoring in K8sSubmitter
* [x] - Add experiment test cases
* [x] - Fix some test case errors (add h2 database dependency for tests)
* [x] - Add `submarine-submitter-k8s` package test cases to workflow
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1287
### How should this be tested?
Add experiment test cases (tensorflow, pytorch, xgboost)
### Screenshots (if appropriate)
No
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? Yes
* Does this need new documentation? No
Author: cdmikechen <cdmikechen@hotmail.com>
Signed-off-by: Kevin <pingsutw@apache.org>
Closes #974 from cdmikechen/SUBMARINE-1287 and squashes the following commits:
01e2828f [cdmikechen] add jacoco cache
4b50fe23 [cdmikechen] Fix codecov error
09f19423 [cdmikechen] add submarine-submitter-k8s codecov
4f761d75 [cdmikechen] Add submarine-submitter-k8s to build
4d8fee16 [cdmikechen] Add test workflow
580e2cce [cdmikechen] Fix SubmitterTransactionTest error
25b3a088 [cdmikechen] Add PyTorch/XGBoost test
d0d9b55a [cdmikechen] Change agent name
a62e6117 [cdmikechen] Add SubmitterPyTorchApiTest
f857521f [cdmikechen] Fix NotebookSpecParserTest error
5401e2a5 [cdmikechen] Refactor Experiment
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 9d74e5b..1f3ca70 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -555,11 +555,17 @@
- uses: actions/checkout@v2
with:
fetch-depth: 50
- - name: Cache jacoco.exec
+ - name: Cache submarine-server jacoco.exec
uses: actions/cache@v2
with:
path: |
./submarine-server/server-submitter/target/jacoco.exec
+ key: ${{ runner.os }}-docker-${{ github.sha }}
+ - name: Cache submitter-k8s jacoco.exec
+ uses: actions/cache@v2
+ with:
+ path: |
+ ./submarine-server/server-submitter/submitter-k8s/target/jacoco.exec
key: ${{ runner.os }}-docker-${{ github.sha }}
- name: Set up JDK 1.8
uses: actions/setup-java@v1
@@ -575,13 +581,14 @@
java -version
- name: Build
env:
- MODULES: "-pl :submarine-server-submitter"
+ MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s"
run: |
echo ">>> mvn $BUILD_FLAG $MODULES -B"
mvn $BUILD_FLAG $MODULES -B
- name: Test
env:
- TEST_MODULES: "-pl :submarine-server-submitter"
+ # There is a `submarine-submitter-k8s` package under the `submarine-server-submitter` that also needs to be tested
+ TEST_MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s"
run: |
echo ">>> mvn $TEST_FLAG $TEST_MODULES -B"
mvn $TEST_FLAG $TEST_MODULES -B
@@ -708,6 +715,12 @@
path: |
./submarine-server/server-submitter/target/jacoco.exec
key: ${{ runner.os }}-docker-${{ github.sha }}
+ - name: Cache submarine-submitter-k8s data
+ uses: actions/cache@v2
+ with:
+ path: |
+ ./submarine-server/server-submitter/submitter-k8s/target/jacoco.exec
+ key: ${{ runner.os }}-docker-${{ github.sha }}
- name: Cache SonarCloud packages
uses: actions/cache@v1
with:
diff --git a/pom.xml b/pom.xml
index f21c1d5..301decd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<bytebuddy.version>1.11.20</bytebuddy.version>
<mybatis.version>3.2.8</mybatis.version>
<mysql-connector-java.version>5.1.41</mysql-connector-java.version>
+ <h2-connector-java.version>1.4.194</h2-connector-java.version>
<grpc.version>1.25.0</grpc.version>
<!-- frontend maven plugin related versions-->
diff --git a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
index 3259062..fa5895e 100644
--- a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
+++ b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
@@ -240,6 +240,10 @@
properties.put(SubmarineConfVars.ConfVars.METASTORE_JDBC_URL.getVarName(), testMetastoreJdbcUrl);
}
+ /**
+ * Stores the given JDBC driver class name in {@code properties} under the
+ * {@code JDBC_DRIVERCLASSNAME} configuration variable name.
+ * Annotated {@code @VisibleForTesting}, like the neighbouring JDBC setters.
+ *
+ * @param driverClassName driver class name to store
+ */
+ @VisibleForTesting
+ public void setJdbcDriverClassName(String driverClassName) {
+ properties.put(SubmarineConfVars.ConfVars.JDBC_DRIVERCLASSNAME.getVarName(), driverClassName);
+ }
@VisibleForTesting
public void setJdbcUrl(String testJdbcUrl) {
@@ -314,7 +318,7 @@
public String getServerServiceName() {
return getString(SubmarineConfVars.ConfVars.SUBMARINE_SERVER_SERVICE_NAME);
}
-
+
private String getStringValue(String name, String d) {
String value = this.properties.get(name);
if (value != null) {
diff --git a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java
index 5e14f70..e911b79 100644
--- a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java
+++ b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java
@@ -19,7 +19,7 @@
package org.apache.submarine.server.api.exception;
-public class InvalidSpecException extends Exception {
+public class InvalidSpecException extends RuntimeException {
private static final long serialVersionUID = -1148223492821245434L;
public InvalidSpecException(String message) {
diff --git a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java
index 6e8d3dd..e4e8172 100644
--- a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java
+++ b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java
@@ -151,7 +151,8 @@
public enum SupportedMLFramework {
TENSORFLOW("tensorflow"),
PYTORCH("pytorch"),
- XGBOOST("xgboost");
+    XGBOOST("xgboost"),
+    // Fallback for framework names that match no supported framework (see valueOfName).
+    UNKNOWN("unknown");
private final String name;
@@ -171,6 +172,15 @@
}
return names;
}
+
+    /**
+     * Looks up a framework by its name, ignoring case.
+     *
+     * @param name framework name to look up
+     * @return the first framework whose name matches, or {@code UNKNOWN} when none does
+     */
+    public static SupportedMLFramework valueOfName(String name) {
+      SupportedMLFramework matched = UNKNOWN;
+      for (SupportedMLFramework candidate : values()) {
+        if (candidate.getName().equalsIgnoreCase(name)) {
+          matched = candidate;
+          break;
+        }
+      }
+      return matched;
+    }
}
@Override
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java
index c691809..7698673 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java
@@ -30,9 +30,7 @@
import javax.ws.rs.core.Response;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.submarine.server.database.notebook.service.NotebookService;
import org.slf4j.Logger;
@@ -83,16 +81,8 @@
spec.getMeta().setName(lowerName);
NotebookId notebookId = generateNotebookId();
- Map<String, String> labels = spec.getMeta().getLabels();
-
- if (labels == null) {
- labels = new HashMap<>();
- }
- labels.put("notebook-owner-id", spec.getMeta().getOwnerId());
- labels.put("notebook-id", notebookId.toString());
- spec.getMeta().setLabels(labels);
+ // create notebook
Notebook notebook = submitter.createNotebook(spec, notebookId.toString());
-
notebook.setNotebookId(notebookId);
notebook.setSpec(spec);
diff --git a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java
index 28c4bd4..69ab6c1 100755
--- a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java
+++ b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java
@@ -85,10 +85,12 @@
LOG.info("Run the test unit using the test database");
// Run the test unit using the test database
SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
- conf.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/submarine_test?" +
- "useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&allowMultiQueries=true&" +
- "failOverReadOnly=false&zeroDateTimeBehavior=convertToNull&useSSL=false");
- conf.setJdbcUserName("submarine_test");
- conf.setJdbcPassword("password_test");
+ if (conf.getJdbcUrl().startsWith("jdbc:mysql")) {
+ conf.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/submarine_test?" +
+ "useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&allowMultiQueries=true&" +
+ "failOverReadOnly=false&zeroDateTimeBehavior=convertToNull&useSSL=false");
+ conf.setJdbcUserName("submarine_test");
+ conf.setJdbcPassword("password_test");
+ }
}
}
diff --git a/submarine-server/server-submitter/submitter-k8s/pom.xml b/submarine-server/server-submitter/submitter-k8s/pom.xml
index 497b766..96e518b 100644
--- a/submarine-server/server-submitter/submitter-k8s/pom.xml
+++ b/submarine-server/server-submitter/submitter-k8s/pom.xml
@@ -88,6 +88,10 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -122,6 +126,13 @@
<version>${jackson-databind.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>${h2-connector-java.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
index d696dce..a550cec 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
@@ -23,20 +23,14 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
-
-import io.kubernetes.client.util.generic.options.PatchOptions;
import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.JSON;
-import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
-import io.kubernetes.client.openapi.models.V1Status;
import io.kubernetes.client.util.generic.options.CreateOptions;
import io.kubernetes.client.util.generic.options.DeleteOptions;
import io.kubernetes.client.util.generic.options.ListOptions;
@@ -70,12 +64,8 @@
import org.apache.submarine.server.submitter.k8s.model.common.NullResource;
import org.apache.submarine.server.submitter.k8s.model.common.PersistentVolumeClaim;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobFactory;
import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCR;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
-import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
-import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
import org.apache.submarine.server.submitter.k8s.util.NotebookUtils;
import org.apache.submarine.server.submitter.k8s.util.OwnerReferenceUtils;
@@ -165,6 +155,30 @@
}
}
+  /**
+   * Deletes a primary resource and then, on a best-effort basis, its dependent resources.
+   * <p>
+   * This is an experimental API. Deletions of k8s resources cannot be rolled back reliably,
+   * so only the deletion of the primary resource is guaranteed for the time being: a failure
+   * there propagates to the caller. Failures while deleting dependent resources are logged
+   * and tolerated, so as to maximize the availability of the deletion API.
+   *
+   * @param primary primary resource; a failure to delete it propagates to the caller
+   * @param dependentResources dependent resources, deleted in order after the primary one
+   * @param <T> type returned by deleting the primary resource
+   * @return the result of deleting the primary resource
+   */
+  public <T> T deleteResourcesTransaction(K8sResource<T> primary,
+                                          K8sResource<?>... dependentResources) {
+    T returnResource = primary.delete(k8sClient);
+    for (K8sResource<?> dependent : dependentResources) {
+      try {
+        dependent.delete(k8sClient);
+      } catch (Exception e) {
+        // Resolve the name defensively: an exception thrown inside this handler would escape
+        // the loop and defeat the best-effort contract for the remaining resources.
+        V1ObjectMeta meta = dependent.getMetadata();
+        LOG.warn("Delete {}/{} failed. {}", dependent.getKind(),
+            meta == null ? null : meta.getName(), e.getMessage(), e);
+        // TODO(cdmikechen): Record the error information into audit service for later tracking
+      }
+    }
+    return returnResource;
+  }
+
+
public static V1ObjectMeta createMeta(String namespace, String name) {
V1ObjectMeta metadata = new V1ObjectMeta();
metadata.setNamespace(namespace);
@@ -183,190 +197,64 @@
return deleteOptions;
}
- private enum ParseOp {
- PARSE_OP_RESULT,
- PARSE_OP_DELETE
- }
-
@Override
public Experiment createExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
- Experiment experiment;
try {
- MLJob mlJob = ExperimentSpecParser.parseJob(spec);
- mlJob.getMetadata().setNamespace(getServerNamespace());
+ // MLJob K8s resource object
+ MLJob mlJob = MLJobFactory.getMLJob(spec);
mlJob.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
-
- CustomResourceType customResourceType;
- if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
- customResourceType = CustomResourceType.TFJob;
- } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
- customResourceType = CustomResourceType.XGBoost;
- } else {
- customResourceType = CustomResourceType.PyTorchJob;
- }
-
- AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(), customResourceType,
- spec.getMeta().getExperimentId());
-
- Object object;
- if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
- object = k8sClient.getTfJobClient().create(getServerNamespace(), (TFJob) mlJob,
- new CreateOptions()).throwsApiException().getObject();
- } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
- object = k8sClient.getXGBoostJobClient().create(getServerNamespace(), (XGBoostJob) mlJob,
- new CreateOptions()).throwsApiException().getObject();
- } else {
- object = k8sClient.getPyTorchJobClient().create(getServerNamespace(), (PyTorchJob) mlJob,
- new CreateOptions()).throwsApiException().getObject();
- }
-
- k8sClient.getPodClient().create(agentPod).throwsApiException().getObject();
- experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
+ // Agent pod K8s resource object
+ AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(),
+ mlJob.getResourceType(), spec.getMeta().getExperimentId());
+ // commit resources/CRD with transaction
+ List<Object> values = resourceTransaction(mlJob, agentPod);
+ return (Experiment) values.get(0);
} catch (InvalidSpecException e) {
- LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(), e);
- throw new SubmarineRuntimeException(400, e.getMessage());
- } catch (ApiException e) {
- LOG.error("K8s submitter: failed to create pod " + e.getMessage(), e);
- throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: failed to create pod " +
- e.getMessage());
+ LOG.error(String.format("K8s submitter: parse %s object failed by %s",
+ spec.getMeta().getFramework(), e.getMessage()), e);
+ throw new SubmarineRuntimeException(500, e.getMessage());
}
- return experiment;
}
@Override
public Experiment findExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
- Experiment experiment;
try {
-
- MLJob mlJob = ExperimentSpecParser.parseJob(spec);
- mlJob.getMetadata().setNamespace(getServerNamespace());
-
- Object object;
- if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
- object = k8sClient.getTfJobClient().get(getServerNamespace(),
- mlJob.getMetadata().getName()).throwsApiException().getObject();
- } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
- object = k8sClient.getXGBoostJobClient().get(getServerNamespace(),
- mlJob.getMetadata().getName()).throwsApiException().getObject();
- } else {
- object = k8sClient.getPyTorchJobClient().get(getServerNamespace(),
- mlJob.getMetadata().getName()).throwsApiException().getObject();
- }
-
- experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
-
+ // MLJob K8s resource object
+ MLJob mlJob = MLJobFactory.getMLJob(spec);
+ // Read Experiment
+ return mlJob.read(k8sClient);
} catch (InvalidSpecException e) {
- throw new SubmarineRuntimeException(200, e.getMessage());
- } catch (ApiException e) {
- throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ throw new SubmarineRuntimeException(400, e.getMessage());
}
-
- return experiment;
}
@Override
public Experiment patchExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
- Experiment experiment;
try {
- MLJob mlJob = ExperimentSpecParser.parseJob(spec);
- mlJob.getMetadata().setNamespace(getServerNamespace());
- // Using apply yaml patch, field manager must be set, and it must be forced.
- // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
- PatchOptions patchOptions = new PatchOptions();
- patchOptions.setFieldManager(spec.getMeta().getExperimentId());
- patchOptions.setForce(true);
- Object object;
- if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
- object = k8sClient.getTfJobClient().patch(getServerNamespace(), mlJob.getMetadata().getName(),
- V1Patch.PATCH_FORMAT_APPLY_YAML,
- new V1Patch(new Gson().toJson(mlJob)),
- patchOptions).throwsApiException().getObject();
- } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
- object = k8sClient.getXGBoostJobClient().patch(getServerNamespace(), mlJob.getMetadata().getName(),
- V1Patch.PATCH_FORMAT_APPLY_YAML,
- new V1Patch(new Gson().toJson(mlJob)),
- patchOptions).throwsApiException().getObject();
- } else {
- object = k8sClient.getPyTorchJobClient().patch(getServerNamespace(), mlJob.getMetadata().getName(),
- V1Patch.PATCH_FORMAT_APPLY_YAML,
- new V1Patch(new Gson().toJson(mlJob)),
- patchOptions).throwsApiException().getObject();
- }
-
- experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
+ // MLJob K8s resource object
+ MLJob mlJob = MLJobFactory.getMLJob(spec);
+ // Patch Experiment
+ return mlJob.replace(k8sClient);
} catch (InvalidSpecException e) {
throw new SubmarineRuntimeException(409, e.getMessage());
- } catch (ApiException e) {
- throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
} catch (Error e) {
throw new SubmarineRuntimeException(500, String.format("Unhandled error: %s", e.getMessage()));
}
- return experiment;
}
@Override
public Experiment deleteExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
- Experiment experiment;
try {
- MLJob mlJob = ExperimentSpecParser.parseJob(spec);
- mlJob.getMetadata().setNamespace(getServerNamespace());
-
- CustomResourceType customResourceType;
- if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
- customResourceType = CustomResourceType.TFJob;
- } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
- customResourceType = CustomResourceType.XGBoost;
- } else {
- customResourceType = CustomResourceType.PyTorchJob;
- }
-
- AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(), customResourceType,
- spec.getMeta().getExperimentId());
-
- Object object;
- if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
- object = k8sClient.getTfJobClient().delete(getServerNamespace(), mlJob.getMetadata().getName(),
- MLJobConverter.toDeleteOptionsFromMLJob(mlJob)).throwsApiException().getStatus();
- } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
- object = k8sClient.getXGBoostJobClient().delete(getServerNamespace(), mlJob.getMetadata().getName(),
- MLJobConverter.toDeleteOptionsFromMLJob(mlJob))
- .throwsApiException().getStatus();
- } else {
- object = k8sClient.getPyTorchJobClient().delete(getServerNamespace(), mlJob.getMetadata().getName(),
- MLJobConverter.toDeleteOptionsFromMLJob(mlJob))
- .throwsApiException().getStatus();
- }
-
- LOG.info(String.format("Experiment:%s had been deleted, start to delete agent pod:%s",
- spec.getMeta().getName(), agentPod.getMetadata().getName()));
- k8sClient.getPodClient().delete(agentPod.getMetadata().getNamespace(),
- agentPod.getMetadata().getName());
- experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_DELETE);
+ // MLJob K8s resource object
+ MLJob mlJob = MLJobFactory.getMLJob(spec);
+ // Agent pod K8s resource object
+ AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(),
+ mlJob.getResourceType(), spec.getMeta().getExperimentId());
+ // Delete with transaction
+ return deleteResourcesTransaction(mlJob, agentPod);
} catch (InvalidSpecException e) {
throw new SubmarineRuntimeException(200, e.getMessage());
- } catch (ApiException e) {
- throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
- return experiment;
- }
-
- private Experiment parseExperimentResponseObject(Object object, ParseOp op)
- throws SubmarineRuntimeException {
- Gson gson = new JSON().getGson();
- String jsonString = gson.toJson(object);
- LOG.info("Upstream response JSON: {}", jsonString);
- try {
- if (op == ParseOp.PARSE_OP_RESULT) {
- MLJob mlJob = gson.fromJson(jsonString, MLJob.class);
- return MLJobConverter.toJobFromMLJob(mlJob);
- } else if (op == ParseOp.PARSE_OP_DELETE) {
- V1Status status = gson.fromJson(jsonString, V1Status.class);
- return MLJobConverter.toJobFromStatus(status);
- }
- } catch (JsonSyntaxException e) {
- LOG.error("K8s submitter: parse response object failed by " + e.getMessage(), e);
- }
- throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
}
@Override
@@ -435,7 +323,8 @@
private boolean isDeploymentAvailable(String name) throws ApiException{
V1Deployment deploy = k8sClient.getAppsV1Api()
.readNamespacedDeploymentStatus(name, getServerNamespace(), "true");
- return deploy.getStatus().getAvailableReplicas() > 0; // at least one replica is running
+    // The deployment, its status and the available-replica count may each be null;
+    // any missing link in the chain means "not available" rather than a NullPointerException.
+    return Optional.ofNullable(deploy)
+        .map(d -> d.getStatus())
+        .map(status -> status.getAvailableReplicas())
+        .map(available -> available > 0) // at least one replica is running
+        .orElse(false);
}
@Override
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
index 2455ce5..f220d35 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
@@ -106,19 +106,18 @@
this.setSpec(spec);
}
- private String getNormalizePodName(CustomResourceType type, String name, String resourceId) {
- return String.format("%s-%s-%s-%s", resourceId.toString().toLowerCase().replace('_', '-'),
+ public static String getNormalizePodName(CustomResourceType type, String name, String resourceId) {
+ return String.format("%s-%s-%s-%s", resourceId.toLowerCase().replace('_', '-'),
type.toString().toLowerCase(), name, CONTAINER_NAME);
}
@Override
public AgentPod read(K8sClient api) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public AgentPod create(K8sClient api) {
- // create notebook custom resource
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Create AgentPod resource: \n{}", YamlUtils.toPrettyYaml(this));
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java
index 61c0134..d74859b 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java
@@ -28,11 +28,23 @@
V1ObjectMeta getMetadata();
+ /**
+ * Resource get api
+ */
R read(K8sClient api);
+ /**
+ * Resource create api
+ */
R create(K8sClient api);
+ /**
+ * Resource patch api
+ */
R replace(K8sClient api);
+ /**
+ * Resource delete api
+ */
R delete(K8sClient api);
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java
index 567d86f..45c174c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java
@@ -19,16 +19,52 @@
package org.apache.submarine.server.submitter.k8s.model.mljob;
+import com.google.gson.JsonSyntaxException;
import com.google.gson.annotations.SerializedName;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1JobStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
+import io.kubernetes.client.openapi.models.V1Status;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentMeta;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.k8s.utils.K8sUtils;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.model.K8sResource;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The machine learning job for the CRD job.
- * It be serialized as body input to k8s api client
+ * It is serialized as body input to k8s api client.
+ * <p>
+ * For job resource definitions and related information,
+ * please refer to https://github.com/kubeflow/training-operator
*/
-public class MLJob implements KubernetesObject{
+public abstract class MLJob implements KubernetesObject, K8sResource<Experiment> {
+
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ /**
+ * Initializes the fields shared by all ML jobs from the experiment spec:
+ * the object metadata, the framework and the experiment id.
+ *
+ * @param experimentSpec experiment spec whose meta section supplies the experiment id,
+ * name and framework
+ */
+ public MLJob(ExperimentSpec experimentSpec) {
+ // Build the metadata: namespace from K8sUtils, resource name, and a label with the experiment name
+ V1ObjectMetaBuilder metaBuilder = new V1ObjectMetaBuilder();
+ metaBuilder.withNamespace(K8sUtils.getNamespace())
+ // SUBMARINE-880: the resource name is the experiment id, not the experiment name
+ .withName(experimentSpec.getMeta().getExperimentId())
+ // Owner references are intentionally not added here (disabled call kept for reference):
+ //.addAllToOwnerReferences(OwnerReferenceUtils.getOwnerReference())
+ .addToLabels(ExperimentMeta.SUBMARINE_EXPERIMENT_NAME, experimentSpec.getMeta().getName());
+ setMetadata(metaBuilder.build());
+ // Remember the framework name from the spec
+ setFramework(experimentSpec.getMeta().getFramework());
+ // Remember the experiment id from the spec
+ setExperimentId(experimentSpec.getMeta().getExperimentId());
+ }
+
@SerializedName("apiVersion")
private String apiVersion;
@@ -48,6 +84,10 @@
@SerializedName("status")
private V1JobStatus status;
+ private String framework;
+
+ private String experimentId;
+
/**
* Set the api with version
*
@@ -148,4 +188,76 @@
public void setStatus(V1JobStatus status) {
this.status = status;
}
+
+ public String getFramework() {
+ return framework;
+ }
+
+ public void setFramework(String framework) {
+ this.framework = framework;
+ }
+
+ public String getExperimentId() {
+ return experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+  /**
+   * Converts an MLJob object into the {@link Experiment} that is returned to callers.
+   * The job is serialized to JSON, logged, parsed back as {@code tClass} and then converted.
+   *
+   * @param object job object to convert
+   * @param tClass concrete job class used when parsing the JSON back
+   * @return the experiment produced by {@code MLJobConverter.toJobFromMLJob}
+   * @throws SubmarineRuntimeException with code 500 when a JSON syntax error occurs
+   */
+  protected <T extends MLJob> Experiment parseExperimentResponseObject(T object, Class<T> tClass)
+      throws SubmarineRuntimeException {
+    final String responseJson = JsonUtils.toJson(object);
+    LOG.info("Upstream response JSON: {}", responseJson);
+    try {
+      final MLJob parsedJob = JsonUtils.fromJson(responseJson, tClass);
+      return MLJobConverter.toJobFromMLJob(parsedJob);
+    } catch (JsonSyntaxException e) {
+      LOG.error("K8s submitter: parse response object failed by " + e.getMessage(), e);
+      throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
+    }
+  }
+
+  /**
+   * Converts a {@link V1Status} into the {@link Experiment} that is returned to callers.
+   * The status is serialized to JSON, logged, parsed back as {@code V1Status} and then converted.
+   *
+   * @param status status object to convert
+   * @return the experiment produced by {@code MLJobConverter.toJobFromStatus}
+   * @throws SubmarineRuntimeException with code 500 when a JSON syntax error occurs
+   */
+  protected Experiment parseExperimentResponseStatus(V1Status status)
+      throws SubmarineRuntimeException {
+    final String responseJson = JsonUtils.toJson(status);
+    LOG.info("Upstream response JSON: {}", responseJson);
+    try {
+      final V1Status parsedStatus = JsonUtils.fromJson(responseJson, V1Status.class);
+      return MLJobConverter.toJobFromStatus(parsedStatus);
+    } catch (JsonSyntaxException e) {
+      LOG.error("K8s submitter: parse response object failed by " + e.getMessage(), e);
+      throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
+    }
+  }
+
+ /**
+ * Get custom resource type
+ * This method is mainly used by pod agent
+ */
+ public abstract CustomResourceType getResourceType();
+
+ @Override
+ public Experiment read(K8sClient api) {
+ throw new UnsupportedOperationException("MLJob does not implement this method!");
+ }
+
+ @Override
+ public Experiment create(K8sClient api) {
+ throw new UnsupportedOperationException("MLJob does not implement this method!");
+ }
+
+ @Override
+ public Experiment replace(K8sClient api) {
+ throw new UnsupportedOperationException("MLJob does not implement this method!");
+ }
+
+ @Override
+ public Experiment delete(K8sClient api) {
+ throw new UnsupportedOperationException("MLJob does not implement this method!");
+ }
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobFactory.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobFactory.java
new file mode 100644
index 0000000..d766ea0
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.model.mljob;
+
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.spec.ExperimentMeta;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
+import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
+
+/**
+ * Selects the MLJob implementation class that matches the framework of an experiment
+ */
+public class MLJobFactory {
+
+ /**
+ * Get the MLJob for the framework named in the meta of the experiment spec
+ */
+ public static MLJob getMLJob(ExperimentSpec experimentSpec) {
+ String frameworkName = experimentSpec.getMeta().getFramework();
+ ExperimentMeta.SupportedMLFramework framework = ExperimentMeta.SupportedMLFramework
+ .valueOfName(frameworkName);
+ switch (framework) {
+ case TENSORFLOW:
+ return new TFJob(experimentSpec);
+ case PYTORCH:
+ return new PyTorchJob(experimentSpec);
+ case XGBOOST:
+ return new XGBoostJob(experimentSpec);
+ default:
+ throw new InvalidSpecException("Unsupported framework name: " + frameworkName +
+ ". Supported frameworks are: " +
+ String.join(",", ExperimentMeta.SupportedMLFramework.names()));
+ }
+ }
+
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java
index ceb7d04..770f789 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java
@@ -101,7 +101,7 @@
V1PodTemplateSpec podSpec = getTemplate();
return String.join(" ",
podSpec.getSpec().getContainers().get(0)
- .getResources().getLimits().get("memory").
+ .getResources().getRequests().get("memory").
getNumber().divide(BigDecimal.valueOf(1000000)).toString() + "M");
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java
index c5c58f6..6dcd0a7 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java
@@ -39,6 +39,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.apache.submarine.server.submitter.k8s.K8sSubmitter.getDeleteOptions;
public class NotebookCR implements KubernetesObject, K8sResource<Notebook> {
@@ -170,7 +173,14 @@
V1ObjectMeta meta = new V1ObjectMeta();
meta.setName(notebookName);
meta.setNamespace(namespace);
- meta.setLabels(notebookSpec.getMeta().getLabels());
+ // add the owner id and notebook id as labels so that notebooks can be identified/filtered by them
+ Map<String, String> labels = notebookSpec.getMeta().getLabels();
+ if (labels == null) {
+ labels = new HashMap<>(2);
+ }
+ labels.put("notebook-owner-id", notebookSpec.getMeta().getOwnerId());
+ labels.put("notebook-id", notebookId);
+ meta.setLabels(labels);
meta.setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
this.setMetadata(meta);
// set spec
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java
index 4bda8db..7da916c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java
@@ -20,7 +20,28 @@
package org.apache.submarine.server.submitter.k8s.model.pytorchjob;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.generic.options.CreateOptions;
+import io.kubernetes.client.util.generic.options.PatchOptions;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
+import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.util.YamlUtils;
+
+import java.util.HashMap;
+import java.util.Map;
public class PyTorchJob extends MLJob {
@@ -34,12 +55,48 @@
@SerializedName("spec")
private PyTorchJobSpec spec;
- public PyTorchJob() {
+ public PyTorchJob(ExperimentSpec experimentSpec) throws InvalidSpecException {
+ super(experimentSpec);
setApiVersion(CRD_PYTORCH_API_VERSION_V1);
setKind(CRD_PYTORCH_KIND_V1);
setPlural(CRD_PYTORCH_PLURAL_V1);
setVersion(CRD_PYTORCH_VERSION_V1);
setGroup(CRD_PYTORCH_GROUP_V1);
+ // set spec
+ setSpec(parsePyTorchJobSpec(experimentSpec));
+ }
+
+ @Override
+ public CustomResourceType getResourceType() {
+ return CustomResourceType.PyTorchJob;
+ }
+
+ /**
+ * Parse PyTorchJob Spec
+ */
+ private PyTorchJobSpec parsePyTorchJobSpec(ExperimentSpec experimentSpec)
+ throws InvalidSpecException {
+ PyTorchJobSpec pyTorchJobSpec = new PyTorchJobSpec();
+
+ Map<PyTorchJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+ for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
+ String replicaType = entry.getKey();
+ ExperimentTaskSpec taskSpec = entry.getValue();
+ if (PyTorchJobReplicaType.isSupportedReplicaType(replicaType)) {
+ MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
+ replicaSpec.setReplicas(taskSpec.getReplicas());
+ V1PodTemplateSpec podTemplateSpec = ExperimentSpecParser.parseTemplateSpec(taskSpec, experimentSpec);
+ replicaSpec.setTemplate(podTemplateSpec);
+ replicaSpecMap.put(PyTorchJobReplicaType.valueOf(replicaType), replicaSpec);
+ } else {
+ throw new InvalidSpecException("Unrecognized replica type name: " +
+ entry.getKey() + ", it should be " +
+ String.join(",", PyTorchJobReplicaType.names()) +
+ " for PyTorch experiment.");
+ }
+ }
+ pyTorchJobSpec.setReplicaSpecs(replicaSpecMap);
+ return pyTorchJobSpec;
}
/**
@@ -59,4 +116,76 @@
public void setSpec(PyTorchJobSpec spec) {
this.spec = spec;
}
+
+ @Override
+ public Experiment read(K8sClient api) {
+ try {
+ PyTorchJob pyTorchJob = api.getPyTorchJobClient()
+ .get(getMetadata().getNamespace(), getMetadata().getName())
+ .throwsApiException().getObject();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Get PyTorchJob resource: \n{}", YamlUtils.toPrettyYaml(pyTorchJob));
+ }
+ return parseExperimentResponseObject(pyTorchJob, PyTorchJob.class);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment create(K8sClient api) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create PyTorchJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+ }
+ PyTorchJob pyTorchJob = api.getPyTorchJobClient()
+ .create(getMetadata().getNamespace(), this, new CreateOptions())
+ .throwsApiException().getObject();
+ return parseExperimentResponseObject(pyTorchJob, PyTorchJob.class);
+ } catch (ApiException e) {
+ LOG.error("K8s submitter: parse PyTorchJob object failed by " + e.getMessage(), e);
+ throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: parse PyTorchJob object failed by " +
+ e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment replace(K8sClient api) {
+ try {
+ // Using apply yaml patch, field manager must be set, and it must be forced.
+ // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
+ PatchOptions patchOptions = new PatchOptions();
+ patchOptions.setFieldManager(getExperimentId());
+ patchOptions.setForce(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Patch PyTorchJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+ }
+ PyTorchJob pyTorchJob = api.getPyTorchJobClient()
+ .patch(getMetadata().getNamespace(), getMetadata().getName(),
+ V1Patch.PATCH_FORMAT_APPLY_YAML,
+ new V1Patch(JsonUtils.toJson(this)),
+ patchOptions)
+ .throwsApiException().getObject();
+ return parseExperimentResponseObject(pyTorchJob, PyTorchJob.class);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment delete(K8sClient api) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delete PyTorchJob resource in namespace: {} and name: {}",
+ this.getMetadata().getNamespace(), this.getMetadata().getName());
+ }
+ V1Status status = api.getPyTorchJobClient()
+ .delete(getMetadata().getNamespace(), getMetadata().getName(),
+ MLJobConverter.toDeleteOptionsFromMLJob(this))
+ .throwsApiException().getStatus();
+ return parseExperimentResponseStatus(status);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
index 0a11771..7137d6c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
@@ -24,6 +24,10 @@
import java.util.Map;
+/**
+ * For the entity definition of PyTorchJobSpec, refer to
+ * https://github.com/kubeflow/training-operator/tree/master/examples/pytorch/mnist/v1
+ */
public class PyTorchJobSpec {
/**
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
index eb35c5d..ee9d0d9 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
@@ -21,14 +21,33 @@
import com.google.gson.annotations.SerializedName;
-import io.kubernetes.client.common.KubernetesObject;
-
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.generic.options.CreateOptions;
+import io.kubernetes.client.util.generic.options.PatchOptions;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
+import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.util.YamlUtils;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* It's the tf-operator's entry model.
*/
-public class TFJob extends MLJob implements KubernetesObject {
+public class TFJob extends MLJob {
public static final String CRD_TF_KIND_V1 = "TFJob";
public static final String CRD_TF_PLURAL_V1 = "tfjobs";
@@ -40,12 +59,50 @@
@SerializedName("spec")
private TFJobSpec spec;
- public TFJob() {
+ public TFJob(ExperimentSpec experimentSpec) throws InvalidSpecException {
+ super(experimentSpec);
setApiVersion(CRD_TF_API_VERSION_V1);
setKind(CRD_TF_KIND_V1);
setPlural(CRD_TF_PLURAL_V1);
setVersion(CRD_TF_VERSION_V1);
setGroup(CRD_TF_GROUP_V1);
+ // set spec
+ setSpec(parseTFJobSpec(experimentSpec));
+ }
+
+ @Override
+ public CustomResourceType getResourceType() {
+ return CustomResourceType.TFJob;
+ }
+
+ /**
+ * Parse TFJob Spec
+ */
+ private TFJobSpec parseTFJobSpec(ExperimentSpec experimentSpec)
+ throws InvalidSpecException {
+ TFJobSpec tfJobSpec = new TFJobSpec();
+ Map<TFJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+
+ for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
+ String replicaType = entry.getKey();
+ ExperimentTaskSpec taskSpec = entry.getValue();
+
+ if (TFJobReplicaType.isSupportedReplicaType(replicaType)) {
+ MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
+ replicaSpec.setReplicas(taskSpec.getReplicas());
+ V1PodTemplateSpec podTemplateSpec = ExperimentSpecParser.parseTemplateSpec(taskSpec, experimentSpec);
+ replicaSpec.setTemplate(podTemplateSpec);
+ replicaSpecMap.put(TFJobReplicaType.valueOf(replicaType), replicaSpec);
+ } else {
+ throw new InvalidSpecException("Unrecognized replica type name: " +
+ entry.getKey() +
+ ", it should be " +
+ String.join(",", TFJobReplicaType.names()) +
+ " for TensorFlow experiment.");
+ }
+ }
+ tfJobSpec.setReplicaSpecs(replicaSpecMap);
+ return tfJobSpec;
}
/**
@@ -63,4 +120,77 @@
public void setSpec(TFJobSpec spec) {
this.spec = spec;
}
+
+ @Override
+ public Experiment read(K8sClient api) {
+ try {
+ TFJob tfJob = api.getTfJobClient().get(getMetadata().getNamespace(), getMetadata().getName())
+ .throwsApiException().getObject();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Get TFJob resource: \n{}", YamlUtils.toPrettyYaml(tfJob));
+ }
+ return parseExperimentResponseObject(tfJob, TFJob.class);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment create(K8sClient api) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create TFJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+ }
+ TFJob tfJob = api.getTfJobClient().create(getMetadata().getNamespace(), this,
+ new CreateOptions()).throwsApiException().getObject();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Get TFJob resource: \n{}", YamlUtils.toPrettyYaml(tfJob));
+ }
+ return parseExperimentResponseObject(tfJob, TFJob.class);
+ } catch (ApiException e) {
+ LOG.error("K8s submitter: parse TFJob object failed by " + e.getMessage(), e);
+ throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: parse TFJob object failed by " +
+ e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment replace(K8sClient api) {
+ try {
+ // Using apply yaml patch, field manager must be set, and it must be forced.
+ // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
+ PatchOptions patchOptions = new PatchOptions();
+ patchOptions.setFieldManager(getExperimentId());
+ patchOptions.setForce(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Patch TFJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+ }
+ TFJob tfJob = api.getTfJobClient()
+ .patch(getMetadata().getNamespace(), getMetadata().getName(),
+ V1Patch.PATCH_FORMAT_APPLY_YAML,
+ new V1Patch(JsonUtils.toJson(this)), patchOptions)
+ .throwsApiException().getObject();
+ return parseExperimentResponseObject(tfJob, TFJob.class);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment delete(K8sClient api) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delete TFJob resource in namespace: {} and name: {}",
+ this.getMetadata().getNamespace(), this.getMetadata().getName());
+ }
+ V1Status status = api.getTfJobClient()
+ .delete(getMetadata().getNamespace(), getMetadata().getName(),
+ MLJobConverter.toDeleteOptionsFromMLJob(this))
+ .throwsApiException().getStatus();
+ return parseExperimentResponseStatus(status);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
+
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
index 1277f14..0e1f3e9 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
@@ -26,6 +26,8 @@
/**
* The replica spec of TFJob.
+ * For the entity definition of TFJobSpec, refer to
+ * https://github.com/kubeflow/training-operator/blob/master/examples/tensorflow/dist-mnist/tf_job_mnist.yaml
*/
public class TFJobSpec {
/**
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java
index 968ed7b..0bacbfd 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java
@@ -20,7 +20,28 @@
package org.apache.submarine.server.submitter.k8s.model.xgboostjob;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.generic.options.CreateOptions;
+import io.kubernetes.client.util.generic.options.PatchOptions;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
+import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.util.YamlUtils;
+
+import java.util.HashMap;
+import java.util.Map;
public class XGBoostJob extends MLJob {
@@ -34,12 +55,43 @@
@SerializedName("spec")
private XGBoostJobSpec spec;
- public XGBoostJob() {
+ public XGBoostJob(ExperimentSpec experimentSpec) {
+ super(experimentSpec);
setApiVersion(CRD_XGBOOST_API_VERSION_V1);
setKind(CRD_XGBOOST_KIND_V1);
setPlural(CRD_XGBOOST_PLURAL_V1);
setVersion(CRD_XGBOOST_VERSION_V1);
setGroup(CRD_XGBOOST_GROUP_V1);
+ // set spec
+ setSpec(parseXGBoostJobSpec(experimentSpec));
+ }
+
+ private XGBoostJobSpec parseXGBoostJobSpec(ExperimentSpec experimentSpec)
+ throws InvalidSpecException {
+ XGBoostJobSpec xGBoostJobSpec = new XGBoostJobSpec();
+
+ Map<XGBoostJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+
+ for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
+ String replicaType = entry.getKey();
+ ExperimentTaskSpec taskSpec = entry.getValue();
+
+ if (XGBoostJobReplicaType.isSupportedReplicaType(replicaType)) {
+ MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
+ replicaSpec.setReplicas(taskSpec.getReplicas());
+ V1PodTemplateSpec podTemplateSpec = ExperimentSpecParser.parseTemplateSpec(taskSpec, experimentSpec);
+ replicaSpec.setTemplate(podTemplateSpec);
+ replicaSpecMap.put(XGBoostJobReplicaType.valueOf(replicaType), replicaSpec);
+ } else {
+ throw new InvalidSpecException("Unrecognized replica type name: " +
+ entry.getKey() +
+ ", it should be " +
+ String.join(",", XGBoostJobReplicaType.names()) +
+ " for XGBoost experiment.");
+ }
+ }
+ xGBoostJobSpec.setReplicaSpecs(replicaSpecMap);
+ return xGBoostJobSpec;
}
/**
@@ -57,5 +109,82 @@
public void setSpec(XGBoostJobSpec spec) {
this.spec = spec;
}
+
+ @Override
+ public CustomResourceType getResourceType() {
+ return CustomResourceType.XGBoost;
+ }
+
+ @Override
+ public Experiment read(K8sClient api) {
+ try {
+ XGBoostJob xgBoostJob = api.getXGBoostJobClient()
+ .get(getMetadata().getNamespace(), getMetadata().getName())
+ .throwsApiException().getObject();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Get XGBoostJob resource: \n{}", YamlUtils.toPrettyYaml(xgBoostJob));
+ }
+ return parseExperimentResponseObject(xgBoostJob, XGBoostJob.class);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment create(K8sClient api) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create XGBoostJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+ }
+ XGBoostJob xgBoostJob = api.getXGBoostJobClient()
+ .create(getMetadata().getNamespace(), this, new CreateOptions())
+ .throwsApiException().getObject();
+ return parseExperimentResponseObject(xgBoostJob, XGBoostJob.class);
+ } catch (ApiException e) {
+ LOG.error("K8s submitter: parse XGBoostJob object failed by " + e.getMessage(), e);
+ throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: parse XGBoostJob object failed by " +
+ e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment replace(K8sClient api) {
+ try {
+ // Using apply yaml patch, field manager must be set, and it must be forced.
+ // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
+ PatchOptions patchOptions = new PatchOptions();
+ patchOptions.setFieldManager(getExperimentId());
+ patchOptions.setForce(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Patch XGBoostJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+ }
+ XGBoostJob xgBoostJob = api.getXGBoostJobClient()
+ .patch(getMetadata().getNamespace(), getMetadata().getName(),
+ V1Patch.PATCH_FORMAT_APPLY_YAML,
+ new V1Patch(JsonUtils.toJson(this)),
+ patchOptions)
+ .throwsApiException().getObject();
+ return parseExperimentResponseObject(xgBoostJob, XGBoostJob.class);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
+
+ @Override
+ public Experiment delete(K8sClient api) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delete XGBoostJob resource in namespace: {} and name: {}",
+ this.getMetadata().getNamespace(), this.getMetadata().getName());
+ }
+ V1Status status = api.getXGBoostJobClient()
+ .delete(getMetadata().getNamespace(), getMetadata().getName(),
+ MLJobConverter.toDeleteOptionsFromMLJob(this))
+ .throwsApiException().getStatus();
+ return parseExperimentResponseStatus(status);
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
index 94e3c2c..991e03c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
@@ -23,7 +23,7 @@
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1EnvVar;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource;
import io.kubernetes.client.openapi.models.V1PodSecurityContext;
import io.kubernetes.client.openapi.models.V1PodSpec;
@@ -36,7 +36,6 @@
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.server.api.environment.Environment;
import org.apache.submarine.server.api.exception.InvalidSpecException;
-import org.apache.submarine.server.api.spec.ExperimentMeta;
import org.apache.submarine.server.api.spec.ExperimentSpec;
import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
import org.apache.submarine.server.api.spec.EnvironmentSpec;
@@ -44,17 +43,6 @@
import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.AbstractCodeLocalizer;
import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.CodeLocalizer;
import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.SSHGitCodeLocalizer;
-import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
-import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobSpec;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobSpec;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJobSpec;
import java.util.ArrayList;
import java.util.Arrays;
@@ -63,146 +51,18 @@
import java.util.Map;
public class ExperimentSpecParser {
- private static SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
- public static MLJob parseJob(ExperimentSpec experimentSpec) throws InvalidSpecException {
- String framework = experimentSpec.getMeta().getFramework();
- if (ExperimentMeta.SupportedMLFramework.TENSORFLOW.
- getName().equalsIgnoreCase(framework)) {
- return parseTFJob(experimentSpec);
- } else if (ExperimentMeta.SupportedMLFramework.PYTORCH.
- getName().equalsIgnoreCase(framework)) {
- return parsePyTorchJob(experimentSpec);
- } else if (ExperimentMeta.SupportedMLFramework.XGBOOST.
- getName().equalsIgnoreCase(framework)) {
- return parseXGBoostJob(experimentSpec);
- } else {
- throw new InvalidSpecException("Unsupported framework name: " + framework +
- ". Supported frameworks are: " +
- String.join(",", ExperimentMeta.SupportedMLFramework.names()));
- }
- }
+ private static final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
- public static XGBoostJob parseXGBoostJob(
- ExperimentSpec experimentSpec) throws InvalidSpecException {
- XGBoostJob xGBoostJob = new XGBoostJob();
- xGBoostJob.setMetadata(parseMetadata(experimentSpec));
- xGBoostJob.setSpec(parseXGBoostJobSpec(experimentSpec));
- return xGBoostJob;
- }
-
- public static XGBoostJobSpec parseXGBoostJobSpec(ExperimentSpec experimentSpec)
- throws InvalidSpecException {
- XGBoostJobSpec xGBoostJobSpec = new XGBoostJobSpec();
-
- Map<XGBoostJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
-
- for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
- String replicaType = entry.getKey();
- ExperimentTaskSpec taskSpec = entry.getValue();
-
- if (XGBoostJobReplicaType.isSupportedReplicaType(replicaType)) {
- MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
- replicaSpec.setReplicas(taskSpec.getReplicas());
- V1PodTemplateSpec podTemplateSpec = parseTemplateSpec(taskSpec, experimentSpec);
-
- replicaSpec.setTemplate(podTemplateSpec);
- replicaSpecMap.put(XGBoostJobReplicaType.valueOf(replicaType), replicaSpec);
- } else {
- throw new InvalidSpecException("Unrecognized replica type name: " +
- entry.getKey() +
- ", it should be " +
- String.join(",", XGBoostJobReplicaType.names()) +
- " for XGBoost experiment.");
- }
- }
- xGBoostJobSpec.setReplicaSpecs(replicaSpecMap);
- return xGBoostJobSpec;
- }
-
- public static PyTorchJob parsePyTorchJob(
- ExperimentSpec experimentSpec) throws InvalidSpecException {
- PyTorchJob pyTorchJob = new PyTorchJob();
- pyTorchJob.setMetadata(parseMetadata(experimentSpec));
- pyTorchJob.setSpec(parsePyTorchJobSpec(experimentSpec));
- return pyTorchJob;
- }
-
- public static PyTorchJobSpec parsePyTorchJobSpec(ExperimentSpec experimentSpec)
- throws InvalidSpecException {
- PyTorchJobSpec pyTorchJobSpec = new PyTorchJobSpec();
-
- Map<PyTorchJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
- for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
- String replicaType = entry.getKey();
- ExperimentTaskSpec taskSpec = entry.getValue();
- if (PyTorchJobReplicaType.isSupportedReplicaType(replicaType)) {
- MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
- replicaSpec.setReplicas(taskSpec.getReplicas());
- V1PodTemplateSpec podTemplateSpec = parseTemplateSpec(taskSpec, experimentSpec);
-
- replicaSpec.setTemplate(podTemplateSpec);
- replicaSpecMap.put(PyTorchJobReplicaType.valueOf(replicaType), replicaSpec);
- } else {
- throw new InvalidSpecException("Unrecognized replica type name: " +
- entry.getKey() + ", it should be " +
- String.join(",", PyTorchJobReplicaType.names()) +
- " for PyTorch experiment.");
- }
- }
- pyTorchJobSpec.setReplicaSpecs(replicaSpecMap);
- return pyTorchJobSpec;
- }
-
- public static TFJob parseTFJob(ExperimentSpec experimentSpec)
- throws InvalidSpecException {
- TFJob tfJob = new TFJob();
- tfJob.setMetadata(parseMetadata(experimentSpec));
- tfJob.setSpec(parseTFJobSpec(experimentSpec));
- return tfJob;
- }
-
- private static V1ObjectMeta parseMetadata(ExperimentSpec experimentSpec) {
- V1ObjectMeta meta = new V1ObjectMeta();
- meta.setName(experimentSpec.getMeta().getExperimentId());
- Map<String, String> labels = new HashMap<>();
- labels.put(ExperimentMeta.SUBMARINE_EXPERIMENT_NAME, experimentSpec.getMeta().getName());
- meta.setLabels(labels);
-
- return meta;
- }
-
- private static TFJobSpec parseTFJobSpec(ExperimentSpec experimentSpec)
- throws InvalidSpecException {
- TFJobSpec tfJobSpec = new TFJobSpec();
- Map<TFJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
-
- for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
- String replicaType = entry.getKey();
- ExperimentTaskSpec taskSpec = entry.getValue();
-
- if (TFJobReplicaType.isSupportedReplicaType(replicaType)) {
- MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
- replicaSpec.setReplicas(taskSpec.getReplicas());
- V1PodTemplateSpec podTemplateSpec = parseTemplateSpec(taskSpec, experimentSpec);
-
- replicaSpec.setTemplate(podTemplateSpec);
- replicaSpecMap.put(TFJobReplicaType.valueOf(replicaType), replicaSpec);
- } else {
- throw new InvalidSpecException("Unrecognized replica type name: " +
- entry.getKey() +
- ", it should be " +
- String.join(",", TFJobReplicaType.names()) +
- " for TensorFlow experiment.");
- }
- }
- tfJobSpec.setReplicaSpecs(replicaSpecMap);
- return tfJobSpec;
- }
-
- private static V1PodTemplateSpec parseTemplateSpec(
+ public static V1PodTemplateSpec parseTemplateSpec(
ExperimentTaskSpec taskSpec, ExperimentSpec experimentSpec) throws InvalidSpecException {
V1PodTemplateSpec templateSpec = new V1PodTemplateSpec();
+ // Disable istio sidecar injection; with a sidecar the pod may not end normally.
+ // See the "Controlling the injection policy" section of
+ // https://istio.io/latest/docs/setup/additional-setup/sidecar-injection/
+ V1ObjectMetaBuilder metaBuilder = new V1ObjectMetaBuilder()
+ .addToAnnotations("sidecar.istio.io/inject", "false");
+ templateSpec.setMetadata(metaBuilder.build());
V1PodSpec podSpec = new V1PodSpec();
List<V1Container> containers = new ArrayList<>();
V1Container container = new V1Container();
@@ -402,6 +262,7 @@
if (request) {
resources.put("memory", new Quantity(memoryRequest)); // ex: 1024M
} else {
+ // SUBMARINE-948. Allow experiments to overcommit memory
String suffix = memoryRequest.substring(memoryRequest.length() - 1);
String value = memoryRequest.substring(0, memoryRequest.length() - 1);
String memoryLimit = String.valueOf(Integer.parseInt(value) * 2) + suffix;
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
index 16b8cea..8a92467 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
@@ -230,7 +230,7 @@
return resources;
}
- private static Environment getEnvironment(NotebookSpec notebookSpec) {
+ public static Environment getEnvironment(NotebookSpec notebookSpec) {
if (notebookSpec.getEnvironment().getName() != null) {
EnvironmentManager environmentManager = EnvironmentManager.getInstance();
return environmentManager
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java
index 4e2deb2..e6f8b34 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java
@@ -21,6 +21,10 @@
import java.io.IOException;
import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@@ -35,8 +39,10 @@
import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
import org.apache.submarine.server.api.spec.EnvironmentSpec;
import org.apache.submarine.server.api.spec.KernelSpec;
+import org.apache.submarine.server.k8s.utils.K8sUtils;
import org.apache.submarine.server.manager.EnvironmentManager;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobFactory;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaType;
import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
@@ -45,11 +51,11 @@
import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobReplicaType;
import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.AbstractCodeLocalizer;
import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.GitCodeLocalizer;
import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.SSHGitCodeLocalizer;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1EmptyDirVolumeSource;
@@ -58,14 +64,29 @@
public class ExperimentSpecParserTest extends SpecBuilder {
- private static SubmarineConfiguration conf =
+ private static final SubmarineConfiguration conf =
SubmarineConfiguration.getInstance();
+ @Before
+ public void beforeInit() {
+ conf.setJdbcUrl(H2_JDBC_URL);
+ conf.setJdbcDriverClassName(H2_JDBC_DRIVERCLASS);
+ conf.setJdbcUserName(H2_JDBC_USERNAME);
+ conf.setJdbcPassword(H2_JDBC_PASSWORD);
+ try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
+ H2_JDBC_USERNAME, H2_JDBC_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("RUNSCRIPT FROM 'classpath:/db/experiment.sql'");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
@Test
public void testValidTensorFlowExperiment() throws IOException,
URISyntaxException, InvalidSpecException {
ExperimentSpec experimentSpec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
- TFJob tfJob = (TFJob) ExperimentSpecParser.parseJob(experimentSpec);
+ TFJob tfJob = (TFJob) MLJobFactory.getMLJob(experimentSpec);
validateMetadata(experimentSpec.getMeta(), tfJob.getMetadata(),
ExperimentMeta.SupportedMLFramework.TENSORFLOW.getName().toLowerCase()
);
@@ -86,7 +107,7 @@
// Case 1. Invalid framework name
experimentSpec.getMeta().setFramework("fooframework");
try {
- ExperimentSpecParser.parseJob(experimentSpec);
+ MLJobFactory.getMLJob(experimentSpec);
Assert.fail("It should throw InvalidSpecException");
} catch (InvalidSpecException e) {
Assert.assertTrue(e.getMessage().contains("Unsupported framework name"));
@@ -97,7 +118,7 @@
experimentSpec.getSpec().put("foo", experimentSpec.getSpec().get(TFJobReplicaType.Ps.getTypeName()));
experimentSpec.getSpec().remove(TFJobReplicaType.Ps.getTypeName());
try {
- ExperimentSpecParser.parseJob(experimentSpec);
+ MLJobFactory.getMLJob(experimentSpec);
Assert.fail("It should throw InvalidSpecException");
} catch (InvalidSpecException e) {
Assert.assertTrue(e.getMessage().contains("Unrecognized replica type name"));
@@ -109,7 +130,7 @@
URISyntaxException, InvalidSpecException {
ExperimentSpec experimentSpec =
(ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, pytorchJobReqFile);
- PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(experimentSpec);
+ PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(experimentSpec);
validateMetadata(experimentSpec.getMeta(), pyTorchJob.getMetadata(),
ExperimentMeta.SupportedMLFramework.PYTORCH.getName().toLowerCase()
);
@@ -131,7 +152,7 @@
// Case 1. Invalid framework name
experimentSpec.getMeta().setFramework("fooframework");
try {
- ExperimentSpecParser.parseJob(experimentSpec);
+ MLJobFactory.getMLJob(experimentSpec);
Assert.fail("It should throw InvalidSpecException");
} catch (InvalidSpecException e) {
Assert.assertTrue(e.getMessage().contains("Unsupported framework name"));
@@ -143,7 +164,7 @@
PyTorchJobReplicaType.Master.getTypeName()));
experimentSpec.getSpec().remove(PyTorchJobReplicaType.Master.getTypeName());
try {
- ExperimentSpecParser.parseJob(experimentSpec);
+ MLJobFactory.getMLJob(experimentSpec);
Assert.fail("It should throw InvalidSpecException");
} catch (InvalidSpecException e) {
Assert.assertTrue(e.getMessage().contains("Unrecognized replica type name"));
@@ -155,7 +176,7 @@
URISyntaxException, InvalidSpecException {
ExperimentSpec experimentSpec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class,
xgboostJobReqFile);
- XGBoostJob xgboostJob = (XGBoostJob) ExperimentSpecParser.parseJob(experimentSpec);
+ XGBoostJob xgboostJob = (XGBoostJob) MLJobFactory.getMLJob(experimentSpec);
validateMetadata(experimentSpec.getMeta(), xgboostJob.getMetadata(),
ExperimentMeta.SupportedMLFramework.XGBOOST.getName().toLowerCase()
);
@@ -172,7 +193,7 @@
// Case 1. Invalid framework name
experimentSpec.getMeta().setFramework("fooframework");
try {
- ExperimentSpecParser.parseJob(experimentSpec);
+ MLJobFactory.getMLJob(experimentSpec);
Assert.fail("It should throw InvalidSpecException");
} catch (InvalidSpecException e) {
Assert.assertTrue(e.getMessage().contains("Unsupported framework name"));
@@ -184,7 +205,7 @@
XGBoostJobReplicaType.Master.getTypeName()));
experimentSpec.getSpec().remove(XGBoostJobReplicaType.Master.getTypeName());
try {
- ExperimentSpecParser.parseJob(experimentSpec);
+ MLJobFactory.getMLJob(experimentSpec);
Assert.fail("It should throw InvalidSpecException");
} catch (InvalidSpecException e) {
Assert.assertTrue(e.getMessage().contains("Unrecognized replica type name"));
@@ -193,8 +214,8 @@
private void validateMetadata(ExperimentMeta expectedMeta, V1ObjectMeta actualMeta,
String actualFramework) {
- Assert.assertEquals(expectedMeta.getName(), actualMeta.getName());
- Assert.assertEquals(expectedMeta.getNamespace(), actualMeta.getNamespace());
+ Assert.assertEquals(expectedMeta.getExperimentId(), actualMeta.getName());
+ Assert.assertEquals(K8sUtils.getNamespace(), actualMeta.getNamespace());
Assert.assertEquals(expectedMeta.getFramework().toLowerCase(), actualFramework);
}
@@ -209,7 +230,7 @@
mlJobReplicaSpec = ((XGBoostJob) mlJob).getSpec().getReplicaSpecs().get(type);
}
Assert.assertNotNull(mlJobReplicaSpec);
-
+
ExperimentTaskSpec definedPyTorchMasterTask = experimentSpec.getSpec().
get(type.getTypeName());
@@ -273,7 +294,7 @@
ExperimentSpec jobSpec =
(ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, pytorchJobWithEnvReqFile);
- PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(jobSpec);
+ PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(jobSpec);
MLJobReplicaSpec mlJobReplicaSpec = pyTorchJob.getSpec().getReplicaSpecs()
.get(PyTorchJobReplicaType.Master);
@@ -319,7 +340,7 @@
ExperimentSpec jobSpec =
(ExperimentSpec) buildFromJsonFile(ExperimentSpec.class,
pytorchJobWithHTTPGitCodeLocalizerFile);
- PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(jobSpec);
+ PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(jobSpec);
MLJobReplicaSpec mlJobReplicaSpec = pyTorchJob.getSpec().getReplicaSpecs()
.get(PyTorchJobReplicaType.Master);
@@ -351,8 +372,10 @@
}
}
- V1Volume V1Volume =
- mlJobReplicaSpec.getTemplate().getSpec().getVolumes().get(0);
+ // Look up the code-localizer volume by name rather than by list position
+ V1Volume V1Volume = mlJobReplicaSpec.getTemplate().getSpec().getVolumes().stream()
+ .filter(v -> v.getName().equals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME))
+ .findFirst().get();
Assert.assertEquals(new V1EmptyDirVolumeSource(), V1Volume.getEmptyDir());
Assert.assertEquals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME,
V1Volume.getName());
@@ -364,7 +387,7 @@
ExperimentSpec jobSpec =
(ExperimentSpec) buildFromJsonFile(ExperimentSpec.class,
pytorchJobWithSSHGitCodeLocalizerFile);
- PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(jobSpec);
+ PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(jobSpec);
MLJobReplicaSpec mlJobReplicaSpec = pyTorchJob.getSpec().getReplicaSpecs()
.get(PyTorchJobReplicaType.Master);
@@ -402,8 +425,10 @@
}
}
- V1Volume V1Volume =
- mlJobReplicaSpec.getTemplate().getSpec().getVolumes().get(0);
+ // Look up the code-localizer volume by name rather than by list position
+ V1Volume V1Volume = mlJobReplicaSpec.getTemplate().getSpec().getVolumes().stream()
+ .filter(v -> v.getName().equals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME))
+ .findFirst().get();
Assert.assertEquals(new V1EmptyDirVolumeSource(), V1Volume.getEmptyDir());
Assert.assertEquals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME,
V1Volume.getName());
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
index eaa8b06..f68e4ba 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
@@ -51,6 +51,10 @@
* file. Local: docker run -it --privileged -p 8443:8443 -p 10080:10080
* bsycorp/kind:latest-1.15 Travis: See '.travis.yml'
*/
+
+// Experiment test cases now live in the org.apache.submarine.server.submitter.k8s.mljob package,
+// so this test class is ignored for the time being.
+@Ignore
public class K8SJobSubmitterTest extends SpecBuilder {
private static final Logger LOG = LoggerFactory.getLogger(K8SJobSubmitterTest.class);
private K8sSubmitter submitter;
@@ -90,7 +94,7 @@
@Test
public void testRunTFJobPerRequest() throws URISyntaxException, IOException, SubmarineRuntimeException {
- ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
+ ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, xgboostJobReqFile);
run(spec);
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
index 54eef99..63c050c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
@@ -34,9 +34,9 @@
import org.apache.submarine.server.api.exception.InvalidSpecException;
import org.apache.submarine.server.api.experiment.Experiment;
import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobFactory;
import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
-import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@@ -46,7 +46,7 @@
public void testMLJob2Job() throws IOException, URISyntaxException, InvalidSpecException {
// Accepted Status
ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
- MLJob mlJob = ExperimentSpecParser.parseJob(spec);
+ MLJob mlJob = MLJobFactory.getMLJob(spec);
V1JobStatus status = new V1JobStatusBuilder().build();
mlJob.setStatus(status);
Experiment experiment = MLJobConverter.toJobFromMLJob(mlJob);
@@ -73,8 +73,8 @@
condition = new V1JobConditionBuilder().withStatus("True")
.withType("Running").withLastTransitionTime(runningTime).build();
conditions.add(condition);
-
mlJob.getStatus().setConditions(conditions);
+
experiment = MLJobConverter.toJobFromMLJob(mlJob);
Assert.assertEquals(Experiment.Status.STATUS_RUNNING.toString(), experiment.getStatus());
Assert.assertEquals(runningTime.toString(), experiment.getRunningTime());
@@ -82,6 +82,11 @@
// Succeeded Status
DateTime finishedTime = new DateTime();
mlJob.getStatus().setCompletionTime(finishedTime);
+ condition = new V1JobConditionBuilder().withStatus("True")
+ .withType("Succeeded").withLastTransitionTime(runningTime).build();
+ conditions.add(condition);
+ mlJob.getStatus().setConditions(conditions);
+
experiment = MLJobConverter.toJobFromMLJob(mlJob);
Assert.assertEquals(Experiment.Status.STATUS_SUCCEEDED.toString(), experiment.getStatus());
Assert.assertEquals(finishedTime.toString(), experiment.getFinishedTime());
@@ -99,7 +104,7 @@
public void testMLJob2DeleteOptions() throws IOException, URISyntaxException,
InvalidSpecException {
ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
- MLJob mlJob = ExperimentSpecParser.parseJob(spec);
+ MLJob mlJob = MLJobFactory.getMLJob(spec);
V1DeleteOptions options = MLJobConverter.toDeleteOptionsFromMLJob(mlJob);
Assert.assertNotNull(options);
Assert.assertEquals(mlJob.getApiVersion(), options.getApiVersion());
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
index 27fd5fb..7d9f576 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
@@ -21,21 +21,46 @@
import io.kubernetes.client.openapi.models.V1EnvVar;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.api.environment.Environment;
import org.apache.submarine.server.api.spec.NotebookMeta;
import org.apache.submarine.server.api.spec.NotebookPodSpec;
import org.apache.submarine.server.api.spec.NotebookSpec;
import org.apache.submarine.server.submitter.k8s.model.common.Configmap;
import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCR;
import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCRSpec;
+import org.apache.submarine.server.submitter.k8s.parser.NotebookSpecParser;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Map;
public class NotebookSpecParserTest extends SpecBuilder {
+ private static final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
+
+ @Before
+ public void beforeInit() {
+ conf.setJdbcUrl(H2_JDBC_URL);
+ conf.setJdbcDriverClassName(H2_JDBC_DRIVERCLASS);
+ conf.setJdbcUserName(H2_JDBC_USERNAME);
+ conf.setJdbcPassword(H2_JDBC_PASSWORD);
+ try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
+ H2_JDBC_USERNAME, H2_JDBC_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("RUNSCRIPT FROM 'classpath:/db/notebook.sql'");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
@Test
public void testValidNotebook() throws IOException, URISyntaxException {
NotebookSpec notebookSpec = (NotebookSpec) buildFromJsonFile(NotebookSpec.class, notebookReqFile);
@@ -47,14 +72,15 @@
}
private void validateMetadata(NotebookMeta meta, V1ObjectMeta actualMeta) {
- Assert.assertEquals(meta.getName(), actualMeta.getName());
+ Assert.assertEquals("notebook-1642402491519-0003-my-nb", actualMeta.getName());
Assert.assertEquals(meta.getNamespace(), actualMeta.getNamespace());
Assert.assertEquals(meta.getOwnerId(),
actualMeta.getLabels().get(NotebookCR.NOTEBOOK_OWNER_SELECTOR_KEY));
}
private void validateEnvironment(NotebookSpec spec, NotebookCRSpec actualPodSpec) {
- String expectedImage = spec.getEnvironment().getImage();
+ Environment env = NotebookSpecParser.getEnvironment(spec);
+ String expectedImage = env.getEnvironmentSpec().getDockerImage();
String actualImage = actualPodSpec.getContainerImageName();
Assert.assertEquals(expectedImage, actualImage);
}
@@ -70,7 +96,7 @@
for (Map.Entry<String, String> entry : podSpec.getEnvVars().entrySet()) {
V1EnvVar env = new V1EnvVar();
env.setName(entry.getKey());
- env.setValue(env.getValue());
+ env.setValue(entry.getValue());
Assert.assertTrue(notebook.getSpec().getEnvs().contains(env));
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
index 96f8908..1f401f4 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
@@ -34,9 +34,9 @@
public abstract class SpecBuilder {
// The spec files in test/resources
- protected final String tfJobReqFile = "/tf_mnist_req.json";
- protected final String pytorchJobReqFile = "/pytorch_job_req.json";
- protected final String xgboostJobReqFile = "/xgboost_job_req.json";
+ public static final String tfJobReqFile = "/tf_mnist_req.json";
+ public static final String pytorchJobReqFile = "/pytorch_job_req.json";
+ public static final String xgboostJobReqFile = "/xgboost_job_req.json";
protected final String pytorchJobWithEnvReqFile = "/pytorch_job_req_env.json";
protected final String pytorchJobWithInvalidEnvReqFile =
"/pytorch_job_req_invalid_env.json";
@@ -47,6 +47,11 @@
"/pytorch_job_req_ssh_git_code_localizer.json";
protected final String tfTfboardJobReqFile = "/tf_tfboard_mnist_req.json";
+ protected final String H2_JDBC_URL = "jdbc:h2:mem:submarine-test;MODE=MYSQL;DB_CLOSE_DELAY=-1";
+ protected final String H2_JDBC_DRIVERCLASS = "org.h2.Driver";
+ protected final String H2_JDBC_USERNAME = "root";
+ protected final String H2_JDBC_PASSWORD = "";
+
protected Object buildFromJsonFile(Object obj, String filePath) throws IOException,
URISyntaxException {
Gson gson = new GsonBuilder().create();
@@ -55,7 +60,11 @@
if (obj.equals(NotebookSpec.class)) {
return gson.fromJson(reader, NotebookSpec.class);
} else {
- return gson.fromJson(reader, ExperimentSpec.class);
+ ExperimentSpec experimentSpec = gson.fromJson(reader, ExperimentSpec.class);
+ experimentSpec.getMeta().setExperimentId(
+ String.format("experiment-%s-0001", System.currentTimeMillis())
+ );
+ return experimentSpec;
}
}
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java
index 6acdac6..1246188 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java
@@ -29,8 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
public class SubmitterTransactionTest {
private static final Logger LOG = LoggerFactory.getLogger(SubmitterTransactionTest.class);
@@ -53,29 +51,41 @@
}
@Test
- public void testOneFailedCommit() {
- List<Object> commits = submitter.resourceTransaction(new FailedResource());
- Assert.assertEquals(commits.size(), 0);
- }
-
- @Test
public void testCombineCommit() {
- List<Object> commits = submitter.resourceTransaction(new SuccessResource("test1"),
- new FailedResource());
- Assert.assertEquals(commits.size(), 1);
+ SuccessResource resource1 = new SuccessResource("test1");
+ FailedResource resource2 = new FailedResource();
+ try {
+ submitter.resourceTransaction(resource1, resource2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertFalse(resource1.isCommit());
}
@Test
public void testCommitOrder() {
- List<Object> commits = submitter.resourceTransaction(new SuccessResource("test1"),
- new FailedResource(), new SuccessResource("test3"));
- Assert.assertEquals(commits.size(), 1);
+ SuccessResource resource1 = new SuccessResource("test1");
+ FailedResource resource2 = new FailedResource();
+ SuccessResource resource3 = new SuccessResource("test3\"");
+ try {
+ submitter.resourceTransaction(resource1, resource2, resource3);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertFalse(resource1.isCommit());
+ Assert.assertFalse(resource3.isCommit());
}
}
class SuccessResource implements K8sResource<SuccessResource> {
+ private boolean commit = false;
+
+ public boolean isCommit() {
+ return commit;
+ }
+
private final String name;
SuccessResource(String name) {
@@ -99,16 +109,19 @@
@Override
public SuccessResource create(K8sClient api) {
+ this.commit = true;
return this;
}
@Override
public SuccessResource replace(K8sClient api) {
+ this.commit = true;
return this;
}
@Override
public SuccessResource delete(K8sClient api) {
+ this.commit = false;
return this;
}
}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java
index 684b30c..c6d19a4 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java
@@ -88,6 +88,11 @@
private final GenericKubernetesApi<IstioVirtualService, IstioVirtualServiceList> istioVirtualServiceClient;
private final GenericKubernetesApi<V1Pod, V1PodList> podClient;
+ // Training operator clients (TFJob, PyTorchJob, XGBoostJob)
+ private final GenericKubernetesApi<TFJob, TFJobList> tfJobClient;
+ private final GenericKubernetesApi<PyTorchJob, PyTorchJobList> pyTorchJobClient;
+ private final GenericKubernetesApi<XGBoostJob, XGBoostJobList> xgboostJobClient;
+
private static final WireMockRule wireMockRule = new WireMockRule(8384);
public static WireMockRule getWireMockRule() {
@@ -139,6 +144,22 @@
new GenericKubernetesApi<>(
V1Pod.class, V1PodList.class,
"", "v1", "pods", apiClient);
+
+ tfJobClient =
+ new GenericKubernetesApi<>(
+ TFJob.class, TFJobList.class,
+ TFJob.CRD_TF_GROUP_V1, TFJob.CRD_TF_VERSION_V1,
+ TFJob.CRD_TF_PLURAL_V1, apiClient);
+ pyTorchJobClient =
+ new GenericKubernetesApi<>(
+ PyTorchJob.class, PyTorchJobList.class,
+ PyTorchJob.CRD_PYTORCH_GROUP_V1, PyTorchJob.CRD_PYTORCH_VERSION_V1,
+ PyTorchJob.CRD_PYTORCH_PLURAL_V1, apiClient);
+ xgboostJobClient =
+ new GenericKubernetesApi<>(
+ XGBoostJob.class, XGBoostJobList.class,
+ XGBoostJob.CRD_XGBOOST_GROUP_V1, XGBoostJob.CRD_XGBOOST_VERSION_V1,
+ XGBoostJob.CRD_XGBOOST_PLURAL_V1, apiClient);
}
public K8sMockClient(MappingBuilder... mappingBuilders) throws IOException {
@@ -192,17 +213,17 @@
@Override
public GenericKubernetesApi<TFJob, TFJobList> getTfJobClient() {
- return null;
+ return tfJobClient;
}
@Override
public GenericKubernetesApi<PyTorchJob, PyTorchJobList> getPyTorchJobClient() {
- return null;
+ return pyTorchJobClient;
}
@Override
public GenericKubernetesApi<XGBoostJob, XGBoostJobList> getXGBoostJobClient() {
- return null;
+ return xgboostJobClient;
}
@Override
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java
index 8f37c0d..0649922 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java
@@ -46,6 +46,18 @@
return String.format("/api/v1/namespaces/%s/pods/%s", namespace, name);
}
+ public static String getTfJobUrl(String namespace, String name) {
+ return String.format("/apis/kubeflow.org/v1/namespaces/%s/tfjobs/%s", namespace, name);
+ }
+
+ public static String getPytorchJobUrl(String namespace, String name) {
+ return String.format("/apis/kubeflow.org/v1/namespaces/%s/pytorchjobs/%s", namespace, name);
+ }
+
+ public static String getXGBoostJobUrl(String namespace, String name) {
+ return String.format("/apis/kubeflow.org/v1/namespaces/%s/xgboostjobs/%s", namespace, name);
+ }
+
public static String getMockSuccessStatus(String name) {
V1Status status = new V1Status();
status.setApiVersion("v1");
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterFileUtil.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterFileUtil.java
new file mode 100644
index 0000000..b997d93
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterFileUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+public class SubmitterFileUtil {
+
+ public static ExperimentSpec buildFromJsonFile(String experimentId, String filePath) throws IOException,
+ URISyntaxException {
+ Gson gson = new GsonBuilder().create();
+ try (Reader reader = Files.newBufferedReader(
+ new File(SubmitterFileUtil.class.getResource(filePath).toURI()).toPath(),
+ StandardCharsets.UTF_8)) {
+ ExperimentSpec experimentSpec = gson.fromJson(reader, ExperimentSpec.class);
+ experimentSpec.getMeta().setExperimentId(experimentId);
+ return experimentSpec;
+ }
+ }
+
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java
new file mode 100644
index 0000000..11b3730
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.github.tomakehurst.wiremock.client.MappingBuilder;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.github.tomakehurst.wiremock.matching.EqualToPattern;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
+import org.apache.submarine.server.submitter.k8s.SpecBuilder;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
+import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
+import org.apache.submarine.server.submitter.k8s.model.AgentPod;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static org.apache.submarine.server.submitter.k8s.client.K8sMockClient.getResourceFileContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests the create, find and delete experiment operations of {@link K8sSubmitter} for a
+ * PyTorchJob, with the Kubernetes REST endpoints stubbed through WireMock.
+ */
+public class SubmitterPyTorchApiTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SubmitterPyTorchApiTest.class);
+
+  private K8sSubmitter submitter;
+
+  private static final String namespace = "default";
+  private static final String experimentId = "experiment-1658656463509-0001";
+  private ExperimentSpec experimentSpec;
+
+  @Rule
+  public WireMockRule wireMockRule = K8sMockClient.getWireMockRule();
+
+  /**
+   * Loads the PyTorch request spec, builds the stubs for the job and agent pod endpoints and
+   * creates the submitter on top of a {@link K8sMockClient} that receives those stubs.
+   *
+   * @throws IOException propagated from loading the request spec
+   * @throws URISyntaxException propagated from loading the request spec
+   */
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    experimentSpec = SubmitterFileUtil.buildFromJsonFile(experimentId, SpecBuilder.pytorchJobReqFile);
+
+    // stub: POST that saves the PyTorchJob, answered with the read fixture
+    MappingBuilder pytorchPost = post(urlPathEqualTo(
+        "/apis/kubeflow.org/v1/namespaces/default/pytorchjobs"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/pytorch-read-api.json")));
+    // stub: POST that saves the agent pod, answered with a minimal pod body (name + namespace)
+    String agentName = AgentPod.getNormalizePodName(
+        CustomResourceType.PyTorchJob, "pytorch-dist-mnist", experimentId);
+    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
+
+    // stub: GET of the PyTorchJob; note that it matches "application/json" without a charset
+    MappingBuilder pytorchGet = get(urlPathEqualTo(
+        MockClientUtil.getPytorchJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/pytorch-read-api.json")));
+
+    // stub: DELETE of the PyTorchJob, answered with the delete fixture
+    MappingBuilder pytorchDelete = delete(urlPathEqualTo(
+        MockClientUtil.getPytorchJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/pytorch-delete-api.json")));
+    // stub: DELETE of the agent pod, answered with a mock success status
+    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
+
+    K8sClient k8sClient = new K8sMockClient(pytorchPost, agentPost, pytorchGet, pytorchDelete, agentDelete);
+    try {
+      submitter = new K8sSubmitter(k8sClient);
+      submitter.initialize(null);
+    } catch (Exception e) {
+      // best effort: a failure here is only logged and the tests still run
+      LOG.warn("Init K8sSubmitter failed, but we can continue", e);
+    }
+  }
+
+  /** Creating the experiment must return an experiment with a uid and status "Running". */
+  @Test
+  public void testCreatePyTorchJob() {
+    // create pytorch
+    Experiment experiment = submitter.createExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    // JUnit expects (expected, actual); this order keeps failure messages truthful
+    assertEquals("Running", experiment.getStatus());
+  }
+
+  /** Finding the experiment must return an experiment with a uid and status "Running". */
+  @Test
+  public void testFindPyTorchJob() {
+    // get pytorch
+    Experiment experiment = submitter.findExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    // JUnit expects (message, expected, actual)
+    assertEquals("status is not running", "Running", experiment.getStatus());
+  }
+
+  /** Deleting the experiment must return a non-null experiment. */
+  @Test
+  public void testDeletePyTorchJob() {
+    // delete pytorch
+    Experiment experiment = submitter.deleteExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java
new file mode 100644
index 0000000..7959ab2
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.github.tomakehurst.wiremock.client.MappingBuilder;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.github.tomakehurst.wiremock.matching.EqualToPattern;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
+import org.apache.submarine.server.submitter.k8s.SpecBuilder;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
+import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
+import org.apache.submarine.server.submitter.k8s.model.AgentPod;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static org.apache.submarine.server.submitter.k8s.client.K8sMockClient.getResourceFileContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests the create, find and delete experiment operations of {@link K8sSubmitter} for a
+ * TFJob, with the Kubernetes REST endpoints stubbed through WireMock.
+ */
+public class SubmitterTensorflowApiTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SubmitterTensorflowApiTest.class);
+
+  private K8sSubmitter submitter;
+
+  private static final String namespace = "default";
+  private static final String experimentId = "experiment-1659167632755-0001";
+  private ExperimentSpec experimentSpec;
+
+  @Rule
+  public WireMockRule wireMockRule = K8sMockClient.getWireMockRule();
+
+  /**
+   * Loads the TensorFlow request spec, builds the stubs for the job and agent pod endpoints and
+   * creates the submitter on top of a {@link K8sMockClient} that receives those stubs.
+   *
+   * @throws IOException propagated from loading the request spec
+   * @throws URISyntaxException propagated from loading the request spec
+   */
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    experimentSpec = SubmitterFileUtil.buildFromJsonFile(experimentId, SpecBuilder.tfJobReqFile);
+
+    // stub: POST that saves the TFJob, answered with the read fixture
+    MappingBuilder tfPost = post(urlPathEqualTo(
+        "/apis/kubeflow.org/v1/namespaces/default/tfjobs"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/tf-read-api.json")));
+    // stub: POST that saves the agent pod, answered with a minimal pod body (name + namespace)
+    String agentName = AgentPod.getNormalizePodName(
+        CustomResourceType.TFJob, "tensorflow-dist-mnist", experimentId);
+    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
+
+    // stub: GET of the TFJob; note that it matches "application/json" without a charset
+    MappingBuilder tfGet = get(urlPathEqualTo(
+        MockClientUtil.getTfJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/tf-read-api.json")));
+
+    // stub: DELETE of the TFJob, answered with the delete fixture
+    MappingBuilder tfDelete = delete(urlPathEqualTo(
+        MockClientUtil.getTfJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/tf-delete-api.json")));
+    // stub: DELETE of the agent pod, answered with a mock success status
+    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
+
+    K8sClient k8sClient = new K8sMockClient(tfPost, agentPost, tfGet, tfDelete, agentDelete);
+    try {
+      submitter = new K8sSubmitter(k8sClient);
+      submitter.initialize(null);
+    } catch (Exception e) {
+      // best effort: a failure here is only logged and the tests still run
+      LOG.warn("Init K8sSubmitter failed, but we can continue", e);
+    }
+  }
+
+  /** Creating the experiment must return an experiment with a uid and status "Running". */
+  @Test
+  public void testCreateTensorflowJob() {
+    // create tensorflow
+    Experiment experiment = submitter.createExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    // JUnit expects (expected, actual); this order keeps failure messages truthful
+    assertEquals("Running", experiment.getStatus());
+  }
+
+  /** Finding the experiment must return an experiment with a uid and status "Running". */
+  @Test
+  public void testFindTensorflowJob() {
+    // get tensorflow
+    Experiment experiment = submitter.findExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    // JUnit expects (message, expected, actual)
+    assertEquals("status is not running", "Running", experiment.getStatus());
+  }
+
+  /** Deleting the experiment must return a non-null experiment. */
+  @Test
+  public void testDeleteTensorflowJob() {
+    // delete tensorflow
+    Experiment experiment = submitter.deleteExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java
new file mode 100644
index 0000000..b8f9317
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.github.tomakehurst.wiremock.client.MappingBuilder;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.github.tomakehurst.wiremock.matching.EqualToPattern;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
+import org.apache.submarine.server.submitter.k8s.SpecBuilder;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
+import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
+import org.apache.submarine.server.submitter.k8s.model.AgentPod;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static org.apache.submarine.server.submitter.k8s.client.K8sMockClient.getResourceFileContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests the create, find and delete experiment operations of {@link K8sSubmitter} for an
+ * XGBoostJob, with the Kubernetes REST endpoints stubbed through WireMock.
+ */
+public class SubmitterXGBoostApiTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SubmitterXGBoostApiTest.class);
+
+  private K8sSubmitter submitter;
+
+  private static final String namespace = "default";
+  private static final String experimentId = "experiment-1659181695811-0001";
+  private ExperimentSpec experimentSpec;
+
+  @Rule
+  public WireMockRule wireMockRule = K8sMockClient.getWireMockRule();
+
+  /**
+   * Loads the XGBoost request spec, builds the stubs for the job and agent pod endpoints and
+   * creates the submitter on top of a {@link K8sMockClient} that receives those stubs.
+   *
+   * @throws IOException propagated from loading the request spec
+   * @throws URISyntaxException propagated from loading the request spec
+   */
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    experimentSpec = SubmitterFileUtil.buildFromJsonFile(experimentId, SpecBuilder.xgboostJobReqFile);
+
+    // stub: POST that saves the XGBoostJob, answered with the read fixture
+    MappingBuilder xgboostPost = post(urlPathEqualTo(
+        "/apis/kubeflow.org/v1/namespaces/default/xgboostjobs"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/xgboost-read-api.json")));
+    // stub: POST that saves the agent pod, answered with a minimal pod body (name + namespace)
+    String agentName = AgentPod.getNormalizePodName(
+        CustomResourceType.XGBoost, "xgboost-dist-mnist", experimentId);
+    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
+
+    // stub: GET of the XGBoostJob; note that it matches "application/json" without a charset
+    MappingBuilder xgboostGet = get(urlPathEqualTo(
+        MockClientUtil.getXGBoostJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/xgboost-read-api.json")));
+
+    // stub: DELETE of the XGBoostJob, answered with the delete fixture
+    MappingBuilder xgboostDelete = delete(urlPathEqualTo(
+        MockClientUtil.getXGBoostJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/xgboost-delete-api.json")));
+    // stub: DELETE of the agent pod, answered with a mock success status
+    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
+
+    K8sClient k8sClient = new K8sMockClient(xgboostPost, agentPost,
+        xgboostGet, xgboostDelete, agentDelete);
+    try {
+      submitter = new K8sSubmitter(k8sClient);
+      submitter.initialize(null);
+    } catch (Exception e) {
+      // best effort: a failure here is only logged and the tests still run
+      LOG.warn("Init K8sSubmitter failed, but we can continue", e);
+    }
+  }
+
+  /** Creating the experiment must return an experiment with a uid and status "Running". */
+  @Test
+  public void testCreateXGBoostJob() {
+    // create XGBoost
+    Experiment experiment = submitter.createExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    // JUnit expects (expected, actual); this order keeps failure messages truthful
+    assertEquals("Running", experiment.getStatus());
+  }
+
+  /** Finding the experiment must return an experiment with a uid and status "Running". */
+  @Test
+  public void testFindXGBoostJob() {
+    // get XGBoost
+    Experiment experiment = submitter.findExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    // JUnit expects (message, expected, actual)
+    assertEquals("status is not running", "Running", experiment.getStatus());
+  }
+
+  /** Deleting the experiment must return a non-null experiment. */
+  @Test
+  public void testDeleteXGBoostJob() {
+    // delete XGBoost
+    Experiment experiment = submitter.deleteExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-delete-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-delete-api.json
new file mode 100644
index 0000000..77e0681
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-delete-api.json
@@ -0,0 +1,12 @@
+{
+ "apiVersion": "v1",
+ "details": {
+ "group": "kubeflow.org",
+ "kind": "PyTorchJob",
+ "name": "experiment-1658656463509-0001",
+ "uid": "b95d6769-26ff-469c-a346-d5399f543f39"
+ },
+ "kind": "Status",
+ "metadata": {},
+ "status": "Success"
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-read-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-read-api.json
new file mode 100644
index 0000000..18c92f3
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-read-api.json
@@ -0,0 +1,160 @@
+{
+ "apiVersion": "kubeflow.org/v1",
+ "kind": "PyTorchJob",
+ "metadata": {
+ "creationTimestamp": "2022-07-24T17:54:23.000+08:00",
+ "generation": 1,
+ "labels": {
+ "submarine-experiment-name": "pytorch-dist-mnist"
+ },
+ "name": "experiment-1658656463509-0001",
+ "namespace": "default",
+ "resourceVersion": "26841",
+ "uid": "b95d6769-26ff-469c-a346-d5399f543f39"
+ },
+ "spec": {
+ "pytorchReplicaSpecs": {
+ "Master": {
+ "replicas": 1,
+ "template": {
+ "metadata": {
+ "annotations": {
+ "sidecar.istio.io/inject": "false"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "command": [
+ "python",
+ "/var/mnist.py",
+ "--backend",
+ "gloo"
+ ],
+ "env": [
+ {
+ "name": "ENV_1",
+ "value": "ENV1"
+ }
+ ],
+ "image": "apache/submarine:pytorch-dist-mnist-1.0",
+ "name": "pytorch",
+ "resources": {
+ "limits": {
+ "cpu": "2",
+ "memory": "4096M"
+ },
+ "requests": {
+ "cpu": "2",
+ "memory": "2048M"
+ }
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "volume",
+ "subPath": "submarine-tensorboard/pytorch-dist-mnist"
+ }
+ ]
+ }
+ ],
+ "volumes": [
+ {
+ "name": "volume",
+ "persistentVolumeClaim": {
+ "claimName": "submarine-tensorboard-pvc"
+ }
+ }
+ ]
+ }
+ },
+ "restartPolicy": "OnFailure"
+ },
+ "Worker": {
+ "replicas": 2,
+ "template": {
+ "metadata": {
+ "annotations": {
+ "sidecar.istio.io/inject": "false"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "command": [
+ "python",
+ "/var/mnist.py",
+ "--backend",
+ "gloo"
+ ],
+ "env": [
+ {
+ "name": "ENV_1",
+ "value": "ENV1"
+ }
+ ],
+ "image": "apache/submarine:pytorch-dist-mnist-1.0",
+ "name": "pytorch",
+ "resources": {
+ "limits": {
+ "cpu": "1",
+ "memory": "2048M"
+ },
+ "requests": {
+ "cpu": "1",
+ "memory": "1024M"
+ }
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "volume",
+ "subPath": "submarine-tensorboard/pytorch-dist-mnist"
+ }
+ ]
+ }
+ ],
+ "volumes": [
+ {
+ "name": "volume",
+ "persistentVolumeClaim": {
+ "claimName": "submarine-tensorboard-pvc"
+ }
+ }
+ ]
+ }
+ },
+ "restartPolicy": "OnFailure"
+ }
+ },
+ "backoffLimit": 3
+ },
+ "status": {
+ "conditions": [
+ {
+ "lastTransitionTime": "2022-07-24T11:23:25Z",
+ "lastUpdateTime": "2022-07-24T11:23:25Z",
+ "message": "PyTorchJob experiment-1658656463509-0001 is created.",
+ "reason": "PyTorchJobCreated",
+ "status": "True",
+ "type": "Created"
+ },
+ {
+ "lastTransitionTime": "2022-07-24T11:23:26Z",
+ "lastUpdateTime": "2022-07-24T11:23:26Z",
+ "message": "PyTorchJob experiment-1658656463509-0001 is running.",
+ "reason": "PyTorchJobRunning",
+ "status": "True",
+ "type": "Running"
+ }
+ ],
+ "replicaStatuses": {
+ "Master": {
+ "active": 1
+ },
+ "Worker": {
+ "active": 1
+ }
+ }
+ }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-delete-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-delete-api.json
new file mode 100644
index 0000000..a0a1bf4
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-delete-api.json
@@ -0,0 +1,12 @@
+{
+ "apiVersion": "v1",
+ "details": {
+ "group": "kubeflow.org",
+ "kind": "TFJob",
+ "name": "experiment-1659167632755-0001",
+ "uid": "d9b3c2dd-ce17-400a-a4f7-2781294ff3d5"
+ },
+ "kind": "Status",
+ "metadata": {},
+ "status": "Success"
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-read-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-read-api.json
new file mode 100644
index 0000000..d0ef50f
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-read-api.json
@@ -0,0 +1,164 @@
+{
+ "apiVersion": "kubeflow.org/v1",
+ "kind": "TFJob",
+ "metadata": {
+ "creationTimestamp": "2022-07-30T15:53:52.000+08:00",
+ "generation": 1,
+ "labels": {
+ "submarine-experiment-name": "tensorflow-dist-mnist"
+ },
+ "name": "experiment-1659167632755-0001",
+ "namespace": "default",
+ "resourceVersion": "39556",
+ "uid": "d9b3c2dd-ce17-400a-a4f7-2781294ff3d5"
+ },
+ "spec": {
+ "tfReplicaSpecs": {
+ "Ps": {
+ "replicas": 1,
+ "template": {
+ "metadata": {
+ "annotations": {
+ "sidecar.istio.io/inject": "false"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "command": [
+ "python",
+ "/var/tf_mnist/mnist_with_summaries.py",
+ "--log_dir\u003d/train/log",
+ "--learning_rate\u003d0.01",
+ "--batch_size\u003d150"
+ ],
+ "env": [
+ {
+ "name": "ENV_1",
+ "value": "ENV1"
+ }
+ ],
+ "image": "apache/submarine:tf-mnist-with-summaries-1.0",
+ "name": "tensorflow",
+ "resources": {
+ "limits": {
+ "cpu": "4",
+ "memory": "4096M"
+ },
+ "requests": {
+ "cpu": "4",
+ "memory": "2048M"
+ }
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "volume",
+ "subPath": "submarine-tensorboard/tensorflow-dist-mnist"
+ }
+ ]
+ }
+ ],
+ "volumes": [
+ {
+ "name": "volume",
+ "persistentVolumeClaim": {
+ "claimName": "submarine-tensorboard-pvc"
+ }
+ }
+ ]
+ }
+ },
+ "restartPolicy": "OnFailure"
+ },
+ "Worker": {
+ "replicas": 2,
+ "template": {
+ "metadata": {
+ "annotations": {
+ "sidecar.istio.io/inject": "false"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "command": [
+ "python",
+ "/var/tf_mnist/mnist_with_summaries.py",
+ "--log_dir\u003d/train/log",
+ "--learning_rate\u003d0.01",
+ "--batch_size\u003d150"
+ ],
+ "env": [
+ {
+ "name": "ENV_1",
+ "value": "ENV1"
+ }
+ ],
+ "image": "apache/submarine:tf-mnist-with-summaries-1.0",
+ "name": "tensorflow",
+ "resources": {
+ "limits": {
+ "cpu": "2",
+ "memory": "2048M",
+ "nvidia.com/gpu": "1"
+ },
+ "requests": {
+ "cpu": "2",
+ "memory": "1024M",
+ "nvidia.com/gpu": "1"
+ }
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "volume",
+ "subPath": "submarine-tensorboard/tensorflow-dist-mnist"
+ }
+ ]
+ }
+ ],
+ "volumes": [
+ {
+ "name": "volume",
+ "persistentVolumeClaim": {
+ "claimName": "submarine-tensorboard-pvc"
+ }
+ }
+ ]
+ }
+ },
+ "restartPolicy": "OnFailure"
+ }
+ },
+ "backoffLimit": 3
+ },
+ "status": {
+ "conditions": [
+ {
+ "lastTransitionTime": "2022-07-30T07:59:01Z",
+ "lastUpdateTime": "2022-07-30T07:59:01Z",
+ "message": "TFJob experiment-1659167632755-0001 is created.",
+ "reason": "TFJobCreated",
+ "status": "True",
+ "type": "Created"
+ },
+ {
+ "lastTransitionTime": "2022-07-30T07:59:02Z",
+ "lastUpdateTime": "2022-07-30T07:59:02Z",
+ "message": "TFJob default/experiment-1659167632755-0001 is running.",
+ "reason": "TFJobRunning",
+ "status": "True",
+ "type": "Running"
+ }
+ ],
+ "replicaStatuses": {
+ "PS": {
+ "active": 1
+ },
+ "Worker": {
+ "active": 2
+ }
+ }
+ }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-delete-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-delete-api.json
new file mode 100644
index 0000000..0077eec
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-delete-api.json
@@ -0,0 +1,12 @@
+{
+ "apiVersion": "v1",
+ "details": {
+ "group": "kubeflow.org",
+    "kind": "XGBoostJob",
+ "name": "experiment-1659181695811-0001",
+ "uid": "2e5a067e-153a-4a29-bee0-4e3ebfefad22"
+ },
+ "kind": "Status",
+ "metadata": {},
+ "status": "Success"
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-read-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-read-api.json
new file mode 100644
index 0000000..0742929
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-read-api.json
@@ -0,0 +1,166 @@
+{
+ "apiVersion": "kubeflow.org/v1",
+ "kind": "XGBoostJob",
+ "metadata": {
+ "creationTimestamp": "2022-07-30T19:48:15.000+08:00",
+ "generation": 1,
+ "labels": {
+ "submarine-experiment-name": "xgboost-dist-mnist"
+ },
+ "name": "experiment-1659181695811-0001",
+ "namespace": "default",
+ "resourceVersion": "52320",
+ "uid": "2e5a067e-153a-4a29-bee0-4e3ebfefad22"
+ },
+ "spec": {
+ "xgbReplicaSpecs": {
+ "Master": {
+ "replicas": 1,
+ "template": {
+ "metadata": {
+ "annotations": {
+ "sidecar.istio.io/inject": "false"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "command": [
+ "python",
+ "/opt/mlkube/main.py",
+ "--job_type\u003dTrain",
+ "--xgboost_parameter\u003dobjective:multi:softprob,num_class:3",
+ "",
+ "--n_estimators\u003d10",
+ "--learning_rate\u003d0.1"
+ ],
+ "env": [
+ {
+ "name": "ENV_1",
+ "value": "ENV1"
+ }
+ ],
+ "image": "apache/submarine:xgboost-dist-iris-1.0",
+ "name": "xgboost",
+ "resources": {
+ "limits": {
+ "cpu": "2",
+ "memory": "4096M"
+ },
+ "requests": {
+ "cpu": "2",
+ "memory": "2048M"
+ }
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "volume",
+ "subPath": "submarine-tensorboard/xgboost-dist-mnist"
+ }
+ ]
+ }
+ ],
+ "volumes": [
+ {
+ "name": "volume",
+ "persistentVolumeClaim": {
+ "claimName": "submarine-tensorboard-pvc"
+ }
+ }
+ ]
+ }
+ },
+ "restartPolicy": "OnFailure"
+ },
+ "Worker": {
+ "replicas": 2,
+ "template": {
+ "metadata": {
+ "annotations": {
+ "sidecar.istio.io/inject": "false"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "command": [
+ "python",
+ "/opt/mlkube/main.py",
+ "--job_type\u003dTrain",
+ "--xgboost_parameter\u003dobjective:multi:softprob,num_class:3",
+ "",
+ "--n_estimators\u003d10",
+ "--learning_rate\u003d0.1"
+ ],
+ "env": [
+ {
+ "name": "ENV_1",
+ "value": "ENV1"
+ }
+ ],
+ "image": "apache/submarine:xgboost-dist-iris-1.0",
+ "name": "xgboost",
+ "resources": {
+ "limits": {
+ "cpu": "1",
+ "memory": "2048M"
+ },
+ "requests": {
+ "cpu": "1",
+ "memory": "1024M"
+ }
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/logs",
+ "name": "volume",
+ "subPath": "submarine-tensorboard/xgboost-dist-mnist"
+ }
+ ]
+ }
+ ],
+ "volumes": [
+ {
+ "name": "volume",
+ "persistentVolumeClaim": {
+ "claimName": "submarine-tensorboard-pvc"
+ }
+ }
+ ]
+ }
+ },
+ "restartPolicy": "OnFailure"
+ }
+ },
+ "backoffLimit": 3
+ },
+ "status": {
+ "conditions": [
+ {
+ "lastTransitionTime": "2022-07-30T11:51:39Z",
+ "lastUpdateTime": "2022-07-30T11:51:39Z",
+ "message": "xgboostJob experiment-1659181695811-0001 is created.",
+ "reason": "XGBoostJobCreated",
+ "status": "True",
+ "type": "Created"
+ },
+ {
+ "lastTransitionTime": "2022-07-30T11:51:40Z",
+ "lastUpdateTime": "2022-07-30T11:51:40Z",
+ "message": "xgboostJob experiment-1659181695811-0001 is running.",
+ "reason": "XGBoostJobRunning",
+ "status": "True",
+ "type": "Running"
+ }
+ ],
+ "replicaStatuses": {
+      "Master": {
+ "active": 1
+ },
+ "Worker": {
+ "active": 2
+ }
+ }
+ }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/experiment.sql b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/experiment.sql
new file mode 100644
index 0000000..128e48a
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/experiment.sql
@@ -0,0 +1,38 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Test schema: the experiment table, created only when it does not exist yet
+CREATE TABLE IF NOT EXISTS experiment (
+  id                VARCHAR(64) PRIMARY KEY,
+  experiment_spec   TEXT,
+  create_by         VARCHAR(32),
+  create_time       DATETIME,
+  update_by         VARCHAR(32),
+  update_time       DATETIME,
+  experiment_status VARCHAR(20),
+  accepted_time     DATETIME,
+  running_time      DATETIME,
+  finished_time     DATETIME,
+  uid               VARCHAR(64)
+);
+
+-- Test schema: the environment table, created only when it does not exist yet
+CREATE TABLE IF NOT EXISTS environment (
+  id               VARCHAR(64) PRIMARY KEY,
+  environment_name VARCHAR(255),
+  environment_spec TEXT,
+  create_by        VARCHAR(32),
+  create_time      DATETIME,
+  update_by        VARCHAR(32),
+  update_time      DATETIME
+);
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/notebook.sql b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/notebook.sql
new file mode 100644
index 0000000..df5b7be
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/notebook.sql
@@ -0,0 +1,27 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Test schema: the environment table, created only when it does not exist yet
+CREATE TABLE IF NOT EXISTS environment (
+  id               VARCHAR(64) PRIMARY KEY,
+  environment_name VARCHAR(255),
+  environment_spec TEXT,
+  create_by        VARCHAR(32),
+  create_time      DATETIME,
+  update_by        VARCHAR(32),
+  update_time      DATETIME
+);
+
+-- Remove any earlier copy of the fixture row, then insert it again with fresh timestamps
+DELETE FROM environment
+WHERE id = 'environment_1600862964725_0001';
+
+INSERT INTO environment
+VALUES (
+  'environment_1600862964725_0001',
+  'my-submarine-env',
+  '{"name":"my-submarine-env","dockerImage":"apache/submarine:jupyter-notebook-x.x.x","kernelSpec":{"name":"submarine_jupyter_py3","channels":["defaults"],"condaDependencies":[],"pipDependencies":[]}}',
+  'admin', now(),
+  'admin', now()
+);
+