blob: 779bddfa0748d38cb6eea4e9cafb0e31efbda02e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.submarine.server.k8s.agent;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
import io.javaoperatorsdk.operator.Operator;
import org.apache.ibatis.session.SqlSession;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.server.database.experiment.entity.ExperimentEntity;
import org.apache.submarine.server.database.experiment.mappers.ExperimentMapper;
import org.apache.submarine.server.database.notebook.entity.NotebookEntity;
import org.apache.submarine.server.database.notebook.mappers.NotebookMapper;
import org.apache.submarine.server.database.utils.MyBatisUtil;
import org.apache.submarine.server.k8s.agent.model.notebook.NotebookResource;
import org.apache.submarine.server.k8s.agent.model.notebook.status.NotebookCondition;
import org.apache.submarine.server.k8s.agent.model.notebook.status.NotebookStatus;
import org.apache.submarine.server.k8s.agent.model.training.resource.PyTorchJob;
import org.apache.submarine.server.k8s.agent.model.training.resource.TFJob;
import org.apache.submarine.server.k8s.agent.model.training.resource.XGBoostJob;
import org.apache.submarine.server.k8s.agent.model.training.status.JobCondition;
import org.apache.submarine.server.k8s.agent.model.training.status.JobStatus;
import org.apache.submarine.server.k8s.agent.model.training.status.ReplicaStatus;
import org.apache.submarine.server.k8s.agent.reconciler.NotebookReconciler;
import org.apache.submarine.server.k8s.agent.reconciler.PyTorchJobReconciler;
import org.apache.submarine.server.k8s.agent.reconciler.TFJobReconciler;
import org.apache.submarine.server.k8s.agent.reconciler.XGBoostJobReconciler;
import org.apache.submarine.server.k8s.utils.OwnerReferenceConfig;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class SubmitSubmarineAgentTest {
// Logger used by the setup code and the tests to report progress and failures.
private static final Logger LOGGER = LoggerFactory.getLogger(SubmitSubmarineAgentTest.class);
// Mock Kubernetes API server, managed by JUnit as a class-level rule.
// NOTE(review): the two flags are presumably (https, crudMode) per fabric8's
// KubernetesServer constructor -- confirm against the fabric8 version in use.
@ClassRule
public static KubernetesServer server = new KubernetesServer(true, true);
// Client obtained from the mock server in beforeInit() and closed in close().
private static KubernetesClient client;
// Operator that hosts the reconcilers under test; started in beforeInit(), stopped in close().
private static Operator operator;
// Shared Submarine configuration; beforeInit() points its JDBC settings at the H2 database below.
private static final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
// JDBC settings for the H2 database used by these tests. The URL selects an in-memory
// database named "submarine-test" with the MODE=MYSQL and DB_CLOSE_DELAY=-1 options.
private static final String H2_JDBC_URL = "jdbc:h2:mem:submarine-test;MODE=MYSQL;DB_CLOSE_DELAY=-1";
private static final String H2_JDBC_DRIVERCLASS = "org.h2.Driver";
private static final String H2_JDBC_USERNAME = "root";
private static final String H2_JDBC_PASSWORD = "";
/**
 * Prepares the shared fixtures for every test in this class: initializes the H2 database
 * from {@code /db/agent-init.sql}, installs the Notebook, TFJob, PyTorchJob and XGBoostJob
 * CRDs into the mock Kubernetes server, and starts an operator with the four reconcilers.
 *
 * @throws IllegalStateException if the database initialization script cannot be executed
 */
@BeforeClass
public static void beforeInit() {
  // setup h2 database
  conf.setJdbcUrl(H2_JDBC_URL);
  conf.setJdbcDriverClassName(H2_JDBC_DRIVERCLASS);
  conf.setJdbcUserName(H2_JDBC_USERNAME);
  conf.setJdbcPassword(H2_JDBC_PASSWORD);
  try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
           H2_JDBC_USERNAME, H2_JDBC_PASSWORD);
       Statement stmt = conn.createStatement()) {
    stmt.execute("RUNSCRIPT FROM 'classpath:/db/agent-init.sql'");
  } catch (SQLException e) {
    // Fail fast: without the schema every test would fail later with a misleading error.
    throw new IllegalStateException("Failed to initialize the H2 test database", e);
  }

  // set client and operator
  client = server.getClient();
  operator = new Operator(client, null);

  // create notebook resource
  client.getKubernetesSerialization()
      .registerKubernetesResource("apiextensions.k8s.io/v1", "Notebook", NotebookResource.class);
  CustomResourceDefinition notebookCrd = loadCrd("/custom-resources/notebook.yml");
  LOGGER.info("Create Notebook CRD ...");
  client.apiextensions().v1().customResourceDefinitions().createOrReplace(notebookCrd);

  // create tf resource
  client.getKubernetesSerialization()
      .registerKubernetesResource("apiextensions.k8s.io/v1", "TFJob", TFJob.class);
  CustomResourceDefinition tfCrd = loadCrd("/custom-resources/tfjobs.yaml");
  LOGGER.info("Create TF CRD ...");
  client.apiextensions().v1().customResourceDefinitions().create(tfCrd);

  // create pytorch resource
  client.getKubernetesSerialization()
      .registerKubernetesResource("apiextensions.k8s.io/v1", "PyTorchJob", PyTorchJob.class);
  CustomResourceDefinition ptCrd = loadCrd("/custom-resources/pytorchjobs.yaml");
  LOGGER.info("Create PyTorch CRD ...");
  client.apiextensions().v1().customResourceDefinitions().create(ptCrd);

  // create xgboost resource
  client.getKubernetesSerialization()
      .registerKubernetesResource("apiextensions.k8s.io/v1", "XGBoostJob", XGBoostJob.class);
  CustomResourceDefinition xgbCrd = loadCrd("/custom-resources/xgboostjobs.yaml");
  LOGGER.info("Create XGBoost CRD ...");
  client.apiextensions().v1().customResourceDefinitions().create(xgbCrd);

  // add reconcilers to listen custom resources
  operator.register(new NotebookReconciler());
  operator.register(new TFJobReconciler());
  operator.register(new PyTorchJobReconciler());
  operator.register(new XGBoostJobReconciler());

  // start operator
  operator.start();
}

/**
 * Loads a CustomResourceDefinition from a classpath resource through the test client.
 * Must only be called after {@code client} has been assigned.
 *
 * @param classpathLocation absolute classpath location of the CRD YAML file
 * @return the CustomResourceDefinition parsed from that resource
 */
private static CustomResourceDefinition loadCrd(String classpathLocation) {
  return client
      .apiextensions().v1()
      .customResourceDefinitions()
      .load(SubmitSubmarineAgentTest.class.getResourceAsStream(classpathLocation))
      .item();
}
/**
 * Creates a TFJob whose status reports a {@code Running} condition and verifies that the
 * experiment row {@code experiment-1659167632755-0001} ends up with status "Running".
 *
 * @throws InterruptedException if the wait for the reconciler is interrupted
 */
@Test
public void testTfJobAgent() throws InterruptedException {
  // build a TFJob status with a single "Running" condition
  JobStatus status = new JobStatus();
  JobCondition condition = new JobCondition();
  condition.setMessage("TFJob test/experiment-1659167632755-0001 is running.");
  condition.setReason("TFJobRunning");
  condition.setStatus("True");
  condition.setType("Running");
  // Read the clock in UTC before tagging the value as UTC; LocalDateTime.now() alone would
  // label the machine's local wall-clock time as UTC.
  condition.setLastTransitionTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());
  condition.setLastUpdateTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());
  status.setConditions(List.of(condition));
  status.setReplicaStatuses(Map.of("PS", new ReplicaStatus(1, 0, 0),
      "Worker", new ReplicaStatus(1, 0, 0)));

  ObjectMeta meta = new ObjectMetaBuilder()
      .withName("experiment-1659167632755-0001")
      .withNamespace(client.getNamespace())
      .withLabels(Map.of("submarine-experiment-name", "test-tfjob"))
      .addToOwnerReferences(new OwnerReferenceBuilder()
          .withUid(OwnerReferenceConfig.getSubmarineUid())
          .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
          .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
          .build())
      .build();

  // create the resource, then push its status to the mock server
  TFJob resource = new TFJob();
  resource.setMetadata(meta);
  resource.setStatus(status);
  client.resource(resource).create();
  client.resource(resource).updateStatus();

  // left 5s to process
  Thread.sleep(TimeUnit.SECONDS.toMillis(5));

  // check status have changed
  try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
    ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
    ExperimentEntity tfJob = mapper.select("experiment-1659167632755-0001");
    // A missing row should fail with a clear message rather than a NullPointerException.
    Assert.assertNotNull("Experiment experiment-1659167632755-0001 not found in database", tfJob);
    Assert.assertEquals("Running", tfJob.getExperimentStatus());
  } catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Creates a PyTorchJob whose status reports a {@code Running} condition and verifies that
 * the experiment row {@code experiment-1659167632755-0002} ends up with status "Running".
 *
 * @throws InterruptedException if the wait for the reconciler is interrupted
 */
@Test
public void testPytorchJobAgent() throws InterruptedException {
  // build a PyTorchJob status with a single "Running" condition
  JobStatus status = new JobStatus();
  JobCondition condition = new JobCondition();
  condition.setMessage("PytorchJob test/experiment-1659167632755-0002 is running.");
  condition.setReason("PytorchJobRunning");
  condition.setStatus("True");
  condition.setType("Running");
  // Read the clock in UTC before tagging the value as UTC; LocalDateTime.now() alone would
  // label the machine's local wall-clock time as UTC.
  condition.setLastTransitionTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());
  condition.setLastUpdateTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());
  status.setConditions(List.of(condition));

  ObjectMeta meta = new ObjectMetaBuilder()
      .withName("experiment-1659167632755-0002")
      .withNamespace(client.getNamespace())
      .withLabels(Map.of("submarine-experiment-name", "test-pytorchjob"))
      .addToOwnerReferences(new OwnerReferenceBuilder()
          .withUid(OwnerReferenceConfig.getSubmarineUid())
          .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
          .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
          .build())
      .build();

  // create the resource, then push its status to the mock server
  PyTorchJob resource = new PyTorchJob();
  resource.setMetadata(meta);
  resource.setStatus(status);
  client.resource(resource).create();
  client.resource(resource).updateStatus();

  // left 5s to process
  Thread.sleep(TimeUnit.SECONDS.toMillis(5));

  // check status have changed
  try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
    ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
    ExperimentEntity pytorchJob = mapper.select("experiment-1659167632755-0002");
    // A missing row should fail with a clear message rather than a NullPointerException.
    Assert.assertNotNull("Experiment experiment-1659167632755-0002 not found in database",
        pytorchJob);
    Assert.assertEquals("Running", pytorchJob.getExperimentStatus());
  } catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Creates an XGBoostJob whose status reports a {@code Running} condition and verifies that
 * the experiment row {@code experiment-1659167632755-0003} ends up with status "Running".
 *
 * @throws InterruptedException if the wait for the reconciler is interrupted
 */
@Test
public void testXGBoostJobAgent() throws InterruptedException {
  // build an XGBoostJob status with a single "Running" condition
  JobStatus status = new JobStatus();
  JobCondition condition = new JobCondition();
  condition.setMessage("XGBoostJob test/experiment-1659167632755-0003 is running.");
  condition.setReason("XGBoostJobRunning");
  condition.setStatus("True");
  condition.setType("Running");
  // Read the clock in UTC before tagging the value as UTC; LocalDateTime.now() alone would
  // label the machine's local wall-clock time as UTC.
  condition.setLastTransitionTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());
  condition.setLastUpdateTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());
  status.setConditions(List.of(condition));

  ObjectMeta meta = new ObjectMetaBuilder()
      .withName("experiment-1659167632755-0003")
      .withNamespace(client.getNamespace())
      .withLabels(Map.of("submarine-experiment-name", "test-xgboostjob"))
      .addToOwnerReferences(new OwnerReferenceBuilder()
          .withUid(OwnerReferenceConfig.getSubmarineUid())
          .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
          .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
          .build())
      .build();

  // create the resource, then push its status to the mock server
  XGBoostJob resource = new XGBoostJob();
  resource.setMetadata(meta);
  resource.setStatus(status);
  client.resource(resource).create();
  client.resource(resource).updateStatus();

  // left 5s to process
  Thread.sleep(TimeUnit.SECONDS.toMillis(5));

  // check status have changed
  try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
    ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
    ExperimentEntity xgboostJob = mapper.select("experiment-1659167632755-0003");
    // A missing row should fail with a clear message rather than a NullPointerException.
    Assert.assertNotNull("Experiment experiment-1659167632755-0003 not found in database",
        xgboostJob);
    Assert.assertEquals("Running", xgboostJob.getExperimentStatus());
  } catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Exercises the notebook agent with a status in the style of notebook-controller 1.4.0:
 * a single {@code Running} condition. Verifies that the notebook row
 * {@code notebook_1642402491519_0001} ends up with status "running".
 *
 * @throws InterruptedException if the wait for the reconciler is interrupted
 */
@Test
public void testNotebookAgent() throws InterruptedException {
  // build a notebook status with one ready replica and a single "Running" condition
  NotebookStatus status = new NotebookStatus();
  status.setReadyReplicas(1);
  NotebookCondition condition = new NotebookCondition();
  condition.setType("Running");
  // Read the clock in UTC before tagging the value as UTC; LocalDateTime.now() alone would
  // label the machine's local wall-clock time as UTC.
  condition.setLastProbeTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());
  status.setConditions(List.of(condition));

  ObjectMeta meta = new ObjectMetaBuilder()
      .withName("notebook-1642402491519-0001-test-notebook")
      .withNamespace(client.getNamespace())
      .withLabels(Map.of("notebook-id", "notebook_1642402491519_0001",
          "notebook-owner-id", "e9ca23d68d884d4ebb19d07889727dae"))
      .addToOwnerReferences(new OwnerReferenceBuilder()
          .withUid(OwnerReferenceConfig.getSubmarineUid())
          .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
          .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
          .build())
      .build();

  // create the resource, then push its status to the mock server
  NotebookResource resource = new NotebookResource();
  resource.setMetadata(meta);
  resource.setStatus(status);
  client.resource(resource).create();
  client.resource(resource).updateStatus();

  // left 5s to process
  Thread.sleep(TimeUnit.SECONDS.toMillis(5));

  // check status have changed
  try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
    NotebookMapper mapper = sqlSession.getMapper(NotebookMapper.class);
    NotebookEntity notebook = mapper.select("notebook_1642402491519_0001");
    // A missing row should fail with a clear message rather than a NullPointerException.
    Assert.assertNotNull("Notebook notebook_1642402491519_0001 not found in database", notebook);
    Assert.assertEquals("running", notebook.getNotebookStatus());
  } catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Exercises the notebook agent with a status in the style of notebook-controller 1.7.0:
 * {@code Initialized}, {@code Ready} and {@code ContainersReady} conditions instead of a
 * single {@code Running} one. Verifies that the notebook row
 * {@code notebook_1642402491519_0002} ends up with status "running".
 *
 * @throws InterruptedException if the wait for the reconciler is interrupted
 */
@Test
public void testNotebookAgentNewConditions() throws InterruptedException {
  // build a notebook status with one ready replica and three conditions
  NotebookStatus status = new NotebookStatus();
  status.setReadyReplicas(1);
  // Read the clock in UTC before tagging the value as UTC; LocalDateTime.now() alone would
  // label the machine's local wall-clock time as UTC. All conditions share one probe time,
  // while each transition time is taken separately.
  String probeTime = LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString();

  NotebookCondition condition1 = new NotebookCondition();
  condition1.setType("Initialized");
  condition1.setLastProbeTime(probeTime);
  condition1.setLastTransitionTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());

  NotebookCondition condition2 = new NotebookCondition();
  condition2.setType("Ready");
  condition2.setLastProbeTime(probeTime);
  condition2.setLastTransitionTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());

  NotebookCondition condition3 = new NotebookCondition();
  condition3.setType("ContainersReady");
  condition3.setLastProbeTime(probeTime);
  condition3.setLastTransitionTime(
      LocalDateTime.now(ZoneOffset.UTC).atZone(ZoneOffset.UTC).toString());

  status.setConditions(List.of(condition1, condition2, condition3));

  ObjectMeta meta = new ObjectMetaBuilder()
      .withName("notebook-1642402491519-0002-test-notebook")
      .withNamespace(client.getNamespace())
      .withLabels(Map.of("notebook-id", "notebook_1642402491519_0002",
          "notebook-owner-id", "e9ca23d68d884d4ebb19d07889727dae"))
      .addToOwnerReferences(new OwnerReferenceBuilder()
          .withUid(OwnerReferenceConfig.getSubmarineUid())
          .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
          .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
          .build())
      .build();

  // create the resource, then push its status to the mock server
  NotebookResource resource = new NotebookResource();
  resource.setMetadata(meta);
  resource.setStatus(status);
  client.resource(resource).create();
  client.resource(resource).updateStatus();

  // left 5s to process
  Thread.sleep(TimeUnit.SECONDS.toMillis(5));

  // check status have changed
  try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
    NotebookMapper mapper = sqlSession.getMapper(NotebookMapper.class);
    NotebookEntity notebook = mapper.select("notebook_1642402491519_0002");
    // A missing row should fail with a clear message rather than a NullPointerException.
    Assert.assertNotNull("Notebook notebook_1642402491519_0002 not found in database", notebook);
    Assert.assertEquals("running", notebook.getNotebookStatus());
  } catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Stops the operator and closes the Kubernetes client once all tests have run.
 * Either field may still be null if class setup failed part-way, so both are guarded,
 * and the client is closed even if stopping the operator throws.
 */
@AfterClass
public static void close() {
  try {
    if (operator != null) {
      operator.stop();
    }
  } finally {
    if (client != null) {
      client.close();
    }
  }
}
}