SUBMARINE-1287. Experiment codes refactoring

### What is this PR for?
Based on the modified refactored notebook codes, the experiment codes are also need to be refactored.

### What type of PR is it?
Refactoring

### Todos
* [x] - Experiment codes refactoring in K8sSubmitter
* [x] - Add experiment test cases
* [x] - Fix some test cases error (Add h2 database dependency for test)
* [x] - Add `submarine-submitter-k8s` package test cases to workflow

### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1287

### How should this be tested?
Add experiment test cases (tensorflow, pytorch, xgboots)

### Screenshots (if appropriate)
No

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? Yes
* Does this need new documentation? No

Author: cdmikechen <cdmikechen@hotmail.com>

Signed-off-by: Kevin <pingsutw@apache.org>

Closes #974 from cdmikechen/SUBMARINE-1287 and squashes the following commits:

01e2828f [cdmikechen] add jacoco cache
4b50fe23 [cdmikechen] Fix codecov error
09f19423 [cdmikechen] add submarine-submitter-k8s codecov
4f761d75 [cdmikechen] Add submarine-submitter-k8s to build
4d8fee16 [cdmikechen] Add test workflow
580e2cce [cdmikechen] Fix SubmitterTransactionTest error
25b3a088 [cdmikechen] Add PyTorch/XGBoost test
d0d9b55a [cdmikechen] Change agent name
a62e6117 [cdmikechen] Add SubmitterPyTorchApiTest
f857521f [cdmikechen] Fix NotebookSpecParserTest error
5401e2a5 [cdmikechen] Refactor Experiment
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 9d74e5b..1f3ca70 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -555,11 +555,17 @@
       - uses: actions/checkout@v2
         with:
           fetch-depth: 50
-      - name: Cache jacoco.exec
+      - name: Cache submarine-server jacoco.exec
         uses: actions/cache@v2
         with:
           path: |
             ./submarine-server/server-submitter/target/jacoco.exec
+          key: ${{ runner.os }}-docker-${{ github.sha }}      
+      - name: Cache submitter-k8s jacoco.exec
+        uses: actions/cache@v2
+        with:
+          path: |
+            ./submarine-server/server-submitter/submitter-k8s/target/jacoco.exec
           key: ${{ runner.os }}-docker-${{ github.sha }}
       - name: Set up JDK 1.8
         uses: actions/setup-java@v1
@@ -575,13 +581,14 @@
           java -version
       - name: Build
         env:
-          MODULES: "-pl :submarine-server-submitter"
+          MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s"
         run: |
           echo ">>> mvn $BUILD_FLAG $MODULES -B"
           mvn $BUILD_FLAG $MODULES -B
       - name: Test
         env:
-          TEST_MODULES: "-pl :submarine-server-submitter"
+          # There is a `submarine-submitter-k8s` package under the `submarine-server-submitter` that also needs to be tested
+          TEST_MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s"
         run: |
           echo ">>> mvn $TEST_FLAG $TEST_MODULES -B"
           mvn $TEST_FLAG $TEST_MODULES -B
@@ -708,6 +715,12 @@
           path: |
             ./submarine-server/server-submitter/target/jacoco.exec
           key: ${{ runner.os }}-docker-${{ github.sha }}
+      - name: Cache submarine-submitter-k8s data
+        uses: actions/cache@v2
+        with:
+          path: |
+            ./submarine-server/server-submitter/submitter-k8s/target/jacoco.exec
+          key: ${{ runner.os }}-docker-${{ github.sha }}
       - name: Cache SonarCloud packages
         uses: actions/cache@v1
         with:
diff --git a/pom.xml b/pom.xml
index f21c1d5..301decd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
     <bytebuddy.version>1.11.20</bytebuddy.version>
     <mybatis.version>3.2.8</mybatis.version>
     <mysql-connector-java.version>5.1.41</mysql-connector-java.version>
+    <h2-connector-java.version>1.4.194</h2-connector-java.version>
     <grpc.version>1.25.0</grpc.version>
 
     <!-- frontend maven plugin related versions-->
diff --git a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
index 3259062..fa5895e 100644
--- a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
+++ b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
@@ -240,6 +240,10 @@
     properties.put(SubmarineConfVars.ConfVars.METASTORE_JDBC_URL.getVarName(), testMetastoreJdbcUrl);
   }
 
+  @VisibleForTesting
+  public void setJdbcDriverClassName(String driverClassName) {
+    properties.put(SubmarineConfVars.ConfVars.JDBC_DRIVERCLASSNAME.getVarName(), driverClassName);
+  }
 
   @VisibleForTesting
   public void setJdbcUrl(String testJdbcUrl) {
@@ -314,7 +318,7 @@
   public String getServerServiceName() {
     return getString(SubmarineConfVars.ConfVars.SUBMARINE_SERVER_SERVICE_NAME);
   }
-  
+
   private String getStringValue(String name, String d) {
     String value = this.properties.get(name);
     if (value != null) {
diff --git a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java
index 5e14f70..e911b79 100644
--- a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java
+++ b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/InvalidSpecException.java
@@ -19,7 +19,7 @@
 
 package org.apache.submarine.server.api.exception;
 
-public class InvalidSpecException extends Exception {
+public class InvalidSpecException extends RuntimeException {
   private static final long serialVersionUID = -1148223492821245434L;
 
   public InvalidSpecException(String message) {
diff --git a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java
index 6e8d3dd..e4e8172 100644
--- a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java
+++ b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/ExperimentMeta.java
@@ -151,7 +151,8 @@
   public enum SupportedMLFramework {
     TENSORFLOW("tensorflow"),
     PYTORCH("pytorch"),
-    XGBOOST("xgboost");
+    XGBOOST("xgboost"),
+    UNKNOWN("known");
 
     private final String name;
 
@@ -171,6 +172,15 @@
       }
       return names;
     }
+
+    public static SupportedMLFramework valueOfName(String name) {
+      for (SupportedMLFramework framework : values()) {
+        if (framework.getName().equalsIgnoreCase(name)) {
+          return framework;
+        }
+      }
+      return UNKNOWN;
+    }
   }
 
   @Override
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java
index c691809..7698673 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/manager/NotebookManager.java
@@ -30,9 +30,7 @@
 
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.submarine.server.database.notebook.service.NotebookService;
 import org.slf4j.Logger;
@@ -83,16 +81,8 @@
     spec.getMeta().setName(lowerName);
     NotebookId notebookId = generateNotebookId();
 
-    Map<String, String> labels = spec.getMeta().getLabels();
-
-    if (labels == null) {
-      labels = new HashMap<>();
-    }
-    labels.put("notebook-owner-id", spec.getMeta().getOwnerId());
-    labels.put("notebook-id", notebookId.toString());
-    spec.getMeta().setLabels(labels);
+    // create notebook
     Notebook notebook = submitter.createNotebook(spec, notebookId.toString());
-
     notebook.setNotebookId(notebookId);
     notebook.setSpec(spec);
 
diff --git a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java
index 28c4bd4..69ab6c1 100755
--- a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java
+++ b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/utils/MyBatisUtil.java
@@ -85,10 +85,12 @@
     LOG.info("Run the test unit using the test database");
     // Run the test unit using the test database
     SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
-    conf.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/submarine_test?" +
-            "useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&allowMultiQueries=true&" +
-            "failOverReadOnly=false&zeroDateTimeBehavior=convertToNull&useSSL=false");
-    conf.setJdbcUserName("submarine_test");
-    conf.setJdbcPassword("password_test");
+    if (conf.getJdbcUrl().startsWith("jdbc:mysql")) {
+      conf.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/submarine_test?" +
+              "useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&allowMultiQueries=true&" +
+              "failOverReadOnly=false&zeroDateTimeBehavior=convertToNull&useSSL=false");
+      conf.setJdbcUserName("submarine_test");
+      conf.setJdbcPassword("password_test");
+    }
   }
 }
diff --git a/submarine-server/server-submitter/submitter-k8s/pom.xml b/submarine-server/server-submitter/submitter-k8s/pom.xml
index 497b766..96e518b 100644
--- a/submarine-server/server-submitter/submitter-k8s/pom.xml
+++ b/submarine-server/server-submitter/submitter-k8s/pom.xml
@@ -88,6 +88,10 @@
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.derby</groupId>
+          <artifactId>derby</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
@@ -122,6 +126,13 @@
       <version>${jackson-databind.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>${h2-connector-java.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
index d696dce..a550cec 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
@@ -23,20 +23,14 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
-
-import io.kubernetes.client.util.generic.options.PatchOptions;
 import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.JSON;
-import io.kubernetes.client.custom.V1Patch;
 import io.kubernetes.client.openapi.models.V1Deployment;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1Pod;
 import io.kubernetes.client.openapi.models.V1PodList;
-import io.kubernetes.client.openapi.models.V1Status;
 import io.kubernetes.client.util.generic.options.CreateOptions;
 import io.kubernetes.client.util.generic.options.DeleteOptions;
 import io.kubernetes.client.util.generic.options.ListOptions;
@@ -70,12 +64,8 @@
 import org.apache.submarine.server.submitter.k8s.model.common.NullResource;
 import org.apache.submarine.server.submitter.k8s.model.common.PersistentVolumeClaim;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobFactory;
 import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCR;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
-import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
-import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
 import org.apache.submarine.server.submitter.k8s.util.NotebookUtils;
 import org.apache.submarine.server.submitter.k8s.util.OwnerReferenceUtils;
 
@@ -165,6 +155,30 @@
     }
   }
 
+  /**
+   * Delete resources with transaction
+   * This is an experimental API, Our main consideration is that:
+   * k8s resources transactional deletion cannot handle the rollback of transactions well,
+   * so we only guarantee the deletion of primary resource for the time being.
+   * We can tolerate the deletion failure of other dependent resources,
+   * so as to maximize the availability of the deletion API.
+   * @param primary primary resource, Failure of this resource will cause API exceptions
+   * @param dependentResources dependent resources
+   */
+  public <T> T deleteResourcesTransaction(K8sResource<T> primary, K8sResource... dependentResources) {
+    T returnResource = primary.delete(k8sClient);
+    for (K8sResource dependent : dependentResources) {
+      try {
+        dependent.delete(k8sClient);
+      } catch (Exception e) {
+        LOG.warn(String.format("Delete %s/%s failed. %s", dependent.getKind(),
+                dependent.getMetadata().getName(), e.getMessage()), e);
+        // TODO(cdmikechen): Record the error information into audit service for later tracking
+      }
+    }
+    return returnResource;
+  }
+
   public static V1ObjectMeta createMeta(String namespace, String name) {
     V1ObjectMeta metadata = new V1ObjectMeta();
     metadata.setNamespace(namespace);
@@ -183,190 +197,64 @@
     return deleteOptions;
   }
 
-  private enum ParseOp {
-    PARSE_OP_RESULT,
-    PARSE_OP_DELETE
-  }
-
   @Override
   public Experiment createExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
-    Experiment experiment;
     try {
-      MLJob mlJob = ExperimentSpecParser.parseJob(spec);
-      mlJob.getMetadata().setNamespace(getServerNamespace());
+      // MLJob K8s resource object
+      MLJob mlJob = MLJobFactory.getMLJob(spec);
       mlJob.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
-
-      CustomResourceType customResourceType;
-      if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
-        customResourceType = CustomResourceType.TFJob;
-      } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
-        customResourceType = CustomResourceType.XGBoost;
-      } else {
-        customResourceType = CustomResourceType.PyTorchJob;
-      }
-
-      AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(), customResourceType,
-              spec.getMeta().getExperimentId());
-
-      Object object;
-      if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
-        object = k8sClient.getTfJobClient().create(getServerNamespace(), (TFJob) mlJob,
-                new CreateOptions()).throwsApiException().getObject();
-      } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
-        object = k8sClient.getXGBoostJobClient().create(getServerNamespace(), (XGBoostJob) mlJob,
-                new CreateOptions()).throwsApiException().getObject();
-      } else {
-        object = k8sClient.getPyTorchJobClient().create(getServerNamespace(), (PyTorchJob) mlJob,
-                new CreateOptions()).throwsApiException().getObject();
-      }
-
-      k8sClient.getPodClient().create(agentPod).throwsApiException().getObject();
-      experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
+      // Agent pod K8s resource object
+      AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(),
+          mlJob.getResourceType(), spec.getMeta().getExperimentId());
+      // commit resources/CRD with transaction
+      List<Object> values = resourceTransaction(mlJob, agentPod);
+      return (Experiment) values.get(0);
     } catch (InvalidSpecException e) {
-      LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(), e);
-      throw new SubmarineRuntimeException(400, e.getMessage());
-    } catch (ApiException e) {
-      LOG.error("K8s submitter: failed to create pod " + e.getMessage(), e);
-      throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: failed to create pod " +
-              e.getMessage());
+      LOG.error(String.format("K8s submitter: parse %s object failed by %s",
+              spec.getMeta().getFramework(), e.getMessage()), e);
+      throw new SubmarineRuntimeException(500, e.getMessage());
     }
-    return experiment;
   }
 
   @Override
   public Experiment findExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
-    Experiment experiment;
     try {
-
-      MLJob mlJob = ExperimentSpecParser.parseJob(spec);
-      mlJob.getMetadata().setNamespace(getServerNamespace());
-
-      Object object;
-      if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
-        object = k8sClient.getTfJobClient().get(getServerNamespace(),
-                mlJob.getMetadata().getName()).throwsApiException().getObject();
-      } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
-        object = k8sClient.getXGBoostJobClient().get(getServerNamespace(),
-                mlJob.getMetadata().getName()).throwsApiException().getObject();
-      } else {
-        object = k8sClient.getPyTorchJobClient().get(getServerNamespace(),
-                mlJob.getMetadata().getName()).throwsApiException().getObject();
-      }
-
-      experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
-
+      // MLJob K8s resource object
+      MLJob mlJob = MLJobFactory.getMLJob(spec);
+      // Read Experiment
+      return mlJob.read(k8sClient);
     } catch (InvalidSpecException e) {
-      throw new SubmarineRuntimeException(200, e.getMessage());
-    } catch (ApiException e) {
-      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+      throw new SubmarineRuntimeException(400, e.getMessage());
     }
-
-    return experiment;
   }
 
   @Override
   public Experiment patchExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
-    Experiment experiment;
     try {
-      MLJob mlJob = ExperimentSpecParser.parseJob(spec);
-      mlJob.getMetadata().setNamespace(getServerNamespace());
-      // Using apply yaml patch, field manager must be set, and it must be forced.
-      // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
-      PatchOptions patchOptions = new PatchOptions();
-      patchOptions.setFieldManager(spec.getMeta().getExperimentId());
-      patchOptions.setForce(true);
-      Object object;
-      if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
-        object = k8sClient.getTfJobClient().patch(getServerNamespace(), mlJob.getMetadata().getName(),
-                V1Patch.PATCH_FORMAT_APPLY_YAML,
-                new V1Patch(new Gson().toJson(mlJob)),
-                patchOptions).throwsApiException().getObject();
-      } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
-        object = k8sClient.getXGBoostJobClient().patch(getServerNamespace(), mlJob.getMetadata().getName(),
-                V1Patch.PATCH_FORMAT_APPLY_YAML,
-                new V1Patch(new Gson().toJson(mlJob)),
-                patchOptions).throwsApiException().getObject();
-      } else {
-        object = k8sClient.getPyTorchJobClient().patch(getServerNamespace(), mlJob.getMetadata().getName(),
-                V1Patch.PATCH_FORMAT_APPLY_YAML,
-                new V1Patch(new Gson().toJson(mlJob)),
-                patchOptions).throwsApiException().getObject();
-      }
-
-      experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
+      // MLJob K8s resource object
+      MLJob mlJob = MLJobFactory.getMLJob(spec);
+      // Patch Experiment
+      return mlJob.replace(k8sClient);
     } catch (InvalidSpecException e) {
       throw new SubmarineRuntimeException(409, e.getMessage());
-    } catch (ApiException e) {
-      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
     } catch (Error e) {
       throw new SubmarineRuntimeException(500, String.format("Unhandled error: %s", e.getMessage()));
     }
-    return experiment;
   }
 
   @Override
   public Experiment deleteExperiment(ExperimentSpec spec) throws SubmarineRuntimeException {
-    Experiment experiment;
     try {
-      MLJob mlJob = ExperimentSpecParser.parseJob(spec);
-      mlJob.getMetadata().setNamespace(getServerNamespace());
-
-      CustomResourceType customResourceType;
-      if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
-        customResourceType = CustomResourceType.TFJob;
-      } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
-        customResourceType = CustomResourceType.XGBoost;
-      } else {
-        customResourceType = CustomResourceType.PyTorchJob;
-      }
-
-      AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(), customResourceType,
-              spec.getMeta().getExperimentId());
-
-      Object object;
-      if (mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)) {
-        object = k8sClient.getTfJobClient().delete(getServerNamespace(), mlJob.getMetadata().getName(),
-                MLJobConverter.toDeleteOptionsFromMLJob(mlJob)).throwsApiException().getStatus();
-      } else if (mlJob.getPlural().equals(XGBoostJob.CRD_XGBOOST_PLURAL_V1)) {
-        object = k8sClient.getXGBoostJobClient().delete(getServerNamespace(), mlJob.getMetadata().getName(),
-                        MLJobConverter.toDeleteOptionsFromMLJob(mlJob))
-                .throwsApiException().getStatus();
-      } else {
-        object = k8sClient.getPyTorchJobClient().delete(getServerNamespace(), mlJob.getMetadata().getName(),
-                        MLJobConverter.toDeleteOptionsFromMLJob(mlJob))
-                .throwsApiException().getStatus();
-      }
-
-      LOG.info(String.format("Experiment:%s had been deleted, start to delete agent pod:%s",
-              spec.getMeta().getName(), agentPod.getMetadata().getName()));
-      k8sClient.getPodClient().delete(agentPod.getMetadata().getNamespace(),
-              agentPod.getMetadata().getName());
-      experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_DELETE);
+      // MLJob K8s resource object
+      MLJob mlJob = MLJobFactory.getMLJob(spec);
+      // Agent pod K8s resource object
+      AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(),
+              mlJob.getResourceType(), spec.getMeta().getExperimentId());
+      // Delete with transaction
+      return deleteResourcesTransaction(mlJob, agentPod);
     } catch (InvalidSpecException e) {
       throw new SubmarineRuntimeException(200, e.getMessage());
-    } catch (ApiException e) {
-      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
     }
-    return experiment;
-  }
-
-  private Experiment parseExperimentResponseObject(Object object, ParseOp op)
-          throws SubmarineRuntimeException {
-    Gson gson = new JSON().getGson();
-    String jsonString = gson.toJson(object);
-    LOG.info("Upstream response JSON: {}", jsonString);
-    try {
-      if (op == ParseOp.PARSE_OP_RESULT) {
-        MLJob mlJob = gson.fromJson(jsonString, MLJob.class);
-        return MLJobConverter.toJobFromMLJob(mlJob);
-      } else if (op == ParseOp.PARSE_OP_DELETE) {
-        V1Status status = gson.fromJson(jsonString, V1Status.class);
-        return MLJobConverter.toJobFromStatus(status);
-      }
-    } catch (JsonSyntaxException e) {
-      LOG.error("K8s submitter: parse response object failed by " + e.getMessage(), e);
-    }
-    throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
   }
 
   @Override
@@ -435,7 +323,8 @@
   private boolean isDeploymentAvailable(String name) throws ApiException{
     V1Deployment deploy = k8sClient.getAppsV1Api()
             .readNamespacedDeploymentStatus(name, getServerNamespace(), "true");
-    return deploy.getStatus().getAvailableReplicas() > 0; // at least one replica is running
+    return deploy == null ? false : Optional.ofNullable(deploy.getStatus().getAvailableReplicas())
+        .map(ar -> ar > 0).orElse(false); // at least one replica is running
   }
 
   @Override
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
index 2455ce5..f220d35 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
@@ -106,19 +106,18 @@
     this.setSpec(spec);
   }
 
-  private String getNormalizePodName(CustomResourceType type, String name, String resourceId) {
-    return String.format("%s-%s-%s-%s", resourceId.toString().toLowerCase().replace('_', '-'),
+  public static String getNormalizePodName(CustomResourceType type, String name, String resourceId) {
+    return String.format("%s-%s-%s-%s", resourceId.toLowerCase().replace('_', '-'),
             type.toString().toLowerCase(), name, CONTAINER_NAME);
   }
 
   @Override
   public AgentPod read(K8sClient api) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public AgentPod create(K8sClient api) {
-    // create notebook custom resource
     try {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Create AgentPod resource: \n{}", YamlUtils.toPrettyYaml(this));
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java
index 61c0134..d74859b 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/K8sResource.java
@@ -28,11 +28,23 @@
 
   V1ObjectMeta getMetadata();
 
+  /**
+   * Resource get api
+   */
   R read(K8sClient api);
 
+  /**
+   * Resource create api
+   */
   R create(K8sClient api);
 
+  /**
+   * Resource patch api
+   */
   R replace(K8sClient api);
 
+  /**
+   * Resource delete api
+   */
   R delete(K8sClient api);
 }
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java
index 567d86f..45c174c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJob.java
@@ -19,16 +19,52 @@
 
 package org.apache.submarine.server.submitter.k8s.model.mljob;
 
+import com.google.gson.JsonSyntaxException;
 import com.google.gson.annotations.SerializedName;
 import io.kubernetes.client.common.KubernetesObject;
 import io.kubernetes.client.openapi.models.V1JobStatus;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
+import io.kubernetes.client.openapi.models.V1Status;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentMeta;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.k8s.utils.K8sUtils;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.model.K8sResource;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The machine learning job for the CRD job.
- * It be serialized as body input to k8s api client
+ * It is serialized as body input to k8s api client.
+ * <p>
+ * For job resource definitions and related information,
+ * please refer to https://github.com/kubeflow/training-operator
  */
-public class MLJob implements KubernetesObject{
+public abstract class MLJob implements KubernetesObject, K8sResource<Experiment> {
+
+  protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  public MLJob(ExperimentSpec experimentSpec) {
+    // set metadata
+    V1ObjectMetaBuilder metaBuilder = new V1ObjectMetaBuilder();
+    metaBuilder.withNamespace(K8sUtils.getNamespace())
+        // SUBMARINE-880 replace name to experimentId
+        .withName(experimentSpec.getMeta().getExperimentId())
+        //.addAllToOwnerReferences(OwnerReferenceUtils.getOwnerReference())
+        .addToLabels(ExperimentMeta.SUBMARINE_EXPERIMENT_NAME, experimentSpec.getMeta().getName());
+    setMetadata(metaBuilder.build());
+    // set framework
+    setFramework(experimentSpec.getMeta().getFramework());
+    // set experimentId
+    setExperimentId(experimentSpec.getMeta().getExperimentId());
+  }
+
   @SerializedName("apiVersion")
   private String apiVersion;
 
@@ -48,6 +84,10 @@
   @SerializedName("status")
   private V1JobStatus status;
 
+  private String framework;
+
+  private String experimentId;
+
   /**
    * Set the api with version
    *
@@ -148,4 +188,76 @@
   public void setStatus(V1JobStatus status) {
     this.status = status;
   }
+
+  public String getFramework() {
+    return framework;
+  }
+
+  public void setFramework(String framework) {
+    this.framework = framework;
+  }
+
+  public String getExperimentId() {
+    return experimentId;
+  }
+
+  public void setExperimentId(String experimentId) {
+    this.experimentId = experimentId;
+  }
+
+  /**
+   * Convert MLJob object to return Experiment object
+   */
+  protected <T extends MLJob> Experiment parseExperimentResponseObject(T object, Class<T> tClass)
+          throws SubmarineRuntimeException {
+    String jsonString = JsonUtils.toJson(object);
+    LOG.info("Upstream response JSON: {}", jsonString);
+    try {
+      return MLJobConverter.toJobFromMLJob(JsonUtils.fromJson(jsonString, tClass));
+    } catch (JsonSyntaxException e) {
+      LOG.error("K8s submitter: parse response object failed by " + e.getMessage(), e);
+      throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
+    }
+  }
+
+  /**
+   * Convert MLJob status to return Experiment object
+   */
+  protected Experiment parseExperimentResponseStatus(V1Status status)
+          throws SubmarineRuntimeException {
+    String jsonString = JsonUtils.toJson(status);
+    LOG.info("Upstream response JSON: {}", jsonString);
+    try {
+      return MLJobConverter.toJobFromStatus(JsonUtils.fromJson(jsonString, V1Status.class));
+    } catch (JsonSyntaxException e) {
+      LOG.error("K8s submitter: parse response object failed by " + e.getMessage(), e);
+      throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
+    }
+  }
+
+  /**
+   * Get custom resource type
+   * This method is mainly used by pod agent
+   */
+  public abstract CustomResourceType getResourceType();
+
+  @Override
+  public Experiment read(K8sClient api) {
+    throw new UnsupportedOperationException("MLJob does not implement this method!");
+  }
+
+  @Override
+  public Experiment create(K8sClient api) {
+    throw new UnsupportedOperationException("MLJob does not implement this method!");
+  }
+
+  @Override
+  public Experiment replace(K8sClient api) {
+    throw new UnsupportedOperationException("MLJob does not implement this method!");
+  }
+
+  @Override
+  public Experiment delete(K8sClient api) {
+    throw new UnsupportedOperationException("MLJob does not implement this method!");
+  }
 }
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobFactory.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobFactory.java
new file mode 100644
index 0000000..d766ea0
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.model.mljob;
+
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.spec.ExperimentMeta;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
+import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
+
+/**
+ * Select different MLJob implementation classes according to different framework
+ */
+public class MLJobFactory {
+
+  /**
+   * Get MLJob by framework name
+   */
+  public static MLJob getMLJob(ExperimentSpec experimentSpec) {
+    String frameworkName = experimentSpec.getMeta().getFramework();
+    ExperimentMeta.SupportedMLFramework framework = ExperimentMeta.SupportedMLFramework
+            .valueOfName(frameworkName);
+    switch (framework) {
+      case TENSORFLOW:
+        return new TFJob(experimentSpec);
+      case PYTORCH:
+        return new PyTorchJob(experimentSpec);
+      case XGBOOST:
+        return new XGBoostJob(experimentSpec);
+      default:
+        throw new InvalidSpecException("Unsupported framework name: " + frameworkName +
+                ". Supported frameworks are: " +
+                String.join(",", ExperimentMeta.SupportedMLFramework.names()));
+    }
+  }
+
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java
index ceb7d04..770f789 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/mljob/MLJobReplicaSpec.java
@@ -101,7 +101,7 @@
     V1PodTemplateSpec podSpec = getTemplate();
     return String.join(" ",
         podSpec.getSpec().getContainers().get(0)
-            .getResources().getLimits().get("memory").
+            .getResources().getRequests().get("memory").
             getNumber().divide(BigDecimal.valueOf(1000000)).toString() + "M");
   }
 
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java
index c5c58f6..6dcd0a7 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/notebook/NotebookCR.java
@@ -39,6 +39,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.submarine.server.submitter.k8s.K8sSubmitter.getDeleteOptions;
 
 public class NotebookCR implements KubernetesObject, K8sResource<Notebook> {
@@ -170,7 +173,14 @@
       V1ObjectMeta meta = new V1ObjectMeta();
       meta.setName(notebookName);
       meta.setNamespace(namespace);
-      meta.setLabels(notebookSpec.getMeta().getLabels());
+      // we need to use some labels to define/filter the properties of notebook
+      Map<String, String> labels = notebookSpec.getMeta().getLabels();
+      if (labels == null) {
+        labels = new HashMap<>(2);
+      }
+      labels.put("notebook-owner-id", notebookSpec.getMeta().getOwnerId());
+      labels.put("notebook-id", notebookId);
+      meta.setLabels(labels);
       meta.setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
       this.setMetadata(meta);
       // set spec
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java
index 4bda8db..7da916c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJob.java
@@ -20,7 +20,28 @@
 package org.apache.submarine.server.submitter.k8s.model.pytorchjob;
 
 import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.generic.options.CreateOptions;
+import io.kubernetes.client.util.generic.options.PatchOptions;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
+import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.util.YamlUtils;
+
+import java.util.HashMap;
+import java.util.Map;
 
 public class PyTorchJob extends MLJob {
 
@@ -34,12 +55,48 @@
   @SerializedName("spec")
   private PyTorchJobSpec spec;
 
-  public PyTorchJob() {
+  public PyTorchJob(ExperimentSpec experimentSpec) throws InvalidSpecException {
+    super(experimentSpec);
     setApiVersion(CRD_PYTORCH_API_VERSION_V1);
     setKind(CRD_PYTORCH_KIND_V1);
     setPlural(CRD_PYTORCH_PLURAL_V1);
     setVersion(CRD_PYTORCH_VERSION_V1);
     setGroup(CRD_PYTORCH_GROUP_V1);
+    // set spec
+    setSpec(parsePyTorchJobSpec(experimentSpec));
+  }
+
+  @Override
+  public CustomResourceType getResourceType() {
+    return CustomResourceType.PyTorchJob;
+  }
+
+  /**
+   * Parse PyTorchJob Spec
+   */
+  private PyTorchJobSpec parsePyTorchJobSpec(ExperimentSpec experimentSpec)
+          throws InvalidSpecException {
+    PyTorchJobSpec pyTorchJobSpec = new PyTorchJobSpec();
+
+    Map<PyTorchJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+    for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
+      String replicaType = entry.getKey();
+      ExperimentTaskSpec taskSpec = entry.getValue();
+      if (PyTorchJobReplicaType.isSupportedReplicaType(replicaType)) {
+        MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
+        replicaSpec.setReplicas(taskSpec.getReplicas());
+        V1PodTemplateSpec podTemplateSpec = ExperimentSpecParser.parseTemplateSpec(taskSpec, experimentSpec);
+        replicaSpec.setTemplate(podTemplateSpec);
+        replicaSpecMap.put(PyTorchJobReplicaType.valueOf(replicaType), replicaSpec);
+      } else {
+        throw new InvalidSpecException("Unrecognized replica type name: " +
+            entry.getKey() + ", it should be " +
+            String.join(",", PyTorchJobReplicaType.names()) +
+            " for PyTorch experiment.");
+      }
+    }
+    pyTorchJobSpec.setReplicaSpecs(replicaSpecMap);
+    return pyTorchJobSpec;
   }
 
   /**
@@ -59,4 +116,76 @@
   public void setSpec(PyTorchJobSpec spec) {
     this.spec = spec;
   }
+
+  @Override
+  public Experiment read(K8sClient api) {
+    try {
+      PyTorchJob pyTorchJob = api.getPyTorchJobClient()
+          .get(getMetadata().getNamespace(), getMetadata().getName())
+          .throwsApiException().getObject();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Get PyTorchJob resource: \n{}", YamlUtils.toPrettyYaml(pyTorchJob));
+      }
+      return parseExperimentResponseObject(pyTorchJob, PyTorchJob.class);
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment create(K8sClient api) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Create PyTorchJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+      }
+      PyTorchJob pyTorchJob = api.getPyTorchJobClient()
+          .create(getMetadata().getNamespace(), this, new CreateOptions())
+          .throwsApiException().getObject();
+      return parseExperimentResponseObject(pyTorchJob, PyTorchJob.class);
+    } catch (ApiException e) {
+      LOG.error("K8s submitter: parse PyTorchJob object failed by " + e.getMessage(), e);
+      throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: parse PyTorchJob object failed by " +
+              e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment replace(K8sClient api) {
+    try {
+      // Using apply yaml patch, field manager must be set, and it must be forced.
+      // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
+      PatchOptions patchOptions = new PatchOptions();
+      patchOptions.setFieldManager(getExperimentId());
+      patchOptions.setForce(true);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Patch PyTorchJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+      }
+      PyTorchJob pyTorchJob = api.getPyTorchJobClient()
+          .patch(getMetadata().getNamespace(), getMetadata().getName(),
+              V1Patch.PATCH_FORMAT_APPLY_YAML,
+              new V1Patch(JsonUtils.toJson(this)),
+              patchOptions)
+          .throwsApiException().getObject();
+      return parseExperimentResponseObject(pyTorchJob, PyTorchJob.class);
+    }  catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment delete(K8sClient api) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Delete PyTorchJob resource in namespace: {} and name: {}",
+                this.getMetadata().getNamespace(), this.getMetadata().getName());
+      }
+      V1Status status = api.getPyTorchJobClient()
+          .delete(getMetadata().getNamespace(), getMetadata().getName(),
+              MLJobConverter.toDeleteOptionsFromMLJob(this))
+          .throwsApiException().getStatus();
+      return parseExperimentResponseStatus(status);
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
 }
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
index 0a11771..7137d6c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
@@ -24,6 +24,10 @@
 
 import java.util.Map;
 
+/**
+ * For the entity definition of PyTorchJobSpec, refer to
+ * https://github.com/kubeflow/training-operator/tree/master/examples/pytorch/mnist/v1
+ */
 public class PyTorchJobSpec {
 
   /**
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
index eb35c5d..ee9d0d9 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
@@ -21,14 +21,33 @@
 
 import com.google.gson.annotations.SerializedName;
 
-import io.kubernetes.client.common.KubernetesObject;
-
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.generic.options.CreateOptions;
+import io.kubernetes.client.util.generic.options.PatchOptions;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
+import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.util.YamlUtils;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * It's the tf-operator's entry model.
  */
-public class TFJob extends MLJob implements KubernetesObject {
+public class TFJob extends MLJob {
 
   public static final  String CRD_TF_KIND_V1 = "TFJob";
   public static final  String CRD_TF_PLURAL_V1 = "tfjobs";
@@ -40,12 +59,50 @@
   @SerializedName("spec")
   private TFJobSpec spec;
 
-  public TFJob() {
+  public TFJob(ExperimentSpec experimentSpec) throws InvalidSpecException {
+    super(experimentSpec);
     setApiVersion(CRD_TF_API_VERSION_V1);
     setKind(CRD_TF_KIND_V1);
     setPlural(CRD_TF_PLURAL_V1);
     setVersion(CRD_TF_VERSION_V1);
     setGroup(CRD_TF_GROUP_V1);
+    // set spec
+    setSpec(parseTFJobSpec(experimentSpec));
+  }
+
+  @Override
+  public CustomResourceType getResourceType() {
+    return CustomResourceType.TFJob;
+  }
+
+  /**
+   * Parse TFJob Spec
+   */
+  private TFJobSpec parseTFJobSpec(ExperimentSpec experimentSpec)
+          throws InvalidSpecException {
+    TFJobSpec tfJobSpec = new TFJobSpec();
+    Map<TFJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+
+    for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
+      String replicaType = entry.getKey();
+      ExperimentTaskSpec taskSpec = entry.getValue();
+
+      if (TFJobReplicaType.isSupportedReplicaType(replicaType)) {
+        MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
+        replicaSpec.setReplicas(taskSpec.getReplicas());
+        V1PodTemplateSpec podTemplateSpec = ExperimentSpecParser.parseTemplateSpec(taskSpec, experimentSpec);
+        replicaSpec.setTemplate(podTemplateSpec);
+        replicaSpecMap.put(TFJobReplicaType.valueOf(replicaType), replicaSpec);
+      } else {
+        throw new InvalidSpecException("Unrecognized replica type name: " +
+            entry.getKey() +
+            ", it should be " +
+            String.join(",", TFJobReplicaType.names()) +
+            " for TensorFlow experiment.");
+      }
+    }
+    tfJobSpec.setReplicaSpecs(replicaSpecMap);
+    return tfJobSpec;
   }
 
   /**
@@ -63,4 +120,77 @@
   public void setSpec(TFJobSpec spec) {
     this.spec = spec;
   }
+
+  @Override
+  public Experiment read(K8sClient api) {
+    try {
+      TFJob tfJob = api.getTfJobClient().get(getMetadata().getNamespace(), getMetadata().getName())
+          .throwsApiException().getObject();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Get TFJob resource: \n{}", YamlUtils.toPrettyYaml(tfJob));
+      }
+      return parseExperimentResponseObject(tfJob, TFJob.class);
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment create(K8sClient api) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Create TFJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+      }
+      TFJob tfJob = api.getTfJobClient().create(getMetadata().getNamespace(), this,
+          new CreateOptions()).throwsApiException().getObject();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Get TFJob resource: \n{}", YamlUtils.toPrettyYaml(tfJob));
+      }
+      return parseExperimentResponseObject(tfJob, TFJob.class);
+    } catch (ApiException e) {
+      LOG.error("K8s submitter: parse TFJob object failed by " + e.getMessage(), e);
+      throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: parse TFJob object failed by " +
+              e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment replace(K8sClient api) {
+    try {
+      // Using apply yaml patch, field manager must be set, and it must be forced.
+      // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
+      PatchOptions patchOptions = new PatchOptions();
+      patchOptions.setFieldManager(getExperimentId());
+      patchOptions.setForce(true);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Patch TFJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+      }
+      TFJob tfJob = api.getTfJobClient()
+          .patch(getMetadata().getNamespace(), getMetadata().getName(),
+              V1Patch.PATCH_FORMAT_APPLY_YAML,
+              new V1Patch(JsonUtils.toJson(this)), patchOptions)
+          .throwsApiException().getObject();
+      return parseExperimentResponseObject(tfJob, TFJob.class);
+    }  catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment delete(K8sClient api) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Delete TFJob resource in namespace: {} and name: {}",
+                this.getMetadata().getNamespace(), this.getMetadata().getName());
+      }
+      V1Status status = api.getTfJobClient()
+          .delete(getMetadata().getNamespace(), getMetadata().getName(),
+              MLJobConverter.toDeleteOptionsFromMLJob(this))
+          .throwsApiException().getStatus();
+      return parseExperimentResponseStatus(status);
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
+
 }
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
index 1277f14..0e1f3e9 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
@@ -26,6 +26,8 @@
 
 /**
  * The replica spec of TFJob.
+ * For the entity definition of TFJobSpec, refer to
+ * https://github.com/kubeflow/training-operator/blob/master/examples/tensorflow/dist-mnist/tf_job_mnist.yaml
  */
 public class TFJobSpec {
   /**
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java
index 968ed7b..0bacbfd 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/xgboostjob/XGBoostJob.java
@@ -20,7 +20,28 @@
 package org.apache.submarine.server.submitter.k8s.model.xgboostjob;
 
 import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.generic.options.CreateOptions;
+import io.kubernetes.client.util.generic.options.PatchOptions;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
+import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
+import org.apache.submarine.server.submitter.k8s.util.JsonUtils;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.util.YamlUtils;
+
+import java.util.HashMap;
+import java.util.Map;
 
 public class XGBoostJob extends MLJob {
 
@@ -34,12 +55,43 @@
   @SerializedName("spec")
   private XGBoostJobSpec spec;
 
-  public XGBoostJob() {
+  public XGBoostJob(ExperimentSpec experimentSpec) {
+    super(experimentSpec);
     setApiVersion(CRD_XGBOOST_API_VERSION_V1);
     setKind(CRD_XGBOOST_KIND_V1);
     setPlural(CRD_XGBOOST_PLURAL_V1);
     setVersion(CRD_XGBOOST_VERSION_V1);
     setGroup(CRD_XGBOOST_GROUP_V1);
+    // set spec
+    setSpec(parseXGBoostJobSpec(experimentSpec));
+  }
+
+  private XGBoostJobSpec parseXGBoostJobSpec(ExperimentSpec experimentSpec)
+          throws InvalidSpecException {
+    XGBoostJobSpec xGBoostJobSpec = new XGBoostJobSpec();
+
+    Map<XGBoostJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+
+    for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
+      String replicaType = entry.getKey();
+      ExperimentTaskSpec taskSpec = entry.getValue();
+
+      if (XGBoostJobReplicaType.isSupportedReplicaType(replicaType)) {
+        MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
+        replicaSpec.setReplicas(taskSpec.getReplicas());
+        V1PodTemplateSpec podTemplateSpec = ExperimentSpecParser.parseTemplateSpec(taskSpec, experimentSpec);
+        replicaSpec.setTemplate(podTemplateSpec);
+        replicaSpecMap.put(XGBoostJobReplicaType.valueOf(replicaType), replicaSpec);
+      } else {
+        throw new InvalidSpecException("Unrecognized replica type name: " +
+            entry.getKey() +
+            ", it should be " +
+            String.join(",", XGBoostJobReplicaType.names()) +
+            " for XGBoost experiment.");
+      }
+    }
+    xGBoostJobSpec.setReplicaSpecs(replicaSpecMap);
+    return xGBoostJobSpec;
   }
 
   /**
@@ -57,5 +109,82 @@
   public void setSpec(XGBoostJobSpec spec) {
     this.spec = spec;
   }
+
+  @Override
+  public CustomResourceType getResourceType() {
+    return CustomResourceType.XGBoost;
+  }
+
+  @Override
+  public Experiment read(K8sClient api) {
+    try {
+      XGBoostJob xgBoostJob = api.getXGBoostJobClient()
+          .get(getMetadata().getNamespace(), getMetadata().getName())
+          .throwsApiException().getObject();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Get XGBoostJob resource: \n{}", YamlUtils.toPrettyYaml(xgBoostJob));
+      }
+      return parseExperimentResponseObject(xgBoostJob, XGBoostJob.class);
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment create(K8sClient api) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Create XGBoostJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+      }
+      XGBoostJob xgBoostJob = api.getXGBoostJobClient()
+            .create(getMetadata().getNamespace(), this, new CreateOptions())
+            .throwsApiException().getObject();
+      return parseExperimentResponseObject(xgBoostJob, XGBoostJob.class);
+    } catch (ApiException e) {
+      LOG.error("K8s submitter: parse XGBoostJob object failed by " + e.getMessage(), e);
+      throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: parse XGBoostJob object failed by " +
+              e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment replace(K8sClient api) {
+    try {
+      // Using apply yaml patch, field manager must be set, and it must be forced.
+      // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
+      PatchOptions patchOptions = new PatchOptions();
+      patchOptions.setFieldManager(getExperimentId());
+      patchOptions.setForce(true);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Patch XGBoostJob resource: \n{}", YamlUtils.toPrettyYaml(this));
+      }
+      XGBoostJob xgBoostJob = api.getXGBoostJobClient()
+          .patch(getMetadata().getNamespace(), getMetadata().getName(),
+              V1Patch.PATCH_FORMAT_APPLY_YAML,
+              new V1Patch(JsonUtils.toJson(this)),
+              patchOptions)
+          .throwsApiException().getObject();
+      return parseExperimentResponseObject(xgBoostJob, XGBoostJob.class);
+    }  catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
+
+  @Override
+  public Experiment delete(K8sClient api) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Delete XGBoostJob resource in namespace: {} and name: {}",
+            this.getMetadata().getNamespace(), this.getMetadata().getName());
+      }
+      V1Status status = api.getXGBoostJobClient()
+          .delete(getMetadata().getNamespace(), getMetadata().getName(),
+              MLJobConverter.toDeleteOptionsFromMLJob(this))
+          .throwsApiException().getStatus();
+      return parseExperimentResponseStatus(status);
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+    }
+  }
 }
 
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
index 94e3c2c..991e03c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
@@ -23,7 +23,7 @@
 
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1EnvVar;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
 import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource;
 import io.kubernetes.client.openapi.models.V1PodSecurityContext;
 import io.kubernetes.client.openapi.models.V1PodSpec;
@@ -36,7 +36,6 @@
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
 import org.apache.submarine.server.api.environment.Environment;
 import org.apache.submarine.server.api.exception.InvalidSpecException;
-import org.apache.submarine.server.api.spec.ExperimentMeta;
 import org.apache.submarine.server.api.spec.ExperimentSpec;
 import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
 import org.apache.submarine.server.api.spec.EnvironmentSpec;
@@ -44,17 +43,6 @@
 import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.AbstractCodeLocalizer;
 import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.CodeLocalizer;
 import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.SSHGitCodeLocalizer;
-import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
-import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobSpec;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobSpec;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJobSpec;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,146 +51,18 @@
 import java.util.Map;
 
 public class ExperimentSpecParser {
-  private static SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
 
-  public static MLJob parseJob(ExperimentSpec experimentSpec) throws InvalidSpecException {
-    String framework = experimentSpec.getMeta().getFramework();
-    if (ExperimentMeta.SupportedMLFramework.TENSORFLOW.
-        getName().equalsIgnoreCase(framework)) {
-      return parseTFJob(experimentSpec);
-    } else if (ExperimentMeta.SupportedMLFramework.PYTORCH.
-        getName().equalsIgnoreCase(framework)) {
-      return parsePyTorchJob(experimentSpec);
-    } else if (ExperimentMeta.SupportedMLFramework.XGBOOST.
-        getName().equalsIgnoreCase(framework)) {
-      return parseXGBoostJob(experimentSpec);
-    } else {
-      throw new InvalidSpecException("Unsupported framework name: " + framework +
-          ". Supported frameworks are: " +
-          String.join(",", ExperimentMeta.SupportedMLFramework.names()));
-    }
-  }
+  private static final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
 
-  public static XGBoostJob parseXGBoostJob(
-      ExperimentSpec experimentSpec) throws InvalidSpecException {
-    XGBoostJob xGBoostJob = new XGBoostJob();
-    xGBoostJob.setMetadata(parseMetadata(experimentSpec));
-    xGBoostJob.setSpec(parseXGBoostJobSpec(experimentSpec));
-    return xGBoostJob;
-  }
-
-  public static XGBoostJobSpec parseXGBoostJobSpec(ExperimentSpec experimentSpec)
-      throws InvalidSpecException {
-    XGBoostJobSpec xGBoostJobSpec = new XGBoostJobSpec();
-
-    Map<XGBoostJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
-
-    for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
-      String replicaType = entry.getKey();
-      ExperimentTaskSpec taskSpec = entry.getValue();
-
-      if (XGBoostJobReplicaType.isSupportedReplicaType(replicaType)) {
-        MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
-        replicaSpec.setReplicas(taskSpec.getReplicas());
-        V1PodTemplateSpec podTemplateSpec = parseTemplateSpec(taskSpec, experimentSpec);
-
-        replicaSpec.setTemplate(podTemplateSpec);
-        replicaSpecMap.put(XGBoostJobReplicaType.valueOf(replicaType), replicaSpec);
-      } else {
-        throw new InvalidSpecException("Unrecognized replica type name: " +
-            entry.getKey() +
-            ", it should be " +
-            String.join(",", XGBoostJobReplicaType.names()) +
-            " for XGBoost experiment.");
-      }
-    }
-    xGBoostJobSpec.setReplicaSpecs(replicaSpecMap);
-    return xGBoostJobSpec;
-  }
-
-  public static PyTorchJob parsePyTorchJob(
-      ExperimentSpec experimentSpec) throws InvalidSpecException {
-    PyTorchJob pyTorchJob = new PyTorchJob();
-    pyTorchJob.setMetadata(parseMetadata(experimentSpec));
-    pyTorchJob.setSpec(parsePyTorchJobSpec(experimentSpec));
-    return pyTorchJob;
-  }
-
-  public static PyTorchJobSpec parsePyTorchJobSpec(ExperimentSpec experimentSpec)
-      throws InvalidSpecException {
-    PyTorchJobSpec pyTorchJobSpec = new PyTorchJobSpec();
-
-    Map<PyTorchJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
-    for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
-      String replicaType = entry.getKey();
-      ExperimentTaskSpec taskSpec = entry.getValue();
-      if (PyTorchJobReplicaType.isSupportedReplicaType(replicaType)) {
-        MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
-        replicaSpec.setReplicas(taskSpec.getReplicas());
-        V1PodTemplateSpec podTemplateSpec = parseTemplateSpec(taskSpec, experimentSpec);
-
-        replicaSpec.setTemplate(podTemplateSpec);
-        replicaSpecMap.put(PyTorchJobReplicaType.valueOf(replicaType), replicaSpec);
-      } else {
-        throw new InvalidSpecException("Unrecognized replica type name: " +
-            entry.getKey() + ", it should be " +
-            String.join(",", PyTorchJobReplicaType.names()) +
-            " for PyTorch experiment.");
-      }
-    }
-    pyTorchJobSpec.setReplicaSpecs(replicaSpecMap);
-    return pyTorchJobSpec;
-  }
-
-  public static TFJob parseTFJob(ExperimentSpec experimentSpec)
-      throws InvalidSpecException {
-    TFJob tfJob = new TFJob();
-    tfJob.setMetadata(parseMetadata(experimentSpec));
-    tfJob.setSpec(parseTFJobSpec(experimentSpec));
-    return tfJob;
-  }
-
-  private static V1ObjectMeta parseMetadata(ExperimentSpec experimentSpec) {
-    V1ObjectMeta meta = new V1ObjectMeta();
-    meta.setName(experimentSpec.getMeta().getExperimentId());
-    Map<String, String> labels = new HashMap<>();
-    labels.put(ExperimentMeta.SUBMARINE_EXPERIMENT_NAME, experimentSpec.getMeta().getName());
-    meta.setLabels(labels);
-
-    return meta;
-  }
-
-  private static TFJobSpec parseTFJobSpec(ExperimentSpec experimentSpec)
-          throws InvalidSpecException {
-    TFJobSpec tfJobSpec = new TFJobSpec();
-    Map<TFJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
-
-    for (Map.Entry<String, ExperimentTaskSpec> entry : experimentSpec.getSpec().entrySet()) {
-      String replicaType = entry.getKey();
-      ExperimentTaskSpec taskSpec = entry.getValue();
-
-      if (TFJobReplicaType.isSupportedReplicaType(replicaType)) {
-        MLJobReplicaSpec replicaSpec = new MLJobReplicaSpec();
-        replicaSpec.setReplicas(taskSpec.getReplicas());
-        V1PodTemplateSpec podTemplateSpec = parseTemplateSpec(taskSpec, experimentSpec);
-
-        replicaSpec.setTemplate(podTemplateSpec);
-        replicaSpecMap.put(TFJobReplicaType.valueOf(replicaType), replicaSpec);
-      } else {
-        throw new InvalidSpecException("Unrecognized replica type name: " +
-            entry.getKey() +
-            ", it should be " +
-            String.join(",", TFJobReplicaType.names()) +
-            " for TensorFlow experiment.");
-      }
-    }
-    tfJobSpec.setReplicaSpecs(replicaSpecMap);
-    return tfJobSpec;
-  }
-
-  private static V1PodTemplateSpec parseTemplateSpec(
+  public static V1PodTemplateSpec parseTemplateSpec(
       ExperimentTaskSpec taskSpec, ExperimentSpec experimentSpec) throws InvalidSpecException {
     V1PodTemplateSpec templateSpec = new V1PodTemplateSpec();
+    // There is no need to add istio sidecar. Otherwise, the pod may not end normally
+    // https://istio.io/latest/docs/setup/additional-setup/sidecar-injection/
+    // Controlling the injection policy Section
+    V1ObjectMetaBuilder metaBuilder = new V1ObjectMetaBuilder()
+            .addToAnnotations("sidecar.istio.io/inject", "false");
+    templateSpec.setMetadata(metaBuilder.build());
     V1PodSpec podSpec = new V1PodSpec();
     List<V1Container> containers = new ArrayList<>();
     V1Container container = new V1Container();
@@ -402,6 +262,7 @@
       if (request) {
         resources.put("memory", new Quantity(memoryRequest)); // ex: 1024M
       } else {
+        // SUBMARINE-948. Allow experiments to overcommit memory
         String suffix = memoryRequest.substring(memoryRequest.length() - 1);
         String value = memoryRequest.substring(0, memoryRequest.length() - 1);
         String memoryLimit = String.valueOf(Integer.parseInt(value) * 2) + suffix;
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
index 16b8cea..8a92467 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
@@ -230,7 +230,7 @@
     return resources;
   }
 
-  private static Environment getEnvironment(NotebookSpec notebookSpec) {
+  public static Environment getEnvironment(NotebookSpec notebookSpec) {
     if (notebookSpec.getEnvironment().getName() != null) {
       EnvironmentManager environmentManager = EnvironmentManager.getInstance();
       return environmentManager
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java
index 4e2deb2..e6f8b34 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/ExperimentSpecParserTest.java
@@ -21,6 +21,10 @@
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,8 +39,10 @@
 import org.apache.submarine.server.api.spec.ExperimentTaskSpec;
 import org.apache.submarine.server.api.spec.EnvironmentSpec;
 import org.apache.submarine.server.api.spec.KernelSpec;
+import org.apache.submarine.server.k8s.utils.K8sUtils;
 import org.apache.submarine.server.manager.EnvironmentManager;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobFactory;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaSpec;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobReplicaType;
 import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
@@ -45,11 +51,11 @@
 import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobReplicaType;
 import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJob;
 import org.apache.submarine.server.submitter.k8s.model.xgboostjob.XGBoostJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
 import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.AbstractCodeLocalizer;
 import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.GitCodeLocalizer;
 import org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.SSHGitCodeLocalizer;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1EmptyDirVolumeSource;
@@ -58,14 +64,29 @@
 
 public class ExperimentSpecParserTest extends SpecBuilder {
 
-  private static SubmarineConfiguration conf =
+  private static final SubmarineConfiguration conf =
       SubmarineConfiguration.getInstance();
 
+  @Before
+  public void beforeInit() {
+    conf.setJdbcUrl(H2_JDBC_URL);
+    conf.setJdbcDriverClassName(H2_JDBC_DRIVERCLASS);
+    conf.setJdbcUserName(H2_JDBC_USERNAME);
+    conf.setJdbcPassword(H2_JDBC_PASSWORD);
+    try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
+            H2_JDBC_USERNAME, H2_JDBC_PASSWORD);
+         Statement stmt = conn.createStatement()) {
+      stmt.execute("RUNSCRIPT FROM 'classpath:/db/experiment.sql'");
+    } catch (SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
   @Test
   public void testValidTensorFlowExperiment() throws IOException,
       URISyntaxException, InvalidSpecException {
     ExperimentSpec experimentSpec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
-    TFJob tfJob = (TFJob) ExperimentSpecParser.parseJob(experimentSpec);
+    TFJob tfJob = (TFJob) MLJobFactory.getMLJob(experimentSpec);
     validateMetadata(experimentSpec.getMeta(), tfJob.getMetadata(),
         ExperimentMeta.SupportedMLFramework.TENSORFLOW.getName().toLowerCase()
     );
@@ -86,7 +107,7 @@
     // Case 1. Invalid framework name
     experimentSpec.getMeta().setFramework("fooframework");
     try {
-      ExperimentSpecParser.parseJob(experimentSpec);
+      MLJobFactory.getMLJob(experimentSpec);
       Assert.fail("It should throw InvalidSpecException");
     } catch (InvalidSpecException e) {
       Assert.assertTrue(e.getMessage().contains("Unsupported framework name"));
@@ -97,7 +118,7 @@
     experimentSpec.getSpec().put("foo", experimentSpec.getSpec().get(TFJobReplicaType.Ps.getTypeName()));
     experimentSpec.getSpec().remove(TFJobReplicaType.Ps.getTypeName());
     try {
-      ExperimentSpecParser.parseJob(experimentSpec);
+      MLJobFactory.getMLJob(experimentSpec);
       Assert.fail("It should throw InvalidSpecException");
     } catch (InvalidSpecException e) {
       Assert.assertTrue(e.getMessage().contains("Unrecognized replica type name"));
@@ -109,7 +130,7 @@
       URISyntaxException, InvalidSpecException {
     ExperimentSpec experimentSpec =
             (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, pytorchJobReqFile);
-    PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(experimentSpec);
+    PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(experimentSpec);
     validateMetadata(experimentSpec.getMeta(), pyTorchJob.getMetadata(),
         ExperimentMeta.SupportedMLFramework.PYTORCH.getName().toLowerCase()
     );
@@ -131,7 +152,7 @@
     // Case 1. Invalid framework name
     experimentSpec.getMeta().setFramework("fooframework");
     try {
-      ExperimentSpecParser.parseJob(experimentSpec);
+      MLJobFactory.getMLJob(experimentSpec);
       Assert.fail("It should throw InvalidSpecException");
     } catch (InvalidSpecException e) {
       Assert.assertTrue(e.getMessage().contains("Unsupported framework name"));
@@ -143,7 +164,7 @@
         PyTorchJobReplicaType.Master.getTypeName()));
     experimentSpec.getSpec().remove(PyTorchJobReplicaType.Master.getTypeName());
     try {
-      ExperimentSpecParser.parseJob(experimentSpec);
+      MLJobFactory.getMLJob(experimentSpec);
       Assert.fail("It should throw InvalidSpecException");
     } catch (InvalidSpecException e) {
       Assert.assertTrue(e.getMessage().contains("Unrecognized replica type name"));
@@ -155,7 +176,7 @@
       URISyntaxException, InvalidSpecException {
     ExperimentSpec experimentSpec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class,
         xgboostJobReqFile);
-    XGBoostJob xgboostJob = (XGBoostJob) ExperimentSpecParser.parseJob(experimentSpec);
+    XGBoostJob xgboostJob = (XGBoostJob) MLJobFactory.getMLJob(experimentSpec);
     validateMetadata(experimentSpec.getMeta(), xgboostJob.getMetadata(),
         ExperimentMeta.SupportedMLFramework.XGBOOST.getName().toLowerCase()
     );
@@ -172,7 +193,7 @@
     // Case 1. Invalid framework name
     experimentSpec.getMeta().setFramework("fooframework");
     try {
-      ExperimentSpecParser.parseJob(experimentSpec);
+      MLJobFactory.getMLJob(experimentSpec);
       Assert.fail("It should throw InvalidSpecException");
     } catch (InvalidSpecException e) {
       Assert.assertTrue(e.getMessage().contains("Unsupported framework name"));
@@ -184,7 +205,7 @@
         XGBoostJobReplicaType.Master.getTypeName()));
     experimentSpec.getSpec().remove(XGBoostJobReplicaType.Master.getTypeName());
     try {
-      ExperimentSpecParser.parseJob(experimentSpec);
+      MLJobFactory.getMLJob(experimentSpec);
       Assert.fail("It should throw InvalidSpecException");
     } catch (InvalidSpecException e) {
       Assert.assertTrue(e.getMessage().contains("Unrecognized replica type name"));
@@ -193,8 +214,8 @@
 
   private void validateMetadata(ExperimentMeta expectedMeta, V1ObjectMeta actualMeta,
       String actualFramework) {
-    Assert.assertEquals(expectedMeta.getName(), actualMeta.getName());
-    Assert.assertEquals(expectedMeta.getNamespace(), actualMeta.getNamespace());
+    Assert.assertEquals(expectedMeta.getExperimentId(), actualMeta.getName());
+    Assert.assertEquals(K8sUtils.getNamespace(), actualMeta.getNamespace());
     Assert.assertEquals(expectedMeta.getFramework().toLowerCase(), actualFramework);
   }
 
@@ -209,7 +230,7 @@
       mlJobReplicaSpec = ((XGBoostJob) mlJob).getSpec().getReplicaSpecs().get(type);
     }
     Assert.assertNotNull(mlJobReplicaSpec);
-    
+
     ExperimentTaskSpec definedPyTorchMasterTask = experimentSpec.getSpec().
         get(type.getTypeName());
 
@@ -273,7 +294,7 @@
 
     ExperimentSpec jobSpec =
             (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, pytorchJobWithEnvReqFile);
-    PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(jobSpec);
+    PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(jobSpec);
 
     MLJobReplicaSpec mlJobReplicaSpec = pyTorchJob.getSpec().getReplicaSpecs()
         .get(PyTorchJobReplicaType.Master);
@@ -319,7 +340,7 @@
     ExperimentSpec jobSpec =
         (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class,
             pytorchJobWithHTTPGitCodeLocalizerFile);
-    PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(jobSpec);
+    PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(jobSpec);
 
     MLJobReplicaSpec mlJobReplicaSpec = pyTorchJob.getSpec().getReplicaSpecs()
         .get(PyTorchJobReplicaType.Master);
@@ -351,8 +372,10 @@
       }
     }
 
-    V1Volume V1Volume =
-        mlJobReplicaSpec.getTemplate().getSpec().getVolumes().get(0);
+    // we need to filter code-dir first
+    V1Volume V1Volume = mlJobReplicaSpec.getTemplate().getSpec().getVolumes().stream()
+        .filter(v -> v.getName().equals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME))
+        .findFirst().get();
     Assert.assertEquals(new V1EmptyDirVolumeSource(), V1Volume.getEmptyDir());
     Assert.assertEquals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME,
         V1Volume.getName());
@@ -364,7 +387,7 @@
     ExperimentSpec jobSpec =
         (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class,
             pytorchJobWithSSHGitCodeLocalizerFile);
-    PyTorchJob pyTorchJob = (PyTorchJob) ExperimentSpecParser.parseJob(jobSpec);
+    PyTorchJob pyTorchJob = (PyTorchJob) MLJobFactory.getMLJob(jobSpec);
 
     MLJobReplicaSpec mlJobReplicaSpec = pyTorchJob.getSpec().getReplicaSpecs()
         .get(PyTorchJobReplicaType.Master);
@@ -402,8 +425,10 @@
       }
     }
 
-    V1Volume V1Volume =
-        mlJobReplicaSpec.getTemplate().getSpec().getVolumes().get(0);
+    // we need to filter code-dir first
+    V1Volume V1Volume = mlJobReplicaSpec.getTemplate().getSpec().getVolumes().stream()
+        .filter(v -> v.getName().equals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME))
+        .findFirst().get();
     Assert.assertEquals(new V1EmptyDirVolumeSource(), V1Volume.getEmptyDir());
     Assert.assertEquals(AbstractCodeLocalizer.CODE_LOCALIZER_MOUNT_NAME,
         V1Volume.getName());
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
index eaa8b06..f68e4ba 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
@@ -51,6 +51,10 @@
  * file. Local: docker run -it --privileged -p 8443:8443 -p 10080:10080
  * bsycorp/kind:latest-1.15 Travis: See '.travis.yml'
  */
+
+// We have created mljob in org.apache.submarine.server.submitter.k8s.mljob package
+// So that we will ignore this test case for the time being
+@Ignore
 public class K8SJobSubmitterTest extends SpecBuilder {
   private static final Logger LOG = LoggerFactory.getLogger(K8SJobSubmitterTest.class);
   private K8sSubmitter submitter;
@@ -90,7 +94,7 @@
 
   @Test
   public void testRunTFJobPerRequest() throws URISyntaxException, IOException, SubmarineRuntimeException {
-    ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
+    ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, xgboostJobReqFile);
     run(spec);
   }
 
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
index 54eef99..63c050c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
@@ -34,9 +34,9 @@
 import org.apache.submarine.server.api.exception.InvalidSpecException;
 import org.apache.submarine.server.api.experiment.Experiment;
 import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.model.mljob.MLJobFactory;
 import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
 import org.apache.submarine.server.submitter.k8s.model.mljob.MLJob;
-import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,7 +46,7 @@
   public void testMLJob2Job() throws IOException, URISyntaxException, InvalidSpecException {
     // Accepted Status
     ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
-    MLJob mlJob = ExperimentSpecParser.parseJob(spec);
+    MLJob mlJob = MLJobFactory.getMLJob(spec);
     V1JobStatus status = new V1JobStatusBuilder().build();
     mlJob.setStatus(status);
     Experiment experiment = MLJobConverter.toJobFromMLJob(mlJob);
@@ -73,8 +73,8 @@
     condition = new V1JobConditionBuilder().withStatus("True")
         .withType("Running").withLastTransitionTime(runningTime).build();
     conditions.add(condition);
-
     mlJob.getStatus().setConditions(conditions);
+
     experiment = MLJobConverter.toJobFromMLJob(mlJob);
     Assert.assertEquals(Experiment.Status.STATUS_RUNNING.toString(), experiment.getStatus());
     Assert.assertEquals(runningTime.toString(), experiment.getRunningTime());
@@ -82,6 +82,11 @@
     // Succeeded Status
     DateTime finishedTime = new DateTime();
     mlJob.getStatus().setCompletionTime(finishedTime);
+    condition = new V1JobConditionBuilder().withStatus("True")
+            .withType("Succeeded").withLastTransitionTime(runningTime).build();
+    conditions.add(condition);
+    mlJob.getStatus().setConditions(conditions);
+
     experiment = MLJobConverter.toJobFromMLJob(mlJob);
     Assert.assertEquals(Experiment.Status.STATUS_SUCCEEDED.toString(), experiment.getStatus());
     Assert.assertEquals(finishedTime.toString(), experiment.getFinishedTime());
@@ -99,7 +104,7 @@
   public void testMLJob2DeleteOptions() throws IOException, URISyntaxException,
       InvalidSpecException {
     ExperimentSpec spec = (ExperimentSpec) buildFromJsonFile(ExperimentSpec.class, tfJobReqFile);
-    MLJob mlJob = ExperimentSpecParser.parseJob(spec);
+    MLJob mlJob = MLJobFactory.getMLJob(spec);
     V1DeleteOptions options = MLJobConverter.toDeleteOptionsFromMLJob(mlJob);
     Assert.assertNotNull(options);
     Assert.assertEquals(mlJob.getApiVersion(), options.getApiVersion());
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
index 27fd5fb..7d9f576 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
@@ -21,21 +21,46 @@
 
 import io.kubernetes.client.openapi.models.V1EnvVar;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.api.environment.Environment;
 import org.apache.submarine.server.api.spec.NotebookMeta;
 import org.apache.submarine.server.api.spec.NotebookPodSpec;
 import org.apache.submarine.server.api.spec.NotebookSpec;
 import org.apache.submarine.server.submitter.k8s.model.common.Configmap;
 import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCR;
 import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCRSpec;
+import org.apache.submarine.server.submitter.k8s.parser.NotebookSpecParser;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Map;
 
 public class NotebookSpecParserTest extends SpecBuilder {
 
+  private static final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
+
+  @Before
+  public void beforeInit() {
+    conf.setJdbcUrl(H2_JDBC_URL);
+    conf.setJdbcDriverClassName(H2_JDBC_DRIVERCLASS);
+    conf.setJdbcUserName(H2_JDBC_USERNAME);
+    conf.setJdbcPassword(H2_JDBC_PASSWORD);
+    try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
+            H2_JDBC_USERNAME, H2_JDBC_PASSWORD);
+         Statement stmt = conn.createStatement()) {
+      stmt.execute("RUNSCRIPT FROM 'classpath:/db/notebook.sql'");
+    } catch (SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
   @Test
   public void testValidNotebook() throws IOException, URISyntaxException {
     NotebookSpec notebookSpec = (NotebookSpec) buildFromJsonFile(NotebookSpec.class, notebookReqFile);
@@ -47,14 +72,15 @@
   }
 
   private void validateMetadata(NotebookMeta meta, V1ObjectMeta actualMeta) {
-    Assert.assertEquals(meta.getName(), actualMeta.getName());
+    Assert.assertEquals("notebook-1642402491519-0003-my-nb", actualMeta.getName());
     Assert.assertEquals(meta.getNamespace(), actualMeta.getNamespace());
     Assert.assertEquals(meta.getOwnerId(),
             actualMeta.getLabels().get(NotebookCR.NOTEBOOK_OWNER_SELECTOR_KEY));
   }
 
   private void validateEnvironment(NotebookSpec spec, NotebookCRSpec actualPodSpec) {
-    String expectedImage = spec.getEnvironment().getImage();
+    Environment env = NotebookSpecParser.getEnvironment(spec);
+    String expectedImage = env.getEnvironmentSpec().getDockerImage();
     String actualImage = actualPodSpec.getContainerImageName();
     Assert.assertEquals(expectedImage, actualImage);
   }
@@ -70,7 +96,7 @@
     for (Map.Entry<String, String> entry : podSpec.getEnvVars().entrySet()) {
       V1EnvVar env = new V1EnvVar();
       env.setName(entry.getKey());
-      env.setValue(env.getValue());
+      env.setValue(entry.getValue());
       Assert.assertTrue(notebook.getSpec().getEnvs().contains(env));
     }
 
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
index 96f8908..1f401f4 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
@@ -34,9 +34,9 @@
 
 public abstract class SpecBuilder {
   // The spec files in test/resources
-  protected final String tfJobReqFile = "/tf_mnist_req.json";
-  protected final String pytorchJobReqFile = "/pytorch_job_req.json";
-  protected final String xgboostJobReqFile = "/xgboost_job_req.json";
+  public static final String tfJobReqFile = "/tf_mnist_req.json";
+  public static final String pytorchJobReqFile = "/pytorch_job_req.json";
+  public static final String xgboostJobReqFile = "/xgboost_job_req.json";
   protected final String pytorchJobWithEnvReqFile = "/pytorch_job_req_env.json";
   protected final String pytorchJobWithInvalidEnvReqFile =
       "/pytorch_job_req_invalid_env.json";
@@ -47,6 +47,11 @@
       "/pytorch_job_req_ssh_git_code_localizer.json";
   protected final String tfTfboardJobReqFile = "/tf_tfboard_mnist_req.json";
 
+  protected final String H2_JDBC_URL = "jdbc:h2:mem:submarine-test;MODE=MYSQL;DB_CLOSE_DELAY=-1";
+  protected final String H2_JDBC_DRIVERCLASS = "org.h2.Driver";
+  protected final String H2_JDBC_USERNAME = "root";
+  protected final String H2_JDBC_PASSWORD = "";
+
   protected Object buildFromJsonFile(Object obj, String filePath) throws IOException,
       URISyntaxException {
     Gson gson = new GsonBuilder().create();
@@ -55,7 +60,11 @@
       if (obj.equals(NotebookSpec.class)) {
         return gson.fromJson(reader, NotebookSpec.class);
       } else {
-        return gson.fromJson(reader, ExperimentSpec.class);
+        ExperimentSpec experimentSpec = gson.fromJson(reader, ExperimentSpec.class);
+        experimentSpec.getMeta().setExperimentId(
+            String.format("experiment-%s-0001", System.currentTimeMillis())
+        );
+        return experimentSpec;
       }
     }
   }
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java
index 6acdac6..1246188 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterTransactionTest.java
@@ -29,8 +29,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 public class SubmitterTransactionTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(SubmitterTransactionTest.class);
@@ -53,29 +51,41 @@
   }
 
   @Test
-  public void testOneFailedCommit() {
-    List<Object> commits = submitter.resourceTransaction(new FailedResource());
-    Assert.assertEquals(commits.size(), 0);
-  }
-
-  @Test
   public void testCombineCommit() {
-    List<Object> commits = submitter.resourceTransaction(new SuccessResource("test1"),
-            new FailedResource());
-    Assert.assertEquals(commits.size(), 1);
+    SuccessResource resource1 = new SuccessResource("test1");
+    FailedResource resource2 = new FailedResource();
+    try {
+      submitter.resourceTransaction(resource1, resource2);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    Assert.assertFalse(resource1.isCommit());
   }
 
   @Test
   public void testCommitOrder() {
-    List<Object> commits = submitter.resourceTransaction(new SuccessResource("test1"),
-            new FailedResource(), new SuccessResource("test3"));
-    Assert.assertEquals(commits.size(), 1);
+    SuccessResource resource1 = new SuccessResource("test1");
+    FailedResource resource2 = new FailedResource();
+    SuccessResource resource3 = new SuccessResource("test3\"");
+    try {
+      submitter.resourceTransaction(resource1, resource2, resource3);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    Assert.assertFalse(resource1.isCommit());
+    Assert.assertFalse(resource3.isCommit());
   }
 
 }
 
 class SuccessResource implements K8sResource<SuccessResource> {
 
+  private boolean commit = false;
+
+  public boolean isCommit() {
+    return commit;
+  }
+
   private final String name;
 
   SuccessResource(String name) {
@@ -99,16 +109,19 @@
 
   @Override
   public SuccessResource create(K8sClient api) {
+    this.commit = true;
     return this;
   }
 
   @Override
   public SuccessResource replace(K8sClient api) {
+    this.commit = true;
     return this;
   }
 
   @Override
   public SuccessResource delete(K8sClient api) {
+    this.commit = false;
     return this;
   }
 }
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java
index 684b30c..c6d19a4 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/K8sMockClient.java
@@ -88,6 +88,11 @@
   private final GenericKubernetesApi<IstioVirtualService, IstioVirtualServiceList> istioVirtualServiceClient;
   private final GenericKubernetesApi<V1Pod, V1PodList> podClient;
 
+  // train operator client
+  private final GenericKubernetesApi<TFJob, TFJobList> tfJobClient;
+  private final GenericKubernetesApi<PyTorchJob, PyTorchJobList> pyTorchJobClient;
+  private final GenericKubernetesApi<XGBoostJob, XGBoostJobList> xgboostJobClient;
+
   private static final WireMockRule wireMockRule = new WireMockRule(8384);
 
   public static WireMockRule getWireMockRule() {
@@ -139,6 +144,22 @@
             new GenericKubernetesApi<>(
                     V1Pod.class, V1PodList.class,
                     "", "v1", "pods", apiClient);
+
+    tfJobClient =
+            new GenericKubernetesApi<>(
+                    TFJob.class, TFJobList.class,
+                    TFJob.CRD_TF_GROUP_V1, TFJob.CRD_TF_VERSION_V1,
+                    TFJob.CRD_TF_PLURAL_V1, apiClient);
+    pyTorchJobClient =
+            new GenericKubernetesApi<>(
+                    PyTorchJob.class, PyTorchJobList.class,
+                    PyTorchJob.CRD_PYTORCH_GROUP_V1, PyTorchJob.CRD_PYTORCH_VERSION_V1,
+                    PyTorchJob.CRD_PYTORCH_PLURAL_V1, apiClient);
+    xgboostJobClient =
+            new GenericKubernetesApi<>(
+                    XGBoostJob.class, XGBoostJobList.class,
+                    XGBoostJob.CRD_XGBOOST_GROUP_V1, XGBoostJob.CRD_XGBOOST_VERSION_V1,
+                    XGBoostJob.CRD_XGBOOST_PLURAL_V1, apiClient);
   }
 
   public K8sMockClient(MappingBuilder... mappingBuilders) throws IOException {
@@ -192,17 +213,17 @@
 
   @Override
   public GenericKubernetesApi<TFJob, TFJobList> getTfJobClient() {
-    return null;
+    return tfJobClient;
   }
 
   @Override
   public GenericKubernetesApi<PyTorchJob, PyTorchJobList> getPyTorchJobClient() {
-    return null;
+    return pyTorchJobClient;
   }
 
   @Override
   public GenericKubernetesApi<XGBoostJob, XGBoostJobList> getXGBoostJobClient() {
-    return null;
+    return xgboostJobClient;
   }
 
   @Override
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java
index 8f37c0d..0649922 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/client/MockClientUtil.java
@@ -46,6 +46,18 @@
     return String.format("/api/v1/namespaces/%s/pods/%s", namespace, name);
   }
 
+  public static String getTfJobUrl(String namespace, String name) {
+    return String.format("/apis/kubeflow.org/v1/namespaces/%s/tfjobs/%s", namespace, name);
+  }
+
+  public static String getPytorchJobUrl(String namespace, String name) {
+    return String.format("/apis/kubeflow.org/v1/namespaces/%s/pytorchjobs/%s", namespace, name);
+  }
+
+  public static String getXGBoostJobUrl(String namespace, String name) {
+    return String.format("/apis/kubeflow.org/v1/namespaces/%s/xgboostjobs/%s", namespace, name);
+  }
+
   public static String getMockSuccessStatus(String name) {
     V1Status status = new V1Status();
     status.setApiVersion("v1");
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterFileUtil.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterFileUtil.java
new file mode 100644
index 0000000..b997d93
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterFileUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+public class SubmitterFileUtil {
+
+  public static ExperimentSpec buildFromJsonFile(String experimentId, String filePath) throws IOException,
+          URISyntaxException {
+    Gson gson = new GsonBuilder().create();
+    try (Reader reader = Files.newBufferedReader(
+            new File(SubmitterFileUtil.class.getResource(filePath).toURI()).toPath(),
+            StandardCharsets.UTF_8)) {
+      ExperimentSpec experimentSpec = gson.fromJson(reader, ExperimentSpec.class);
+      experimentSpec.getMeta().setExperimentId(experimentId);
+      return experimentSpec;
+    }
+  }
+
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java
new file mode 100644
index 0000000..11b3730
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.github.tomakehurst.wiremock.client.MappingBuilder;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.github.tomakehurst.wiremock.matching.EqualToPattern;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
+import org.apache.submarine.server.submitter.k8s.SpecBuilder;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
+import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
+import org.apache.submarine.server.submitter.k8s.model.AgentPod;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static org.apache.submarine.server.submitter.k8s.client.K8sMockClient.getResourceFileContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class SubmitterPyTorchApiTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SubmitterPyTorchApiTest.class);
+
+  private K8sSubmitter submitter;
+
+  private static final String namespace = "default";
+  private static final String experimentId = "experiment-1658656463509-0001";
+  private ExperimentSpec experimentSpec;
+
+  @Rule
+  public WireMockRule wireMockRule = K8sMockClient.getWireMockRule();
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    experimentSpec = SubmitterFileUtil.buildFromJsonFile(experimentId, SpecBuilder.pytorchJobReqFile);
+
+    // save pytorch url
+    MappingBuilder pytorchPost = post(urlPathEqualTo(
+            "/apis/kubeflow.org/v1/namespaces/default/pytorchjobs"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+              .withStatus(200)
+              .withBody(getResourceFileContent("client/experiment/pytorch-read-api.json")));
+    // save pod agent url
+    String agentName = AgentPod.getNormalizePodName(
+            CustomResourceType.PyTorchJob, "pytorch-dist-mnist", experimentId);
+    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
+
+    // get pytorch url
+    MappingBuilder pytorchGet = get(urlPathEqualTo(
+        MockClientUtil.getPytorchJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/pytorch-read-api.json")));
+
+    //  delete pytorch url
+    MappingBuilder pytorchDelete = delete(urlPathEqualTo(
+        MockClientUtil.getPytorchJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/pytorch-delete-api.json")));
+    //  delete agent pod url
+    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
+
+    K8sClient k8sClient = new K8sMockClient(pytorchPost, agentPost, pytorchGet, pytorchDelete, agentDelete);
+    try {
+      submitter = new K8sSubmitter(k8sClient);
+      submitter.initialize(null);
+    } catch (Exception e) {
+      LOG.warn("Init K8sSubmitter failed, but we can continue", e);
+    }
+  }
+
+  @Test
+  public void testCreatePyTorchJob() {
+    // create pytorch
+    Experiment experiment = submitter.createExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    assertEquals(experiment.getStatus(), "Running");
+  }
+
+  @Test
+  public void testFindPyTorchJob() {
+    // get pytorch
+    Experiment experiment = submitter.findExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    assertEquals("status is not running", experiment.getStatus(), "Running");
+  }
+
+  @Test
+  public void testDeletePyTorchJob() {
+    // delete pytorch
+    Experiment experiment = submitter.deleteExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java
new file mode 100644
index 0000000..7959ab2
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.github.tomakehurst.wiremock.client.MappingBuilder;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.github.tomakehurst.wiremock.matching.EqualToPattern;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
+import org.apache.submarine.server.submitter.k8s.SpecBuilder;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
+import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
+import org.apache.submarine.server.submitter.k8s.model.AgentPod;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static org.apache.submarine.server.submitter.k8s.client.K8sMockClient.getResourceFileContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class SubmitterTensorflowApiTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SubmitterTensorflowApiTest.class);
+
+  private K8sSubmitter submitter;
+
+  private static final String namespace = "default";
+  private static final String experimentId = "experiment-1659167632755-0001";
+  private ExperimentSpec experimentSpec;
+
+  @Rule
+  public WireMockRule wireMockRule = K8sMockClient.getWireMockRule();
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    experimentSpec = SubmitterFileUtil.buildFromJsonFile(experimentId, SpecBuilder.tfJobReqFile);
+
+    // save tf url
+    MappingBuilder tfPost = post(urlPathEqualTo(
+        "/apis/kubeflow.org/v1/namespaces/default/tfjobs"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/tf-read-api.json")));
+    // save pod agent url
+    String agentName = AgentPod.getNormalizePodName(
+        CustomResourceType.TFJob, "tensorflow-dist-mnist", experimentId);
+    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
+
+    // get tf url
+    MappingBuilder tfGet = get(urlPathEqualTo(
+        MockClientUtil.getTfJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/tf-read-api.json")));
+
+    //  delete tf url
+    MappingBuilder tfDelete = delete(urlPathEqualTo(
+        MockClientUtil.getTfJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/tf-delete-api.json")));
+    //  delete agent pod url
+    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
+
+    K8sClient k8sClient = new K8sMockClient(tfPost, agentPost, tfGet, tfDelete, agentDelete);
+    try {
+      submitter = new K8sSubmitter(k8sClient);
+      submitter.initialize(null);
+    } catch (Exception e) {
+      LOG.warn("Init K8sSubmitter failed, but we can continue", e);
+    }
+  }
+
+  @Test
+  public void testCreateTensorflowJob() {
+    // create tensorflow
+    Experiment experiment = submitter.createExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    assertEquals(experiment.getStatus(), "Running");
+  }
+
+  @Test
+  public void testFindTensorflowJob() {
+    // get tensorflow
+    Experiment experiment = submitter.findExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    assertEquals("status is not running", experiment.getStatus(), "Running");
+  }
+
+  @Test
+  public void testDeleteTensorflowJob() {
+    // delete tensorflow
+    Experiment experiment = submitter.deleteExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java
new file mode 100644
index 0000000..b8f9317
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.mljob;
+
+import com.github.tomakehurst.wiremock.client.MappingBuilder;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.github.tomakehurst.wiremock.matching.EqualToPattern;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.experiment.Experiment;
+import org.apache.submarine.server.api.spec.ExperimentSpec;
+import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
+import org.apache.submarine.server.submitter.k8s.SpecBuilder;
+import org.apache.submarine.server.submitter.k8s.client.K8sClient;
+import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
+import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
+import org.apache.submarine.server.submitter.k8s.model.AgentPod;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static org.apache.submarine.server.submitter.k8s.client.K8sMockClient.getResourceFileContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class SubmitterXGBoostApiTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SubmitterXGBoostApiTest.class);
+
+  private K8sSubmitter submitter;
+
+  private static final String namespace = "default";
+  private static final String experimentId = "experiment-1659181695811-0001";
+  private ExperimentSpec experimentSpec;
+
+  @Rule
+  public WireMockRule wireMockRule = K8sMockClient.getWireMockRule();
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    experimentSpec = SubmitterFileUtil.buildFromJsonFile(experimentId, SpecBuilder.xgboostJobReqFile);
+
+    // save xgboost url
+    MappingBuilder xgboostPost = post(urlPathEqualTo(
+        "/apis/kubeflow.org/v1/namespaces/default/xgboostjobs"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/xgboost-read-api.json")));
+    // save pod agent url
+    String agentName = AgentPod.getNormalizePodName(
+          CustomResourceType.XGBoost, "xgboost-dist-mnist", experimentId);
+    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
+
+    // get xgboost url
+    MappingBuilder xgboostGet = get(urlPathEqualTo(
+        MockClientUtil.getXGBoostJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/xgboost-read-api.json")));
+
+    //  delete xgboost url
+    MappingBuilder xgboostDelete = delete(urlPathEqualTo(
+        MockClientUtil.getXGBoostJobUrl(namespace, experimentId)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(getResourceFileContent("client/experiment/xgboost-delete-api.json")));
+    //  delete agent pod url
+    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
+        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
+        .willReturn(
+            aResponse()
+                .withStatus(200)
+                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
+
+    K8sClient k8sClient = new K8sMockClient(xgboostPost, agentPost,
+        xgboostGet, xgboostDelete, agentDelete);
+    try {
+      submitter = new K8sSubmitter(k8sClient);
+      submitter.initialize(null);
+    } catch (Exception e) {
+      LOG.warn("Init K8sSubmitter failed, but we can continue", e);
+    }
+  }
+
+  @Test
+  public void testCreateXGBoostJob() {
+    // create XGBoost
+    Experiment experiment = submitter.createExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    assertEquals(experiment.getStatus(), "Running");
+  }
+
+  @Test
+  public void testFindXGBoostJob() {
+    // get XGBoost
+    Experiment experiment = submitter.findExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+    assertNotNull(experiment.getUid());
+    assertEquals("status is not running", experiment.getStatus(), "Running");
+  }
+
+  @Test
+  public void testDeleteXGBoostJob() {
+    // delete XGBoost
+    Experiment experiment = submitter.deleteExperiment(experimentSpec);
+    // check return value
+    assertNotNull(experiment);
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-delete-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-delete-api.json
new file mode 100644
index 0000000..77e0681
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-delete-api.json
@@ -0,0 +1,12 @@
+{
+  "apiVersion": "v1",
+  "details": {
+    "group": "kubeflow.org",
+    "kind": "PyTorchJob",
+    "name": "experiment-1658656463509-0001",
+    "uid": "b95d6769-26ff-469c-a346-d5399f543f39"
+  },
+  "kind": "Status",
+  "metadata": {},
+  "status": "Success"
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-read-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-read-api.json
new file mode 100644
index 0000000..18c92f3
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/pytorch-read-api.json
@@ -0,0 +1,160 @@
+{
+  "apiVersion": "kubeflow.org/v1",
+  "kind": "PyTorchJob",
+  "metadata": {
+    "creationTimestamp": "2022-07-24T17:54:23.000+08:00",
+    "generation": 1,
+    "labels": {
+      "submarine-experiment-name": "pytorch-dist-mnist"
+    },
+    "name": "experiment-1658656463509-0001",
+    "namespace": "default",
+    "resourceVersion": "26841",
+    "uid": "b95d6769-26ff-469c-a346-d5399f543f39"
+  },
+  "spec": {
+    "pytorchReplicaSpecs": {
+      "Master": {
+        "replicas": 1,
+        "template": {
+          "metadata": {
+            "annotations": {
+              "sidecar.istio.io/inject": "false"
+            }
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "python",
+                  "/var/mnist.py",
+                  "--backend",
+                  "gloo"
+                ],
+                "env": [
+                  {
+                    "name": "ENV_1",
+                    "value": "ENV1"
+                  }
+                ],
+                "image": "apache/submarine:pytorch-dist-mnist-1.0",
+                "name": "pytorch",
+                "resources": {
+                  "limits": {
+                    "cpu": "2",
+                    "memory": "4096M"
+                  },
+                  "requests": {
+                    "cpu": "2",
+                    "memory": "2048M"
+                  }
+                },
+                "volumeMounts": [
+                  {
+                    "mountPath": "/logs",
+                    "name": "volume",
+                    "subPath": "submarine-tensorboard/pytorch-dist-mnist"
+                  }
+                ]
+              }
+            ],
+            "volumes": [
+              {
+                "name": "volume",
+                "persistentVolumeClaim": {
+                  "claimName": "submarine-tensorboard-pvc"
+                }
+              }
+            ]
+          }
+        },
+        "restartPolicy": "OnFailure"
+      },
+      "Worker": {
+        "replicas": 2,
+        "template": {
+          "metadata": {
+            "annotations": {
+              "sidecar.istio.io/inject": "false"
+            }
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "python",
+                  "/var/mnist.py",
+                  "--backend",
+                  "gloo"
+                ],
+                "env": [
+                  {
+                    "name": "ENV_1",
+                    "value": "ENV1"
+                  }
+                ],
+                "image": "apache/submarine:pytorch-dist-mnist-1.0",
+                "name": "pytorch",
+                "resources": {
+                  "limits": {
+                    "cpu": "1",
+                    "memory": "2048M"
+                  },
+                  "requests": {
+                    "cpu": "1",
+                    "memory": "1024M"
+                  }
+                },
+                "volumeMounts": [
+                  {
+                    "mountPath": "/logs",
+                    "name": "volume",
+                    "subPath": "submarine-tensorboard/pytorch-dist-mnist"
+                  }
+                ]
+              }
+            ],
+            "volumes": [
+              {
+                "name": "volume",
+                "persistentVolumeClaim": {
+                  "claimName": "submarine-tensorboard-pvc"
+                }
+              }
+            ]
+          }
+        },
+        "restartPolicy": "OnFailure"
+      }
+    },
+    "backoffLimit": 3
+  },
+  "status": {
+    "conditions": [
+      {
+        "lastTransitionTime": "2022-07-24T11:23:25Z",
+        "lastUpdateTime": "2022-07-24T11:23:25Z",
+        "message": "PyTorchJob experiment-1658656463509-0001 is created.",
+        "reason": "PyTorchJobCreated",
+        "status": "True",
+        "type": "Created"
+      },
+      {
+        "lastTransitionTime": "2022-07-24T11:23:26Z",
+        "lastUpdateTime": "2022-07-24T11:23:26Z",
+        "message": "PyTorchJob experiment-1658656463509-0001 is running.",
+        "reason": "PyTorchJobRunning",
+        "status": "True",
+        "type": "Running"
+      }
+    ],
+    "replicaStatuses": {
+      "Master": {
+        "active": 1
+      },
+      "Worker": {
+        "active": 1
+      }
+    }
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-delete-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-delete-api.json
new file mode 100644
index 0000000..a0a1bf4
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-delete-api.json
@@ -0,0 +1,12 @@
+{
+  "apiVersion": "v1",
+  "details": {
+    "group": "kubeflow.org",
+    "kind": "TFJob",
+    "name": "experiment-1659167632755-0001",
+    "uid": "d9b3c2dd-ce17-400a-a4f7-2781294ff3d5"
+  },
+  "kind": "Status",
+  "metadata": {},
+  "status": "Success"
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-read-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-read-api.json
new file mode 100644
index 0000000..d0ef50f
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/tf-read-api.json
@@ -0,0 +1,164 @@
+{
+  "apiVersion": "kubeflow.org/v1",
+  "kind": "TFJob",
+  "metadata": {
+    "creationTimestamp": "2022-07-30T15:53:52.000+08:00",
+    "generation": 1,
+    "labels": {
+      "submarine-experiment-name": "tensorflow-dist-mnist"
+    },
+    "name": "experiment-1659167632755-0001",
+    "namespace": "default",
+    "resourceVersion": "39556",
+    "uid": "d9b3c2dd-ce17-400a-a4f7-2781294ff3d5"
+  },
+  "spec": {
+    "tfReplicaSpecs": {
+      "Ps": {
+        "replicas": 1,
+        "template": {
+          "metadata": {
+            "annotations": {
+              "sidecar.istio.io/inject": "false"
+            }
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "python",
+                  "/var/tf_mnist/mnist_with_summaries.py",
+                  "--log_dir\u003d/train/log",
+                  "--learning_rate\u003d0.01",
+                  "--batch_size\u003d150"
+                ],
+                "env": [
+                  {
+                    "name": "ENV_1",
+                    "value": "ENV1"
+                  }
+                ],
+                "image": "apache/submarine:tf-mnist-with-summaries-1.0",
+                "name": "tensorflow",
+                "resources": {
+                  "limits": {
+                    "cpu": "4",
+                    "memory": "4096M"
+                  },
+                  "requests": {
+                    "cpu": "4",
+                    "memory": "2048M"
+                  }
+                },
+                "volumeMounts": [
+                  {
+                    "mountPath": "/logs",
+                    "name": "volume",
+                    "subPath": "submarine-tensorboard/tensorflow-dist-mnist"
+                  }
+                ]
+              }
+            ],
+            "volumes": [
+              {
+                "name": "volume",
+                "persistentVolumeClaim": {
+                  "claimName": "submarine-tensorboard-pvc"
+                }
+              }
+            ]
+          }
+        },
+        "restartPolicy": "OnFailure"
+      },
+      "Worker": {
+        "replicas": 2,
+        "template": {
+          "metadata": {
+            "annotations": {
+              "sidecar.istio.io/inject": "false"
+            }
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "python",
+                  "/var/tf_mnist/mnist_with_summaries.py",
+                  "--log_dir\u003d/train/log",
+                  "--learning_rate\u003d0.01",
+                  "--batch_size\u003d150"
+                ],
+                "env": [
+                  {
+                    "name": "ENV_1",
+                    "value": "ENV1"
+                  }
+                ],
+                "image": "apache/submarine:tf-mnist-with-summaries-1.0",
+                "name": "tensorflow",
+                "resources": {
+                  "limits": {
+                    "cpu": "2",
+                    "memory": "2048M",
+                    "nvidia.com/gpu": "1"
+                  },
+                  "requests": {
+                    "cpu": "2",
+                    "memory": "1024M",
+                    "nvidia.com/gpu": "1"
+                  }
+                },
+                "volumeMounts": [
+                  {
+                    "mountPath": "/logs",
+                    "name": "volume",
+                    "subPath": "submarine-tensorboard/tensorflow-dist-mnist"
+                  }
+                ]
+              }
+            ],
+            "volumes": [
+              {
+                "name": "volume",
+                "persistentVolumeClaim": {
+                  "claimName": "submarine-tensorboard-pvc"
+                }
+              }
+            ]
+          }
+        },
+        "restartPolicy": "OnFailure"
+      }
+    },
+    "backoffLimit": 3
+  },
+  "status": {
+    "conditions": [
+      {
+        "lastTransitionTime": "2022-07-30T07:59:01Z",
+        "lastUpdateTime": "2022-07-30T07:59:01Z",
+        "message": "TFJob experiment-1659167632755-0001 is created.",
+        "reason": "TFJobCreated",
+        "status": "True",
+        "type": "Created"
+      },
+      {
+        "lastTransitionTime": "2022-07-30T07:59:02Z",
+        "lastUpdateTime": "2022-07-30T07:59:02Z",
+        "message": "TFJob default/experiment-1659167632755-0001 is running.",
+        "reason": "TFJobRunning",
+        "status": "True",
+        "type": "Running"
+      }
+    ],
+    "replicaStatuses": {
+      "PS": {
+        "active": 1
+      },
+      "Worker": {
+        "active": 2
+      }
+    }
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-delete-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-delete-api.json
new file mode 100644
index 0000000..0077eec
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-delete-api.json
@@ -0,0 +1,12 @@
+{
+  "apiVersion": "v1",
+  "details": {
+    "group": "kubeflow.org",
+    "kind": "TFJob",
+    "name": "experiment-1659181695811-0001",
+    "uid": "2e5a067e-153a-4a29-bee0-4e3ebfefad22"
+  },
+  "kind": "Status",
+  "metadata": {},
+  "status": "Success"
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-read-api.json b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-read-api.json
new file mode 100644
index 0000000..0742929
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/client/experiment/xgboost-read-api.json
@@ -0,0 +1,166 @@
+{
+  "apiVersion": "kubeflow.org/v1",
+  "kind": "XGBoostJob",
+  "metadata": {
+    "creationTimestamp": "2022-07-30T19:48:15.000+08:00",
+    "generation": 1,
+    "labels": {
+      "submarine-experiment-name": "xgboost-dist-mnist"
+    },
+    "name": "experiment-1659181695811-0001",
+    "namespace": "default",
+    "resourceVersion": "52320",
+    "uid": "2e5a067e-153a-4a29-bee0-4e3ebfefad22"
+  },
+  "spec": {
+    "xgbReplicaSpecs": {
+      "Master": {
+        "replicas": 1,
+        "template": {
+          "metadata": {
+            "annotations": {
+              "sidecar.istio.io/inject": "false"
+            }
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "python",
+                  "/opt/mlkube/main.py",
+                  "--job_type\u003dTrain",
+                  "--xgboost_parameter\u003dobjective:multi:softprob,num_class:3",
+                  "",
+                  "--n_estimators\u003d10",
+                  "--learning_rate\u003d0.1"
+                ],
+                "env": [
+                  {
+                    "name": "ENV_1",
+                    "value": "ENV1"
+                  }
+                ],
+                "image": "apache/submarine:xgboost-dist-iris-1.0",
+                "name": "xgboost",
+                "resources": {
+                  "limits": {
+                    "cpu": "2",
+                    "memory": "4096M"
+                  },
+                  "requests": {
+                    "cpu": "2",
+                    "memory": "2048M"
+                  }
+                },
+                "volumeMounts": [
+                  {
+                    "mountPath": "/logs",
+                    "name": "volume",
+                    "subPath": "submarine-tensorboard/xgboost-dist-mnist"
+                  }
+                ]
+              }
+            ],
+            "volumes": [
+              {
+                "name": "volume",
+                "persistentVolumeClaim": {
+                  "claimName": "submarine-tensorboard-pvc"
+                }
+              }
+            ]
+          }
+        },
+        "restartPolicy": "OnFailure"
+      },
+      "Worker": {
+        "replicas": 2,
+        "template": {
+          "metadata": {
+            "annotations": {
+              "sidecar.istio.io/inject": "false"
+            }
+          },
+          "spec": {
+            "containers": [
+              {
+                "command": [
+                  "python",
+                  "/opt/mlkube/main.py",
+                  "--job_type\u003dTrain",
+                  "--xgboost_parameter\u003dobjective:multi:softprob,num_class:3",
+                  "",
+                  "--n_estimators\u003d10",
+                  "--learning_rate\u003d0.1"
+                ],
+                "env": [
+                  {
+                    "name": "ENV_1",
+                    "value": "ENV1"
+                  }
+                ],
+                "image": "apache/submarine:xgboost-dist-iris-1.0",
+                "name": "xgboost",
+                "resources": {
+                  "limits": {
+                    "cpu": "1",
+                    "memory": "2048M"
+                  },
+                  "requests": {
+                    "cpu": "1",
+                    "memory": "1024M"
+                  }
+                },
+                "volumeMounts": [
+                  {
+                    "mountPath": "/logs",
+                    "name": "volume",
+                    "subPath": "submarine-tensorboard/xgboost-dist-mnist"
+                  }
+                ]
+              }
+            ],
+            "volumes": [
+              {
+                "name": "volume",
+                "persistentVolumeClaim": {
+                  "claimName": "submarine-tensorboard-pvc"
+                }
+              }
+            ]
+          }
+        },
+        "restartPolicy": "OnFailure"
+      }
+    },
+    "backoffLimit": 3
+  },
+  "status": {
+    "conditions": [
+      {
+        "lastTransitionTime": "2022-07-30T11:51:39Z",
+        "lastUpdateTime": "2022-07-30T11:51:39Z",
+        "message": "xgboostJob experiment-1659181695811-0001 is created.",
+        "reason": "XGBoostJobCreated",
+        "status": "True",
+        "type": "Created"
+      },
+      {
+        "lastTransitionTime": "2022-07-30T11:51:40Z",
+        "lastUpdateTime": "2022-07-30T11:51:40Z",
+        "message": "xgboostJob experiment-1659181695811-0001 is running.",
+        "reason": "XGBoostJobRunning",
+        "status": "True",
+        "type": "Running"
+      }
+    ],
+    "replicaStatuses": {
+      "PS": {
+        "active": 1
+      },
+      "Worker": {
+        "active": 2
+      }
+    }
+  }
+}
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/experiment.sql b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/experiment.sql
new file mode 100644
index 0000000..128e48a
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/experiment.sql
@@ -0,0 +1,38 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--    http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE if not exists experiment
+(
+    id                varchar(64) primary key,
+    experiment_spec   text,
+    create_by         varchar(32),
+    create_time       datetime,
+    update_by         varchar(32),
+    update_time       datetime,
+    experiment_status varchar(20),
+    accepted_time     datetime,
+    running_time      datetime,
+    finished_time     datetime,
+    uid               varchar(64)
+    );
+
+CREATE TABLE if not exists environment
+(
+    id               varchar(64) primary key,
+    environment_name varchar(255),
+    environment_spec text,
+    create_by        varchar(32),
+    create_time      datetime,
+    update_by        varchar(32),
+    update_time      datetime
+    );
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/notebook.sql b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/notebook.sql
new file mode 100644
index 0000000..df5b7be
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/resources/db/notebook.sql
@@ -0,0 +1,27 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--    http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE if not exists environment
+(
+    id               varchar(64) primary key,
+    environment_name varchar(255),
+    environment_spec text,
+    create_by        varchar(32),
+    create_time      datetime,
+    update_by        varchar(32),
+    update_time      datetime
+    );
+
+DELETE FROM environment WHERE id='environment_1600862964725_0001';
+INSERT INTO environment VALUES ('environment_1600862964725_0001', 'my-submarine-env', '{"name":"my-submarine-env","dockerImage":"apache/submarine:jupyter-notebook-x.x.x","kernelSpec":{"name":"submarine_jupyter_py3","channels":["defaults"],"condaDependencies":[],"pipDependencies":[]}}', 'admin', now(), 'admin', now());
+